Building a Data Pipeline That Handles Billions of Events in Real-Time

At Metamarkets our goal is to help our clients make sense of large amounts of data in real-time. Our platform ingests tens of billions of new events every day, and currently comprises trillions of aggregated events. Our real-time analytics platform has two separate yet equally important goals: interactivity (real-time queries) and data freshness (real-time ingestion).

We’ve written before about how Druid, our open-source datastore, is able to offer fast, interactive queries. In this post, we’re going to focus on the challenges around achieving data freshness. We’ll talk about the batch-oriented pipelines we started with, and how we approached building real-time pipelines with these important goals:

  • Latency: Real-time latency means being able to query events seconds after they happen.
  • Power: Most data pipelines do not simply copy data around; they need to join, transform, and aggregate data as it flows through the system.
  • Reliability: Queries must accurately reflect the original input data; we don’t want to drop any events or introduce any duplicates.
  • Scalability: The system must be able to handle our current load—tens of billions of events per day—and scale well into the future.

Hadoop was effective, but slow

The first version of our platform used Druid to offer real-time query speed, but it took hours for newly arrived data to become queryable. At the time, our Druid cluster was kept up-to-date by data pipelines running entirely on Hadoop Map/Reduce. We asked our clients to send us data by uploading files, which we copied to HDFS and processed in batches of a few hours at a time. We used Pig to join and transform newly-arrived data, then used Druid’s Hadoop indexer to load up the result, and then began the cycle anew.

The data pipelines served a number of purposes. Druid requires a denormalized, flat data model, but we rarely receive such data directly. Many of our clients send us data feeds that need to be joined together (for example, online ad requests, impressions, and clicks are typically tracked separately). Nearly all of our clients’ datasets need ID-to-name lookups that must be done using separately provided lookup tables. We also do some transformations of the raw data, like user agent sniffing, URL domain extraction, and metric bucketing.
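To make the shape of this processing concrete, here is a minimal sketch of the kind of per-event denormalization described above. The field names, the lookup table, and the bucketing thresholds are all hypothetical, not Metamarkets' actual logic:

```python
from urllib.parse import urlparse

# Hypothetical ID-to-name lookup table of the kind clients provide separately.
CAMPAIGN_NAMES = {17: "spring_sale", 42: "brand_awareness"}

def bucket_latency(ms):
    """Bucket a raw metric into coarse ranges (illustrative thresholds)."""
    if ms < 100:
        return "0-100ms"
    if ms < 1000:
        return "100-1000ms"
    return "1000ms+"

def denormalize(event):
    """Flatten a raw event into the flat, denormalized row Druid expects."""
    return {
        "timestamp": event["timestamp"],
        "campaign": CAMPAIGN_NAMES.get(event["campaign_id"], "unknown"),
        "domain": urlparse(event["url"]).netloc,              # URL domain extraction
        "latency_bucket": bucket_latency(event["latency_ms"]),  # metric bucketing
    }
```

In the batch pipeline this role was played by Pig scripts running over HDFS; the point is only that every event passes through a join-and-transform step before Druid ever sees it.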

This sort of processing worked well enough on Hadoop, but our clients often wanted their data to be available faster. We realized that the Hadoop-based approach was incapable of achieving low latency—so we needed to try something else.

Building a faster pipeline

Druid is capable of real-time ingestion, so we explored how we could use that to speed up the data pipelines. The general idea behind Druid’s real-time ingestion setup is that you send your events, as they occur, to a message bus like Kafka, and Druid’s real-time indexing service then connects to the bus and streams a copy of the data. We decided to build a web service that our clients could use to send us data, and which we’d use to validate the data and write it to Kafka, where Druid would be able to ingest it.
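The validate-then-publish step of that web service can be sketched as follows. The schema check and topic name are hypothetical, and `producer` stands in for any message-bus client (in production, a Kafka producer):

```python
import json

# Hypothetical minimal schema; the real service's validation is more involved.
REQUIRED_FIELDS = {"timestamp", "event_type"}

def validate(raw):
    """Return the parsed event if it is well-formed, else None."""
    try:
        event = json.loads(raw)
    except ValueError:
        return None
    if not REQUIRED_FIELDS.issubset(event):
        return None
    return event

def handle_post(raw, producer):
    """Web-service handler: reject bad input, write good events to the bus."""
    event = validate(raw)
    if event is None:
        return 400  # malformed events never reach Kafka or Druid
    producer.send("events", json.dumps(event).encode())
    return 200
```

Rejecting bad data at the edge keeps the downstream Storm and Druid stages free of defensive parsing.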

The biggest obstacle to making this real-time data pipeline work for us was addressing the need to join and transform data before loading it into Druid—something we relied on Pig for, and that Druid could not do on its own. We looked at a number of open-source stream processors, and eventually decided that Storm was the most mature and suitable for our needs at the time. We wrote a set of Storm bolts to do tasks like streaming joins, transformations, aggregations, and sending events to Druid. We now use combinations of these bolts to build Storm topologies that read events from Kafka, retain only those that are “on time”, process those events, and send the post-processed events to Druid in real-time. Druid can immediately answer queries over this data, yielding a very low-latency event pipeline.
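Of those bolt tasks, the streaming join is the least obvious, since it must match events that arrive separately. The core of such a bolt can be sketched like this (field names are hypothetical, and a production version would also expire unmatched keys after a timeout rather than buffer them forever):

```python
class StreamingJoin:
    """Toy streaming join: buffer impressions, emit a joined row on click."""

    def __init__(self):
        self.pending = {}  # impression_id -> buffered impression event

    def on_impression(self, imp):
        # Buffer the impression until (maybe) a matching click arrives.
        self.pending[imp["id"]] = imp

    def on_click(self, click):
        imp = self.pending.pop(click["impression_id"], None)
        if imp is None:
            # No buffered match (late or out-of-window event); the batch
            # pipeline will reconcile it later.
            return None
        return {**imp, "clicked": True, "click_time": click["timestamp"]}
```

The same keep-state-and-emit pattern applies to the on-time filter and the aggregation bolts: each holds a bounded window of state and passes finished rows downstream toward Druid.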

Putting it all together

Our story would end here if real-time processing were perfect. But it’s not: some events can come in days late, some time ranges need to be re-processed after initial ingestion due to code changes or data revisions, various components of the real-time pipeline can fail, and so on. So to handle these cases, we run two separate data pipelines, one on Storm and one on Hadoop Map/Reduce. This sort of setup has been described by others and is sometimes called a “lambda architecture.”

[Figure: data ingestion architecture — Kafka feeding both the Storm (real-time) and Hadoop (batch) pipelines into Druid]

In our version of this architecture, Kafka acts as the origin data source for both pipelines. In the batch pipeline, all events are copied from Kafka to S3 and are then processed by a Hadoop job that applies the same processing logic as the Storm topology. We use a high-level abstraction to generate both the Hadoop and Storm jobs, although it’s also possible to write them independently. Once loaded into Druid, this data completely replaces the data that was loaded in real-time for the same interval, providing the reliability guarantee we need.
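The batch-replaces-real-time rule can be sketched as a toy serving layer keyed by time interval. Druid actually implements this with versioned segments; this simplified model (hypothetical interval keys) just tracks which pipeline "owns" each interval:

```python
class IntervalStore:
    """Toy model of batch output superseding real-time data per interval."""

    def __init__(self):
        self.data = {}  # interval -> (source, rows)

    def load_realtime(self, interval, rows):
        # Real-time data only fills intervals that batch hasn't finalized.
        if self.data.get(interval, ("", None))[0] != "batch":
            self.data[interval] = ("realtime", rows)

    def load_batch(self, interval, rows):
        # A completed batch run always replaces real-time data for the
        # same interval, fixing any drops or duplicates from the fast path.
        self.data[interval] = ("batch", rows)
```

Because queries for an interval only ever see one source at a time, the hand-off from real-time to batch data is invisible to clients.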

Do try this at home

If you’d like to build something similar, you’re in luck: Kafka, Storm, Hadoop, and Druid are all open source. The connecting bits are open too: Storm includes a Kafka spout, and we have open-sourced Tranquility, our toolset for streaming data to Druid, which includes the Storm bolt we use in production. LinkedIn has open-sourced a Kafka-to-HDFS program, and Druid includes a Hadoop-based indexer. You don’t need to implement anything to perform aggregations or to merge real-time and batch results: Druid handles all of that for you.

We’ve found that this combination of technologies is fast and flexible enough to handle a wide variety of processing requirements and query loads. Each piece of the stack is designed to do a specific set of tasks very well.

If you’re interested in joining us to work on these sorts of systems, check out our jobs page for our set of open positions. We’d love to hear from you!

You can also follow Metamarkets and Druid on Twitter.

  • skruzel

    Great work as always. It’s nice to see Storm + Druid getting more exposure. Could you elaborate on the typical methods people use to deploy and manage a cluster like this? Thank you.

  • Gian Merlino

    I’m not sure what the most common thing is, but what we do for deployment is to build self-contained jars for Storm, Druid, Hadoop, Kafka, and so on, and then ship those jars to EC2 instances where we run them with init scripts. For management we use the UIs and APIs built into those projects along with a *lot* of metrics (builtin ones plus extra metrics we write into our jobs). In our case those metrics are fed back into Druid, where we can get at them for analysis and debugging, and also alert on things that look anomalous.
