Druid Query Optimization with FIFO: Lessons from Our 5000-Core Cluster

Druid’s Horizontal Scale

A large strength of using Druid as a data store and aggregation engine is its ability to horizontally scale. Whenever more data is in the system, or whenever faster compute times are desired, it is simply a matter of throwing more hardware at the problem, and Druid auto-detects, and auto-balances its workloads. At Metamarkets we are currently ingesting over 3M events/ second (replicated) into our Druid cluster and have multiple hundreds of historical nodes serving this data across multiple tiers.

Part of the power of this horizontal scale is how Druid breaks up data into shards. Each shard is a collection of pre-rolled up events, which are further rolled up at query time. This means that if you have four weeks of data (672 hrs) grouped by hour, and each hour has ten shards, that’s 6,720 shards of data Druid processes in parallel. If you want things to be super speedy for these queries you can throw 6,720 cores at the problem, and each core (optimally) would take one shard yielding a compute time on the historicals for uncached data, equal to approximately the segment scan time.

For real environments, however, having hundreds of beefy nodes can be cost prohibitive. And for most use cases, having query times faster than the wifi latency I get at home is only of marginal benefit. Additionally, the balancing algorithms in Druid are not perfect, which means segments will not be perfectly balanced across the cluster, but will usually be “good enough” for most use cases. As such, many clusters will end up with some degree of over-committing cores to number of data segments that are needing to be scanned. This leads to an interesting aspect of Druid’s processing queue. The release of Druid 0.9.0 adds the feature flag druid.processing.fifo. Let’s take a look at where this flag comes from and how it should be used.

Druid already had the concept of query priority. Simply set the key priority in the context field of a query, and Druid will put segment shards for this query before or after others of differing priority. If you are not using this existing feature of priority, you should. Within the Metamarkets platform, items which are usually very fast (like a search query)  get prioritized over things we believe are going to be slow (like an asynchronously downloaded topN over multiple months of data). This results in things which are slower having minimal to no impact by allowing the fast queries to cut in line and get processed as soon as a processing slot is available. It is worth noting here that the time a segment shard waits to be allowed a processing slot is called the query/wait/time.

There is a problem here though. The javadoc for PriorityBlockingQueue explicitly states Operations on this class make no guarantees about the ordering of elements with equal priority. This means that on systems which are very over-committed on cores and regularly have a processing backlog, the order in which that backlog is churned through has a nasty non-deterministic component to it. If you don’t have a predefined way to order your processing, then queries with a large quantity of processing items to churn through are going to have seemingly random behavior under highly cotenant workloads.

Forced Ordering of Processing

Enter the boolean setting druid.processing.fifo. The change to the processing queue priority for 0.9.0 works as follows: Whenever a processing unit of work is added to the queue, assign it a queue insertion number, and use query priority first, and queue insertion order second, when determining order. The druid.processing.fifo flag either sets the queue insertion number to a sequence if true, or always 0 if false. This way, enabling the flag allows the comparison to enable insertion order processing ordering, or default to treating processing work items of equal priority as completely equal as it did previously.

On over-committed nodes, enabling this setting is expected to have at least the following effects for equal-priority queries:

  • Increase mean query/wait/time, the time a query waits for a processing slot.
  • Decrease 90% query/time, the total time a query takes on a node.

The overall goal here is to make the query speeds more predictable and consistent, not necessarily faster. Having some queries randomly take an unexpectedly long time is detrimental to user experience and scalability planning.

Results in Production

At Metamarkets, our production environment has multiple tiers of service in which we control the quantity of resources that get thrown at a query. Exploring recent data in the dashboard usually hits our interactive tier, while periodic reports that don’t need user action go to an asynchronous tier. This later tier can have a large backlog of segment scans during peak load, and is therefore worth investigating for FIFO processing.

Effect on Over-Provisioned Nodes

At Metamarkets, we ran an experiment on about 5% of our asynchronous cluster. The first hypothesis to check is if the query wait time increased. The mean query wait time over 5 hours of peak load was taken per node, and the distribution of mean node times is shown below.


Node-Mean Async TopN Query Wait Time Distribution


As expected, enabling FIFO processing causes the mean wait time to rise notably. The second effect hypothesized is that the long tail of poor performance queries is reduced. That is confirmed both by looking at the mean query time as below and by looking at the 99% query times, which reduced from 45.4 seconds to 35.9 seconds.


Node-Mean Async TopN Query Time Distribution


For the nodes in the tier which service interactive queries, a 5 hr block including peak time was evaluated.


Node-Mean Interactive TopN Query Time Distribution


Here you can tell it certainly doesn’t help query time, and might actually be hurting it. The 99% goes from 3.8 seconds to 5.1 seconds when enabling FIFO processing… YIKES! This feature will be turned OFF for the interactive nodes.

So the lesson learned here is when there is a large backlog of work, having orderly execution helps everything get through in a predictable manner. However, when speed and nimbleness are key, let chaos reign.

Want to join us? We’re hiring Distributed Systems Engineers.

Charles Allen, Ph.D. is a Senior Software Engineer at Metamarkets and a Druid committer. He focuses on real-time services that enable event streams of 3M events per second to be instantly queryable in the Metamarkets platform.

Filed in Druid, Technology