# Distributing Data in Druid at Petabyte Scale

###### July 6th, 2016 Xavier Léauté

At Metamarkets we run one of the largest production Druid clusters out there, so when it comes to scalability, we are almost always the first ones to encounter issues of running Druid at scale. Sometimes, however, performance problems are much simpler, and the downside of a large cluster is that it tends to average out problems that are hiding in plain sight, making them harder to pinpoint.

Recently, we started noticing that, despite being able to scale our cluster almost horizontally, performance would not always increase accordingly. While we don’t expect a linear increase in speed, some of the numbers were still puzzling. Initially, we attributed those discrepancies to other known bottlenecks in our system but we were not fully satisfied with that explanation. It turns out, however, we had a much simpler problem that goes back to the way we distribute the data throughout our cluster.

**How Druid Thinks About Data**

Druid is a data store designed for event data, so it naturally partitions data by time into units we call “data segments”. Those segments can further be partitioned in several ways, but each of them still covers a well defined time interval. Those intervals typically range between 1 and 24 hours and most Druid deployments will use a constant segment granularity.

The segment is also the unit of work in Druid, which means when a query is run, each segment gets assigned to one core, and work is parallelized across segments. Typical queries that hit our cluster will range from several hours to several weeks worth of data. For each of those queries we would like to maximize the number of CPU cores used. That is, for any given time range, we’d like to spread the data across as many servers as possible.

Responsibility for distributing the data segments across nodes falls on the Druid coordinator. The coordinator is essentially the Druid “brain”, and decides which set of nodes any given data segment should live on, taking into account the replication strategy for the data. Most Druid clusters are not static, however, and data will continuously move in and out based on predefined retention rules – e.g. we might only want to keep the last X months of data for a given client – so the coordinator is also tasked with continuously moving segments around to keep the cluster well-balanced.

**How Druid Balances Data**

Early implementations of the segment balancing algorithm would essentially do so by randomly choosing any given node that had available disk space. This works fine until the cluster needs to be dynamically scaled up to accommodate new data, at which point newer data almost inevitably ends up on the newly provisioned servers, causing significant hotspots on those nodes.

As a result, early on in Druid’s history, we developed a smarter balancing algorithm that would take into account the nature of the data and fulfill two main objectives:

- Avoid co-locating segments likely to be queried simultaneously to prevent hotspots, or alternatively, spread segments for any given query across as many servers as possible, to maximize parallelism.
- Account for differences in segment sizes and how that might impact the amount of CPU time necessary for a given segment.

To achieve this, we define a joint cost function $cost(x,y)$ for any two segments $x$ and $y$. This function reflects some cost of placing those two segments onto the same node, and should ideally take into account the objectives defined above (more on that in a bit).

We can then define the total cost of placing a given segment $x$ on a server $S_k$ to be the sum of the joint cost for all segments on that node.

$$ TotalCost(x, S_k) = \sum_{y \in S_k \setminus \{x\}} cost(x, y) $$

For any new data segment, the coordinator simply computes the total cost for each server, and places the segment on the server with the lowest total cost.

To balance the cluster, Druid uses a greedy rebalancing scheme, picking a segment at random, and placing it on the server with the lowest cost at that time.
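As a rough illustration of the placement rule above, the sketch below computes the total cost of a candidate segment against every server and picks the cheapest one. The `joint_cost` used here is a deliberately simple placeholder (an assumption for illustration only, not Druid’s cost function), and the names `total_cost` and `pick_server` are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Segment = Tuple[float, float]  # (start, end) of the segment interval, in hours


def joint_cost(x: Segment, y: Segment) -> float:
    """Placeholder joint cost (assumption): closer start times cost more."""
    return 1.0 / (1.0 + abs(x[0] - y[0]))


def total_cost(x: Segment, server: List[Segment],
               cost: Callable[[Segment, Segment], float] = joint_cost) -> float:
    """Sum of joint costs between x and every other segment on the server.

    Segments are identified by their interval here, which is a simplification.
    """
    return sum(cost(x, y) for y in server if y != x)


def pick_server(x: Segment, servers: Dict[str, List[Segment]]) -> str:
    """Return the name of the server with the lowest total cost for x."""
    return min(servers, key=lambda name: total_cost(x, servers[name]))


servers = {
    "server_a": [(0.0, 1.0), (1.0, 2.0)],      # holds segments close to the new one
    "server_b": [(48.0, 49.0), (49.0, 50.0)],  # holds segments two days later
}
new_segment = (2.0, 3.0)
best = pick_server(new_segment, servers)
print(best)
```

Under these assumptions the new segment lands on the server whose existing segments are furthest away in time. A rebalancing step could reuse the same `pick_server` call on a randomly chosen existing segment.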

The crux of the problem of course boils down to what a good cost function might look like.

Our existing cost function used various heuristics, such as the file size of the segments, and the time interval covered by segments to determine how “far apart” segment intervals are from each other. The idea was that bigger segments would likely require more CPU time to scan, and segments “closer” in time were more likely to get queried simultaneously.

At a high level, this cost function seemed to work fine. Our metrics indicated query speeds were pretty constant across servers. The amount of data on each server appeared to be relatively well distributed. However, there was a small but constant difference between the most loaded and the least loaded server. Those differences were even more apparent on one of our smaller clusters, which made us suspect our balancing could be improved.

**Issues with the Existing Cost Function**

Once we started suspecting balancing problems, we decided to dissect the cluster and have a look at how individual data segments were distributed across servers.

So what does an actual segment distribution look like?

We wanted to visualize the actual distribution of data across both time and servers. Segments technically represent intervals, but to make it easy to visualize, we’ll just use a dot representing the segment start time – most segments have similar interval lengths anyway.

Segment time is represented on the y-axis and servers are placed on the x-axis. Each vertical band represents one server, with each cloud of points representing the set of segments on that server. Each color represents a different data source – which in Druid speak is roughly the equivalent of a table in a traditional relational database.

Now that we can “see” the distribution, things are obvious. Our initial choice of cost function seems to have created clusters of segments at regular time intervals. While the total number of segments is roughly balanced across nodes, it does not evenly distribute segments within a given time range. This not only causes significant hotspots, but for queries that only cover a short time interval, a large portion of the cluster resources remain completely unused.

I will spare you the mathematical details, but in a nutshell, if two segments were placed just at the right distance, the cost of adding a third segment anywhere in between would always be higher than adding an overlapping one. Naturally, this would self-perpetuate and create clusters of data at regular time intervals, staggered across different servers.

Note: If you are curious how to create the visualization above, we queried the data from the Druid coordinator endpoint at

`http://coordinator:8081/druid/coordinator/v1/datasources/${datasource}/intervals/${interval}/serverview`

to create a dataframe that looks as follows, containing the `datasource`, `server`, and `start` time for each segment in the cluster.

```
Source: local data frame [100 x 3]

    datasource server               start
         (chr)  (chr)              (time)
1  table_6ac7f    254 2016-02-23 23:00:00
2  table_5e842    142 2016-05-15 12:00:00
3  table_f432c    174 2016-03-30 21:00:00
4  table_6fe78     70 2016-05-07 12:00:00
5  table_52f63    224 2016-03-13 19:00:00
6  table_5e842     43 2016-05-08 03:00:00
7  table_81cc7    189 2016-03-30 21:00:00
8  table_cbf97      4 2016-04-11 18:00:00
9  table_0f98a     24 2016-04-13 07:00:00
10 table_daf97     84 2016-02-26 23:00:00
..         ...    ...                 ...
```

We can then use our beloved ggplot2 to create the various point clouds for each server.

```r
library(ggplot2)

# Plot one jittered point cloud per server, colored by data source.
dist_plot <- function(segment_data, t = 0) {
  ggplot(segment_data, aes(x = server, y = start, color = datasource)) +
    geom_jitter(alpha = 0.3, size = 0.5) +
    theme(axis.text.x = element_blank()) +
    ylab("Segment Interval Start") +
    theme(legend.position = "none") +
    ggtitle(paste("t +", t, "hours"))
}
```

**Redesigning the Cost Function**

The original cost function had two problems: it had too many parameters, and it had a lot of “kinks” and “plateaus” that caused odd local minima, resulting in the segment clustering we saw.

Our improved cost function is designed to be much simpler, and beyond the first two properties we already mentioned earlier, we’ve added a third:

1. Our cost function should reflect the cost of scanning a segment.
2. The joint cost should reflect some likelihood of segments being scanned simultaneously.
3. Our cost function should be additive, i.e. given three segments $A$, $B$, and $C$, $cost(A, B \cup C) = cost(A, B) + cost(A, C)$

This last property ensures our cost function is independent of how we shard the data. Whether we have one segment covering a day’s worth of data or 24 segments each covering one hour, the cost should be the same.

To satisfy 1. and reduce the parameter space, we will assume that the cost of scanning a unit time-slice of data is constant. This means our cost function will only depend on segment intervals. This assumption is true if the average time spent querying segments of a given time interval is roughly the same for all segments in the cluster. Ensuring the unit of computation is constant is also a good practice to reduce variance in request times across the cluster.

To model the joint cost of querying two time-slices and satisfy 2., we use a simple exponential decay interaction function $e^{-\lambda t}$, where $t$ represents the relative time difference between two time-slices, and $\lambda$ defines the rate at which the interaction decays. This parameter controls the “half-life” of the interaction, reducing the joint cost of two time-slices as their time difference increases.

Since we assume the cost of querying every time-slice is constant, we can then compute the joint cost of two segments by simply integrating the interaction function over the respective segment intervals, i.e. for two segments $X$ and $Y$ covering intervals $X=[x_0,x_1)$ and $Y=[y_0,y_1)$ respectively, our joint cost function becomes:

$$ cost(X, Y) = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy $$

Thanks to the additivity of integrals, this has the nice property of satisfying point number 3. above. Using a simple exponential decay also gives us a nice closed form solution. Every iteration of our balancing algorithm will do millions of evaluations of this function, so it is key to avoid numerical integration to make balancing computationally tractable.
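To spell out the additivity argument, here is a short derivation for a segment $Y=[y_0,y_1)$ split at an arbitrary point $y_m$ (a symbol introduced here only for illustration) into $Y_1=[y_0,y_m)$ and $Y_2=[y_m,y_1)$:

$$
\begin{align*}
cost(X, Y_1 \cup Y_2) & = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy \\
& = \int_{x_0}^{x_1} \int_{y_0}^{y_m} e^{-\lambda |x-y|}dxdy + \int_{x_0}^{x_1} \int_{y_m}^{y_1} e^{-\lambda |x-y|}dxdy \\
& = cost(X, Y_1) + cost(X, Y_2)
\end{align*}
$$

Applying this repeatedly shows that 24 hourly segments contribute the same cost as the single day-long segment they add up to.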

The half-life can be adjusted based on the type of data served. For our purposes we use a half-life of 24 hours, since most of our queries don’t exceed a few days at a time.
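For reference, a half-life $t_{1/2}$ corresponds to $\lambda = \ln 2 / t_{1/2}$, since $e^{-\lambda t_{1/2}} = 1/2$. A minimal sketch, assuming time is measured in hours (the function names are hypothetical):

```python
import math

HALF_LIFE_HOURS = 24.0  # half-life quoted in the text


def decay_rate(half_life: float) -> float:
    """Decay rate lambda such that exp(-lambda * half_life) == 0.5."""
    return math.log(2.0) / half_life


def interaction(t: float, lam: float) -> float:
    """Interaction between two time-slices that are t hours apart."""
    return math.exp(-lam * abs(t))


lam = decay_rate(HALF_LIFE_HOURS)
for hours in (0, 24, 48, 168):
    print(f"{hours:4d} h apart -> interaction {interaction(hours, lam):.4f}")
```

With this choice the interaction halves for every additional day of separation, so two time-slices a week apart interact at $2^{-7}$, a bit under 1% of the strength of two simultaneous ones.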

One important aspect we haven’t covered is the fact that each segment belongs to a given data source. Segments for the same data source are highly likely to get queried simultaneously.

Let’s imagine we have a cluster with two machines, and two data sources, each with two segments covering the same two hours of data. In that case, to maximize resource utilization, we would rather have each data source spread across both servers than have each data source occupy its own server.

The cost function we have so far doesn’t address this problem, so we also multiply the cost by a constant factor (2 in this case) if the two segments are from the same data source. This way, if two data sources are distributed equally, the cost will be lower to spread them across servers.

$$ cost(X, Y) = (1 + \delta_{XY}) \cdot \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy \\
\textrm{where} \quad \delta_{XY} =
\begin{cases}
1 & \text{if $X$ and $Y$ belong to the same data source} \\
0 & \text{otherwise}
\end{cases} $$
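To make the two-machine example concrete, here is a small sketch that compares three ways of placing the four segments. It assumes each data source has one segment per hour and a 24-hour half-life, and it uses the closed-form expressions derived in the next section for one-hour segments; the names `pair_cost` and `placement_cost` are hypothetical.

```python
import math

LAM = math.log(2.0) / 24.0  # decay rate for a 24-hour half-life (time in hours)

# Interval-only joint costs for one-hour segments: identical hours, and
# adjacent hours [0, 1) and [1, 2), both from the closed-form expressions.
SAME_HOUR = 2.0 * (LAM + math.exp(-LAM) - 1.0) / LAM ** 2
ADJACENT = (math.exp(LAM) - 1.0) * (math.exp(-LAM) - math.exp(-2.0 * LAM)) / LAM ** 2


def pair_cost(a, b, same_source_factor):
    """Joint cost of two (data_source, hour) segments placed on one server."""
    (source_a, hour_a), (source_b, hour_b) = a, b
    base = SAME_HOUR if hour_a == hour_b else ADJACENT
    return base * (same_source_factor if source_a == source_b else 1.0)


def placement_cost(placement, same_source_factor):
    """Sum of joint costs over both servers, each holding exactly two segments."""
    return sum(pair_cost(a, b, same_source_factor) for a, b in placement)


placements = {
    # one data source per server
    "by_source": [(("A", 0), ("A", 1)), (("B", 0), ("B", 1))],
    # data sources and hours both spread across servers
    "mixed": [(("A", 0), ("B", 1)), (("B", 0), ("A", 1))],
    # segments for the same hour co-located
    "by_hour": [(("A", 0), ("B", 0)), (("A", 1), ("B", 1))],
}

for factor in (1.0, 2.0):
    costs = {name: placement_cost(p, factor) for name, p in placements.items()}
    print(f"factor {factor}: {costs}")
```

With a factor of 1 the `by_source` and `mixed` placements tie, so nothing discourages a data source from occupying its own server. With a factor of 2 the `mixed` placement becomes strictly cheaper than both alternatives.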

**Closed Form Solution and Numerical Considerations**

We’re almost done; all that’s left is to come up with a closed form solution to our cost function and implement the actual code to compute it – feel free to skip this section if you are less interested in those aspects.

First, assuming we have two non-overlapping intervals, we can always choose $X$ to be the one with the lower bounds, i.e. $x_1 \leq y_0$. This way we can remove the absolute value term, and we can ask our fellow Math nerds to solve it for us, which gives us:

$$ \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy = {1\over\lambda^2} e^{-\lambda(y_0 + y_1)} ( e^{\lambda x_0} - e^{\lambda x_1}) (e^{\lambda y_0} - e^{\lambda y_1}) \quad \textrm{if} \quad x_0 \leq x_1 \leq y_0 \leq y_1$$
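For readers who would like to check the result by hand, here is a short derivation. When $x_1 \leq y_0$, every point of $X$ lies at or before every point of $Y$, so $|x-y| = y - x$ and the double integral factorizes:

$$
\begin{align*}
\int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda (y-x)}dxdy & = \left( \int_{x_0}^{x_1} e^{\lambda x} dx \right) \left( \int_{y_0}^{y_1} e^{-\lambda y} dy \right) \\
& = {1\over\lambda^2} ( e^{\lambda x_1} - e^{\lambda x_0}) (e^{-\lambda y_0} - e^{-\lambda y_1})
\end{align*}
$$

Flipping the sign of both factors and using $e^{-\lambda y_1} - e^{-\lambda y_0} = e^{-\lambda(y_0 + y_1)} (e^{\lambda y_0} - e^{\lambda y_1})$ gives the expression above.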

The cost for overlapping intervals can always be decomposed into a set of non-overlapping intervals, and the overlapping intersection, i.e.

$$
\begin{align*}
cost(X, Y) & = cost(X \setminus Y, Y) + cost(X \cap Y, Y) \\
& = cost(X \setminus Y, Y) + cost(X \cap Y, X \cap Y) + cost(X \cap Y, Y \setminus X)
\end{align*}
$$

Given that our cost function is translation invariant, the cost of that intersection is reduced to (omitting the datasource factor for readability)

$$ cost(X \cap Y, X \cap Y) = \int_0^t \int_0^t e^{-\lambda |x-y|}dxdy \\
\text{where $t$ is the length of interval $X \cap Y$} $$

whose closed form expression is:

$$ cost(X \cap Y, X \cap Y) = {2\over{\lambda^2}} ( \lambda t + e^{-\lambda t} - 1) $$
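The pieces above can be combined into a small sketch that evaluates the interval part of the cost for arbitrary intervals and compares it against brute-force numerical integration. This is an illustration under stated assumptions, not Druid’s implementation: it omits the data source factor, cuts both intervals at the overlap bounds (a slightly finer decomposition than the one written above, equivalent by additivity), and all function names are hypothetical.

```python
import math


def disjoint_cost(x0, x1, y0, y1, lam):
    """Closed form for non-overlapping intervals with x0 <= x1 <= y0 <= y1."""
    return ((math.exp(lam * x1) - math.exp(lam * x0))
            * (math.exp(-lam * y0) - math.exp(-lam * y1))) / lam ** 2


def self_cost(t, lam):
    """Closed form for an interval of length t paired with itself."""
    return 2.0 * (lam * t + math.exp(-lam * t) - 1.0) / lam ** 2


def split(interval, lo, hi):
    """Cut an interval at the overlap bounds lo and hi, dropping empty pieces."""
    start, end = interval
    cuts = [start, lo, hi, end]
    return [(a, b) for a, b in zip(cuts, cuts[1:]) if a < b]


def interval_cost(x, y, lam):
    """Interval part of cost(X, Y), without the data source factor."""
    lo, hi = max(x[0], y[0]), min(x[1], y[1])
    if lo >= hi:  # no overlap: order the intervals and use the closed form
        (a0, a1), (b0, b1) = sorted((x, y))
        return disjoint_cost(a0, a1, b0, b1, lam)
    total = 0.0
    for px in split(x, lo, hi):
        for py in split(y, lo, hi):
            if px == py:  # both pieces are the intersection itself
                total += self_cost(hi - lo, lam)
            else:  # the two pieces do not overlap
                (a0, a1), (b0, b1) = sorted((px, py))
                total += disjoint_cost(a0, a1, b0, b1, lam)
    return total


def numeric_cost(x, y, lam, n=200):
    """Midpoint-rule approximation of the double integral, for comparison."""
    (x0, x1), (y0, y1) = x, y
    dx, dy = (x1 - x0) / n, (y1 - y0) / n
    total = 0.0
    for i in range(n):
        xi = x0 + (i + 0.5) * dx
        for j in range(n):
            total += math.exp(-lam * abs(xi - (y0 + (j + 0.5) * dy)))
    return total * dx * dy


lam = math.log(2.0) / 24.0  # 24-hour half-life, time in hours
cases = [((0.0, 3.0), (2.0, 6.0)),    # partial overlap
         ((0.0, 10.0), (3.0, 5.0)),   # Y contained in X
         ((0.0, 1.0), (5.0, 7.0)),    # no overlap
         ((0.0, 24.0), (0.0, 24.0))]  # identical intervals
for x, y in cases:
    print(x, y, interval_cost(x, y, lam), numeric_cost(x, y, lam))
```

The interval bounds are kept small here on purpose; the floating point issues that arise with realistic timestamps are discussed next.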

When implementing the closed form solution to the cost function, we also have to take into account the limits of floating point math. If we were to just plug the solutions above into our code, we would likely end up with infinite values most of the time when evaluating $e^{\lambda x_0}$ directly.

To avoid this problem we can do two things:

- We can leverage the fact that our function is translation-invariant, and translate our intervals such that the one with the lowest bound always starts at zero, effectively making interval bounds as small as possible.
- Our cost function inherently depends on the size of the intervals and the distance between their bounds, so we can rewrite the solution to reflect that. For exactly overlapping intervals this is already the case – it only depends on the size of the interval – and for non-overlapping intervals, assuming we translate our intervals such that $x_0 = 0$, then we have

$$
\begin{align*}
{1\over\lambda^2} e^{-\lambda(y_0 + y_1)} ( e^{\lambda x_0} - e^{\lambda x_1}) (e^{\lambda y_0} - e^{\lambda y_1}) & = {1\over\lambda^2} ( e^{\lambda x_0} - e^{\lambda x_1}) (e^{-\lambda y_1} - e^{-\lambda y_0}) \\
& = {1\over\lambda^2} [ (e^{-\lambda y_1} - e^{-\lambda y_0}) - (e^{-\lambda (y_1-x_1)} - e^{-\lambda (y_0-x_1)})] \quad \textrm{if} \quad x_0 = 0
\end{align*}
$$

This form only depends on the distances between interval bounds, and since $0 = x_0 \leq x_1 \leq y_0 \leq y_1$ every exponent is non-positive, so none of the exponentials can overflow.
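The sketch below illustrates the floating point issue for two one-hour segments a day apart. It assumes interval bounds expressed in hours since the Unix epoch and a 24-hour half-life. `stable_cost` is one equivalent way of writing the non-overlapping closed form using only non-negative distances between bounds (a rearrangement chosen for this sketch, not necessarily how Druid’s code is organized), and both function names are hypothetical.

```python
import math

LAM = math.log(2.0) / 24.0  # 24-hour half-life, time in hours


def naive_cost(x0, x1, y0, y1, lam=LAM):
    """Direct evaluation for x0 <= x1 <= y0 <= y1; exp(lam * x) can overflow."""
    return ((math.exp(lam * x1) - math.exp(lam * x0))
            * (math.exp(-lam * y0) - math.exp(-lam * y1))) / lam ** 2


def stable_cost(x0, x1, y0, y1, lam=LAM):
    """Same quantity, using only non-negative distances between bounds."""
    return ((1.0 - math.exp(-lam * (x1 - x0)))
            * (math.exp(-lam * (y0 - x1)) - math.exp(-lam * (y1 - x1)))) / lam ** 2


# Bounds in hours since the Unix epoch, roughly mid-2016 (exact value is arbitrary).
t = 407_000.0
x0, x1, y0, y1 = t, t + 1.0, t + 24.0, t + 25.0

try:
    naive = naive_cost(x0, x1, y0, y1)
except OverflowError:
    # Python's math.exp raises instead of returning infinity when the
    # argument is too large for a double (beyond roughly 709).
    naive = None

stable = stable_cost(x0, x1, y0, y1)
shifted = naive_cost(0.0, 1.0, 24.0, 25.0)  # same intervals translated to start at 0
print(naive, stable, shifted)
```

Both workarounds agree: translating the intervals so the lowest bound is zero and rewriting the expression in terms of distances give the same value, while the direct evaluation fails on raw timestamps.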

If you are curious about the actual implementation, the source code is available here:

https://github.com/druid-io/druid/blob/master/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java#L80

Performance-wise, our new cost function was more costly to evaluate than our old one, mainly due to the complexity of computing exponentials. We further optimized the calculation by using fast approximations to the exponential function and removing the need for lambda in the cost function by scaling the input variables up-front. Lastly, we also did some work to better parallelize the cost calculations among multiple threads.
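The scaling trick can be seen with a change of variables. Substituting $u = \lambda x$ and $v = \lambda y$ (symbols introduced here for illustration, with $\lambda > 0$) moves $\lambda$ out of the integrand:

$$ \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy = {1\over\lambda^2} \int_{\lambda x_0}^{\lambda x_1} \int_{\lambda y_0}^{\lambda y_1} e^{-|u-v|}dudv $$

Since the factor ${1\over\lambda^2}$ is the same for every pair of segments, it does not change which server has the lowest total cost. As a reading of the description above rather than of the actual code, it is therefore enough to multiply the interval bounds by $\lambda$ once and evaluate the closed form with $\lambda = 1$.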

**Rolling it Out in Production**

We first rolled out our improved cost function to one of our smaller internal clusters to verify the performance of the balancing algorithm was not affected, and to make sure our improvements had the intended effects. Here is a short animation showing the massive improvements we saw on our test cluster.

Note: to create the animation above, we simply have R output one plot for each value of t using a function like this:

```r
# Write the plot for time step t to animation/<name>_<t>.jpg
save_plot <- function(segment_data, name, t) {
  jpeg(filename = file.path("animation", sprintf("%s_%03d.jpg", name, t)),
       width = 1024, height = 576, res = 96)
  print(dist_plot(segment_data, t))
  dev.off()
}
```

and then combined the resulting jpeg files using ImageMagick’s `convert` tool:

`$ convert -delay 100 animation/*.jpg animation.gif`

The results of rolling out this new cost function speak for themselves. Segment distribution in our cluster has seen massive improvements for the problem cases depicted above. Rebalancing can take several days, partly due to how the greedy algorithm works, but also because we intentionally throttle segment movement to avoid performance impact on a live cluster.

Our production cluster saw similar improvements – here is how segment distribution evolved for a portion of our 300+ node cluster after rolling out the changes – note the glitch in the matrix at t+124 hours, where one of our servers had to be replaced.

Of course seeing an improvement in distribution is nice, but ultimately how does this affect query performance?

It took a little over two weeks for our production cluster to fully rebalance several million data segments. During that time our load tends to vary significantly, both throughout the week and week over week, so we also have to account for those variations when analyzing cluster-wide changes. Bottom line: we saw significant improvements. Query speeds are 30-40% faster, combined with a 15% increase in the number of queries.

As an interesting side-note, it is typical for us to see an increase in the number of queries as performance improves. This makes sense, given that most of those queries power an interactive dashboard. When our dashboard becomes more responsive, our users are able to work more quickly and answer more questions, which in turn leads to the increase in query count.

For those of you running your own clusters, the cost function improvements are part of the latest Druid 0.9.1.1 release, and we’d love to hear how this improves your cluster performance.

If you are excited to work on those types of problems at scale, we’re always looking to hire people to join the team! Feel free to reach out directly to me or check out our jobs page.