Kafka (im_kafka)
This module implements an Apache Kafka consumer for collecting event records from a Kafka topic. See also the om_kafka module.
To examine the supported platforms, see the list of installer packages in the Available Modules chapter.
Configuration
The im_kafka module accepts the following directives in addition to the common module directives. The BrokerList and one of the mutually exclusive Topic, Assign or Subscribe directives are required.
Required directives
The following directives are required for the module to start.
This directive specifies a list of Kafka |
|||
This mandatory directive specifies the list of Kafka brokers to connect to for collecting logs.
The list should include ports and must be comma-delimited, e.g.,
|
|||
This directive specifies a list of Kafka topics. The im_kafka instance will participate in automatic partition assignment for the Kafka consumer group specified by GroupID. |
|||
This mandatory directive specifies the Kafka topic from which to collect records. |
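As a minimal sketch of the required directives, the following instance combines BrokerList with Subscribe, one of the three mutually exclusive topic directives. The instance name, topic name, and group ID are hypothetical placeholders. GroupID is included because it is mandatory whenever Subscribe is used.

```
<Input kafka_subscribe>
    Module      im_kafka
    # Comma-delimited list of brokers, each with its port
    BrokerList  localhost:9092,192.168.88.35:19092
    # Join automatic partition assignment for this topic (placeholder name)
    Subscribe   nxlog
    # Consumer group used for the assignment (placeholder name)
    GroupID     example-group
</Input>
```

To read from a single, fixed partition instead, replace Subscribe with Topic, as the configurations in the Examples section do.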
SASL directives
The following directives are used for authentication and data security using SASL.
This directive specifies the Kerberos service name to be used for SASL authentication.
The service name is required for the |
|||
This specifies the client’s Kerberos principal name for the |
|||
Specifies the path to the Kerberos keytab file which contains the client’s allocated principal name.
This directive is only available on Linux/UNIX and mandatory when SASLMechanism is set to
|
|||
This optional directive specifies the SASL mechanism to use for authentication.
Supported mechanisms are |
|||
This directive specifies the public identifier for the application.
It must be unique across all clients that the authorization server handles.
This directive is mandatory when SASLMechanism is set to |
|||
This directive specifies the secret known only to the application and the authorization server.
This should be a sufficiently random string that is not guessable. This directive is mandatory when SASLMechanism is set to |
|||
This directive specifies the OAUTH issuer token endpoint HTTP(S) URI used to retrieve the token.
This directive is mandatory when SASLMechanism is set to |
|||
This directive specifies additional parameters as a comma-separated list of key=value pairs to be provided to the broker.
For example: |
|||
This directive specifies the scope of the access request to the broker.
This directive is optional and only available when SASLMechanism is set to |
TLS/SSL directives
The following directives configure secure data transfer via TLS/SSL.
Specifies if the connection should be allowed with an expired certificate.
If set to |
|||
Specifies if the certificate FQDN should be validated against the server hostname or not.
If set to |
|||
Specifies if the connection should be allowed regardless of the certificate verification results.
If set to |
|||
Path to a directory containing certificate authority (CA) certificates. These certificates will be used to verify the certificate presented by the remote host. The certificate files must be named using the OpenSSL hashed format, i.e. the hash of the certificate followed by .0, .1 etc. To find the hash of a certificate using OpenSSL:
For example, if the certificate hash is A remote host’s self-signed certificate (which is not signed by a CA) can also be trusted by including a copy of the certificate in this directory. |
|||
Path of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote host. A remote host’s self-signed certificate (which is not signed by a CA) can be trusted by specifying the remote host certificate itself. In the case of certificates signed by an intermediate CA, the certificate specified must contain the complete certificate chain (certificate bundle). |
|||
This optional directive, supported only on Windows, defines a pattern for locating a suitable CA (Certificate Authority) certificate and its thumbprint in the native Windows Certificate Storage.
The pattern must follow PCRE2 rules and use the format Configuration examples:
or
A normal log output example would look as follows:
This directive is only supported on Windows. |
|||
This optional directive, supported only on Windows, specifies the thumbprint of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote host. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc). Whitespaces are automatically removed. The certificate must be added to a Windows certificate store that is accessible by NXLog. This directive is mutually exclusive with the CADir, CAFile and CAPattern directives. |
|||
Path of the certificate file that will be presented to the remote host during the SSL handshake. |
|||
Path of the private key file that was used to generate the certificate specified by the CertFile directive. This is used for the SSL handshake. |
|||
This optional directive, supported only on Windows, defines a pattern for identifying a corresponding certificate and its thumbprint within the native Windows Certificate Storage.
The pattern must follow PCRE2 rules and use the format Configuration examples:
or
A normal log output example would look as follows:
|
|||
This optional directive, supported only on Windows, specifies the thumbprint of the certificate that will be presented to the remote server during the SSL handshake.
The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc).
Whitespaces are automatically removed.
The certificate must be imported to the
When the global directive UseCNGCertificates is set to
On the contrary, when the global directive UseCNGCertificates is set to The usage of the directive is the same in all cases:
This directive is only supported on Windows and is mutually exclusive with the CertFile and CertKeyFile directives. |
|||
Path to a directory containing certificate revocation list (CRL) files. These CRL files will be used to check for certificates that were revoked and should no longer be accepted. The files must be named using the OpenSSL hashed format, i.e. the hash of the issuer followed by .r0, .r1 etc. To find the hash of the issuer of a CRL file using OpenSSL:
For example if the hash is |
|||
Path of the certificate revocation list (CRL) which will be used to check for certificates that have been revoked and should no longer be accepted. Example to generate a CRL file using OpenSSL:
|
|||
This optional directive specifies a file with dh-parameters for Diffie-Hellman key exchange. These parameters can be generated with dhparam(1ssl). If this directive is not specified, default parameters will be used. See the OpenSSL Wiki for further details. |
|||
Passphrase of the private key specified by the CertKeyFile directive. A passphrase is required when the private key is encrypted. The following example generates a private key with Triple DES encryption using OpenSSL:
This directive is not required for passwordless private keys. |
|||
If set to This directive is only supported on Windows. |
|||
Specifies if the remote host must present a certificate.
If set to |
|||
This optional directive, supported only on Windows, if set to |
|||
The signature algorithm parameter that is being sent to the Windows SSL library. Allowed values depend on the available encryption providers. This directive is only supported on Windows. |
|||
This optional directive specifies the hostname used for Server Name Indication (SNI). |
|||
This optional directive can be used to set the permitted cipher list for TLSv1.2 and below, overriding the default.
Use the format described in the ciphers(1ssl) man page.
For example specify
|
|||
This optional directive can be used to set the permitted cipher list for TLSv1.3. Use the same format as in the SSLCipher directive. Refer to the OpenSSL documentation for a list of valid TLS v1.3 cipher suites. The default value is:
|
|||
Specifies if data compression is enabled when sending data over the network.
The compression mechanism is based on the zlib compression library.
The default value is
|
|||
This directive can be used to set the allowed SSL/TLS protocol(s). It takes a comma-separated list of values which can be any of the following: |
|||
If set to TRUE, the module uses the Windows Cryptography API: Next Generation (CNG) to access the private keys associated with certificates identified by a thumbprint. This directive is only supported on Windows. |
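The following sketch shows one way the TLS/SSL directives named above might fit together when the CA certificates are kept in a directory rather than in a single bundle. It assumes a hypothetical ca subdirectory under %CERTDIR% whose files are named in the OpenSSL hashed format. The instance name, paths, and passphrase are placeholders.

```
<Input kafka_tls>
    Module       im_kafka
    BrokerList   localhost:9092
    Topic        nxlog
    Protocol     ssl
    # Directory of CA certificates in OpenSSL hashed format (assumed path)
    CADir        %CERTDIR%/ca
    # Client certificate and the private key it was generated from
    CertFile     %CERTDIR%/client-cert.pem
    CertKeyFile  %CERTDIR%/client-key.pem
    # Only needed when the private key is encrypted
    KeyPass      thisisasecret
</Input>
```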
Optional directives
This directive specifies the Kafka consumer group ID and is mandatory if the Subscribe directive is used or if SavePos is set to broker.
It identifies the consumer group to use for automatic partition assignment when in Subscribe mode.
It is also used for Kafka offset storage when SavePos is set to |
|||
This directive can be used to pass a custom configuration property to the Kafka library (librdkafka).
For example, the group ID string can be set with
|
|||
This optional integer directive specifies the topic partition to read from. If this directive is not given, messages are collected from partition 0. It can only be used with the Topic directive. |
|||
This optional directive specifies the protocol to use for connecting to the Kafka brokers.
Accepted values include |
|||
This optional boolean directive instructs the module to only read logs that arrive after NXLog is started.
This directive comes into effect if a saved offset is not found, for example, on the first start or when the SavePos directive is |
|||
This directive can specify either a boolean value or one of |
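As a rough sketch of how several of the optional directives might be combined, the following instance reads a single partition, stores offsets on the broker, and passes two properties to librdkafka through Option. The instance name, group ID, and property values are illustrative assumptions, not recommended settings.

```
<Input kafka_in>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Read partition 0 explicitly (also the default when Partition is omitted)
    Partition   0
    # GroupID is mandatory when SavePos is set to broker (placeholder name)
    GroupID     example-group
    SavePos     broker
    # Custom librdkafka properties, written as: Option <property> <value>
    Option      session.timeout.ms 60000
    Option      auto.offset.reset earliest
</Input>
```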
Creating and populating fields
When the im_kafka module reads a message from a broker, it creates and populates the following fields, which are then recorded to $raw_event:
Key | Description |
---|---|
|
Optional key associated with the message. |
|
Message text. |
The following core fields are also created and populated by NXLog:
Field | Description |
---|---|
|
The time when the event is received. The value is not modified if the field already exists. |
|
The name of the module instance, for input modules. The value is not modified if the field already exists. |
|
The type of module instance (such as im_file), for input modules. The value is not modified if the field already exists. |
Functions
The following functions are exported by im_kafka.
- string get_stats()
Returns the statistics string.
Fields
The following fields are used by im_kafka.
$raw_event (type: string)
A list of event fields in key-value pairs.
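To inspect what the module writes to $raw_event, one option is to log each record as it arrives. The sketch below assumes a hypothetical instance named kafka_debug and is intended for troubleshooting only, since it writes every record to the internal log.

```
<Input kafka_debug>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Write the raw record to the NXLog internal log for inspection
    Exec        log_info("Kafka record: " + $raw_event);
</Input>
```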
Examples
This configuration collects events from a Kafka cluster using the brokers specified.
Events are read from the first partition of the nxlog topic.
<Input in>
Module im_kafka
BrokerList localhost:9092,192.168.88.35:19092
Topic nxlog
Partition 0
Protocol ssl
CAFile %CERTDIR%/ca.pem
CertFile %CERTDIR%/client-cert.pem
CertKeyFile %CERTDIR%/client-key.pem
KeyPass thisisasecret
</Input>
This configuration collects events from a Kafka cluster using the brokers specified. Authentication is done via the SASL OAUTHBEARER mechanism with support for OIDC. Events are read from the first partition of the nxlog topic.
<Input in>
Module im_kafka
BrokerList localhost:9092,192.168.88.35:19092
Topic nxlog
Partition 0
Protocol sasl_ssl
CAFile %CERTDIR%/ca.pem
CertFile %CERTDIR%/client-cert.pem
CertKeyFile %CERTDIR%/client-key.pem
KeyPass thisisasecret
SASLMechanism OAUTHBEARER
SASLOAuthBearerClientID client-id
SASLOAuthBearerClientSecret client-secret
SASLOAuthBearerEndpointURL https://oauth2.endpoint.com/
SASLOAuthBearerExtensions key1=value1,key2=value2
SASLOAuthBearerScope write
</Input>
The built-in support for SCRAM in the librdkafka library and the flexibility of the Option directive implemented in NXLog Agent allow you to set up a secure connection between your broker and NXLog Agent.
This type of connection is considered the most secure when communicating with Kafka.
By default, Kafka SCRAM authentication only requires three parameters to be set in the librdkafka library:
- Username
- Password
- SASL mechanism
Note that the prerequisites for a Kafka connection may differ depending on the broker you are connecting to. Vendors usually document their requirements in detail, which helps you find the correct settings for the Option directive.
For a better understanding of the unique requirements you may encounter for a SCRAM Kafka connection, see the Integrate with a SIEM service using Kafka part of the Citrix documentation and Configure SASL/SCRAM authentication for Confluent Platform in the Confluent documentation. These are two examples among many.
This example utilizes a set of dedicated directives to define Kafka parameters and another set of Option directives to define additional Kafka parameters related to SCRAM.
<Input in>
Module im_kafka
BrokerList localhost:9092,192.168.88.35:19092
Subscribe YOURTOPIC
GroupID YOURGROUPID
Protocol sasl_ssl
CAFile %CERTDIR%/kafka.client.truststore.pem
Option ssl.ca.location %CERTDIR%/kafka.client.truststore.pem
Option sasl.mechanism SCRAM-SHA-256 (1)
Option sasl.username USERNAME (2)
Option sasl.password PASSWORD (3)
Option session.timeout.ms 60000 (4)
Option auto.offset.reset earliest (5)
</Input>
1 | Defines the SASL authentication mechanism |
2 | Your Kafka username |
3 | Your Kafka password |
4 | Session timeout for detecting client failures |
5 | Defines the behavior while consuming data from a topic partition when there is no initial offset |
The librdkafka library can produce performance statistics and format them as JSON.
All fields of the JSON structure are explained on the Statistics page of the librdkafka project on GitHub.
NXLog can be configured to poll this data at a fixed interval and write the result to the internal log.
To read the librdkafka statistics, set the polling interval in milliseconds via the Option directive using the statistics.interval.ms option.
The Schedule block sets the interval at which the nested Exec block runs.
Inside the Exec block, the log_info() procedure is called with the return value of to_kafka->get_stats() as its parameter.
To have the librdkafka statistics produced and delivered in step, the statistics.interval.ms option and the Schedule block should specify the same interval.
<Input to_kafka>
Module im_kafka
Topic nxlog
BrokerList localhost:9092
Option statistics.interval.ms 10000
<Schedule>
Every 10 sec
Exec log_info(to_kafka->get_stats());
</Schedule>
</Input>