Exploring Big Data on a Desktop: Spark and HBase (May 2015)

2
5566

The last issue of OSFY carried the column ‘Exploring Big Data’, which took a look at Apache Spark. This article explores HBase, the Hadoop database, which is a distributed, scalable big data store. The integration of Spark with HBase is also covered.

Spark can work with multiple formats, including HBase tables. Unfortunately, I could not get the HBase Python examples included with Spark to work. Hence, you may need to experiment with Scala and Spark instead. The quick-start documentation with Scala code is fairly easy to read and understand even if one knows only Python and not Scala (http://spark.apache.org/docs/1.2.1/quick-start.html). You can use the Scala example ‘HBaseTest.scala’ as the basis for further exploration.
Start the HBase server. In the Spark installation directory, you will need to start the Spark shell including HBase jars in the driver classpath. For example:

$ ../hbase-0.98.9-hadoop2/bin/start-hbase.sh
$ HBASE_PATH=`../hbase-0.98.9-hadoop2/bin/hbase classpath`
$ bin/spark-shell --driver-class-path $HBASE_PATH
scala>

You are now ready to enter Scala code in the interactive shell.

scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> val conf = HBaseConfiguration.create()
scala> val tableName=”spark_table”
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> conf.set(TableInputFormat.INPUT_TABLE,tableName)
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
scala> val admin = new HbaseAdmin(conf)
scala> admin.isTableAvailable(tableName)
res2: Boolean = false

So far, you have set up a connection to HBase and confirmed that the table you wish to create does not exist.

scala> import org.apache.hadoop.hbase.{HTableDescriptor, HColumnDescriptor}
scala> val tableDesc = new HTableDescriptor(tableName)
scala> tableDesc.addFamily(new HColumnDescriptor(“cf”.getBytes()))
scala> admin.createTable(tableDesc)

You have now created a new table, ‘spark_table’, with a column family, ‘cf’. In another terminal window, in the HBase installation directory, you can run the HBase shell and verify that the table has indeed been created, and that it is empty.

$ bin/hbase shell
hbase(main):001:0> list
spark_table
=> [“spark_table”]
hbase(main):002:0> scan “spark_table”
ROW COLUMN+CELL
0 row(s) in 2.5640 seconds

You can go back to the Scala shell prompt and enter some data.

scala> import org.apache.hadoop.hbase.client.{HTable,Put}
scala> val table = new HTable(conf, tableName)
scala> var row = new Put(“dummy”.getBytes())
scala> row.add(“cf”.getBytes(), “content”.getBytes(), “Test data”.getBytes())
scala> table.put(row)
scala> table.flushCommits()

You have now created a connection to the HBase table and created a row with the key ‘dummy’ and the content ‘Test data’, in a column, ‘cf:content’. You can verify that this data has indeed been stored from the HBase shell, as follows:

hbase(main):003:0> scan “spark_table”
ROW COLUMN+CELL
dummy column=cf:content,
timestamp=1427880756119, value=Test data 1 row(s) in 0.1200 seconds

Let’s suppose you have a group of text files in an HDFS directory, TextFiles, and you want to store them as ‘key, value’ pairs in the HBase table—where the file name is the key and the file content is the value.
The following code illustrates a way of doing so. The code appears to be complex in order to avoid serialisation exceptions. (For an explanation, see http://stackoverflow.com/questions/25250774/writing-to-hbase-via-spark-task-not-serializable)

scala> val filesRDD = sc.wholeTextFiles(“hdfs://localhost/fedora/TextFiles”)
scala> filesRDD.foreachPartition { iter =>
| val hbaseConf = HBaseConfiguration.create()
| val table = new HTable(hbaseConf, tableName)
| iter.foreach { x =>
| var p = new Put(x._1.getBytes())
| p.add(“cf”.getBytes(),”content”.getBytes(),x._2.getBytes())
| table.put(p)
| }}

The parameter, x, is a (key, value) tuple, whose elements can be accessed as x._1 and x._2, which is not a very familiar syntax. The key is the file name, and the value is the contents of the file.
It is far more likely that you will want to use an existing HBase table. All you need to do is to create a Resilient Distributed Dataset (RDD) of the table; after that you can do all the operations available on an RDD, e.g., count() as shown below:

scala> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
| classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
| classOf[org.apache.hadoop.hbase.client.Result])
scala> hBaseRDD.count()

The Spark universe continues to expand as familiarity with it increases. It definitely seems like a technology with a bright future and worth exploring.

Previous articlePlotting with Scilab is Simple
Next articleUsing Parallel Ports to Control Devices
The author works as a consultant. Prior to consulting, Anil was a professor at Padre Conceicao College of Engineering (PCCE) in Goa, managed IT and imaging solutions for Phil Corporation Limited (Goa), supported domestic customers for Tata Burroughs/TIL, and was a researcher with IIT-K and the Indian Institute of Geomagnetism (Mumbai).

2 COMMENTS

  1. Hey, I have gone through many articles wherein Hbase tables are being read using Spark(scala) but all of these end at counting the number of entries. Is there a way to read/manipulate contents of a HBase table in Spark using scala. It would be appreciated if you point me to some online resource regarding the same.

    • I think the use of spark on HBase is bulk data processing , if you want to present a simple aggregation report from 1 index , it will be only done using HBase , also Hbase is for interactive processing while spark is for row by row processing , spark can best fit in replacement of Mapreduce programming …

LEAVE A REPLY

Please enter your comment!
Please enter your name here