Lab 5: Flink DataStream API - Real-Time Analytics from Orders to Supplier Stats

Implement a Flink DataStream job that reads orders from Kafka, performs event-time tumbling window aggregations, and publishes supplier statistics to a Kafka sink. Learn how to work with watermarks, Avro serialization, and event-time semantics in a production-style stream pipeline.
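At a high level, the job assigns event-time timestamps and watermarks to incoming orders, keys the stream by supplier, and aggregates each key over tumbling event-time windows. The Kotlin sketch below shows that shape only; the broker address, the Order record and its comma-separated parsing (standing in for the lab's Avro deserialization), and the 5-second window and out-of-orderness bounds are illustrative assumptions, not the lab's actual code.

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import java.time.Duration

// Illustrative record shape; the lab's actual Avro-generated class will differ.
data class Order(val supplierId: String, val amount: Double, val tsMillis: Long)

// Hypothetical parser standing in for the lab's Avro deserialization.
fun parseOrder(line: String): Order {
    val (supplier, amount, ts) = line.split(",")
    return Order(supplier, amount.toDouble(), ts.toLong())
}

// Per-window aggregate: order count and total amount for each supplier.
class SupplierStatsAgg : AggregateFunction<Order, Pair<Long, Double>, String> {
    override fun createAccumulator() = 0L to 0.0
    override fun add(value: Order, acc: Pair<Long, Double>) =
        (acc.first + 1) to (acc.second + value.amount)
    override fun getResult(acc: Pair<Long, Double>) =
        "count=${acc.first}, total=${acc.second}"
    override fun merge(a: Pair<Long, Double>, b: Pair<Long, Double>) =
        (a.first + b.first) to (a.second + b.second)
}

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    val source = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092") // assumption: local broker address
        .setTopics("orders")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "orders")
        .map(::parseOrder)
        // Event-time semantics: timestamps come from the record itself, and
        // watermarks tolerate events arriving up to 5 seconds out of order.
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .forBoundedOutOfOrderness<Order>(Duration.ofSeconds(5))
                .withTimestampAssigner { order, _ -> order.tsMillis }
        )
        .keyBy { it.supplierId }
        // One result per supplier per 5-second tumbling event-time window.
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .aggregate(SupplierStatsAgg())
        .print() // the real job serializes stats to Avro and writes to a Kafka sink

    env.execute("supplier-stats-sketch")
}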

How to start

Clone project repository

git clone https://github.com/factorhouse/examples.git
cd examples

We'll use Factor House Local to quickly spin up Kafka and Flink environments that include Kpow and Flex. We can use either the Community or Enterprise edition of Kpow/Flex. To begin, ensure valid licenses are available. For details on how to request and configure a license, refer to this section of the project README.

The Flink cluster loads custom dependency JAR files from the directories specified in the CUSTOM_JARS_DIRS environment variable within the compose-flex.yml file. However, the Hive connectors included in these custom JARs are bundled with an outdated, unshaded version of Avro, which leads to class-loading conflicts. Since this application does not depend on Hive, Hadoop, Iceberg, or similar components, we can safely comment out the CUSTOM_JARS_DIRS environment variable. This allows the application to manage its dependencies without conflicts.

Before:

x-common-environment: &flink_common_env_vars
  AWS_REGION: us-east-1
  HADOOP_CONF_DIR: /opt/hadoop/etc/hadoop
  CUSTOM_JARS_DIRS: "/tmp/hadoop;/tmp/hive;/tmp/iceberg;/tmp/parquet"

After:

x-common-environment: &flink_common_env_vars
  AWS_REGION: us-east-1
  HADOOP_CONF_DIR: /opt/hadoop/etc/hadoop
  # CUSTOM_JARS_DIRS: "/tmp/hadoop;/tmp/hive;/tmp/iceberg;/tmp/parquet"

Start services:

## Clone the Factor House Local Repository
git clone https://github.com/factorhouse/factorhouse-local.git

## Download Kafka/Flink Connectors and Spark Iceberg Dependencies
./factorhouse-local/resources/setup-env.sh

## Uncomment the sections to enable the edition and license.
# Edition (choose one):
# unset KPOW_SUFFIX # Enterprise
# unset FLEX_SUFFIX # Enterprise
# export KPOW_SUFFIX="-ce" # Community
# export FLEX_SUFFIX="-ce" # Community
# Licenses:
# export KPOW_LICENSE=<path-to-license-file>
# export FLEX_LICENSE=<path-to-license-file>

docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml up -d \
&& docker compose -p flex -f ./factorhouse-local/compose-flex.yml up -d

Deploy source connector

We will create a source connector that produces fake order records to a Kafka topic (orders). See the Kafka Connect via Kpow UI and API lab for details on how to create the connector.

Once deployed, we can check the connector and its tasks in Kpow.

To build and run this Flink application locally, make sure your environment has JDK 17 installed.

The application supports two primary execution modes, each suited to different stages of development:

1. Run via Gradle (Development Mode)

This mode is ideal for local development and quick testing. It enables fast iteration by launching the application directly from the Gradle build system, with no need to package it into a JAR.

cd fh-local-labs/lab-05
./gradlew run

2. Run via Fat JAR (Deployment Mode)

For staging or production-like environments, we can package the application into a self-contained "fat" JAR that includes all dependencies.

Step 1: Build the Shadow JAR
cd fh-local-labs/lab-05
./gradlew shadowJar

This creates the following deployable artifact:

build/libs/fh-local-flink-ds-stats-1.0.jar

Step 2: Submit the Job

Copy the JAR into the jobmanager container and submit the job:

docker cp build/libs/fh-local-flink-ds-stats-1.0.jar \
jobmanager:/tmp/fh-local-flink-ds-stats-1.0.jar

docker exec jobmanager /opt/flink/bin/flink run -d -p 3 \
-c io.factorhouse.demo.MainKt /tmp/fh-local-flink-ds-stats-1.0.jar

Alternatively, we can submit the JAR using the Flex UI, available at http://localhost:3001:

  1. Upload the JAR from the Jobs sidebar.

  2. Click the submit menu next to the uploaded JAR.

  3. Fill out the submission form, including:

    • Entry Class: io.factorhouse.demo.MainKt
    • Parallelism: e.g. 3
    • Optional: Savepoint path, job arguments, etc.

Monitoring the Job

We can monitor the Flink job via the Flink UI (http://localhost:8082) or Flex (http://localhost:3001). The screenshot below shows the job's logical plan as visualized in Flex.

Verifying the Output in Kafka

We can also verify that the job has registered the supplier-stats-ds-value schema in Kpow (http://localhost:3000).

To explore the supplier statistics, inspect the messages in the supplier-stats-ds Kafka topic. For proper Avro decoding in Kpow, set the Key Deserializer to String, the Value Deserializer to AVRO, and select Local Schema Registry. Then, click the Search button to view the records.
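Beyond the Kpow UI, a small Kotlin consumer can sanity-check the topic directly. This is a minimal sketch, not part of the lab: it assumes the broker is reachable at localhost:9092 and a Confluent-compatible Schema Registry at http://localhost:8081, so adjust both to your environment. Records are read as generic Avro, so no generated classes are required.

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumption
        put(ConsumerConfig.GROUP_ID_CONFIG, "supplier-stats-checker")
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // Mirrors the Kpow settings: String key, Avro value via the registry.
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer::class.java.name)
        put("schema.registry.url", "http://localhost:8081") // assumption
    }

    KafkaConsumer<String, GenericRecord>(props).use { consumer ->
        consumer.subscribe(listOf("supplier-stats-ds"))
        // Poll a few times and print whatever supplier stats have been produced.
        repeat(10) {
            for (record in consumer.poll(Duration.ofSeconds(2))) {
                println("key=${record.key()} value=${record.value()}")
            }
        }
    }
}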

Shutdown environment

Finally, stop and remove the Docker containers. If you're not already in the project root directory, navigate there first, then run:

## Stop the containers, then unset the environment variables
docker compose -p flex -f ./factorhouse-local/compose-flex.yml down \
&& docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml down

unset KPOW_SUFFIX FLEX_SUFFIX KPOW_LICENSE FLEX_LICENSE