Apache Kafka Integration: Defining Consumer Jobs
Consumers in Apache Kafka are external applications that read the messages that arrive at the Topics to which they are subscribed in the target Apache Kafka environment. Consumers constantly listen for messages on those Topics. They define filters that consist of key-value pairs matching the message metadata, the message content, and so forth. When a message arrives that matches ALL the criteria defined in a Consumer, further actions are triggered. Consumers can be organized in groups (Consumer Groups). A Consumer Group has a unique ID, and all Consumers in the group share this same ID. For more information about Consumers and Consumer Groups, see About Apache Kafka.
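For context, the following is a minimal sketch of a plain Kafka Consumer written against the official Java client, outside of Automic Automation. The broker address, Topic name, and group ID are placeholder values:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");  // placeholder broker address
        props.put("group.id", "my-consumer-group");     // shared by all Consumers in the group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));    // placeholder Topic
            while (true) {                              // Consumers are always listening
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}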
An Automic Automation Consumer Job represents a Consumer in your Apache Kafka environment. It contains the same filter criteria as the Consumer and listens to the same Topic/Partition.
You define the Automic Automation Consumer Job on various pages. On the Consumer Job page you define the filter criteria that a message must meet to be a match. All filter criteria must be met to activate a Consumer Job.
-
Make sure that you select the Apache Kafka Agent before you start defining the parameters on the Consumer Job page. To do so, go to the Attributes page and select the Agent from the Agent dropdown list.
-
To configure an Automic Automation Consumer Job, you must know certain parameters that are defined in the target Apache Kafka environment. If you do not know them, request them from the team that works with the target cloud solution.
-
If a Consumer Job contains both JSONPath expressions and regular expressions, ALL of them are evaluated and must be true for the Job to complete.
The following sections describe how to configure a Consumer Job:
Configuring the Consumer Job Parameters
In the Consumer Job Parameters section, you establish the connection to the target Apache Kafka environment and specify the filter criteria.
-
Configuration File Path
The path on the Agent machine where the Apache Kafka configuration properties file is stored. This file contains the Apache Kafka configuration details (host URL, SSL connectivity, authorization mechanism properties, broker settings, partition settings, and so on). The same configuration file is used for Consumer and Producer Jobs.
Important! Usually, Automic Automation Jobs require Connection objects to establish the connection to the target cloud platform. This is NOT true for the Apache Kafka integration. The configuration file contains the parameters required to establish the connection between the Job in Automic Automation and the broker in the target Apache Kafka environment.
Example:
-
UNIX: /opt/home/kafka/server.properties
-
Windows: C:\kafka\server.properties
For more information about the available configuration parameters, see the official Apache Kafka product documentation at https://kafka.apache.org/documentation/#consumerconfigs and https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html.
Example 1
acks=all
bootstrap.servers=<server>
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=1
auto.offset.reset=earliest
Example 2: Connecting to a server running over SSL
acks=all
bootstrap.servers=<server>
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.records=1
auto.offset.reset=earliest
ssl.protocol=TLSv1.2
security.protocol=SSL
ssl.keystore.location=/root/kafka_ssl/kafka.server.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
ssl.truststore.location=/root/kafka_ssl/kafka.server.truststore.jks
ssl.truststore.password=<password>
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
-
Topic
The topic within Apache Kafka to which the Consumer Job subscribes.
-
Group ID
If the Consumer Job is part of a Consumer Group, this is the ID that uniquely identifies the group. All consumers in a group share the Group ID. For more information on Consumer Groups, see Consumer Groups.
-
Define Failure Criteria
Select this checkbox if you want Automic Automation to fail the Job when specific conditions occur. When you select it, the Failure Key & Value Parameters and Failure Header Parameters sections are displayed. Here you enter the Key/Value pairs that must be true for Automic Automation to fail the Job.
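For example, assuming the Message Content carries an eventstatus field (illustrative values based on the use cases later in this topic), you could fail the Job on any message that reports a failure:
Failure Key & Value Parameters section
Value: FAILED
Value JSON Path: $.eventstatus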
Configuring Key & Value Parameters for Consumer Jobs
In this section, you configure the filter that listens to the key-value pairs of the messages that are written to the Topic to which this Job is subscribed. The Value fields contain the actual message. You can enter a string or regular expression (Key/Value) or a JSONPath expression (Key JSONPath/Value JSONPath).
For details on filtering Apache Kafka messages, see Filtering Kafka Messages in Consumer Jobs.
Configuring Header Parameters for Consumer Jobs
In this section, you configure the filter that listens to the headers of the messages that are written to the Topic to which this Job is subscribed. The header contains metadata that provides additional information about the message itself. You can enter a string or regular expression (Header Key/Header Value) or a JSONPath expression (Header Key JSONPath/Header Value JSONPath).
For details on filtering Apache Kafka messages, see Filtering Kafka Messages in Consumer Jobs.
Filtering Kafka Messages in Consumer Jobs
In addition to reading messages from an Apache Kafka Topic, the Automic Automation Consumer Job allows you to apply filters and trigger actions when specific conditions are met.
Apache Kafka messages may contain the following information:
-
Message Headers
There can be multiple headers, each consisting of a key and a corresponding value.
-
Header Key
-
Header Value
-
Message Key
-
Message Content / Message Value
Defining Filters on Message Headers in Consumer Jobs
Use the Header Parameters section to define filters on message headers.
-
Header Key
The provided value is matched against keys in the message headers. It supports regular expressions.
-
Header Key JSON Path
Use this field if the header key contains data in JSON format. Provide a JSONPath expression that is applied to the header key; the value in the Header Key field is then matched against the result of the expression.
-
Header Value
The provided value is matched against values in the message headers. It supports regular expressions.
-
Header Value JSON Path
Use this field if the header value contains data in JSON format. Provide a JSONPath expression that is applied to the header value; the value in the Header Value field is then matched against the result of the expression.
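The following minimal Java sketch illustrates the regex semantics of the Header Key and Header Value fields. It only models the matching logic; how the Agent evaluates these filters internally may differ:

import java.util.Map;
import java.util.regex.Pattern;

public class HeaderFilterSketch {
    public static void main(String[] args) {
        // Message headers, as in the example later in this topic
        Map<String, String> headers = Map.of(
            "destination", "Automic",
            "messageformat", "JSON");

        // Header Key and Header Value support regular expressions
        Pattern keyFilter = Pattern.compile("dest.*");
        Pattern valueFilter = Pattern.compile("Automic");

        // The filter matches if some header satisfies both the key and the value pattern
        boolean matched = headers.entrySet().stream().anyMatch(h ->
            keyFilter.matcher(h.getKey()).matches()
            && valueFilter.matcher(h.getValue()).matches());

        System.out.println("Header filter matched: " + matched); // true
    }
}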
Defining Filters on Keys/Message Content in Consumer Jobs
Use the Key & Value Parameters section to define filters on Keys/Message Content.
-
Key
The provided value is matched against the Message Key of the message. It supports regular expressions.
-
Key JSON Path
Use this field if the Message Key contains data in JSON format. Provide a JSONPath expression that is applied to the Message Key; the value in the Key field is then matched against the result of the expression.
-
Value
The provided value is matched against the Message Content of the message. It supports regular expressions.
-
Value JSON Path
Use this field if the Message Content contains data in JSON format. Provide a JSONPath expression that is applied to the Message Content; the value in the Value field is then matched against the result of the expression.
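Conceptually, a JSONPath field first extracts a value from the JSON data, and the corresponding filter field is then compared against the result. The following minimal Java sketch models this with the Jayway JsonPath library (used here only for illustration; the Agent's internal implementation may differ):

import com.jayway.jsonpath.JsonPath;

public class ValueFilterSketch {
    public static void main(String[] args) {
        String messageContent = "{ \"eventstatus\": \"SUCCESS\", \"date\": \"2023-07-12\" }";

        // Value JSON Path: $.eventstatus extracts "SUCCESS" from the Message Content
        String extracted = JsonPath.read(messageContent, "$.eventstatus");

        // Value: SUCCESS (the field also supports regular expressions)
        boolean matched = extracted.matches("SUCCESS");
        System.out.println("Value filter matched: " + matched); // true
    }
}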
Apache Kafka Consumer Job Processing
Apache Kafka Consumer Job processing goes through the following steps:
-
The job waits for an Apache Kafka message to arrive.
-
An Apache Kafka message is received.
-
If no conditions or filters are defined, the job ends with a success status: ENDED_OK.
-
The job checks for any defined failure conditions. If such conditions exist and are met by the Apache Kafka message, the job ends with a failed status: ENDED_NOT_OK.
-
The job then checks whether Header and Key/Value conditions are defined. If these conditions are met by the Apache Kafka message, the job ends with a success status: ENDED_OK.
-
If the job has not yet ended, it continues waiting for the next Apache Kafka message.
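The following self-contained Java sketch models this evaluation order. The filter predicates are hypothetical stand-ins for the configured Header and Key/Value criteria, not the Agent's actual code:

import java.util.List;
import java.util.function.Predicate;

public class ConsumerJobFlowSketch {
    enum Status { RUNNING, ENDED_OK, ENDED_NOT_OK }

    // Hypothetical filters; in a real Job they come from the Consumer Job pages
    static List<Predicate<String>> failureFilters =
        List.of(m -> m.contains("\"eventstatus\": \"FAILED\""));
    static List<Predicate<String>> successFilters =
        List.of(m -> m.contains("\"eventstatus\": \"SUCCESS\""));

    static Status evaluate(String message) {
        if (failureFilters.isEmpty() && successFilters.isEmpty()) {
            return Status.ENDED_OK;              // no conditions: the first message succeeds
        }
        if (!failureFilters.isEmpty()
                && failureFilters.stream().allMatch(f -> f.test(message))) {
            return Status.ENDED_NOT_OK;          // failure conditions are checked first
        }
        if (!successFilters.isEmpty()
                && successFilters.stream().allMatch(f -> f.test(message))) {
            return Status.ENDED_OK;              // ALL filters must match
        }
        return Status.RUNNING;                   // keep waiting for the next message
    }

    public static void main(String[] args) {
        System.out.println(evaluate("{ \"eventstatus\": \"PENDING\" }")); // RUNNING
        System.out.println(evaluate("{ \"eventstatus\": \"SUCCESS\" }")); // ENDED_OK
        System.out.println(evaluate("{ \"eventstatus\": \"FAILED\" }"));  // ENDED_NOT_OK
    }
}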
Consumer Job Examples
Filtering Apache Kafka Messages
Let's consider the following Apache Kafka message:
-
Message Headers is defined as follows:
{
"destination" : "Automic",
"messageformat": "JSON"
}
Message Headers has two headers: destination is the key and Automic is the value; messageformat is the key and JSON is the value.
Message Key is defined as Automic and the Message Content is defined as follows:
{
"eventstatus": "SUCCESS",
"date": "2023-07-12",
"eventtype": "TRIGGER_JOB"
}
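For illustration, a message like this could be produced with the official Kafka Java client as follows (the broker address and Topic name are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExampleMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String content = "{ \"eventstatus\": \"SUCCESS\", \"date\": \"2023-07-12\", \"eventtype\": \"TRIGGER_JOB\" }";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Message Key "Automic" and the Message Content above; "my-topic" is a placeholder
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "Automic", content);
            record.headers().add("destination", "Automic".getBytes(StandardCharsets.UTF_8));
            record.headers().add("messageformat", "JSON".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}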
Filtering Apache Kafka Messages: Use Case 1
Let's consider you want to filter for Apache Kafka messages that meet the following conditions:
Message Headers should have destination as key and Automic as value.
Message Data should have eventstatus as SUCCESS.
You could configure the following in the Automic Automation Consumer Job:
-
Header Parameters section
Header Key: destination
Header Value: Automic
-
Key & Value Parameters section
Value: SUCCESS
Value JSON Path: $.eventstatus
Filtering Apache Kafka Messages: Use Case 2
Now, let's filter for Apache Kafka messages that meet the following conditions:
Message Headers should have "destination" as key and Automic as value.
Message Data should have eventstatus as SUCCESS and date as 2023-07-12.
You could configure the following in the Automic Automation Consumer Job:
-
Header Parameters section
Header Key: destination
Header Value: Automic
-
Key & Value Parameters section
Value: (?s)(?=.*"eventstatus":\s*"SUCCESS")(?=.*"date":\s*"2023-07-12")
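Because this regular expression consists of lookaheads, each condition is evaluated independently, so the two fields can appear in any order within the Message Content. You can verify the pattern with a short Java snippet:

import java.util.regex.Pattern;

public class UseCase2RegexCheck {
    public static void main(String[] args) {
        String content =
            "{ \"eventstatus\": \"SUCCESS\", \"date\": \"2023-07-12\", \"eventtype\": \"TRIGGER_JOB\" }";

        // (?s) lets '.' cross line breaks; each (?=...) lookahead must hold on its own
        Pattern filter = Pattern.compile(
            "(?s)(?=.*\"eventstatus\":\\s*\"SUCCESS\")(?=.*\"date\":\\s*\"2023-07-12\")");

        System.out.println(filter.matcher(content).find()); // true
    }
}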
Example of a JSONPath Expression
A JSONPath expression begins with $ followed by a sequence of child elements separated by either a period (dot notation) or square brackets (bracket notation).
The Automic Automation / Apache Kafka Agent integration supports the dot notation only.
Let's suppose that the following message arrives from the Apache Kafka Topic:
{ "event": { "pet": "dog", "data": { "name": "Rex", "gender": "male" } } }
In this message, event is the root node, pet and data are child nodes and name and gender are child elements of data. You can extract the pet name as follows:
-
Using the dot notation
$.event.data.name
-
Using dot notation and regex
event.data.*
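A minimal Java sketch using the Jayway JsonPath library (used here only for illustration) shows the dot notation expression resolving to the pet name:

import com.jayway.jsonpath.JsonPath;

public class DotNotationExample {
    public static void main(String[] args) {
        String message =
            "{ \"event\": { \"pet\": \"dog\", \"data\": { \"name\": \"Rex\", \"gender\": \"male\" } } }";

        // Dot notation: walk from the root ($) through event and data to name
        String petName = JsonPath.read(message, "$.event.data.name");
        System.out.println(petName); // Rex
    }
}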
Use JSONPath fields only if the message and key values in the Producer contain JSON data.