Saturday, May 25, 2013

Cassandra Stress Test

In this post, I will go through how you can quickly stress test your Cassandra performance. Before you start tuning Cassandra, you might want to see how well it's performing so far and where it's slowing down. You can certainly write a benchmark tool which inserts some random data, reads it back, and measures performance based on time. When I was first asked to stress test Cassandra, I started writing pretty much that kind of tool. But partway through I found existing code which stress tests Cassandra and is good enough to start with. It's basically a pom-based Java project which uses Hector (my project also uses Hector, a Java client for Cassandra).
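To give an idea of what such a hand-written, time-based benchmark looks like, here is a minimal sketch in Python. It is not the Hector-based tool discussed in this post: the function names are made up for illustration, and a plain in-memory dict stands in for Cassandra, so you would have to swap in real client calls for `write` and `read`.

```python
import random
import string
import time


def random_value(length=32):
    """Return a random ASCII string used as dummy column data."""
    return "".join(random.choice(string.ascii_letters) for _ in range(length))


def run_benchmark(write, read, num_records):
    """Time num_records writes followed by num_records reads.

    `write(key, value)` and `read(key)` are caller-supplied callables,
    so a real client call can replace the in-memory stand-in below.
    """
    keys = ["key-%d" % i for i in range(num_records)]

    start = time.perf_counter()
    for key in keys:
        write(key, random_value())
    write_seconds = time.perf_counter() - start

    start = time.perf_counter()
    found = sum(1 for key in keys if read(key) is not None)
    read_seconds = time.perf_counter() - start

    return {
        "records": num_records,
        "found": found,
        "write_seconds": write_seconds,
        "read_seconds": read_seconds,
    }


if __name__ == "__main__":
    store = {}  # assumption: a dict standing in for a Cassandra column family
    print(run_benchmark(store.__setitem__, store.get, 10000))
```

A tool like this only measures single-threaded, one-record-at-a-time access, which is one reason an existing multi-threaded stress tool is a better starting point.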

You can go directly here to get more information about how the stress tool is written and how to run it:

But if you just want a quick way to run it, follow these steps:

Step#1: Install It

Step#2: Run It:
What the above command does:
  • Inserting (-o insert) 1000000 records (-n) into column family StressStandard which has 10 columns (-c)
  • Using 5 threads (-t) with a batch size of 1000 (-b)
  • So each thread gets 1000000 / 5 = 200000 inserts and, as the batch size is 1000, each thread actually sends 200000 / 1000 = 200 batches.
After it inserts 1000000 records, it shows a brief summary of the insertion performance. For the above test, it took around 3 minutes to insert all records (no optimization), which was 140.87 write requests per second with a bandwidth of 15730.39 kb/sec. You can also test read performance, as well as the performance of some other Hector API operations (rangeslice, multiget, etc.).
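The per-thread arithmetic in the list above can be checked with a few lines of Python. This is only a sketch of the division, not code from the stress tool itself, and `plan_inserts` is a name I made up for illustration.

```python
def plan_inserts(total_records, threads, batch_size):
    """Return (records per thread, batches per thread) for an even split."""
    per_thread = total_records // threads
    batches_per_thread = per_thread // batch_size
    return per_thread, batches_per_thread


# Values taken from the run described above: -n 1000000, -t 5, -b 1000
per_thread, batches = plan_inserts(1000000, 5, 1000)
print(per_thread, batches)  # 200000 200
```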

I played with this stress tool a lot, and later I adapted it to my needs (to work with my Cassandra keyspace and column families) and ran it for my stress test. I highly recommend this stress tool; it will serve most of the basic cases.

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or face any issues, please do not hesitate to contact me :)

Friday, May 17, 2013

Chunk data import / Incremental Import in Sqoop

Recently I faced an issue while importing data from Oracle with Sqoop. It had been working fine until I got a new requirement. Before discussing the new requirement, let me quickly describe how it currently works.

Currently I am running Sqoop from Oozie, but I am not using a coordinator job, so I execute each Oozie job manually from the command prompt.

You can check these links if you want to know how to run Sqoop and Oozie together.
In our option parameter file, I have a field like this:
ID <= 1000000
For each run, I used to change that field manually and re-run my Oozie job.

New Requirement

Now, what I have been asked to do is run my Oozie job through a coordinator and import data from Oracle block-wise, in chunks. Based on the current requirement, what I'm trying to achieve is to import the rows from M to N. Ideally, each run imports 15 million rows from that specific table; Hadoop processes those records and is ready for another batch before the following run.

As an example:
1st run: 1 to 20
2nd run: 21 to 40
3rd run: 41 to 60
and so on...
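The run-to-range mapping in this example is easy to express in code. Here is a small Python sketch of it; the function names are made up for illustration, and the generated condition on the ID column is only my assumption about what such a filter could look like, not the content of the actual option file.

```python
def chunk_bounds(run_number, chunk_size):
    """Return the inclusive (first, last) row index for a 1-based run number."""
    if run_number < 1 or chunk_size < 1:
        raise ValueError("run_number and chunk_size must be positive")
    first = (run_number - 1) * chunk_size + 1
    last = run_number * chunk_size
    return first, last


def where_clause(column, first, last):
    """Build a range condition; the column name is supplied by the caller."""
    return "%s >= %d AND %s <= %d" % (column, first, column, last)


for run in (1, 2, 3):
    first, last = chunk_bounds(run, 20)
    print(run, where_clause("ID", first, last))
# 1 ID >= 1 AND ID <= 20
# 2 ID >= 21 AND ID <= 40
# 3 ID >= 41 AND ID <= 60
```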

The first thing I explored was the "--boundary-query" parameter which comes with Sqoop. From the documentation: "By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument."

After spending some time on it and discussing it on the Sqoop mailing list, I learned that incremental import does not work with chunks. It imports everything since the last import (more specifically, everything from --last-value to the end).

Then I decided to create a shell action in Oozie which updates the appropriate parameter after each execution of Sqoop, so that the following Sqoop run has new options for its import.

So I made some changes in my option parameter file (option.par) and here is the new one:
To store the current index value and the chunk size, I used another properties file:
My shell script increases the value of startIndex by chunkSize. Here is the script which I wrote for this:

One thing I want to add here: when you modify a file from a script running through Oozie, it is a cached copy of the file in HDFS that actually gets updated. That's why I had to copy those files back to my original location in HDFS. Also, behind the scenes a mapred user does the work, but I'm running the Oozie job as the ambari_qa user (note: I'm using Hortonworks Hadoop, HDP 1.2.0). That's why I had to give permissions on those files back to all users.
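To make the update step concrete, here is a rough Python sketch of the same idea: read startIndex and chunkSize from a key=value file, advance startIndex by chunkSize, and write the file back. This is not my original shell script. The file name and helper names are made up for illustration, and the sketch only covers the local update, not copying the file back to HDFS or fixing permissions.

```python
import os
import tempfile


def read_properties(path):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    with open(path) as handle:
        for line in handle:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props


def write_properties(path, props):
    """Write the properties back as key=value lines (comments are not kept)."""
    with open(path, "w") as handle:
        for key, value in props.items():
            handle.write("%s=%s\n" % (key, value))


def advance_start_index(path):
    """Increase startIndex by chunkSize and return the new startIndex."""
    props = read_properties(path)
    new_start = int(props["startIndex"]) + int(props["chunkSize"])
    props["startIndex"] = str(new_start)
    write_properties(path, props)
    return new_start


if __name__ == "__main__":
    # Hypothetical file name; a temp directory keeps the demo self-contained.
    path = os.path.join(tempfile.mkdtemp(), "import.properties")
    write_properties(path, {"startIndex": "0", "chunkSize": "20"})
    print(advance_start_index(path))  # 20
    print(advance_start_index(path))  # 40
```

Keeping the two values in a separate file means the Sqoop options themselves never have to be edited by hand between runs.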

Here is my Oozie workflow (workflow.xml):
I put everything inside my Oozie application path in HDFS. Here is my folder structure:
Don't forget to give the "write" permission when you first put it inside HDFS. Now you can run the Oozie workflow by executing this:
[ambari_qa@ip-10-0-0-91 ~]$ oozie job -oozie http://ip-10-0-0-91:11000/oozie -config -run
Here is the file:
That's it! Now every time you execute the Oozie job, it will import a new chunk of data from Oracle. How I run it as a coordinator job, I will cover in another post. Jarcec mentioned in one of the Sqoop user mail threads that Sqoop will have this feature soon, but I'm not sure of its time frame, so I had to use this workaround. It worked for me; I hope it will work for you too!

Note: For privacy reasons, I had to modify several lines in this post from my original post. So if you find that something is not working or face any issues, please do not hesitate to contact me.