Introducing pyTails

Atharva Inamdar
3 min read · Dec 1, 2019

The previous posts discussed how the MongoDB oplog works. In this post, we look at how to tail it and introduce a Python module for tailing the MongoDB oplog.

I developed this as a fun exercise to see how the different pieces fit together and what needs to be considered for tailing the oplog.

Architecture

At a high level, this module works with technologies in AWS but is designed to be extensible. The module has two responsibilities: collect oplog entries and push them to a stream. As discussed in the first post, I use the pymongo package. Before diving in, here's the architecture and data flow.

pyTails Architecture and Data Flow

pyTails is essentially an infinite loop reading from a tailable cursor. It is executed as a container task in AWS Fargate/ECS and pushes batches to Kinesis Firehose. Firehose is then responsible for writing the data to AWS S3. Each replica set to tail needs a separate instance of pyTails and Firehose; however, they can all write to the same S3 prefix. pyTails uses DynamoDB to store checkpoints per replica set, which allows it to resume tailing from the last consumed oplog entry.
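To make that loop concrete, here is a minimal sketch of a tailable-cursor read loop using pymongo. The connection string, replica set name, and the publish() hand-off are placeholders for illustration, not pyTails' actual API.

import time
from pymongo import MongoClient, CursorType

client = MongoClient("mongodb://localhost:27017", replicaSet="rs0")
oplog = client.local["oplog.rs"]

def publish(entry):
    # Stand-in for the sink hand-off (Firehose in pyTails).
    print(entry)

while True:
    # A tailable cursor stays open and yields new entries as they arrive.
    cursor = oplog.find(cursor_type=CursorType.TAILABLE_AWAIT,
                        oplog_replay=True)
    while cursor.alive:
        for entry in cursor:
            publish(entry)
        time.sleep(1)  # cursor is alive but idle; avoid a busy loop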

Both Firehose and DynamoDB support batch requests. As such, pyTails batches 500 oplog entries before writing to Firehose and checkpointing in DynamoDB.
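As an illustration of that flush path, the sketch below writes a full batch with Firehose's PutRecordBatch API (which accepts at most 500 records per call) and then commits a checkpoint to DynamoDB. The stream name, table name, and key schema are assumptions, not pyTails' actual configuration.

import json
import boto3

firehose = boto3.client("firehose")
dynamodb = boto3.client("dynamodb")

BATCH_SIZE = 500  # PutRecordBatch accepts at most 500 records per call

def flush(batch, replica_set="rs0"):
    # Entries are assumed to already be JSON-safe dicts.
    firehose.put_record_batch(
        DeliveryStreamName="pytails-firehose",  # placeholder name
        Records=[{"Data": (json.dumps(e) + "\n").encode()} for e in batch],
    )
    # Commit the last entry's timestamp as this replica set's checkpoint.
    dynamodb.put_item(
        TableName="pytails-checkpoints",  # placeholder name and schema
        Item={"replica_set": {"S": replica_set},
              "ts": {"S": str(batch[-1]["ts"])}},
    )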

pyTails

The pyTails project is split into several modules that allow plugging in various data sinks, state stores and tailers. The tailers currently include a MongoDB oplog tailer that can return either the raw oplog entry or the modified document. Returning the modified document is still a work in progress, but raw oplog output is functional.
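One way to picture that split is as a set of small interfaces. The class and method names below are my own illustration of the design, not pyTails' actual code.

from abc import ABC, abstractmethod

class Tailer(ABC):
    @abstractmethod
    def entries(self):
        """Yield change entries from the source (e.g. the oplog)."""

class Sink(ABC):
    @abstractmethod
    def publish(self, entry):
        """Send one entry to a destination (console, Kinesis, Firehose)."""

class StateStore(ABC):
    @abstractmethod
    def commit(self, checkpoint):
        """Persist the last consumed position (e.g. to DynamoDB)."""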

These oplog entries are published to one or more data sinks; it is possible to configure multiple sinks at once. Currently, three sinks are available: console, Kinesis Data Streams, and Kinesis Firehose Delivery Streams. The Firehose sink is batched, but the others are not; batching is handled inside each sink, since requirements may differ per sink. Once the batch is published, a checkpoint is committed.
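The publish-then-checkpoint flow might look something like the sketch below, fanning each entry out to every configured sink; ConsoleSink and the run() wiring are illustrative, not pyTails' API.

class ConsoleSink:
    def publish(self, entry):
        print(entry)

def run(tailer, sinks, state_store):
    for entry in tailer.entries():
        for sink in sinks:
            sink.publish(entry)  # batching, if any, happens inside the sink
        state_store.commit(entry["ts"])  # checkpoint after publishing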

Checkpoints are a way for pyTails to keep track of the last consumed oplog entry. This allows pyTails to resume consuming after a failure or maintenance downtime. Currently, only DynamoDB is implemented as a state store. pyTails can be forced to commit a checkpoint mid-batch by sending a SIGUSR1 signal to the process. This signal is available only when running on Linux.
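The signal hook might look like the sketch below; the flag-based coordination with the main loop is my assumption, not pyTails' implementation.

import signal

checkpoint_requested = False

def handle_sigusr1(signum, frame):
    # Handlers should do minimal work: set a flag that the main
    # loop checks between entries, then commit the checkpoint there.
    global checkpoint_requested
    checkpoint_requested = True

signal.signal(signal.SIGUSR1, handle_sigusr1)

From a shell, the checkpoint can then be forced with kill -USR1 <pid>.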

Output

Below is a sample output of the oplog entry document.

{
  "doc": {
    "ts": {
      "$timestamp": {
        "t": 1550582659,
        "i": 184
      }
    },
    "t": 18,
    "h": 8861252172934057000,
    "v": 2,
    "op": "u",
    "ns": "record.publishingsessions",
    "o2": {
      "_id": {
        "$oid": "5c69fe5bb3d27d0b2c17500f"
      }
    },
    "o": {
      "$set": {
        "lp": {
          "$date": 1550582659699
        },
        "u": {
          "$date": 1550582659699
        }
      }
    }
  }
}

The oplog entry is embedded inside a doc object. This allows pyTails to optionally add a ts field at the top level in a consistent format, regardless of where the data came from. Imagine that in the future pyTails could read a DynamoDB Stream or a SQL Server CDC log; each represents timestamps differently, but pyTails can standardise them and present a single interface for consuming the output data.
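As a hedged illustration, the envelope could be built along these lines, lifting the epoch-seconds component of the BSON Timestamp to the top level; wrap() is an assumed helper, not pyTails' function.

def wrap(raw_entry, source="mongodb-oplog"):
    envelope = {"doc": raw_entry}
    if source == "mongodb-oplog":
        # A BSON Timestamp exposes epoch seconds via its .time attribute.
        envelope["ts"] = raw_entry["ts"].time
    return envelope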

pyTails currently does not support MongoDB Change Streams which are available in v3.6 and later.

Docker

Use of Docker is optional. However, it is the easiest way to package the application and bake in some good practices. A basic Dockerfile is provided with pyTails.

Disclaimer: this project was developed as a side project, with the main aim of learning the nuances of the MongoDB oplog and of distributed applications. However, the intention is to develop a stable product. Any ideas, suggestions and code reviews are welcome.
