MongoDB

MongoDB is a document-oriented database system.

NXLog can be configured to collect data from a MongoDB database. A proof-of-concept Perl script is shown in the example below.

Example 1. Collecting data from MongoDB

This configuration uses im_perl to execute a Perl script that reads data from a MongoDB database. The generated events are written to a file with om_file.

When new documents are available in the database, the script sorts them by their _id field and processes them sequentially. Each document is passed to NXLog by calling Log::Nxlog::add_input_data(). The script polls the database repeatedly by rescheduling itself with Log::Nxlog::set_read_timer(): one second after a successful pass, and ten seconds after a failed one (for example, when the MongoDB server is unreachable), so that reconnection is attempted later.

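After processing, documents are deleted from the collection.

The Perl script shown here is a proof of concept only. It must be modified to match the data to be collected from MongoDB. Because it deletes every document it reads, it should not be run unchanged against a collection whose contents must be kept.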
nxlog.conf
<Input perl>
    Module      im_perl
    PerlCode    mongodb-input.pl
</Input>

<Output file>
    Module      om_file
    File        '/tmp/output.log'
</Output>

<Route perl_to_file>
    Path        perl => file
</Route>
mongodb-input.pl
#!/usr/bin/perl

use strict;
use warnings;

use FindBin;
use lib $FindBin::Bin;
use Log::Nxlog;
use MongoDB;
use Try::Tiny;

my $counter;
my $client;
my $collection;
my $cur;
my $logfile;

sub read_data_int
{
   $counter //= 1;
   # Connect to the server
   $client //= MongoDB::MongoClient->new(host => 'localhost:27017');

   # Select the database and collection
   $collection //= $client->ns('zips.zips');
   # Sort all existing documents by _id.
   $cur = $collection->find()->sort({_id => 1});

   # Do this only the first time around. Make our cursor immortal.
   if ($counter == 1) {
       $cur->immortal(1);
   }
   $counter++;

   # If any new documents exist, process them.
   while ($cur->has_next()) {
       my $event = Log::Nxlog::logdata_new();
       my $obj = $cur->next;
       my $line = "ID: " . $obj->{"_id"} . " City: " . $obj->{"city"} . " Loc: " . $obj->{"loc"}[0] . "," . $obj->{"loc"}[1] . " Pop: " . $obj->{"pop"} . " State: " . $obj->{"state"};
       Log::Nxlog::set_field_string($event, 'raw_event', $line);
       Log::Nxlog::add_input_data($event);
       # Once the document is processed, delete it.
       my $result = $collection->delete_one( { _id => $obj->{"_id"} } );
       #print $logfile "Extracted document with _id: " . $obj->{"_id"} . " Deleting returned: " . $result->deleted_count . "\n";
   }
}

sub read_data
{
   # Use a try/catch block in order to resume when mongodb is unreachable.
   #open ($logfile, '>>', '/tmp/perl-input.log') or die "Could not open log file";
   try {
       read_data_int();
       # Adjust this timer for how often to look for new documents.
       Log::Nxlog::set_read_timer(1);
   } catch {
       #print $logfile "Error thrown: $_ Will retry in 10 seconds.";
       # Adjust this timer for how often to try to reconnect.
       Log::Nxlog::set_read_timer(10);
   };
}
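
The choice between the two timer values in read_data() can be tried out without NXLog or a MongoDB server. The following is a minimal sketch, not part of the script above: next_delay() and the two delay variables are hypothetical names, and Perl's built-in eval is used in place of Try::Tiny so that the snippet has no dependencies. The code reference passed in stands in for read_data_int().

```perl
use strict;
use warnings;

# Hypothetical names; the values mirror the ones used in the script above.
my $POLL_DELAY  = 1;     # seconds to wait after a successful pass
my $RETRY_DELAY = 10;    # seconds to wait after a failed pass

# Run one polling pass and return the delay to use before the next one.
# $pass is a code reference standing in for read_data_int().
sub next_delay {
    my ($pass) = @_;
    # eval returns 1 if the pass completed, undef if it died.
    my $ok = eval { $pass->(); 1 };
    return $ok ? $POLL_DELAY : $RETRY_DELAY;
}

print next_delay(sub { return }), "\n";                        # prints 1
print next_delay(sub { die "connection refused\n" }), "\n";    # prints 10
```

Under these assumptions, the value returned by such a helper is what would be passed to Log::Nxlog::set_read_timer() inside the NXLog script.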

For this example, a JSON data set of US ZIP (postal) codes was used. The data set was imported into the zips collection of the zips database with mongoimport -d zips -c zips --file zips.json.

Input sample
{ "_id" : "01001", "city" : "AGAWAM", "loc" : [ -72.622739, 42.070206 ], "pop" : 15338, "state" : "MA" }
{ "_id" : "01008", "city" : "BLANDFORD", "loc" : [ -72.936114, 42.182949 ], "pop" : 1240, "state" : "MA" }
{ "_id" : "01010", "city" : "BRIMFIELD", "loc" : [ -72.188455, 42.116543 ], "pop" : 3706, "state" : "MA" }
{ "_id" : "01011", "city" : "CHESTER", "loc" : [ -72.988761, 42.279421 ], "pop" : 1688, "state" : "MA" }
{ "_id" : "01020", "city" : "CHICOPEE", "loc" : [ -72.576142, 42.176443 ], "pop" : 31495, "state" : "MA" }
Output sample
ID: 01001 City: AGAWAM Loc: -72.622739,42.070206 Pop: 15338 State: MA
ID: 01008 City: BLANDFORD Loc: -72.936114,42.182949 Pop: 1240 State: MA
ID: 01010 City: BRIMFIELD Loc: -72.188455,42.116543 Pop: 3706 State: MA
ID: 01011 City: CHESTER Loc: -72.988761,42.279421 Pop: 1688 State: MA
ID: 01020 City: CHICOPEE Loc: -72.576142,42.176443 Pop: 31495 State: MA
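
When adapting the script to a different collection, it may help to check the field mapping separately from NXLog and MongoDB. The sketch below is one possible way to do that: format_document() is a hypothetical helper, not part of the script above, and the sample hash is the first document from the input sample, written out by hand as a Perl hash reference.

```perl
use strict;
use warnings;

# Build the event line from one document (a hash reference).
# The field names match the ZIP code data set used in this example.
sub format_document {
    my ($doc) = @_;
    return sprintf(
        'ID: %s City: %s Loc: %s,%s Pop: %s State: %s',
        $doc->{_id}, $doc->{city},
        $doc->{loc}[0], $doc->{loc}[1],
        $doc->{pop}, $doc->{state},
    );
}

# First document of the input sample, written as a Perl hash reference.
my $sample = {
    _id   => '01001',
    city  => 'AGAWAM',
    loc   => [ -72.622739, 42.070206 ],
    pop   => 15338,
    state => 'MA',
};

print format_document($sample), "\n";
```

The printed line should match the first line of the output sample. For other data, only the format string and the list of fields would need to change.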

Last revision: 28 March 2019