Saturday, December 26, 2015

DataProc - Spark Cluster on GCP in minutes

I’ve decided to try running Apache Spark in various ways on Google Cloud Platform.
I’ll tell you a bit about my experience and how to perform the needed actions for each way.

For the first way, I’ll start with the easiest one: Google’s DataProc service (currently in Beta). If you are using Amazon’s AWS, it’s the equivalent of their EMR (Elastic MapReduce) service. You can launch a Spark cluster with a GUI tool in the Google Cloud console, via the REST API or via a command line tool (I’ll show all of the possibilities next).

First you’ll need to create a Google Cloud account, which you can do via the next link. You get a free trial with $300 of credits, which will be more than enough for all of your tests. Once you've created the account, we can start using the web console to launch the cluster.

You might want to prepare two things in advance (if not, you’ll get the defaults, which are fine too):
  1. Create a “Cloud Storage staging bucket” to stage files, such as Hadoop jars, between client machines and the cluster. If not specified, a default bucket is used.
  2. Create a network for the Spark cluster: the Compute Engine network to use for the cluster. If not specified, the default network will be chosen for you.
    I’ve added a screenshot of the network I created, called “spark-cluster-network”, where I opened only the relevant firewall rules (both for connecting to the cluster and for viewing the UI features of the Spark cluster).


The next step is to launch the cluster with DataProc. There are 3 ways to do that:
  1. A GUI tool for DataProc in your Cloud console. To get to the DataProc menu, follow these steps:

On the main console menu find the DataProc service:



Then you can create a new cluster with all of the parameters we talked about before. In our case it’s called “cluster-1”.

After I gave the launch command, the cluster was up and running within ~45 seconds and I was able to connect to it via SSH:


And you can see that Apache Spark was already pre-installed at “/var/lib/spark”.

Just to check that all is running well I ran spark-shell:


But note a very important thing: you’ll need to launch each application (spark-shell included) with a config parameter that overrides the dynamic allocation feature that Spark uses on YARN.
When I launched a job without this configuration, I would suddenly lose executors while spark-shell was running (you can see in the Spark UI that the executors were removed):


Thanks to Vadim Solovey for the explanation: dynamic allocation causes Spark to relinquish idle executors back to YARN, and unfortunately at the moment Spark prints that spammy but harmless "lost executor" message. Dynamic allocation addresses the classic problem of Spark on YARN, where Spark originally paralyzed the clusters it ran on because it would grab the maximum number of containers it thought it needed and then never give them up.

With dynamic allocation, when you start a long job, Spark quickly allocates new containers (with something like an exponential ramp-up, so it can fill a full YARN cluster within a couple of minutes). When idle, it relinquishes executors with the same ramp-down at an interval of about 60 seconds (if idle for 60 seconds, relinquish some executors).
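The ramp-up described above can be pictured with a toy model. This is only a sketch under assumptions: the doubling rule and the helper name `ramp_up` are illustrative and are not taken from Spark's scheduler code.

```python
def ramp_up(target_executors):
    """Toy model: request 1, 2, 4, ... executors per round until the target is met.

    Hypothetical helper for illustration only; it is not Spark code.
    Returns the total number of executors held after each round.
    """
    totals = []
    held = 0
    to_add = 1
    while held < target_executors:
        held = min(held + to_add, target_executors)
        totals.append(held)
        to_add *= 2  # each round asks for twice as many as the previous one
    return totals


print(ramp_up(10))
```

With a doubling request size, even a large cluster is filled in a handful of rounds, which matches the "couple of minutes" behaviour described above.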

If you want to disable dynamic allocation you can run: “spark-shell --conf spark.dynamicAllocation.enabled=false”
or,
“gcloud beta dataproc jobs submit spark --properties spark.dynamicAllocation.enabled=false --cluster <your-cluster> application.jar”

Alternatively, if you specify a fixed number of executors, it should also automatically disable dynamic allocation:
“spark-shell --conf spark.executor.instances=123”
or,
“gcloud beta dataproc jobs submit spark --properties spark.executor.instances=123 --cluster <your-cluster> application.jar”


Some other useful configuration options you will probably want to run with are:
“--conf spark.logConf=true --conf spark.ui.killEnabled=true”
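When the list of properties grows, it can be easier to keep them in one place and generate the flags. The sketch below only builds the command line as a list of strings; the helper name `conf_flags` is hypothetical, and the property names are the ones mentioned in this post.

```python
def conf_flags(settings):
    """Turn a dict of Spark properties into a flat list of --conf arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


settings = {
    "spark.dynamicAllocation.enabled": "false",
    "spark.logConf": "true",
    "spark.ui.killEnabled": "true",
}

# Build (but do not run) the spark-shell command line.
command = ["spark-shell"] + conf_flags(settings)
print(" ".join(command))
```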
  2. A command line tool, for which you’ll need to install the Cloud SDK on your management machine.
    For example (you can also generate the command from the GUI tool):

gcloud beta dataproc clusters create cluster-1 --zone us-central1-a --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --num-preemptible-workers 2 --image-version 0.2 --project gcp-tools-01

  3. The REST API, from which you can launch a cluster as well.

POST /v1beta1/projects/gcp-tools-01/clusters/
{
 "clusterName": "cluster-1",
 "projectId": "gcp-tools-01",
 "configuration": {
   "configurationBucket": "",
   "gceClusterConfiguration": {
     "networkUri": "https://www.googleapis.com/compute/v1/projects/gcp-tools-01/global/networks/default",
     "zoneUri": "https://www.googleapis.com/compute/v1/projects/gcp-tools-01/zones/us-central1-a"
   },
   "masterConfiguration": {
     "numInstances": 1,
     "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/gcp-tools-01/zones/us-central1-a/machineTypes/n1-standard-4",
     "diskConfiguration": {
       "bootDiskSizeGb": 500,
       "numLocalSsds": 0
     }
   },
   "workerConfiguration": {
     "numInstances": 2,
     "machineTypeUri": "https://www.googleapis.com/compute/v1/projects/gcp-tools-01/zones/us-central1-a/machineTypes/n1-standard-4",
     "diskConfiguration": {
       "bootDiskSizeGb": 500,
       "numLocalSsds": 0
     }
   },
   "secondaryWorkerConfiguration": {
     "numInstances": "2",
     "isPreemptible": true
   },
   "softwareConfiguration": {
     "imageVersion": "0.2"
   }
 }
}
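If you want to generate this request body instead of writing it by hand, something like the following could work. It is a minimal sketch: the field names are copied from the request above, while the function name `cluster_request` and its parameters are hypothetical. It only builds and prints the JSON and does not call the API.

```python
import json

COMPUTE = "https://www.googleapis.com/compute/v1"


def cluster_request(project, name, zone, machine_type, workers,
                    preemptible=0, disk_gb=500, image_version="0.2"):
    """Build a request body shaped like the JSON above (nothing is sent)."""
    zone_uri = f"{COMPUTE}/projects/{project}/zones/{zone}"

    def nodes(count):
        # Master and workers share the same machine type and disk layout here.
        return {
            "numInstances": count,
            "machineTypeUri": f"{zone_uri}/machineTypes/{machine_type}",
            "diskConfiguration": {"bootDiskSizeGb": disk_gb, "numLocalSsds": 0},
        }

    return {
        "clusterName": name,
        "projectId": project,
        "configuration": {
            "configurationBucket": "",
            "gceClusterConfiguration": {
                "networkUri": f"{COMPUTE}/projects/{project}/global/networks/default",
                "zoneUri": zone_uri,
            },
            "masterConfiguration": nodes(1),
            "workerConfiguration": nodes(workers),
            "secondaryWorkerConfiguration": {
                # The request above sends this count as the string "2"; an int is used here.
                "numInstances": preemptible,
                "isPreemptible": True,
            },
            "softwareConfiguration": {"imageVersion": image_version},
        },
    }


body = cluster_request("gcp-tools-01", "cluster-1", "us-central1-a",
                       "n1-standard-4", workers=2, preemptible=2)
print(json.dumps(body, indent=1))
```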

You can also specify initialization actions: a list of scripts to be executed during initialization of the cluster. Each must be a GCS file with a gs:// prefix.
Here’s a list of all the DataProc initialization actions published by Google on their GitHub account: https://github.com/GoogleCloudPlatform/dataproc-initialization-actions
And here are some more docs on how to create your own init-actions: https://cloud.google.com/dataproc/init-actions

Specific network configuration adjustments to be made:
Because we’ve created a network of our own, we’ll need to open some vital ports to the outside world to expose the web UIs of some Spark services. Allowing TCP ports 4040, 18080, 8088 and 19888 gives you access to the services listed next.
(Note: you might need to open other ports to the outside world if you choose to run frameworks other than the ones listed below.)

Spark application UI (served by the driver while an application is running): http://<Master IP Address>:4040

Spark History Server: http://<Master IP Address>:18080

YARN ResourceManager UI: http://<Master IP Address>:8088/cluster

Hadoop Job History Server: http://<Master IP Address>:19888/jobhistory
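To keep the firewall rule and the bookmarks in sync, you could derive both from one table. This is a small sketch: the dictionary keys and the function name `ui_urls` are made up for illustration, and the address `203.0.113.10` is a documentation placeholder, not a real master IP.

```python
from urllib.parse import urlparse


def ui_urls(master_ip):
    """Return the web UI addresses listed above for a given master IP."""
    return {
        "spark_ui": f"http://{master_ip}:4040",
        "spark_history": f"http://{master_ip}:18080",
        "yarn": f"http://{master_ip}:8088/cluster",
        "hadoop_job_history": f"http://{master_ip}:19888/jobhistory",
    }


urls = ui_urls("203.0.113.10")

# The TCP ports the firewall rule has to allow, derived from the URLs.
ports = sorted(urlparse(url).port for url in urls.values())
print(ports)
```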




Conclusion:
There you have it: in a matter of minutes, even without knowing anything about launching DataProc / Spark clusters, you’ll have a running environment on Google Cloud Platform.
In the process you can also add a number of Preemptible VMs as extra executors. They are cheaper than regular Compute Engine VMs and are launched as part of your cluster.
With all of that said, it’s a paid service, and you need to take that into consideration. I think that most of the time you will want to use DataProc for jobs that run for a pre-defined period of time: you launch a cluster, run the computation load and then destroy the cluster. It is less suited to a forever-running Spark cluster that you might want to adjust and install additional tools on.

So what are the other options for running Apache Spark on GCP?
Next we will show bdutil by Google, a command line tool that provides an API to manage Hadoop and Spark tooling on GCP, and another way: launching a Mesos cluster on top of GCP and then running Apache Spark on it.
But that will be in future blog posts.


If you have any further questions, please leave a comment.
I hope this helps you get into the Spark world on GCP.

Saturday, December 5, 2015

Taking care of the Garbage in Cassandra



One of the most effective performance gains I found when using Cassandra came from changing the garbage collection algorithm from CMS (Concurrent Mark and Sweep) to G1.




I’ll start by describing the problem that I’ve been experiencing.
While running our Apache Cassandra (version 2.1.11) with the default configuration of an 8GB heap (it’s calculated automatically by the cassandra-env.sh script at startup), I encountered massive GC overhead and large latency peaks during queries (both reads and writes). When I looked at the metrics, I saw that the heap of all of the nodes was filling up really quickly because of the massive write throughput (several million writes simultaneously).

I tried expanding the heap by 2GB each time before I ran the Spark application that was doing the writes, but each time I got the same result: one or more nodes crashed due to an OutOfMemoryError. I got up to an 18GB heap and this was still happening.

So the solution was installing Java 8 on all of the nodes and changing the GC strategy to G1, and the results were amazing.
I launched all nodes with 8GB of heap and the G1 garbage collection algorithm and saw an immediate improvement: the heap used didn’t go past 4GB per node, and the GC times decreased.


I’ll explain the steps I took to change the setting to run with the G1GC and I’ll elaborate a bit on how everything works.


(After I finished writing this post, I saw that it was just too long, so I’ve decided to be nice and wrap the enrichment parts in <TL;DR> A lot of things </TL;DR>, so feel free to skip them)

Install the newest Oracle JDK 8 to have the latest update of the G1GC. This will also work with a Java JRE / JDK later than Java 7 update 4, but in the case of Apache Cassandra, Java 8 is recommended.


Installing Java 8 (For Debian based systems):
Adding the Oracle repository, updating the local apt-get cache and installing oracle-java8-installer package:


sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer


Automated installation (auto accept license)
echo oracle-java8-installer shared/accepted-oracle-license-v1-1 select true | sudo /usr/bin/debconf-set-selections


sudo apt-get install oracle-java8-set-default


Setting “JAVA_HOME” variable in the environment:
sudo vim /etc/environment


Add these lines at the end of the file:
JAVA_HOME="/usr/lib/jvm/java-8-oracle/"
PATH=$PATH:$JAVA_HOME/bin
Reload the newly added environment variables:
source /etc/environment


We now have the Java 8 JDK installed.

To set up the G1 algorithm on a specific node, you’ll need to follow these steps:

  1. In the “cassandra-env.sh” file add the following lines:
# G1 - Parameters
JVM_OPTS="$JVM_OPTS -XX:+UseG1GC"
JVM_OPTS="$JVM_OPTS -XX:G1RSetUpdatingPauseTimePercent=5"
JVM_OPTS="$JVM_OPTS -XX:+PrintFlagsFinal"
  2. You’ll need to comment out all of the CMS-specific flags that will probably be there by default. Search for these, and watch out for others:
    1. -XX:+UseParNewGC
    2. -XX:+UseConcMarkSweepGC
    3. -XX:+CMSParallelRemarkEnabled
    4. -XX:SurvivorRatio=8
    5. -XX:MaxTenuringThreshold=1
    6. -XX:CMSInitiatingOccupancyFraction=75
    7. -XX:+UseCMSInitiatingOccupancyOnly
    8. -XX:CMSWaitDuration=10000
    9. -XX:+CMSClassUnloadingEnabled
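If you have many nodes, you may prefer to script this edit. The following is a minimal sketch that comments out lines mentioning the flags listed above. It assumes each flag sits on its own `JVM_OPTS=...` line; the function name `comment_out_cms` is made up for illustration, and you should review the result before restarting a node.

```python
# Flag prefixes taken from the list above (values after "=" are ignored).
CMS_FLAGS = [
    "-XX:+UseParNewGC",
    "-XX:+UseConcMarkSweepGC",
    "-XX:+CMSParallelRemarkEnabled",
    "-XX:SurvivorRatio=",
    "-XX:MaxTenuringThreshold=",
    "-XX:CMSInitiatingOccupancyFraction=",
    "-XX:+UseCMSInitiatingOccupancyOnly",
    "-XX:CMSWaitDuration=",
    "-XX:+CMSClassUnloadingEnabled",
]


def comment_out_cms(text):
    """Prefix every active line that sets a CMS-specific flag with '# '."""
    out = []
    for line in text.splitlines():
        active = not line.lstrip().startswith("#")
        if active and any(flag in line for flag in CMS_FLAGS):
            line = "# " + line
        out.append(line)
    return "\n".join(out)


sample = "\n".join([
    'JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"',
    'JVM_OPTS="$JVM_OPTS -XX:+UseG1GC"',
    '# JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"',
])
print(comment_out_cms(sample))
```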


And that’s it, 
just restart the node and you’ll be up and running!


Now let’s understand how GC works in general. We’ll start with Concurrent Mark and Sweep and then continue on to G1; it’ll be much easier to see the performance gain that way.

What Is Garbage Collection?

Garbage Collection in our context means tracking down all the objects that are still used by the Java application and marking the rest as garbage. Let’s dig into more detail on how the process of automated memory reclamation called ‘Garbage Collection’ is implemented for the Java Virtual Machine.
Disclaimer: The content focuses on Oracle HotSpot and OpenJDK behaviour. In other runtimes or even on other JVMs, such as JRockit, IBM J9 or Azul, some of the aspects can behave differently.
I will focus mainly on the JVM implementations from Java 7 update 4 onward, which include the G1 algorithm.
The Oracle JVM implementation comes with several algorithms:
  • Default GC (Serial Mark & Sweep, Serial NewGC)
  • CMS - Concurrent Mark and Sweep.
  • G1 - Garbage First (Oracle JDK 7 update 4 and later releases)

<TL;DR>


  • Default GC (Serial Mark & Sweep, Serial NewGC)
    Shows highest throughput as long as no Full GC is triggered. If your system has to run for a limited amount of time (say 12 hours) and you are willing to invest in a very careful programming style regarding allocation (for example, keeping large datasets off-heap), DefaultGC can be the best choice. Of course there are applications which are OK with some long GC pauses here and there.
  • CMS does best in pause-free low latency operation as long as you are willing to throw memory at it. Throughput is pretty good. Unfortunately it does not compact the heap, so fragmentation can be an issue over time. CMS is still way behind commercial low-latency solutions such as Azul's Zing VM.
  • G1 excels in robustness, memory efficiency with acceptable throughput. While CMS and DefaultGC react to OldSpace overflow with Full-Stop-GC of several seconds up to minutes (depends on Heap size and Object graph complexity), G1 is more robust in handling those situations. Taking into account the benchmark represents a worst case scenario in allocation rate and programming style, the results are encouraging.


I think it’ll be much easier if I show the phases and the architecture of both the CMS and G1 GC algorithms.


CMS Collection Phases and Heap Structure
The CMS collector performs the following phases on the old generation of the heap:
  1. Initial Mark (Stop the World Event) - Objects in old generation are “marked” as reachable including those that may be reachable from the young generation. Pause times are typically short in duration relative to minor collection pause times.
  2. Concurrent Marking - Traverse the tenured generation object graph for reachable objects concurrently while Java application threads are executing. Starts scanning from marked objects and transitively marks all objects reachable from the roots. The mutators are executing during the concurrent phases 2, 4, and 5 and any objects allocated in the CMS generation during these phases (including promoted objects) are immediately marked as live.
  3. Remark (Stop the World Event) - Finds objects that were missed by the concurrent mark phase due to updates by Java application threads to objects after the concurrent collector had finished tracing that object.
  4. Concurrent Sweep - Collects the objects identified as unreachable during marking phases. The collection of a dead object adds the space for the object to a free list for later allocation. Coalescing of dead objects may occur at this point. Note that live objects are not moved.
  5. Resetting - Prepare for next concurrent collection by clearing data structures.

You can see the illustration of the process in the next diagrams:



Mark and Sweep
First of all, the JVM is more specific about what constitutes reachability of an object. Instead of the vaguely defined blue labels that we saw earlier, we have a very specific and explicit set of objects that are called the Garbage Collection Roots:
  • Local variables
  • Active threads
  • Static fields
  • JNI references
  • Others (will be discussed later)
The method used by JVM to track down all the reachable (live) objects and to make sure the memory claimed by non-reachable objects can be reused is called the Mark and Sweep algorithm. It consists of two steps:
  • Marking - is walking through all reachable objects and keeping a ledger in native memory about all such objects
  • Sweeping - is making sure the memory addresses occupied by non-reachable objects can be reused by the next allocations.
Different GC algorithms within the JVM, such as Parallel Scavenge, Parallel Mark+Copy or CMS, are implementing those phases slightly differently, but at conceptual level the process remains similar to the two steps described above.
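The two steps can be sketched on a toy object graph. This is a conceptual illustration only, not how the JVM stores objects: the graph is a plain dictionary, and the names `mark`, `sweep` and the object labels are made up for the example.

```python
def mark(roots, references):
    """Marking: walk everything reachable from the GC roots."""
    marked = set()
    stack = list(roots)
    while stack:
        obj = stack.pop()
        if obj in marked:
            continue
        marked.add(obj)
        stack.extend(references.get(obj, []))
    return marked


def sweep(heap, marked):
    """Sweeping: everything that was not marked can be reused."""
    return {obj for obj in heap if obj not in marked}


# "a" is referenced by a GC root; "d" and "e" only reference each other.
references = {"a": ["b"], "b": ["c"], "c": [], "d": ["e"], "e": ["d"]}
heap = set(references)

live = mark(["a"], references)
garbage = sweep(heap, live)
print(sorted(live), sorted(garbage))
```

Note that `d` and `e` are collected even though they point at each other: reachability from the roots is what counts, not whether something still holds a reference to the object.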

The not-so-good news is that the application threads need to be stopped for the collection to happen as you cannot really count references if they keep changing all the time. Such a situation when the application is temporarily stopped so that the JVM can indulge in housekeeping activities is called a Stop The World pause. They may happen for many reasons, but garbage collection is by far the most popular one.


The G1 Garbage Collector Step by Step
The G1 collector takes a different approach to allocating the heap. The pictures that follow review the G1 system step by step.


The heap is one memory area split into many fixed-size regions. Region size is chosen by the JVM at startup. The JVM generally targets around 2000 regions varying in size from 1 to 32 MB.
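As a rough illustration of how the region size follows from the heap size, here is a sketch that approximates the choice. The exact rule is an assumption on my part: it divides the heap by a target of 2048 regions, rounds down to a power of two and clamps to the 1 to 32 MB range, and it assumes the initial and maximum heap sizes are equal.

```python
def g1_region_size_mb(heap_mb, target_regions=2048):
    """Approximate the G1 region size (in MB) for a given heap size (in MB)."""
    size = max(heap_mb // target_regions, 1)
    size = 1 << (size.bit_length() - 1)  # round down to a power of two
    return min(max(size, 1), 32)         # clamp to the 1..32 MB range


for heap_gb in (8, 18, 128):
    print(heap_gb, "GB heap ->", g1_region_size_mb(heap_gb * 1024), "MB regions")
```

If you need a specific size, the JVM also accepts an explicit `-XX:G1HeapRegionSize` setting.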


G1 Heap Allocation
In reality, these regions are mapped into logical representations of Eden, Survivor, and old generation spaces.


The colors in the picture show which region is associated with which role. Live objects are evacuated (i.e., copied or moved) from one region to another. Regions are designed to be collected in parallel with or without stopping all other application threads.
As shown, regions can be allocated into Eden, survivor, and old generation regions. In addition, there is a fourth type of region known as Humongous regions. These regions are designed to hold objects that are 50% the size of a standard region or larger. They are stored as a set of contiguous regions. Finally, the last type of region is the unused areas of the heap.
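To make the humongous rule concrete, here is a small sketch that applies the "50% of a region or larger" threshold as stated above. The function name `humongous_regions` is hypothetical, and the exact boundary condition inside a given JVM may differ slightly from this reading.

```python
MB = 1024 * 1024


def humongous_regions(object_bytes, region_bytes):
    """Return how many contiguous regions a humongous object occupies, or 0
    if the object is small enough to live in a regular region."""
    if object_bytes * 2 < region_bytes:
        return 0
    # Integer ceiling division: a partly filled last region still counts.
    return (object_bytes + region_bytes - 1) // region_bytes


region = 4 * MB
for size in (1 * MB, 2 * MB, 9 * MB):
    print(size // MB, "MB object ->", humongous_regions(size, region), "region(s)")
```

The leftover space in the last region of a humongous object is not used for other objects, which is why many large allocations can waste heap.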


Young Generation in G1
The heap is split into approximately 2000 regions. Minimum size is 1 MB and maximum size is 32 MB. Blue regions hold old generation objects and green and orange regions hold young generation objects.
Note that the regions are not required to be contiguous like the older garbage collectors.


A Young GC in G1
Live objects are evacuated (i.e., copied or moved) to one or more survivor regions. If the aging threshold is met, some of the objects are promoted to old generation regions.
This is a stop the world (STW) pause. Eden size and survivor size are calculated for the next young GC. Accounting information is kept to help calculate the size. Things like the pause time goal are taken into consideration.
This approach makes it very easy to resize regions, making them bigger or smaller as needed.


End of a Young GC with G1
Live objects have been evacuated to survivor regions or to old generation regions.
Recently promoted objects are shown in dark blue. Survivor regions in green.


In summary, the following can be said about the young generation in G1:
    • The heap is a single memory space split into regions.
    • Young generation memory is composed of a set of non-contiguous regions. This makes it easy to resize when needed.
    • Young generation garbage collections, or young GCs, are stop the world events. All application threads are stopped for the operation.
    • The young GC is done in parallel using multiple threads.
    • Live objects are copied to new survivor or old generation regions.
Old Generation Collection with G1
Like the CMS collector, the G1 collector is designed to be a low pause collector for old generation objects. The following table describes the G1 collection phases on old generation.


G1 Collection Phases - Concurrent Marking Cycle Phases
The G1 collector performs the following phases on the old generation of the heap. Note that some phases are part of a young generation collection. (This is much like the phases in CMS.)
  1. Initial Mark (Stop the World Event) - This is a stop the world event. With G1, it is piggybacked on a normal young GC. Mark survivor regions (root regions) which may have references to objects in old generation.
  2. Root Region Scanning - Scan survivor regions for references into the old generation. This happens while the application continues to run. The phase must be completed before a young GC can occur.
  3. Concurrent Marking - Find live objects over the entire heap. This happens while the application is running. This phase can be interrupted by young generation garbage collections.
  4. Remark (Stop the World Event) - Completes the marking of live objects in the heap. Uses an algorithm called snapshot-at-the-beginning (SATB) which is much faster than what was used in the CMS collector.
  5. Cleanup (Stop the World Event and Concurrent) - Performs accounting on live objects and completely free regions. (Stop the world) Scrubs the Remembered Sets. (Stop the world) Reset the empty regions and return them to the free list. (Concurrent)
  6. Copying (Stop the World Event) - These are the stop the world pauses to evacuate or copy live objects to new unused regions. This can be done with young generation regions which are logged as [GC pause (young)]. Or both young and old generation regions which are logged as [GC Pause (mixed)].


G1 Old Generation Collection Step by Step
With the phases defined, let's look at how they interact with the old generation in the G1 collector.


  1. Initial Marking Phase - Initial marking of live objects is piggybacked on a young generation garbage collection. In the logs this is noted as GC pause (young)(initial-mark).
  2. Concurrent Marking Phase - If empty regions are found (as denoted by the "X"), they are removed immediately in the Remark phase. Also, "accounting" information that determines liveness is calculated.
  3. Remark Phase - Empty regions are removed and reclaimed. Region liveness is now calculated for all regions.
  4. Copying/Cleanup Phase - G1 selects the regions with the lowest "liveness", those regions which can be collected the fastest. Then those regions are collected at the same time as a young GC. This is denoted in the logs as [GC pause (mixed)]. So both young and old generations are collected at the same time.
  5. After Copying/Cleanup Phase - The regions selected have been collected and compacted into the dark blue region and the dark green region shown in the diagram.
Summary of Old Generation GC
In summary, there are a few key points we can make about the G1 garbage collection on the old generation.
  • Concurrent Marking Phase
    • Liveness information is calculated concurrently while the application is running.
    • This liveness information identifies which regions will be best to reclaim during an evacuation pause.
    • There is no sweeping phase like in CMS.
  • Remark Phase
    • Uses the Snapshot-at-the-Beginning (SATB) algorithm which is much faster than what was used with CMS.
    • Completely empty regions are reclaimed.
  • Copying/Cleanup Phase
    • Young generation and old generation are reclaimed at the same time.
    • Old generation regions are selected based on their liveness.


For more detail on the subject, you are welcome to read the following great resources, from which I took a lot of the material:

Best Practices of G1
There are a few best practices you should follow when using G1.
  1. Do not Set Young Generation Size
    • Explicitly setting young generation size via -Xmn meddles with the default behavior of the G1 collector.
    • G1 will no longer respect the pause time target for collections. So in essence, setting the young generation size disables the pause time goal.
    • G1 is no longer able to expand and contract the young generation space as needed. Since the size is fixed, no changes can be made to the size.
  2. Response Time Metrics
    Instead of using average response time (ART) as a metric to set -XX:MaxGCPauseMillis=<N>, consider setting a value that will meet the goal 90% of the time or more. This means 90% of users making a request will not experience a response time higher than the goal. Remember, the pause time is a goal and is not guaranteed to always be met.
  3. What is an Evacuation Failure?
    A promotion failure that happens when a JVM runs out of heap regions during the GC for either survivors or promoted objects. The heap can't expand because it is already at max. This is indicated in the GC logs, when using -XX:+PrintGCDetails, by to-space overflow. This is expensive!
    • GC still has to continue so space has to be freed up.
    • Unsuccessfully copied objects have to be tenured in place.
    • Any updates to RSets of regions in the CSet have to be regenerated.
    • All of these steps are expensive.
  4. How to avoid Evacuation Failure
    To avoid evacuation failure, consider the following options.
    • Increase heap size
      • Increase the -XX:G1ReservePercent=n, the default is 10.
      • G1 creates a false ceiling by trying to leave the reserve memory free in case more 'to-space' is desired.
    • Start the marking cycle earlier
    • Increase the number of marking threads using the -XX:ConcGCThreads=n option. The default value varies with the platform on which the JVM is running.
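The "Response Time Metrics" advice above is easier to see with numbers. The sketch below compares the average with the 90th percentile on a made-up list of response times. The sample data and the nearest-rank `percentile` helper are illustrative assumptions, not measurements from my cluster.

```python
def percentile(values, pct):
    """Nearest-rank percentile: the smallest sample such that at least
    pct percent of all samples are less than or equal to it."""
    ordered = sorted(values)
    rank = (pct * len(ordered) + 99) // 100  # integer ceiling of pct% of n
    return ordered[max(rank, 1) - 1]


# Hypothetical response times in milliseconds, with one slow outlier.
response_ms = [12, 15, 11, 14, 250, 13, 16, 12, 18, 17]

average = sum(response_ms) / len(response_ms)
p90 = percentile(response_ms, 90)
print("average:", average, "ms, 90th percentile:", p90, "ms")
```

A single outlier drags the average far away from what most requests experience, while the 90th percentile describes a goal that nine out of ten requests actually meet.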


Setting the Log Detail
You can set the detail to three different levels:
  1. -verbose:gc (which is equivalent to -XX:+PrintGC) sets the detail level of the log to fine.
    Sample Output
[GC pause (G1 Humongous Allocation) (young) (initial-mark) 24M->21M(64M), 0.2349730 secs]
[GC pause (G1 Evacuation Pause) (mixed) 66M->21M(236M), 0.1625268 secs]
  2. -XX:+PrintGCDetails sets the detail level to finer. This option shows the following information:
    • Average, Min, and Max time are displayed for each phase.
    • Root Scan, RSet Updating (with processed buffers information), RSet Scan, Object Copy, Termination (with number of attempts).
    • Also shows “other” time such as time spent choosing CSet, reference processing, reference enqueuing and freeing CSet.
    • Shows the Eden, Survivors and Total Heap occupancies.
Sample Output
[Ext Root Scanning (ms): Avg: 1.7 Min: 0.0 Max: 3.7 Diff: 3.7]
[Eden: 818M(818M)->0B(714M) Survivors: 0B->104M Heap: 836M(4096M)->409M(4096M)]
  3. -XX:+UnlockExperimentalVMOptions -XX:G1LogLevel=finest sets the detail level to its finest. Like finer, but includes individual worker thread information.
    [Ext Root Scanning (ms): 2.1 2.4 2.0 0.0
              Avg: 1.6 Min: 0.0 Max: 2.4 Diff: 2.3]
          [Update RS (ms):  0.4  0.2  0.4  0.0
              Avg: 0.2 Min: 0.0 Max: 0.4 Diff: 0.4]
              [Processed Buffers : 5 1 10 0
              Sum: 16, Avg: 4, Min: 0, Max: 10, Diff: 10]
Determining Time
A couple of switches determine how time is displayed in the GC log.
  1. -XX:+PrintGCTimeStamps - Shows the elapsed time since the JVM started.
    Sample Output
    1.729: [GC pause (young) 46M->35M(1332M), 0.0310029 secs]
  2. -XX:+PrintGCDateStamps - Adds a time of day prefix to each entry.
    2012-05-02T11:16:32.057+0200: [GC pause (young) 46M->35M(1332M), 0.0317225 secs]
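Once you have log lines like the ones above, pulling the numbers out is straightforward. This is a minimal sketch that parses the one-line pause entries shown in the sample output. It assumes sizes are reported in megabytes (the `M` suffix) as in those samples; the names `PAUSE` and `parse_pause` are made up for the example.

```python
import re

# Matches entries such as: [GC pause (young) 46M->35M(1332M), 0.0310029 secs]
PAUSE = re.compile(
    r"\[GC pause (?P<kind>.*?) (?P<before>\d+)M->(?P<after>\d+)M"
    r"\((?P<total>\d+)M\), (?P<secs>[\d.]+) secs\]"
)


def parse_pause(line):
    """Extract the pause kind, freed memory, heap size and pause time."""
    match = PAUSE.search(line)
    if match is None:
        return None
    return {
        "kind": match.group("kind"),
        "freed_mb": int(match.group("before")) - int(match.group("after")),
        "heap_mb": int(match.group("total")),
        "pause_ms": float(match.group("secs")) * 1000,
    }


entry = parse_pause("1.729: [GC pause (young) 46M->35M(1332M), 0.0310029 secs]")
print(entry)
```

Because the pattern is applied with `search`, it works with or without the timestamp or date prefix in front of the entry.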


</TL;DR> :)


Summary (Long story short)


You have been given an overview of the G1 garbage collector included in the Java JVM. First you learned how the Heap and the Garbage Collector are key parts of any Java JVM. Next you reviewed how garbage collection works using the CMS collector and the G1 collector.
G1 GC is a regionalized, parallel-concurrent, incremental garbage collector that provides more predictable pauses compared to other HotSpot GCs. The incremental nature lets G1 GC work with larger heaps and still provide reasonable worst-case response times. The adaptive nature of G1 GC means it just needs a maximum soft real-time pause-time goal, along with the desired maximum and minimum size for the Java heap, on the JVM command line.


I hope this gives a comprehensive overview of the garbage collection algorithms of Java and of the solution to the OutOfMemoryError I’ve been experiencing with Apache Cassandra nodes.
If you have any questions, feel free to leave a comment and I’ll try my best to answer :)