MongoDB
MongoDB is a document-oriented database system.
NXLog Agent can be configured to collect data from a MongoDB database. A proof-of-concept Perl script is shown in the example below.
This configuration uses im_perl to execute a Perl script that reads data from a MongoDB database. The generated events are written to a file with om_file.
When new documents are available in the database, the script sorts them by their _id field and processes them sequentially.
Each document is passed to NXLog Agent by calling Log::Nxlog::add_input_data().
The script polls the database periodically by rescheduling itself with Log::Nxlog::set_read_timer().
If the MongoDB server is unreachable, the timer delay is increased (from 1 second to 10 seconds in this script) so that reconnection is attempted later.
After processing, documents are deleted from the collection.
The Perl script shown here is a proof-of-concept only. The script must be modified to correspond with the data to be collected from MongoDB.
<Input perl>
    Module      im_perl
    PerlCode    mongodb-input.pl
</Input>

<Output file>
    Module      om_file
    File        '/tmp/output.log'
</Output>
#!/usr/bin/perl

use strict;
use warnings;

use FindBin;
use lib $FindBin::Bin;
use Log::Nxlog;
use MongoDB;
use Try::Tiny;

my $counter;
my $client;
my $collection;
my $cur;
my $logfile;

sub read_data_int
{
    $counter //= 1;

    # Connect to the server (only on the first call; the client is reused).
    $client //= MongoDB::MongoClient->new(host => 'localhost:27017');

    # Select the database and collection.
    $collection //= $client->ns('zips.zips');

    # Sort all existing documents by _id.
    $cur = $collection->find()->sort({_id => 1});

    # Do this only the first time around. Make our cursor immortal.
    if ($counter == 1) {
        $cur->immortal(1);
    }
    $counter++;

    # If any new documents exist, process them.
    while ($cur->has_next()) {
        my $event = Log::Nxlog::logdata_new();
        my $obj = $cur->next;

        # Build the event text from the fields of the document.
        my $line = "ID: " . $obj->{"_id"}
                 . " City: " . $obj->{"city"}
                 . " Loc: " . $obj->{"loc"}[0] . "," . $obj->{"loc"}[1]
                 . " Pop: " . $obj->{"pop"}
                 . " State: " . $obj->{"state"};
        Log::Nxlog::set_field_string($event, 'raw_event', $line);
        Log::Nxlog::add_input_data($event);

        # Once the document is processed, delete it.
        my $result = $collection->delete_one( { _id => $obj->{"_id"} } );
        #print $logfile "Extracted document with _id: " . $obj->{"_id"} . " Deleting returned: " . $result->deleted_count . "\n";
    }
}

sub read_data
{
    # Use a try/catch block in order to resume when MongoDB is unreachable.
    #open ($logfile, '>>', '/tmp/perl-input.log') or die "Could not open log file";
    try {
        read_data_int();
        # Adjust this timer for how often to look for new documents.
        Log::Nxlog::set_read_timer(1);
    } catch {
        #print $logfile "Error thrown: $_ Will retry in 10 seconds.";
        # Adjust this timer for how often to try to reconnect.
        Log::Nxlog::set_read_timer(10);
    };
}
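Because the script must be adapted to the data being collected, one possible adaptation is to emit individual fields rather than a single raw_event string. The following is a minimal sketch of that idea which runs without NXLog Agent or MongoDB. The document_to_fields helper, the @FIELD_MAP table, and the field names ZipCode, City, State, and Population are hypothetical and chosen only for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical mapping from document keys to event field names and types.
# The field names are illustrative assumptions, not NXLog conventions.
my @FIELD_MAP = (
    [ _id   => 'ZipCode',    'string'  ],
    [ city  => 'City',       'string'  ],
    [ state => 'State',      'string'  ],
    [ pop   => 'Population', 'integer' ],
);

# Hypothetical helper: turn one document into [name, type, value] triples,
# skipping keys that the document does not contain.
sub document_to_fields {
    my ($doc) = @_;
    my @fields;
    for my $entry (@FIELD_MAP) {
        my ($key, $name, $type) = @$entry;
        next unless defined $doc->{$key};
        push @fields, [ $name, $type, $doc->{$key} ];
    }
    return @fields;
}

# A document shaped like the ZIP code data used in this example.
my $doc = { _id => '01001', city => 'AGAWAM', pop => 15338, state => 'MA' };

for my $field (document_to_fields($doc)) {
    my ($name, $type, $value) = @$field;
    # Inside the im_perl script, this is where a call such as
    # Log::Nxlog::set_field_string($event, $name, $value) or
    # Log::Nxlog::set_field_integer($event, $name, $value) would go.
    print "$name ($type) = $value\n";
}
```

Keeping the mapping in one table means that adapting the script to a different collection would mostly involve editing @FIELD_MAP, assuming the documents are flat enough to be described this way.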
For this example, a JSON data set of US ZIP (postal) codes was used.
The data set was fed to MongoDB with mongoimport -d zips -c zips --file zips.json.
{ "_id" : "01001", "city" : "AGAWAM", "loc" : [ -72.622739, 42.070206 ], "pop" : 15338, "state" : "MA" }
{ "_id" : "01008", "city" : "BLANDFORD", "loc" : [ -72.936114, 42.182949 ], "pop" : 1240, "state" : "MA" }
{ "_id" : "01010", "city" : "BRIMFIELD", "loc" : [ -72.188455, 42.116543 ], "pop" : 3706, "state" : "MA" }
{ "_id" : "01011", "city" : "CHESTER", "loc" : [ -72.988761, 42.279421 ], "pop" : 1688, "state" : "MA" }
{ "_id" : "01020", "city" : "CHICOPEE", "loc" : [ -72.576142, 42.176443 ], "pop" : 31495, "state" : "MA" }
With this input, the script generates the following events, which om_file writes to /tmp/output.log.
ID: 01001 City: AGAWAM Loc: -72.622739,42.070206 Pop: 15338 State: MA
ID: 01008 City: BLANDFORD Loc: -72.936114,42.182949 Pop: 1240 State: MA
ID: 01010 City: BRIMFIELD Loc: -72.188455,42.116543 Pop: 3706 State: MA
ID: 01011 City: CHESTER Loc: -72.988761,42.279421 Pop: 1688 State: MA
ID: 01020 City: CHICOPEE Loc: -72.576142,42.176443 Pop: 31495 State: MA
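The mapping from a document to an event line can be checked without NXLog Agent or a running MongoDB server. The sketch below reproduces the string concatenation used in the script as a standalone function. The format_document helper is hypothetical, and falling back to an empty string for missing fields is an assumption rather than the behavior of the original script.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper (not part of the original script): build the same
# line that the script stores in the raw_event field.
sub format_document {
    my ($doc) = @_;
    my @loc = @{ $doc->{loc} // [] };
    my @values = (
        $doc->{_id}, $doc->{city}, $loc[0], $loc[1],
        $doc->{pop}, $doc->{state},
    );
    # Assumption: missing fields become empty strings instead of
    # triggering "uninitialized value" warnings.
    $_ //= '' for @values;
    return sprintf('ID: %s City: %s Loc: %s,%s Pop: %s State: %s', @values);
}

# First document of the input sample, written as a Perl hash reference.
my $doc = {
    _id   => '01001',
    city  => 'AGAWAM',
    loc   => [ -72.622739, 42.070206 ],
    pop   => 15338,
    state => 'MA',
};

print format_document($doc), "\n";
```

For the first sample document, this prints the same line as the first entry of the output shown above.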