Event Correlator (pm_evcorr)
The pm_evcorr module provides event correlation functionality in addition to the already available NXLog language features such as variables and statistical counters, which can also be used for event correlation purposes.
This module was greatly inspired by the Perl-based event correlation tool SEC. Some of the rules of the pm_evcorr module were designed to mimic those available in SEC. This module aims to be a better alternative to SEC with the following advantages:
- The correlation rules in SEC work with the current time. With pm_evcorr it is possible to specify a time field that is used for elapsed time calculation, making offline event correlation possible.
- SEC uses regular expressions extensively, which can become quite slow if there are many correlation rules. In contrast, this module can correlate pre-processed messages using fields from, for example, the pattern matcher and syslog parsers without requiring the use of regular expressions (though these are also available for use by correlation rules). Testing conditions can therefore be significantly faster when simple comparisons are used instead of regular expression-based pattern matching.
- This module was designed to operate on fields, making it possible to correlate structured logs in addition to simple free-form log messages.
- Most importantly, this module is written in C, providing performance benefits (whereas SEC is written in pure Perl).
The rulesets of this module can use a context.
A context is an expression that is evaluated during runtime to a value and the correlation rule is checked in the context of this value.
For example, to count the number of failed logins per user and alert if the failed logins exceed 3 for the user, the $AccountName field would be used as the context.
There is a separate context storage for each correlation rule instance.
For global contexts accessible from all rule instances, see module variables and statistical counters.
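As a rough sketch of the failed-login case described above, a Thresholded rule could use $AccountName as its Context so that each account gets its own counter. The condition pattern, the 60-second Interval, and the warning text are assumptions made for illustration. Threshold is set to 4 on the assumption that the rule fires once the count reaches the threshold, which is what the Thresholded example output on this page suggests.

```
<Processor evcorr>
    Module          pm_evcorr
    TimeField       EventTime

    <Thresholded>
        # Hypothetical pattern; adjust to how failed logins appear in your logs
        Condition   $Message =~ /login failed/
        # One counter per account name
        Context     $AccountName
        # Assumed to fire on the 4th match, that is, once failures exceed 3
        Threshold   4
        Interval    60
        Exec        log_warning("Repeated failed logins for " + $AccountName);
    </Thresholded>
</Processor>
```

Without the Context directive, failures from all accounts would be counted together by this rule instance.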
Configuration
The pm_evcorr module accepts the following directives in addition to the common module directives.
The pm_evcorr configuration contains correlation rules which are evaluated for each log message processed by the module. Currently there are seven rule types supported by pm_evcorr: Absence, Group, Pair, Simple, Stop, Suppressed, and Thresholded. These rules are defined in configuration blocks. The rules are evaluated in the order they are defined. For example, a correlation rule can change a state, variable, or field which can then be used by a later rule. File inclusion can be useful to store correlation rules in a separate file.
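To illustrate the effect of rule order, the following sketch has a Simple rule set a field that a later Suppressed rule then tests. The $IsError field name, the ERROR pattern, and the 60-second Interval are hypothetical choices, not part of the module.

```
<Processor evcorr>
    Module          pm_evcorr

    # Evaluated first: tag error events with a hypothetical $IsError field
    <Simple>
        Exec        if $Message =~ /^ERROR/ $IsError = TRUE;
    </Simple>

    # Evaluated second: sees the field set by the rule above
    <Suppressed>
        Condition   defined $IsError
        Interval    60
        Exec        log_warning("Error event seen: " + $Message);
    </Suppressed>
</Processor>
```

If the two blocks were swapped, the Suppressed condition would be checked before $IsError is set and would not match.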
Required directives
The following directives are required for the module to start.
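Absence
This rule type does the opposite of Pair. When TriggerCondition evaluates to TRUE, the rule waits Interval seconds for RequiredCondition to become TRUE. If RequiredCondition does not become TRUE within that time, the statement(s) in the Exec directive(s) are executed.
Group
This rule type groups messages together based on the specified correlation context. The Exec block is executed at each event. The last log data of each context group is available through get_prev_event_data(). This way, fields and information can be propagated from the previous group event to the following one.
Pair
When TriggerCondition evaluates to TRUE, the rule waits Interval seconds for RequiredCondition to become TRUE. If it does, the statement(s) in the Exec directive(s) are executed.
Simple
This rule type is essentially the same as the Exec directive supported by all modules. Because Exec blocks are evaluated before the correlation rules, the Simple rule is needed to evaluate a statement in rule order, as the other rules do. The Simple block has a single directive, which is also named Exec.
Stop
This rule type will stop evaluating successive rules if the Condition evaluates to TRUE.
Suppressed
This rule type matches the given condition. If the condition evaluates to TRUE, the statement(s) in the Exec directive(s) are executed, and the rule then ignores further matching events for Interval seconds.
Thresholded
This rule type will execute the statement(s) in the Exec directive(s) if the Condition evaluates to TRUE Threshold or more times within Interval seconds.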
Optional directives
ContextCleanTime
When a Context is used in the correlation rules, expired context values must be purged from memory; otherwise, accumulating too many context values could result in high memory usage. This optional directive specifies the interval between context cleanups, in seconds. If it is omitted, a default interval is used.
TimeField
This specifies the name of the field to use for calculating elapsed time, such as EventTime.
Functions
The following functions are exported by pm_evcorr.
unknown get_prev_event_data(string field_name)
When a correlation rule triggers an Exec, the data of the event that triggered the rule might no longer be available. This function can be used to retrieve fields of that event. The field must be specified as a string (for example, get_prev_event_data("EventTime")). This is applicable to the Pair and Absence rule types; the Group rule type also uses it to access the last event of each context group.
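As a minimal sketch of how this function might be used, the Pair rule below copies two fields from the triggering event onto the event that completed the pair. The session patterns and the $TriggerTime and $TriggerMessage field names are hypothetical and chosen only for illustration.

```
<Processor evcorr>
    Module                  pm_evcorr
    TimeField               EventTime

    <Pair>
        # Hypothetical patterns for the first and second event of the pair
        TriggerCondition    $Message =~ /^session opened/
        RequiredCondition   $Message =~ /^session closed/
        Interval            30
        <Exec>
            # Exec runs on the event matching RequiredCondition;
            # the fields of the triggering event are fetched by name
            $TriggerTime = get_prev_event_data("EventTime");
            $TriggerMessage = get_prev_event_data("Message");
        </Exec>
    </Pair>
</Processor>
```

Assigning the returned values to fields avoids assumptions about their type, since the function is declared with the unknown return type.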
Examples
The following configuration shows the Absence directive. In this case, if TriggerCondition evaluates to TRUE, it waits for the number of seconds defined in Interval for the RequiredCondition to become TRUE. If the RequiredCondition does not become TRUE within the specified interval, then it executes what is defined in Exec.
2010-01-01 00:00:26 absence-trigger
2010-01-01 00:00:29 absence-required - will not log 'got absence'
2010-01-01 00:00:46 absence-trigger
2010-01-01 00:00:57 absence-required - will log an additional 'absence-required not received within 10 secs'
<Input internal>
    Module  im_internal
    <Exec>
        $raw_event = $Message;
        $EventTime = 2010-01-01 00:01:00;
    </Exec>
</Input>
<Processor evcorr>
    Module      pm_evcorr
    TimeField   EventTime
    <Absence>
        TriggerCondition    $Message =~ /^absence-trigger/
        RequiredCondition   $Message =~ /^absence-required/
        Interval            10
        <Exec>
            log_info("'absence-required' not received within 10 secs");
        </Exec>
    </Absence>
</Processor>
absence-trigger
absence-required - will not log 'got absence'
absence-trigger
absence-required - will log an additional 'absence-required not received within 10 secs'
'absence-required' not received within 10 secs
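The following configuration shows rules for the Group directive. The input first strips the date and time from each event and sets $raw_event to the $Context and $Message values. The Group rule then rewrites $raw_event per context: the first event of a context is labeled with its $Context and $Message, and for every later event in the same context the $Message field of the new event is appended to the previous $raw_event of that context.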
2010-01-01 00:00:01 [a] suppressed1
2010-01-01 00:00:02 [b] suppressed2
2010-01-01 00:00:03 [a] suppressed3
2010-01-01 00:00:04 [b] suppressed4
2010-01-01 00:00:04 [b] suppressed5
2010-01-01 00:00:05 [c] suppressed6
2010-01-01 00:00:06 [c] suppressed7
2010-01-01 00:00:34 [b] suppressed8
2010-01-01 00:01:00 [a] pair-first1
<Input in>
    Module  im_file
    File    "file/input"
    <Exec>
        if ($raw_event =~ /^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d) \[(.+)\] (.+)/)
        {
            $EventTime = parsedate($1);
            $Context = $2;
            $Message = $3;
            $raw_event = $2 + " " + $3;
        }
    </Exec>
</Input>
<Processor evcorr>
    Module              pm_evcorr
    TimeField           EventTime
    ContextCleanTime    10
    <Group>
        Context $Context
        <Exec>
            if defined get_prev_event_data("raw_event")
            {
                $raw_event = get_prev_event_data("raw_event") + ", " + $Message;
            }
            else
            {
                $raw_event = "Context: " + $Context + " Messages: " + $Message;
            }
        </Exec>
    </Group>
</Processor>
Context: a Messages: suppressed1
Context: b Messages: suppressed2
Context: a Messages: suppressed1, suppressed3
Context: b Messages: suppressed2, suppressed4
Context: b Messages: suppressed2, suppressed4, suppressed5
Context: c Messages: suppressed6
Context: c Messages: suppressed6, suppressed7
Context: b Messages: suppressed2, suppressed4, suppressed5, suppressed8
Context: a Messages: suppressed1, suppressed3, pair-first1
The following configuration shows rules for the Pair directive. In this case, if TriggerCondition evaluates to TRUE, it waits for the number of seconds defined in Interval for the RequiredCondition to become TRUE, then executes what is defined in Exec. If the Interval is 0, there is no window for matching.
2010-01-01 00:00:12 pair-first - now look for pair-second
2010-01-01 00:00:22 pair-second - will log 'got pair'
2010-01-01 00:00:25 pair-first
2010-01-01 00:00:56 pair-second - will not log 'got pair' because it is over the interval
<Input filein>
    Module  im_file
    File    "input/file"
    Exec    if ($raw_event =~ /^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d) (.+)/) { \
                $EventTime = parsedate($1); \
                $Message = $2; \
                $raw_event = $Message; \
            }
</Input>
<Processor evcorr>
    Module      pm_evcorr
    TimeField   EventTime
    <Pair>
        TriggerCondition    $Message =~ /^pair-first/
        RequiredCondition   $Message =~ /^pair-second/
        Interval            30
        Exec                $raw_event = "got pair";
    </Pair>
</Processor>
pair-first - now look for pair-second
got pair
pair-first
The following configuration shows rules for the Simple directive. In this case, if the $Message field starts with simple, it is rewritten to got simple.
2010-01-01 00:00:00 Not simple
2010-01-01 00:00:05 Not simple again
2010-01-01 00:00:10 simple1
2010-01-01 00:00:15 simple2
<Input filein>
    Module  im_file
    File    "input/file"
    Exec    if ($raw_event =~ /^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d) (.+)/) { \
                $EventTime = parsedate($1); \
                $Message = $2; \
                $raw_event = $Message; \
            }
</Input>
<Processor evcorr>
    Module      pm_evcorr
    TimeField   EventTime
    <Simple>
        Exec    if $Message =~ /^simple/ $raw_event = "got simple";
    </Simple>
</Processor>
Not simple
Not simple again
got simple
got simple
The following configuration shows a rule for the Stop directive in conjunction with the Simple directive. In this case, if the Stop condition evaluates to FALSE, evaluation continues with the Simple rule, which rewrites the event to rewritten.
2010-01-02 00:00:00 this will be rewritten
2010-01-02 00:00:10 this too
2010-01-02 00:00:15 as well as this
<Input filein>
    Module  im_file
    File    "input/file"
    Exec    if ($raw_event =~ /^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d) (.+)/) { \
                $EventTime = parsedate($1); \
                $Message = $2; \
                $raw_event = $Message; \
            }
</Input>
<Processor evcorr>
    Module      pm_evcorr
    TimeField   EventTime
    <Stop>
        Condition   $EventTime < 2010-01-01 00:00:00
        Exec        log_debug("got stop");
    </Stop>
    <Simple>
        Exec    $raw_event = "rewritten";
    </Simple>
</Processor>
rewritten
rewritten
rewritten
The following configuration shows a rule for the Suppressed directive. In this case, the rule executes the action in Exec for the first matching event and then ignores further matching events for the number of seconds defined in Interval, so those events are logged as is.
2010-01-01 00:00:01 to be suppressed1 - Suppress kicks in, will log 'suppressed..'
2010-01-01 00:00:21 to be suppressed2 - suppressed and logged as is
2010-01-01 00:00:23 to be suppressed3 - suppressed and logged as is
<Input filein>
    Module  im_file
    File    "input/file"
    Exec    if ($raw_event =~ /^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d) (.+)/) { \
                $EventTime = parsedate($1); \
                $Message = $2; \
                $raw_event = $Message; \
            }
</Input>
<Processor evcorr>
    Module      pm_evcorr
    TimeField   EventTime
    <Suppressed>
        Condition   $Message =~ /^to be suppressed/
        Interval    30
        Exec        $raw_event = "suppressed..";
    </Suppressed>
</Processor>
suppressed..
to be suppressed2 - suppressed and logged as is
to be suppressed3 - suppressed and logged as is
The following configuration shows rules for the Thresholded directive. In this case, once the number of matching events reaches the given threshold within the interval period, the action defined in Exec is carried out for that event and for each further matching event in the interval.
2010-01-01 00:00:13 thresholded1 - not tresholded will log as is
2010-01-01 00:00:15 thresholded2 - not tresholded will log as is
2010-01-01 00:00:20 thresholded3 - will log 'got thresholded'
2010-01-01 00:00:25 thresholded4 - will log 'got thresholded' again
<Input filein>
    Module  im_file
    File    "input/file"
    Exec    if ($raw_event =~ /^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d) (.+)/) { \
                $EventTime = parsedate($1); \
                $Message = $2; \
                $raw_event = $Message; \
            }
</Input>
<Processor evcorr>
    Module      pm_evcorr
    TimeField   EventTime
    <Thresholded>
        Condition   $Message =~ /^thresholded/
        Threshold   3
        Interval    60
        Exec        $raw_event = "got thresholded";
    </Thresholded>
</Processor>
thresholded1 - not tresholded will log as is
thresholded2 - not tresholded will log as is
got thresholded
got thresholded
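The example configurations above show only the Input and Processor blocks. To run one of them, an output and a route are also needed. The following is a minimal sketch, assuming an om_file output; the fileout instance name, the output path, and the route name are hypothetical, while filein and evcorr are the instance names used in the examples.

```
<Output fileout>
    Module  om_file
    # Hypothetical path for the correlated events
    File    "output/file"
</Output>

<Route evcorr_route>
    # Events flow from the input through the correlator to the output
    Path    filein => evcorr => fileout
</Route>
```

The processor is placed between the input and the output in the Path so that every event read by the input is evaluated against the correlation rules before it is written.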