Examples

At-least-once delivery

In this example, we create a consumer to store data in a SQL Server database. We also create a producer with an “external commit topic” to guarantee at-least-once delivery of all consumed messages.

Create Kafka client

As the first step, we add a Kafka client with the option “Enable metadata message processing” enabled. It is important to do this first, because JSON paths for tags will be different when messages are consumed with retained metadata.

../_images/kafka-example-1-kafka-client.png

Create consumer topic

The consumer is configured to subscribe to a Kafka topic called “signal” where important signal data is expected to be delivered. Signal data consists of a JSON object containing a signal counter and a timestamp in the form:

{
    "signal": 1,
    "timestamp: "2025-06-01T12:00:00"
}

First, the consumer topic is configured as follows.

../_images/kafka-example-1-consumer-topic.png

Notably, the external commit topic is chosen to be “kafkaclient1_signal_counters_commit”.

Use the tag browser to add tags; the tag browser configures the required JSON paths automatically (you can also enter them manually).

../_images/kafka-example-1-tag-browser.png

As you can see, the payload for task pipelines includes the topic, partition, and offset, as well as the actual Kafka message containing the signal counter and timestamp.
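
Putting this together, a single consumed payload might look like the following. This is a sketch only; in particular, the “message” field name wrapping the original Kafka payload is an assumption here, and the authoritative structure is the one shown by the tag browser:

{
    "topic": "signal",
    "partition": 1,
    "offset": 1234,
    "message": {
        "signal": 1,
        "timestamp": "2025-06-01T12:00:00"
    }
}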

Create producer topic

The producer topic is our “external commit” topic, used to produce a message whenever a sample is successfully stored in the database. The producer must send messages consisting of a JSON object in the following form (example only; the actual values for topic, partition, and offset depend on the message forwarded by the consumer to the Dataristix task pipeline):

{
    "topic": "signal"
    "partition": 1,
    "offset: 1234
}

Create the producer topic with default settings and add a single tag called “kafkaclient1_signal_counters_commit”, matching the name of the external commit topic configured for the consumer.

../_images/kafka-example-1-producer-topic.png

Create “Process” topic for SQL Server

This topic defines how data is stored in the SQL database. Because we want to take action when storage is successful (to send a commit confirmation), the topic is defined as a “Process” topic that allows returning a result. We define the topic as follows.

../_images/kafka-example-1-sql-topic.png

The topic has “Process” access, a single tag called “RowCount” (added manually), and a process query as follows:

INSERT INTO SignalStore ([signal], [timestamp])
SELECT @signal, @timestamp
WHERE NOT EXISTS(SELECT * FROM SignalStore WHERE [signal]=@signal);
SELECT @@rowcount as [RowCount];

The query stores the signal value if it does not exist yet (consumed messages may be received more than once) and returns the affected row count. The value of the returned row count is irrelevant for our purposes; we just want to know that the query completed successfully.
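
For reference, a table compatible with this query could be created as follows. This is a minimal sketch; the actual column types and constraints in your database may differ:

-- Hypothetical schema for the SignalStore table used by the process query;
-- the unique constraint on [signal] additionally guards against duplicate inserts.
CREATE TABLE SignalStore (
    [signal]    INT       NOT NULL UNIQUE,
    [timestamp] DATETIME2 NOT NULL
);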

Create task

We are now ready to create a storage task that ensures that each signal value is stored into the database. Because we want to process every individual message that arrives via the consumer input node separately, we disable input value collation in the task settings:

../_images/kafka-example-1-task-settings.png

The dataflow is designed as follows.

../_images/kafka-example-1-task.png

The task receives signal counter and timestamp values from the “Signal Counters” consumer topic, together with the topic, partition, and offset of the Kafka message. Signal and timestamp values are stored in the SQL Server database. If the query succeeds, the “RowCount” tag produces a value, which in turn causes the subsequent Trigger node to pass the topic, partition, and offset values to the JSON encoder. The JSON encoder creates a JSON object in the form:

{
    "topic": "signal"
    "partition": 1,
    "offset: 1234
}

Finally, the JSON object is passed on to the “Signal Commit” task node with Kafka topic “kafkaclient1_signal_counters_commit”. Because this topic is also subscribed to by the consumer (as the “external commit topic”), the consumer is notified that the message identified by topic, partition, and offset has been successfully processed. The consumer then commits the message to the broker, marking it as “received” on the Kafka broker side.

Notes

It is not necessary to produce a commit confirmation for every sample stored, especially when there are many messages to process. Since the storage query is designed to handle duplicates, it is safe to add a Debounce processor to the task that waits briefly for additional incoming messages and forwards data to the “Signal Commit” task node only after the delay.