Kafka (om_kafka)
This module implements an Apache Kafka producer for publishing event records to a Kafka topic. See also the im_kafka module.
| The om_kafka module is not supported on IBM AIX as the underlying librdkafka library is unstable on this operating system. Use om_kafka on IBM AIX at your own risk. | 
The module uses an internal persistent queue to back up event records that should be pushed to a Kafka broker. Once the module receives an acknowledgement from the Kafka server that a message has been delivered successfully, it removes the corresponding message from the internal queue. If the module is unable to deliver a message to a Kafka broker (for example, due to connectivity issues or the Kafka server being down), the message is retained in the internal queue (even across NXLog Agent restarts) and the module attempts to deliver it again.
The number of re-delivery attempts can be specified by passing the
message.send.max.retries property via the Option
directive (for
example, Option message.send.max.retries 5). By default, the number of retries
is set to 2 and the time interval between two subsequent retries is 5 minutes.
Thus, by altering the number of retries, it is possible to control the total
time for a message to remain in the internal queue. If a message cannot be
delivered within the allowed retry attempts, the message is dropped. The
maximum size of the internal queue defaults to 100 messages. To increase the
size of the internal queue, you can use the LogqueueSize directive.
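For example, the following output instance raises the retry count and enlarges the internal queue. The broker, topic, and numeric values are illustrative assumptions. With the 5-minute retry interval described above, five retries keep an undeliverable message in the queue for roughly 25 minutes before it is dropped.

```
<Output kafka>
    Module          om_kafka
    BrokerList      localhost:9092
    Topic           nxlog
    # Retry delivery up to 5 times (passed to librdkafka as-is)
    Option          message.send.max.retries 5
    # Hold up to 1000 undelivered messages in the internal queue
    LogqueueSize    1000
</Output>
```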
Configuration
The om_kafka module accepts the following directives in addition to the common module directives. The BrokerList and Topic directives are required.
Required directives
The following directives are required for the module to start.
| This directive specifies the list of Kafka brokers to connect to for publishing logs.
The list should be comma-delimited and include the port of each broker (for example, localhost:9092,192.168.88.35:19092). |
| This directive specifies the initial Kafka topic to publish records to. You can change the active topic after the module starts with the set_kafka_topic() procedure. | 
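A minimal sketch of an output instance that sets only the two required directives. The broker host names and the topic name are placeholders, and this sketch does not configure TLS or SASL.

```
<Output kafka>
    Module      om_kafka
    # Comma-separated host:port pairs
    BrokerList  kafka1.example.com:9092,kafka2.example.com:9092
    # Initial topic; it can be changed later with set_kafka_topic()
    Topic       nxlog
</Output>
```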
SASL directives
The following directives are used for authentication and data security using SASL.
om_kafka relies on the librdkafka library for authentication.
OS-specific NXLog Agent installation packages use the library available on the OS.
The generic DEB and RPM installation packages include the librdkafka library without libcurl.
The SASL features available to om_kafka depend on how librdkafka was built.
Refer to the librdkafka instructions for configuring Apache Kafka brokers to accept SASL/Kerberos authentication from clients:
- For Kafka brokers running on Linux and UNIX: Using SASL with librdkafka
- For Kafka brokers running on Windows: Using SASL with librdkafka on Windows
| The Kerberos service name to be used for SASL authentication.
The service name is required when SASLMechanism is set to GSSAPI. This directive is only available on Linux and UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication. |
| The client’s Kerberos principal name for the GSSAPI mechanism. This directive is only available on Linux and UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication. |
| The path to the Kerberos keytab file containing the client’s allocated principal name. This directive is only available on Linux and UNIX, and it is required when SASLMechanism is set to GSSAPI. |
| This optional directive specifies the SASL mechanism to use for authentication.
Supported mechanisms include GSSAPI and OAUTHBEARER. |
| This directive specifies the public identifier for the application.
It must be unique across all clients that the authorization server handles.
This directive is mandatory when SASLMechanism is set to OAUTHBEARER. |
| This directive specifies the secret known only to the application and the authorization server.
It should be a sufficiently random string that cannot be guessed. This directive is mandatory when SASLMechanism is set to OAUTHBEARER. |
| This directive specifies the HTTP(S) URI of the OAuth issuer's token endpoint, which is used to retrieve the token.
This directive is mandatory when SASLMechanism is set to OAUTHBEARER. |
| This directive specifies additional parameters, as a comma-separated list of key=value pairs, to be provided to the broker.
For example: key1=value1,key2=value2 |
| This directive specifies the scope of the access request to the broker.
This directive is optional and only available when SASLMechanism is set to OAUTHBEARER. |
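On Linux and UNIX, a Kerberos (GSSAPI) connection could look like the sketch below. Instead of the dedicated Kerberos directives, it passes the standard librdkafka properties sasl.mechanism, sasl.kerberos.service.name, sasl.kerberos.principal, and sasl.kerberos.keytab with Option, the same way the SCRAM example in the Examples section passes its SASL settings. The broker host, principal, service name, and keytab path are hypothetical, and the sketch assumes that the installed librdkafka was built with GSSAPI support.

```
<Output kafka_kerberos>
    Module      om_kafka
    BrokerList  kafka1.example.com:9093
    Topic       nxlog
    Protocol    sasl_ssl
    CAFile      %CERTDIR%/ca.pem
    # Kerberos settings passed directly to librdkafka
    Option      sasl.mechanism GSSAPI
    Option      sasl.kerberos.service.name kafka
    Option      sasl.kerberos.principal nxlog@EXAMPLE.COM
    Option      sasl.kerberos.keytab /etc/nxlog/nxlog.keytab
</Output>
```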
TLS/SSL directives
The following directives configure secure data transfer via TLS/SSL.
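| Specifies whether the connection should be allowed with an expired certificate.
If set to TRUE, the connection is allowed even if the remote host presents an expired certificate. |
| Specifies whether the certificate FQDN should be validated against the server hostname.
If set to TRUE, the FQDN in the certificate is checked against the hostname of the server. |
| Specifies whether the connection should be allowed regardless of the certificate verification results.
If set to TRUE, the connection is allowed even if the remote host's certificate cannot be verified. |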
| Path to a directory containing certificate authority (CA) certificates. These certificates will be used to verify the certificate presented by the remote host. The certificate files must be named using the OpenSSL hashed format, i.e. the hash of the certificate followed by .0, .1, etc. To find the hash of a certificate using OpenSSL, run openssl x509 -hash -noout -in ca.pem. For example, if the command prints <hash>, name the certificate file <hash>.0. A remote host’s self-signed certificate (which is not signed by a CA) can also be trusted by including a copy of the certificate in this directory. |
| Path of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote host. A remote host’s self-signed certificate (which is not signed by a CA) can be trusted by specifying the remote host certificate itself. In the case of certificates signed by an intermediate CA, the certificate specified must contain the complete certificate chain (certificate bundle). |
| You can use this directive on Windows to specify a PCRE2-compliant regular expression for locating a suitable Certificate Authority (CA) certificate from the Windows Certificate Store. The pattern must be in the format  The above configuration will result in the following logging in the NXLog Agent log file: If the pattern matches multiple certificates, NXLog Agent will use the first one. If the certificate changes, you must restart NXLog Agent for it to start using the new certificate. This directive is mutually exclusive with the CAThumbprint, CADir, and CAFile directives. | |||
| This optional directive, supported only on Windows, specifies the thumbprint of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote host. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc). Whitespace is removed automatically. The certificate must be added to a Windows certificate store that is accessible by NXLog Agent. This directive is mutually exclusive with the CADir, CAFile, and CAPattern directives. |
| Path of the certificate file that will be presented to the remote host during the SSL handshake. |
| Path of the private key file that was used to generate the certificate specified by the CertFile directive. This is used for the SSL handshake. |
| This optional directive, supported only on Windows, defines a pattern for identifying a corresponding certificate and its thumbprint within the native Windows Certificate Storage.
The pattern must follow PCRE2 rules and use the format  
 Configuration examples: or A normal log output example would look like as follows:  | |||
| This optional directive, supported only on Windows, specifies the thumbprint of the certificate that will be presented to the remote server during the TLS handshake.
The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc).
Whitespace is removed automatically.
The certificate must be imported to the  When the global directive UseCNGCertificates is set to  
 On the contrary, when the global directive UseCNGCertificates is set to  The usage of the directive is the same in all cases: This directive is only supported on Windows and is mutually exclusive with the CertFile and CertKeyFile directives. | |||
| Path to a directory containing certificate revocation list (CRL) files. These CRL files will be used to check for certificates that were revoked and should no longer be accepted. The files must be named using the OpenSSL hashed format, i.e. the hash of the issuer followed by .r0, .r1, etc. To find the hash of the issuer of a CRL file using OpenSSL, run openssl crl -hash -noout -in crl.pem. For example, if the command prints <hash>, name the CRL file <hash>.r0. |
| Path of the certificate revocation list (CRL) which will be used to check for certificates that have been revoked and should no longer be accepted. To generate a CRL file with OpenSSL, you can run openssl ca -gencrl -out crl.pem. |
| This optional directive specifies a file with Diffie-Hellman parameters for the key exchange. These parameters can be generated with dhparam(1ssl). If this directive is not specified, default parameters will be used. See the OpenSSL Wiki for further details. |
| Passphrase of the private key specified by the CertKeyFile directive. A passphrase is required when the private key is encrypted. For example, the following OpenSSL command generates a private key with Triple DES encryption: openssl genrsa -des3 -out client-key.pem 2048. This directive is not required for passwordless private keys. |
| If set to  This directive is only supported on Windows. | |||
| Specifies whether the remote host must present a certificate.
If set to TRUE, the connection is refused when the remote host does not present a certificate. |
| This optional directive, supported only on Windows, if set to  | |||
| The signature algorithm parameter that is being sent to the Windows SSL library. Allowed values depend on the available encryption providers. This directive is only supported on Windows. | |||
| This optional directive specifies the hostname used for Server Name Indication (SNI). | |||
| This optional directive can be used to set the permitted cipher list for TLSv1.2 and below, overriding the default.
Use the format described in the ciphers(1ssl) man page.
For example specify  
 | |||
| This optional directive can be used to set the permitted cipher list for TLSv1.3. Use the same format as in the SSLCipher directive. Refer to the OpenSSL documentation for a list of valid TLS v1.3 cipher suites. The default value is: 
 | |||
| Specifies if data compression is enabled when sending data over the network.
The compression mechanism is based on the zlib compression library.
The default value is  
 | |||
| This directive can be used to set the allowed SSL/TLS protocol(s). It takes a comma-separated list of values which can be any of the following:  | |||
| This optional directive enables the logging of the TLS protocol version and cipher suite upon a successful SSL/TLS handshake.
Setting this directive to  | |||
| If set to TRUE, the module uses the Windows Cryptography API: Next Generation (CNG) to access the private keys associated with certificates identified by a thumbprint. This directive is only supported on Windows. | 
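As an illustration, the sketch below authenticates with a client certificate whose private key is encrypted, trusts the CA certificates stored in a hashed directory, and restricts the cipher list for TLSv1.2 and below. The broker host, paths, passphrase, and cipher string are assumptions chosen for the example.

```
<Output kafka_tls>
    Module       om_kafka
    BrokerList   kafka1.example.com:9093
    Topic        nxlog
    Protocol     ssl
    # CA certificates named <hash>.0, <hash>.1, ...
    CADir        %CERTDIR%/ca
    CertFile     %CERTDIR%/client-cert.pem
    CertKeyFile  %CERTDIR%/client-key.pem
    # Only needed because the private key is encrypted
    KeyPass      thisisasecret
    # OpenSSL cipher list for TLSv1.2 and below
    SSLCipher    HIGH:!aNULL:!MD5
</Output>
```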
Optional directives
| Specifies the compression types to use during transfer.
The available types depend on the Kafka library ( 
 | |||
| This directive defines the maximum number of topic handlers om_kafka caches.
The module starts with one topic handler, defined by the Topic directive, and creates further topic handlers when you call the set_kafka_topic() procedure.
If the number of saved topics reaches the limit, the module will delete the topic handler unused for the longest time before creating a new one.
This directive accepts a number greater than zero.
The default value is  Determining the optimal maximum value depends on the number of topics you’re using and your environment. An insufficient number of allocated topic handlers will result in more CPU usage. However, more topic handlers require more memory. | |||
| Pass a custom configuration property to the librdkafka library.
For example, you can set the group ID string with Option group.id groupid. You can specify this directive multiple times. |
| This optional integer directive specifies the topic partition to write to. If this directive is not given, messages are sent without a partition specified. | |||
| This optional directive specifies the protocol to use for connecting to the Kafka brokers.
Accepted values include plaintext, ssl, sasl_plaintext, and sasl_ssl. |
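The sketch below combines some of the optional directives: it writes every record to a fixed partition and passes two extra librdkafka properties with Option. client.id and linger.ms are standard librdkafka producer properties; the values are illustrative, and the sketch assumes the nxlog topic has at least two partitions.

```
<Output kafka>
    Module      om_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Write every record to partition 1 of the topic
    Partition   1
    # Extra librdkafka properties; Option may be repeated
    Option      client.id nxlog-agent
    Option      linger.ms 100
</Output>
```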
Functions
The following functions are exported by om_kafka.
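- string get_stats()
  Returns the librdkafka statistics as a JSON string. librdkafka only generates statistics when the statistics.interval.ms property is set to a non-zero value, for example with the Option directive.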
Procedures
The following procedures are exported by om_kafka.
- set_kafka_topic(string topic_name);
  Changes the Kafka topic that the module publishes records to. If topic_name matches an existing topic, it is set as the current (active) topic. If topic_name does not match an existing topic, a new one is created and set as the current topic. If topic_name is not a valid string, the topic is not changed.
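A minimal sketch of calling the procedure from an Exec block to choose a topic per source host. It assumes that an input module or parser sets the $Hostname field and that host names only contain characters that are valid in Kafka topic names. Because the active topic persists from one record to the next, the else branch switches back to the default topic explicitly.

```
<Output kafka>
    Module      om_kafka
    BrokerList  localhost:9092
    # Initial topic, also used for records without a host name
    Topic       nxlog
    <Exec>
        # Per-host topic such as logs-web01, otherwise fall back to nxlog
        if defined($Hostname) set_kafka_topic("logs-" + $Hostname);
        else set_kafka_topic("nxlog");
    </Exec>
</Output>
```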
Examples
This configuration sends events over TLS to a Kafka cluster using the specified
brokers and authenticates with a client certificate. Events are published to
the first partition of the nxlog topic.
<Output out>
    Module          om_kafka
    BrokerList      localhost:9092,192.168.88.35:19092
    Topic           nxlog
    LogqueueSize    100000
    Partition       0
    Protocol        ssl
    CAFile          %CERTDIR%/ca.pem
    CertFile        %CERTDIR%/client-cert.pem
    CertKeyFile     %CERTDIR%/client-key.pem
    KeyPass         thisisasecret
</Output>
This configuration sends events to a Kafka cluster using the specified
brokers. Authentication is done via the SASL OAUTHBEARER mechanism with
support for OIDC. Events are published to the first partition of
the nxlog topic.
<Output out>
    Module                      om_kafka
    LogqueueSize                100000
    BrokerList                  localhost:9092,192.168.88.35:19092
    Topic                       nxlog
    Partition                   0
    Protocol                    sasl_ssl
    CAFile                      %CERTDIR%/ca.pem
    CertFile                    %CERTDIR%/client-cert.pem
    CertKeyFile                 %CERTDIR%/client-key.pem
    KeyPass                     thisisasecret
    SASLMechanism               OAUTHBEARER
    SASLOAuthBearerClientID     client-id
    SASLOAuthBearerClientSecret client-secret
    SASLOAuthBearerEndpointURL  https://oauth2.endpoint.com/
    SASLOAuthBearerExtensions   key1=value1,key2=value2
    SASLOAuthBearerScope        write
</Output>
The built-in support for SCRAM in the librdkafka library and the flexibility of the Option directive implemented in NXLog Agent allow you to set up a secure connection between your broker and NXLog Agent.
Combined with TLS encryption (the sasl_ssl protocol), SCRAM authentication is considered one of the most secure ways to communicate with Kafka.
At a minimum, Kafka SCRAM authentication requires three librdkafka properties to be set:
- Username (sasl.username)
- Password (sasl.password)
- SASL mechanism (sasl.mechanism)
Note that the prerequisites for a Kafka connection may differ depending on the broker you are connecting to. Vendors usually document their requirements in detail, so you can find the correct settings for the Option directive.
| See the Integrate with a SIEM service using Kafka part of the Citrix documentation and Configure SASL/SCRAM authentication for Confluent Platform in the Confluent documentation to better understand some of the unique requirements you may encounter for a SCRAM Kafka connection. These are only two examples; many others exist. | 
This example uses dedicated om_kafka directives to define the connection parameters and a set of Option directives to define additional Kafka parameters related to SCRAM.
<Output out>
    Module         om_kafka
    BrokerList     localhost:9092,192.168.88.35:19092
    Topic          YOURTOPIC
    Protocol       sasl_ssl
    CAFile         %CERTDIR%/kafka.client.truststore.pem
    Option         sasl.mechanism SCRAM-SHA-256   (1)
    Option         sasl.username USERNAME         (2)
    Option         sasl.password PASSWORD         (3)
</Output>
| 1 | Defines the SASL authentication mechanism (librdkafka supports SCRAM-SHA-256 and SCRAM-SHA-512; use the one your broker requires) | 
| 2 | Your Kafka username | 
| 3 | Your Kafka password | 
The librdkafka library produces internal performance metrics in JSON format.
For more information on the JSON structure, see Statistics in the librdkafka documentation.
You can configure NXLog Agent to poll these metrics at a specified interval.
This configuration writes events to a Kafka topic named nxlog.
It also polls for librdkafka performance metrics every 10 seconds using the get_stats() function.
To keep the polled metrics in step with the metrics librdkafka produces, set the statistics.interval.ms option and the Every interval of the Schedule block to the same value.
The configuration writes the librdkafka metrics to the NXLog Agent log file using the log_info() procedure.
<Output kafka>
    Module        om_kafka
    Topic         nxlog
    BrokerList    localhost:9092
    Option        statistics.interval.ms 10000
    <Schedule>
        Every     10 sec
        Exec      log_info(kafka->get_stats());
    </Schedule>
</Output>
You may want to split your processed input logs into different Kafka topics based on specific conditions. You can achieve this with dynamic topics: the set_kafka_topic() procedure routes each log message to a topic chosen at runtime. This example uses a regular expression for each condition.
The configuration below reads logs from a file.
On the output side, it determines the appropriate Kafka topic for each log line from the line's content and then sends the log to the Kafka server.
The Exec block evaluates three regular expressions in order for each log event. Each one matches a different key=value prefix at the start of the line (app=, service=, or host=; these formats are only illustrative) and has a single capture group, so the captured topic name is in $1 whichever expression matched.
The Kafka topic is then set to the captured value, and the log is written to that topic on the Kafka server.
If the topic does not exist, it is created automatically, provided the broker allows automatic topic creation.
If none of the regular expressions match, the Kafka topic is set to default-26.
<Input input>
    Module        im_file
    File          "/input/logs"
</Input>

<Output output>
    Module        om_kafka
    BrokerList    192.168.0.10:9092
    Topic         nxlog
    Partition     0
    <Exec> (1)
        if ($raw_event =~ /^app=([\w.-]+)/) (2)
        {
            set_kafka_topic($1);
            log_info("topic : " + $1 + " type of " + type($1));
        }
        else if ($raw_event =~ /^service=([\w.-]+)/) (3)
        {
            set_kafka_topic($1);
            log_info("topic : " + $1 + " type of " + type($1));
        }
        else if ($raw_event =~ /^host=([\w.-]+)/) (4)
        {
            set_kafka_topic($1);
            log_info("topic : " + $1 + " type of " + type($1));
        }
        else (5)
        {
            set_kafka_topic("default-26");
            log_info("topic : " + "default-26" + " type of " + type("default-26"));
        }
    </Exec>
</Output>
| 1 | The Exec block with the if-else statements for evaluating the logs | 
| 2 | The if statement defining the first condition, which captures the value of an app= prefix | 
| 3 | The else if statement defining the second condition, which captures the value of a service= prefix | 
| 4 | The else if statement defining the third condition, which captures the value of a host= prefix | 
| 5 | The else statement that sets the default topic if none of the previous conditions are met | 
