Lab 8: Flink Table API - Loading Order Events from Kafka into Iceberg
Deploy a Kafka-to-Iceberg pipeline using Flink's Table API. This lab demonstrates how to configure the job, compile it as a shadow JAR, and deploy it via the Flink CLI and Flex. The Iceberg sink table is created programmatically with the Java Iceberg API because Flink SQL DDL does not yet support hidden partitioning.
How to start
Clone project repository
git clone https://github.com/factorhouse/examples.git
cd examples
Start Kafka and Flink environments
We'll use Factor House Local to quickly spin up Kafka and Flink environments that include Kpow and Flex as well as an analytics environment for Iceberg. We can use either the Community or Enterprise editions of Kpow/Flex. To begin, ensure valid licenses are available. For details on how to request and configure a license, refer to this section of the project README.
## Clone the Factor House Local Repository
git clone https://github.com/factorhouse/factorhouse-local.git
## Download Kafka/Flink Connectors and Spark Iceberg Dependencies
./factorhouse-local/resources/setup-env.sh
## Uncomment the sections to enable the edition and license.
# Edition (choose one):
# unset KPOW_SUFFIX # Enterprise
# unset FLEX_SUFFIX # Enterprise
# export KPOW_SUFFIX="-ce" # Community
# export FLEX_SUFFIX="-ce" # Community
# Licenses:
# export KPOW_LICENSE=<path-to-license-file>
# export FLEX_LICENSE=<path-to-license-file>
docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml up -d \
&& docker compose -p flex -f ./factorhouse-local/compose-flex.yml up -d
Persistent Catalogs
Two catalogs are pre-configured in both the Flink and Spark clusters:
- demo_hv: a Hive catalog backed by the Hive Metastore
- demo_ib: an Iceberg catalog also backed by the Hive Metastore
Flink
In Flink, the catalogs can be initialized automatically using an SQL script (init-catalogs.sql) on startup:
CREATE CATALOG demo_hv WITH (
'type' = 'hive',
'hive-conf-dir' = '/opt/flink/conf',
'default-database' = 'default'
);
CREATE CATALOG demo_ib WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'uri' = 'thrift://hive-metastore:9083'
);
Spark
In Spark, catalog settings are defined in spark-defaults.conf:
# Enable Iceberg extensions
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Hive catalog (demo_hv)
spark.sql.catalog.demo_hv org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo_hv.type hive
spark.sql.catalog.demo_hv.hive.metastore.uris thrift://hive-metastore:9083
spark.sql.catalog.demo_hv.warehouse s3a://warehouse/
# Iceberg catalog (demo_ib)
spark.sql.catalog.demo_ib org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo_ib.type hive
spark.sql.catalog.demo_ib.uri thrift://hive-metastore:9083
spark.sql.catalog.demo_ib.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.demo_ib.s3.endpoint http://minio:9000
spark.sql.catalog.demo_ib.s3.path-style-access true
spark.sql.catalog.demo_ib.warehouse s3a://warehouse/
# Optional: set default catalog
spark.sql.defaultCatalog spark_catalog
Deploy the Flink application
This Flink streaming application reads Avro-encoded Kafka messages using the Table API and writes them to an Iceberg table registered with a Hive catalog. It does the following (a code sketch follows the list):
- Uses the Flink Table API to define a Kafka source table backed by the Confluent Schema Registry (with basic auth support).
- Configures checkpointing, parallelism, and fault tolerance for streaming execution.
- Registers an Iceberg catalog backed by Hive Metastore using SQL and sets it as the active catalog.
- Creates the Iceberg table programmatically using the Java Iceberg API to support hidden partitioning by DAY(bid_time), which is not currently supported in Flink SQL DDL.
- Defines the source schema with watermarking and consumes data from Kafka in Avro format.
- Casts fields as needed (e.g., price to DECIMAL(10,2)) and writes the resulting table into Iceberg using executeInsert().
- Logs key events throughout the lifecycle, including job submission and failure handling.
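The sketch below shows how these pieces might fit together in Kotlin. It is a minimal outline under stated assumptions, not the lab's actual source: the topic and field names (orders, order_id, price, bid_time), database name, credentials, and endpoints (kafka-1:19092, http://schema:8081, thrift://hive-metastore:9083) are illustrative guesses based on the compose environment.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.Schema
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setParallelism(3)
    env.enableCheckpointing(60_000L) // fault tolerance for streaming execution

    val tEnv = StreamTableEnvironment.create(env)

    // 1. Kafka source table with an event-time watermark, Avro-decoded via the Confluent Schema Registry
    tEnv.executeSql(
        """
        CREATE TABLE orders (
          order_id STRING,
          price    DOUBLE,
          bid_time TIMESTAMP(3),
          WATERMARK FOR bid_time AS bid_time - INTERVAL '5' SECOND
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'orders',
          'properties.bootstrap.servers' = 'kafka-1:19092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'avro-confluent',
          'avro-confluent.url' = 'http://schema:8081',
          'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
          'avro-confluent.basic-auth.user-info' = 'admin:admin'
        )
        """.trimIndent()
    )

    // 2. Register the Iceberg catalog backed by the Hive Metastore
    tEnv.executeSql(
        """
        CREATE CATALOG demo_ib WITH (
          'type' = 'iceberg',
          'catalog-type' = 'hive',
          'uri' = 'thrift://hive-metastore:9083'
        )
        """.trimIndent()
    )

    // 3. Create the sink table with the Java Iceberg API so it can use hidden partitioning by DAY(bid_time)
    val icebergSchema = Schema(
        Types.NestedField.required(1, "order_id", Types.StringType.get()),
        Types.NestedField.required(2, "price", Types.DecimalType.of(10, 2)),
        Types.NestedField.required(3, "bid_time", Types.TimestampType.withoutZone())
    )
    val spec = PartitionSpec.builderFor(icebergSchema).day("bid_time").build()
    val hiveCatalog = HiveCatalog().apply {
        setConf(Configuration())
        initialize("demo_ib", mapOf("uri" to "thrift://hive-metastore:9083", "warehouse" to "s3a://warehouse/"))
    }
    val tableId = TableIdentifier.of("default", "orders")
    if (!hiveCatalog.tableExists(tableId)) {
        hiveCatalog.createTable(tableId, icebergSchema, spec)
    }

    // 4. Cast price to DECIMAL(10,2) and stream the result into the Iceberg table
    tEnv.sqlQuery(
        "SELECT order_id, CAST(price AS DECIMAL(10, 2)) AS price, bid_time FROM orders"
    ).executeInsert("demo_ib.`default`.orders")
}

Creating the table through the Iceberg API first and only then writing with executeInsert() is what makes the DAY(bid_time) hidden partitioning possible, since Flink SQL DDL cannot express it.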
To build this Flink application locally, make sure your environment has JDK 17 installed.
Our project relies on the factorhouse/flink image, which already includes the necessary Hadoop and Iceberg dependencies. To avoid an excessively large fat JAR, those dependencies are not bundled into the artifact, so the job is intended for cluster deployment only.
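For reference, a shadow-JAR build along these lines keeps the cluster-provided libraries out of the artifact. This is only a sketch, not the lab's actual build.gradle.kts: the plugin and dependency versions and exact coordinates are illustrative.

// build.gradle.kts - illustrative sketch, not the lab's actual build script
plugins {
    kotlin("jvm") version "1.9.24"
    id("com.github.johnrengelman.shadow") version "8.1.1"
}

repositories { mavenCentral() }

kotlin { jvmToolchain(17) } // the lab assumes JDK 17

dependencies {
    // Provided by the factorhouse/flink image at runtime, so excluded from the fat JAR
    compileOnly("org.apache.flink:flink-streaming-java:1.20.0")
    compileOnly("org.apache.flink:flink-table-api-java-bridge:1.20.0")
    compileOnly("org.apache.iceberg:iceberg-flink-runtime-1.20:1.7.1")
    compileOnly("org.apache.hadoop:hadoop-common:3.3.6")

    // Bundled into the shadow JAR because the cluster does not ship them
    implementation("org.apache.flink:flink-connector-kafka:3.3.0-1.20")
    implementation("org.apache.flink:flink-avro-confluent-registry:1.20.0")
}

tasks.shadowJar {
    archiveBaseName.set("fh-local-flink-table-iceberg")
    archiveVersion.set("1.0")
    archiveClassifier.set("") // produce fh-local-flink-table-iceberg-1.0.jar
    mergeServiceFiles()       // keep META-INF/services entries so Flink can discover connector factories
}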
Step 1: Build the Shadow JAR
From the project directory:
cd fh-local-labs/lab-08
./gradlew shadowJar
This creates the following deployable artifact:
build/libs/fh-local-flink-table-iceberg-1.0.jar
Step 2: Deploy to Flink via Docker
Copy the JAR into the Flink JobManager container and submit the job:
docker cp build/libs/fh-local-flink-table-iceberg-1.0.jar \
jobmanager:/tmp/fh-local-flink-table-iceberg-1.0.jar
docker exec jobmanager /opt/flink/bin/flink run -d -p 3 \
-c io.factorhouse.demo.MainKt /tmp/fh-local-flink-table-iceberg-1.0.jar
Alternatively, we can submit the JAR using the Flex UI available at http://localhost:3001:
- Upload the JAR from the Jobs sidebar.
- Click the submit menu next to the uploaded JAR.
- Fill out the submission form, including:
  - Entry Class: io.factorhouse.demo.MainKt
  - Parallelism: e.g. 3
  - Optional: Savepoint path, job arguments, etc.
Monitoring the Job
We can monitor the Flink job via the Flink UI (http://localhost:8082) or Flex (http://localhost:3001). The screenshot below shows the job's logical plan as visualized in Flex.
In addition to monitoring the job, we can verify the output by inspecting the Parquet files written by the sink. These files are accessible via MinIO at http://localhost:9001, using admin as the username and password as the password. As shown in the screenshot below, the records have been successfully written to the appropriate partitions in the warehouse bucket.
Shutdown environment
Finally, stop and remove the Docker containers. If you're not already in the project root directory, navigate there first, then run:
# Stops the containers and unsets environment variables
docker compose -p flex -f ./factorhouse-local/compose-flex.yml down \
&& docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml down
unset KPOW_SUFFIX FLEX_SUFFIX KPOW_LICENSE FLEX_LICENSE