Friday, May 17, 2013

Chunk data import / Incremental Import in Sqoop

Recently I faced an issue while importing data from Oracle with Sqoop. It had been working fine until I got a new requirement. Before discussing the new requirement, let me quickly describe how it currently works.

Currently I am running Sqoop from Oozie, but I am not using a coordinator job, so I execute each Oozie job manually from the command prompt.

You can check these links if you want to know how to run Sqoop and Oozie together.
In our option parameter file, I have a field something like this below:
ID <= 1000000
For each run, I used to change that field manually and re-run my Oozie job.

New Requirement

Now, what I have been asked to do is run my Oozie job through a coordinator and import the data from Oracle block-wise, in chunks. Based on the current requirement, what I'm trying to achieve is to import the rows from M to N. Ideally, for each run I'm targeting an import of 15 million rows from that specific table; Hadoop will process those records and be ready to process another batch before the following run.

As an example:
1st run: 1 to 20
2nd run: 21 to 40
3rd run: 41 to 60
and so on...
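
The arithmetic behind this example can be sketched in a few lines of Python. This is only an illustration of the M-to-N idea, not part of my actual job; the function name chunk_bounds is made up for this sketch, and the chunk size of 20 just mirrors the example above.

```python
from typing import Tuple


def chunk_bounds(run_number: int, chunk_size: int) -> Tuple[int, int]:
    """Return the inclusive (first, last) row index for a 1-based run number."""
    if run_number < 1 or chunk_size < 1:
        raise ValueError("run_number and chunk_size must be positive")
    first = (run_number - 1) * chunk_size + 1
    last = run_number * chunk_size
    return first, last


# Print the bounds of the first three runs with a chunk size of 20.
for run in (1, 2, 3):
    print(run, chunk_bounds(run, 20))
```

With the real requirement, chunk_size would be 15 million instead of 20.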

The first thing I started exploring was the "--boundary-query" parameter which comes with Sqoop. From the documentation: "By default sqoop will use query select min(<split-by>), max(<split-by>) from <table name> to find out boundaries for creating splits. In some cases this query is not the most optimal so you can specify any arbitrary query returning two numeric columns using --boundary-query argument."

After spending some time on it and discussing it on the Sqoop mailing list, I came to know that incremental import does not work in chunks. It imports everything since the last import (more specifically, everything from --last-value to the end).
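
To make the difference concrete, here is a small Python sketch that builds the two kinds of row filter as strings. It is an assumption-laden illustration: the function names are made up, the column name ID is taken from the condition shown earlier, and the bounded predicate is just one way to express "rows from M to N", not something Sqoop generates by itself.

```python
def incremental_predicate(column: str, last_value: int) -> str:
    """Open-ended filter: everything after the last imported value."""
    return f"{column} > {last_value}"


def chunk_predicate(column: str, start_index: int, chunk_size: int) -> str:
    """Bounded filter: exactly chunk_size values starting at start_index."""
    end_index = start_index + chunk_size  # exclusive upper bound
    return f"{column} >= {start_index} AND {column} < {end_index}"


print(incremental_predicate("ID", 20))
print(chunk_predicate("ID", 21, 20))
```

The first filter has no upper bound, which is why one incremental run picks up every remaining row instead of a fixed-size chunk.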

Then I decided to create a shell action in Oozie which updates the appropriate parameter after each execution of Sqoop, so that the following Sqoop run has new options for its import.

So I made some changes in my option parameter file (option.par) and here is the new one:
To store the current index value and the chunk size, I used another properties file:
My shell script increases the value of startIndex by chunkSize. Here is the script which I wrote for this:
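
The script itself is a shell script and is not reproduced in this copy of the post. As a stand-in, the following is a minimal Python sketch of the same update step. It assumes the properties file holds simple key=value lines with the keys startIndex and chunkSize; the function names and the file layout are assumptions for illustration, not my original code.

```python
from pathlib import Path
from typing import Dict


def read_properties(path: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props: Dict[str, str] = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def advance_start_index(path: str) -> int:
    """Add chunkSize to startIndex, rewrite the file, return the new value."""
    props = read_properties(path)
    new_start = int(props["startIndex"]) + int(props["chunkSize"])
    props["startIndex"] = str(new_start)
    # Comments and blank lines are not preserved by this simple rewrite.
    Path(path).write_text("".join(f"{k}={v}\n" for k, v in props.items()))
    return new_start
```

Calling advance_start_index once after each successful Sqoop run moves the window forward by one chunk, so the next run starts where the previous one ended.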

One thing I want to add here: when you modify a file from a script that runs through Oozie, it is actually a cached version of the file that gets updated. That's why I had to copy those files back to their original location in HDFS. Also, behind the scenes the mapred user is doing the work, while I'm running the Oozie job as the ambari_qa user (note: I'm using Hortonworks Hadoop, HDP 1.2.0). That's why I had to give all permissions on those files back to all users.
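
The copy-back and permission steps could look roughly like the Python sketch below, which only builds the command lines. The function names and both paths are hypothetical. It assumes a Hadoop 1.x style "hadoop fs" client, where an existing file has to be removed before it can be put again, and it uses mode 777 as one way of giving all permissions to all users.

```python
import subprocess
from typing import List


def copy_back_commands(local_file: str, hdfs_path: str) -> List[List[str]]:
    """Build the commands that replace hdfs_path with the updated local copy."""
    return [
        ["hadoop", "fs", "-rm", hdfs_path],               # drop the stale copy
        ["hadoop", "fs", "-put", local_file, hdfs_path],  # upload the new one
        ["hadoop", "fs", "-chmod", "777", hdfs_path],     # open up permissions
    ]


def run_all(commands: List[List[str]]) -> None:
    """Run each command in order and stop at the first failure."""
    for command in commands:
        subprocess.run(command, check=True)


# Hypothetical paths, shown for illustration only.
for command in copy_back_commands("option.par", "/user/example/app/option.par"):
    print(" ".join(command))
```

Mode 777 is very permissive; it matches what I did here but a tighter mode may be enough in other setups.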

Here is my Oozie workflow (workflow.xml):
I put everything inside my Oozie application path in HDFS. Here is my folder structure:
Don't forget to give the "write" permission when you first put it inside HDFS. Now you can run the Oozie workflow by executing this:
[ambari_qa@ip-10-0-0-91 ~]$ oozie job -oozie http://ip-10-0-0-91:11000/oozie -config -run
Here is the file:
That's it! Now every time you execute the Oozie job, it will import a new chunk of data from Oracle. I will describe how I'm running it as a coordinator job in another post. Jarcec mentioned in one of the Sqoop user mail threads that Sqoop will have this feature soon, but I'm not sure of its time frame, so I had to use this workaround. It worked for me, and I hope it will work for you too!

Note: For privacy reasons, I had to modify several lines in this post from my original version. So if you find that something is not working or you face any issues, please do not hesitate to contact me.