At some point in the course of data growth, a company needs to make a major migration of data or processes from one physical location to another. This post is the story of how we moved a real-time data flow across cloud providers using Kafka, Samza, and some creative engineering.
Our technology stack for data processing is something we’ve spoken about before. We run a Lambda architecture whose real-time system comprises Kafka and Samza, terminating in Druid real-time indexing tasks. The batch system comprises Spark jobs that read and write from S3. Druid historical nodes use S3 as their deep storage.
This setup served us well as we started to grow, but we are at a scale now where it is showing some of its limitations. Some of the early changes to help make it a more dynamic platform are captured here, and our growth strategy is something we are constantly re-evaluating.
The first consideration in deciding where to move data was whether to move across zones or across regions. Facilitating either required a lot of changes and re-working of the application interconnectivity. Since we were taking the opportunity to reconfigure key parts of our applications, we decided to also start exploring the possibility of moving some of our compute footprint into a new data center (a new cloud provider).
We evaluated several cloud providers’ offerings and undertook a project to let us consume compute resources from multiple vendors. The details of the multi-vendor setup are explored in greater depth in another post; this post focuses on the actual process of migrating services that push hundreds of terabytes per day across data center boundaries. For the purposes of this post, the prior data center (DC) is AWS’s us-east-1 in Virginia, and the new DC is GCP’s us-east1 in South Carolina. We now have multiple gigabits per second of fiber capacity running between the two locations on dedicated interconnects.
To split the data flow, it made sense to keep connectivity- and latency-sensitive components together. That meant things like elections and locks should be local, and any steady-stream data flows should be local. Looking at how the different services operate, both the real-time component of Druid and our service for persisting data from Kafka have intermittent, latency-insensitive data flows to S3. So as long as the Kafka – Samza – DruidRT pipeline was kept in a single zone, the S3 transfer could be directed or delayed as needed.
For data persistence we had the option of keeping data local to the real-time workflow by using GCS, or of shipping it to S3. One major deciding factor was that Druid Historicals absolutely love shuffling data around all the time. Given this restriction, putting the “batch” workflow data in S3 was deemed the more straightforward solution under the current technology constraints.
One major challenge was that nasty line going right through Zookeeper. We analyzed how the services use Zookeeper and determined that if Kafka and Samza could use a local Zookeeper, the Druid-RT component could function on the guarantees of Zookeeper Observers.
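Observers are useful here because they receive the ensemble’s state without voting, so one can sit in the new DC without adding cross-DC latency to the quorum. As a rough sketch with placeholder hostnames (not our actual topology), the observer node’s zoo.cfg would look something like:

```properties
# zoo.cfg on the observer node in the new DC (hostnames are placeholders)
peerType=observer

# The voting ensemble stays in the prior DC; the observer is flagged in the
# server list so that every node agrees on its role.
server.1=zk1.prior-dc.example:2888:3888
server.2=zk2.prior-dc.example:2888:3888
server.3=zk3.prior-dc.example:2888:3888
server.4=zk-observer.new-dc.example:2888:3888:observer
```

The `:observer` suffix must appear in the server list on every node’s config, while `peerType=observer` is set only on the observer itself.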
So now the bulk of traffic flowing across data centers was either traffic to S3, or informational / queue-related traffic going from DruidRT to the rest of the Druid cluster. The next major challenge was how to actually redirect the real-time data.
The general plan followed this workflow (circles in the diagram above):
- Mirror all data to the new DC
- Stop processing real-time data in prior DC, only process real-time data in new DC
- Change DNS to send data directly to the new DC
Prior to moving things over, all the components were exercised independently and on test pipelines, including the staging cluster we call our “metrics” cluster. A new Zookeeper was spun up to isolate the Samza/Kafka usage of Zookeeper from the rest of the cluster. Observers were created and tested. Finally, multiple service modifications were implemented and tested to make them compatible with the transition process, including changes to the Druid task running and assignment system so it could operate across multiple environments.
The setup for the networking between the two clouds is outside the scope of this post, and worthy of its own series.
Mirror All Data to the New DC
The first step in this stage was to mirror all data into GCP. This allowed us to burn in the real-time processing pipelines and make sure there were no unexpected differences between the two cloud providers, or between how the configurations were set up. A custom mirroring service based on Samza was set up in the target cloud environment to consume from the original Kafka cluster and produce to the new Kafka cluster. We originally evaluated Kafka’s MirrorMaker, but found compatibility problems between the Kafka versions running in the two cloud environments. Our Samza-based approach did not perform as well as native MirrorMaker, but since it was only moving bytes over the network, the performance gap was deemed an acceptable price for version compatibility.
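Stripped of the Samza plumbing, the mirror’s core loop is just a consume-and-republish pump. A minimal, dependency-free sketch of that loop (the record type, queue, and producer handle here are stand-ins, not the real Samza or Kafka client APIs):

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for the mirror job: drain records polled from the
// source cluster and republish each one, unchanged, to the target cluster.
final class MirrorPump {
    /** Forwards every pending record and returns how many were mirrored. */
    static <R> int pump(Queue<R> source, Consumer<R> targetProducer) {
        int mirrored = 0;
        R record;
        while ((record = source.poll()) != null) {
            targetProducer.accept(record); // real job: producer.send(envelope)
            mirrored++;
        }
        return mirrored;
    }
}
```

In the real service this loop runs inside a Samza task, with the consumer and producer clients each pinned to a version compatible with its own cluster, which is exactly what sidesteps the cross-version problem.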
All clients still published data into the prior DC. The mirroring service acted as a mock client, publishing data directly into Kafka in the new DC. The intention was that when DNS finally switched to point at the new DC, traffic from the mirroring service would taper off as clients began sending data through the “normal” path into the new Kafka cluster.
To prevent the mirrored data from creating duplicates in Druid, a special condition was added to the Kafka/Samza jobs that allowed setting a band-pass filter on event timestamp. The upper bound of the band-pass filter in the prior cloud was set equal to the lower bound of the band-pass filter in the new cloud, and both were initially set far in the future to make sure data was only processed in the final step in the target environment.
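The filter itself reduces to a half-open interval check on the event timestamp. A minimal sketch (class and field names are hypothetical; bounds and timestamps in epoch milliseconds):

```java
// Sketch of the event-time band-pass filter used to partition processing
// between the two DCs. An event is processed only if its timestamp falls
// in the half-open interval [lower, upper).
final class BandPassFilter {
    final long lowerMillis; // inclusive lower bound on event time
    final long upperMillis; // exclusive upper bound on event time

    BandPassFilter(long lowerMillis, long upperMillis) {
        this.lowerMillis = lowerMillis;
        this.upperMillis = upperMillis;
    }

    boolean accepts(long eventTimestampMillis) {
        return eventTimestampMillis >= lowerMillis
            && eventTimestampMillis < upperMillis;
    }
}
```

Because the prior DC’s exclusive upper bound equals the new DC’s inclusive lower bound, every event is accepted by exactly one side, no matter where the cutover time is placed.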
Stop Processing Real-Time Data in the Prior DC, Only Process Real-Time Data in the New DC
When it came time to stop pushing data through the real-time pipeline in the prior DC and only process in the new DC, the band-pass filter bounds were reset to a point still in the future, but only an hour or so away. This was coordinated with the Druid task migration settings that controlled which cloud Druid tasks spawned in. This went really well in trial runs, but hit some hiccups during the final transition. We were ready for such a scenario, and interruptions in the real-time workflow were quickly repaired by the batch workflow. Otherwise the transition went very smoothly and there was no data loss.
Since the band-pass filter is based on event time and not on server time, we could simply observe the events going through each system to determine when real-time processing in the prior DC was no longer needed. Once all events in the prior DC were being filtered out by the band-pass filter, that pipeline was shut down.
At the end of this stage, clients were still publishing data into the prior DC, but it was all being shipped in real-time into the new DC for processing.
Change DNS to Send Data Directly to the New DC
This was one of the biggest unknowns. Whenever we change a DNS entry, we have no idea how long clients will take to pick up the change. Different technologies vary widely in their adherence to a DNS server’s TTL settings, and company IT policies can add an extra layer of interference by defining their own rules about how long to cache DNS entries. A simple example is Java’s DNS TTL handling: if you look at the docs, you’ll see that unless you explicitly set networkaddress.cache.ttl in java.security, you probably have no idea how long the JVM will actually cache DNS entries.
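As a sketch, the JVM’s DNS cache lifetime can be pinned either by editing the java.security file or programmatically, provided it happens before any lookups occur (the 60- and 10-second values below are illustrative, not a recommendation):

```java
import java.security.Security;

// Sketch: cap the JVM's positive DNS cache at 60 seconds and its negative
// cache at 10 seconds. This must run before the first InetAddress lookup,
// or the default caching policy is already locked in. The equivalent
// java.security file entry is:
//   networkaddress.cache.ttl=60
final class DnsTtl {
    static void pinDnsTtl() {
        Security.setProperty("networkaddress.cache.ttl", "60");
        Security.setProperty("networkaddress.cache.negative.ttl", "10");
    }
}
```

Of course, this only helps for clients you control; third-party clients and intermediate resolvers remain free to cache for as long as they like, which is what made this step hard to predict.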
Luckily, most of our clients picked up the DNS change shortly after the switchover, with only a small portion taking longer than expected. Once all clients were on the new DNS entry, we confirmed that all services were operating as expected and shut down the load balancer in the prior DC to prevent any stale IP caches from pushing to the wrong location. As a safeguard for emergency rollback, stateful real-time components such as Kafka were left running in the prior DC until we were highly certain we would not need to roll traffic back.
Things worked as expected from our prior tests and burn-in. As such we were able to shut down the processing pipelines in the prior DC and call the migration a success!
Impact on Client Experience
One key aspect worth examining is the impact on a client’s ability to send us data. The plot below maps the change in request latency after we moved from AWS to GCP for a selection of our largest data feeds. A negative value means requests are persisted into Kafka faster when clients push to GCP instead of AWS. It is worth noting that most clients micro-batch their data, so one request contains some quantity of AdTech data; the size of the micro-batch, in both event count and total bytes, varies from client to client. The horizontal axis represents the total data volume across all requests a client sends us.
For our largest clients the general result was a significant reduction in server-side latency as a result of the migration. Each data point above is a particular data feed a client sends us. Only the largest of the large were chosen for this plot, and the feeds are not normalized to request (micro-batch) size, so two dots near each other on the x-axis do not necessarily represent the same quantity of requests. Data feeds smaller than this group appear to have their request latency dominated by noise.
Above is the current state of processing at Metamarkets. Not only do we have the tools in place to move data around, we are also becoming better positioned every day to control how our compute resources are exercised.