At Metamarkets, we ingest more than 100 billion events per day, which are processed both realtime and batch. We store received events to our Kafka cluster and the stored events in Kafka are processed by both Samza and Spark for real-time stream processing and batch processing, respectively.
We have clients who send us data in batch only, but batch processing is done for clients who send us data in real-time in order to fix up any incorrectness in produced data, including deduplicating and joining events that were outside of the real-time join window.
Batch processing is a two-step operation where the first step is a typical extraction and transformation that includes deduplication and data roll-up. After Step 1, there is a second job that connects to Druid’s indexing service and runs an indexing task with the rolled-up data to create Druid segments.
From Hadoop to Spark
Over 100 billion daily-ingested events can all vary quite a bit in terms of event size, but they are roughly converted to hundreds of terabytes of data. Processing hundreds of terabytes of data is not always smooth as it requires a pretty big cluster – and with more machines, as you know, there is a higher probability things will go haywire. Currently we run a large-scale Spark cluster where over 400,000 executors are launched per week on average. We hadn’t used Spark for batch processing from the beginning; we used to use Hadoop for the roll-up and indexing jobs. Hadoop’s map-reduce was fine, but as our data size grew, it became harder and harder to manage the Hadoop cluster, especially with an HDFS cluster. Scaling up and down the cluster with rebalancing occasionally saturated network bandwidth of the data nodes and lowered overall job performances. So we decided to move our batch processing platform to Spark and use Spark Druid Indexing Task instead of Hadoop Indexing Task.
Spark Standalone Cluster
Spark provides several different Cluster Managers that are in charge of resource allocations. We decided to go for Spark’s standalone cluster at first because that was the simplest deployment option and had a cool UI! While Spark’s standalone cluster manager served fairly well at managing the cluster at high scale, it had some restrictions that yielded sub-optimal cluster and resource management.
The first thing was that a smooth upgrade to a newer Spark version was not possible without additional resources. In the standalone cluster, all master(s), workers, and driver program are run with specific Spark distribution and if the any of the versions is different then there is a connection issue. So unless we build a new cluster with a new version and migrate all pipelines there, there would be a short downtime where none of the Spark applications would run successfully and all the running applications would fail because we would have to tear down the cluster and rebuild it with a new version at once.
Another thing was the lack of complete resource isolation and finer-granularity resource management. There are
spark.executor.memory configurations that define the number of CPUs and amount of memory each Spark executor takes, however, they are a rather soft limit and it is possible that a more memory-intensive Spark application can use up more memory than it was supposed to have in a multi-tenant environment. While it could be beneficial for some Spark applications to use more memory than they got, this is not ideal. Also, Spark’s standalone cluster doesn’t allow us to specify which machines I want my Spark application to run on. This can be annoying because it can limit us to having a cluster in one Availability Zone to avoid excessive data transfer fees due to shuffle operations.
Using Mesos as a Cluster Manager
So we decided to move from using a standalone cluster manager to using a Mesos cluster manager. Mesos is a general resource management software where different services run as Mesos Frameworks in the same pool of machines. Mesos slaves (virtual machines) can run as many different frameworks as their resources allow. For Spark, each application is submitted to Mesos master as a framework, and Mesos slaves will run tasks for the Spark framework that are Spark executors (in coarse-grained mode). Mesos provides benefits that address most of the problems we encountered while using the standalone cluster manager.
First of all, Mesos provides better resource isolation with cgroup isolation. Therefore, unlike standalone cluster, there is a hard limit on memory such that Spark executors (or tasks) will behave as if they are running on a machine with
[spark.executor.memory + spark.mesos.executor.memoryOverhead] note that
spark.executor.memory will be used to decide the amount of total memory including both heap and off-heap, so it is important to properly set
spark.executor.memory). You can also set
false (default is
true) in order not to use direct buffers during shuffle block transfers. While there is no support for disk and network isolation for Spark yet, it is nice to have a complete isolation in a multi-tenant environment.
Mesos also provides several components to let us manage the Spark cluster with finer granularity. When we launch Mesos slaves, we can specify resources to give to each role and attributes of the agent. The amount of resources that can be offered from the agent and the fraction of resources for each role can be specified to allow the resources to be reserved for some frameworks with a certain role. For example, we can say the following with a machine that has 4 CPUs and 20G memory total:
The above allocation will let any frameworks with default role (*) use resources from (2 CPUs, 10G MEM) pool and only the frameworks with “spark” role use resources from (2 CPUs, 5G MEM) pool AND (2 CPUs, 15G MEM) pool. Frameworks with a specific role can use resources for default roles but can not use resources for any other roles. This behavior allows us to reserve resources for Spark applications and have different tiers in the cluster that don’t share resources together. While this allocation allows frameworks with “spark” role to use at least ½ of the total CPU, it doesn’t necessarily mean 2 out of 4 cpu cores are reserved for them; rather, it means that ½ of cpu time is reserved for the frameworks with “spark” role. Because of this, it is worth checking JVM options of your Spark executors to see if you specified the GC threads or other thread counts. In our case, we forgot to set
-XX:ParallelGCThreads flag and a Spark executor running on AWS’s x1.32xlarge instance thought it had 128 cores and created so many GC threads!
Another handy feature is an attribute. Attributes are key-value pairs that can be anything you specify and can be passed along with the offer. For example, we can specify an attribute
AZ:us-east-1a for a Mesos agent running in us-east-1a and then add
spark.mesos.contraints=AZ:us-east-1a to a Spark configuration so that the application will only be run on Mesos agents in us-east-1a. This allows us to have multiple AZs in the cluster, as opposed to having to have one AZ for all nodes in the cluster. We run our Spark cluster on AWS spot instances, so this comes especially handy in case where spot prices in one AZ becomes unfavorable.
Finally, using Mesos cluster manager enables us to do a smooth rolling update of Spark versions. When a Mesos task for the Spark framework starts, Mesos executor (not a Spark executor) that runs the task downloads and installs the Spark assembly specified by
spark.executor.uri. Because Spark is getting downloaded and installed in the beginning of each Mesos task, it is possible that each Spark application uses a different Spark version. While this behavior comes with a bit of overhead for installation, it gives us a lot more flexibility on deployment and testing.
Upgrading Spark to 2.0.0
Spark provides two different run modes for Mesos: coarse-grained (mentioned above) and fine-grained (deprecated as of 2.0.0). The main difference is that fine-grained mode launches Mesos tasks for each Spark task and those tasks die away as Spark tasks are done, whereas coarse-grained mode launches Mesos tasks for each Spark executor and those Mesos tasks stay during the lifetime of application unless you use dynamic allocation or executor gets killed for various reasons. Fine-grained mode had too much overhead in our case where some Spark applications had more than 10,000 tasks per application because that meant Spark needs to be installed more than 10,000 times on Mesos agents, so we decided to use coarse-grained mode. We were running Spark 1.5.2 back then but we had to update to 2.0.0 due to how
MesosCoarseGrainedSchedulerBackend running on Spark driver program launched executors.
Until 2.0.0, Spark allowed only one executor to be run on each Mesos slave. The scheduler will look at the offer and take whatever is smaller between CPU offers and the number of cores left until it reaches the
spark.cores.max limit (which is basically unlimited by default). It uses
spark.executor.memory configuration, so there wasn’t a fixed memory-to-CPU ratio for Spark executors running in Mesos coarse-grained mode. This was not a very desirable behavior in a heterogeneous environment where a Spark application could unnecessarily use most of the resources in a machine that could be used for other frameworks. This could also lead to having different-sized executors where some executor has 5 cores and 30MB of memory and some other executor has 32 cores and 30MB of memory, which means that 5 tasks will run concurrently with 30MB of memory on the first executor but 32 tasks will run simultaneously with only 30MB of memory on the second executor. This will slow down the tasks running on the second executor and/or make the second executor to go out of memory, eventually failing the job. Spark 2.x solves this problem by allowing multiple Spark executors to be launched on the same Mesos slave as different Mesos tasks. In addition to the solution for the Mesos problem, Spark 2.0 came with a nice performance boost of 30-40% for most of our Spark applications.
So we updated to Spark 2.0, and things were all rosy with the new Spark version and Mesos! Ok, maybe mostly rosy. Soon after we updated to Spark 2.0, some of the machines that were running Spark driver programs started failing with an out of disk space error. After initial investigation, it was identified that the culprit was event logs of the Spark application.
Spark frameworks running on Mesos still provide a link to the job UI running on the Spark driver. While this is a nice UI, it disappears as soon as the application finishes. In order to keep the record and look it up after-the-fact, we enable spark event logging so that Spark History Server can reconstruct the job history UI after the event logs and is uploaded to the store. Spark 2.0.0 changed the task metrics to be internal accumulators, which is nice for collecting metrics. However, this caused all the internal metrics accumulators, including the one that contains information on all the updated blocks during the task, to be logged in event logs. As a result, we faced humongous 60GB event logs that Spark History Server wasn’t able to parse. After making a patch that skips logging internal accumulators and updated blocks information by setting
spark.eventLog.omitUpdatedBlockStatuses to true (the PR hasn’t been merged to the apache Spark master branch yet), everything was back to normal (phew!). If you want to run the same version we do, head over to https://github.com/metamx/spark.
A Pitfall of Scheduling
After resolving issues that came up during the migration, our Spark applications were running fine and smoothly until one day when most Spark applications were stuck in the cluster without any active Spark tasks. All the stuck applications had acquired some resources but not enough resources to actually start the job. We set
spark.scheduler.maxRegisteredResourcesWaitingTime to avoid the scenario where a Spark application starts with too little resources and a very small group of poor executors try to process all data and fail. The failure can happen because Spark has zero disk-awareness and is not able to prevent spilling too much to disk on too few executors. So if the job starts with too few executors, those executors will soon fill up all the disk space and other applications running on the machine can fail as well due to out of disk space error. In Spark’s standalone cluster, setting
spark.scheduler.minRegisteredResourcesRatio hadn’t been a problem even when the cluster was under-provisioned as long as there were enough resources to process the largest application in the queue because the cluster manager was using FIFO scheduling. Spark master will give the available resources to whichever application came first until it has the maximum number of cores it can have, and the application will be greedy and keep holding all the resources during its lifetime unless you use dynamic allocation (we do not currently use dynamic allocation).
Contrary to Spark’s standalone master, Mesos master uses Dominant Resource Fairness mechanism for scheduling. Long story short, DRF is a variation of fair-scheduling that defines a dominant resource for each framework (e.g. CPU-heavy or memory-heavy) and tries to maximize the smallest dominant share given to each framework in the queue. If the frameworks contend for the same type of dominant resources, then it’s just min-max fairness.
Let’s look at an example. There are 9 CPUs and 15GB of memory in the cluster. Framework A requires <3 CPUs, 1GB Mem> to launch a task and Framework B requires <1 CPU, 4GB Mem> to launch a task. Framework A registers first, and soon after Framework B follows. Dominant resource for Framework A is CPU (33% of entire pool of CPUs) and for Framework B it is Memory (20%). The scheduler will offer resources to a framework that has the smallest share of dominant resources, therefore the scheduling will look like the following:
- Framework A gets <3 CPUs, 1GB Mem>. Now A has a dominant share of 33%.
- Framework B gets <1 CPU, 4GB Mem>. Now B has a dominant share of 20%.
- Framework B gets another <1 CPU, 4GB Mem> so total of <2 CPUs, 8GB Mem> because it had a smaller dominant share (25% < 33%). Now it has a dominant share of 40%.
- Framework A gets another <3 CPUs, 1GB Mem> so total <6 CPUs, 2GB Mem>. Now A has a dominant share of 66%.
- Framework B gets <1 CPU, 4GB Mem>, so total of <3 CPUs, 12GB Mem>.
- Now there isn’t enough resources in the cluster. In the end, Framework A gets 2 running tasks and B gets 3 tasks.
Looks pretty fair and economically efficient, right? But bad things can happen once in a blue moon when the scheduling is combined with Spark’s default behavior.
As mentioned above, Spark doesn’t give up on resources that do not meet the minimum resources threshold. While this is an understandable behavior, if multiple Spark applications (Mesos frameworks) get submitted around the same time and there are not enough free resources to run all the applications at once, it is possible that none or only a few of them can meet the minimum resource threshold and those resources held by stuck applications keep clogging the cluster. We ended up mitigating the problem by decreasing
spark.scheduler.maxRegisteredResourcesWaitingTime, but the long-term goal would be to implement a better resource allocator (Mesos allows us to write a custom allocator that can be used in place of default DRF allocator) to handle scheduling between different Spark frameworks. I think it would be nice to have an allocator that has different scheduling mechanisms among different roles. As of now, Mesos allows different roles to have different weights that give some frameworks with a certain role higher priority in resource allocation. While this is nice, because each Spark application is a different framework, it doesn’t really make sense to give different roles to each Spark application. Therefore, I think the most desirable behavior for resource allocation is to use FIFO within frameworks with “spark” role on top of DRF among different Mesos roles.
Mesos has helped us with managing a large-scale Spark cluster by providing ways to manage the cluster with better resource isolation and finer granularity. While it is already nice to have these features, here is a list of stuff I would love to have for an even better cluster management:
- Support for disk space and network isolation. Right now Spark doesn’t support either of them so it is possible that one Spark application takes up most of disk space and saturates network bandwidth. It would be nice to have isolations between Spark applications so no one Spark application can slow down or fail other Spark applications.
- Better task names in Spark application. Spark and Mesos share similar terms (e.g. executors and tasks) and with coarse-grained mode, each Mesos task in the framework for a Spark application is actually a Spark executor which is named by its executor number. This could be confusing for some people, so it would be nice to have better names for Mesos tasks for Spark applications.
- Support for Mesos’ draining mode for maintenance. Mesos provides draining mode for graceful shutdown of Mesos agents. However, this just appends additional information in the offers (from the agent) about the agent maintenance window and it is completely up to frameworks how to handle those offers. Spark currently handle the additional information in the offers. This is not that great for scaling down the cluster or shutting down instances for some problems on the machine because any executor loss caused by shutting down the machine will result in recomputing the missing RDD partitions from the beginning because we do not run in HA mode (no RDD replication). Also, as of now Spark launches executors in a round-robin manner to distribute the tasks within one application among as many nodes as possible, so there is a very high chance that a large Spark application will experience some failures when we terminate an instance.
- An option for a Spark application to relinquish the resources it holds if it doesn’t acquire minimum resources for some period of time. While the application can simply start and fail with too few resources after
spark.scheduler.maxRegisteredResourcesWaitingTimehas passed, this behavior is a little risky as executing a job with too few resources potentially endangers other Spark applications (or other running Mesos frameworks).