Thursday, September 10, 2015

Hacking and bending Spark Standalone's execution model

Hi all,

After switching from a YARN-managed Spark cluster to a Spark Standalone cluster, we ran into some resource management problems.
Our main goal was to run multiple jobs on the same cluster, sharing its resources and keeping resource utilization as high as possible.

A bit about the environment: we were running our Spark cluster on Amazon's EMR, and we switched to running a standalone cluster on plain EC2 instances.


The architecture we were used to looked like this:
- A Master server - r3.xlarge (EC2 instance type) - that controls the cluster.
- A Core - the server on which the Spark "Driver" program runs.
- Multiple Slaves (also called "Executors") - r3.xlarge as well.

In the standalone cluster it changed only a bit: the "Core" component was dropped, and everything else stayed the same.
Each Executor had a certain number of physical cores (CPUs) - 4 in the case of r3.xlarge - and in "spark-submit" we would request the number of executors we wanted, getting Executors * CPUs cores in total (via the parameters "--num-executors" and "--executor-cores").
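For example, on the YARN/EMR side (where "--num-executors" applies) a submission along these lines would give 5 executors * 4 cores = 20 cores in total. The numbers, class and jar names here are purely illustrative, not our actual settings:

    # Illustrative YARN submission: 5 executors x 4 cores each = 20 cores
    # (class name and jar are placeholders)
    spark-submit \
      --master yarn \
      --num-executors 5 \
      --executor-cores 4 \
      --class com.example.MyJob \
      my-job.jar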

All was great... 
If we take a deep dive into the structure of a single slave machine, we see:


The Spark "Worker" is the whole machine: it has 4 cores (CPUs) and a total of 31GB of memory (in the case of r3.xlarge).
Again, sounds great so far... The topology of the Spark Standalone cluster now looks like this:


One master and many slave machines (workers), each worker with 4 cores. Each worker also has its own RAM - in our case, as we said, 31GB.

Here comes the problem: we needed to execute multiple Spark applications on the same cluster and share the resources (note that different applications may require different amounts of cores and memory).

Let's walk through an example with real numbers. We have a cluster of 10 machines (workers), each with 4 cores, so in total we have 40 cores.
In "spark-submit" in standalone mode, you can pass the "--total-executor-cores" parameter, which means how many cores you require from the whole cluster.
(Quoting from http://spark.apache.org/docs/latest/spark-standalone.html: "You can also pass an option --total-executor-cores <numCores> to control the number of cores that spark-shell uses on the cluster.")

So what happened when we requested the following settings: an application that wants 12 cores, with 12GB of memory per executor (via "--executor-memory")?
You would expect the reasonable outcome to be that the scheduler takes 3 workers, each contributing 4 cores and 12GB, and packs all 12 cores densely...
But that was not the case. What actually happened is that it spread the requested cores across the whole cluster and grabbed all of the machines,
so we ended up with one core per slave on eight of the machines, and two slaves holding 2 cores each.
The biggest problem was that each such "Executor" grabbed 12GB of RAM, so no other application could run: the cores might have been free, but there was no RAM left for additional applications.
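To make this concrete, the kind of submission that triggered the behavior looked roughly like the following. The 12 cores and 12GB come from the example above; the master URL, class and jar names are placeholders:

    # Standalone mode: ask for 12 cores across the cluster, 12GB per executor
    spark-submit \
      --master spark://<master-host>:7077 \
      --total-executor-cores 12 \
      --executor-memory 12G \
      --class com.example.MyJob \
      my-job.jar

With the default spread-out scheduling, those 12 cores land on all 10 workers, and every one of those workers pins 12GB of RAM for this single application.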

We had to find a workaround, so we came up with the following solution: we changed the defaults of these parameters in "$SPARK_HOME/conf/spark-env.sh":
SPARK_WORKER_INSTANCES (defaults to 1) -> 4
SPARK_WORKER_CORES (defaults to all 4 cores) -> 1
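In "spark-env.sh" terms, the relevant part looks something like this. The two exports are exactly the settings above; the memory line is only an illustration of how the machine's RAM could be split between the workers, not a value quoted from our setup:

    # $SPARK_HOME/conf/spark-env.sh - on every slave machine
    export SPARK_WORKER_INSTANCES=4   # 4 worker processes per machine instead of 1
    export SPARK_WORKER_CORES=1       # each worker owns a single core
    # Optionally cap the memory each worker may hand out, so 4 workers fit in RAM
    # (illustrative value only):
    # export SPARK_WORKER_MEMORY=7g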

So instead of treating a slave (a single physical machine) as one worker, we switched to having 4 workers with 1 core (CPU) each, which gave us the following architecture per slave - let's call it "Hacked":


And the new cluster topology looks like this:

We no longer look at the cluster as a bunch of slave machines, each of which is a worker with multiple cores; instead, we look at it as a bunch of workers, each with 1 core and a certain amount of "worker" memory (in our case we set it to 7GB per worker, via the parameter "spark.executor.memory" in the config file we pass to "spark-submit" with the "--properties-file" option).
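Putting it together, a submission under the "hacked" topology looks roughly like this. The 7GB per executor is the value from our config; the total-core count, master URL, class and jar names are just placeholders:

    # Per-application config: each 1-core worker/executor gets 7GB
    cat > our-app.conf <<'EOF'
    spark.executor.memory  7g
    EOF

    # Request 12 cores in total; with 1-core workers that means 12 executors of 7GB each
    spark-submit \
      --master spark://<master-host>:7077 \
      --total-executor-cores 12 \
      --properties-file our-app.conf \
      --class com.example.MyJob \
      my-job.jar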

And for those asking about the parameter called "spark.deploy.spreadOut": its default is true, and if you set it to false, the master will try to stack the requested cores onto as few workers as possible instead of spreading them out! :)
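If you'd rather go that route, the property belongs to the master. A sketch of how it could be set (we did not use this ourselves):

    # $SPARK_HOME/conf/spark-env.sh on the master:
    # pack each application's cores onto as few workers as possible
    export SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=false"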

To conclude our solution: we "hacked" the cluster into being just a bunch of single-core workers. For each application we request a certain number of cores, and the memory per core is whatever we defined in the config file.

Thanks to my colleague Nizan Grauer for the assistance with this creative solution and a lot of playing around with the cluster's configuration.

I know that a lot of people have found some kind of workaround to this problem,
but I hope this one helps at least one more person save some time!

If I can assist with anything else, please comment below and I'll try to answer :)