Monday, February 25, 2013

Setup a Storm cluster on Amazon EC2

Storm is real-time Hadoop! Yes, you can call it that. Just as Hadoop provides a set of general primitives for batch processing, Storm provides a set of primitives for real-time computation. It's a very powerful tool, and setting up a Storm cluster is pretty straightforward. If you want to set up a Storm cluster on Amazon EC2, you should first try Nathan's storm-deploy project, which automatically deploys a Storm cluster on EC2 based on your configuration. But if you want to deploy a Storm cluster manually, you can follow these steps (for more detailed information, you can also follow the original Storm documentation):

Let me show you my machine's current configuration first:
  • Machine type: m1.large (for supervisor) and m1.small (for nimbus)
  • OS: 64-bit CentOS 6.3
  • JRE Version: 1.6.0_43
  • JDK Version: 1.6.0_43
  • Python Version: 2.6.6

For this tutorial, I am going to set up a Storm cluster with three supervisor nodes and one nimbus node. The hosts in my target configuration are:
  • StormSupervisor1
  • StormSupervisor2
  • StormSupervisor3
  • StormNimbus

Storm depends on Zookeeper for coordinating the cluster. I have already installed Zookeeper on each of the hosts above. Now apply each of the following steps on all Supervisor and Nimbus nodes:

A. Install ZeroMQ 2.1.7

Step #A-1: Download zeromq-2.1.7.tar.gz from

Step #A-2: Extract the gzip file:
[root@ip-10-0-0-194 tool]# tar -zxvf zeromq-2.1.7.tar.gz
Step #A-3: Build ZeroMQ and update the library:
Note: If you get the "cannot link with -luuid, install uuid-dev." error when executing "./configure", you need to install the missing library. You can install it by executing "yum install libuuid-devel".

B. Install JZMQ

Step #B-1: Get the project from the Git by executing:
Step #B-2: Install it:

C. Setup Storm

Step #C-1: Download the latest version (for this tutorial, I'm using 0.8.1 version) from

Step #C-2: Unzip the downloaded zip file:
[root@ip-10-0-0-194 tool]# unzip
Step #C-3: Now change the configuration based on your environment. The default location of the main Storm configuration file is: "/storm/conf/storm.yaml". Any setting you write in this file will override the corresponding default configuration value. Here is what I changed in the storm.yaml file:
Note: I have created the "storm" folder manually inside "/var" directory.
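As a rough illustration of the kind of overrides that usually go into storm.yaml, here is a minimal Python sketch that renders the file content. The keys (storm.zookeeper.servers, nimbus.host, storm.local.dir, supervisor.slots.ports) are standard Storm settings, but the hostnames are hypothetical placeholders, and "/var/storm" as the local directory is an assumption based on the note above. This is not the exact file from my cluster.

```python
def render_storm_yaml(zookeeper_servers, nimbus_host, local_dir, slot_ports):
    """Return storm.yaml text containing only the overridden settings."""
    lines = ["storm.zookeeper.servers:"]
    lines += ['    - "{}"'.format(host) for host in zookeeper_servers]
    lines.append('nimbus.host: "{}"'.format(nimbus_host))
    lines.append('storm.local.dir: "{}"'.format(local_dir))
    lines.append("supervisor.slots.ports:")
    lines += ["    - {}".format(port) for port in slot_ports]
    return "\n".join(lines) + "\n"


# Hypothetical hostnames; replace them with the addresses of your own hosts.
storm_yaml_text = render_storm_yaml(
    zookeeper_servers=["zk1.example.internal", "zk2.example.internal"],
    nimbus_host="nimbus.example.internal",
    local_dir="/var/storm",               # assumed from the note above
    slot_ports=[6700, 6701, 6702, 6703],  # one worker slot per port
)
print(storm_yaml_text)
```

Each port listed under supervisor.slots.ports corresponds to one worker slot on a supervisor, so a larger instance can be given more ports than a small one.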

At this point, you are ready to start your Storm cluster. Here, I installed and set up everything on a single instance first (supervisor1), then created an AMI from that instance, and later created the other two supervisors and the nimbus node from that AMI.

Launch the daemons by using the storm script (bin/storm) on each node. I started the nimbus and UI daemons on the nimbus host and the supervisor daemon on each of the supervisor nodes.

  • bin/storm nimbus (on the nimbus host)
  • bin/storm ui (on the nimbus host)
  • bin/storm supervisor (on each supervisor host)

You can see the Storm UI by navigating to your nimbus host: http://{nimbus host}:8080. In my case, I used the public IP address of my nimbus host.

Note: For privacy purposes, I had to modify several lines in this post from my original post. So if you find that something is not working or you face any issues, please do not hesitate to contact me.

Saturday, February 23, 2013

Install Opscenter in CentOS environment

In my previous post, I talked about how to install Cassandra in a CentOS environment. This is the follow-up to that post, and here I am going to show you how to install OpsCenter in the same environment. "OpsCenter is a browser-based user interface for monitoring, administering, and configuring multiple Cassandra clusters in a single, centralized management console".

Last time, I installed a Cassandra cluster on these nodes:
  • cassandra node1 ->
  • cassandra node2 ->
  • cassandra node3 ->
Now I am going to install OpsCenter agents on these nodes and treat one host as my client node (that is, the OpsCenter console will be deployed on that host). Before installing OpsCenter, make sure your Cassandra cluster is up and running successfully.

Step #1: Create a new yum repository definition for DataStax OpsCenter on the client node.
[root@ip-10-0-0-57 ~]# vim /etc/yum.repos.d/datastax.repo
Step #2: Write the edition you want to install in the datastax.repo file (I am installing OpsCenter community edition):
Step #3: Install the OpsCenter package:
[root@ip-10-0-0-57 ~]# yum install opscenter-free
The above steps will install the most recent OpsCenter community edition on your system. But today I want to install a specific version of OpsCenter (the one appropriate for the Cassandra version I installed earlier). To do that, I first need to check the list of OpsCenter versions which are currently available:
I wanted to install 3.0.2-1 version. So, I'm installing it by:
[root@ip-10-0-0-57 ~]# yum install opscenter-free-3.0.2-1
Step #4: If you do not want to use the repository and would rather install a specific version of OpsCenter manually, you can download the rpm file and install it by:
[root@ip-10-0-0-57 ~]# yum install opscenter-free-3.0.2-1.noarch.rpm
Step #5: Now configure your opscenter configuration file (/etc/opscenter/opscenterd.conf) to mention your web server's IP address or hostname:
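As a hedged illustration of this step, the following Python sketch renders a minimal [webserver] section for opscenterd.conf. The "port" and "interface" keys are the usual web server settings in that file; the interface value 0.0.0.0 (listen on all interfaces) is only an assumed example, so substitute your own IP address or hostname.

```python
import configparser
import io


def render_opscenterd_conf(interface, port=8888):
    """Return the text of a minimal [webserver] section for opscenterd.conf."""
    config = configparser.ConfigParser()
    config["webserver"] = {"port": str(port), "interface": interface}
    buffer = io.StringIO()
    config.write(buffer)
    return buffer.getvalue()


# 0.0.0.0 is an assumed value; use the address your web server should bind to.
opscenterd_conf_text = render_opscenterd_conf("0.0.0.0")
print(opscenterd_conf_text)
```

The port 8888 matches the console URL used later in this post.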
Step #6: Now start your OpsCenter by:
Step #7: Now you can see your OpsCenter console by navigating to http://<opscenter_host>:8888. In my case, I used the public IP address of the host.

Step #8: Wait, you are not done yet! You still need to install your OpsCenter agents. The first time you open your OpsCenter console, it will ask whether you want to create a new cluster or use an existing cluster. In my previous post, I installed a cluster with the name "Simple Cluster", and I want to use that existing cluster for my OpsCenter. So, I'm selecting the "Use Existing Cluster" option.

Step #9: Now, you need to enter the hostnames or IP addresses of your cluster nodes, one per line (leave the other fields as they are):

Step #10: At this point you should be able to see your OpsCenter console.

Note that at the top of your console, there is a notification labeled "0 of 3 agents connected" with a link called "fix". This is because none of your OpsCenter agents are installed yet. Click on that link to install the agents automatically.

Step #11: Enter the appropriate credentials for your machines. In my case, I'm entering "root" as my username and pasting my private key (including the header part, i.e. --BEGIN RSA PRIVATE KEY):

Click on "Done" and finally install it by clicking on the "Install on all nodes" button.

Step #12: Accept all the fingerprints by clicking on the "Accept Fingerprints" button. Once you click on that button, each of your hosts will download the agent package from the internet and then install and configure the agent on the system.

At the end of the installation you should be able to see a success message on your screen:

Since each host downloads the agent package from the internet, every host needs internet access. If you do not have that setup, you can also install the OpsCenter agent manually.

At the end of your installation you should no longer see the "0 of 3 agents connected" notification.

Finally you are done! There are so many things you can do in OpsCenter. I highly recommend that you play with it and run some experiments by applying different settings and configurations. It also comes with a large set of very good performance metrics, so do not forget to check those too.

Note: For privacy purposes, I had to modify several lines in this post from my original post. So if you find that something is not working or you face any issues, please do not hesitate to contact me.

Thursday, February 21, 2013

Sqoop import/export from/to Oracle

I love Sqoop! It's a fun tool to work with and it's very powerful. Importing and exporting data from/to Oracle with Sqoop is pretty straightforward. One crucial thing to remember when working with Sqoop and Oracle together is to use all capital letters for Oracle table names. Otherwise, Sqoop will not recognize the Oracle tables.

This is my database (Oracle) related information:
  • URL:
  • db_name (SID): test
  • Schema: Employee_Reporting
  • Username: username1
  • Password: password1
  • Tablename: employee (I'm going to import this table into HDFS with Sqoop)

Import from Oracle to HDFS:
Let's go through this option file (option.par):
You can see that most of the parameters are self-explanatory. Notice that I'm providing the table name in all capital letters.

How do you want the imported columns to appear in HDFS? For that, we need to use the --fields-terminated-by parameter. Here I'm passing "\t" for that parameter, which means that the fields of each row will be tab delimited after the import. Sqoop will generate a class (with a list of setters and getters) to represent the employee object, and the name of that class is defined by the --class-name parameter. So in this case, it will create the class inside the com.example.reporting package. I'm using the --verbose parameter to print out information while Sqoop is working. It's not mandatory and you can leave it out if you want.

The --split-by parameter is the name of the column which I want to use for splitting the import data. Here, ID is the primary key of the table Employee. You can use any WHERE clause for your import; in that case you need to pass it with the --where parameter. In the above example, it will import all rows from the table Employee where ID is less than or equal to 100000 (i.e. at most 100000 rows if the IDs are unique positive integers).

You need to specify an HDFS location which will be used as the destination directory for the imported data (--target-dir parameter). Remember that the target directory must not exist before you run the import command, otherwise Sqoop will throw an error. The last parameter, -m, is the number of map tasks to run in parallel for the entire import job.
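To make the parameters described above concrete, here is a small Python sketch that writes out an options file with one option or value per line, which is the layout Sqoop expects for --options-file. The flags are real Sqoop import options, but several values are assumptions for illustration only: the host and port in the JDBC URL, the class name Employee, the target directory, and the mapper count are hypothetical and not taken from my actual option.par.

```python
def render_options_file(options):
    """Render (flag, value) pairs as Sqoop options-file text, one item per line.

    A value of None means the flag takes no argument (for example --verbose).
    """
    lines = []
    for flag, value in options:
        lines.append(flag)
        if value is not None:
            lines.append(value)
    return "\n".join(lines) + "\n"


import_options = [
    # Host and port are placeholders; "test" is the SID listed above.
    ("--connect", "jdbc:oracle:thin:@<db_host>:<port>:test"),
    ("--username", "username1"),
    ("--password", "password1"),
    ("--table", "EMPLOYEE"),                   # all capital letters for Oracle
    ("--fields-terminated-by", r"\t"),         # tab-delimited fields in HDFS
    ("--class-name", "com.example.reporting.Employee"),  # class name is assumed
    ("--verbose", None),
    ("--split-by", "ID"),
    ("--where", "ID<=100000"),
    ("--target-dir", "/user/example/employee_import"),   # hypothetical path
    ("-m", "4"),                                         # hypothetical mapper count
]

options_file_text = render_options_file(import_options)
print(options_file_text)
```

Saving the printed text as option.par gives a file with the same general shape as the one discussed above.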

Once you have your option file ready, you can execute the Sqoop import command as:
sqoop import --options-file option.par
Using an option file is not mandatory; I'm just using it for my convenience. You can also pass each of the parameters from your console and execute the import job. Example:

Export from Hive to Oracle:
For the export, I will be using some of the parameters which I used during the import, as they are common to both import and export jobs. Assume I processed (with MR jobs) the data generated by the import job and inserted the results into Hive tables. Now I want to export those Hive tables to Oracle. If you are familiar with Hive, you may know that Hive moves/copies data to its warehouse folder by default. I'm using Hortonworks's distribution, and in my case the table's folder inside Hive's warehouse is located at: "/apps/hive/warehouse/emp_record". Here, emp_record is one of the Hive tables I want to export.

I have already created a matching table "Emp_Record" in my Oracle inside the same schema "Employee_Reporting". To export the Hive table, I'm executing the following command:
Notice that instead of using --target-dir, I'm using --export-dir. This is the location of the Hive table's warehouse folder, and the data will be exported from there.

Now assume, inside the warehouse directory, I have a file 00000_1 (which contains the data of Hive table Emp_Record) and some of its lines are:
As you can see, the columns/fields are tab delimited and the rows are separated by a new line. We also see that there is an entire row which contains null values (ideally you would not have null values, as you might want to filter them out in your M-R jobs). But say we have all kinds of values, so we need to tell Sqoop how to treat each of them. That is why I'm using the --input-fields-terminated-by parameter to say that the fields are tab delimited and the --input-lines-terminated-by parameter to separate rows. From Sqoop's side, there are two kinds of columns: string columns and non-string columns. Sqoop needs to know which string value should be interpreted as null. So I'm using the --input-null-string and --input-null-non-string parameters for the two column types and passing '\\N' as their value, because in my case '\\N' represents null.
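The export parameters discussed above can be put together roughly as follows. This Python sketch only assembles and prints the argument list; it does not run Sqoop. All flags are real Sqoop export options, but the JDBC host and port are placeholders and the exact command is an assumption, not a copy of the one I executed.

```python
import shlex


def build_export_args(connect, username, password, table, export_dir):
    """Return the argument list for a Sqoop export of tab-delimited data."""
    return [
        "sqoop", "export",
        "--connect", connect,
        "--username", username,
        "--password", password,
        "--table", table,
        "--export-dir", export_dir,             # used instead of --target-dir
        "--input-fields-terminated-by", r"\t",  # fields are tab delimited
        "--input-lines-terminated-by", r"\n",   # one row per line
        "--input-null-string", r"\\N",          # null marker for string columns
        "--input-null-non-string", r"\\N",      # null marker for other columns
    ]


export_args = build_export_args(
    connect="jdbc:oracle:thin:@<db_host>:<port>:test",  # placeholders
    username="username1",
    password="password1",
    table="EMP_RECORD",                                 # all capital letters
    export_dir="/apps/hive/warehouse/emp_record",
)
print(shlex.join(export_args))
```

shlex.join (Python 3.8+) quotes each argument for the shell, so the backslash sequences are printed in a form that can be pasted into a terminal.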

Wrapping Up:
Sometimes you will face issues during export when your Oracle table has very tight constraints (e.g. not null, time-stamp, values expected in a specific format, etc). In that case, the best idea is to export the Hive table/HDFS data to a temporary Oracle table without any modification to make Sqoop happy :). Then write a SQL script to convert and filter that data into your expected values and load it into your main Oracle table.

The above two examples show only very basic Sqoop import and export jobs. There are a lot of settings in Sqoop you can use. Once you are able to run an export/import job successfully, I would recommend that you try the same job with different parameters and see how it goes. You can find all available options in the Sqoop user guide.

Note: For privacy purposes, I had to modify several lines in this post from my original post. So if you find that something is not working or you face any issues, please do not hesitate to contact me.

Wednesday, February 13, 2013

Install Cassandra Cluster within 30 minutes

Installing a Cassandra cluster is pretty straightforward and you will find a lot of detailed documentation on the DataStax site. But if you still find it overwhelming, you can follow the steps I mention here. It's just a condensed, ordered list of all the steps you need to install a basic Cassandra cluster. I still recommend that you read the DataStax documents for detailed information.

I am using an EBS-backed CentOS AMI for this setup. You can choose a different AMI based on your needs.

Please check the current Cassandra version if you want to set up the latest one. Today, I am going to install Cassandra version 1.2.3.

Step #1: Create an instance in Amazon AWS. I chose the following one for this setup:

EBS Backed CentOS 6.3

This is an EBS-backed 64-bit CentOS 6.3 AMI. For me, the IP address which was assigned to this server:

Step #2: Once you have created this instance, by default it will not allow you to log in as the root user. So, log in to that server as ec2-user.

Step #3: Now allow root login on that machine:
Using username "ec2-user".
Authenticating with public key "imported-openssh-key"
[ec2-user@ip-10-0-0-57 ~]$ sudo su
[root@ip-10-0-0-57 ec2-user]# cd /root/.ssh
[root@ip-10-0-0-57 .ssh]# cp /home/ec2-user/.ssh/authorized_keys .
cp: overwrite `./authorized_keys'? y
[root@ip-10-0-0-57 .ssh]# chmod 700 /root/.ssh
[root@ip-10-0-0-57 .ssh]# chmod 640 /root/.ssh/authorized_keys
[root@ip-10-0-0-57 .ssh]# service sshd restart
Step #4: Now you should be able to log in as the root user on that server. So, log in again as root and check the current Java version:
[root@ip-10-0-0-57 ~]# java -version
The AMI which I used for this server didn't have Java pre-installed. So, I'm going to install the last version of Java 1.6 on this server. It is recommended not to use Java 1.7 for Cassandra.

Step #5: Download Java rpm (jre-6u43-linux-x64-rpm.bin) from the following location:

Step #6: Copy your downloaded rpm to that server by using WinSCP. (You can use any other tool if you want).

Step #7: Give required permissions to that rpm and install it:
[root@ip-10-0-0-57 ~]# chmod a+x jre-6u43-linux-x64-rpm.bin
[root@ip-10-0-0-57 ~]# ./jre-6u43-linux-x64-rpm.bin
Step #8: Java should be installed successfully and it should show you expected information:
[root@ip-10-0-0-57 ~]# java -version
java version "1.6.0_43"
Java(TM) SE Runtime Environment (build 1.6.0_43-b01)
Java HotSpot(TM) 64-Bit Server VM (build 20.14-b01, mixed mode)
Step #9: Now download Cassandra 1.2.3 rpm from the following location and copy it to your server by WinSCP:

DataStax Cassandra 1.2.3 rpm

You can also install it by using DataStax repository (check DataStax Cassandra manual for that), I'm just following this way as a preference.

Step #10: After you copy the Cassandra rpm to your server, install it:
[root@ip-10-0-0-57 ~]# yum install cassandra12-1.2.3-1.noarch.rpm
Step #11: Cassandra should now be installed successfully, and you can check its status like this (by default, the Cassandra server is stopped right after it's installed):
[root@ip-10-0-0-57 ~]# /etc/init.d/cassandra status
cassandra is stopped
Step #12: During installation, Cassandra downloads the OpenJDK version of Java as a dependency, which overrides your Java setting. So follow these steps to switch Java back to the Oracle version:

Step #13: At this point, your Cassandra is ready to start. But before you start it, you need to update its configuration for your cluster. For a packaged install, all configuration files are located in "/etc/cassandra/conf" on your server by default. For the cluster, I will change one of those files, cassandra.yaml, which is the main configuration file for Cassandra.

There are many properties you can change in the main configuration file, and you can find their details in the Cassandra documentation. But as I am just installing the most basic version of a Cassandra cluster, I will change only the following properties:
  • cluster_name: Name of your cluster. It must be the same for all hosts or instances.
  • initial_token: The way tokens were assigned in versions prior to 1.2. For this setup, I will set this value manually. That is not required, though: from 1.2 onward you can instead use the num_tokens property. You can read more about it on the web and try out different configs once you are familiar with Cassandra.
  • partitioner: It determines which node stores each piece of data. Remember, the partitioner cannot be changed without reloading all your data. So configure the correct partitioner before initializing your cluster.
  • seed_provider: A comma-delimited list of IP addresses to use as contact points when a node joins a cluster. This value should be the same on all your hosts.
  • listen_address: Local IP address of each host.
  • rpc_address: Listener address for client connections. I'm making it listen on all configured interfaces.
  • endpoint_snitch: Sets which snitch Cassandra uses for locating nodes and routing requests. Since I'm installing the Cassandra cluster on AWS within a single region, I will be using Ec2Snitch. Again, this value should be the same on all of your hosts.
So, after modifying it, here is my updated configuration file (showing only the changes which I made):
Step #14:

Murmur3Partitioner: This is a new partitioner which is available from Cassandra version 1.2. The initial_token value depends on the partitioner you are using. To generate initial_token values for Murmur3Partitioner, you can run the following commands:
Note: Here, 3 is the number of nodes which I will be using for my cluster setup. Change that value based on your needs.
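The token arithmetic for Murmur3Partitioner can be sketched in a few lines of Python. The partitioner's token range runs from -2**63 to 2**63 - 1, so evenly spaced tokens are obtained by dividing 2**64 by the node count. The function name below is my own and is only for illustration.

```python
def murmur3_initial_tokens(node_count):
    """Return evenly spaced initial_token values for Murmur3Partitioner."""
    step = 2**64 // node_count
    return [step * i - 2**63 for i in range(node_count)]


murmur3_tokens = murmur3_initial_tokens(3)
for node_number, token in enumerate(murmur3_tokens, start=1):
    print("node{} initial_token: {}".format(node_number, token))
```

For a 3-node cluster, the second and third values are -3074457345618258603 and 3074457345618258602, which are the tokens assigned to node #2 and node #3 later in this post.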

RandomPartitioner: If you want to use RandomPartitioner, you can generate your initial_token values by using the token-generator tool which comes with the Cassandra installation:
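For comparison, the same idea for RandomPartitioner can be sketched in Python. Its token range is 0 to 2**127 - 1, so evenly spaced tokens are multiples of 2**127 divided by the node count. This is only an illustration of the arithmetic, with a function name of my own; the token-generator tool may round the values slightly differently.

```python
def random_partitioner_tokens(node_count):
    """Return evenly spaced initial_token values for RandomPartitioner."""
    step = 2**127 // node_count
    return [step * i for i in range(node_count)]


random_tokens = random_partitioner_tokens(3)
for node_number, token in enumerate(random_tokens, start=1):
    print("node{} initial_token: {}".format(node_number, token))
```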
Step #15: Now, I am going to create a new AMI from my current instance so that when I create a new instance from that, I will have Cassandra installation ready with expected configuration.

Since I'm installing a 3-node cluster, I will create two new instances from the AMI which I just created. So, I got IP addresses like this:

- cassandra node1 ->
- cassandra node2 ->
- cassandra node3 ->

If you are using Amazon Virtual Private Cloud (VPC), then you have the option to choose specific IP address based on your needs.

Step #16: Remember, even though you created the instances from the AMI, you still need to change the values in the cassandra.yaml file which vary from host to host. Those are:

Node #2:
initial_token: -3074457345618258603
Node #3:
initial_token: 3074457345618258602
Step #17: You may need to update your "/etc/hosts" file in case your hostname is not configured. I have updated that file in each server like this:
Step #18: That's it! Now you are ready to start your Cassandra. Execute this in each node to start your cluster:
[root@ip-10-0-0-57 ~]# /etc/init.d/cassandra start
Starting cassandra: OK
Step #19: You can check the status of your Cassandra by executing this:

The two main things to look at in the status output are the "Owns" and "Status" columns. Here you can see that all nodes are up and share the same percentage of the total ownership.

As I said at the beginning, this is the most basic (or minimum) setup of a Cassandra cluster. There are a lot of things you can change, tune and modify based on your needs. Once you are familiar with basic Cassandra, I highly recommend that you run some experiments with different configurations. The more you try, the more you learn!

In another post, I will write about how to install DataStax OpsCenter to monitor this Cassandra Cluster.

Note: For privacy purposes, I had to modify several lines in this post from my original post. So if you find that something is not working or you face any issues, please do not hesitate to contact me.