Exploring Big Data on a Desktop, with Hadoop, Elasticsearch and Pig


In continuation of earlier articles, the author goes further into the subject to discuss Elasticsearch and Pig, and explain how they can be used to create an index for a large number of PDF files.

If the files and data are already in Hadoop HDFS, is Elasticsearch still useful? How does one create an index?
Consider a large number of PDF files that need to be searched. As a first step, process each PDF file and store it as a record in an HDFS file. Then, you may experiment with two different but very simple approaches to create an index.

  • Write a simple Python mapper using MapReduce streaming to create an index.
  • Install the Elasticsearch-Hadoop plugin and create an index using a Pig script.

The environment for these experiments will be the same as in the earlier articles – three virtual machines, h-mstr, h-slv1 and h-slv2, each running HDFS and Elasticsearch services.

Loading PDF files into Hadoop HDFS
Enter the following code in load_pdf_files.py. Each PDF file is converted to a single line of text. Any tab characters in the text are replaced with spaces, so that the fields remain unambiguous when the file is read by a Pig script. For each file, the output is one line containing the path, the file name and the text content, separated by tabs.

#!/usr/bin/env python
from __future__ import print_function
import sys
import os
import subprocess

# Call pdftotext to convert the pdf file and store the result in /tmp/pdf.txt
def pdf_to_text(inpath, infile):
    exit_code = subprocess.call(
        ['pdftotext', os.path.join(inpath, infile), '/tmp/pdf.txt'],
        stderr=ErrFile)
    return exit_code, '/tmp/pdf.txt'

# Join all the lines of the converted pdf file into a single string
# Replace any tabs in the converted documents
# Write the file as a single line prefixing it with the path and the name
def process_file(p, f):
    exit_code, textfile = pdf_to_text(p, f)
    if exit_code == 0:
        print("%s\t%s" % (p, f), end='\t')
        with open(textfile) as lines:
            print(' '.join(line.strip().replace('\t', ' ') for line in lines))

# Generator for yielding pdf files
def get_documents(path):
    for curr_path, dirs, files in os.walk(path):
        for f in files:
            # endswith() also copes with file names that have no extension
            if f.lower().endswith('.pdf'):
                yield curr_path, f

# Start here
# Search for each file in the current path of type 'pdf' and process it
try:
    path = sys.argv[1]
except IndexError:
    # No directory given on the command line, so use the current one
    path = '.'
# Use an error file for stderr to prevent these messages going to hadoop streaming
ErrFile = open('/tmp/err.txt', 'w')
for p, f in get_documents(path):
    process_file(p, f)

Now, you can run the above program on your desktop and load data into a file in Hadoop HDFS as follows:

$ ./load_pdf_files.py ~/Documents |HADOOP_USER_NAME=fedora \
hdfs dfs -fs hdfs://h-mstr/ -put - document_files.txt
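
The layout of each record in document_files.txt can be illustrated in isolation. The following is a minimal sketch and not part of the original program: the helper names make_record and parse_record are hypothetical, and the sample path and text are made up.

```python
# Sketch of the one-line-per-document record format (helper names are hypothetical).

def make_record(path, name, lines):
    """Build one tab-separated record: path, file name, flattened text."""
    # Strip each line and replace embedded tabs, so that the only tabs
    # left in the record are the two field separators.
    text = ' '.join(line.strip().replace('\t', ' ') for line in lines)
    return '\t'.join([path, name, text])


def parse_record(record):
    """Split a record back into (path, name, text)."""
    # At most two splits are made, so the text always stays in one piece.
    path, name, text = record.rstrip('\n').split('\t', 2)
    return path, name, text


if __name__ == '__main__':
    sample = make_record('/home/fedora/Documents', 'notes.pdf',
                         ['First line\n', 'second\tline\n'])
    print(repr(sample))
    print(parse_record(sample))
```

Because the text never contains a tab, any consumer that splits on tabs should see exactly three fields per line.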

Using MapReduce to create an index
Log into h-mstr as user fedora and enter the following code in ‘indexing_mapper.py’.

#!/usr/bin/env python
import sys
from elasticsearch import Elasticsearch

# Generator for yielding each line split into path, file name and the text content
def hdfs_input(sep='\t'):
    for line in sys.stdin:
        # Split on the first two separators only; the rest is the text
        path, name, text = line.rstrip('\n').split(sep, 2)
        yield path, name, text

# Create an index pdfdocs with fields path, title and text.
# Index each line received from Hadoop streaming
def main():
    es = Elasticsearch(hosts=['h-mstr'])
    for path, name, text in hdfs_input():
        doc = {'path': path, 'title': name, 'text': text}
        es.index(index='pdfdocs', doc_type='text', body=doc)

if __name__ == "__main__":
    main()

Run the following command on h-mstr to start the streaming job:

$ hadoop jar /usr/share/java/hadoop/hadoop-streaming.jar \
-files indexing_mapper.py -mapper indexing_mapper.py \
-input document_files.txt -output es.out
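
Since the streaming job needs a live cluster, it can be convenient to try the mapper's parsing logic locally first. The sketch below is a stand-in under stated assumptions, not the mapper itself: index_lines and its index_fn parameter are hypothetical, and a plain list takes the place of the Elasticsearch client.

```python
# Local dry run of the mapper logic; no Elasticsearch connection is needed.

def index_lines(lines, index_fn, sep='\t'):
    """Turn each tab-separated line into a document and pass it to index_fn.

    index_fn is a hypothetical hook; in a real mapper it would wrap the
    call to es.index(). Returns the number of malformed lines skipped.
    """
    skipped = 0
    for line in lines:
        parts = line.rstrip('\n').split(sep, 2)
        if len(parts) != 3:
            skipped += 1  # not a path/name/text record
            continue
        path, name, text = parts
        index_fn({'path': path, 'title': name, 'text': text})
    return skipped


if __name__ == '__main__':
    collected = []
    bad = index_lines(['/docs\ta.pdf\thello world\n', 'broken line\n'],
                      collected.append)
    print(collected, bad)
```

In the mapper, index_fn could be a small function that calls es.index(index='pdfdocs', doc_type='text', body=doc) for each document it receives.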

The following URLs will give you more information about the allocation and status of the index.
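
One way to obtain this kind of information, assuming the standard Elasticsearch cat APIs (_cat/indices and _cat/allocation) are what is wanted, is sketched below. The helper names cat_urls and show_status are hypothetical, and the host h-mstr on the default port 9200 is assumed.

```python
# Sketch: build (and optionally fetch) Elasticsearch cat API URLs.
try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2


def cat_urls(host='h-mstr', port=9200):
    """Return the cat API URLs for index status and shard allocation."""
    base = 'http://%s:%d/_cat' % (host, port)
    # '?v' asks Elasticsearch to include a header row in the text output.
    return [base + '/indices?v', base + '/allocation?v']


def show_status(host='h-mstr', port=9200):
    """Fetch each URL and print the plain-text table it returns."""
    for url in cat_urls(host, port):
        print(url)
        print(urlopen(url).read().decode('utf-8'))


if __name__ == '__main__':
    # Only the URLs are printed here; call show_status() from a machine
    # that can reach the cluster to see the actual tables.
    for url in cat_urls():
        print(url)
```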

Using a Pig script to create an index
The Fedora 20 repositories do not yet include the Pig distribution; it will be included in Fedora 21. So, download and install Pig from the Apache site http://pig.apache.org/releases.html on each of the virtual machines.
You will also need to install the Elasticsearch-Hadoop plugin on these systems. For example, you may run the following commands from h-mstr:

$ sudo /usr/share/elasticsearch/bin/plugin \
-u http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta2.zip \
-i elasticsearch-hadoop

$ ssh -t fedora@h-slv1 sudo /usr/share/elasticsearch/bin/plugin \
-u http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta2.zip \
-i elasticsearch-hadoop

$ ssh -t fedora@h-slv2 sudo /usr/share/elasticsearch/bin/plugin \
-u http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta2.zip \
-i elasticsearch-hadoop

The Pig script, indexing.pig, for creating the index is just four lines. The first registers the elasticsearch-hadoop jar file. The second loads the text file from HDFS. The third selects the fields as the tuple (path, text, title), and the fourth stores each tuple in Elasticsearch, which indexes it.

REGISTER /usr/share/elasticsearch/plugins/hadoop/dist/elasticsearch-hadoop-2.1.0.Beta2.jar;
A = LOAD 'document_files.txt' USING PigStorage() AS (path:chararray, title:chararray, text:chararray);
B = FOREACH A GENERATE path, text, title ;
STORE B INTO 'docs/text' USING org.elasticsearch.hadoop.pig.EsStorage();
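
For readers more at home in Python than in Pig, the per-record effect of the script can be approximated as follows. This is only an illustrative sketch: the function names are hypothetical, and the JSON shown is an assumption about the shape of the document that ends up in the docs index, not the connector's actual wire format.

```python
# Rough Python analogue of the LOAD and FOREACH ... GENERATE steps.
import json


def pig_like_transform(line):
    """Split one record on tabs and emit the fields as path, text, title."""
    # LOAD ... USING PigStorage() AS (path, title, text):
    # tab is PigStorage's default field delimiter.
    path, title, text = line.rstrip('\n').split('\t', 2)
    # FOREACH A GENERATE path, text, title: same fields, reordered.
    return [('path', path), ('text', text), ('title', title)]


def as_json(fields):
    """Render the named fields as a JSON object (assumed document shape)."""
    return json.dumps(dict(fields), sort_keys=True)


if __name__ == '__main__':
    row = pig_like_transform('/docs\tguide.pdf\tPig and Hadoop\n')
    print(as_json(row))
```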

Run the script on h-mstr:

[fedora@h-mstr ~]$ pig indexing.pig

You can then check the status of the indices and compare the pdfdocs index created earlier with the docs index created by the Pig script.

The ultimate test is to compare the results of the two indices; e.g., you can browse the Elasticsearch index searching for ‘Python’ in the contents and displaying up to 25 values. Only the values of the fields path and title will be displayed.
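
Such a search can be expressed as a URI request. The sketch below assembles one with the standard library. The q, fields and size parameters follow the description above and the Elasticsearch 1.x URI search API, but the exact query text:python is an assumption, and search_url is a hypothetical helper.

```python
# Sketch: build a URI search request for one of the two indices.
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2


def search_url(index, term, fields=('path', 'title'), size=25,
               host='h-mstr', port=9200):
    """Return a _search URL that looks for term in the text field."""
    params = [('q', 'text:%s' % term),        # search the document contents
              ('fields', ','.join(fields)),   # return only these fields
              ('size', size)]                 # at most this many hits
    return 'http://%s:%d/%s/_search?%s' % (host, port, index,
                                           urlencode(params))


if __name__ == '__main__':
    for index in ('pdfdocs', 'docs'):
        print(search_url(index, 'python'))
```

The printed URLs can be pasted into a browser or passed to curl, once for each index.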


A more flexible option is to send the query as a JSON string, as follows (for details, go to http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html):

curl "h-mstr:9200/pdfdocs/_search?pretty=true" -d '{
"fields": ["path", "title"],
"size": 25,
"query": { "query_string": { "query": "python" }}}'

If all has gone well, you should get the same answers for the queries, whether you use the docs or the pdfdocs index.
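
The comparison can also be automated. The following sketch assumes the two JSON responses have already been fetched and parsed into dictionaries, and that each hit carries a fields object whose values are lists, which is how Elasticsearch 1.x returns requested fields. The names hit_keys and same_results are hypothetical.

```python
# Sketch: compare the documents returned by two search responses.

def hit_keys(response):
    """Return the set of (path, title) pairs found in a search response."""
    keys = set()
    for hit in response.get('hits', {}).get('hits', []):
        fields = hit.get('fields', {})
        # Requested fields come back as lists; take the first value of each.
        path = (fields.get('path') or [None])[0]
        title = (fields.get('title') or [None])[0]
        keys.add((path, title))
    return keys


def same_results(resp_a, resp_b):
    """True when both responses refer to the same set of documents."""
    return hit_keys(resp_a) == hit_keys(resp_b)


if __name__ == '__main__':
    one = {'hits': {'hits': [{'fields': {'path': ['/d'], 'title': ['a.pdf']}}]}}
    print(same_results(one, one))
```

Sets are used so that the order of the hits does not matter. If more documents match than the requested size, the two truncated lists may legitimately differ, so a larger size may be needed for a fair comparison.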

The author works as a consultant. Prior to consulting, Anil was a professor at Padre Conceicao College of Engineering (PCCE) in Goa, managed IT and imaging solutions for Phil Corporation Limited (Goa), supported domestic customers for Tata Burroughs/TIL, and was a researcher with IIT-K and the Indian Institute of Geomagnetism (Mumbai).

