Lab 1: Kafka Clients - Producing and Consuming Kafka Messages with Schema Registry
Learn how to produce and consume Avro-encoded Kafka messages using Python clients and the Confluent Schema Registry. This lab covers schema evolution, working with both generic and specific records, and validating end-to-end data flow.
How to start
Clone project repository
git clone https://github.com/factorhouse/examples.git
cd examples
Start Kafka environment
We can get our Kafka environment including Kpow up and running using Factor House Local. We can use either the Kpow Community or Enterprise edition. To get started, let's make sure a valid Kpow license is available. For details on how to request and configure a license, refer to this section of the project README.
## Clone the Factor House Local Repository
git clone https://github.com/factorhouse/factorhouse-local.git
## Download Kafka/Flink Connectors and Spark Iceberg Dependencies
./factorhouse-local/resources/setup-env.sh
## Uncomment the sections to enable the edition and license.
# Edition (choose one):
# unset KPOW_SUFFIX # Enterprise
# export KPOW_SUFFIX="-ce" # Community
# License:
# export KPOW_LICENSE=<path-to-license-file>
docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml up -d
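Before moving on, it can help to confirm that the broker is accepting connections. The following is a minimal sketch, not part of the lab code: the helper name `port_is_open` is hypothetical, and `localhost:9092` is assumed to be the bootstrap address because that is what the producer logs in this lab report.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False


if __name__ == "__main__":
    # Assumption: the broker listens on localhost:9092, as in the producer logs.
    print("Kafka reachable:", port_is_open("localhost", 9092))
```

A successful TCP connection only shows that the port is open; it does not prove that the broker has finished starting.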
Start Kafka applications
Create a virtual environment and install dependent packages.
python -m venv venv
source venv/bin/activate
# windows
# .\venv\Scripts\activate
pip install -r fh-local-labs/lab-01/requirements.txt
Kafka producer
We can start the producer using either version 1 or 2. The newer version includes an additional field, is_premium, which is randomly assigned.
- MODEL_VERSION can be set to 1 or 2.
- MAX_RECORDS defines how many records to produce. If omitted, the producer will run indefinitely.
MODEL_VERSION=1 MAX_RECORDS=3 \
python fh-local-labs/lab-01/producer.py
# [2025-06-02 10:25:10,804] INFO: bootstrap: localhost:9092, topic: orders-clnt, model: 1
# [2025-06-02 10:25:10,923] INFO: Topic 'orders-clnt' created
# [2025-06-02 10:25:11,956] INFO: Sent to orders-clnt into partition 0, offset 0
# [2025-06-02 10:25:13,004] INFO: Sent to orders-clnt into partition 2, offset 0
# [2025-06-02 10:25:14,055] INFO: Sent to orders-clnt into partition 1, offset 0
# [2025-06-02 10:25:14,055] INFO: Max records (3) reached. Stop producing messages.
MODEL_VERSION=2 MAX_RECORDS=3 \
python fh-local-labs/lab-01/producer.py
# [2025-06-02 10:26:02,369] INFO: bootstrap: localhost:9092, topic: orders-clnt, model: 2
# [2025-06-02 10:26:02,415] WARNING: Topic 'orders-clnt' already exists.
# [2025-06-02 10:26:03,432] INFO: Sent to orders-clnt into partition 2, offset 1
# [2025-06-02 10:26:04,444] INFO: Sent to orders-clnt into partition 1, offset 1
# [2025-06-02 10:26:05,454] INFO: Sent to orders-clnt into partition 1, offset 2
# [2025-06-02 10:26:05,454] INFO: Max records (3) reached. Stop producing messages.
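The two environment variables used above could be read along the following lines. This is only a sketch of the idea, not the code in producer.py: the names `ProducerSettings` and `load_settings` are hypothetical, and falling back to version 1 when MODEL_VERSION is unset is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ProducerSettings:
    model_version: int
    max_records: Optional[int]  # None means "produce until interrupted"


def load_settings(env: Mapping[str, str] = os.environ) -> ProducerSettings:
    """Read MODEL_VERSION and MAX_RECORDS from an environment-like mapping."""
    # Assumption: version 1 is used when MODEL_VERSION is not set.
    version = int(env.get("MODEL_VERSION", "1"))
    if version not in (1, 2):
        raise ValueError(f"MODEL_VERSION must be 1 or 2, got {version}")
    raw_max = env.get("MAX_RECORDS")
    # A missing or empty MAX_RECORDS means there is no upper limit.
    max_records = int(raw_max) if raw_max else None
    return ProducerSettings(version, max_records)
```

Passing the mapping as a parameter keeps the function easy to exercise without touching the real process environment, for example `load_settings({"MODEL_VERSION": "2", "MAX_RECORDS": "3"})`.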
Check schema and messages
After sending messages, we can inspect the Avro schema in the Schema menu, where the latest version appears in the subjects table. By default, the Confluent Schema Registry enforces backward compatibility: a consumer using the new schema version can still read data written with the previous version. This setting is configured on the server side and applies unless explicitly changed. By clicking the icon next to the subject name, we can view, edit, or delete the subject, as well as update its compatibility settings.
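The role that default values play in backward-compatible evolution can be sketched as follows. The schemas here are illustrative only: the record name `Order` is hypothetical, only the order_id and is_premium fields named in this lab are shown, and the check is a deliberate simplification of what the Schema Registry does (it ignores type changes and removed fields).

```python
from typing import List

# Trimmed-down version 1 schema; other order fields are omitted.
ORDER_V1 = {
    "type": "record",
    "name": "Order",  # hypothetical record name
    "fields": [{"name": "order_id", "type": "string"}],
}

# Version 2 adds is_premium with a default value.
ORDER_V2 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "is_premium", "type": "boolean", "default": False},
    ],
}


def added_fields_without_default(old: dict, new: dict) -> List[str]:
    """Names of fields added in `new` that declare no default.

    An empty list means a reader using `new` can fill in every field
    that data written with `old` does not carry.
    """
    old_names = {field["name"] for field in old["fields"]}
    return [
        field["name"]
        for field in new["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


print(added_fields_without_default(ORDER_V1, ORDER_V2))
```

If is_premium were added without a default, the function would report it, which mirrors why such a change would be rejected under backward compatibility.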
We can also inspect the messages in the orders-clnt topic. For proper Avro decoding in Kpow, set the Key Deserializer to String, the Value Deserializer to AVRO, and select Local Schema Registry. Then click the Search button to view the records.
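The reason a schema registry has to be selected for decoding is that each message value carries only a schema ID, not the schema itself. Assuming the producer uses the standard Confluent serializer framing (one zero magic byte, a 4-byte big-endian schema ID, then the Avro payload), the header can be split off as in this sketch. The function name `split_confluent_frame` is hypothetical.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of the Confluent framing
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_confluent_frame(value: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) from a Confluent-framed message value."""
    if len(value) < HEADER_SIZE:
        raise ValueError("value is too short to contain the 5-byte header")
    # ">bI" = big-endian signed byte followed by an unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">bI", value[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, value[HEADER_SIZE:]
```

The returned schema ID is what a client or a tool such as Kpow would look up in the registry before decoding the remaining bytes as Avro.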
Messages produced with the updated schema (version 2) will include the new field is_premium:
On the other hand, those produced with the earlier schema (version 1) will not include this field:
Start consumer
To consume messages, we can choose between generic records and specific records:
- Generic record: Messages are deserialized into plain Python dictionaries, without binding to a specific model. This mode works for any schema version.
- Specific record: Messages are deserialized into typed objects (OrderV1 or OrderV2) by setting the MODEL_VERSION environment variable.
If using a specific record and the message lacks a field (e.g., is_premium in version 1), a default value (e.g., false) will be used during deserialization.
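One way the mapping from a decoded dictionary to a typed object could look is sketched below. This is an illustration under assumptions, not the lab's consumer code: only order_id and is_premium are modeled, the helper `to_order` is hypothetical, and in practice the default may be applied by Avro schema resolution rather than by the model class.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class OrderV1:
    order_id: str  # remaining order fields are omitted in this sketch


@dataclass
class OrderV2:
    order_id: str
    is_premium: bool = False  # used when the message predates version 2


def to_order(record: dict, model_version: int) -> Union[OrderV1, OrderV2]:
    """Bind a decoded record (a plain dict) to the model for the given version."""
    if model_version == 1:
        # Version 1 ignores is_premium even when the message carries it.
        return OrderV1(order_id=record["order_id"])
    return OrderV2(
        order_id=record["order_id"],
        is_premium=record.get("is_premium", False),
    )
```

With this shape, a version 1 message read as version 2 yields `is_premium=False`, and a version 2 message read as version 1 simply drops the extra field.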
Generic record
python fh-local-labs/lab-01/consumer.py
# ...
# [2025-06-02 10:30:10,704] INFO: Received: {'order_id': '987d3a38-eb39-46ac-9da5-e95a457e76b6', ...}
# [2025-06-02 10:30:10,711] INFO: Received: {'order_id': '7ab1aaf1-89be-4200-be14-3b4c63553fdd', ..., 'is_premium': True}
# [2025-06-02 10:30:10,711] INFO: Received: {'order_id': 'c82990c3-d0d5-492c-8aea-3fb439bb0fc6', ...}
# [2025-06-02 10:30:10,712] INFO: Received: {'order_id': '3c95afd4-9a75-42f3-8192-5792f2ae4947', ...}
# [2025-06-02 10:30:10,712] INFO: Received: {'order_id': '9ccbb8e1-e6eb-45be-940c-a666bbfe8efb', ..., 'is_premium': True}
# [2025-06-02 10:30:10,712] INFO: Received: {'order_id': 'a64e2843-32d2-4897-a648-955145724c79', ..., 'is_premium': False}
Specific record
MODEL_VERSION=1 python fh-local-labs/lab-01/consumer.py
# ...
# [2025-06-02 10:34:55,950] INFO: Received: OrderV1(order_id='987d3a38-eb39-46ac-9da5-e95a457e76b6', ...)
# [2025-06-02 10:34:55,959] INFO: Received: OrderV1(order_id='7ab1aaf1-89be-4200-be14-3b4c63553fdd', ...)
# [2025-06-02 10:34:55,959] INFO: Received: OrderV1(order_id='3c95afd4-9a75-42f3-8192-5792f2ae4947', ...)
# [2025-06-02 10:34:55,960] INFO: Received: OrderV1(order_id='9ccbb8e1-e6eb-45be-940c-a666bbfe8efb', ...)
# [2025-06-02 10:34:55,962] INFO: Received: OrderV1(order_id='a64e2843-32d2-4897-a648-955145724c79', ...)
# [2025-06-02 10:34:55,962] INFO: Received: OrderV1(order_id='c82990c3-d0d5-492c-8aea-3fb439bb0fc6', ...)
MODEL_VERSION=2 python fh-local-labs/lab-01/consumer.py
# ...
# [2025-06-02 10:36:28,053] INFO: Received: OrderV2(order_id='3c95afd4-9a75-42f3-8192-5792f2ae4947', ..., is_premium=False)
# [2025-06-02 10:36:28,064] INFO: Received: OrderV2(order_id='9ccbb8e1-e6eb-45be-940c-a666bbfe8efb', ..., is_premium=True)
# [2025-06-02 10:36:28,064] INFO: Received: OrderV2(order_id='a64e2843-32d2-4897-a648-955145724c79', ..., is_premium=False)
# [2025-06-02 10:36:28,064] INFO: Received: OrderV2(order_id='c82990c3-d0d5-492c-8aea-3fb439bb0fc6', ..., is_premium=False)
# [2025-06-02 10:36:28,065] INFO: Received: OrderV2(order_id='987d3a38-eb39-46ac-9da5-e95a457e76b6', ..., is_premium=False)
# [2025-06-02 10:36:28,065] INFO: Received: OrderV2(order_id='7ab1aaf1-89be-4200-be14-3b4c63553fdd', ..., is_premium=True)
Shutdown environment
If you're not already in the project root directory, navigate there first. Then stop and remove the Docker containers by running:
# Stops the containers and unsets environment variables
docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml down
unset KPOW_SUFFIX KPOW_LICENSE