This module implements an Apache Kafka consumer for collecting event records from a Kafka topic. See also the om_kafka module.
To examine the supported platforms, see the list of installer packages in the Available Modules chapter.
The im_kafka module accepts the following directives in addition to the common module directives. The BrokerList and one of the mutually exclusive Topic, Assign or Subscribe directives are required.
This mandatory directive specifies the list of Kafka brokers to connect to for collecting logs. The list should include ports and be comma-delimited, as in the BrokerList values of the example configurations.
Kafka brokers are metadata relays that return information about the cluster, including the available topics and how they are partitioned. NXLog connects to the broker(s) specified in the BrokerList directive to retrieve the actual URI it needs to connect to for reading or writing data to the relevant topic. If NXLog connects to the broker but fails to read or write data, ensure the broker is returning the correct endpoint information. See the Kafka documentation on listeners and advertised.listeners for more information.
This directive specifies a list of Kafka topics. The im_kafka instance will participate in automatic partition assignment for the Kafka consumer group specified by GroupID.
This mandatory directive specifies the Kafka topic from which to collect records.
This specifies the path of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote brokers. A remote broker’s self-signed certificate (which is not signed by a CA) can be trusted by specifying the remote broker certificate itself. In case of certificates signed by an intermediate CA, the certificate specified must contain the complete certificate chain (certificate bundle). CAFile is required if Protocol is set to
This specifies the path of the certificate file that will be presented to the remote broker during the SSL handshake.
This specifies the path of the private key file that was used to generate the certificate specified by the CertFile directive. This is used for the SSL handshake.
This directive specifies a Kafka consumer ID and is mandatory if the Subscribe directive is used and/or SavePos is set to broker. It identifies the consumer group to use for automatic partition assignment when in Subscribe mode. It is also used for Kafka offset storage when SavePos is set to broker for either Subscribe or Assign modes. In this case, offsets are stored server-side, per (group ID, topic, partition) tuple, such that each group has its own offsets, but consumers that join the same group will share the same partition offsets. This directive cannot be used with the Topic directive.
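As a minimal sketch of Subscribe mode, the following instance joins a consumer group so that partitions are assigned automatically. The instance name, topic name, and group name are placeholders, and passing a single topic name to Subscribe is an assumption about the accepted syntax.

```conf
# Hypothetical instance name, topic, and group ID; adjust to your environment.
<Input kafka_group>
    Module      im_kafka
    BrokerList  localhost:9092
    # Participate in automatic partition assignment for this topic
    Subscribe   nxlog
    # Mandatory with Subscribe: identifies the consumer group
    GroupID     nxlog-consumers
</Input>
```

Other consumers that use the same GroupID value would share the partition offsets stored for that group.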
This directive specifies the passphrase of the private key specified by the CertKeyFile directive. A passphrase is required when the private key is encrypted. Example to generate a private key with Triple DES encryption using OpenSSL:
$ openssl genrsa -des3 -out server.key 2048
This directive is not needed for passwordless private keys.
This directive can be used to pass a custom configuration property to the Kafka library (librdkafka). For example, the group ID string can be set with
Option group.id mygroup. This directive may be used more than once to specify multiple options. For a list of configuration properties, see the librdkafka CONFIGURATION.md file.
Passing librdkafka configuration properties via the Option directive should be done with care since these properties are used for the fine-tuning of the librdkafka performance and may result in various side effects.
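The sketch below shows how several librdkafka properties might be passed with repeated Option directives. The instance name and the chosen values are illustrative assumptions only. The two property names are taken from the librdkafka CONFIGURATION.md file, but whether these values suit a given deployment is not something this example can confirm.

```conf
# Hypothetical instance name and tuning values, for illustration only.
<Input kafka_tuned>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Maximum time the broker may wait to fill a fetch response (ms)
    Option      fetch.wait.max.ms 200
    # Enable TCP keep-alives on broker sockets
    Option      socket.keepalive.enable true
</Input>
```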
This optional integer directive specifies the topic partition to read from. If this directive is not given, messages are collected from partition 0. It can only be used with the Topic directive.
This optional directive specifies the protocol to use for connecting to the Kafka brokers. Accepted values include plaintext (the default) and sasl_ssl. If Protocol is set to sasl_ssl, then the CAFile directive must also be provided.
This optional boolean directive instructs the module to only read logs that arrive after NXLog is started. This directive comes into effect if a saved offset is not found, for example, on the first start or when the SavePos directive is
FALSE. When the SavePos directive is
TRUE, and a previously saved offset is found, the module will always resume reading from the saved offset. If ReadFromLast is
FALSE, the module will read all logs from the beginning of the Kafka partition. This can result in a lot of messages and is usually not the expected behavior. It defaults to
This directive specifies the Kerberos service name to be used for SASL authentication. The service name is required for the
This specifies the client’s Kerberos principal name for the sasl_ssl protocols. This directive is only available on Linux/UNIX and mandatory when SASLMechanism is set to GSSAPI. See note below.
This specifies the path to the Kerberos keytab file which contains the client’s allocated principal name. This directive is only available on Linux/UNIX and mandatory when SASLMechanism is set to
The SASLKerberosServiceName and SASLKerberosPrincipal directives are only available on Linux/UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication.
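For orientation, a SASL/Kerberos setup on Linux/UNIX might look like the following sketch. The service name, principal, and keytab path are placeholder values, and the keytab directive name SASLKerberosKeytab is an assumption, since the directive names are not all spelled out above; check it against the directive list for your NXLog version.

```conf
# Hypothetical values throughout; SASLKerberosKeytab is an assumed directive name.
<Input kafka_kerberos>
    Module                   im_kafka
    BrokerList               localhost:9092
    Topic                    nxlog
    Protocol                 sasl_ssl
    # CAFile must be provided when Protocol is sasl_ssl
    CAFile                   %CERTDIR%/ca.pem
    SASLMechanism            GSSAPI
    SASLKerberosServiceName  kafka
    SASLKerberosPrincipal    nxlog-client@EXAMPLE.COM
    SASLKerberosKeytab       /etc/security/keytabs/nxlog-client.keytab
</Input>
```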
For details about configuring Apache Kafka brokers to accept SASL/Kerberos
authentication from clients, please follow the instructions provided by the
This optional directive specifies the SASL mechanism to use for authentication. Supported mechanisms are GSSAPI (the default) and OAUTHBEARER. This directive can only be specified when Protocol is set to
This directive specifies the public identifier for the application. It must be unique across all clients that the authorization server handles. This directive is mandatory when SASLMechanism is set to
This directive specifies the secret known only to the application and the authorization server. This should be a sufficiently random string that is not guessable. This directive is mandatory when SASLMechanism is set to
This directive specifies the OAUTH issuer token endpoint HTTP(S) URI used to retrieve the token. This directive is mandatory when SASLMechanism is set to
This directive specifies additional parameters as a comma-separated list of key=value pairs to be provided to the broker. For example:
supportFeatureX=true,organizationId=sales-emea. This directive is optional and only available when SASLMechanism is set to
This directive specifies a scope of the access request to the broker. This directive is optional and only available when SASLMechanism is set to
This directive accepts either a boolean value or one of the cache or broker values. Kafka partition offsets will be saved locally in NXLog’s configuration cache when set to
cache. The partition offsets will be saved on the Kafka server when set to
broker. The default value,
TRUE, enables local cache offset storage for the Assign mode and server-side offset storage for the Subscribe mode. Local cache offset storage is preferred for the Assign mode because it ensures "exactly once" semantics. In contrast, server-side offset storage is preferred for the Subscribe mode because offsets must be shared with other Kafka consumers even though this mode only ensures "at least once" semantics (meaning that in some rare cases, two consumers may end up consuming the same message). However, this directive’s
cache and broker values allow overriding the default behavior if needed.
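As an illustration of overriding the default, the sketch below uses Subscribe mode but keeps offsets in the local configuration cache instead of on the Kafka server. The instance, topic, and group names are placeholders, and this is only one possible combination, not a recommended setup.

```conf
# Hypothetical names; shows an explicit SavePos override in Subscribe mode.
<Input kafka_offsets>
    Module      im_kafka
    BrokerList  localhost:9092
    Subscribe   nxlog
    GroupID     nxlog-consumers
    # Subscribe mode stores offsets server-side by default (SavePos TRUE);
    # "cache" keeps them in the local configuration cache instead.
    SavePos     cache
</Input>
```

With this setting, the offsets are no longer shared with other consumers through the broker, which is the trade-off described above.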
Creating and populating fields
When the im_kafka module reads a message from a broker, it creates and
populates the following fields which are then recorded to
Optional key associated with the message.
The following core fields are also created and populated by NXLog:
The time when the event is received. The value is not modified if the field already exists.
The name of the module instance, for input modules. The value is not modified if the field already exists.
The type of module instance (such as im_file), for input modules. The value is not modified if the field already exists.
This configuration collects events from a Kafka cluster using the brokers specified.
Events are read from the first partition of the
<Input in>
    Module        im_kafka
    BrokerList    localhost:9092,192.168.88.35:19092
    Topic         nxlog
    Partition     0
    Protocol      ssl
    CAFile        %CERTDIR%/ca.pem
    CertFile      %CERTDIR%/client-cert.pem
    CertKeyFile   %CERTDIR%/client-key.pem
    KeyPass       thisisasecret
</Input>
This configuration collects events from a Kafka cluster using the brokers
specified. Authentication is done via SASL OAUTHBEARER mechanism with
support for OIDC. Events are read from the first partition of
<Input in>
    Module                        im_kafka
    BrokerList                    localhost:9092,192.168.88.35:19092
    Topic                         nxlog
    Partition                     0
    Protocol                      sasl_ssl
    CAFile                        %CERTDIR%/ca.pem
    CertFile                      %CERTDIR%/client-cert.pem
    CertKeyFile                   %CERTDIR%/client-key.pem
    KeyPass                       thisisasecret
    SASLMechanism                 OAUTHBEARER
    SASLOAuthBearerClientID       client-id
    SASLOAuthBearerClientSecret   client-secret
    SASLOAuthBearerEndpointURL    https://oauth2.endpoint.com/
    SASLOAuthBearerExtensions     key1=value1,key2=value2
    SASLOAuthBearerScope          write
</Input>
The librdkafka library can produce performance statistics in JSON format.
All fields of the JSON structure are explained on the Statistics page of the
librdkafka project on GitHub.
NXLog can be configured to poll this data at a specified fixed interval.
The result can be saved to the internal logger.
To read statistical data from the librdkafka library, the polling interval in milliseconds needs to be set via the Option directive using the statistics.interval.ms option.
The Schedule block sets the interval to run the code of the nested Exec block.
Inside the Exec block, the log_info() procedure is called with the
kafka_in->get_stats() parameter passed.
To get the
librdkafka statistics produced and delivered synchronously, the
statistics.interval.ms option and the Schedule block should specify the same interval amount.
<Input kafka_in>
    Module        im_kafka
    Topic         nxlog
    BrokerList    localhost:9092
    Option        statistics.interval.ms 10000
    <Schedule>
        Every     10 sec
        Exec      log_info(kafka_in->get_stats());
    </Schedule>
</Input>