Kafka (im_kafka)

This module implements an Apache Kafka consumer for collecting event records from a Kafka topic. See also the om_kafka module.

To examine the supported platforms, see the list of installation packages.

Configuration

The im_kafka module accepts the following directives in addition to the common module directives. The BrokerList and one of the mutually exclusive Topic, Assign or Subscribe directives are required.

Required directives

The following directives are required for the module to start.

Assign

This directive specifies a list of Kafka topic:partition pairs from which to collect records. In conjunction with GroupID and setting SavePos to broker, it also allows saving read offsets on the Kafka server.
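
As an illustration, the following sketch reads two partitions of one topic and stores the read offsets on the Kafka server. The instance name, topic name, and group ID are hypothetical, and the comma used to separate the topic:partition pairs is an assumption; verify the list syntax against your NXLog Agent version.

nxlog.conf
<Input kafka_assign>
    Module      im_kafka
    BrokerList  localhost:9092
    # Assumed list syntax: comma-separated topic:partition pairs
    Assign      nxlog:0,nxlog:1
    # GroupID is required when SavePos is set to broker
    GroupID     nxlog-consumers
    SavePos     broker
</Input>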

BrokerList

This mandatory directive specifies the list of Kafka brokers to connect to for collecting logs. The list should include ports and must be comma-delimited, e.g., localhost:9092,192.168.88.35:19092. This directive is equivalent to the Kafka bootstrap.servers property.

Kafka brokers are metadata relays that return information about the cluster, including the available topics and how they are partitioned. NXLog Agent connects to the broker(s) specified in the BrokerList directive to retrieve the actual URI it needs to connect to for reading or writing data to the relevant topic. If NXLog Agent connects to the broker but fails to read or write data, ensure the broker is returning the correct endpoint information. See the Kafka documentation on listeners and advertised.listeners for more information.

Subscribe

This directive specifies a list of Kafka topics. The im_kafka instance will participate in automatic partition assignment for the Kafka consumer group specified by GroupID.

Topic

This directive specifies the Kafka topic from which to collect records. It is mutually exclusive with the Assign and Subscribe directives; one of the three must be specified.

SASL directives

The following directives are used for authentication and data security using SASL.

SASLKerberosServiceName

This directive specifies the Kerberos service name to be used for SASL authentication. The service name is required for the GSSAPI SASLMechanism.

SASLKerberosPrincipal

This specifies the client’s Kerberos principal name for the sasl_plaintext and sasl_ssl protocols. This directive is only available on Linux/UNIX and mandatory when SASLMechanism is set to GSSAPI. See note below.

SASLKerberosKeytab

Specifies the path to the Kerberos keytab file which contains the client’s allocated principal name. This directive is only available on Linux/UNIX and mandatory when SASLMechanism is set to GSSAPI.

The SASLKerberosServiceName and SASLKerberosPrincipal directives are only available on Linux/UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication.

For details about configuring Apache Kafka brokers to accept SASL/Kerberos authentication from clients, follow the instructions provided by the librdkafka project.

SASLMechanism

This optional directive specifies the SASL mechanism to use for authentication. Supported mechanisms are GSSAPI (the default) and OAUTHBEARER. This directive can only be specified when Protocol is set to sasl_plaintext or sasl_ssl.
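
The Kerberos directives above might be combined as in the following sketch for a Linux/UNIX host. The broker hostname, service name, principal, and keytab path are hypothetical placeholders, and %CERTDIR% is assumed to be defined elsewhere in the configuration.

nxlog.conf
<Input kafka_kerberos>
    Module                   im_kafka
    BrokerList               kafka1.example.com:9093
    Topic                    nxlog
    Protocol                 sasl_ssl
    CAFile                   %CERTDIR%/ca.pem
    # GSSAPI is the default mechanism; it is set here for clarity
    SASLMechanism            GSSAPI
    # Hypothetical values; use those issued by your Kerberos administrator
    SASLKerberosServiceName  kafka
    SASLKerberosPrincipal    nxlog@EXAMPLE.COM
    SASLKerberosKeytab       /etc/security/keytabs/nxlog.keytab
</Input>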

SASLOAuthBearerClientID

This directive specifies the public identifier for the application. It must be unique across all clients that the authorization server handles. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerClientSecret

This directive specifies the secret known only to the application and the authorization server. This should be a sufficiently random string that is not guessable. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerEndpointURL

This directive specifies the OAUTH issuer token endpoint HTTP(S) URI used to retrieve the token. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerExtensions

This directive specifies additional parameters as a comma-separated list of key=value pairs to be provided to the broker. For example: supportFeatureX=true,organizationId=sales-emea. This directive is optional and only available when SASLMechanism is set to OAUTHBEARER.

SASLOAuthBearerScope

This directive specifies the scope of the access request to the broker. This directive is optional and only available when SASLMechanism is set to OAUTHBEARER.

TLS/SSL directives

The following directives configure secure data transfer via TLS/SSL.

AllowExpired

Specifies if the connection should be allowed with an expired certificate. If set to TRUE, the remote host will be able to connect with an expired certificate. The default value is FALSE: the certificate must not be expired. This directive is only valid if RequireCert is set to TRUE.

AllowHostnameValidation

Specifies if the certificate FQDN should be validated against the server hostname or not. If set to TRUE, the connection will only be allowed if the certificate FQDN corresponds to the server hostname. The default value is FALSE: the remote server hostname is not validated.

AllowUntrusted

Specifies if the connection should be allowed regardless of the certificate verification results. If set to TRUE, the remote host will be able to connect with any unexpired certificate. The default value is FALSE: the remote host must present a trusted certificate.

CADir

Path to a directory containing certificate authority (CA) certificates. These certificates will be used to verify the certificate presented by the remote host. The certificate files must be named using the OpenSSL hashed format, i.e. the hash of the certificate followed by .0, .1 etc. To find the hash of a certificate using OpenSSL:

$ openssl x509 -hash -noout -in ca.crt

For example, if the certificate hash is e2f14e4a, then the certificate filename should be e2f14e4a.0. If there is another certificate with the same hash then it should be named e2f14e4a.1 and so on.

A remote host’s self-signed certificate (which is not signed by a CA) can also be trusted by including a copy of the certificate in this directory.

CAFile

Path of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote host. A remote host’s self-signed certificate (which is not signed by a CA) can be trusted by specifying the remote host certificate itself. In the case of certificates signed by an intermediate CA, the certificate specified must contain the complete certificate chain (certificate bundle).

CAPattern

This optional directive, supported only on Windows, defines a pattern for locating a suitable CA (Certificate Authority) certificate and its thumbprint in the native Windows Certificate Storage. The pattern must follow PCRE2 rules and use the format "SUBJECT=, CN=, DN=, SAN=" where DN is "CN=, O=, OU=, L=, ST=, C=". During configuration, this directive resolves into the corresponding CAThumbprint value. If multiple matching certificates are found, the first encountered thumbprint is selected. We recommend ensuring that the used certificate storage is well-maintained for optimal performance. This feature is not dynamic; the agent must be restarted if the certificate changes. This directive is mutually exclusive with the CAThumbprint directive.

Configuration examples:

CAPattern    'Test' + ' ' + 'Root'

or

CAPattern    $domain

Typical log output looks like this:

matching pattern [DN=CN=Client\.example\.com;.*?SAN=DNS:Client\.example\.com] to certificate [SUBJECT=US, ClientState, ClientCity, ClientCompany, ClientUnit, Client.example.com, CN=Client.example.com; DN=CN=Client.example.com, O=ClientCompany, OU=ClientUnit, L=ClientCity, ST=ClientState, C=US; SAN=DNS:Client.example.com; DNS:www.Client.example.com; IP:127.0.0.3; ]

This directive is only supported on Windows.

CAThumbprint

This optional directive, supported only on Windows, specifies the thumbprint of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote host. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc). Whitespaces are automatically removed. The certificate must be added to a Windows certificate store that is accessible by NXLog Agent. This directive is mutually exclusive with the CADir, CAFile and CAPattern directives.

CertFile

Path of the certificate file that will be presented to the remote host during the SSL handshake.

CertKeyFile

Path of the private key file that was used to generate the certificate specified by the CertFile directive. This is used for the SSL handshake.

CertPattern

This optional directive, supported only on Windows, defines a pattern for identifying a corresponding certificate and its thumbprint within the native Windows Certificate Storage. The pattern must follow PCRE2 rules and use the format "SUBJECT=, CN=, DN=, SAN=" where DN is "CN=, O=, OU=, L=, ST=, C=". The certificate must be imported in PFX format into the Local Computer\Personal certificate store for NXLog Agent to locate it. During configuration, this directive is resolved into the corresponding CertThumbprint value. The first found thumbprint will be chosen if multiple certificates match the pattern. We recommend ensuring that the used certificate storage is well-maintained for optimal performance. This feature is not dynamic; the agent must be restarted if the certificate changes. This directive is mutually exclusive with the CertThumbprint directive.

Configuration examples:

CertPattern    $hostname + 'Cert'

or

CertPattern    DN=CN=Client\.example\.com;.*?SAN=DNS:Client\.example\.com

Typical log output looks like this:

matching pattern [DN=CN=Client\.example\.com;.*?SAN=DNS:Client\.example\.com] to certificate [SUBJECT=US, ClientState, ClientCity, ClientCompany, ClientUnit, Client.example.com, CN=Client.example.com; DN=CN=Client.example.com, O=ClientCompany, OU=ClientUnit, L=ClientCity, ST=ClientState, C=US; SAN=DNS:Client.example.com; DNS:www.Client.example.com; IP:127.0.0.3; ]

CertThumbprint

This optional directive, supported only on Windows, specifies the thumbprint of the certificate that will be presented to the remote server during the SSL handshake. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc). Whitespaces are automatically removed. The certificate must be imported to the Local Computer\Personal certificate store in PFX format for NXLog Agent to find it. Run the following command to create a PFX file from the certificate and private key using OpenSSL:

$ openssl pkcs12 -export -out server.pfx -inkey server.key -in server.pem

When the global directive UseCNGCertificates is set to FALSE the private key associated with the certificate must be exportable.

  • If you generate the certificate request using Windows Certificate Manager, enable the Make private key exportable option from the certificate properties.

  • If you import the certificate with the Windows Certificate Import Wizard, make sure that the Mark this key as exportable option is enabled.

  • If you migrate the certificate and associated private key from one Windows machine to another, select Yes, export the private key when exporting from the source machine.

Conversely, when the global directive UseCNGCertificates is set to TRUE, the private key associated with the certificate does not have to be exportable. In cases like TPM modules, the private key is always non-exportable.

The usage of the directive is the same in all cases:

CertThumbprint    7c2cc5a5fb59d4f46082a510e74df17da95e2152

This directive is only supported on Windows and is mutually exclusive with the CertFile and CertKeyFile directives.
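
A minimal sketch of a Windows configuration that takes the client certificate from the certificate store while still verifying the broker against a CA file. The instance name and broker address are hypothetical, the thumbprint is the sample value shown above, and %CERTDIR% is assumed to be defined elsewhere in the configuration.

nxlog.conf
<Input kafka_win>
    Module          im_kafka
    BrokerList      localhost:9093
    Topic           nxlog
    Protocol        ssl
    CAFile          %CERTDIR%/ca.pem
    # Client certificate from the Local Computer\Personal store;
    # replaces CertFile and CertKeyFile
    CertThumbprint  7c2cc5a5fb59d4f46082a510e74df17da95e2152
</Input>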

CRLDir

Path to a directory containing certificate revocation list (CRL) files. These CRL files will be used to check for certificates that were revoked and should no longer be accepted. The files must be named using the OpenSSL hashed format, i.e. the hash of the issuer followed by .r0, .r1 etc. To find the hash of the issuer of a CRL file using OpenSSL:

$ openssl crl -hash -noout -in crl.pem

For example, if the hash is e2f14e4a, then the filename should be e2f14e4a.r0. If there is another file with the same hash, then it should be named e2f14e4a.r1 and so on.

CRLFile

Path of the certificate revocation list (CRL) which will be used to check for certificates that have been revoked and should no longer be accepted. Example to generate a CRL file using OpenSSL:

$ openssl ca -gencrl -out crl.pem
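
The generated file could then be referenced as in the sketch below. The instance name, broker address, and file locations are hypothetical, and %CERTDIR% is assumed to be defined elsewhere in the configuration.

nxlog.conf
<Input kafka_crl>
    Module      im_kafka
    BrokerList  localhost:9093
    Topic       nxlog
    Protocol    ssl
    CAFile      %CERTDIR%/ca.pem
    # Certificates listed in this CRL are rejected
    CRLFile     %CERTDIR%/crl.pem
</Input>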

DHFile

This optional directive specifies a file with dh-parameters for Diffie-Hellman key exchange. These parameters can be generated with dhparam(1ssl). If this directive is not specified, default parameters will be used. See the OpenSSL Wiki for further details.

KeyPass

Passphrase of the private key specified by the CertKeyFile directive. A passphrase is required when the private key is encrypted.

The following example generates a private key with Triple DES encryption using OpenSSL:

$ openssl genrsa -des3 -out server.key 2048

This directive is not required for passwordless private keys.

LoadCertificateChains

If set to TRUE, the module tries to load higher-level certificates from the referenced PEM file, which may contain only one certificate or the whole chain. The default value is FALSE: certificates are loaded from the operating system certificate storage instead.

This directive is only supported on Windows.

RequireCert

Specifies if the remote host must present a certificate. If set to TRUE and a certificate is not presented during the SSL handshake, the connection will be refused. The default value is TRUE: each connection must use a certificate.

SearchAllCertStores

This optional directive, supported only on Windows, if set to TRUE, enables the loading of all available Windows certificates into NXLog Agent for use during remote certificate verification. Any required certificates must be added to a Windows certificate store that NXLog Agent can access. This directive is mutually exclusive with the CAThumbprint, CADir, and CAFile directives.

Sigalgs

The signature algorithm parameter that is being sent to the Windows SSL library. Allowed values depend on the available encryption providers.

This directive is only supported on Windows.

SNI

This optional directive specifies the hostname used for Server Name Indication (SNI).

SSLCipher

This optional directive can be used to set the permitted cipher list for TLSv1.2 and below, overriding the default. Use the format described in the ciphers(1ssl) man page. For example, specify RSA:!COMPLEMENTOFALL to include all ciphers with RSA authentication but leave out ciphers without encryption.

If RSA or DSA ciphers with Diffie-Hellman key exchange are used, DHFile can be set for specifying custom dh-parameters.

SSLCiphersuites

This optional directive can be used to set the permitted cipher list for TLSv1.3. Use the same format as in the SSLCipher directive. Refer to the OpenSSL documentation for a list of valid TLS v1.3 cipher suites. The default value is:

TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256

SSLCompression

Specifies if data compression is enabled when sending data over the network. The compression mechanism is based on the zlib compression library. The default value is FALSE: compression is disabled.

Some Linux packages (for example, Debian) use the OpenSSL library provided by the OS and may not support the zlib compression mechanism. The module will emit a warning on startup if the compression support is missing. The generic deb/rpm packages are bundled with a zlib-enabled libssl library.

SSLProtocol

This directive can be used to set the allowed SSL/TLS protocol(s). It takes a comma-separated list of values which can be any of the following: SSLv2, SSLv3, TLSv1, TLSv1.1, TLSv1.2, and TLSv1.3. By default, the TLSv1.2 and TLSv1.3 protocols are allowed. Note that the OpenSSL library shipped by Linux distributions may not support SSLv2 and SSLv3, and these will not work even if enabled with this directive.
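
The protocol and cipher directives can be combined to narrow what the module negotiates. The following is a sketch only: the cipher selections are illustrative and not a recommendation, and the instance name and broker address are hypothetical. Confirm that your brokers and OpenSSL build support the chosen values.

nxlog.conf
<Input kafka_tls>
    Module           im_kafka
    BrokerList       localhost:9093
    Topic            nxlog
    Protocol         ssl
    CAFile           %CERTDIR%/ca.pem
    # These are the default protocols, listed explicitly
    SSLProtocol      TLSv1.2, TLSv1.3
    # Applies to TLSv1.2 and below
    SSLCipher        ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256
    # Applies to TLSv1.3
    SSLCiphersuites  TLS_AES_256_GCM_SHA384:TLS_AES_128_GCM_SHA256
</Input>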

UseCNGCertificates

If set to TRUE, the module uses the Windows Cryptography API: Next Generation (CNG) to access the private keys associated with certificates identified by a thumbprint.

This directive is only supported on Windows.

Optional directives

GroupID

This directive specifies a Kafka consumer group ID and is mandatory if the Subscribe directive is used and/or SavePos is set to broker. It identifies the consumer group to use for automatic partition assignment when in Subscribe mode. It is also used for Kafka offset storage when SavePos is set to broker for either Subscribe or Assign modes. In this case, offsets are stored server-side, per (group ID, topic, partition) tuple, so that each group has its own offsets, but consumers that join the same group share the same partition offsets. This directive cannot be used with the Topic directive.

Option

This directive can be used to pass a custom configuration property to the Kafka library (librdkafka). For example, the group ID string can be set with Option group.id mygroup, or SCRAM authentication can be enabled by defining Option sasl.mechanism SCRAM-SHA-256. This directive may be used more than once to specify multiple options. For a list of configuration properties, see the librdkafka CONFIGURATION.md file.

Passing librdkafka configuration properties via the Option directive should be done with care since these properties are used for the fine-tuning of the librdkafka performance and may result in various side effects.
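
As a sketch of repeating the Option directive, the configuration below sets two librdkafka properties. The values are illustrative only, and the instance name is hypothetical; check the librdkafka CONFIGURATION.md file for the properties supported by your version.

nxlog.conf
<Input kafka_options>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Client identifier reported to the brokers
    Option      client.id nxlog-agent-01
    # Maximum time in milliseconds the broker may wait to fill a fetch response
    Option      fetch.wait.max.ms 500
</Input>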

Partition

This optional integer directive specifies the topic partition to read from. If this directive is not given, messages are collected from partition 0. It can only be used with the Topic directive.

Protocol

This optional directive specifies the protocol to use for connecting to the Kafka brokers. Accepted values are plaintext (the default), ssl, sasl_plaintext, and sasl_ssl. If Protocol is set to ssl or sasl_ssl, then the CAFile directive must also be provided.

ReadFromLast

This optional boolean directive instructs the module to only read logs that arrive after NXLog Agent is started. This directive comes into effect if a saved offset is not found, for example, on the first start or when the SavePos directive is FALSE. When the SavePos directive is TRUE, and a previously saved offset is found, the module will always resume reading from the saved offset. If ReadFromLast is FALSE, the module will read all logs from the beginning of the Kafka partition. This can result in a lot of messages and is usually not the expected behavior. It defaults to TRUE.
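
The interaction with SavePos described above can be sketched as follows. With these settings, the first start should read the partition from the beginning, and later starts should resume from the saved offset. The instance name is hypothetical.

nxlog.conf
<Input kafka_backfill>
    Module        im_kafka
    BrokerList    localhost:9092
    Topic         nxlog
    Partition     0
    # Only takes effect when no saved offset is found
    ReadFromLast  FALSE
    # A saved offset, once present, takes precedence over ReadFromLast
    SavePos       TRUE
</Input>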

SavePos

This directive can specify either a boolean value or one of cache or broker values. Kafka partition offsets will be saved locally in NXLog Agent’s configuration cache when set to cache. The partition offsets will be saved on the Kafka server when set to broker. The default value, TRUE, enables local cache offset storage for the Assign mode and server-side offset storage for the Subscribe mode. Local cache offset storage is preferred for the Assign mode because it ensures "exactly once" semantics. In contrast, server-side offset storage is preferred for the Subscribe mode because offsets must be shared with other Kafka consumers even though this mode only ensures "at least once" semantics (meaning that in some rare cases, two consumers may end up consuming the same message). However, this directive’s cache and broker values allow overriding the default behavior if needed.
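
For instance, the default for the Subscribe mode could be overridden as in the following sketch, which keeps offsets in the local configuration cache instead of on the broker. The instance name, topic, and group ID are hypothetical.

nxlog.conf
<Input kafka_subscribe>
    Module      im_kafka
    BrokerList  localhost:9092
    Subscribe   nxlog
    # GroupID is mandatory in Subscribe mode
    GroupID     nxlog-consumers
    # Override the Subscribe default (broker) with local cache storage
    SavePos     cache
</Input>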

Creating and populating fields

When the im_kafka module reads a message from a broker, it creates and populates the following fields which are then recorded to $raw_event:

Table 1. List of fields recorded to $raw_event
Key Description

MessageKey

Optional key associated with the message.

Message

Message text.

The following core fields are also created and populated by NXLog Agent:

Table 2. List of fields
Field Description

$EventReceivedTime

The time when the event is received. The value is not modified if the field already exists.

$SourceModuleName

The name of the module instance, for input modules. The value is not modified if the field already exists.

$SourceModuleType

The type of module instance (such as im_file), for input modules. The value is not modified if the field already exists.

Functions

The following functions are exported by im_kafka.

string get_stats()

Return the statistic string.

Fields

The following fields are used by im_kafka.

$raw_event (type: string)

A list of event fields in key-value pairs.
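
One way to inspect what the module writes to $raw_event is to route the input to a file, as in the sketch below. The instance names and the output path are hypothetical; om_file writes the contents of $raw_event for each record.

nxlog.conf
<Input kafka_in>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
</Input>

<Output file_out>
    Module  om_file
    # Hypothetical path; each line holds the $raw_event of one record
    File    '/tmp/kafka_events.log'
</Output>

<Route kafka_to_file>
    Path    kafka_in => file_out
</Route>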

Examples

Example 1. Using the im_kafka module

This configuration collects events from a Kafka cluster using the brokers specified. Events are read from the first partition of the nxlog topic.

nxlog.conf
<Input in>
    Module      im_kafka
    BrokerList  localhost:9092,192.168.88.35:19092
    Topic       nxlog
    Partition   0
    Protocol    ssl
    CAFile      %CERTDIR%/ca.pem
    CertFile    %CERTDIR%/client-cert.pem
    CertKeyFile %CERTDIR%/client-key.pem
    KeyPass     thisisasecret
</Input>

Example 2. Using the im_kafka module with SASL OAUTHBEARER authentication

This configuration collects events from a Kafka cluster using the brokers specified. Authentication is done via the SASL OAUTHBEARER mechanism with support for OIDC. Events are read from the first partition of the nxlog topic.

nxlog.conf
<Input in>
    Module                      im_kafka

    BrokerList                  localhost:9092,192.168.88.35:19092
    Topic                       nxlog
    Partition                   0

    Protocol                    sasl_ssl

    CAFile                      %CERTDIR%/ca.pem
    CertFile                    %CERTDIR%/client-cert.pem
    CertKeyFile                 %CERTDIR%/client-key.pem
    KeyPass                     thisisasecret

    SASLMechanism               OAUTHBEARER
    SASLOAuthBearerClientID     client-id
    SASLOAuthBearerClientSecret client-secret
    SASLOAuthBearerEndpointURL  https://oauth2.endpoint.com/
    SASLOAuthBearerExtensions   key1=value1,key2=value2
    SASLOAuthBearerScope        write
</Input>

The built-in support for SCRAM in the librdkafka library and the flexibility of the Option directive implemented in NXLog Agent allow you to set up a secure connection between your broker and NXLog Agent. This type of connection is considered the most secure when communicating with Kafka. By default, Kafka SCRAM authentication only requires three librdkafka parameters to be set:

  • Username

  • Password

  • SASL Mechanism

Note that the prerequisites for a Kafka connection may differ depending on the broker you are connecting to. Vendors usually document their requirements in detail, so you can find the correct settings for the Option directive there.

See Integrate with a SIEM service using Kafka in the Citrix documentation and Configure SASL/SCRAM authentication for Confluent Platform in the Confluent documentation to better understand some of the unique requirements you may encounter for a SCRAM Kafka connection. These are two examples among many.

Example 3. Using the im_kafka module with SASL SCRAM authentication

This example utilizes a set of dedicated directives to define Kafka parameters and another set of Option directives to define additional Kafka parameters related to SCRAM.

nxlog.conf
<Input in>
    Module         im_kafka
    BrokerList     localhost:9092,192.168.88.35:19092
    Subscribe      YOURTOPIC
    GroupID        YOURGROUPID
    Protocol       sasl_ssl
    CAFile         %CERTDIR%/kafka.client.truststore.pem
    Option         ssl.ca.location %CERTDIR%/kafka.client.truststore.pem
    Option         sasl.mechanism SCRAM-SHA-256   (1)
    Option         sasl.username USERNAME         (2)
    Option         sasl.password PASSWORD         (3)
    Option         session.timeout.ms 60000       (4)
    Option         auto.offset.reset earliest     (5)
</Input>
1 Defines the SASL authentication mechanism
2 Your Kafka username
3 Your Kafka password
4 Session timeout for detecting client failures
5 Defines the behavior while consuming data from a topic partition when there is no initial offset

The librdkafka library can produce performance statistics and format them as JSON. All fields of the JSON structure are explained on the Statistics page of the librdkafka project on GitHub. NXLog Agent can be configured to poll this data at a fixed interval and write the result to the internal logger.

Example 4. Collecting internal statistics

To read statistical data of the librdkafka library, the millisecond polling interval needs to be set via the Option directive using the statistics.interval.ms option.

The Schedule block sets the interval at which the nested Exec directive runs. The Exec directive calls the log_info() procedure, passing the return value of to_kafka->get_stats() as its parameter.

To get the librdkafka statistics produced and delivered synchronously, the statistics.interval.ms option and the Schedule block should specify the same interval amount.

nxlog.conf, writing to the internal logger
<Input to_kafka>
    Module        im_kafka
    Topic         nxlog
    BrokerList    localhost:9092
    Option        statistics.interval.ms 10000
    <Schedule>
        Every     10 sec
        Exec      log_info(to_kafka->get_stats());
    </Schedule>
</Input>