Serdes
By default, Kpow ships with the following serdes available from data inspect:
- JSON
- JSON Schema
- AVRO
- AVRO (Strict)
- Protobuf
- String
- EDN
- Double
- Float
- Integer
- Long
- Short
- UUID
- Transit / JSON
- Transit / JSON-Verbose
- Transit / Msgpack
AVRO, JSON Schema, and Protobuf
Schema Registry serdes in Kpow follow the wire format documented here.
See: Custom Serdes to use Kpow with Protobuf messages without a Schema Registry.
Kpow integrates with Confluent Schema Registry and allows for AVRO, JSON Schema and Protobuf serdes to be used in data inspect.
See Schema Registry for more on how to configure Confluent Schema Registry with Kpow.
Once configured, from within the data inspect UI you will now be able to select the schema and subject strategy when searching for records by key:
Configuring Serdes
Kpow offers some configuration on how serdes are presented in the UI.
Default Serdes
Set DEFAULT_KEY_SERDES
, DEFAULT_VALUE_SERDES
or DEFAULT_HEADERS_SERDES
to specify which serdes should be selected from the dropdown by default when using data inspect.
Available Serdes
To restrict the serdes available to your users set AVAILABLE_KEY_SERDES
or AVAILABLE_VALUE_SERDES
Eg: AVAILABLE_VALUE_SERDES=JSON,AVRO
to only ever show JSON or AVRO serdes from within Kpow's UI
When filtering serdes use the same label name as the one in the serdes dropdown. Eg: "AVRO (Strict)"
Custom Serdes
Kpow works with custom org.apache.kafka.common.serialization.Serde<String> implementations.
Why <String>?
Kpow expects your custom serdes to provide String key/value output when deserializing messages and will interpret that String key/value output according to the serdes format you configure.
Kpow will provide your custom serdes with String input when your users select a custom serdes to produce data to a topic, it is up to you how you interpret and encode that data for serialization.
Serdes with json
or clojure
format will have Data policies applied and can be searched with kJQ filters.
Requirements
Kafka 2.1.0 introduced default functions on the Deserializer and Serializer interface that accept message headers as an argument.
Kpow calls those default functions, so your serdes must be compiled with Kafka 2.1.0+.
Your custom serdes should provide a Deserializer<String>
and a Serializer<String>
.
Example Custom Serde<String>
Here is an example custom serde that simply passes through to string serialization.
package example.serdes;
import org.apache.kafka.common.serialization.*;
import java.util.Map;
public class CustomSerde implements Serde<String> {
private Map configs;
@Override
public void configure(Map configs, boolean isKey) {
this.configs = configs;
}
@Override
public Serializer<String> serializer() {
return new StringSerializer();
}
@Override
public Deserializer<String> deserializer() {
return new StringDeserializer();
}
public void close() {
}
public Map configs() {
return configs;
}
}
Implementation
When deserializing, Kpow calls the following default function on the org.apache.kafka.common.serialization.Deserializer
returned by your custom serde.
default T deserialize(String topic, Headers headers, byte[] data) {
When serializing, Kpow calls the following default function on the org.apache.kafka.common.serialization.Serializer
returned by your custom serde.
default byte[] serialize(String topic, Headers headers, T data) {
In the case of deserialization, the headers passed will be exactly as they exist off-the-wire.
In the case of serialization, the headers passed will have values of UTF-8 encoded byte arrays.
This divergence is down to the fact that Kpow is a text-based web UI. In the case of data production a user inputs headers in text format, and that's how we pass them to you.
Setup
Provide your custom serdes on the Kpow classpath (see How to add a Jar to the Classpath).
Use a plain Java command similar to below to start kpow with a set classpath, note factorhouse.kpow.
java -Xmx8G -cp /opt/kpow/lib/kpow.jar:/opt/org/custom-serdes.jar factorhouse.kpow
Configure Kpow with the CUSTOM_SERDES
environment variable:
# CUSTOM_SERDES accepts a comma-separated list of serdes classes
CUSTOM_SERDES=org.corp.XMLSerde,org.corp.MyCustomSerde
Configuration
Include a YAML configuration file in your jar file to configure custom serdes further (Optional).
E.g. org.corp.XMLSerde
can be configured with org/corp/XMLSerde.yml
The following fields are available to configure your serdes:
name
- the display name to use in Kpow's Data inspect UI.format
-json
,clojure
orstring
(default)isKey
- true for key data only, false for value data only, leave unset for both key/value data.config
- a map of config values passed into the serdesconfigure
method
Config
is converted to String/String and passed to your Serdes/configure method. Config
values starting with $ are resolved as environment variables. (e.g. $BOOTSTRAP below)
E.g. Single Serdes Configuration (json format, available to key and value fields).
name: PROTO
format: json
config:
bootstrap: $BOOTSTRAP
limit: 22
display: another-value
abc: $SOME_ENV
E.g. Multi Serdes Configuration (json format, separate configured serdes for key and value fields)
serdes:
- name: "PROTO 1"
format: "json"
isKey: true
config:
bootstrap: "some-value"
limit: 22
display: another-value
abc: $SOME_ENV
- name: "PROTO 2"
format: "json"
isKey: false
config:
bootstrap: "some-value"
limit: "100"
display: another-value
abc: $ANOTHER_ENV
Serdes default to String format and are available to keys and values where no config provided
On startup you will see details logged of each custom serdes:
INFO operatr.kafka.serdes – initializing custom serdes: kpow.serdes.CustomSerdesExample
INFO operatr.kafka.serdes – found kpow/serdes/CustomSerdesExample.yml
INFO operatr.kafka.serdes – serde configuration: Custom Serde!
WARN operatr.kafka.serdes – environment variable $SOME_ENV not set
INFO operatr.kafka.serdes – config 'Custom Serde!', isKey?: null, format: json, config: ("bootstrap" "limit" "display" "abc").