At Metamarkets, we are receiving more than 100 billion events per day, totaling more than 100 terabytes. These events are processed in real-time streams, allowing our clients to visualize and dissect them on our interactive dashboards. This data firehose must be managed in a way that is reliable without sacrificing cost efficiency. This post will demonstrate how we have implemented scaling modeling in a turbulent environment to achieve right-sizing of part of our real-time data streams.

Our technical stack is based on Kafka, Samza, Spark and Druid and runs on Amazon Web Services. Incoming events are going first to Kafka, then processed in real-time by Samza, and finally end up in Druid, which is queried by our dashboards. We also periodically run Spark batch jobs over a few hours to fix inaccuracies of the real-time processing.

When you have lots of data, you have lots of hardware, which means a higher chance that at least one host might crash or become degraded. When this happens you need the ability to analyze and recover each real-time processing pipeline individually. Sometimes you need to do that during off-hours. Furthermore, you want to be notified when your real-time pipelines start to suffer.

Another thing that sometimes might keep us awake at night is the volatility of the volumes of data we receive. For example, here are graphs of events/bytes volumes Metamarkets was receiving per hour over the timespan of 8 days in August 2016, with the red dashed line showing how much hardware we would need to (over)provision to handle temporary spikes in data volumes. The issue with overprovisioning is that it’s not only expensive, it’s also not able to react fast enough to handle really big data volumes spikes.

What we really would like to see is an automated solution that would scale our Samza pipelines up and down with data volumes changes, as well as monitor them and notify us in the case of an issue it can’t handle itself. This might sounds like a pretty complex task, but luckily we already had a few bricks in place.

**Metrics and Monitoring**

We already had a metrics gathering and monitoring solution built using our own product. We gather a number of different metrics – some are more generic like CPU% utilization or bytes sent over the network; some are specific to the particular technology. For example, these are probably the most important metrics for our Samza real-time pipelines:

*Containers* – the number of YARN containers used by the job;

*Container Messages In* – the number of Kafka messages processed by the job;

*Kafka Backlog* – the number of Kafka messages that are still waiting to be processed;

*%Util* – a synthetic metric that shows how much time the job was actually processing the data rather than waiting for new messages to show up in Kafka.

Here we can see that once we had reduced the parallelism level (at ~12:30pm) of the Samza job, the backlog began piling up in Kafka and the *%Util* jumped to almost 100%.

**Scaling Strategies**

There are different scaling strategies, but for our purposes we can split them into two broad classes: reactive, where we react to the changes in the load, and proactive, where we try to predict what load patterns the system will endure in the future. We rarely experience abrupt load changes, so a reactive system would likely do it’s job. However, it would also be nice to take *Kafka backlog* into account and try to catch up as fast as it makes sense economically.

Besides *Kafka backlog*, we found *Containers, Container Messages In *and *%Util* to be useful to estimate the number of messages one container could process when fully loaded. We also found the CPU% utilization metric to be not so useful, mostly because it’s not always the case that CPU is a bottleneck: sometimes it can be the network, or in other specific rare cases – the disk subsystem.

Using Kafka backlog as a key metric for making scaling decisions is tricky. On the one hand, we want to process the backlog as soon as possible, on the other – we want to be cost-efficient. Amazon EC2 has hourly billing granularity, which means for the backlog that requires 60 instance*mins to be processed, spinning 60 instances for one minute is 60 times more expensive than spinning 1 instance for an hour.

To make a choice, we have introduced a cost function consisting of two factors: hardware cost and the cost of failing our SLAs. For example, a temporary delay of few minutes is not really significant for us now, but once it gets bigger than 10 minutes we can consider risk of service level violations to be a much bigger factor than hardware costs.

### Monte Carlo Tree Search

Our scaling strategy can actually consist of multiple actions: for example, if we have a huge backlog, we might want to scale up a lot, process the backlog and then scale down. This strategy consists of two actions, but if we are able to forecast incoming data volumes we might have even more. To be able to successfully decide between complex strategies we employ a concept called game tree, where every time we need to choose the number of containers, we have a branching in the tree.

Here is an example of the tree where we start with 2 Samza containers and 4 minutes of data lag. We then evaluate possible scaling strategies and assume we can choose only between 1, 2 and 3 containers:

In this example, 2 containers are just enough to process incoming data but not able to reduce the backlog. Scaling down to 1 container forces the backlog to grow, and scaling up to 3 containers reduces the backlog.

Given we might have plenty of different options for the number of containers and this number can be changed every few minutes, the tree can grow quite large. This is where randomized techniques could help us, and Monte Carlo tree search can be one of these. Monte Carlo tree search (MCTS) allows to grow the tree randomly and evaluate only “interesting” paths in the tree. One sample of MCTS consists of 4 steps: selection, expansion, simulation and backpropagation.

- During selection step we traverse the tree from the root node to one of the leafs. We choose this path randomly but select more often either more promising nodes or less visited nodes.
- In the expansion step we add children to the selected leaf node. Basically, we assume on each step we can choose between predefined minimum and maximum number of containers, and add children correspondingly.
- Once we have created new children, we do the simulation step – in our case it will be assuming that selected number of containers will stay the same for few hours. We calculate the cost for each of the newly added child as a sum of hardware cost and SLA violation penalties over these few hours.
- Finally, once new children costs are figured out, we backpropagate costs to the parent nodes. Assuming that we are going to act rationally during subsequent choices, we will always use a child node with the minimal cost.

### Search Space Optimizations

Ideally, we should evaluate every possible containers count and be able to change the number of containers every quant of time. However, this would lead to a very large search space and a very slow convergence of the tree search, so we use discretization on both containers count and time spent in the node.

If tree node has **n** containers, we can choose possible children as:

**n, n–1, n+1, [n–k], [n+k], [n–k^2], [n+k^2], …, [n–k^p], [n+k^p] **

where **k** is chosen empirically and **p** is chosen such way that:

**[n–k^p] >= min containers & [n+k^p] <= max containers.**

This logarithmically reduces the containers search subspace. An intuition is that it allows us to do a coarse grained search first in a reduced search space, but then fine tune the number of containers. I.e., if the data volumes are not changing too rapidly we can slightly change the number of containers to be “just right”, but will need to use an approximation in the case of drastic changes.

We can also discretize the time spent in each node based on the depth of this node: **time = c*m^node_depth**, where **c** and **m** are chosen empirically. This logarithmically reduces the number of nodes to cover specific time interval.

### Leaf Nodes Costs Comparison

By the nature of selection we tend to sample more promising nodes more often, which leads to an unbalanced tree. However, we want to compare apples to apples when comparing costs of nodes of different depth. To do that, we introduce a planning horizon for the tree: during the simulation for each leaf node we assume it will stay in this state until the time reaches planning horizon, which can be set to few hours. This means every scaling strategy in the tree will span over the same time interval regardless of how many actions it has.

**Elasticity for Samza & YARN**

Once the right number of containers for the Samza job is figured out, we need to actually scale the job. This is a complex task which requires an orchestration between configuring the Samza job, managing Hadoop YARN resources and scaling instances count in Amazon Web Services. To achieve this, we have introduced an internal service called Guardian.

Guardian queues requests for scaling up or down of Samza jobs, calculates the desired number of instances and updates AWS Auto Scaling Groups, awaits for newly provisioned instances to show up in YARN and once it’s possible – fulfills one or more scaling requests.

Taking into account metrics and the autoscaling decision maker, the final diagram looks the following way:

We would also like to be able to notice when autoscaling or real-time processing pipelines themselves break. It seems most natural to compare the timestamp of the last message written in a Kafka partition with the timestamp of the last message read by Samza job for this partition, and raise an alert if the difference is too big.

**Results**

On the dashboard below we can see how autoscaling performed for one of the pipelines. The number of Samza containers was changing between 16 and 36 with an average of 23 and achieved an average utilization of ~92%.

If we had to choose the number of containers manually, we would probably end up with somewhere around 36 containers to accommodate for unexpected spikes in traffic. This means for this particular pipeline we got ~1.5x improvement in the number of containers. For the entire cluster, we got 1.5x-2x improvement: some of the pipelines were over-provisioned even more to handle even spikier traffic.

An interesting effect can be seen on this dashboard: at some point (marked by the red arrow) the number of containers was changing pretty frequently instead of choosing the right number of containers and sticking to it. The answer is that to achieve an even load distribution across different Samza containers their number should divide evenly the number of Kafka partitions, which is 288 in our case. This means we can have 1, 2, 3, …, 24, 32, 36, …,96, 144, 288 containers, but not 29 for example.

In this particular case, the “ideal” number of containers to handle incoming data was ~26, so the best scaling strategy we found was this: scale down to 24 containers and accumulate some backlog, then scale up to 32 containers and process this backlog, repeat. Even when factoring in the cost of spinning new containers this strategy was more cost efficient than running 32 containers with no changes.

There is still lots of work to be done, but we’re focused on innovating on behalf of our customers and providing them with a solution that meets their business needs. Autoscaling algorithms described in this article work with stateless services, but some our services have stateful nature, so we are investigating what needs to be applied there. We are also thinking about better forecasting of incoming data volumes and planning to add a support of AWS Spot market so we have hardware options.

Come work with us if you’re interested in solving these kinds of problems at scale!