Correlate events with NXLog Agent
NXLog Agent includes a high-performance event correlation module called pm_evcorr, which allows you to create basic and complex event correlation rules. When paired with other NXLog language features, you can implement event correlation and trigger actions at the log forwarder level, allowing you to collect, enrich, and forward logs efficiently.
Below, we provide examples that portray real-world applications of how you can implement event correlation with NXLog Agent.
Detect unusual authentication failures
While standard login and authentication failures occur regularly for legitimate reasons, rapid login failures may signal a malicious actor trying to gain unauthorized access to a system through brute force or dictionary attacks.
Using the Thresholded correlation rule type allows you to set a baseline for the number of acceptable login failures and trigger additional actions if surpassed. The example below demonstrates how to enrich all failed login events over a threshold with a warning message.
This configuration uses the im_msvistalog input module to capture Event ID 4625
– An account failed to log on.
Next, it passes the events to the pm_evcorr module for processing.
The event is enriched with a custom message if there are more than five failed login events within ten seconds.
Finally, it forwards the events to a SIEM in JSON format using the om_tcp output module.
<Extension json>
Module xm_json
</Extension>
<Input eventlog>
Module im_msvistalog
<QueryXML>
<QueryList>
<Query Id="0" Path="Security">
<Select Path="Security">*[System[(EventID=4625)]]</Select>
</Query>
</QueryList>
</QueryXML>
</Input>
<Processor corr_processor>
Module pm_evcorr
TimeField EventTime (1)
<Thresholded>
Condition $EventID in 4625 (2)
Threshold 5 (3)
Interval 10 (4)
<Exec>
$SecurityWarning = "WARNING: Failed login threshold exceeded!"; (5)
</Exec>
</Thresholded>
</Processor>
<Output siem>
Module om_tcp
Host 192.168.1.123:3500
Exec to_json(); (6)
</Output>
<Route login_events_route>
Path eventlog => corr_processor => siem
</Route>
1 | The TimeField directive specifies the field to compare one event’s time to another. |
2 | The Condition directive tracks events with Event ID 4625 . |
3 | The Threshold directive specifies that an event must occur five times to trigger the action defined in the Exec block. |
4 | The Interval directive specifies a 10-second window in which the threshold value must be met to trigger the action. |
5 | If the rule is triggered, it enriches the event with a new field named $SecurityWarning . |
6 | Converts the record to JSON format using the to_json() procedure of the xm_json module. |
You can configure the Exec block to do much more, such as enriching logs with environment variables, alerting administrators, correlating with other events, or even calling external processes. |
Combine related log events
The Pair correlation rule type allows you to combine information from two distinct but related events.
For instance, Windows generates two events when a user deletes a file from a monitored directory.
You will find essential details about the deletion in Event ID 4660
– An object was deleted.
However, you will only find the deleted file’s name in a prior event with Event ID 4663
– An attempt was made to access an object.
In the example below, Event ID 4663
triggers the correlation rule.
When the rule is triggered, pm_evcorr checks for a subsequent event with Event ID 4660
, and if found, it extracts and enriches the log record with the file name.
This configuration uses the im_msvistalog input module to capture object access and deletion events.
Next, it passes the events to the pm_evcorr module for processing.
If Event ID 4663
is followed by Event ID 4660
, it enriches the latter with the deleted file name.
Finally, it forwards the events to a SIEM in JSON format using the om_tcp output module.
<Extension json>
Module xm_json
</Extension>
<Input eventlog>
Module im_msvistalog
<QueryXML>
<QueryList>
<Query Id="0" Path="Security">
<Select Path="Security">*[System[(EventID=4660 or EventID=4663)]]</Select>
</Query>
</QueryList>
</QueryXML>
</Input>
<Processor corr_processor>
Module pm_evcorr
<Pair>
TriggerCondition $EventID == 4663 (1)
RequiredCondition $EventID == 4660 AND \
$EventRecordID == get_prev_event_data("EventRecordID") + 1 (2)
Interval 5
Exec $FileName = get_prev_event_data("ObjectName"); (3)
</Pair>
</Processor>
<Output siem>
Module om_tcp
Host 192.168.1.123:3500
Exec to_json(); (4)
</Output>
<Route file_deletion_events_route>
Path eventlog => corr_processor => siem
</Route>
1 | The TriggerCondition directive specifies that events with ID 4663 activate the correlation rule. |
2 | The RequiredCondition directive specifies to run the Exec statement when Event ID 4663 is immediately followed by Event ID 4660 .
You can retrieve previous event data with the get_prev_event_data() function of the pm_evcorr module. |
3 | If the rule is triggered, the $ObjectName field is retrieved from the previous event and added to the current event as $FileName . |
4 | Converts the record to JSON format using the to_json() procedure of the xm_json module. |
You can use multiple Pair directives to correlate and combine any number of events from various sources. |
Detect unusual ICMP traffic
While some ICMP traffic is normal and is usually associated with ping requests, high ICMP traffic levels in a short period can signal a ping sweep.
Using Threshold correlation allows you to set a baseline for the expected ICMP traffic on your network and take action if that threshold is exceeded. The example below combines the Thresholded and Simple correlation rule types to enrich and forward network packets over the threshold and drop packets within the limit. By dropping packets within the limit, you avoid forwarding noise events to your SIEM so you can focus on events that signal a potential threat.
This configuration uses the im_pcap input module to capture all ICMP traffic on the specified network interface. Next, it passes the packets to the pm_evcorr module for processing. If a packet exceeds the defined threshold, it enriches it with a custom message; otherwise, it drops the packet. Finally, it forwards the events to a SIEM in JSON format using the om_tcp output module.
<Extension json>
Module xm_json
</Extension>
<Input icmp_capture>
Module im_pcap
Dev %NIC_1%
Filter "icmp"
<Protocol>
Type icmp
</Protocol>
<Protocol>
Type ip
</Protocol>
<Protocol>
Type ethernet
</Protocol>
<Protocol>
Type arp
</Protocol>
</Input>
<Processor corr_processor>
Module pm_evcorr
TimeField EventTime (1)
<Thresholded>
Condition $icmp.type IN "Echo (ping) request" (2)
Threshold 20 (3)
Interval 2 (4)
<Exec>
$SecurityWarning = "WARNING: High ICMP ping traffic detected!"; (5)
</Exec>
</Thresholded>
<Simple>
Exec if not defined($SecurityWarning) drop(); (6)
</Simple>
</Processor>
<Output siem>
Module om_tcp
Host 192.168.1.123:3500
Exec to_json(); (7)
</Output>
<Route icmp_events_route>
Path icmp_capture => corr_processor => siem
</Route>
1 | The TimeField directive specifies the field to compare one event’s time to another. |
2 | The Condition directive specifies tracking ICMP packets. |
3 | The Threshold directive specifies that the same packet must occur 20 times to trigger the action defined in the Exec block. |
4 | The Interval directive specifies a 20-second window in which the threshold value must be met to trigger the action. |
5 | If the rule is triggered, it enriches the packet with a new field named $SecurityWarning . |
6 | Packets that do not trigger the rule are discarded with the drop() procedure. |
7 | Converts the record to JSON format using the to_json() procedure of the xm_json module. |
You can configure the Exec block to do much more, such as enriching logs with environment variables, alerting administrators, correlating with other events, or even calling external processes. |