
Tutorial

The kREPL is an IDE for exploring and controlling Apache Kafka®.

Klang is a language designed for navigating time series data. Klang syntax is a blend of JQ and Clojure, and you can use either language or both.

Klang and the kREPL combine to provide an IDE for processing bounded sequences of structured data directly from Kafka topics.

Klang Syntax Examples

JQ Syntax

{"a" [1 2] "b" "-1"} | .a[1] * .b | to-long

Clojure Syntax

reduce + (map (comp inc to-long) ["2" "3"])
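For readers new to Clojure, the example above applies to-long and then inc to each string (comp composes right to left) and sums the results. A rough Python equivalent is sketched below. It assumes Klang's to-long parses a numeric string into an integer; the comp and inc helpers are written here for illustration only.

```python
from functools import reduce
import operator

def comp(*fns):
    """Compose functions right to left, like Clojure's comp."""
    def composed(x):
        for fn in reversed(fns):
            x = fn(x)
        return x
    return composed

def inc(n):
    return n + 1

# Assumption: Klang's to-long parses a numeric string into an integer.
to_long = int

# reduce + (map (comp inc to-long) ["2" "3"])
result = reduce(operator.add, map(comp(inc, to_long), ["2", "3"]))
print(result)  # 7
```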

Klang functions such as consume combine easy time series navigation with Kpow features like RBAC and Data Masking.

Consume Examples

Consume the latest 100 messages:

consume __oprtr_snapshot_state

Consume one minute of messages, filtered by a JQ Lambda:

consume __oprtr_snapshot_state
{:serdes "transit-json"
:window [(now-pt5m)..pt1m]
:filter #>(.value:group | test( "oprtr.*"))}

Klang scripts are constrained to pure Clojure functions, pure JQ pipelines, and side-effecting Klang functions.

Klang in the kREPL is a practical tool that introduces as few new ideas as possible. If you know Kafka, JQ, or Clojure, you know Klang.

If you encounter any issues, please raise a support ticket.

REPL Basics

Wherever you see a REPL expression (like the one below), click it to load it into the REPL. The repl help expression shows this guide.

repl help

Evaluate the current REPL form with the shift-enter hot key. REPL history can be accessed with the up/down cursor keys.

The most recent evaluation is history item *1, then *2, and so on. You can reference previously evaluated state with these symbols.

REPL Housekeeping Commands

repl help       # Show this guide
repl clear      # Clear REPL state
repl switch *2  # Switch to REPL history item

Quick Start

Get started with the kREPL by evaluating some simple Klang functions that show data about your Kafka resources:

kafka show cluster
kafka show topic
kafka show group
kafka show connect
kafka show schema

These commands are self-explanatory: run them to see Kpow's current view of your Kafka resources.

Taking kafka show group as an example, you will see output similar to this sequence of maps:

{:group "oprtr.compute.metrics.v2"
:n-topics 1
:state "Stable"
:n-assignments 12
:assignor "stream"
:abbreviated? true
:n-members 3
:n-idle-members 0
:offset-delta 4.34
...}

Variables and References

How about we derive something useful from that sequence of maps? Define a variable with the result of evaluating kafka show group:

def my-groups *1

We can now refer to that state with the reference $my-groups in later expressions:

$my-groups.[] | .:state

JQ Syntax

This JQ expression takes the my-groups data and pipes it into a lambda that returns the state of each group.

Klang's slight extension to JQ is in the selector .:state, where the : indicates that we are selecting a keyword key rather than a string key.

To filter groups by a particular state, we use the standard JQ select syntax:

$my-groups.[] | select(.:state == "Stable")

Clojure Syntax

Because Klang is constructed from JQ and Clojure, there is also a Clojure-ish way of running these expressions, starting with the map function:

map :state $my-groups

And a filter function:

filter #>(.:state == "Stable") $my-groups

Wait! That #>(...) literal isn't Clojure: it's the JQ lambda from above, enclosed within #>().

#>() indicates a JQ expression that is a lambda - an anonymous function that expects one argument.
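As a point of comparison, the map and filter forms above correspond roughly to the Python below. This is only an analogy: the sample groups are hypothetical, keyword keys such as :state are modelled as plain strings, and is_stable stands in for the JQ lambda #>(.:state == "Stable").

```python
# Hypothetical groups shaped like kafka show group output; keyword keys
# such as :state are modelled as plain strings.
my_groups = [
    {"group": "oprtr.compute.metrics.v2", "state": "Stable"},
    {"group": "example.group.a", "state": "Empty"},
    {"group": "example.group.b", "state": "Stable"},
]

# map :state $my-groups  (like $my-groups.[] | .:state)
states = [g["state"] for g in my_groups]

def is_stable(group):
    """Stands in for the one-argument JQ lambda #>(.:state == "Stable")."""
    return group["state"] == "Stable"

# filter #>(.:state == "Stable") $my-groups
stable_groups = list(filter(is_stable, my_groups))
```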

S-Expression Flavors and Interpolation

Conversely, any Clojure expression can be piped into a JQ lambda like so:

(frequencies (map :state $my-groups)) | .Stable

That expression mixes a little Clojure, a little JQ, and is very Klangy - but why .Stable and not #>(.Stable)?

Klang expressions are Clojure-ish S-Expressions in which the outermost parens are optional. The full form of the expression above is:

#=((frequencies (map :state $my-groups)) | .Stable)

#=() indicates a JQ expression that takes no arguments and is evaluated in place. It pipes the data on its left-hand side to the lambda on its right.

In this case the inner Clojure expression is evaluated to data, which is then piped to the right-hand side of the JQ expression.

Any pure JQ expression can in turn be interpolated into a Clojure expression, and so on:

(str "There are " #=((frequencies (map :state $my-groups)) | .Stable) " Stable groups")
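In Python terms, the expressions above count each distinct state, select the "Stable" entry, and build a string from it. The sketch uses hypothetical input and calls dict.get so that a missing key yields None, which is closer to JQ returning null for a missing field than Counter's default of 0.

```python
from collections import Counter

# Hypothetical result of (map :state $my-groups)
states = ["Stable", "Empty", "Stable"]

# (frequencies ...) counts each distinct value
freqs = Counter(states)

# ... | .Stable selects one key; dict.get returns None (like JQ's null) if absent
n_stable = freqs.get("Stable")

# (str "There are " ... " Stable groups")
message = "There are " + str(n_stable) + " Stable groups"
print(message)  # There are 2 Stable groups
```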

These S-Expression flavors, combined with simple rules against mixing pure and side-effecting forms, make Klang a powerful scripting language.

More importantly, the expanding nature of the syntax means you can simply consume a topic and interrogate it with normal JQ if you like.

This sounds complicated. Why mixed syntax?

The most important part of any language is its constraints.

Klang is a scripting language that provides:

  • variable definition and referencing
  • a curated set of pure Clojure functions
  • one-pass pure JQ pipelines
  • our own custom side-effecting functions in Clojure syntax (e.g. consume)
  • most importantly - JQ Lambdas like #>(.Stable)

JQ Lambdas are further constrained: you cannot reference variables from within them, and they accept only a single argument.

Klang provides no way to define functions and no destructuring, so how do we perform complex restructuring?

JQ Construction Lambda

We use JQ Lambdas with data construction to fill the gap left by the removal of all forms of function definition.

This expression translates a sequence of input maps into a different sequence of output maps, via a JQ Construction Lambda:

map
#>(.role | {(.:type) (.start) :location [(.start) "at" (.location.y)]})
[{"id" 1, "role" {:type "indoor", "start" :tomorrow "location" {"x" 4, "y" 5}}}
{"id" 2, "role" {:type "outdoor" "start" :none}}]

Result:

=> 
{"indoor" :tomorrow, :location [:tomorrow "at" 5]}
{"outdoor" :none, :location [:none "at" nil]}
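To make the restructuring explicit, here is a Python sketch of what the construction lambda computes for each input map. It is an illustration, not Klang's implementation: keywords are modelled as strings with a leading colon, and a missing .location yields None, mirroring nil in the result above.

```python
# Keywords (e.g. :type, :tomorrow) are modelled as strings with a leading
# colon; this is an illustrative convention, not Klang's data model.
records = [
    {"id": 1, "role": {":type": "indoor", "start": ":tomorrow", "location": {"x": 4, "y": 5}}},
    {"id": 2, "role": {":type": "outdoor", "start": ":none"}},
]

def construct(record):
    """Mirror of #>(.role | {(.:type) (.start) :location [(.start) "at" (.location.y)]})."""
    role = record["role"]
    # JQ-style navigation: a missing key yields None (nil) instead of raising
    y = (role.get("location") or {}).get("y")
    return {
        role[":type"]: role["start"],  # computed key (.:type) -> value (.start)
        ":location": [role["start"], "at", y],
    }

result = [construct(r) for r in records]
```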

Klang is a LISP, and you're a Clojure-ish programmer now.

Finally, every REPL form is an S-Expression with optional outer parens, so even the previous expressions like:

kafka show group

Can more verbosely be written and evaluated as:

(kafka show group)

Language Definition

As described above, Klang provides mixed-syntax S-Expressions with two types of interpolation: in-place and piped.

S-Expressions

There are three types of Klang expression. All are pure (no side effects), and each is identified by its paren form:

Clojure Expressions

(set/difference
[(inc (reduce + [1 2])) 5 6]
[5 6 7])

JQ Expressions

#=([1 2 3].[] | . + 1)

JQ Lambdas

#>(.key.x[0] > 2 and
(.value.y | startswith("abc")))

The first two expressions are eagerly evaluated in the kREPL. The final form, the JQ Lambda, is the only anonymous function available to you in Klang.
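For comparison, the three forms can be read as the Python below. The sketch treats both vectors in the set/difference example as sets, assumes the JQ stream from #=() is collected into a list, and reads the lambda as the conjunction of the numeric test and the startswith test; lambda_form and the sample messages are hypothetical.

```python
# (set/difference [(inc (reduce + [1 2])) 5 6] [5 6 7]), treating both as sets
clj_result = {sum([1, 2]) + 1, 5, 6} - {5, 6, 7}

# #=([1 2 3].[] | . + 1): .[] streams each element and . + 1 runs once per element
jq_result = [x + 1 for x in [1, 2, 3]]

def lambda_form(msg):
    """A one-argument function, like a #>(...) JQ Lambda over a consumed message."""
    return msg["key"]["x"][0] > 2 and msg["value"]["y"].startswith("abc")
```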

Klang Data

Klang is built to work with structured data on Kafka topics: Avro, JSON, Transit, EDN, or custom SerDes that resolve to JSON or EDN.

Data Representations

Klang data representations are EDN, a subset of Clojure data structures and a superset of JSON. All JSON is valid EDN.

Using EDN as our data format means that we can provide every core pure Clojure function that works on EDN data structures in the kREPL.

Klang provides the following built-in forms specifically for time-series navigation:

Evaluation Time

now

The symbol now evaluates to the current Unix epoch time in milliseconds. 'Now' is evaluated once per expression, so every reference to it within that expression has the same value.

ISO 8601 Durations

pt15m

Any valid ISO 8601 duration symbol evaluates to that duration in milliseconds. For example, pt15m evaluates to 900000.
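A minimal sketch of converting the time portion of an ISO 8601 duration into milliseconds, as the pt15m example describes. The duration_ms helper is hypothetical and handles only hours, minutes, and seconds; Klang may accept a wider range of durations.

```python
import re

# Hypothetical helper: only the PT (time) part of ISO 8601 durations is
# supported, e.g. pt15m, PT1H30M, pt0.5s. Day/month/year parts are not handled.
_DURATION = re.compile(r"^pt(?:(\d+)h)?(?:(\d+)m)?(?:(\d+(?:\.\d+)?)s)?$", re.IGNORECASE)

def duration_ms(text):
    match = _DURATION.match(text)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {text}")
    hours, minutes, seconds = match.groups()
    total = int(hours or 0) * 3_600_000 + int(minutes or 0) * 60_000
    total += round(float(seconds or 0) * 1000)
    return total
```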

ISO 8601 Date Times

#dt"1979-03-09T02:34:56.768Z"

ISO 8601 date times are written with the form above and evaluate to Unix epoch time in milliseconds.
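The equivalent conversion from an ISO 8601 UTC timestamp to Unix epoch milliseconds can be sketched with Python's datetime module. The datetime_ms helper is hypothetical and expects the exact shape shown above (millisecond precision with a trailing Z).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def datetime_ms(text):
    """Parse e.g. 1979-03-09T02:34:56.768Z into Unix epoch milliseconds."""
    # %z accepts a trailing "Z" as UTC (Python 3.7+)
    dt = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f%z")
    # Integer division of timedeltas keeps the result exact (no float rounding)
    return (dt - EPOCH) // timedelta(milliseconds=1)
```

For example, 2021-01-13T03:12:12.123Z converts to 1610507532123, so #dt"2021-01-13T03:12:12.123Z" and 1610507532123 denote the same instant.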

Integer Ranges [..]

Klang supports consuming specific offsets or topic partitions. Wherever you specify either, you can provide an integer range such as [1..20].

Time Windows

Klang is built for time series data and supports easy topic consumption of:

  • a fixed number of records from a point in time or to a point in time
  • all records between two points in time

All time windows are inclusive at the earliest point and exclusive at the latest, just like Kafka Streams.
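The inclusive-start, exclusive-end rule is a half-open interval test. The in_window helper below is a hypothetical one-line sketch of that rule, not Kpow code.

```python
def in_window(ts_ms, start_ms, end_ms):
    """Half-open test: the start is inclusive, the end is exclusive."""
    return start_ms <= ts_ms < end_ms
```

Because the end is exclusive, adjacent windows such as [0, 60000) and [60000, 120000) never count the same record twice.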

Time Slice [..]

A window with one boundary is known as a slice. The expression below slices 'limit' (default 100) records from a topic starting 5 minutes ago:

consume __oprtr_snapshot_state {:window [(now-pt5m)..]}

Note the optional time arithmetic: an ISO 8601 duration can be added to or subtracted from a point in time in place.

Similarly, the expression below slices the most recent 100 records of a topic. This is the default consumption when no window is specified:

consume __oprtr_snapshot_state {:window [..now]}
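A sketch of the two slice directions over an in-memory list, assuming records are (timestamp_ms, value) pairs sorted by time. The slice_from and slice_to names are hypothetical, and the exclusive end follows the time window rule stated above.

```python
# Hypothetical model: records are (timestamp_ms, value) tuples sorted by time.
def slice_from(records, start_ms, limit=100):
    """[start..]: the first `limit` records at or after start_ms."""
    return [r for r in records if r[0] >= start_ms][:limit]

def slice_to(records, end_ms, limit=100):
    """[..end]: the last `limit` records strictly before end_ms."""
    before = [r for r in records if r[0] < end_ms]
    return before[-limit:] if limit > 0 else []
```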

Bounded Window [..]

A bounded time window consists of a point in time plus either a second point in time or a duration, from which the earlier and later bounds are derived.

The following examples are all bounded windows:

consume __oprtr_snapshot_state {:window [#dt"2021-01-13T03:12:12.123Z" .. #dt"2021-01-13T03:17:35.876Z"]}
consume __oprtr_snapshot_state {:window [1610507532123 .. #dt"2021-01-13T03:17:35.876Z"]}
consume __oprtr_snapshot_state {:window [(now-pt5m)..pt1m]}
consume __oprtr_snapshot_state {:window [#dt"2021-01-13T03:17:35.876Z" .. (now - pt1h)]}
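Under the reading that a duration on the right-hand side extends forward from the left-hand point (consistent with [(now-pt5m)..pt1m] being described earlier as one minute of messages), a bounded window resolves to a start and end as sketched below. The Duration wrapper and resolve_bounded are hypothetical; the wrapper exists only to tell a duration apart from a point, since both are plain millisecond integers.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Duration:
    """Hypothetical marker so a duration (ms) is not mistaken for a point (epoch ms)."""
    ms: int

def resolve_bounded(first, second):
    """Return (start_ms, end_ms) for [first .. second]."""
    if isinstance(second, Duration):
        # [point .. duration]: assumed to run forward from the point
        return first, first + second.ms
    # [point .. point]: both bounds are given directly
    return first, second
```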

Duration Window [+-]

Duration windows consist of a point in time followed by +- then a duration.

They are a convenience for grabbing a window of records around a point in time:

consume __oprtr_snapshot_state {:window [(now-pt5m) +- pt1m]}
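Assuming the duration is applied symmetrically on both sides of the point, a duration window resolves as sketched below, so [(now-pt5m) +- pt1m] would span two minutes, from six minutes ago to four minutes ago. resolve_plus_minus is a hypothetical name.

```python
def resolve_plus_minus(point_ms, duration_ms):
    """[point +- duration] -> (point - duration, point + duration), assumed symmetric."""
    return point_ms - duration_ms, point_ms + duration_ms
```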

Granular Window [@]

Granular windows are a Klang paradigm for dealing with time windows that align to fixed chronological boundaries.

They consist of a duration followed by @ and then a point in time. The realized window is the fixed interval of that duration which contains the point.

Each granular duration must divide evenly into the next-highest granularity, so for example pt(1|2|3|4|5|6)m are granular but pt7m is not.

This is because pt7m does not divide evenly into pt1h. Granular windows are superb for periodic rollups and aggregations of time series data.

For example the following granular window:

consume __oprtr_snapshot_state {:window [pt5m @ #dt"2021-01-13T03:12:12.123Z"]}

Resolves to:

{:start #dt"2021-01-13T03:10:00.000Z"
:end #dt"2021-01-13T03:15:00.000Z"}
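One way to compute a granular window is to round the point down to a multiple of the duration, measured from the Unix epoch, and then add the duration once. The sketch below reproduces the 03:10 to 03:15 result above. The helper names and the minutes-divide-an-hour check are assumptions based on the rule stated in the text, not Kpow's implementation.

```python
MINUTE_MS = 60_000

def is_granular_minutes(n):
    """Assumed rule for minute durations: n minutes must divide one hour evenly."""
    return n > 0 and 60 % n == 0

def resolve_granular(duration_ms, point_ms):
    """[duration @ point] -> the epoch-aligned window of that duration containing point."""
    start = point_ms - (point_ms % duration_ms)
    return start, start + duration_ms
```

Aligning to the epoch matches UTC clock boundaries here because a five-minute duration divides a day evenly.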

Data Construction

Arbitrary data structures that mix EDN literals, Klang extensions, and Klang expressions are easily constructed in the kREPL:

{:time         now
"a-list" [1 2 3 4 5]
"a-map" {:a 2}
"a-set" #{:a 2}
:clj-exp (map inc [1 2 3 4])
:jq-exp #=([1 2 3 4].[] | . + 1)
:clj-jq (map #>(. + 1) [1 2 3 4])
:jq-clj #=((map inc [1 2 3 4]).[] | . + 1)
:really-exp? #=((reduce + #=((map inc [1 2 3 4]).[] | . + 1)) | . + 1 == 19)
:nested-data [{:values "no-problem"}]}
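To check the arithmetic in the nested :really-exp? entry, the Python below evaluates the same steps, assuming the JQ stream produced by #=() is collected into a list before reduce consumes it.

```python
# (map inc [1 2 3 4])
incremented = [x + 1 for x in [1, 2, 3, 4]]

# #=((map inc [1 2 3 4]).[] | . + 1): .[] streams each element, . + 1 runs per element
jq_clj = [x + 1 for x in incremented]

# #=((reduce + ...) | . + 1 == 19): + binds tighter than ==, so this is (18 + 1) == 19
total = sum(jq_clj)
really_exp = total + 1 == 19
```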

Whitespace

Klang is not whitespace sensitive. Commas are treated as whitespace, just like spaces, tabs, and newlines.

Unlike JSON, Klang collection literals require neither commas nor colons, but you may add commas for readability if desired.

Side Effecting Functions

A side effecting function is any function that deals with the world outside the kREPL.

As of the current version of Kpow, side effecting functions are:

  • def - define a variable
  • repl * - manage the kREPL
  • kafka * - control or interrogate Kafka
  • consume * - consume data from a Kafka topic

The most important thing to note about side-effecting functions is that you cannot mix them with pure functions.

The kREPL will not allow you to mix side-effecting and pure functions like so:

def my-groups (kafka show group)  # This won't work

You are very welcome to def a variable with the evaluation of a pure function:

def my-yes-its-a-partially-applied-fn (map (partial get-in [:a :b]) [{:a {:b :c}}])

This is because side-effecting functions each have different requirements that we manage for you, and at this time intermingling them is too hard.

For example, consume is a lazy function: it prompts you to continue consuming large datasets, and integrating that behaviour with reduce is hard.

Consume Function

Syntax

consume (topic | topics) {options}

Consume is a side-effecting function that pulls data from Kafka and decodes/filters it according to your specifications.

Consume has the same underlying implementation as Data Inspect, but rather than clicking form elements, you evaluate a function.

Consume one topic, multiple topics, or all topics matching a regex in a single query, optionally constrained by time windows, offsets, or filters.

consume __oprtr_snapshot_state
{:serdes "transit-json"
:window [(now-pt5m)..pt1m]
:filter #>(.value:group | test( "oprtr.*"))}

Parameters

  • consume - The reserved 'consume' symbol
  • topic | topics - a-topic | a-topic:1 | a-topic:[1..6] | [a-topic b-topic:1] | #".*topic" | #".*topic":[1..6], etc
  • options:
    • window - See: Time Windows and Slices
    • offsets - See: Integer Ranges
    • serdes - The SerDes to use when decoding messages on a topic. Required if you use :filter
    • filter - A JQ Lambda expression to apply to each message consumed
    • limit - The maximum number of records to return in one kREPL interaction

Consumption Limits

Klang is a language for processing bounded data sets. It is designed for post-hoc or ad-hoc Kafka computation.

Each kREPL consumption is naturally limited by the number of records within your window or slice, with consumption lazily evaluated.

In each interaction, Kpow will attempt to return at most 'limit' records or spend at most seven seconds of consumer time, and 'limit' cannot exceed 500.

This means to consume an entire window you may need to press 'continue' a few times. This interaction will likely be automated shortly.

'Limit' applies to records returned to the browser. It is possible to traverse thousands of records per interaction when filtering in consume.
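The limit and time budget described above can be sketched as a resumable scan. This is a hypothetical model of one interaction, not Kpow's consumer: the 500-record cap and seven-second budget come from the text, while consume_batch, its arguments, and the returned position are invented for illustration.

```python
import time

MAX_LIMIT = 500       # from the text: the largest permitted 'limit'
TIME_BUDGET_S = 7.0   # from the text: seven seconds of consumer time

def consume_batch(records, position, limit=100, predicate=None, clock=time.monotonic):
    """Hypothetical sketch of one kREPL interaction.

    Scans from `position`, applying the optional filter, until `limit` matches
    are collected, the time budget is spent, or the input is exhausted.
    Returns (matches, next_position); next_position is None when done.
    """
    limit = min(limit, MAX_LIMIT)
    deadline = clock() + TIME_BUDGET_S
    matches = []
    while position < len(records):
        if len(matches) >= limit or clock() >= deadline:
            return matches, position  # the caller can 'continue' from here
        record = records[position]
        position += 1
        if predicate is None or predicate(record):
            matches.append(record)
    return matches, None
```

With a selective filter, one call may scan many records to return only a few, which is how a single interaction can traverse thousands of records while returning at most 'limit'.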

Curated Pure Clojure Functions

Klang provides 75 core Clojure functions that work on EDN data structures, including higher-order functions such as filter, map, and reduce.

Each function that takes more than a single argument can be partially applied:

map
(partial group-by #>(.:header:one))
[[{:header {:one "x" "two" "y"}}
{:header {:one "y" "two" "z"}}
{:header {:one "x" "two" "z"}}]]

You can use juxt and comp to create new functions in-place, as per normal Clojure:

map
(partial group-by (comp :one :header))
[[{:header {:one "x" "two" "y"}}
{:header {:one "y" "two" "z"}}
{:header {:one "x" "two" "z"}}]]
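For readers more familiar with Python, the partially applied group-by above behaves roughly like the sketch below. The group_by helper is written for illustration, keyword keys are modelled as strings, and the lambda plays the role of (comp :one :header).

```python
from functools import partial

def group_by(f, coll):
    """Like Clojure's group-by: map each key f(x) to the items that produced it, in order."""
    groups = {}
    for item in coll:
        groups.setdefault(f(item), []).append(item)
    return groups

# Keyword keys (:header, :one) are modelled as plain strings here.
data = [[{"header": {"one": "x", "two": "y"}},
         {"header": {"one": "y", "two": "z"}},
         {"header": {"one": "x", "two": "z"}}]]

# map (partial group-by (comp :one :header)) [...]
result = list(map(partial(group_by, lambda m: m["header"]["one"]), data))
```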

A description of each Clojure function and link to Clojure documentation will follow in a later release.

JQ Lambda Syntax

Klang provides a vastly expanded implementation of kJQ from our current Data Inspect feature (which remains as-is).

While JQ Lambdas cannot reference variables, they can use all Klang literals including durations and date-times.

Klang is not Clojure with a little JQ. The Klang implementation of JQ is a powerful subset of the language with sensible constraints.

For example, this complex nested logical predicate is valid Klang, and valid JQ:

[{"key" {"a" true "b" true "c" true}}
{"key" {"a" true "b" false "c" true}}
{"key" {"a" true "b" true "c" false}}
{"key" {"a" true "b" false "c" false "z" true}}
{"key" {"a" false "b" true "c" true}}
{"key" {"a" false "b" false "c" true "z" true}}
{"key" {"a" false "b" true "c" false}}
{"key" {"a" false "b" false "c" false}}].[] |
(((.key.a and (.key.b or (.key.c and ((.key.z and 1 == 1) | not)))) or ((.key.b | not) and .key.c) or .key.z) | not)
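To see what the predicate computes, the Python below evaluates the same logic over the eight sample keys. It assumes JQ's truthiness rules (only false and null are falsy, and a missing key is null) and is a translation for illustration rather than Klang's evaluator.

```python
rows = [
    {"key": {"a": True, "b": True, "c": True}},
    {"key": {"a": True, "b": False, "c": True}},
    {"key": {"a": True, "b": True, "c": False}},
    {"key": {"a": True, "b": False, "c": False, "z": True}},
    {"key": {"a": False, "b": True, "c": True}},
    {"key": {"a": False, "b": False, "c": True, "z": True}},
    {"key": {"a": False, "b": True, "c": False}},
    {"key": {"a": False, "b": False, "c": False}},
]

def truthy(value):
    """JQ truthiness: only false and null are falsy."""
    return value is not None and value is not False

def predicate(row):
    key = row["key"]
    # A missing key (here .key.z) is null in JQ, so .get() returning None is falsy
    a, b, c, z = (truthy(key.get(name)) for name in ("a", "b", "c", "z"))
    not_z = not (z and 1 == 1)  # ((.key.z and 1 == 1) | not)
    matched = (a and (b or (c and not_z))) or ((not b) and c) or z
    return not matched  # trailing | not

results = [predicate(row) for row in rows]
```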