
Lab 8: Flink Table API - Loading Order Events from Kafka into Iceberg

Deploy a Kafka-to-Iceberg pipeline using Flink's Table API. This lab demonstrates how to configure the job, compile it as a shadow JAR, and deploy it via the Flink CLI and Flex. The sink table is created with the Iceberg Java API rather than Flink SQL DDL, since Flink SQL cannot yet express Iceberg's hidden partitioning.

How to start

Clone project repository

git clone https://github.com/factorhouse/examples.git
cd examples

We'll use Factor House Local to quickly spin up Kafka and Flink environments that include Kpow and Flex, as well as an analytics environment for Iceberg. Either the Community or Enterprise edition of Kpow/Flex can be used. To begin, ensure valid licenses are available; for details on how to request and configure a license, refer to this section of the project README.

## Clone the Factor House Local Repository
git clone https://github.com/factorhouse/factorhouse-local.git

## Download Kafka/Flink Connectors and Spark Iceberg Dependencies
./factorhouse-local/resources/setup-env.sh

## Uncomment the lines below to set the edition and license.
# Edition (choose one):
# unset KPOW_SUFFIX # Enterprise
# unset FLEX_SUFFIX # Enterprise
# export KPOW_SUFFIX="-ce" # Community
# export FLEX_SUFFIX="-ce" # Community
# Licenses:
# export KPOW_LICENSE=<path-to-license-file>
# export FLEX_LICENSE=<path-to-license-file>

docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml up -d \
&& docker compose -p flex -f ./factorhouse-local/compose-flex.yml up -d

Persistent Catalogs

Two catalogs are pre-configured in both the Flink and Spark clusters:

  • demo_hv: a Hive catalog backed by the Hive Metastore
  • demo_ib: an Iceberg catalog also backed by the Hive Metastore

Flink

In Flink, the catalogs can be initialized automatically on startup using a SQL script (init-catalogs.sql):

CREATE CATALOG demo_hv WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/flink/conf',
  'default-database' = 'default'
);

CREATE CATALOG demo_ib WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hive-metastore:9083'
);

Spark

In Spark, catalog settings are defined in spark-defaults.conf:

# Enable Iceberg extensions
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

# Hive catalog (demo_hv)
spark.sql.catalog.demo_hv org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo_hv.type hive
spark.sql.catalog.demo_hv.hive.metastore.uris thrift://hive-metastore:9083
spark.sql.catalog.demo_hv.warehouse s3a://warehouse/

# Iceberg catalog (demo_ib)
spark.sql.catalog.demo_ib org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo_ib.type hive
spark.sql.catalog.demo_ib.uri thrift://hive-metastore:9083
spark.sql.catalog.demo_ib.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.demo_ib.s3.endpoint http://minio:9000
spark.sql.catalog.demo_ib.s3.path-style-access true
spark.sql.catalog.demo_ib.warehouse s3a://warehouse/

# Optional: set default catalog
spark.sql.defaultCatalog spark_catalog

Deploy the Flink application

This Flink streaming application reads Avro-encoded Kafka messages using the Table API and writes them to an Iceberg table registered with a Hive catalog. It does the following (a condensed sketch follows the list):

  • Uses the Flink Table API to define a Kafka source table backed by the Confluent Schema Registry (with basic auth support).
  • Configures checkpointing, parallelism, and fault tolerance for streaming execution.
  • Registers an Iceberg catalog backed by Hive Metastore using SQL and sets it as the active catalog.
  • Creates the Iceberg table programmatically using the Java Iceberg API to support hidden partitioning by DAY(bid_time), which is not currently supported in Flink SQL DDL.
  • Defines the source schema with watermarking and consumes data from Kafka in Avro format.
  • Casts fields as needed (e.g., price to DECIMAL(10,2)) and writes the resulting table into Iceberg using executeInsert().
  • Logs key events throughout the lifecycle, including job submission and failure handling.
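
To make these steps concrete, here is a condensed Kotlin sketch of how such a job can be wired together. It is not the lab's actual source: the topic name, column names other than bid_time and price, the bootstrap servers, the Schema Registry URL and credentials, and the target database are placeholder assumptions, and logging and error handling are omitted.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.PartitionSpec
import org.apache.iceberg.Schema
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

fun main() {
    // Streaming environment with checkpointing and parallelism for fault-tolerant execution
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(60_000L)
    env.setParallelism(3)
    val tEnv = StreamTableEnvironment.create(env)

    // Kafka source table backed by the Confluent Schema Registry (names, hosts, and credentials are placeholders)
    tEnv.executeSql(
        """
        CREATE TABLE orders (
          order_id STRING,
          price    DOUBLE,
          bid_time TIMESTAMP(3),
          WATERMARK FOR bid_time AS bid_time - INTERVAL '5' SECOND
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'orders',
          'properties.bootstrap.servers' = 'kafka-1:19092',
          'properties.group.id' = 'lab-08',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'avro-confluent',
          'avro-confluent.url' = 'http://schema:8081',
          'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
          'avro-confluent.basic-auth.user-info' = 'admin:admin'
        )
        """.trimIndent()
    )

    // Register the Iceberg catalog backed by the Hive Metastore and expose it to Flink SQL
    tEnv.executeSql(
        """
        CREATE CATALOG demo_ib WITH (
          'type' = 'iceberg',
          'catalog-type' = 'hive',
          'uri' = 'thrift://hive-metastore:9083'
        )
        """.trimIndent()
    )

    // Create the sink table with the Iceberg Java API so we can declare the hidden
    // partition transform DAY(bid_time), which Flink SQL DDL cannot express.
    // S3 (MinIO) access settings are expected to come from the cluster's Hadoop/Hive configuration.
    val catalog = HiveCatalog()
    catalog.setConf(Configuration())
    catalog.initialize("demo_ib", mapOf("uri" to "thrift://hive-metastore:9083", "warehouse" to "s3a://warehouse/"))

    val schema = Schema(
        Types.NestedField.optional(1, "order_id", Types.StringType.get()),
        Types.NestedField.optional(2, "price", Types.DecimalType.of(10, 2)),
        Types.NestedField.optional(3, "bid_time", Types.TimestampType.withoutZone())
    )
    val spec = PartitionSpec.builderFor(schema).day("bid_time").build()
    val tableId = TableIdentifier.of("default", "orders")
    if (!catalog.tableExists(tableId)) {
        catalog.createTable(tableId, schema, spec)
    }

    // Cast price to DECIMAL(10,2) and stream the rows into the Iceberg table
    tEnv.sqlQuery("SELECT order_id, CAST(price AS DECIMAL(10, 2)) AS price, bid_time FROM orders")
        .executeInsert("demo_ib.`default`.orders")
}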

To build and run this Flink application locally, make sure your environment has JDK 17 installed.

Our project is built against the factorhouse/flink image, which already includes the necessary Hadoop and Iceberg dependencies. To keep the fat JAR from becoming excessively large, those dependencies are not bundled into it, so the artifact is intended for cluster deployment only.
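
As a rough sketch only (the lab's actual build script differs in its dependency list, plugin versions, and library versions, which are assumptions here), a build.gradle.kts along these lines keeps cluster-provided libraries out of the shadow JAR by declaring them compileOnly:

plugins {
    kotlin("jvm") version "1.9.24"
    id("com.github.johnrengelman.shadow") version "8.1.1"
}

repositories {
    mavenCentral()
}

val flinkVersion = "1.19.1"
val icebergVersion = "1.6.1"

dependencies {
    // Provided by the factorhouse/flink image at runtime, so excluded from the shadow JAR
    compileOnly("org.apache.flink:flink-streaming-java:$flinkVersion")
    compileOnly("org.apache.flink:flink-table-api-java-bridge:$flinkVersion")
    compileOnly("org.apache.iceberg:iceberg-flink-runtime-1.19:$icebergVersion")
    // Anything not already on the cluster goes under implementation so it is bundled;
    // the exact split depends on what the image provides (omitted here for brevity)
}

tasks.shadowJar {
    // Produces build/libs/fh-local-flink-table-iceberg-1.0.jar
    archiveBaseName.set("fh-local-flink-table-iceberg")
    archiveVersion.set("1.0")
    archiveClassifier.set("")
    manifest {
        attributes["Main-Class"] = "io.factorhouse.demo.MainKt"
    }
}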

Step 1: Build the Shadow JAR

From the project directory:

cd fh-local-labs/lab-08
./gradlew shadowJar

This creates the following deployable artifact:

build/libs/fh-local-flink-table-iceberg-1.0.jar

Copy the JAR into the Flink JobManager container and submit the job:

docker cp build/libs/fh-local-flink-table-iceberg-1.0.jar \
jobmanager:/tmp/fh-local-flink-table-iceberg-1.0.jar

docker exec jobmanager /opt/flink/bin/flink run -d -p 3 \
-c io.factorhouse.demo.MainKt /tmp/fh-local-flink-table-iceberg-1.0.jar

Alternatively, we can submit the JAR using the Flex UI, available at http://localhost:3001:

  1. Upload the JAR from the Jobs sidebar.

  2. Click the submit menu next to the uploaded JAR.

  3. Fill out the submission form, including:

    • Entry Class: io.factorhouse.demo.MainKt
    • Parallelism: e.g. 3
    • Optional: Savepoint path, job arguments, etc.

Monitoring the Job

We can monitor the Flink job via the Flink UI (http://localhost:8082) or Flex (http://localhost:3001). The screenshot below shows the job's logical plan as visualized in Flex.

In addition to monitoring the job, we can verify the output by inspecting the Parquet files written by the sink. These files are accessible via MinIO at http://localhost:9001 using admin as the username and password as the password. As shown in the screenshot below, the records have been successfully written to the appropriate partitions in the warehouse bucket.

Shutdown environment

If you're not already in the project root directory, navigate there first. Then stop and remove the Docker containers and unset the environment variables:

# Stops the containers and unsets environment variables
docker compose -p flex -f ./factorhouse-local/compose-flex.yml down \
&& docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml down

unset KPOW_SUFFIX FLEX_SUFFIX KPOW_LICENSE FLEX_LICENSE