Elasticsearch (om_elasticsearch)

This module forwards logs to an Elasticsearch server. It will connect to the URL specified in the configuration in either plain HTTP or HTTPS mode. This module supports bulk data operations and dynamic indexing. Event data is sent in batches, reducing the latency caused by the HTTP responses, thus improving Elasticsearch server performance. HTTP protocol errors result in the entire batch being retried. For data errors reported by the Elasticsearch server, the server response is parsed and only the failed Event data is included in the retry. If the same batch (or partial batch) has not been accepted by the server after RetryLimit retries are exhausted, the batch will be dropped, or the module will stop (according to OnError).
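The partial-retry behavior described above can be pictured with a short Python sketch. It is an illustration only, not the module's implementation: the function name `events_to_retry` is hypothetical, and the response shape (an `items` array with one entry per submitted event, in order, each carrying a `status`) is assumed from the Elasticsearch Bulk API.

```python
import json

def events_to_retry(batch, response_body):
    """Return the events from `batch` whose bulk item status is not 2xx.

    Assumes the response lists one item per submitted event, in order.
    """
    response = json.loads(response_body)
    failed = []
    for event, item in zip(batch, response.get("items", [])):
        # Each item is keyed by its action name, e.g. {"index": {"status": 201}}.
        result = next(iter(item.values()))
        if not 200 <= result.get("status", 0) <= 299:
            failed.append(event)
    return failed

batch = ['{"Message": "first"}', '{"Message": "second"}']
response_body = json.dumps({
    "errors": True,
    "items": [
        {"index": {"status": 201}},
        {"index": {"status": 400, "error": {"type": "mapper_parsing_exception"}}},
    ],
})
print(events_to_retry(batch, response_body))  # only the second event remains
```

An HTTP protocol error, by contrast, carries no per-event information, which is why the whole batch is retried in that case.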

This module requires the xm_json extension module to be loaded to convert the payload to JSON. See the Output log format section for information on the format of the payload.

Output log format

om_elasticsearch forwards log records over HTTP(S) as a JSON payload. The JSON format depends on the value of the $raw_event field. The module checks if the value of $raw_event is valid JSON and applies the following rules:

  • If it is valid JSON, the value is forwarded as is.

  • If it is not valid JSON, the log record is converted to JSON in the following format:

    {
      "raw_event": "<json_escaped_raw_event>"
    }
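The two rules above can be sketched in Python as follows. This is a minimal illustration, assuming that "valid JSON" means anything Python's `json.loads` accepts; the module's own validation may be stricter or looser (for example, for bare scalars such as `123`). The function name `build_payload` is hypothetical.

```python
import json

def build_payload(raw_event: str) -> str:
    """Forward valid JSON unchanged; wrap anything else in a raw_event object."""
    try:
        json.loads(raw_event)
    except ValueError:
        # Not valid JSON: json.dumps escapes quotes and backslashes for us.
        return json.dumps({"raw_event": raw_event})
    return raw_event

print(build_payload('{"Message": "ok"}'))
print(build_payload('tracker-store.service: "Succeeded"'))
```

The second call shows why escaping matters: the embedded double quotes must be escaped for the wrapped value to remain a valid JSON string.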

Additional metadata, including the NXLog Agent-specific fields EventReceivedTime, SourceModuleName, and SourceModuleType, will not be included in the output unless these values have been written to the $raw_event field. The processing required to achieve this depends on the format of the input data, but generally it means you need to:

  1. Parse the log record according to the data format.

    • If the input data is already in JSON format, use parse_json() to parse $raw_event into fields.

    • If the input is unstructured plain text data, copy the value of $raw_event to a custom field.

  2. Create and populate any additional custom fields.

  3. Use to_json() to convert the fields to JSON format and update the value of $raw_event.

See the Examples section for NXLog Agent configuration examples of the above.

Date format in Elasticsearch

Date strings in the JSON output need to be in a format that is recognized by Elasticsearch for them to be saved as date fields. See the Elasticsearch documentation on the Date field type and format. NXLog Agent provides several ways to format datetime fields in the output; which one to use depends on how log records are processed.

  • By default, the xm_json module outputs datetime fields in a format that is compatible with Elasticsearch: YYYY-MM-DDThh:mm:ss.sTZ. If you are using the to_json() procedure and need to output dates in a different format, specify the DateFormat module level directive.

  • If you are not using the xm_json module to convert data to JSON, you can make use of the DateFormat global directive or convert datetime fields individually with the strftime function.
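When preparing test data or comparing against what arrives in Elasticsearch, it can help to reproduce the default timestamp layout outside NXLog Agent. The Python sketch below is only an aid for that purpose; the helper name `to_es_timestamp` is hypothetical, and it assumes six fractional digits and a numeric UTC offset, as seen in the EventReceivedTime values of the output samples in this document.

```python
from datetime import datetime, timedelta, timezone

def to_es_timestamp(moment: datetime) -> str:
    """Render a timezone-aware datetime as YYYY-MM-DDThh:mm:ss.ffffff+hh:mm."""
    if moment.tzinfo is None:
        raise ValueError("a timezone-aware datetime is required")
    return moment.isoformat(timespec="microseconds")

cet = timezone(timedelta(hours=1))
print(to_es_timestamp(datetime(2021, 3, 24, 16, 52, 20, 457348, tzinfo=cet)))
# 2021-03-24T16:52:20.457348+01:00
```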

Index type

The IndexType directive should be set to _doc. However, for compatibility with indices created with Elasticsearch 5.x or older, set IndexType as required for the configured mapping types. See the IndexType directive below for more information.

Configuration

The om_elasticsearch module accepts the following directives in addition to the common module directives.

The following directives are required for the module to start.

Required directives

URL

This mandatory directive specifies the URL for the module to POST the event data. If multiple URL directives are specified, the module works in a failover configuration. If a destination becomes unavailable, the module automatically fails over to the next one. If the last destination becomes unavailable, the module will fail over to the first destination. The module operates in plain HTTP or HTTPS mode depending on the URL provided. If the port number is not explicitly indicated in the URL, it defaults to port 80 for HTTP and port 443 for HTTPS. The URL should point to the _bulk endpoint, or Elasticsearch will return 400 Bad Request. When sending logs to a data stream, the URL needs to contain the stream’s name, e.g. https://example.com:9200/nxlog-stream/_bulk.
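The default-port and failover rules can be summarized with a small Python sketch. It only models the behavior described above and is not taken from the module; the names `effective_port` and `next_destination` are hypothetical.

```python
from urllib.parse import urlsplit

DEFAULT_PORTS = {"http": 80, "https": 443}

def effective_port(url: str) -> int:
    """Return the explicit port of the URL, or the default for its scheme."""
    parts = urlsplit(url)
    return parts.port if parts.port is not None else DEFAULT_PORTS[parts.scheme]

def next_destination(urls, current_index):
    """Return the index to try after `current_index` fails, wrapping to the first."""
    return (current_index + 1) % len(urls)

urls = [
    "http://localhost:9200/_bulk",
    "http://192.168.1.1:9200/_bulk",
    "https://example.com/_bulk",
]
print([effective_port(u) for u in urls])  # [9200, 9200, 443]
print(next_destination(urls, 2))          # 0, back to the first destination
```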

TLS/SSL directives

The following directives are for configuring secure data transfer via TLS/SSL.

HTTPBasicAuthUser

HTTP basic authorization username.

HTTPBasicAuthPassword

HTTP basic authorization password.

HTTP authorization works only when both HTTPBasicAuthUser and HTTPBasicAuthPassword parameters are set.

HTTPSAllowExpired

This boolean directive specifies whether the connection should be allowed with an expired certificate. If set to TRUE, the connection will be allowed even if the remote server presents an expired certificate. The default is FALSE: the remote server must present a certificate that is not expired.

HTTPSAllowUntrusted

This boolean directive specifies that the connection should be allowed regardless of the certificate verification results. If set to TRUE, the connection will be allowed with any unexpired certificate provided by the server. The default value is FALSE: the remote server must present a trusted certificate.

HTTPSCADir

This directive specifies a path to a directory containing certificate authority (CA) certificates. These certificates will be used to verify the certificate presented by the remote server. The certificate files must be named using the OpenSSL hashed format, i.e. the hash of the certificate followed by .0, .1 etc. To find the hash of a certificate using OpenSSL:

$ openssl x509 -hash -noout -in ca.crt

For example, if the certificate hash is e2f14e4a, then the certificate filename should be e2f14e4a.0. If there is another certificate with the same hash then it should be named e2f14e4a.1 and so on.
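When several certificates are added to the directory by a script, the numbering rule can be applied as in the Python sketch below. It is an illustration under stated assumptions: the helper name `hashed_filename` is hypothetical, and the hash itself is expected to come from the openssl command shown above. The optional `suffix_prefix` argument covers the `.r0`, `.r1` naming used for CRL files under HTTPSCRLDir.

```python
def hashed_filename(cert_hash, existing_names, suffix_prefix=""):
    """Return the first free '<hash>.<prefix><n>' name, counting n up from 0."""
    n = 0
    while f"{cert_hash}.{suffix_prefix}{n}" in existing_names:
        n += 1
    return f"{cert_hash}.{suffix_prefix}{n}"

print(hashed_filename("e2f14e4a", set()))            # e2f14e4a.0
print(hashed_filename("e2f14e4a", {"e2f14e4a.0"}))   # e2f14e4a.1
print(hashed_filename("e2f14e4a", set(), "r"))       # e2f14e4a.r0
```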

A remote server’s self-signed certificate (which is not signed by a CA) can also be trusted by including a copy of the certificate in this directory.

The default operating system root certificate store will be used if this directive is not specified. Unix-like operating systems commonly store root certificates in /etc/ssl/certs. Windows operating systems use the Windows Certificate Store, while macOS uses the Keychain Access Application as the default certificate store.

HTTPSCAFile

This specifies the path of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote server. A remote server’s self-signed certificate (which is not signed by a CA) can be trusted by specifying the remote server certificate itself. In case of certificates signed by an intermediate CA, the certificate specified must contain the complete certificate chain (certificate bundle).

HTTPSCAThumbprint

This optional directive specifies the thumbprint of the certificate authority (CA) certificate that will be used to verify the certificate presented by the remote server. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc). Whitespace is automatically removed. The certificate must be added to a Windows certificate store that is accessible by NXLog Agent. This directive is only supported on Windows and is mutually exclusive with the HTTPSCADir and HTTPSCAFile directives.

HTTPSCertFile

This specifies the path of the certificate file that will be presented to the remote server during the HTTPS handshake.

HTTPSCertKeyFile

This specifies the path of the private key file that was used to generate the certificate specified by the HTTPSCertFile directive. This is used for the HTTPS handshake.

HTTPSCertThumbprint

This optional directive specifies the thumbprint of the certificate that will be presented to the remote server during the HTTPS handshake. The hexadecimal fingerprint string can be copied from Windows Certificate Manager (certmgr.msc). Whitespace is automatically removed. The certificate must be imported to the Local Computer\Personal certificate store in PFX format for NXLog Agent to find it. To create a PFX file from the certificate and private key using OpenSSL:

$ openssl pkcs12 -export -out server.pfx -inkey server.key -in server.pem

This directive is only supported on Windows and is mutually exclusive with the HTTPSCertFile and HTTPSCertKeyFile directives.

The private key associated with the certificate must be exportable.

  • If you generate the certificate request using Windows Certificate Manager, enable the Make private key exportable option from the certificate properties.

  • If you import the certificate with the Windows Certificate Import Wizard, make sure that the Mark this key as exportable option is enabled.

  • If you migrate the certificate and associated private key from one Windows machine to another, select Yes, export the private key when exporting from the source machine.

HTTPSCRLDir

This directive specifies a path to a directory containing certificate revocation list (CRL) files. These CRL files will be used to check for certificates that were revoked and should no longer be accepted. The files must be named using the OpenSSL hashed format, i.e. the hash of the issuer followed by .r0, .r1 etc. To find the hash of the issuer of a CRL file using OpenSSL:

$ openssl crl -hash -noout -in crl.pem

For example, if the hash is e2f14e4a, then the filename should be e2f14e4a.r0. If there is another file with the same hash then it should be named e2f14e4a.r1 and so on.

HTTPSCRLFile

This specifies the path of the certificate revocation list (CRL) which will be used to check for certificates that have been revoked and should no longer be accepted. Example to generate a CRL file using OpenSSL:

$ openssl ca -gencrl -out crl.pem

HTTPSDHFile

This optional directive specifies a file with dh-parameters for Diffie-Hellman key exchange. These parameters can be generated with dhparam(1ssl). If this directive is not specified, default parameters will be used. See OpenSSL Wiki for further details.

HTTPSKeyPass

This directive specifies the passphrase of the private key specified by the HTTPSCertKeyFile directive. A passphrase is required when the private key is encrypted. Example to generate a private key with Triple DES encryption using OpenSSL:

$ openssl genrsa -des3 -out server.key 2048

This directive is not needed for passwordless private keys.

HTTPSSearchAllCertStores

This optional boolean directive, when set to TRUE, enables the loading of all available Windows certificates into NXLog Agent, for use during remote certificate verification. Any required certificates must be added to a Windows certificate store that NXLog Agent can access. This directive is mutually exclusive with the HTTPSCAThumbprint, HTTPSCADir, and HTTPSCAFile directives.

HTTPSSSLCipher

This optional directive can be used to set the permitted SSL cipher list, overriding the default. Use the format described in the ciphers(1ssl) man page. For example, specify RSA:!COMPLEMENTOFALL to include all ciphers with RSA authentication but leave out ciphers without encryption.

If RSA or DSA ciphers with Diffie-Hellman key exchange are used, HTTPSDHFile can be set to specify custom dh-parameters.

HTTPSSSLCiphersuites

This optional directive can be used to set the permitted cipher list for TLSv1.3. Use the same format as in the HTTPSSSLCipher directive. Refer to the OpenSSL documentation for a list of valid TLS v1.3 cipher suites. The default value is:

TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256

HTTPSSSLCompression

This boolean directive allows you to enable data compression when sending data over the network. The compression mechanism is based on the zlib compression library. If the directive is not specified, it defaults to FALSE: compression is disabled.

Some Linux packages (for example, Debian) use the OpenSSL library provided by the OS and may not support the zlib compression mechanism. The module will emit a warning on startup if the compression support is missing. The generic deb/rpm packages are bundled with a zlib-enabled libssl library.

HTTPSSSLProtocol

This directive can be used to set the allowed SSL/TLS protocol(s). It takes a comma-separated list of values which can be any of the following: SSLv2, SSLv3, TLSv1, TLSv1.1, TLSv1.2 and TLSv1.3. By default, the TLSv1.2 and TLSv1.3 protocols are allowed. Note that the OpenSSL library shipped by Linux distributions may not support SSLv2 and SSLv3, and these will not work even if enabled with this directive.

Optional directives

AddHeader

This optional directive specifies an additional header to be added to each HTTP request.

DataStream

This boolean directive determines whether the event data is inserted into a data stream. By default its value is FALSE, and the Index directive is used for sending data to an index. If it is set to TRUE, the URL directive must contain the name of the data stream.

ID

This directive allows you to specify a custom _id field for Elasticsearch documents. If the directive is not defined, Elasticsearch uses a GUID for the _id field. Setting custom _id fields can be useful for correlating Elasticsearch documents in the future and can help to prevent storing duplicate events in the Elasticsearch storage. The directive’s argument must be a string type expression. If the expression in the ID directive is not a constant string (it contains functions, field names, or operators), it will be evaluated for each event to be submitted. You can use a concatenation of event fields and the event timestamp to uniquely and informatively identify events in the Elasticsearch storage.

Index

This directive specifies the index to insert the event data into. It must be a string type expression. If the expression in the Index directive is not a constant string (it contains functions, field names, or operators), it will be evaluated for each event to be inserted. The default is nxlog. Typically, an expression with strftime() is used to generate an index name based on the event’s time or the current time (for example, strftime(now(), "nxlog-%Y%m%d")). This directive is not used when sending logs to a data stream.
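To see how a per-event index name and an optional custom _id end up in a bulk request, consider the Python sketch below. It follows the newline-delimited format of the Elasticsearch Bulk API (one action line, then one document line). The exact action line that om_elasticsearch emits is not described in this document, so the use of the `index` action, the helper name `bulk_lines`, and the sample `_id` value are assumptions for illustration.

```python
import json
from datetime import datetime

def bulk_lines(document: dict, event_time: datetime, doc_id=None) -> str:
    """Return the two NDJSON lines for one document in a _bulk request."""
    # Same pattern as the strftime() example above: one index per day.
    action = {"_index": event_time.strftime("nxlog-%Y%m%d")}
    if doc_id is not None:
        action["_id"] = doc_id
    return json.dumps({"index": action}) + "\n" + json.dumps(document) + "\n"

body = bulk_lines(
    {"Message": "tracker-store.service: Succeeded."},
    datetime(2021, 3, 24, 15, 58, 53),
    doc_id="pc1-1452",  # hypothetical ID built from event fields
)
print(body, end="")
```

Because the index name is derived from each event's own timestamp, events from different days in the same batch are routed to different indices.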

IndexType

This directive specifies the index type to use in the bulk index command. It must be a string type expression. If the expression in the IndexType directive is not a constant string (it contains functions, field names, or operators), it will be evaluated for each event to be inserted. By default, no index type is sent for compatibility with Elasticsearch 8.x.

Index mapping types have been gradually deprecated starting with Elasticsearch 6.0.0, and support for them was completely removed in Elasticsearch 8.0.0. IndexType should only be used if required for Elasticsearch 7.x or older, or for custom types in Elasticsearch 8.x. See Removal of mapping types in the Elasticsearch Reference for more info.

LocalPort

This optional directive specifies the local port number of the connection. If this is not specified, a random high port number will be used, which is not always ideal in firewalled network environments.

Due to the required TIME-WAIT delay in closing connections, attempts to bind to LocalPort may fail. In such cases, the message Address already in use will be written to nxlog.log. If the situation persists, it could impede network performance.

Proxy

This optional directive is used to specify the IP address (or hostname) and port number of the HTTP proxy server to be used. The format is hostname:port. If the port number is omitted, it defaults to 80.

The om_elasticsearch module supports HTTP proxying only. SOCKS4/SOCKS5 proxying is not supported.

ProxyAddress

This directive has been deprecated. Please use the Proxy directive instead.

ProxyPort

This directive has been deprecated. Please use the Proxy directive instead.

Reconnect

This optional directive sets the reconnect interval in seconds. If it is set, the module attempts to reconnect at the specified interval. If it is not set, the reconnect interval starts at 1 second and doubles with every attempt. If the duration of the successful connection is greater than the current reconnect interval, the reconnect interval is reset to 1 second.
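The default behavior when Reconnect is not set can be modeled with the Python sketch below. It mirrors only what is stated above; the function name `next_interval` is hypothetical, and any upper bound the module may apply to the interval is not modeled because none is documented here.

```python
def next_interval(current_interval, connected_seconds=None):
    """Return the wait before the next reconnect attempt, in seconds.

    connected_seconds is how long the last successful connection lasted,
    or None if the last attempt failed.
    """
    if connected_seconds is not None and connected_seconds > current_interval:
        return 1  # the connection outlived the interval, so start over
    return current_interval * 2

interval = 1
waits = []
for _ in range(5):
    waits.append(interval)
    interval = next_interval(interval)
print(waits)                                    # [1, 2, 4, 8, 16]
print(next_interval(16, connected_seconds=60))  # 1
```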

The Reconnect directive must be used with caution. If it is used on multiple systems, it can send reconnect requests simultaneously to the same destination, potentially overloading the destination system. It may also cause NXLog Agent to use unusually high system resources or cause NXLog Agent to become unresponsive.

RetryLimit

This specifies how many times the module will attempt to resend event data if it is rejected by the server. A negative value disables retry limit checking. If not specified, it defaults to 2.

SNI

This optional directive specifies the hostname used for Server Name Indication (SNI) in HTTPS mode. If not specified, it defaults to the hostname in the URL directive.

OnError

This optional block directive can be used to specify a group of statements to handle errors reported by the Elasticsearch server, for each document/record individually. All response status codes that are not between 200 and 299 are treated as errors. OnError can be used to perform custom error handling. For example, records that are rejected by the Elasticsearch server can be dropped or rerouted.

Exec

This optional directive specifies statements to execute when om_elasticsearch receives an error response for a document. Like the normal Exec directive, OnError Exec can be specified as a normal directive or as a block directive. The get_response_code() and get_retry_count() functions can be used inside OnError Exec.

RetryLimit

This optional directive specifies the maximum number of times that a document can be re-sent in case of Elasticsearch errors. When this limit is reached, the record will be dropped. By default, the module will always try to resend failed records until they succeed or are dropped via OnError Exec. The Exec block is executed at each retry until retry_count reaches the preset value or a drop() is executed. If no manual drop() occurred while executing the block and retry_count has reached the user-set value, an automatic drop() is applied at the end of the Exec block. Note that using this directive can result in unexpected data loss, so it should be used sparingly. The default value is 100.
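The interaction between the error check, OnError Exec, and this retry limit can be pictured with the Python sketch below. It is a simplified model under stated assumptions, not the module's logic: the names `is_error` and `deliver` are hypothetical, the `on_error` callback stands in for an OnError Exec block, and returning "drop" from it stands in for calling drop().

```python
def is_error(status_code: int) -> bool:
    """Status codes outside 200..299 are treated as errors."""
    return not 200 <= status_code <= 299

def deliver(statuses, retry_limit, on_error=None):
    """Model one record's fate given the status codes of successive attempts."""
    retry_count = 0
    for status in statuses:
        if not is_error(status):
            return "accepted"
        retry_count += 1
        # The handler runs on every error response and may drop the record itself.
        if on_error is not None and on_error(status, retry_count) == "drop":
            return "dropped"
        # Without a manual drop, the record is dropped once the limit is reached.
        if retry_count >= retry_limit:
            return "dropped"
    return "pending"  # still waiting to be re-sent

print(deliver([500, 200], retry_limit=100))       # accepted
print(deliver([500, 500, 500], retry_limit=3))    # dropped
print(deliver([400], 100, lambda s, c: "drop" if s == 400 else None))  # dropped
```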

Functions

The following functions are exported by om_elasticsearch.

integer get_response_code()

Returns the response code for the current record. This function can only be used inside OnError Exec blocks.

integer get_retry_count()

Returns the retry count for the current record. The retry count starts at 1 when processing the first JSON response received for an Elasticsearch request, and is incremented by 1 for every subsequent response for the same record. This function can only be used inside OnError Exec blocks.

Procedures

The following procedures are exported by om_elasticsearch.

add_http_header(string name, string value);

Dynamically add a custom HTTP header to HTTP requests.

reconnect();

Force a reconnection. This can be used from a Schedule block to periodically reconnect to the server.

The reconnect() procedure must be used with caution. If configured, it can attempt to reconnect after every event sent, potentially overloading the destination system.

Examples

Example 1. Sending unstructured plain text logs

This configuration reads log records from a file and forwards them to the Elasticsearch server on localhost. No further processing is done on the log records.

nxlog.conf
<Extension json>
    Module              xm_json
</Extension>

<Input file>
    Module              im_file
    File                '/var/log/myapp*.log'
    BatchSize           200
    BatchFlushInterval  2

    # Parse log here if needed
    # $EventTime should be set here
</Input>

<Output elasticsearch>
    Module              om_elasticsearch
    URL                 http://localhost:9200/_bulk

    # Create an index daily
    Index               strftime($EventTime, "nxlog-%Y%m%d")

    # Or use the following if $EventTime is not set
    # Index             strftime(now(), "nxlog-%Y%m%d")
</Output>
Input sample

The following is a log record sample read by NXLog Agent.

Mar 24 15:58:53 pc1 systemd[1452]: tracker-store.service: Succeeded.
Output sample

The following is the JSON-formatted log record that will be sent to Elasticsearch.

{
  "raw_event": "Mar 24 15:58:53 pc1 systemd[1452]: tracker-store.service: Succeeded."
}
Example 2. Sending plain text logs with metadata

This configuration reads log records from a file and adds a $Hostname metadata field. Log records are converted to JSON using the to_json() procedure of the xm_json module before they are forwarded to Elasticsearch.

nxlog.conf
<Extension json>
    Module    xm_json
</Extension>

<Input file>
    Module    im_file
    File      '/var/log/myapp*.log'
    Exec      $Hostname = hostname();
    Exec      $Message = $raw_event;
</Input>

<Output elasticsearch>
    Module    om_elasticsearch
    URL       http://localhost:9200/_bulk
    Exec      to_json();
</Output>
Input sample

The following is a log record sample read by NXLog Agent.

Mar 24 15:58:53 pc1 systemd[1452]: tracker-store.service: Succeeded.
Output sample

The following is the JSON-formatted log record that will be sent to Elasticsearch.

{
  "EventReceivedTime": "2021-03-24T16:52:20.457348+01:00",
  "SourceModuleName": "file",
  "SourceModuleType": "im_file",
  "Hostname": "pc1",
  "Message": "Mar 24 15:58:53 pc1 systemd[1452]: tracker-store.service: Succeeded."
}
Example 3. Sending structured syslog records

This configuration reads syslog records from a file. It uses the parse_syslog() procedure of the xm_syslog module to parse logs into structured data. Log records are then converted to JSON using the to_json() procedure of the xm_json module before they are forwarded to Elasticsearch.

nxlog.conf
<Extension syslog>
    Module    xm_syslog
</Extension>

<Extension json>
    Module    xm_json
</Extension>

<Input file>
    Module    im_file
    File      '/var/log/myapp*.log'
    Exec      parse_syslog();
</Input>

<Output elasticsearch>
    Module    om_elasticsearch
    URL       http://localhost:9200/_bulk
    Exec      to_json();
</Output>
Input sample

The following is a log record sample read by NXLog Agent.

Mar 24 15:58:53 pc1 systemd[1452]: tracker-store.service: Succeeded.
Output sample

The following is the JSON-formatted log record that will be sent to Elasticsearch.

{
  "EventReceivedTime": "2021-03-24T16:30:18.920342+01:00",
  "SourceModuleName": "file",
  "SourceModuleType": "im_file",
  "SyslogFacilityValue": 1,
  "SyslogFacility": "USER",
  "SyslogSeverityValue": 5,
  "SyslogSeverity": "NOTICE",
  "SeverityValue": 2,
  "Severity": "INFO",
  "Hostname": "pc1",
  "EventTime": "2021-03-24T15:58:53.000000+01:00",
  "SourceName": "systemd",
  "ProcessID": 1452,
  "Message": "tracker-store.service: Succeeded."
}
Example 4. Sending JSON-formatted logs with metadata

This configuration reads JSON-formatted log records from a file. It uses the parse_json() procedure of the xm_json module to parse logs into structured data and adds an $EventType metadata field. Log records are then converted back to JSON using the to_json() procedure before they are forwarded to Elasticsearch.

nxlog.conf
<Extension json>
    Module    xm_json
</Extension>

<Input file>
    Module    im_file
    File      '/var/log/myapp*.log'
    Exec      parse_json();
    Exec      $EventType = "browser-history";
</Input>

<Output elasticsearch>
    Module    om_elasticsearch
    URL       http://localhost:9200/_bulk
    Exec      to_json();
</Output>
Input sample

The following is a log record sample read by NXLog Agent.

{
  "AccessTime": "2021-03-24T16:30:43.000000+01:00",
  "URL": "https://nxlog.co",
  "Title": "High Performance Log Collection Solutions",
  "Username": "user1"
}
Output sample

The following is the JSON-formatted log record that will be sent to Elasticsearch.

{
  "EventReceivedTime": "2021-03-24T17:14:23.908155+01:00",
  "SourceModuleName": "file",
  "SourceModuleType": "im_file",
  "AccessTime": "2021-03-24T16:30:43.000000+01:00",
  "URL": "https://nxlog.co",
  "Title": "High Performance Log Collection Solutions",
  "Username": "user1",
  "EventType": "browser-history"
}
Example 5. Sending logs to an Elasticsearch server with failover

This configuration sends log records to an Elasticsearch server in a failover configuration (multiple URLs defined). The actual destinations used in this case are http://localhost:9200/_bulk, http://192.168.1.1:9200/_bulk, and http://example.com:9200/_bulk.

nxlog.conf
<Extension json>
    Module  xm_json
</Extension>

<Output elasticsearch>
    Module  om_elasticsearch
    URL     http://localhost:9200/_bulk
    URL     http://192.168.1.1:9200/_bulk
    URL     http://example.com:9200/_bulk
</Output>
Example 6. Handle record errors

This configuration reroutes records that the Elasticsearch server rejects with a 400 response code to a file.

nxlog.conf
<Extension json>
    Module    xm_json
</Extension>

<Output elastic>
    Module              om_elasticsearch
    URL                 http://localhost:9200/_bulk
    Index               strftime(now(), "test")
    NoDefaultIndexType  TRUE
    DropOnError         TRUE

    <Exec>
        json->to_json();
    </Exec>

    <OnRecordError>
        <Exec>
            $resp_code = get_response_code();
            if $resp_code == 400
            {
                reroute("reroute_es_errors");
            }
        </Exec>
    </OnRecordError>
</Output>

<Input null>
    Module im_null
</Input>

<Output failed_es_logs>
  Module om_file
  File "/var/log/failed_es.logs"
</Output>

<Route reroute_es_errors>
  Path null => failed_es_logs
</Route>