Saturday, June 15, 2013

Elastic Load Balancing (ELB) with a Java Web Application + Tomcat + Session Stickiness

Suppose you have a web application and you want to deploy it in the Amazon cloud with load balancing. The whole process is pretty straightforward and generally doesn't take much time.

For this post, I'm using the Apache Tomcat web server, and I already have a war file from my HelloWorld application.

Here is the Tomcat version I'm using:

I'm using two instances, and I have extracted my Tomcat zip file into the /opt/ folder on each of them. I have also placed the HelloWorld.war file into the /opt/apache-tomcat-7.0.39/webapps folder on each instance.

Now I will go to each of the two instances and start the Tomcat server. After a few seconds (or minutes) the deployed web application should be up and running, which means I can navigate to these URLs and see the Log-In screen (the initial page of my web app).

  • http://ip.address.instance-1:8080/HelloWorld/login.jsp
  • http://ip.address.instance-2:8080/HelloWorld/login.jsp

None of the steps described so far has anything to do with Elastic Load Balancing (ELB); I simply deployed a web app on a Tomcat server, just like everyone does. Before I show the ELB steps, I'm assuming your web application is also up and running and that you can reach it through each instance's URL separately.

Create Load Balancer

Step#1: On the AWS EC2 console, click the Load Balancer option under the "Network & Security" section. If you do not have any ELB yet, you will see an empty list. Click the "Create Load Balancer" button.

Step#2: Enter a name for your load balancer; this name will be used when the default link is created. I'm creating this load balancer inside my Virtual Private Cloud (VPC), which is why I'm selecting a specific VPC ID. By default, you might see only port 80 in the listener configuration list; I have added port 8080 because my web app runs on port 8080. Add the appropriate port for your web application and click "Continue".

Step#3: This screen is dedicated to health check configuration. Based on this configuration, ELB will ping the given path on the given port to check the health of each instance, and if the check fails it will automatically remove the failing instance from the load balancer.

Since Log-In is the default screen (welcome page) of my application, I'm using the path of the Login screen as my ping path.
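To make the idea of a ping path concrete, here is a minimal Python sketch of a health check. It is a simplification under stated assumptions, not how ELB is implemented: the function names (http_probe, in_service) are hypothetical, a single HTTP 200 answer counts as healthy, and the real service applies its own check intervals and thresholds.

```python
from urllib.error import URLError
from urllib.request import urlopen


def http_probe(url, timeout=5.0):
    """Return True if the URL answers with HTTP 200 within the timeout."""
    try:
        with urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (URLError, OSError):
        # Connection refused, timeout, DNS failure or an HTTP error status.
        return False


def in_service(instances, ping_port, ping_path, probe=http_probe):
    """Return the instances whose ping URL passes the probe (toy model)."""
    healthy = []
    for host in instances:
        url = f"http://{host}:{ping_port}{ping_path}"
        if probe(url):
            healthy.append(host)
    return healthy
```

With the setup from this post, the call would look like in_service(["ip.address.instance-1", "ip.address.instance-2"], 8080, "/HelloWorld/login.jsp"). The probe argument can be swapped for a fake function when you want to try the logic without a network.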

Step#4: Choose the subnet ID in which you want to use your load balancer. In my case, subnet-2e961843 is the subnet I want.

Step#5: The next screen will ask you to select your security groups. I already have a security group for my VPC and I'm using it here too.

Step#6: In the "Add EC2 Instances" section, add the instances on which you have already deployed Tomcat and your web application.

Step#7: This screen is for review. Once you have reviewed everything, you can create your load balancer by clicking the "Create" button.

Step#8: Once the load balancer is created, you will be redirected to the load balancer list, where your newly created load balancer now appears. The DNS Name column shows the newly created DNS name for your load balancer, and you should be able to reach it on the proper port.

So in my case, I can reach my load balancer by using:

Sticky Session:
Since you are using Tomcat behind a load balancer, you will most likely want to enable sticky sessions, possibly together with session replication in Tomcat. My web application is a Spring MVC application, and it uses Spring Security for all authentication and authorization. If I go directly to the Log-In screen through my load balancer and try to authenticate, it might not work. That is expected: without stickiness, consecutive requests from the same browser can be routed to different instances, and a session created on one Tomcat instance is unknown to the other. If I enable sticky sessions, I do not face this issue.

You can do this from the AWS EC2 console. Open the Load Balancer screen and select your newly created load balancer.

If you look carefully at the port configuration, you will see "Stickiness: Disabled" for all of your ports; stickiness is disabled by default for every port you select for a load balancer. Now click the "edit" button of the port on which you want to enable stickiness. In my case, that is port 8080. Once you click "edit", you will be asked how you want to enable session stickiness. You can choose either Load Balancer Generated Cookie Stickiness or Application Generated Cookie Stickiness. For my simple application, I selected "Load Balancer Generated Cookie Stickiness" and entered 86400 as the cookie expiration period, which is one day in seconds.

After you enable it, you should be able to test your session stickiness. In my case, I can now authenticate to my application successfully.
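The following toy model in Python shows why a load-balancer cookie fixes the login problem. It is only a sketch under assumptions: the class ToyLoadBalancer, the cookie name LB_COOKIE and the plain round-robin policy are hypothetical and are not taken from ELB itself.

```python
import itertools


class ToyLoadBalancer:
    """Round-robin balancer that can pin a client to one backend via a cookie."""

    def __init__(self, backends, sticky):
        self._backends = list(backends)
        self._round_robin = itertools.cycle(self._backends)
        self._sticky = sticky

    def route(self, cookies):
        """Pick a backend for one request; `cookies` is the client's cookie jar."""
        pinned = cookies.get("LB_COOKIE")  # hypothetical cookie name
        if self._sticky and pinned in self._backends:
            return pinned
        backend = next(self._round_robin)
        if self._sticky:
            cookies["LB_COOKIE"] = backend  # remember the choice for this client
        return backend


def backends_seen(sticky, requests=4):
    """Return the backend chosen for each of several requests from one client."""
    balancer = ToyLoadBalancer(["instance-1", "instance-2"], sticky)
    cookies = {}
    return [balancer.route(cookies) for _ in range(requests)]
```

Without stickiness, backends_seen(False) alternates between the two instances, so a session created during login on one instance is missing on the next request. With stickiness, backends_seen(True) keeps returning the same instance.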

Some considerations: Sometimes you might find that your load balancer is down, the link is not working, or no page is shown. In that case, the quickest test is to check each instance where Tomcat is running and see whether you can access it individually (e.g. http://ip.address.instance-1:8080/HelloWorld/login.jsp). If every instance is up and running, you can try removing the instances from your load balancer and adding them again. Remember, the "Status" section under the "Description" tab of your load balancer is not updated instantly; it waits for the result of the next health check. So wait a few minutes until you see "Status: N of N instances in service".

That's pretty much it! This is a very basic AWS load balancer example with a minimal configuration of Tomcat and session stickiness. Once it's working for you, you can try the other options (highly encouraged) and see how they work for you.

Note: For privacy purpose, I had to modify several lines on this post from my original post. So if you find something is not working or facing any issues, please do not hesitate to contact me :)

Saturday, June 1, 2013

Cassandra Performance Tuning

In my previous post, I discussed how to stress test Cassandra. In this post, I will discuss some easy steps to tune its performance. I'm a big fan of Cassandra. It is optimized for very fast, highly available writes. There are many things you can do to optimize its write and read performance further, but today I will only discuss a few major tuning steps that are easy to apply.

Dedicated Commit Log Disk: I think this is the first tune-up you may want to try, as it gives a significant performance improvement. But before changing the commit log destination, it is better to understand why it gives a performance boost. A Cassandra write is first appended to a commit log on disk and then applied to an in-memory table structure called a Memtable. When thresholds are reached, that Memtable is flushed to disk in a format called SSTable. So if you move the commit log to a separate location, you isolate commit log I/O traffic from the I/O traffic of Cassandra reads, Memtable flushes and SSTables. Remember, after the flush the commit log is no longer needed and is deleted, so the commit log disk doesn't need to be large. It just needs to be big enough to hold the Memtable data before it is flushed. You can follow these steps to change the commit log location for Cassandra.

Step#1: Mount a separate partition for the commit log.
Step#2: Make sure you set the expected ownership and access permissions on that drive.
Step#3: Edit the Cassandra configuration file, which can be found at conf/cassandra.yaml. You will find a property "commitlog_directory" (named CommitLogDirectory in the older storage-conf.xml format); update it to your mount location. In my case, it will be:
commitlog_directory: /mnt/commitlog
Step#4: Restart your Cassandra cluster.
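To picture the write path described above (commit log first, then Memtable, then a flush to an SSTable), here is a toy model in Python. It is a sketch for illustration only: the class ToyNode, its attributes and the row-count threshold are hypothetical and do not mirror Cassandra's internals.

```python
class ToyNode:
    """Toy model of the write path: commit log, Memtable, SSTable flush."""

    def __init__(self, flush_threshold):
        self.flush_threshold = flush_threshold  # rows held before a flush
        self.commit_log = []  # stands in for the append-only log on disk
        self.memtable = {}    # in-memory structure keyed by row key
        self.sstables = []    # flushed, immutable snapshots

    def write(self, key, value):
        self.commit_log.append((key, value))  # 1. sequential append to the log
        self.memtable[key] = value            # 2. in-memory update
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self):
        # Write the Memtable out as a sorted snapshot, then drop the log
        # entries that are now covered by that snapshot.
        self.sstables.append(dict(sorted(self.memtable.items())))
        self.memtable.clear()
        self.commit_log.clear()
```

In this model the commit log never grows beyond what one Memtable holds, which is the reason a small dedicated disk is enough for it.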

Increasing Java Heap Size: Cassandra runs on the JVM, so you might face out-of-memory issues when you run a heavy load on it. There is a rule of thumb for choosing the heap size:
  • Heap Size = 1/2 of System Memory when System Memory < 2GB
  • Heap Size = 1GB when System Memory >= 2GB and <= 4GB
  • Heap Size = 1/4 of System Memory (but not more than 8GB) when System Memory > 4GB
Remember, a larger heap alone might not give you a performance boost, so a well-tuned Java heap size is very important. To change the Java heap size, you need to update the heap setting and then restart the Cassandra cluster. If you are using OpsCenter, you should see the updated heap size in one of OpsCenter's metrics.
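The heap size rule of thumb above can be written as a small function. This is just a sketch in Python; the function name and the choice of megabytes as the unit are assumptions made for illustration.

```python
def recommended_heap_mb(system_memory_mb):
    """Apply the heap size rule of thumb; all values are in megabytes."""
    if system_memory_mb < 2048:
        return system_memory_mb // 2          # half of system memory
    if system_memory_mb <= 4096:
        return 1024                           # fixed 1GB
    return min(system_memory_mb // 4, 8192)   # a quarter, capped at 8GB
```

For example, a machine with 16GB of memory gets a 4GB heap, while a machine with 64GB is capped at 8GB.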

Tune Concurrent Reads and Writes: Cassandra is implemented using a Staged Event-Driven Architecture (SEDA), which breaks the application into stages. The concurrent readers and writers settings control the maximum number of threads allocated to a particular stage. Well-chosen concurrent reads and concurrent writes values will therefore improve Cassandra performance, but raising them beyond what the hardware can sustain will decrease it. These values are closely tied to the number of CPU cores in the system. As with the Java heap size, there is a rule of thumb for selecting them:
  • Concurrent Reads: 4 concurrent reads per processor core
  • Concurrent Writes: Most of the time you do not need to change it, as writes are usually fast. If needed, you can set the value equal to or higher than the concurrent reads.
To change the values, update the conf/cassandra.yaml configuration file. There are two parameters for this: concurrent_reads and concurrent_writes. Update those values based on your system and restart Cassandra for them to take effect.
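As a quick way to apply the rule of thumb above, the following Python sketch derives both values from the core count. The function name and the returned dictionary keys are hypothetical, and setting writes equal to reads is just one choice the rule allows.

```python
import os

READS_PER_CORE = 4  # rule of thumb quoted above


def suggested_concurrency(cores=None):
    """Return suggested reader and writer thread counts for a core count."""
    if cores is None:
        cores = os.cpu_count() or 1  # fall back to 1 if the count is unknown
    reads = READS_PER_CORE * cores
    writes = reads  # "equal to or higher than the concurrent reads"
    return {"reads": reads, "writes": writes}
```

On an 8-core machine this suggests 32 concurrent reads and 32 concurrent writes as a starting point to measure against.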

Tune-Up Key Cache: For each column family, the key cache holds the location of row keys in memory. Since keys are usually small, the cache can hold a large number of entries without using much memory, and each cache hit results in less disk activity. The key cache is enabled by default in Cassandra, with a default size of 200000. You can alter the default value as follows:

You can monitor key cache performance by using the nodetool cfstats command.

Tune-Up Row Cache: In Cassandra, the row cache is disabled by default. The row cache holds the entire contents of a row in memory, so a column family with large rows could easily consume system memory and hurt Cassandra performance. That's why it's disabled by default and should remain disabled in most cases. But if your rows are small, using the row cache can significantly improve performance, as it keeps the most frequently accessed rows hot in memory. To enable the row cache, you can alter your column family and pass the number of rows to cache.

You can also monitor it by using the nodetool cfstats command as above (watch for "Row cache hit rate").

Conclusion: As I said earlier, these are only some of the tune-up steps; there are more (a high-performing RAID level, file system optimization, disabling swap, memory-mapped disk modes and so on). But I have given you something to start with; once you see improved Cassandra performance, you can try the rest of the tuning. Cassandra is highly scalable, and scaling up is done by enhancing each node (more RAM, higher network throughput, SSDs, larger disks, etc). Remember, if you are using AWS EC2, do not expect much performance improvement on small or medium instance types, as they are not optimized for I/O or network throughput; use xlarge or larger instances instead.

And finally, DO NOT forget to check the Cassandra Performance and Scalability slides by Adrian Cockcroft.


Saturday, May 25, 2013

Cassandra Stress Test

In this post, I will go through how you can quickly stress test your Cassandra performance. Before you start tuning Cassandra, you might want to see how well it's performing so far and where it's slowing down. You can certainly write a benchmark tool that inserts some random data, reads it back and measures performance based on time. When I was first asked to stress test Cassandra, I started writing pretty much that kind of tool. But midway I found existing code that stress tests Cassandra and is good enough to start with. It's basically a pom-based Java project that uses Hector (my project also uses Hector, a Java client for Cassandra).

You can go here directly to get more information about how it's written and how to run it:

But if you just want a quick way to run it, you can follow these steps:

Step#1: Install It

Step#2: Run It:
Here is what the above command does:
  • Inserts (-o insert) 1000000 records (-n) into the column family StressStandard, which has 10 columns (-c)
  • Uses 5 threads (-t) with a batch size of 1000 (-b)
  • So each thread gets 1000000 / 5 = 200000 inserts and, as the batch size is 1000, each thread actually sends 200000 / 1000 = 200 batches.
After it inserts the 1000000 records, it shows brief statistics on the insertion performance. For the above test, it took around 3 minutes to insert all records (with no optimization), which was 140.87 write requests per second with a bandwidth of 15730.39 kb/sec. You can also test read performance, as well as the performance of some other Hector API calls (rangeslice, multiget, etc).
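If you want to redo the per-thread arithmetic for other values of -n, -t and -b, a small helper does it. This is only a sketch in Python; the function name is hypothetical and it is not part of the stress tool.

```python
def work_per_thread(total_records, threads, batch_size):
    """Return (records per thread, batches per thread) for a stress run."""
    records = total_records // threads
    batches = records // batch_size
    return records, batches
```

For the run above, work_per_thread(1000000, 5, 1000) gives 200000 records and 200 batches per thread.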

I played with this stress tool a lot, and later I adapted it to my needs (to work with my Cassandra keyspace and column families) and ran it for my stress test. I highly recommend this stress tool; it will serve most of the basic cases.


Friday, May 17, 2013

Chunk data import / Incremental Import in Sqoop

Recently I faced an issue while importing data from Oracle with Sqoop. It had been working fine until I got a new requirement. Before discussing the new requirement, let me quickly describe how it currently works.

Currently I am running Sqoop from Oozie, but I am not using a coordinator job, so I execute each Oozie job manually from the command prompt.

You can check these links if you want to know how to run Sqoop and Oozie together.
In our option parameter file, I have a field like this:
ID <= 1000000
For each run, I used to change that field manually and re-run my Oozie job.

New Requirement

Now, what I have been asked to do is run my Oozie job through a coordinator and import the data from Oracle block-wise, in chunks. In other words, I'm trying to import the rows from M to N. Ideally, for each run I'm targeting an import of 15 million rows from that specific table; Hadoop will process those records and be ready to process another batch before the following run.

As an example:
1st run: 1 to 20
2nd run: 21 to 40
3rd run: 41 to 60
and so on...
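The chunk boundaries in this example follow a simple formula, sketched here in Python. The function names are hypothetical; the ID column comes from the option file mentioned earlier, and the chunk size of 20 is the one from the example, not the real 15 million.

```python
def chunk_bounds(run_number, chunk_size, first_id=1):
    """Return the inclusive (start, end) row IDs for the given run (1-based)."""
    if run_number < 1 or chunk_size < 1:
        raise ValueError("run_number and chunk_size must be positive")
    start = first_id + (run_number - 1) * chunk_size
    return start, start + chunk_size - 1


def where_clause(run_number, chunk_size, column="ID"):
    """Build the filter for one chunk, e.g. for use in an import condition."""
    start, end = chunk_bounds(run_number, chunk_size)
    return f"{column} >= {start} AND {column} <= {end}"
```

For the third run with a chunk size of 20, chunk_bounds(3, 20) returns (41, 60), which matches the list above.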

The first thing I explored was the "--boundary-query" parameter that comes with Sqoop. From its documentation: "By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument."

After spending some time on it and discussing it on the Sqoop mailing list, I learned that incremental import does not work in chunks. It imports everything since the last import (more specifically, everything from --last-value to the end).

Then I decided to create a shell action in Oozie that updates the appropriate parameter after each execution of Sqoop, so that the following Sqoop run gets new options for its import.

So I made some changes in my option parameter file (option.par), and here is the new one:
To store the current index value and the chunk size, I used another properties file.
My shell script increases the value of startIndex by chunkSize. Here is the script I wrote for this:

One thing I want to add: when a script modifies a file while running through Oozie, it is actually a cached version of the file in HDFS that gets updated. That's why I had to copy those files back to their original location in HDFS. Also, behind the scenes a mapred user does the work, while I'm running the Oozie job as the ambari_qa user (note: I'm using Hortonworks Hadoop, HDP 1.2.0). That's why I had to give all users permissions on those files again.
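As a rough illustration of the update step (a Python sketch, not the shell script used in this post), the function below advances startIndex by chunkSize in the text of a properties file. It assumes a simple key=value layout, which is an assumption about the file format, and the function name is hypothetical.

```python
def advance_start_index(properties_text):
    """Return the properties text with startIndex increased by chunkSize."""
    lines = properties_text.splitlines()

    def parse(line):
        # Return (key, value) for "key=value" lines, None for anything else.
        if "=" not in line or line.lstrip().startswith("#"):
            return None
        key, value = line.split("=", 1)
        return key.strip(), value.strip()

    props = dict(pair for pair in map(parse, lines) if pair is not None)
    new_start = int(props["startIndex"]) + int(props["chunkSize"])

    updated = []
    for line in lines:
        pair = parse(line)
        if pair is not None and pair[0] == "startIndex":
            updated.append(f"startIndex={new_start}")
        else:
            updated.append(line)  # keep every other line untouched
    return "\n".join(updated) + "\n"
```

Reading the file, passing its contents through this function and writing the result back would give the next run a new starting point; copying the file back to HDFS and fixing permissions, as described above, is not covered by the sketch.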

Here is my Oozie workflow (workflow.xml):
I put everything inside my Oozie application path in HDFS. Here is my folder structure:
Don't forget to give the "write" permission when you first put it inside HDFS. Now you can run the Oozie workflow by executing this:
[ambari_qa@ip-10-0-0-91 ~]$ oozie job -oozie http://ip-10-0-0-91:11000/oozie -config -run
Here is the file:
This is it! Now every time you execute the Oozie job, it will import a new chunk of data from Oracle. How I run it as a coordinator job is something I will cover in another post. Jarcec mentioned in one of the Sqoop user mail threads that Sqoop will have this feature soon, but I'm not sure of its time frame, so I had to use this workaround. It worked for me; I hope it will work for you too!


Saturday, April 6, 2013

Configure Ganglia for multiple clusters in Unicast mode

In my previous post I talked about setting up Ganglia in a CentOS environment. At that time, I used only a single cluster for the whole setup. But it's highly unlikely that you have only a single cluster in your development/production environment. Suppose you have two clusters, 1. Storm and 2. Kafka, and you want to monitor all of their nodes through a single Ganglia UI. You do not have to install Ganglia multiple times for that; you just need to configure it accordingly. It would have been much easier if AWS supported multicast, but since it doesn't, you need a work-around in unicast mode to monitor multiple clusters in a single Ganglia.

The idea behind this work-around is pretty straightforward. Suppose I have two clusters, cluster#1 (Storm) and cluster#2 (Kafka), made up of the following machines:
  • Storm Cluster (supervisor 1)
  • Storm Cluster (supervisor 2)
  • Storm Cluster (supervisor 3)
  • Storm Cluster (nimbus)
  • Kafka Cluster
  • Kafka Cluster
  • Kafka Cluster
  • my client machine

What I am going to do is configure each cluster to send its collected data (gmond) to one specific node of that cluster only, and configure the gmetad daemon so that it collects data only from that designated node (gmond daemon) of each cluster. Ganglia will separate each cluster's data by the unique cluster name defined in the gmond.conf file.

As you can see in the above figure, all of the Kafka cluster's data is sent to one specific Kafka node, and all of the Storm cluster's data is sent to one of the Storm nodes. The client machine runs the gmetad daemon, and I will configure that daemon to look for two data sources, one per cluster, whose source IP addresses are those of the designated Kafka and Storm nodes respectively.
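To show what "two data sources for two clusters" looks like on the gmetad side, here is a small Python sketch that renders data_source lines. The function name and the host names used in the usage note are hypothetical placeholders, and the default port of 8649 is assumed to be the gmond port in use.

```python
def data_source_line(cluster_name, collector_host, port=8649, polling_interval=None):
    """Render one gmetad data_source line for a cluster's designated node."""
    parts = ["data_source", f'"{cluster_name}"']
    if polling_interval is not None:
        parts.append(str(polling_interval))  # optional interval in seconds
    parts.append(f"{collector_host}:{port}")
    return " ".join(parts)


def gmetad_sources(clusters):
    """Render one line per (cluster name, designated host) pair."""
    return [data_source_line(name, host) for name, host in clusters]
```

For example, gmetad_sources([("Kafka", "kafka-collector.example"), ("Storm", "storm-collector.example")]) produces one line per cluster. The cluster names have to match the names set in each cluster's gmond.conf.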

I'm assuming that you have already set up Ganglia and that it's running as expected, so I am not going to discuss what the gmond.conf and gmetad.conf files are. If you have not set it up yet, you might want to take a look at this post.
This is the gmond.conf file (only the part I modified) that I'm using for all Kafka hosts (this file is unique for each host per cluster):
And here is the gmond.conf file for all Storm hosts (this file is unique for each host per cluster):

You will notice that I'm using a different host address in udp_send_channel for each cluster. Now I need to tell my gmetad daemon to collect data from those two host addresses. Here is my gmetad.conf file:
You are done! Now restart all gmond daemons and the gmetad daemon and wait a few minutes.
Once you navigate to your Ganglia UI URL, you should be able to see your grid and the list of your clusters in the drop-down.

You can dig further to see each host of each cluster:

There is another work-around that you can also try to get a better understanding of Ganglia. In that case, you use a separate port number for each cluster. Here I'm distinguishing each cluster's data source by IP address, but in that work-around you can have a single IP address for all clusters with multiple port numbers. You can try that work-around as an exercise :).


Monday, April 1, 2013

Setting up Ganglia in CentOS

Ganglia is a scalable distributed monitoring system for high-performance computing systems such as clusters and Grids (ref). Installing and configuring Ganglia is very straight-forward. It has two major parts:

Gmond (Ganglia monitoring daemon): Runs on every single node, collects the data and sends it to the meta daemon node.

Gmetad (Ganglia meta daemon): Runs on a head (or client) node and gathers the data from all monitoring nodes so that it can be displayed in the UI.

Assume I have a 4-node cluster and one of the nodes also works as the client, so I will install the Ganglia PHP UI on that machine.

Here are the nodes and the services I am going to install on them:
  • client node (gmetad, gmond, ui)
  • monitoring node (gmond)
  • monitoring node (gmond)
  • monitoring node (gmond)

On client node:
--> Install meta daemon, monitoring daemon and web UI by executing:
--> If the packages are not available, you might need to add the EPEL repository to your machine.

On monitoring node:
--> Install monitoring daemon by:


At this point everything is installed, and you now need to configure Ganglia.
  • /etc/ganglia/gmetad.conf --- configuration file for gmetad daemon
  • /etc/ganglia/gmond.conf --- configuration file for gmond daemon

I have updated only the following part of the gmond.conf file on each monitoring node.
Notice that I have commented out mcast_join and bind because multicast is not supported by AWS EC2, so unicast is the only option for Ganglia. As a result, all monitoring nodes send their collected data to the node that collects it (the node running the gmetad daemon).

In the gmetad.conf file I have updated this:
data_source "Cloud for Beginners" 60
Here I'm telling the meta daemon the name of the cluster (the name must match so that hosts are grouped by cluster), the IP address and port of the host from which data will be collected, and the polling interval (collect data every 60 seconds).

You are done! Now start the monitoring daemon on every node and the meta daemon on the client node.
After 1-2 minutes you should be able to see all your monitoring data through:

You might want to change the boot configuration so that the gmetad and gmond daemons are started at boot:

Common Issue:
If gmetad is not starting up, you can check the log by:
In the log you might see a "Please make sure that /var/lib/ganglia/rrds is owned by nobody" error; in that case you need to execute this:


Thursday, March 21, 2013

A basic Oozie coordinator job

Suppose you want to run your workflow every two hours or once per day; that is where a coordinator job comes in very handy. There are several more use cases for the Oozie coordinator, but today I'm just showing you how to write a very basic Oozie coordinator job.

I'm assuming that you are already familiar with Oozie and have a workflow ready to be used in a coordinator job. For this tutorial, my Oozie workflow is a shell-based action workflow. I want to execute a shell script every two hours, starting today and continuing for the next 10 days. My workflow.xml is already inside an HDFS directory.
Without the coordinator, I'm currently running it like this:
Here is my file:
Now I want to run this workflow with a coordinator. The Oozie Coordinator Engine is responsible for the coordinator job, and the input of the engine is a Coordinator App. At least two files are required for each Coordinator App:
  1. coordinator.xml - The definition of the coordinator job. What triggers your workflow (time based or input based), how long it will continue to run, the workflow wait time: all of this information needs to be written in the coordinator.xml file.
  2. A properties file - Contains the properties for the coordinator job; it behaves the same as the job.properties file.
Based on my requirement, here is my coordinator.xml file:
Because I need to pass the coordinator's properties file when submitting a coordinator job, I cannot pass the previous workflow properties file at the same time. That's why I need to move all properties from the workflow's properties file into the coordinator's properties file. Remember that the coordinator's properties file must have a property which specifies the location of the coordinator.xml file (similar to the application path property used for a plain workflow). After moving those properties, my file became:

As you noticed, I set the application path property oozie.coord.application.path, and that path contains the coordinator.xml file.
Now I'm pretty much set. If I execute a coordinator job now, it will run the coordinator app located in the coordinator application path. The coordinator app has a tag <workflow><app-path>.... </app-path></workflow> which specifies the actual workflow location. At that location, I have my workflow.xml file, so that workflow.xml will be triggered according to how I defined the job in the coordinator.xml file.
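To sanity-check the schedule described earlier (every two hours for 10 days), the following Python sketch computes the start and end timestamps, the frequency in minutes and the number of runs to expect. The function name and the example start date are assumptions for illustration, and the timestamps are written in the UTC form yyyy-MM-ddTHH:mmZ.

```python
from datetime import datetime, timedelta

OOZIE_TIME_FORMAT = "%Y-%m-%dT%H:%MZ"  # UTC timestamps such as 2013-03-21T00:00Z


def coordinator_window(start, days=10, every_hours=2):
    """Return start, end, frequency (minutes) and run count for a schedule."""
    end = start + timedelta(days=days)
    runs = int((end - start) / timedelta(hours=every_hours))
    return {
        "start": start.strftime(OOZIE_TIME_FORMAT),
        "end": end.strftime(OOZIE_TIME_FORMAT),
        "frequency_minutes": every_hours * 60,
        "runs": runs,
    }
```

Calling coordinator_window(datetime(2013, 3, 21, 0, 0)) gives an end of 2013-03-31T00:00Z, a frequency of 120 minutes and 120 runs, assuming a run is created at the start time and every two hours after that until the end time is reached.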

I'm submitting my coordinator job by:
Once your coordinator job is running successfully, I highly recommend going through this document and trying out some other use cases and alternatives.
