Lab 11: Flink SQL Gateway - Serving Real-Time Supplier Stats via REST API

Run a Flink SQL streaming pipeline via the Flink SQL Gateway and access real-time query results through its REST API. This lab illustrates how to fetch and display live supplier statistics from a Kafka source using a Python client.

How to start

Clone project repository

git clone https://github.com/factorhouse/examples.git
cd examples

We'll use Factor House Local to quickly spin up Kafka and Flink environments that include Kpow and Flex. We can use either the Community or Enterprise editions of Kpow/Flex. To begin, ensure valid licenses are available. For details on how to request and configure a license, refer to this section of the project README.

The Flink cluster loads custom dependency JAR files from the directories specified by the CUSTOM_JARS_DIRS environment variable in the compose-flex.yml file. Since the Flink SQL Gateway does not support dynamic JAR loading, the required JARs (e.g., Kafka and Avro) must be preloaded by setting the appropriate paths in the CUSTOM_JARS_DIRS variable.

Before:

x-common-environment: &flink_common_env_vars
  AWS_REGION: us-east-1
  HADOOP_CONF_DIR: /opt/hadoop/etc/hadoop
  CUSTOM_JARS_DIRS: "/tmp/hadoop;/tmp/hive;/tmp/iceberg;/tmp/parquet"

After:

x-common-environment: &flink_common_env_vars
  AWS_REGION: us-east-1
  HADOOP_CONF_DIR: /opt/hadoop/etc/hadoop
  CUSTOM_JARS_DIRS: "/tmp/hadoop;/tmp/hive;/tmp/iceberg;/tmp/parquet;/tmp/connector"

This ensures that the following JARs are additionally loaded into the Flink JobManager, TaskManagers, and SQL Gateway:

  • /tmp/connector/flink-sql-connector-kafka-3.3.0-1.20.jar
  • /tmp/connector/flink-sql-avro-confluent-registry-1.20.1.jar

Start services:

## Clone the Factor House Local Repository
git clone https://github.com/factorhouse/factorhouse-local.git

## Download Kafka/Flink Connectors and Spark Iceberg Dependencies
./factorhouse-local/resources/setup-env.sh

## Uncomment the sections to enable the edition and license.
# Edition (choose one):
# unset KPOW_SUFFIX # Enterprise
# unset FLEX_SUFFIX # Enterprise
# export KPOW_SUFFIX="-ce" # Community
# export FLEX_SUFFIX="-ce" # Community
# Licenses:
# export KPOW_LICENSE=<path-to-license-file>
# export FLEX_LICENSE=<path-to-license-file>

docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml up -d \
&& docker compose -p flex -f ./factorhouse-local/compose-flex.yml up -d

Deploy source connector

We will create a source connector that produces fake order records to a Kafka topic (orders). See the Kafka Connect via Kpow UI and API lab for details about how to create the connector.
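For illustration only, the sketch below shows how a source connector could be registered through the plain Kafka Connect REST API rather than the Kpow workflow used in the referenced lab. The Connect URL, connector class, and config keys are placeholders, not the lab's actual configuration.

import requests

# Hypothetical sketch only: the Connect URL, connector class, and config keys are
# placeholders; the lab's real connector config is covered in the Kpow lab referenced above.
CONNECT_URL = "http://localhost:8083"  # adjust to your Kafka Connect REST endpoint

connector = {
    "name": "orders-source",
    "config": {
        "connector.class": "<fake-order-source-connector-class>",  # placeholder
        "tasks.max": "1",
        "topic": "orders",  # key name depends on the connector implementation
    },
}

resp = requests.post(f"{CONNECT_URL}/connectors", json=connector, timeout=10)
resp.raise_for_status()
print(resp.json())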

Once deployed, we can check the connector and its tasks in Kpow.

We use a custom Python client (flink_sql_runner.py) to interact with the Flink SQL Gateway REST API.

The script performs the following steps (a simplified sketch of the underlying REST calls follows the list):

  1. Session creation
  2. Kafka table registration (with Avro & Schema Registry)
  3. Tumbling window aggregation query
  4. Real-time result fetching
  5. Cleanup on exit or Ctrl+C
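Under the hood, these steps map onto the SQL Gateway's session, statement, and operation REST endpoints. The sketch below is a simplified outline of those calls, not the actual flink_sql_runner.py implementation; the gateway address is an assumption (8083 is the SQL Gateway default, so adjust it to the port mapped in compose-flex.yml), and error handling is kept minimal.

import time
import requests

# Simplified sketch of the REST calls the runner wraps; the gateway address is an assumption.
GATEWAY = "http://localhost:8083"

# 1. Create a session.
session = requests.post(f"{GATEWAY}/v1/sessions", json={}).json()["sessionHandle"]

# 2-3. Submit a statement (table DDL or the windowed query) and receive an operation handle.
def submit(statement: str) -> str:
    resp = requests.post(
        f"{GATEWAY}/v1/sessions/{session}/statements", json={"statement": statement}
    )
    resp.raise_for_status()
    return resp.json()["operationHandle"]

# 4. Page through results with the token-based result endpoint until the stream ends.
def fetch_results(operation: str):
    token = 0
    while True:
        payload = requests.get(
            f"{GATEWAY}/v1/sessions/{session}/operations/{operation}/result/{token}"
        ).json()
        if payload.get("resultType") == "NOT_READY":
            time.sleep(1)
            continue
        yield from payload.get("results", {}).get("data", [])
        if payload.get("resultType") == "EOS":
            break
        token += 1

# 5. Close the session when finished (the runner also does this on exit or Ctrl+C).
# requests.delete(f"{GATEWAY}/v1/sessions/{session}")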

The source table is defined using the Kafka SQL connector, enabling Flink to consume Avro-encoded messages from the orders Kafka topic. To support time-based processing and windowed aggregations, an event-time watermark is introduced on bid_time using WATERMARK FOR bid_time AS bid_time - INTERVAL '5' SECOND. This watermark allows Flink to track event time progress and handle out-of-order events, which is required for time-based operations such as windowed aggregations or joins.

📌 Watermarking is applied on the bid_time column to enable proper event-time windowing.

Source Table DDL

CREATE TABLE orders (
  order_id STRING,
  item STRING,
  price STRING,
  supplier STRING,
  bid_time TIMESTAMP(3),
  WATERMARK FOR bid_time AS bid_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka-1:19092',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://schema:8081',
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'admin:admin',
  'avro-confluent.schema-registry.subject' = 'orders-value',
  'scan.startup.mode' = 'earliest-offset'
);

Aggregation Query

This query aggregates data per supplier in fixed 5-second tumbling windows, computing:

  • Window start/end (UTC)
  • Total price
  • Bid count

INSERT INTO supplier_stats
SELECT
  DATE_FORMAT(window_start, 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS window_start,
  DATE_FORMAT(window_end, 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS window_end,
  supplier,
  SUM(CAST(price AS DECIMAL(10, 2))) AS total_price,
  COUNT(*) AS `count`
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(bid_time), INTERVAL '5' SECOND))
GROUP BY window_start, window_end, supplier;

From the project root:

# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate

# Install required Python packages
pip install -r fh-local-labs/lab-11/requirements.txt

# Run the Flink SQL runner application
python fh-local-labs/lab-11/flink_sql_runner.py

We'll see streaming results printed in the terminal.

We can monitor the Flink job via the Flink UI (http://localhost:8082) or Flex (http://localhost:3001). The screenshot below shows the job's logical plan as visualized in Flex.
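In addition to the UIs, the job can be checked programmatically through the JobManager's standard REST API. A minimal sketch, assuming the REST endpoint is served on the Flink UI address above:

import requests

# Assumes the JobManager REST API is reachable on the Flink UI address shown above.
FLINK_REST = "http://localhost:8082"

# /jobs/overview lists each job with its id, name, and current state.
overview = requests.get(f"{FLINK_REST}/jobs/overview", timeout=10).json()
for job in overview.get("jobs", []):
    print(job["jid"], job["name"], job["state"])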

Shutdown environments

Finally, stop and remove the Docker containers. If you're not already in the project root directory, navigate there first, then run:

# Stops the containers and unsets environment variables
docker compose -p flex -f ./factorhouse-local/compose-flex.yml down \
&& docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml down

unset KPOW_SUFFIX FLEX_SUFFIX KPOW_LICENSE FLEX_LICENSE