Druid and Spark Together – Mixing Analytics Workflows

One method of looking at the human thought process is that we have different brain workflows for different analytics and data processing needs. At Metamarkets we have the same thing for our data processing machines. This post will explore some of our experience with bin-packing query nodes featuring Druid with batch processing featuring Spark, using Apache Mesos as the resource coordinator.

Thinking Fast

The above shows a typical “work” pattern for a Druid historical node throughout a typical week day (times shown are EDT). It shows the quantity of CPU seconds consumed by the JVM to answer queries. There are two things that are of extreme interest in the usage: one is that there is a daily seasonality that peaks sometime just after noon in New York, and the other is that spikes of extreme work come in short bursts. The bursty queries are mitigated by properly configuring query priorities such that queries that are expected to be light and fast get higher priority than queries which are expected to take a longer time to answer. We also have different nodes tuned with different work queue settings. In order to make sure our >90% response times are reasonable, we have to provision CPU capacity for such peaks. Overall this means the CPU on these machines end up with a lot of idle time where they are doing no work.

Thinking Slow

Using Spark on Mesos we launch approximately 3,000 spark batch jobs (each launches many mesos-tasks, up to ~1,000) every day, which range in duration from a few minutes to a few hours. Our batch jobs are split into two stages: ETL and Indexing. The ETL stage does all of the data manipulation and has a specific JVM footprint that works well for it. Our Indexing jobs have a very different resource footprint, so they are broken into a separate job. To save costs, our Spark cluster was originally running on spot nodes. Spark’s ability to recover from spot market fluctuations is pretty good, but when you’re running as much spot as we do, you tend to dominate markets and can be very sensitive to spot market fluctuations.

A handy thing about running on Mesos is that switching spot resource pools is very easy, requiring just new nodes to launch. Spark automatically picks up the offers from the new nodes and responds appropriately. But still, intermittent failures were becoming too frequent as our spot footprint continued to grow. When a 6 hour batch job took 4 hours to run with minimal failures, regular spot market fluctuations could easily cause that batch workload to fall behind, causing us to scale up even MORE spot usage… thus creating a vicious cycle. Overall there is a need to have a more stable baseline pool of resources with the ability to spike into the spot market as needed, but still be able to do so at a reasonable price point.

The Marriage

The fast result work path through Druid has a lot of remnant CPU capacity that goes underutilized, and the slow work path through Spark has a need for a more stable and predictable baseline. If we put these two together properly, it means we should be able to evict Spark CPU usage to accommodate spikey Druid query load. And in doing so, allow Spark (which now has the added resources of the Druid cluster) and Druid (which now has the added resources of the Spark cluster) to have more resources available!

OS Lineage

Before delving into packing compute nodes with work, it is important to understand some of the underlying technologies and their lineage. One of the challenges with running many things on the same node is figuring out how to get everything on the node which needs to run, and to make sure it doesn’t interfere with other stuff on the node.

There are three options you can go through to solve this. The first option is to have dedicated machines for specific tasks and only install things you need to accomplish the tasks of that machine, plus some way to manage the state of the configurations and versions across your machines. The second option is to try and install everything on all your machines and hope you don’t have any version conflicts – in that case, when one component needs one thing upgraded then all your machines are going through an upgrade process with unknown and potentially unisolated impacts.

The third option is to install the absolute bare minimum of an installation as possible as your base system, and have applications bring their own chunks of libraries that can run independently and with pre-defined isolation and resource constraints. The third approach is the one practiced by VMs and some container operating systems. When viewing your fleet of compute resources as a collection of resources instead of a bunch of individual machines, it makes a lot of sense to pursue the third approach.

In order to accomplish this, you need as bare minimum of an underlying system as possible. Gentoo began in the very early 2000s and had a very interesting capability, using the Portage build system, to build your system by hand from practically nothing. In the process you could optionally install only the exact components desired. Users of Gentoo had a stigma because its configuration power allowed people to do things that often either made no sense (-O99) or were sometimes detrimental (-funroll-loops). These kinds of settings or flags permeated many Gentoo installations. Like many tools, in the right hands a distribution that is designed to be built from source and only include exactly what the user dictates can be very powerful. ChromeOS eventually adopted a Portage base with a more enterprise ready build and configuration system.

From ChromeOS spawned two operating systems intended to be used in clusters to launch containers. The Container-Optimized OS from Google, and Container Linux by CoreOS. Both of these feature Kubernetes as their key container orchestrator and resource manager. The root image of a CoreOS installation can easily be on the order of 200MB. This means that the entirety of the core of a cluster takes up a footprint about the size of the Java Runtime Environment! (and about half the size of the JDK)

OS Key Lineage Component
Gentoo Portage package management

Minimalist approach to dependencies

Scorched earth recovery

ChromeOS Scorched earth package and build configuration
Container-Optimized OS Kubernetes pre-packaged / pre-configured + GCE support
CoreOS More configurable Kubernetes pre-packaged and flexible image upgrade and configuration methodologies (exact relation to Container-Optimized OS is not clear)
MMX (CoreOS fork / patch set) Mesos pre-packaged

The table above highlights a few components of various predecessors or those related to the current OS package we use. These are components that help make the current OS we use a fantastic option for cluster container deployment.

We run our own set of packages and configurations for CoreOS. The key difference is that we pre-package Mesos in addition to Kubernetes (but just use Mesos currently). Since most of our services run in the JVM, which acts as a kind of container itself, the history of our deployments is just gzipped tarballs with the jars and configurations packed in. Most of our services also natively use discovery libraries instead of relying on DNS + static port definitions. As such, the need to use Docker is minimal to none. Mesos allows you to simply specify resources to pull down and run in what they call a “Mesos container,” which was a natural fit to the way our services had been running for years. Mesos also has a lot of configurability with regards to how it uses and announces resources, and extensive support for various kinds of modules to extend its functionality.

Mesos additionally has a nice feature where the agents running on a machine are independent of the tasks which the agents run. This means you can make some changes, like upgrading versions, without taking down the services that agent is running. This feature is very handy for stateful services. The persistent volume support in Mesos has also been around longer, and Mesos supports declaring nodes as about to go under maintenance. All these together make it a bit more mature for certain types of workloads, but nowhere near as easy to set up as a GKE cluster.

So at this point we have a very robust but minimalist system core with a couple of options for how we manage our applications at a cluster level.

Resource Isolation

Around 2006~2007, Google had skipped the Virtualization Host/Guest paradigm and was instead pursuing better resource isolation at the Linux kernel level. In doing so they introduced the concept of control groups. The basic notion of control groups is that you do not need to let every process a kernel sees have access to every resource, but instead would want to have more control over your exposure of system resources. Over the years many of the ways to do resource isolation have gained support in the libcontainer project. We use only the isolations we need, which mostly includes memory isolation and cpu shares isolation. Disk resources are isolated per block device rather than trying to manage the IOPS of any independent block device.

One of the key challenges in the industry is how to expose the functionality of container isolation without having the configuration of utilizing these restrictions contain more lines of code than the application itself. The way we have worked resource isolation is to have the resource management system configuration on the machine have proper isolation of resources. And instead of allowing applications to have infinite flexibility in how they declare resource needs, we have a set of performance expectations settled a-priori and simply announce the resources at a concept level, having set up the underlying systems to adhere to the expectations.

As an example, if we have a machine tuned to be able to run Druid, it does not announce general resources. Instead it announces simple resources (cpu / mem / disk) and has *more* underlying configurations not exposed to the service that adhere to the service’s expectations. For example, if I have a mysql service I need to launch, and I know I want it to be guaranteed on a single numa zone, to have local SSD backing the persistent volume, and have at least a certain amount of network bandwidth, I would work with operations to figure out a resource descriptor I could use to guarantee these constraints. Then I would simply launch against the simplified numbers of CPU / Memory / Disk.

As another example, if I had another mysql database that is rarely used and has very lenient SLOs, I can work with operations to let them know I have relatively light and unimportant disk constraints, and CPU that can be best-effort. I will then get a tag to identify the resources that meet this constraint, and can schedule my tasks against these resources. Then my low-priority mysql database simply has to worry about CPU / Memory / Disk that is also labeled with my special tag. When resource collections of a type get too low, we try to monitor and make more available.

At this point we have a small but robust footprint to launch resources in the cluster, and a way to make those resources discoverable to the services that need them while still maintaining SLOs desired by the services.

Resource Orchestration

In order to manage resources we use Mesos with the coarse grained backend for Spark, and Marathon for Druid. The coarse grained backend for Spark allows us to upgrade or modify Spark on a per-job basis without needing to worry about version conflicts. Using Marathon for Druid gives us persistent volume support and upgrade strategies for stateful tasks built in. Spark on Mesos has performed great! The big thing we have to keep track of is what availability zone jobs run in to prevent cross-zone network transfer costs. The biggest problem we have with Mesos and Marathon for Druid is that changing nodes in the cluster (adding or replacing) is still more manual than we would like. All of these use the Mesos containerizer by simply downloading and extracting tarballs into the working directory and running java, which is included among the tarballs.


The Good

In a previous blog post we wrote about using a large pool of compute in AWS.

Above is the change in per-day average CPU utilization across a chunk of our Druid cluster during and after the migration.

Below is the CPU utilization of a single host each minute of a typical day. The plateaus are at approximately 95% utilization.

Even better than higher utilization of our CPU infrastructure, we were able to take the resource pool used for Spark and combine it with the resource pool for Druid, giving BOTH services more CPU power at their disposal!

The Bad

The x1.32xlarge nodes feature four E7 8880 v3 processors. These processors are pretty beefy and have a lot of nice features. Unfortunately we’ve seen some really strange effects from the four socket architecture of the x1.32xlarge nodes. In one particular effect, one socket would be pinned at very high CPU utilization while the others would be much lower. Dynamic asymmetric NUMA zone performance (as opposed to zone-crossing considerations) is something I’m not even sure the Linux kernel scheduler handles in meaningful ways.

Since the exact measurement technique of htop is not obvious without digging into the code, this result was verified against /sys/fs/cgroup/cpuacct/cpuacct.usage_percpu deltas, which confirms this effect is real. Luckily these effects are short-lived but occur commonly enough to be a concern.

Looking at the Intel specification for MSRs in the (PDF warning!) System Programmer’s Guide, a key MSR of interest is the IA32_THERM_STATUS register at 0x19C (412 decimal). There is a correlation we found between the Power Throttling being active and this “high cpu %” effect on the socket being power throttled. There are a lot of tweaks available to control the processor state, of which only disabling turbo was attempted, which did not prevent this effect.

This random power throttling of cores, the effects of hyperthreads, and the heterogeneous nature of CPU load / availability / impact in a cluster-containerized architecture means CPU % is one of the least useful and most commonly misused metrics for a machine’s performance. What this means is that even though our average cpu utilization as measured by the guest OS is approximately 65%, getting to 100% (if that is even possible) will not yield a 50% increase in throughput on the same hardware. Even though CPU time is expensive, blindly aiming for 100% utilization will not yield the results you desire. The only helpful number for CPU % is 0%, which tells you either your metrics reporting is broken or there’s absolutely nothing going on. The way I read the CPU utilization graphs above is as follows: The weekly cycles are gone and we no longer dip down to near-0 utilization for extended periods.

The proper monitoring of knock-on effects to CPU (or other system resource) usage in a containerized environment is not something common in the industry.

The Ugly

For a portion of our cluster known externally as “Druid Basic,” we have significantly more page cache churn compared to our “Druid Ultra” offering. The “Druid Ultra” offering performed well most of the time, but the “Druid Basic” offering had significant problems when tested on this architecture.

The most common problem was page allocation failures from the Linux kernel. There are monotonically increasing counters for memory pressure warnings that surface for the mesos executors. Below is a time series (by hour) snapshot of what the warnings look like before a page allocation failure on a specific node (the vertical lines are days). The “spikey” nature of part of the graph is due to the lack of deduplication in this specific metric pipeline, and taking the “max” of the rolled up values. So if an event is sent twice in a given hour due to a small network blip, it will roll up to twice the “nominal” value. Orange and blue indicate Spark vs Druid data. The Druid tasks are generally longer lived, so their monotonically increasing counters reach higher values.

We tried cgroup memory isolation enabled and disabled but still encountered issues. We did not attempt to use cpu sets and memory controller affinity.

One of the page allocation failures we encountered during migration.

To try and eliminate memory issues we experimented with zone reclaim modes, disabling NUMA at boot, and a few settings to try and prevent fragmentation at the DMA level for the AWS ENA driver. Under NUMA architecture, it is possible that different NUMA nodes have different amounts of available memory. For example, by running numactl -H on the x1.32xl nodes, we found that the amount of free memory differs very much among the NUMA nodes.

We found that in most of our cases, if not all, it is Node 0 that always got the page allocation failure. By default, zone_reclaim_modes was set to 0, which means that no zone reclaim will happen and the memory allocation needs to be performed at other nodes. We tried setting vm.zone_reclaim_mode to 1 so that the cached memory can be reclaimed whenever there is memory shortage on the node, and increased the value of vm.min_free_kbytes in the hopes that increased minimal amount of free memory and zone reclamation might prevent the page allocation failure from happening.

Unfortunately, enabling zone reclamation and increasing minimal free memory didn’t help. Another theory we had was that high network activity led to the shortage of contiguous DMA pages. The ENA driver uses DMA buffers for data transfer. Because there are multiple Druid historical nodes that are actively loading/unloading segments and processing queries, in addition to multiple Spark jobs doing shuffles and etc., lots of data transfers happen on the node. We attempted to prevent too much fragmentation at the DMA level by increasing DMA protection with vm.lowmem_reserve_ratio. The default value of vm.lowmem_reserve_ratio is 256 256 32, where each number is a reciprocal number of ratio for each zone to protect. So we decided to lower the numbers to give more protection in each DMA zone. Unfortunately, this didn’t prevent the page allocation failure from happening.

At last, we tried disabling NUMA at boot to see if that helps with the situation. However, that didn’t help either.

The above is a snippet from the status of memory from each zone when the page allocation failure happened while NUMA was disabled. As you can see, there were lots of low-order pages but none greater than 64kB.

There are also numerous network socket memory tunings available which we did not tweak during this testing.

When paging in EBS volume data for Druid while doing background compute for Spark (which also uses page cache) we commonly encountered a nasty kernel bug. The state of the node after encountering the bug varied, but always ended up needing to be terminated. Try as we might, we have not been able to reproduce this bug in any synthetic environment, but can reproduce it with great regularity under certain production workloads.

As a final hiccup, our initial setups had RAID configurations for EBS volumes to give one giant disk view through the operating system, and Mesos would carve up the resources. This led to poorly tuned RAID setups that greatly harmed performance.  Instead of trying to do detailed tuning of raid configurations, we resolved this by removing the RAID layer from our setup, instead announcing multiple disk resource paths through Mesos. We had observed high disk usage correlated with page allocation failures on x1.32 instances also, which fueled speculation that RAID contributed to the allocation failure root cause.

We ended up not using the x1.32xlarge nodes, opting for nodes with fewer NUMA zones, and removed any sort of RAID on the disks. This has effectively eliminated the problems with the kernel memory management. Now the nodes are humming along day in and day out at a pretty good pace, and we’re happy with the state of running Druid and Spark mixed workloads on the setup and configuration we’ve found.