Simplicity, stability, and transparency: how Samza makes data integration a breeze

Last summer, we blogged about the evolution of our Druid data integration pipelines from batch to real-time. Earlier this year, we went through another change: switching the stream processing component to Samza, largely due to its great operational characteristics. Switching out the stream processor mid-flight involved both development time and operational risk, so I’m glad to report that it turned out to be worth the effort. Even though we made the leap mostly for operational reasons, we also appreciated Samza’s simple programming model and its take on state management.

Since we’ve now been using it for a few months, I thought it’d be useful to write about our experiences so far. But before getting deeper into that, I should talk about how Metamarkets uses stream processors.

The inside-out database

For the most part, our backend engineering team is in the business of building indexes that support fast aggregates on top of event streams. In our data stack, the indexes live in Druid, but the story begins with the original event streams. These are generally standard ad-tech data types like bid requests, bid responses, impressions, and clicks.

Our first attempt at a streaming data stack lacked a data integration component. We tried to just use Druid to directly index incoming data from our customers. There was a serious mismatch here: even though Druid can index streams, it can only do that in a very particular way. It needs a single stream with dimensions and metrics cleanly separated. We didn’t have that: we had multiple streams with messy data. We needed to join the streams (so we could find out whether ads were served and clicked on) and we needed to transform them a bit. For a while, this mismatch prevented us from actually being able to index data in real-time. At first we tried to get our users to fix up their data for us, and send us data that was already well-suited for Druid indexing, but that flopped. People generally expected us to do the work on our end.

The solution we came up with looks something like the one Martin Kleppmann discussed in his excellent talk, Turning the database inside-out with Apache Samza. The talk encourages you to think of a sprawling data stack as one big deconstructed database. Streaming data stacks typically have some components responsible for data ingestion, some components responsible for data integration, and some components responsible for serving queries on the data. If you look at that kind of stack as an inside-out database, you can see the ingestion layer as your source of facts, the serving layer as a materialized view of those facts, and the data integration layer as a way to keep the materialized views up-to-date with the underlying facts. I think it’s a nice mental model for these kinds of systems. Thanks, Martin!

So, we use stream processors to transform the raw data streams into a changelog for the view we want to store in Druid. Then we feed that changelog into Druid, which indexes the stream and lets us query it. In practice, most of our stream processing pipelines have three stages: shuffling, joining, and final output. Each stage is a separate Samza job.

[Diagram: Kafka → Samza → Druid pipeline]

The shuffle stage reads raw events from the source-of-truth Kafka topics, partitions them by a join key (usually the impression ID), and writes the result to an intermediate Kafka topic. The nice thing about that topic is that we’re guaranteed that any impression and click with the same ID will be in the same partition. This is similar to the map stage of a map/reduce job.
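To make that concrete, here’s a rough sketch of a shuffle task using Samza’s low-level StreamTask API. The topic and field names (raw-events, shuffled-events, impressionId) are invented for illustration, and our real jobs do more parsing and validation than this:

```java
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Shuffle stage: re-keys raw events by impression ID so that impressions and
 * clicks for the same ID land in the same partition of the output topic.
 */
public class ShuffleTask implements StreamTask {
  // Hypothetical intermediate topic; it becomes the input of the join stage.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "shuffled-events");

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Assumes a JSON serde is configured for the input, so messages arrive as maps.
    Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
    Object joinKey = event.get("impressionId");

    // The envelope key determines the output partition: same impression ID, same partition.
    collector.send(new OutgoingMessageEnvelope(OUTPUT, joinKey, event));
  }
}
```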

The join stage reads the shuffled topic, uses local state to associate impressions with clicks, then writes joined data to another Kafka topic.
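The join task is where local state comes in (more on that below). This sketch buffers impressions in a key-value store named "impressions" and emits a joined record when a click shows up. The store name, field names, and the simplified matching logic are assumptions; a production job also has to deal with out-of-order arrival and with expiring old entries:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

/**
 * Join stage: remembers impressions in local state, keyed by impression ID,
 * and emits a joined record when the matching click arrives.
 */
public class JoinTask implements StreamTask, InitableTask {
  private static final SystemStream OUTPUT = new SystemStream("kafka", "joined-events");

  // Local store, backed by a Kafka changelog topic (see the job config).
  private KeyValueStore<String, Map<String, Object>> impressions;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    impressions = (KeyValueStore<String, Map<String, Object>>) context.getStore("impressions");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
    String impressionId = (String) event.get("impressionId");

    if ("impression".equals(event.get("type"))) {
      // Remember the impression until its clicks show up (real code also expires old entries).
      impressions.put(impressionId, event);
    } else if ("click".equals(event.get("type"))) {
      Map<String, Object> impression = impressions.get(impressionId);
      if (impression != null) {
        Map<String, Object> joined = new HashMap<>(impression);
        joined.put("clicked", true);
        collector.send(new OutgoingMessageEnvelope(OUTPUT, impressionId, joined));
      }
      // Real code would also handle clicks that arrive before their impression.
    }
  }
}
```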

The output stage reads the joined topic, performs final transformation, and writes the result to Druid using Tranquility. Generally, the final transformation involves separating out dimensions and metrics, looking up names for IDs, and general data cleanup.
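Here’s a rough sketch of the output stage, assuming the job config maps a Samza system named "druid" onto Tranquility, so that sending to it hands rows off for Druid indexing. The stream name and the particular dimensions and metrics are invented:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Output stage: final cleanup of joined records before they are handed to Druid.
 * "druid" is assumed to be a Samza system wired up to Tranquility in the job config.
 */
public class OutputTask implements StreamTask {
  private static final SystemStream DRUID = new SystemStream("druid", "ad-events");

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    Map<String, Object> joined = (Map<String, Object>) envelope.getMessage();

    // Keep only the dimensions and metrics Druid should index; look up names for IDs, etc.
    Map<String, Object> row = new HashMap<>();
    row.put("timestamp", joined.get("timestamp"));
    row.put("publisher", lookupName(joined.get("publisherId")));  // hypothetical lookup
    row.put("clicked", joined.get("clicked"));
    row.put("count", 1);

    collector.send(new OutgoingMessageEnvelope(DRUID, row));
  }

  private String lookupName(Object id) {
    // Stand-in for a real ID-to-name lookup table.
    return String.valueOf(id);
  }
}
```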

Together, the join and output stages are similar to the reduce stage of a map/reduce job.

What is Samza and how does it work?

Samza is a distributed stream processing framework that uses Kafka for messaging and state storage. Unlike many other stream processing frameworks, where jobs or tasks talk to each other directly, Samza instances communicate with each other only by reading and writing Kafka topics.

If you’ve ever written a map/reduce job, Samza’s programming model should be really familiar. It’s like a streaming version of map/reduce. Samza jobs consume messages from partitioned input streams and produce output messages to partitioned output streams. They can shuffle messages into different partitions while doing that, which gives you everything you need to do map/reduce.

The nice thing about this API is that it’s composable. You can have a single job, or you can have a series of related jobs where outputs from one job are used as inputs for the next. You can even have many jobs use the output from a single job.
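Chaining jobs is just configuration: one job’s output topic becomes the next job’s task.inputs. Here’s a minimal sketch of how the two hypothetical jobs above would be wired together (each job gets its own properties file; the class and topic names match the earlier sketches and are not real):

```properties
# shuffle.properties: reads the raw topic, writes the re-keyed intermediate topic
job.name=shuffle
task.class=com.example.ShuffleTask
task.inputs=kafka.raw-events

# join.properties: its only input is the shuffle job's output topic
job.name=join
task.class=com.example.JoinTask
task.inputs=kafka.shuffled-events
```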

Samza parallelism is partition-oriented. Each Kafka topic has a user-configurable number of partitions. You tell Samza how many containers you want, and it splits the Kafka partitions evenly across those containers by assigning one “task” per partition. The tasks in a container all execute interleaved on the same thread.
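In other words, the parallelism knobs are just config: the partition count is fixed when the Kafka topic is created, and the container count is set per job. A minimal sketch (the key is job.container.count in recent Samza releases; older releases call it yarn.container.count):

```properties
# With, say, 8 input partitions and 4 containers, each container runs 2 tasks,
# interleaved on a single thread.
job.container.count=4
```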

Each task can also have its own state, which Samza handles in an interesting way: your task maintains local state and writes a changelog for its state to Kafka, which acts as persistent storage. If a container dies, Samza replays the changelog topic to re-create each task’s state. With a few gigabytes of state, this usually adds a few minutes to the job startup time. Samza ships a RocksDB-based implementation of local state, but you can use anything you like.
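Wiring up a store and its changelog is also just config. This sketch matches the hypothetical "impressions" store from the join task above, using the RocksDB engine that ships with Samza and an invented changelog topic name:

```properties
# Declare a local store backed by RocksDB, with a Kafka changelog for recovery.
stores.impressions.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.impressions.changelog=kafka.join-impressions-changelog
stores.impressions.key.serde=string
stores.impressions.msg.serde=json

# The serdes referenced above also have to be registered.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
```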

Smooth operator

Our stream processing jobs produce tens of billions of messages per day for a variety of data pipelines. We run in the notoriously pretty-good-but-not-totally-reliable Amazon cloud, and also run very light on operations staff. Each of our clients has a dedicated data pipeline with a series of Samza jobs that have unique performance profiles, unique bottlenecks, and unique demands on system resources. All of this points towards good operational characteristics being really important.

Samza delivers well here; in fact, its operational characteristics are the main reason we started using it.

The main pain points we’ve had with stream processing have been getting good behavior on a shared cluster (multi-tenancy), and getting visibility into what’s going on so we can debug problems. Samza hasn’t fixed either of these completely for us, but it has helped a lot.

Multiplexing the computation for all tasks onto a single thread per container is actually great for multi-tenancy. It means you can allocate a bundle of resources per container (we generally use 1.5 CPUs and 3GB of memory) and get reasonably predictable performance. You can vary the number of partitions per container based on how “large” or “small” the input topics are, and know that it won’t affect the resource demands of the container too much. This is great because it means that two containers for two different jobs are generally going to be comparable in resource needs. Once that’s set up, you can lock it in by using YARN to enforce container CPU and memory limits.
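For reference, the per-container request is a couple of job config keys in Samza-on-YARN; the numbers below roughly mirror the bundle mentioned above. Actually enforcing the CPU limit also needs cgroups enabled on the YARN side, e.g. via the LinuxContainerExecutor on each NodeManager:

```properties
# Ask YARN for 3GB of memory per container.
yarn.container.memory.mb=3072
# YARN vcores are whole numbers; how a vcore maps to physical CPU is a
# cluster-level choice, which is how a fractional budget like "1.5 CPUs"
# gets approximated.
yarn.container.cpu.cores=2
```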

At first, we thought that splitting complex pipelines into stages was going to be a bad thing. It means you need to allocate space on Kafka for storing the intermediate topics, which seems like a waste of space. In practice, though, that hasn’t been too expensive, and has paid off by isolating the different stages from each other. This isolation makes it a lot easier to debug performance problems in the pipeline. If one of your pipelines is slow, you can tell which stage is causing the slowdown by looking at backlog metrics for each input topic. Then, if you like, you can attach a profiler to that stage and see exactly what’s happening. While you’re doing that, the slow stage won’t affect stages earlier in the pipeline, since the intermediate Kafka topic decouples them.

We were also concerned about using local state with remote changelogs. It means that when a container fails, or when you restart a job, you must wait for local state to repopulate from the remote changelog. This delay in failover and restart time can be troublesome, especially since it causes user-visible data integration latency. There’s been some talk in the Samza community of making this better by potentially re-using local state after restarts, which would definitely be helpful.

Other than that downside, there are certainly some nice things about storing state locally rather than remotely. The Samza documentation on state management makes a case for why local state is a good thing. Our addition to the list is that it’s just much easier to write a high-quality state implementation when Samza handles the coordination and your code only has to worry about single-threaded access on a single machine. Even though we use a custom state implementation (not the built-in RocksDB), our new Samza state management code is about half the size of our previous system’s code. If we used RocksDB it would be even smaller.

The Samza/Kafka combination has not been totally bug-free, but the bugs we’ve seen so far have actually increased our confidence in the system. All of the production issues we’ve had since deploying Samza have been debuggable without too much headache, thanks to Samza’s simple execution model and Kafka’s penchant for materializing everything on disk. So far, the root causes have all been fixed fairly quickly and none have led to permanent data loss.

The sound you hear right now is me knocking on wood.

The mysterious future

I think that the addition of Samza to our data integration stack has really paid off. But, a data engineer’s work is never done! There are a few places that we’d like to go from here.

One is towards even better multi-tenancy. Samza’s really great for CPU and memory isolation, but network isolation is tough. In our experience so far, network saturation is the main cause of irregular performance. There’s an open issue in the YARN tracker for that: YARN-2140. Isolation of all sorts is also tough on the Kafka side, since Kafka doesn’t do much to isolate topics from each other. In practice we deal with this by treating Kafka as a shared resource pool that needs to be “big enough”, so we monitor it and add capacity as appropriate.

Another is towards auto-scaling for stream processing. Currently, we choose the number of containers per Samza job manually, and adjust periodically based on utilization. This system doesn’t scale well to a large number of pipelines, so we’re looking into automatically choosing a good number of containers for each job. To that end, we’ve added a set of utilization metrics to our Samza jobs and are building a service that can analyze them and relaunch jobs accordingly.

Finally, we want to move towards a streaming-only architecture. We still use a lambda architecture involving Hadoop for batch processing, Samza for stream processing, and Druid for queries. We use batch processing because, in our eyes, lambda architectures are still the best way to build robust data pipelines, given the generally poor support for transactional “exactly-once” processing in open-source software today.

Folks are working on this, though, and we want to be part of that! There are murmurs in the Kafka and Samza communities about adding exactly-once capabilities to the at-least-once model that exists now. On our end, we’re preparing for that world by looking into how we can support exactly-once streaming ingestion in Druid and throughout our entire streaming data pipeline.

If you’re interested in working on these kinds of things, we’re always hiring!
