Kafka Streams

Configure the Kpow Streams Agent to unlock the following features:

  • Visualise Kafka Streams topologies in the Kpow Streams UI.
  • Monitor Kafka Streams metrics (e.g Stream-Thread, State Store, RocksDB, etc).
  • See summaries of Kafka Streams activity for your Kafka cluster(s).
  • Aggregate and Expose Kafka Streams metrics via Kpow Prometheus Endpoints (for alerting, etc).
  • (Soon) View Kpow Insights of your Kafka Streams applications (outlier metrics, etc).

See the Kpow Kafka Streams Spring Word Count Example for an integration of Kafka, Spring, and Kpow.

The Kpow Streams UI


The Kpow Stream Agent can be found on Maven Central.

Include the agent as a dependency in your Kafka Streams application.



In your application, just before you start your KafkaStreams instance:

import io.operatr.kpow.StreamsRegistry;

// Your Kafka Streams topology
Topology topology = createMyTopology();

// Your Kafka Streams config
Properties props = new createMyStreamProperties();

// Your Kafka Streams instance
KafkaStreams streams = new KafkaStreams(topology, props);

// Create a Kpow StreamsRegistry
StreamsRegistry registry = new StreamsRegistry(props);

// Register your KafkaStreams and Topology instances with the StreamsRegistry
registry.register(streams, topology);

// Start your Kafka Streams application

The StreamsRegistry is a single-threaded process that performs these actions once every minute:

  • Capture metadata about each registered Kafka Streams application.
  • Produce snapshots to the Kpow internal __oprtr_snapshot_state topic.

The StreamsRegistry does not talk directly to Kpow. Kpow reads streams data from the snapshot topic.


The StreamsRegistry Properties contains configuration to create the snapshot producer.

The StreamsRegistry configures its own Serdes on the snapshot producer, you do not need to set them.

Producer configuration means any of the following fields:

  • bootstrap.servers
  • ssl.truststore.type
  • ssl.truststore.password
  • ssl.truststore.location
  • ssl.truststore.certificates
  • ssl.trustmanager.algorithm
  • ssl.secure.random.implementation
  • ssl.provider
  • ssl.protocol
  • ssl.keystore.type
  • ssl.keystore.password
  • ssl.keystore.location
  • ssl.keystore.key
  • ssl.keystore.certificate.chain
  • ssl.keymanager.algorithm
  • ssl.key.password
  • ssl.endpoint.identification.algorithm
  • ssl.enabled.protocols
  • ssl.cipher.suites
  • security.protocol
  • sasl.mechanism
  • sasl.jaas.config
  • sasl.login.callback.handler.class

For more details visit the Producer section of the Apache Kafka documentation.

Minimum Required ACLs

If you secure your Kafka cluster with ACLs, the user provided in the Producer configuration must have permission to write to the internal Kpow topic.

./kafka-acls.sh \
  --bootstrap-server \
  --command-config client.conf \
  --add --allow-principal User:<your-producer-user> --operation Write --topic '__oprtr_snapshot_state'

Produce to the Primary Cluster

When managing a single Kafka cluster you can reuse the properties from your Kafka Streams application to create your StreamsRegisty. This is because the Kpow internal topic ___oprtr_snapshot_compute lives in the cluster that your Kafka Streams application connects to.

When managing multiple Kafka clusters configure your StreamsRegistry to produce snapshots to the primary Cluster that contains the internal Kpow snapshot topics. This is the first cluster in your Kpow configuration.

Single-Cluster Kpow

Reuse your Kafka Streams Properties to create your StreamsRegistry.

Properties streamsProps = new Properties();
KafkaStreams streams = new KafkaStreams(topology, streamsProps);

StreamsRegistry registry = new StreamsRegistry(streamsProps);

Multi-Cluster Kpow

Use a Properties with your primary cluster configuration to create your StreamsRegistry.

Properties streamsProps = createMyStreamProperties();
KafkaStreams streams = new KafkaStreams(topology, streamsProps);

Properties primaryProps = createMyPrimaryClusterProducerProperties();
StreamsRegistry registry = new StreamsRegistry(primaryProps);

See the Multi-cluster management guide for more information.

Multi-Cluster Configuration Feedback Request!

Is the requirement to produce to the primary Kpow cluster difficult for you?

Please let us know - we are considering the option of always writing to the same cluster as your Kafka Streams connects to and having Kpow gather snapshots from each cluster.

Register Multiple Kafka Streams Instances

You can register multiple Kafka Streams instances on the same StreamsRegistry.

KafkaStreams dedupeStreams = new KafkaStreams(dedupeTopology, dedupeProps);
KafkaStreams paymentStreams = new KafkaStreams(paymentTopology, paymentProps);
registry.register(paymentStreams, paymentTopology);
registry.register(dedupeStreams, dedupeTopology);

View Streams Topologies

Each Kafka Streams application configured with the Kpow Streams Agent is visible in the Kpow Streams UI, navigate to the 'Workflows' tab for full topology visualisation.

Prometheus Egress

If you have enabled Kpow's Prometheus integration, all Kafka Streams metrics collected from your running agents will be available from the endpoint /streams/v1