Apache Kafka Integration: Defining Consumer Jobs


Consumers in Apache Kafka are external applications that read the messages that arrive at the Topics to which they are subscribed in the target Apache Kafka environment. Consumers listen continuously to the messages in those Topics. They act as filters that consist of key-value pairs describing the message metadata, the content, and so forth. When a message arrives that matches ALL the criteria defined in a Consumer, further actions are triggered. Consumers can be organized in groups (Consumer Groups). A Consumer Group has a unique ID, and all Consumers in a group share this ID. For more information about Consumers and Consumer Groups, see About Apache Kafka.

An Automic Automation Consumer Job represents a Consumer in your Apache Kafka environment. It contains the same filter criteria as the Consumer and listens to the same Topic/Partition.

You define the Automic Automation Consumer Job on various pages. On the Consumer Job page you define the filter criteria that a message must meet to be a match. All filter criteria must be met to activate a Consumer Job.

Important!
  • Make sure that you have already selected the Apache Kafka Agent when you start defining the parameters on the Consumer Job page. To do so, go to the Attributes page and select it from the Agent dropdown list.

  • To configure an Automic Automation Consumer Job you must know certain parameters that are defined in the target Apache Kafka environment. If you are not aware of them, you must request them from the team that works with the target cloud solution.

  • If a Consumer Job contains both JSONPath expressions and regular expressions, ALL of them are evaluated and all must be true for the Job to complete.

The following sections describe how to configure a Consumer Job:

Configuring the Consumer Job Parameters

In the Consumer Job Parameters section, you establish the connection to the target Apache Kafka environment and specify the filter criteria.

  • Configuration File Path

    The path on the Agent machine where the Apache Kafka configuration properties file is stored. The file contains the Apache Kafka configuration details (host URL, SSL connectivity, authorization mechanism properties, broker settings, partition settings, and so on). The same configuration file is used for Consumer and Producer Jobs.

    Important!

    Usually, Automic Automation Jobs require Connection objects to establish the connection to the target cloud platform. This is NOT true for the Apache Kafka integration. The configuration file contains the parameters required to establish the connection between the Job in Automic Automation and the broker in the target Apache Kafka environment.

    Example:

    • UNIX: /opt/home/kafka/server.properties

    • Windows: C:\kafka\server.properties

    For more information about the available configuration parameters, please refer to the Apache Kafka official product documentation at https://kafka.apache.org/documentation/#consumerconfigs and https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#.

    Example 1

    acks=all

    bootstrap.servers= <server>

    key.serializer= org.apache.kafka.common.serialization.StringSerializer

    value.serializer= org.apache.kafka.common.serialization.StringSerializer

    key.deserializer= org.apache.kafka.common.serialization.StringDeserializer

    value.deserializer= org.apache.kafka.common.serialization.StringDeserializer

    max.poll.records= 1

    auto.offset.reset= earliest

    Example 2: Connecting to a server running over SSL

    acks=all

    bootstrap.servers= <server>

    key.serializer= org.apache.kafka.common.serialization.StringSerializer

    value.serializer= org.apache.kafka.common.serialization.StringSerializer

    key.deserializer= org.apache.kafka.common.serialization.StringDeserializer

    value.deserializer= org.apache.kafka.common.serialization.StringDeserializer

    max.poll.records= 1

    auto.offset.reset= earliest

    ssl.protocol=TLSv1.2

    security.protocol=SSL

    ssl.keystore.location=/root/kafka_ssl/kafka.server.keystore.jks

    ssl.keystore.password=<password>

    ssl.key.password=<password>

    ssl.truststore.location=/root/kafka_ssl/kafka.server.truststore.jks

    ssl.truststore.password=<password>

    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1

  • Topic

    The topic within Apache Kafka to which the Consumer Job subscribes.

  • Group ID

    If the Consumer Job is part of a Consumer Group, this is the ID that uniquely identifies the group. All consumers in a group share the Group ID. For more information on Consumer Groups, see Consumer Groups.

  • Define Failure Criteria

    Select this checkbox if you want Automic Automation to fail the Job when a message meets specific conditions. When you select it, the Failure Key & Value Parameters and Failure Header Parameters sections are displayed. There you enter the Key/Value pairs that must be true for Automic Automation to fail the Job.
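As a rough illustration of the file described under Configuration File Path, the following Python sketch parses simple key=value lines and reports missing entries. It is not part of the Agent. The list of expected keys is taken from the examples above, and treating those keys as mandatory is an assumption of this sketch. The broker address is a placeholder, and the parser deliberately ignores the richer Java properties syntax (line continuations, `:` separators).

```python
# Keys taken from the example files above. Treating them as mandatory is an
# assumption of this sketch, not a documented Agent requirement.
EXPECTED_KEYS = ("bootstrap.servers", "key.deserializer", "value.deserializer")


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines; blank lines and # or ! comments are skipped."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!" or "=" not in line:
            continue
        key, value = line.split("=", 1)  # split on the first '=' only
        props[key.strip()] = value.strip()
    return props


def missing_keys(props: dict[str, str]) -> list[str]:
    """Return the expected keys that are absent or empty."""
    return [key for key in EXPECTED_KEYS if not props.get(key)]


# Placeholder content; broker.example.com is a hypothetical host.
sample = """
bootstrap.servers= broker.example.com:9092
key.deserializer= org.apache.kafka.common.serialization.StringDeserializer
max.poll.records= 1
"""
props = parse_properties(sample)
print(missing_keys(props))
```

To check a real file, read its text first (for example with `pathlib.Path(path).read_text()`) and pass it to `parse_properties`.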

Configuring Key & Value Parameters for Consumer Jobs

In this section, you configure the filter that listens to the key-value pairs of the messages that are written to the topic to which this Job is subscribed. The Value fields contain the actual message. You can enter a string or regular expression (Key/Value) or JSONPath expression (Key JSONPath/Value JSONPath).

For details on filtering Apache Kafka messages, see Filtering Kafka Messages in Consumer Jobs.

Configuring Header Parameters for Consumer Jobs

In this section, you configure the filter that listens to the headers of the messages that are written to the Topic to which this Job is subscribed. The header contains metadata that provides additional information about the message itself. You can enter a string or regular expression (Header Key/Header Value) or a JSONPath expression (Header Key JSONPath/Header Value JSONPath).

For details on filtering Apache Kafka messages, see Filtering Kafka Messages in Consumer Jobs.

Filtering Kafka Messages in Consumer Jobs

In addition to reading messages from an Apache Kafka topic, the Automic Automation Consumer job allows you to apply filters and trigger actions when specific conditions are met.

Apache Kafka messages may contain the following information:

  • Message Headers

    There can be multiple headers, each consisting of a key and a corresponding value.

    • Header Key

    • Header Value

  • Message Key

  • Message Content / Message Value

Defining Filters on Message Headers in Consumer Jobs

Use the Header Parameters section to define filters on message headers.

  • Header Key

    The provided value is matched against keys in the message headers. It supports regular expressions.

  • Header Key JSON Path

    Use this field if the Header Key contains data in JSON format. Provide the JSON Path expression on the header key to match the specified Header Key against the JSON expression result.

  • Header Value

    The provided value is matched against values in the message headers. It supports regular expressions.

  • Header Value JSON Path

    Use this field if the Header Value contains data in JSON format. Provide the JSON Path expression on the header to match the specified Header Value against the JSON expression result.

Defining Filters on Keys/Message Content in Consumer Jobs

Use the Key & Value Parameters section to define filters on Keys/Message Content.

  • Key

    The provided value is matched against the Message Key of the message. It supports regular expressions.

  • Key JSON Path

    Use this field if the Message Key contains data in JSON format. Provide the JSON Path expression on the Message Key to match the provided Key against the result of the JSON Path expression.

  • Value

    The provided value is matched against the Message Content in the message. It supports regular expressions.

  • Value JSON Path

    Use this field if the Message Content contains data in JSON format. Provide the JSON Path expression on the Message Content to match the provided Value against the result of the JSON Path expression.
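The relationship between a filter field and its JSON Path companion field can be sketched in Python as follows. This is an illustration of the behavior described above, not the Agent's implementation. The function names and sample data are hypothetical, and two points are assumptions: that the pattern may match anywhere in the text (search semantics), and that a message which is not valid JSON, or which lacks the requested path, counts as no match.

```python
import json
import re


def resolve(path: str, document: str):
    """Follow a dot-notation path such as $.a.b through a JSON document."""
    node = json.loads(document)
    for name in path.removeprefix("$.").split("."):
        node = node[name]
    return node


def field_matches(pattern: str, actual: str, json_path: str = "") -> bool:
    """Match pattern against actual, or against the JSON Path result if a path is given."""
    if json_path:
        try:
            actual = str(resolve(json_path, actual))
        except (ValueError, KeyError, TypeError):
            return False  # assumption: invalid JSON or a missing path is no match
    # Assumption: search semantics, so the pattern may match anywhere in the text.
    return re.search(pattern, actual) is not None


# Hypothetical message content.
content = '{"status": "DONE", "attempt": 2}'
print(field_matches("DONE", content, "$.status"))    # plain string via JSON Path
print(field_matches("^D.*E$", content, "$.status"))  # regular expression via JSON Path
print(field_matches("FAILED", content, "$.status"))  # extracted value does not match
print(field_matches("attempt", content))             # no path: whole content is searched
```

The same function shape applies to all four pairs (Header Key, Header Value, Key, Value), since each pairs a pattern with an optional JSON Path.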

Apache Kafka Consumer Job Processing

Apache Kafka Consumer Job processing goes through the following steps:

  • Wait for an Apache Kafka message to arrive.

  • An Apache Kafka message is received.

  • If no conditions or filters are defined, the job ends with a success status: JOB ENDED_OK.

  • The job checks for any defined failure conditions. If such conditions exist and are met by the Apache Kafka message, the job ends with a failed status: JOB ENDED_NOT_OK.

  • The job then checks if Header and Key/Value conditions are defined. If these conditions are met by the Apache Kafka message, the job ends with a success status: JOB ENDED_OK.

  • Continue waiting for the next Apache Kafka message if the job has not yet ended.
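The steps above can be read as a small decision loop. The Python sketch below models it with hypothetical names: `failure` and `success` are caller-supplied predicates standing in for the failure criteria and the Header and Key/Value conditions, and the message shape is invented for the example. Returning `None` when the input runs out simply represents a Job that would still be waiting.

```python
from typing import Callable, Iterable, Optional

Message = dict  # hypothetical shape, for example {"value": "..."}
Predicate = Callable[[Message], bool]


def run_consumer_job(
    messages: Iterable[Message],
    failure: Optional[Predicate] = None,
    success: Optional[Predicate] = None,
) -> Optional[str]:
    """Return the end status, or None if no message has ended the job yet."""
    for message in messages:  # a message is received
        if failure is None and success is None:
            return "ENDED_OK"  # no conditions or filters defined
        if failure is not None and failure(message):
            return "ENDED_NOT_OK"  # failure conditions are checked first
        if success is not None and success(message):
            return "ENDED_OK"  # Header and Key/Value conditions met
        # otherwise keep waiting for the next message
    return None


stream = [{"value": "PENDING"}, {"value": "ERROR"}, {"value": "SUCCESS"}]
is_error = lambda m: m["value"] == "ERROR"
is_success = lambda m: m["value"] == "SUCCESS"

print(run_consumer_job(stream, failure=is_error, success=is_success))
print(run_consumer_job(stream, success=is_success))
print(run_consumer_job(stream))
```

With both predicates, the second message ends the run as failed before the third one is ever read, which mirrors the order of the checks in the list.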

Consumer Job Examples

Filtering Apache Kafka Messages

Let's consider the following Apache Kafka message:

 

  • Message Headers is defined as follows:

    {

    "destination" : "Automic",

    "messageformat": "JSON"

    }

    Message Headers has two headers. In the first, destination is the key and Automic is the value. In the second, messageformat is the key and JSON is the value.

    Message Key is defined as Automic and the Message Content is defined as follows:

    {

    "eventstatus": "SUCCESS",

    "date": "2023-07-12",

    "eventtype": "TRIGGER_JOB"

    }

Filtering Apache Kafka Messages: Use Case 1

Let's consider you want to filter for Apache Kafka messages that meet the following conditions:

Message Headers should have destination as key and Automic as value.

Message Data should have eventstatus as SUCCESS.

You could configure the following in the Automic Automation Consumer Job:

  • Header Parameters section

    Header Key: destination

    Header Value: Automic

  • Key & Value Parameters section

    Value: SUCCESS

    Value JSON Path: $.eventstatus

Screenshot showing the Consumer Job page with the fields filled in for Use Case 1.

Filtering Apache Kafka Messages: Use Case 2

Now, let's filter for Apache Kafka messages that meet the following conditions:

Message Headers should have "destination" as key and Automic as value.

Message Data should have eventstatus as SUCCESS and date as 2023-07-12.

You could configure the following in the Automic Automation Consumer Job:

  • Header Parameters section

    Header Key: destination

    Header Value: Automic

  • Key & Value Parameters section

    Value: (?s)(?=.*"eventstatus":\s*"SUCCESS")(?=.*"date":\s*"2023-07-12")

Screenshot showing the Consumer Job page with the fields filled in for Use Case 2.
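To see why the Value expression in Use Case 2 checks both fields, it can be tried against the sample Message Content. The sketch below uses Python's `re` module as a stand-in. The Agent's own regular expression engine may behave differently, and search semantics (the pattern may match anywhere in the text) are assumed. Each lookahead `(?=...)` tests one field without consuming input, so the order of the fields does not matter, and `(?s)` lets `.` span line breaks.

```python
import re

# The Value expression from Use Case 2.
PATTERN = r'(?s)(?=.*"eventstatus":\s*"SUCCESS")(?=.*"date":\s*"2023-07-12")'

# The Message Content from the example above.
message = """{
"eventstatus": "SUCCESS",
"date": "2023-07-12",
"eventtype": "TRIGGER_JOB"
}"""

# Variants used to show what the expression rejects and tolerates.
other_date = message.replace("2023-07-12", "2023-07-13")
reordered = '{"date": "2023-07-12", "eventstatus": "SUCCESS"}'


def is_match(text: str) -> bool:
    """True if both lookaheads succeed somewhere in the text."""
    return re.search(PATTERN, text) is not None


print(is_match(message))     # both fields present with the expected values
print(is_match(other_date))  # date differs
print(is_match(reordered))   # field order is irrelevant to the lookaheads
```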

Example of a JSONPath Expression

A JSONPath expression begins with $ followed by a sequence of child elements separated by either a period (dot notation) or square brackets (bracket notation).

Important!

The Automic Automation / Apache Kafka Agent integration supports the dot notation only.

Let's suppose that the following message, which contains JSON data, arrives from the Apache Kafka Topic:

{
  "event": {
    "pet": "dog",
    "data": {
      "name": "Rex",
      "gender": "male"
    }
  }
}

In this message, event is the root node, pet and data are its child nodes, and name and gender are child elements of data. You can extract the pet name as follows:

  • Using the dot notation

    $.event.data.name

  • Using dot notation and regex

    event.data.*
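How a dot-notation expression walks the message can be sketched in Python as shown below. The function names are hypothetical and this is not the Agent's parser. The check that rejects bracket notation reflects the dot-notation restriction stated above, but the exact set of characters allowed in a segment name is an assumption of this sketch. The combined dot notation and regex form is not covered here.

```python
import json
import re

# Dot notation only: "$" followed by one or more ".name" segments.
# The allowed name characters are an assumption of this sketch.
DOT_PATH = re.compile(r"^\$(\.[A-Za-z_][A-Za-z0-9_]*)+$")


def split_path(expression: str) -> list[str]:
    """Split $.a.b.c into ['a', 'b', 'c'], rejecting anything else."""
    if not DOT_PATH.match(expression):
        raise ValueError(f"not a dot-notation JSONPath: {expression!r}")
    return expression[2:].split(".")


def extract(expression: str, message: str):
    """Descend from the document root one child element at a time."""
    node = json.loads(message)
    for name in split_path(expression):
        node = node[name]
    return node


message = '{"event": {"pet": "dog", "data": {"name": "Rex", "gender": "male"}}}'
print(extract("$.event.data.name", message))

try:
    extract("$['event']['data']['name']", message)  # bracket notation
except ValueError as error:
    print(error)
```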

Note:

Use JSONPath fields only if the message and key values in the Producer contain JSON data.
