Kafka (om_kafka)
This module implements an Apache Kafka producer for publishing event records to a Kafka topic. See also the im_kafka module.
The om_kafka module is not supported on IBM AIX because the underlying librdkafka library is unstable on that platform. Use it on AIX at your own risk.
The module uses an internal persistent queue to store event records until they are pushed to a Kafka broker. Once the module receives an acknowledgment from the Kafka server that a message has been delivered successfully, it removes that message from the internal queue. If the module is unable to deliver a message to a Kafka broker (for example, due to connectivity issues or because the Kafka server is down), the message is retained in the internal queue, even across NXLog restarts, and the module attempts to deliver it again.
The number of re-delivery attempts can be specified by passing the message.send.max.retries property via the Option directive (for example, Option message.send.max.retries 5). By default, the number of retries is set to 2 and the interval between two subsequent retries is 5 minutes. Thus, by altering the number of retries, it is possible to control the total time a message remains in the internal queue. If a message cannot be delivered within the allowed retry attempts, it is dropped. The maximum size of the internal queue defaults to 100 messages. To increase the size of the internal queue, use the LogqueueSize directive.
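As a minimal sketch combining these directives (the instance name, broker address, topic, and values shown are illustrative, not defaults):
<Output out>
    # Illustrative values only
    Module          om_kafka
    BrokerList      localhost:9092
    Topic           nxlog
    # Retry delivery up to 5 times before the message is dropped
    Option          message.send.max.retries 5
    # Allow up to 20,000 messages in the internal queue
    LogqueueSize    20000
</Output>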
Configuration
The om_kafka module accepts the following directives in addition to the common module directives. The BrokerList and Topic directives are required.
- BrokerList
-
This mandatory directive specifies the list of Kafka brokers to connect to for publishing logs. The list should include ports and be comma-delimited (for example, localhost:9092,192.168.88.35:19092). Kafka brokers are metadata relays that return information about the cluster, including the available topics and how they are partitioned. NXLog connects to the broker(s) specified in the BrokerList directive to retrieve the actual URI it needs to connect to for reading or writing data to the relevant topic. If NXLog connects to the broker but fails to read or write data, ensure the broker is returning the correct endpoint information. See the Kafka documentation on listeners and advertised.listeners for more information.
- Topic
-
This mandatory directive specifies the Kafka topic to publish records to.
- CAFile
-
This specifies the path of the certificate authority (CA) certificate, which will be used to check the certificate of the remote brokers. CAFile is required if Protocol is set to ssl or sasl_ssl. To trust a self-signed certificate presented by the remote host (which is not signed by a CA), provide that certificate instead.
- CAThumbprint
-
This optional directive specifies the thumbprint of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote server. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc); whitespace is automatically removed. The certificate must be added to a Windows certificate store that is accessible by NXLog. This directive is only supported on Windows and is mutually exclusive with the CADir and CAFile directives.
- CertFile
-
This specifies the path of the certificate file to be used for the SSL handshake.
- CertKeyFile
-
This specifies the path of the certificate key file to be used for the SSL handshake.
- Compression
-
This directive specifies the compression type to use during transfer. The available types depend on the Kafka library, and should include none (the default), gzip, snappy, and lz4.
- KeyPass
-
With this directive, a password can be supplied for the certificate key file defined in CertKeyFile. This directive is not needed for passwordless private keys.
- Option
-
This directive can be used to pass a custom configuration property to the Kafka library (librdkafka). For example, the group ID string can be set with Option group.id mygroup. This directive may be used more than once to specify multiple options. For a list of configuration properties, see the librdkafka CONFIGURATION.md file. Passing librdkafka configuration properties via the Option directive should be done with care, since these properties are used for fine-tuning librdkafka performance and may result in various side effects.
- Partition
-
This optional integer directive specifies the topic partition to write to. If this directive is not given, messages are sent without a partition specified.
- Protocol
-
This optional directive specifies the protocol to use for connecting to the Kafka brokers. Accepted values include plaintext (the default), ssl, sasl_plaintext, and sasl_ssl. If Protocol is set to ssl or sasl_ssl, then the CAFile directive must also be provided.
- SASLKerberosServiceName
-
This directive specifies the Kerberos service name to be used for SASL authentication. The service name is required for the GSSAPI SASLMechanism.
- SASLKerberosPrincipal
-
This specifies the client’s Kerberos principal name for the sasl_plaintext and sasl_ssl protocols. This directive is only available on Linux/UNIX and is mandatory when SASLMechanism is set to GSSAPI. See the note below.
- SASLKerberosKeytab
-
This directive specifies the path to the Kerberos keytab file which contains the client’s allocated principal name. This directive is only available on Linux/UNIX and is mandatory when SASLMechanism is set to GSSAPI.
The SASLKerberosServiceName and SASLKerberosPrincipal directives are only available on Linux/UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication. For details about configuring Apache Kafka brokers to accept SASL/Kerberos authentication from clients, please follow the instructions provided in the Apache Kafka documentation. A sasl_ssl configuration sketch using the GSSAPI directives is included after this directive list.
- SASLMechanism
-
This optional directive specifies the SASL mechanism to use for authentication. Supported mechanisms are GSSAPI (the default) and OAUTHBEARER. This directive can only be specified when Protocol is set to sasl_plaintext or sasl_ssl.
- SASLOAuthBearerClientID
-
This directive specifies the public identifier for the application. It must be unique across all clients that the authorization server handles. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.
- SASLOAuthBearerClientSecret
-
This directive specifies the secret known only to the application and the authorization server. It should be a sufficiently random string that is not guessable. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.
- SASLOAuthBearerEndpointURL
-
This directive specifies the OAUTH issuer token endpoint HTTP(S) URI used to retrieve the token. This directive is mandatory when SASLMechanism is set to OAUTHBEARER.
- SASLOAuthBearerExtensions
-
This directive specifies additional parameters, as a comma-separated list of key=value pairs, to be provided to the broker. For example: supportFeatureX=true,organizationId=sales-emea. This directive is optional and only available when SASLMechanism is set to OAUTHBEARER.
- SASLOAuthBearerScope
-
This directive specifies the scope of the access request to the broker. This directive is optional and only available when SASLMechanism is set to OAUTHBEARER.
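The examples below cover the ssl and sasl_ssl with OAUTHBEARER cases. For the GSSAPI mechanism on Linux, a minimal sasl_ssl sketch might look like the following (the broker address, service name, principal, and keytab path are illustrative assumptions that must match your Kerberos setup):
<Output out>
    Module                     om_kafka
    BrokerList                 localhost:9093
    Topic                      nxlog
    Protocol                   sasl_ssl
    CAFile                     %CERTDIR%/ca.pem
    # GSSAPI is the default SASL mechanism; set here for clarity
    SASLMechanism              GSSAPI
    # Illustrative Kerberos settings
    SASLKerberosServiceName    kafka
    SASLKerberosPrincipal      nxlog@EXAMPLE.COM
    SASLKerberosKeytab         /etc/nxlog/nxlog.keytab
</Output>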
Functions
The following functions are exported by om_kafka.
- string get_stats()
-
Returns the statistics string.
Examples
This configuration sends events to a Kafka cluster using the brokers
specified. Events are published to the first partition of the nxlog
topic.
<Output out>
    Module          om_kafka
    BrokerList      localhost:9092,192.168.88.35:19092
    Topic           nxlog
    LogqueueSize    100000
    Partition       0
    Protocol        ssl
    CAFile          %CERTDIR%/ca.pem
    CertFile        %CERTDIR%/client-cert.pem
    CertKeyFile     %CERTDIR%/client-key.pem
    KeyPass         thisisasecret
</Output>
This configuration sends events to a Kafka cluster using the brokers specified. Authentication is done via the SASL OAUTHBEARER mechanism with OIDC support. Events are published to the first partition of the nxlog topic.
<Output out>
    Module                        om_kafka
    LogqueueSize                  100000
    BrokerList                    localhost:9092,192.168.88.35:19092
    Topic                         nxlog
    Partition                     0
    Protocol                      sasl_ssl
    CAFile                        %CERTDIR%/ca.pem
    CertFile                      %CERTDIR%/client-cert.pem
    CertKeyFile                   %CERTDIR%/client-key.pem
    KeyPass                       thisisasecret
    SASLMechanism                 OAUTHBEARER
    SASLOAuthBearerClientID       client-id
    SASLOAuthBearerClientSecret   client-secret
    SASLOAuthBearerEndpointURL    https://oauth2.endpoint.com/
    SASLOAuthBearerExtensions     key1=value1,key2=value2
    SASLOAuthBearerScope          write
</Output>
The librdkafka library can produce its performance statistics formatted as JSON. All fields of the JSON structure are explained on the Statistics page of the librdkafka project on GitHub. NXLog can be configured to poll this data at a fixed interval, and the result can be saved to the internal logger.
To read the statistical data of the librdkafka library, the polling interval in milliseconds needs to be specified with the statistics.interval.ms property via the Option directive.
The Schedule block sets the interval at which the code of the nested Exec block runs. Inside the Exec block, the log_info() procedure is called with the result of to_kafka->get_stats() passed as its parameter.
To have the librdkafka statistics produced and delivered synchronously, the statistics.interval.ms option and the Schedule block should specify the same interval.
<Input to_kafka>
    Module      im_kafka
    Topic       nxlog
    BrokerList  localhost:9092
    Option      statistics.interval.ms 10000
    <Schedule>
        Every   10 sec
        Exec    log_info(to_kafka->get_stats());
    </Schedule>
</Input>