Kafka (om_kafka)

This module implements an Apache Kafka producer for publishing event records to a Kafka topic. See also the im_kafka module.

The om_kafka module is not supported on IBM AIX as the underlying librdkafka library is unstable on this operating system. Use om_kafka on IBM AIX at your own risk.

The module uses an internal persistent queue to back up event records to be pushed to a Kafka broker. Once the module receives an acknowledgement from the Kafka server that a message has been delivered successfully, it removes the corresponding message from the internal queue. If the module is unable to deliver a message to a Kafka broker (for example, due to connectivity issues or the Kafka server being down), the message is retained in the internal queue (even across NXLog Agent restarts) and the module attempts to deliver it again.

The number of re-delivery attempts can be specified by passing the message.send.max.retries property via the Option directive (for example, Option message.send.max.retries 5). By default, the number of retries is set to 2 and the time interval between two subsequent retries is 5 minutes. Thus, by altering the number of retries, it is possible to control the total time a message remains in the internal queue. If a message cannot be delivered within the allowed retry attempts, it is dropped. The maximum size of the internal queue defaults to 100 messages; to increase it, use the LogqueueSize directive.
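For example, a larger queue and a higher retry count can be combined as follows (a minimal sketch; the values are illustrative):

<Output out>
    Module          om_kafka
    BrokerList      localhost:9092
    Topic           nxlog
    LogqueueSize    100000
    Option          message.send.max.retries 5
</Output>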

Configuration

The om_kafka module accepts the following directives in addition to the common module directives. The BrokerList and Topic directives are required.

Required directives

The following directives are required for the module to start.

BrokerList

This directive specifies the list of Kafka brokers to connect to for publishing logs. The list should include ports and be comma-delimited (for example, localhost:9092,192.168.88.35:19092).

Kafka brokers are metadata relays that return information about the cluster, including the available topics and how they are partitioned. NXLog Agent connects to the broker(s) specified in the BrokerList directive to retrieve the actual URI it needs to connect to for reading or writing data to the relevant topic. If NXLog Agent connects to the broker but fails to read or write data, ensure the broker is returning the correct endpoint information. See the Kafka documentation on listeners and advertised.listeners for more information.

Topic

This directive specifies the initial Kafka topic to publish records to. You can change the active topic after the module starts with the set_kafka_topic() procedure.

SASL directives

The following directives are used for authentication and data security using SASL.

SASLKerberosServiceName

This directive specifies the Kerberos service name to be used for SASL authentication. The service name is required for the GSSAPI SASLMechanism.

SASLKerberosPrincipal

This specifies the client’s Kerberos principal name for the sasl_plaintext and sasl_ssl protocols. This directive is only available on Linux/UNIX and is mandatory when SASLMechanism is set to GSSAPI. See note below.

SASLKerberosKeytab

Specifies the path to the Kerberos keytab file containing the client’s allocated principal name. This directive is only available on Linux/UNIX and is mandatory when SASLMechanism is set to GSSAPI.

The SASLKerberosServiceName and SASLKerberosPrincipal directives are only available on Linux/UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication.

For details about configuring Apache Kafka brokers to accept SASL/Kerberos authentication from clients, follow the instructions provided by the librdkafka project.
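Putting the Kerberos directives together, a client configuration might look like the following minimal sketch (the broker address, service name, principal, and keytab path are hypothetical):

<Output out>
    Module                     om_kafka
    BrokerList                 broker.example.com:9093
    Topic                      nxlog
    Protocol                   sasl_ssl
    CAFile                     %CERTDIR%/ca.pem
    SASLMechanism              GSSAPI
    SASLKerberosServiceName    kafka
    SASLKerberosPrincipal      nxlog/client.example.com@EXAMPLE.COM
    SASLKerberosKeytab         /etc/nxlog/client.keytab
</Output>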

SASLMechanism

This optional directive specifies the SASL mechanism to use for authentication. Supported mechanisms are GSSAPI (the default) and OAUTHBEARER. This directive can only be specified when Protocol is set to sasl_plaintext or sasl_ssl.

SASLOAuthBearerClientID

This directive specifies the public identifier for the application. It must be unique across all clients that the authorization server handles. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerClientSecret

This directive specifies the secret known only to the application and the authorization server. This should be a sufficiently random string that is not guessable. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerEndpointURL

This directive specifies the OAUTH issuer token endpoint HTTP(S) URI used to retrieve the token. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerExtensions

This directive specifies additional parameters as a comma-separated list of key=value pairs to be provided to the broker. For example: supportFeatureX=true,organizationId=sales-emea. This directive is optional and only available when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerScope

This directive specifies the scope of the access request to the broker. This directive is optional and only available when SASLMechanism is set to OAUTHBEARER.

TLS/SSL directives

The following directives are for configuring secure data transfer via TLS/SSL.

CAFile

This specifies the path of the certificate authority (CA) certificate, which will be used to check the certificate of the remote brokers. CAFile is required if Protocol is set to ssl or sasl_ssl. To trust a self-signed certificate presented by the remote (which is not signed by a CA), provide that certificate instead.

CAPattern

This optional directive, supported only on Windows, defines a pattern for locating a suitable CA (Certificate Authority) certificate and its thumbprint in the native Windows Certificate Storage. The pattern must follow PCRE2 rules and use the format "SUBJECT=, CN=, DN=, SAN=", where DN is "CN=, O=, OU=, L=, ST=, C=". During configuration, this directive resolves to the corresponding CAThumbprint value. If multiple matching certificates are found, the first thumbprint encountered is selected, so we recommend keeping the certificate store well-maintained for optimal performance. This feature is not dynamic; the agent must be restarted if the certificate changes. This directive is mutually exclusive with the CAThumbprint directive.

Configuration examples:

CAPattern    'Test' + ' ' + 'Root'

or

CAPattern    $domain

A typical log output looks as follows:

matching pattern [DN=CN=Client\.example\.com;.*?SAN=DNS:Client\.example\.com] to certificate [SUBJECT=US, ClientState, ClientCity, ClientCompany, ClientUnit, Client.example.com, CN=Client.example.com; DN=CN=Client.example.com, O=ClientCompany, OU=ClientUnit, L=ClientCity, ST=ClientState, C=US; SAN=DNS:Client.example.com; DNS:www.Client.example.com; IP:127.0.0.3; ]

CAThumbprint

This optional directive specifies the thumbprint of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote server. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc); whitespace is automatically removed. The certificate must be added to a Windows certificate store that is accessible by NXLog Agent. This directive is only supported on Windows and is mutually exclusive with the CADir, CAFile, and CAPattern directives.
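For example (Windows only; the broker address and thumbprint value are hypothetical):

<Output out>
    Module          om_kafka
    BrokerList      broker.example.com:9093
    Topic           nxlog
    Protocol        ssl
    CAThumbprint    d1f29a45e20bc8e0a3447b32e1f4f9d2a5c67e88
</Output>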

CertFile

This specifies the path of the certificate file to be used for the SSL handshake.

CertKeyFile

This specifies the path of the certificate key file to be used for the SSL handshake.

Compression

This directive specifies the compression type to use during transfer. Available types depend on the Kafka library and should include none (the default), gzip, snappy, and lz4.
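For example, to enable gzip compression (a minimal sketch):

<Output out>
    Module         om_kafka
    BrokerList     localhost:9092
    Topic          nxlog
    Compression    gzip
</Output>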

KeyPass

With this directive, a password can be supplied for the certificate key file defined in CertKeyFile. This directive is not needed for passwordless private keys.

Optional directives

MaxTopics

This directive defines the maximum number of topic handlers om_kafka caches. The module starts with one topic handler, defined by the Topic directive, and creates further topic handlers when you call the set_kafka_topic() procedure. If the number of saved topics reaches the limit, the module will delete the topic handler unused for the longest time before creating a new one. This directive accepts a number greater than zero. The default value is 20. If you set an invalid value, the module logs a warning and reverts to the default.

Determining the optimal maximum depends on the number of topics you use and your environment. Too few allocated topic handlers results in higher CPU usage, since handlers must be recreated more often; however, more topic handlers require more memory.
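For example, to raise the cache limit from the default of 20 (the value is illustrative):

<Output out>
    Module        om_kafka
    BrokerList    localhost:9092
    Topic         nxlog
    MaxTopics     50
</Output>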

Option

This directive can be used to pass a custom configuration property to the Kafka library (librdkafka). For example, the group ID string can be set with Option group.id mygroup, or SCRAM authentication can be enabled by defining Option sasl.mechanism SCRAM-SHA-256. This directive may be used more than once to specify multiple options. For a list of configuration properties, see the librdkafka CONFIGURATION.md file.

Passing librdkafka configuration properties via the Option directive should be done with care, since these properties fine-tune librdkafka performance and may result in various side effects.

Partition

This optional integer directive specifies the topic partition to write to. If this directive is not given, messages are sent without a partition specified.

Protocol

This optional directive specifies the protocol to use for connecting to the Kafka brokers. Accepted values include plaintext (the default), ssl, sasl_plaintext and sasl_ssl. If Protocol is set to ssl or sasl_ssl, then the CAFile directive must also be provided.

Functions

The following functions are exported by om_kafka.

string get_stats()

Returns the statistics string.
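For example, assuming an om_kafka output instance named out, the statistics string can be written to the internal log from a Schedule block inside the output definition (a minimal sketch; see also Example 4 below):

<Schedule>
    Every    10 sec
    Exec     log_info(out->get_stats());
</Schedule>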

Procedures

The following procedures are exported by om_kafka.

set_kafka_topic(string topic_name);

Changes the Kafka topic that the module publishes records to.

If topic_name matches an existing topic, it is set as the current (active) topic. If topic_name doesn’t match an existing topic, a new one is created and set as the current topic.

If the topic name is not a valid string, the topic will not be changed.
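For example, a minimal sketch that routes events to a topic based on a field value (the $SourceName value and topic name are illustrative):

Exec if $SourceName == 'sshd' set_kafka_topic('ssh-logs');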

Examples

Example 1. Using the om_kafka module

This configuration sends events to a Kafka cluster using the brokers specified. Events are published to the first partition of the nxlog topic.

nxlog.conf
<Output out>
    Module          om_kafka
    BrokerList      localhost:9092,192.168.88.35:19092
    Topic           nxlog
    LogqueueSize    100000
    Partition       0
    Protocol        ssl
    CAFile          %CERTDIR%/ca.pem
    CertFile        %CERTDIR%/client-cert.pem
    CertKeyFile     %CERTDIR%/client-key.pem
    KeyPass         thisisasecret
</Output>
Example 2. Using the om_kafka module with SASL OAUTHBEARER authentication

This configuration sends events to a Kafka cluster using the brokers specified. Authentication is done via the SASL OAUTHBEARER mechanism with support for OIDC. Events are published to the first partition of the nxlog topic.

nxlog.conf
<Output out>
    Module                      om_kafka
    LogqueueSize                100000

    BrokerList                  localhost:9092,192.168.88.35:19092
    Topic                       nxlog
    Partition                   0

    Protocol                    sasl_ssl

    CAFile                      %CERTDIR%/ca.pem
    CertFile                    %CERTDIR%/client-cert.pem
    CertKeyFile                 %CERTDIR%/client-key.pem
    KeyPass                     thisisasecret

    SASLMechanism               OAUTHBEARER
    SASLOAuthBearerClientID     client-id
    SASLOAuthBearerClientSecret client-secret
    SASLOAuthBearerEndpointURL  https://oauth2.endpoint.com/
    SASLOAuthBearerExtensions   key1=value1,key2=value2
    SASLOAuthBearerScope        write
</Output>

The built-in support for SCRAM in the librdkafka library and the flexibility of the Option directive implemented in NXLog Agent allow you to set up a secure connection between your broker and NXLog Agent. This type of connection is considered the most secure when communicating with Kafka. By default, Kafka SCRAM authentication requires only three parameters to be set from the librdkafka library:

  • Username

  • Password

  • SASL Mechanism

It is important to note that the Kafka connection’s prerequisites may differ depending on the broker you are connecting to. Vendors usually provide detailed documentation on their requirements so that you can find the correct settings for the Option directive.

See Integrate with a SIEM service using Kafka in the Citrix documentation and Configure SASL/SCRAM authentication for Confluent Platform in the Confluent documentation to better understand some of the unique requirements you may encounter for a SCRAM Kafka connection. Both are great examples, yet many others exist.

Example 3. Using the om_kafka module with SASL SCRAM authentication

This example utilizes a set of dedicated directives to define Kafka parameters and another set of Option directives to define additional Kafka parameters related to SCRAM.

nxlog.conf
<Output out>
    Module         om_kafka
    BrokerList     localhost:9092,192.168.88.35:19092
    Topic          YOURTOPIC
    Protocol       sasl_ssl
    CAFile         %CERTDIR%/kafka.client.truststore.pem
    Option         sasl.mechanism SCRAM-SHA-256   (1)
    Option         sasl.username USERNAME         (2)
    Option         sasl.password PASSWORD         (3)
    Option         session.timeout.ms 60000       (4)
    Option         auto.offset.reset earliest     (5)
</Output>
1 Defines the SASL authentication mechanism
2 Your Kafka username
3 Your Kafka password
4 Session timeout for detecting client failures
5 Defines the behavior while consuming data from a topic partition when there is no initial offset

The librdkafka library can produce performance statistics formatted as JSON. All fields of the JSON structure are explained on the Statistics page of the librdkafka project on GitHub. NXLog Agent can be configured to poll this data at a fixed interval and save the result to the internal logger.

Example 4. Collecting internal statistics

To read statistical data from the librdkafka library, specify the polling interval in milliseconds by passing the statistics.interval.ms option via the Option directive.

The Schedule block sets the interval at which the code in the nested Exec block runs. Inside the Exec block, the log_info() procedure is called with to_kafka->get_stats() as its parameter.

To have the librdkafka statistics produced and delivered synchronously, the statistics.interval.ms option and the Schedule block should specify the same interval.

nxlog.conf
<Output to_kafka>
    Module        om_kafka
    Topic         nxlog
    BrokerList    localhost:9092
    Option        statistics.interval.ms 10000
    <Schedule>
        Every     10 sec
        Exec      log_info(to_kafka->get_stats());
    </Schedule>
</Output>

You may encounter a scenario where you want to split your processed logs into different Kafka topics based on specific conditions. You can achieve this by configuring dynamic topics: routing log messages to different Kafka topics with the set_kafka_topic() procedure. In the following example, a regular expression is used for each condition.

Example 5. Using dynamic Kafka topics

The configuration below reads logs from a file. On the output side, it processes each log line to determine the appropriate Kafka topic based on the log’s content using regular expressions 1, 2, and 3, and then sends the logs to a Kafka server.

These expressions are evaluated in order in an Exec block for each log event. If a log event matches regular expression 1, the Kafka topic is set to the value captured in $1, and the log is written to that topic on the Kafka server. If the topic does not exist, it is created automatically. Similarly, for regular expressions 2 and 3, the Kafka topic is set based on the respective captured values $2 and $3. If none of the regular expressions match, the Kafka topic is set to default-26.

nxlog.conf
<Input input>
    Module        im_file
    File          "/input/logs"
</Input>
 
<Output output>
    Module        om_kafka
    BrokerList    192.168.0.10:9092
    Topic         nxlog
    Partition     0
    <Exec> (1)
        if $raw_event =~ /^ regular expression 1/ (2)
        {
            set_kafka_topic($1);
            log_info("topic : " + $1 + " type of " + type($1));
        }
        else if ($raw_event =~ /^ regular expression 2/) (3)
        {
            set_kafka_topic($2);
            log_info("topic : " + $2 + " type of " + type($2));
        }
        else if ($raw_event =~ /^ regular expression 3/) (4)
        {
            set_kafka_topic($3);
            log_info("topic : " + $3 + " type of " + type($3));
        }
        else (5)
        {
            set_kafka_topic("default-26"); 
            log_info("topic : " + "default-26" + " type of " + type(default-26));
        }
    </Exec>
</Output>
1 The Exec block with the if-else statements for evaluating the logs
2 The if statement defining the first condition with regular expression 1
3 The else if statement defining the second condition with regular expression 2
4 The else if statement defining the third condition with regular expression 3
5 The else statement defining the final condition if none of the previous are met