Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets

“Another flaw in the human character is that everybody wants to build and nobody wants to do maintenance.” – Kurt Vonnegut

Every engineer loves the feeling of standing up a new piece of open source infrastructure, satisfaction born from a grueling journey through community forums, outdated documentation, and mostly-uncommented source code.

The glory is fleeting, because not 20 minutes into having your shiny new service up and running, a hiccup hits and you’re forced to ask: what am I going to use to monitor and maintain this thing?

We’ve now entered an era where software increasingly runs not on a single box, but on numerous machines distributed across multiple data centers.  Maintaining these complex systems requires three pieces: 1) continuous monitoring of metrics such as throughput, latency, and memory pressure; 2) an understanding of what is normal for these metrics; and 3) a knowledge of what corrective actions are warranted for abnormal ranges.

At Metamarkets, we’ve composed several open source services (Kafka, Samza, Hadoop, and Druid) to form the real-time analytics data (RAD) stack at the core of our infrastructure. When it comes to monitoring and maintaining these systems, we’re fortunate to have a solution that fits the bill: the very stack itself. Much as many relational databases store metadata in the database itself, the Metamarkets production stack is monitored by a mirrored, independently operated metrics stack.

In this post, we detail how we do it.

Metrics and Monitoring at Metamarkets

At Metamarkets, we ingest over 1 million events per second of time-series data and issue ad-hoc analytic queries over trillions of events. To meet our technology needs, we run an open source analytics stack built with Kafka, Samza, Hadoop, and Druid. Client events are delivered to Kafka, processed in near real-time with Samza, and made queryable with Druid. Periodic Hadoop jobs correct the inaccuracies of the real-time pipeline. At the volume, speed, and scale of our operations, monitoring is critical.


To monitor every piece of our stack, we modified each of the aforementioned technologies to periodically emit metrics. These metrics tell us how fast data is being ingested, how fast queries are running, and how healthy each component of the stack is. An example event emitted by our production Druid cluster when a user issues a query looks something like this:

{"timestamp": "2015-05-01T12:34:56", "metric": "query/time", "service": "druid/prod/broker", "value": "234", "type": "timeseries", …}

This event includes a “timestamp” key to indicate when the event occurred. The event also contains numerous tags (or dimensions, if you are familiar with OLAP terminology) that describe attributes of the query being issued. These tags can include the query type, interval, aggregators, id, and many other useful attributes. The query completion time is stored in the “value” metric. Different metric events have different tags, and different services emit different metric events.
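As a rough illustration of that event shape, here is a minimal Python sketch that builds one such event as a flat dictionary: a few fixed keys plus free-form tags. The helper name `make_metric_event` is hypothetical and is not part of Druid or of our emitter code, and the value is passed as a number here even though the sample event above quotes it.

```python
import json
from datetime import datetime

# Keys every metric event carries in this sketch (taken from the sample event).
FIXED_KEYS = ("timestamp", "metric", "service", "value")

def make_metric_event(metric, service, value, when, **tags):
    """Build one metric event: fixed keys plus arbitrary tags (dimensions).

    Hypothetical helper for illustration only.
    """
    clash = set(tags) & set(FIXED_KEYS)
    if clash:
        raise ValueError("tags may not override fixed keys: %s" % sorted(clash))
    event = {
        "timestamp": when.strftime("%Y-%m-%dT%H:%M:%S"),
        "metric": metric,
        "service": service,
        "value": value,
    }
    event.update(tags)  # each service can attach its own set of tags
    return event

event = make_metric_event(
    "query/time",
    "druid/prod/broker",
    234,
    datetime(2015, 5, 1, 12, 34, 56),
    type="timeseries",
)
line = json.dumps(event)  # one JSON object per emitted event
```

Because tags are just extra keys, two services can emit events with entirely different attributes without agreeing on a shared schema up front.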

Our production cluster metrics are emitted to a dedicated metrics cluster that consists of the exact same set of technologies as our production stack. Metric events are delivered to Kafka, cleaned up and processed by Samza, and stored in Druid. Isolating the metrics cluster from the production cluster ensures that we can troubleshoot problems even when the production cluster itself is degraded. In addition to metrics about Kafka, Samza, Hadoop, and Druid, we also store metrics about how customers interact with our application. Our setup provides us with an end-to-end view of how customers use our product, and the impact of their usage on our infrastructure.
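To give a feel for what a cleanup step between Kafka and Druid might do, here is a small sketch in plain Python. It is not Samza code and does not reflect our actual jobs; the function `clean_event` and the specific rules (drop unparseable lines, require four keys, coerce the value to a number) are assumptions chosen for illustration.

```python
import json

# Keys assumed to be required, based on the sample event shown earlier.
REQUIRED_KEYS = ("timestamp", "metric", "service", "value")

def clean_event(raw_line):
    """Return a normalized event dict, or None if the line is unusable.

    Hypothetical cleanup rule set, for illustration only.
    """
    try:
        event = json.loads(raw_line)
    except ValueError:  # malformed JSON
        return None
    if not isinstance(event, dict):
        return None
    if any(key not in event for key in REQUIRED_KEYS):
        return None
    try:
        # Values may arrive as strings ("234"); store them as numbers.
        event["value"] = float(event["value"])
    except (TypeError, ValueError):
        return None
    return event

raw = ('{"timestamp": "2015-05-01T12:34:56", "metric": "query/time", '
       '"service": "druid/prod/broker", "value": "234"}')
cleaned = clean_event(raw)
```

Dropping bad events before they reach the store keeps later aggregations from being skewed by half-formed data.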

[Figure: Prod Cluster]

Exploratory Analytics

We elected to use the same technologies for our metrics stack as our production stack not only because we’re quite fond of these particular open source technologies, but also because we want to understand the root causes of production issues. Although many existing data infrastructure solutions can provide trends and alerts, few allow for arbitrary data exploration. We’ve written before about why many existing technologies are poor choices for exploring data, and how we designed Druid to solve this problem. Druid lets us narrow in on exactly what is happening with both real-time and historical data.

Druid can also ingest events with arbitrary dimensions, which works well for us because each of our services emits events with a different schema. This allows us to store all of the metrics for all of our services in a single Druid table. As an added bonus, we have a flexible number of attributes to describe our metric events, and we can be as verbose as we want. We can therefore slice our metrics data along any combination of attributes, which helps us pinpoint exactly which attributes cause problems in production.

Slow queries occur from time to time in production, and it is not always immediately clear why things are slow. A particular node may be bad, there may be heavy concurrent usage, or a single user may be issuing a very expensive query. To determine the root cause, we have to examine query metrics from many different angles, eliminating possibilities as we explore through the data. This is an iterative workflow, where a starting question, “why are queries slow?”, leads to a set of results that may inform further questions.

To determine the source of slowness, we can issue a query to our metrics cluster that returns the average query latencies for different nodes in our production cluster. Imagine that we find that a single node is returning results significantly slower than the others. This may be because the node is bad, or because sub-optimal data distribution led the node to hold more heavily queried data, creating a hot spot. Issuing another query to our metrics cluster for the number of queries received by each of our production nodes can confirm whether the slow node is receiving an overload of queries. Looking at the number of exceptions that occurred on the slow node can help us diagnose whether the node is bad. Asking questions, getting results, and using those results to inform further questions is known as exploratory analytics.
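The first question in that workflow, average query latency per node, could be expressed as a Druid native groupBy query roughly like the sketch below, built here as a Python dictionary. The datasource name `metrics`, the per-node dimension `host`, and the interval are assumptions made for illustration, not our real schema. Note also that Druid's `count` aggregator counts stored rows, which may differ from the number of raw events if the data was rolled up at ingestion.

```python
import json

def avg_latency_by_node_query(interval, service="druid/prod/broker"):
    """Build a groupBy query body: average query/time per node.

    "metrics" and "host" are hypothetical names used for this sketch.
    """
    return {
        "queryType": "groupBy",
        "dataSource": "metrics",
        "granularity": "all",
        "dimensions": ["host"],
        # Keep only query/time events from the service under investigation.
        "filter": {
            "type": "and",
            "fields": [
                {"type": "selector", "dimension": "metric", "value": "query/time"},
                {"type": "selector", "dimension": "service", "value": service},
            ],
        },
        "aggregations": [
            {"type": "doubleSum", "name": "total_ms", "fieldName": "value"},
            {"type": "count", "name": "rows"},
        ],
        # Average = summed latency divided by the number of rows.
        "postAggregations": [
            {
                "type": "arithmetic",
                "name": "avg_ms",
                "fn": "/",
                "fields": [
                    {"type": "fieldAccess", "fieldName": "total_ms"},
                    {"type": "fieldAccess", "fieldName": "rows"},
                ],
            }
        ],
        "intervals": [interval],
    }

query = avg_latency_by_node_query("2015-05-01T00:00:00/2015-05-02T00:00:00")
body = json.dumps(query, indent=2)  # JSON body to send to a Druid broker
```

Swapping the aggregations for a plain count, or changing the filter to an exception metric, gives the follow-up questions described above without touching the rest of the query.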

Trends and Reports are still Critical

The exploratory analytic workflow is great for diagnosing issues. However, many issues can be prevented with proper monitoring. In addition to the exploratory use case, we also use Druid to monitor key metrics and to generate periodic performance reports. We leverage Druid’s built-in approximate histogram and quantile support to determine 90th, 95th, and 99th percentile query latencies. When the latencies violate a threshold we’ve set, alerts are fired and emails are sent.
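The threshold check itself is simple. The sketch below shows the idea in Python using an exact nearest-rank percentile over a small in-memory sample, which is a stand-in for Druid's approximate histograms rather than a description of them. The function names and the threshold values are hypothetical.

```python
import math

def percentile(values, pct):
    """Exact nearest-rank percentile (a stand-in for approximate quantiles)."""
    if not values:
        raise ValueError("no values")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]

def check_latency(latencies_ms, thresholds_ms):
    """Return {percentile: observed latency} for each violated threshold."""
    violations = {}
    for pct, limit in thresholds_ms.items():
        observed = percentile(latencies_ms, pct)
        if observed > limit:
            violations[pct] = observed
    return violations

# Hypothetical sample: 100 query latencies of 1..100 ms.
latencies = list(range(1, 101))
violations = check_latency(latencies, {90: 95, 95: 97, 99: 98})
if violations:
    # A real system would fire an alert or send an email here.
    print("latency thresholds violated:", violations)
```

Checking several percentiles at once matters because a healthy 90th percentile can hide a badly degraded tail.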

Together, monitoring, reporting, and exploratory analytics are powerful weapons in the fight against downtime. Monitoring and reporting can help catch many problems before they become serious, and exploratory analytics can be used to rapidly diagnose serious problems.

An Engineer’s Job is Never Done

Our solution is not perfect. There have been times when the root cause of a production problem was an esoteric combination of events. At the end of the day, some problems can only be explained when you have a deep understanding of the systems you are working with. We are constantly working on improving the intelligence of our operations and monitoring tools. If you are interested in this type of work, you should come work with us.

Featured image courtesy of 20th Century Fox
