Lab 12: Apache Pinot - Real-Time Analytics of Supplier Stats from Kafka
Stream Kafka orders into Apache Pinot and run real-time analytics with its multi-stage query engine. This lab includes a simulated tumbling window aggregation and demonstrates querying supplier stats through a Python client.
How to start
Clone project repository
git clone https://github.com/factorhouse/examples.git
cd examples
Start Kafka and Pinot environments
We'll use Factor House Local to quickly spin up Kafka and Pinot environments that include Kpow. Either the Community or Enterprise edition of Kpow/Flex will work. To begin, ensure valid licenses are available. For details on how to request and configure a license, refer to this section of the project README.
## Clone the Factor House Local Repository
git clone https://github.com/factorhouse/factorhouse-local.git
## Download Kafka/Flink Connectors and Spark Iceberg Dependencies
./factorhouse-local/resources/setup-env.sh
## Uncomment the sections to enable the edition and license.
# Edition (choose one):
# unset KPOW_SUFFIX # Enterprise
# export KPOW_SUFFIX="-ce" # Community
# License:
# export KPOW_LICENSE=<path-to-license-file>
docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml up -d \
&& docker compose -p pinot -f ./factorhouse-local/compose-pinot.yml up -d
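Once the containers are up, it's worth confirming that the Pinot controller is reachable before moving on. A minimal sketch using the controller's health endpoint, assuming it is exposed at localhost:19000 as in this lab:
import requests

# Confirm the Pinot controller is up before registering schemas or tables.
# Assumes the controller REST API is mapped to localhost:19000.
resp = requests.get("http://localhost:19000/health", timeout=5)
print(f"Pinot controller health: {resp.status_code} {resp.text}")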
Deploy source connector
We will create a source connector that publishes fake order records to a Kafka topic named orders. See the Kafka Connect via Kpow UI and API lab for details on how to create the connector.
Once deployed, we can check the connector and its tasks in Kpow.
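Besides checking in Kpow, we can confirm records are flowing with a quick consumer. A minimal sketch, assuming the broker is reachable at localhost:9092 and the kafka-python package is installed (neither is part of this lab's setup):
from kafka import KafkaConsumer

# Read a handful of records from the orders topic to confirm the connector is producing.
# Assumes a broker at localhost:9092; adjust to match your Factor House Local setup.
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating after 5s of inactivity
)
for i, record in enumerate(consumer):
    print(record.value)
    if i >= 4:  # five records are enough to confirm data is flowing
        break
consumer.close()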
Deploy Workload: Register Pinot Table and Query
From the project root:
# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate
# Install required Python packages
pip install -r fh-local-labs/lab-12/requirements.txt
We will register a Pinot schema and a real-time table - both named orders - to ingest data from the Kafka orders topic.
python fh-local-labs/lab-12/register_pinot_configs.py --action upload
You should see:
# Schema upload status: 200
# Schema response: {"unrecognizedProperties":{},"status":"orders successfully added"}
# Table upload status: 200
# Table response: {"unrecognizedProperties":{},"status":"Table orders_REALTIME successfully added"}
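Under the hood, the registration amounts to two REST calls against the controller: the schema JSON is POSTed to /schemas, then the table config to /tables. A minimal sketch of that flow; the file names here are illustrative, and the actual configs live in fh-local-labs/lab-12:
import json
import requests

CONTROLLER = "http://localhost:19000"

# Upload the schema first; the table config references it by name.
# File names are illustrative -- see fh-local-labs/lab-12 for the real ones.
with open("schema.json") as f:
    schema = json.load(f)
resp = requests.post(f"{CONTROLLER}/schemas", json=schema)
print("Schema upload status:", resp.status_code)

with open("table.json") as f:
    table = json.load(f)
resp = requests.post(f"{CONTROLLER}/tables", json=table)
print("Table upload status:", resp.status_code)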
We can now query the orders table from the Pinot Query Console at http://localhost:19000.
To query supplier statistics, we'll use Pinot's multi-stage query engine (v2) to aggregate records over a fixed 5-second time window. Since Pinot doesn't support native tumbling windows, we simulate them with FLOOR(bid_time / 5000) bucketing logic:
SELECT
  window_start,
  window_end,
  supplier,
  total_price,
  count
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY supplier ORDER BY window_start DESC) AS row_num
  FROM (
    SELECT
      FLOOR(bid_time / 5000) * 5000 AS window_start,
      FLOOR(bid_time / 5000) * 5000 + 5000 AS window_end,
      supplier,
      ROUND(SUM(CAST(price AS DOUBLE)), 2) AS total_price,
      COUNT(*) AS count
    FROM orders
    WHERE bid_time >= (NOW() - 50000)
    GROUP BY FLOOR(bid_time / 5000), supplier
    ORDER BY window_start DESC, supplier
  )
)
WHERE row_num = 1
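To make the bucketing concrete: bid_time is an epoch-millisecond timestamp, so FLOOR(bid_time / 5000) * 5000 snaps each record down to the start of its 5-second window. The same arithmetic in Python:
# Tumbling-window bucketing: epoch-millis timestamps snap to 5-second boundaries.
bid_time = 1_717_000_003_456  # example timestamp in epoch milliseconds
window_start = (bid_time // 5000) * 5000
window_end = window_start + 5000
print(window_start, window_end)  # 1717000000000 1717000005000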
In the query above, the innermost SELECT aggregates each supplier's records per window, and the ROW_NUMBER() filter keeps only the most recent window for each supplier. We can run the query via Python:
python fh-local-labs/lab-12/query_supplier_stats.py
The script prints the aggregated stats for each supplier's most recent window.
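For reference, the query step is roughly equivalent to posting the SQL above to the controller's /sql endpoint with the multi-stage engine enabled. A minimal sketch; the actual client and connection details in query_supplier_stats.py may differ:
import requests

# Send the windowed aggregation to the Pinot controller's SQL endpoint.
# useMultistageEngine=true opts into the v2 engine needed for window functions.
query = "SELECT COUNT(*) FROM orders"  # substitute the full supplier-stats query
resp = requests.post(
    "http://localhost:19000/sql",
    json={"sql": query, "queryOptions": "useMultistageEngine=true"},
)
print(resp.json())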
Shut down the environment
Finally, stop and remove the Docker containers and unset the environment variables. If you're not already in the project root directory, navigate there first, then run:
# Stops the containers and unsets environment variables
docker compose -p pinot -f ./factorhouse-local/compose-pinot.yml down \
&& docker compose -p kpow -f ./factorhouse-local/compose-kpow.yml down
unset KPOW_SUFFIX KPOW_LICENSE