Kafka (im_kafka)

This module implements an Apache Kafka consumer for collecting event records from a Kafka topic. See also the om_kafka module.

To examine the supported platforms, see the list of installer packages in the Available Modules chapter.

Configuration

The im_kafka module accepts the following directives in addition to the common module directives. The BrokerList and Topic directives are required.

BrokerList

This mandatory directive specifies the list of Kafka brokers to connect to for collecting logs. The list should include ports and be comma-delimited (for example, localhost:9092,192.168.88.35:19092).

Topic

This mandatory directive specifies the Kafka topic to collect records from.
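
As a minimal sketch, an instance that sets only the two required directives could look like the following. The instance name kafka_in and the broker addresses are placeholders chosen for illustration.

nxlog.conf
<Input kafka_in>
    Module      im_kafka
    # Placeholder broker addresses; each entry includes a port
    BrokerList  localhost:9092,192.168.88.35:19092
    # Placeholder topic name
    Topic       nxlog
</Input>

With no other directives given, such an instance would rely on the documented defaults: the plaintext protocol and partition 0.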


CAFile

This specifies the path of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote brokers. A remote broker’s self-signed certificate (which is not signed by a CA) can be trusted by specifying the remote broker certificate itself. In case of certificates signed by an intermediate CA, the certificate specified must contain the complete certificate chain (certificate bundle). CAFile is required if Protocol is set to ssl or sasl_ssl.

CertFile

This specifies the path of the certificate file that will be presented to the remote broker during the SSL handshake.

CertKeyFile

This specifies the path of the private key file that was used to generate the certificate specified by the CertFile directive. This is used for the SSL handshake.

KeyPass

This directive specifies the passphrase of the private key specified by the CertKeyFile directive. A passphrase is required when the private key is encrypted. For example, the following OpenSSL command generates a private key encrypted with Triple DES:

$ openssl genrsa -des3 -out server.key 2048

This directive is not needed for passwordless private keys.

Option

This directive can be used to pass a custom configuration property to the Kafka library (librdkafka). For example, the group ID string can be set with Option group.id mygroup. This directive may be used more than once to specify multiple options. For a list of configuration properties, see the librdkafka CONFIGURATION.md file.
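
The following sketch shows how several Option lines might be combined in one instance. The property names group.id, client.id, and socket.timeout.ms come from the librdkafka CONFIGURATION.md file; the instance name and all values are illustrative assumptions, not recommendations.

nxlog.conf
<Input kafka_in>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Each Option line passes one property name and its value to librdkafka
    Option      group.id mygroup
    # Hypothetical client identifier
    Option      client.id nxlog-collector
    # Hypothetical network timeout, in milliseconds
    Option      socket.timeout.ms 30000
</Input>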

Passing librdkafka configuration properties via the Option directive should be done with care since these properties are used for the fine-tuning of the librdkafka performance and may result in various side effects.

Partition

This optional integer directive specifies the topic partition to read from. If this directive is not given, messages are collected from partition 0.
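
Because each instance reads from a single partition, one possible way to collect from several partitions is to define one instance per partition. This is a sketch only: it assumes the nxlog topic has at least two partitions, and the instance names kafka_p0 and kafka_p1 are hypothetical.

nxlog.conf
# Reads partition 0 (also the default when Partition is omitted)
<Input kafka_p0>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    Partition   0
</Input>

# Reads partition 1 of the same topic
<Input kafka_p1>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    Partition   1
</Input>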

Protocol

This optional directive specifies the protocol to use for connecting to the Kafka brokers. Accepted values are plaintext (the default), ssl, sasl_plaintext, and sasl_ssl. If Protocol is set to ssl or sasl_ssl, then the CAFile directive must also be provided.
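
As a minimal sketch of the dependency between Protocol and CAFile, the instance below enables ssl and supplies only the CA certificate. The instance name, broker address, and file path are placeholders. Whether a client certificate (CertFile and CertKeyFile) is also needed depends on how the brokers are configured.

nxlog.conf
<Input kafka_in>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    Protocol    ssl
    # Required because Protocol is ssl; placeholder path
    CAFile      /root/ssl/ca-cert
</Input>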

SASLKerberosServiceName

This directive specifies the Kerberos service name to be used for SASL authentication. The service name is required for the sasl_plaintext and sasl_ssl protocols.

SASLKerberosPrincipal

This specifies the client’s Kerberos principal name for the sasl_plaintext and sasl_ssl protocols. This directive is only available and mandatory on Linux/UNIX. See note below.

SASLKerberosKeytab

This directive specifies the path to the Kerberos keytab file which contains the client’s allocated principal name. This directive is only available and mandatory on Linux/UNIX.

The SASLKerberosPrincipal and SASLKerberosKeytab directives are only available on Linux/UNIX. On Windows, the login user’s principal name and credentials are used for SASL/Kerberos authentication.
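
A minimal sketch of a Linux/UNIX instance that authenticates with SASL/Kerberos over SSL is shown below. Every value is an assumption made for illustration: the broker host name, the service name kafka, the principal, and the file paths must be replaced with those of the actual Kerberos and Kafka deployment.

nxlog.conf
<Input kafka_in>
    Module                   im_kafka
    # Hypothetical broker host and port
    BrokerList               broker1.example.com:9093
    Topic                    nxlog
    Protocol                 sasl_ssl
    # Required because Protocol is sasl_ssl; placeholder path
    CAFile                   /root/ssl/ca-cert
    # Hypothetical Kerberos service name of the brokers
    SASLKerberosServiceName  kafka
    # Hypothetical client principal and keytab
    SASLKerberosPrincipal    nxlog-client@EXAMPLE.COM
    SASLKerberosKeytab       /etc/security/keytabs/nxlog-client.keytab
</Input>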

For details about configuring Apache Kafka brokers to accept SASL/Kerberos authentication from clients, see the instructions provided by the librdkafka project.

Creating and populating fields

When the im_kafka module reads a message from a broker, it creates and populates the following fields which are then recorded to $raw_event:

Table 1. List of fields recorded to $raw_event
Key Description

MessageKey

Optional key associated with the message.

Message

Message text.

The following core fields are also created and populated by NXLog:

Table 2. List of fields
Field Description

$EventReceivedTime

The time when the event is received. The value is not modified if the field already exists.

$SourceModuleName

The name of the module instance, for input modules. The value is not modified if the field already exists.

$SourceModuleType

The type of module instance (such as im_file), for input modules. The value is not modified if the field already exists.
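
To inspect these fields while testing, an Exec directive could write them to the internal log, as in the sketch below. The instance name kafka_in is hypothetical, and the exact layout of $raw_event is not assumed here; the statement only prints whatever the module stored in it.

nxlog.conf
<Input kafka_in>
    Module      im_kafka
    BrokerList  localhost:9092
    Topic       nxlog
    # Log the source instance name and the raw record for each event
    Exec        log_info("Received via " + $SourceModuleName + ": " + $raw_event);
</Input>

Logging every record is verbose, so a statement like this is better suited to troubleshooting than to production use.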

Examples

Example 1. Using the im_kafka module

This configuration collects events from a Kafka cluster using the specified brokers over an SSL connection. Events are read from the first partition (partition 0) of the nxlog topic.

nxlog.conf
<Input in>
    Module      im_kafka
    BrokerList  localhost:9092,192.168.88.35:19092
    Topic       nxlog
    Partition   0
    Protocol    ssl
    CAFile      /root/ssl/ca-cert
    CertFile    /root/ssl/client_debian-8.pem
    CertKeyFile /root/ssl/client_debian-8.key
    KeyPass     thisisasecret
</Input>

The librdkafka library can produce performance statistics and format them as JSON. All fields of the JSON structure are explained on the Statistics page of the librdkafka project on GitHub. NXLog can be configured to poll this data at a fixed interval and write the result to the internal logger.

Example 2. Collecting internal statistics

To read librdkafka statistics, set the polling interval in milliseconds by passing the statistics.interval.ms property via the Option directive.

The Schedule block sets the interval at which the nested Exec directive runs. Inside Exec, the log_info() procedure is called with the result of to_kafka->get_stats() as its argument.

To keep the production and delivery of the librdkafka statistics in step, the statistics.interval.ms option and the Schedule block should specify the same interval (10 seconds in this example).

nxlog.conf, writing to the internal logger
<Input to_kafka>
    Module        im_kafka
    Topic         nxlog
    BrokerList    localhost:9092
    Option        statistics.interval.ms 10000
    <Schedule>
        Every     10 sec
        Exec      log_info(to_kafka->get_stats());
    </Schedule>
</Input>