How To Set Up a Hadoop Cluster

In this tutorial I will show you the required steps for setting up a multi-node Hadoop cluster using the Hadoop Distributed File System (HDFS) on Linux-based operating systems.

What is Hadoop?
Apache Hadoop is a software framework that supports data-intensive distributed applications under a free license.[1] It enables applications to work with thousands of nodes and petabytes of data. Hadoop was inspired by Google's MapReduce and Google File System (GFS) papers.

Source :

In this tutorial I will guide you through the required steps to set up a multi-node cluster.

To set up Hadoop we need some prerequisites.

1. Download and configure the JDK
    Java 1.6.x is recommended.

2. Download Hadoop
    Download the latest stable Hadoop release here

All the nodes must have the same versions of the JDK and the Hadoop core.

Establish Authentication among nodes

Suppose a user on node_A wants to log in to a remote node_B using SSH. By default, SSH will ask for node_B's password for authentication, and it is impractical to enter a password every time the master node needs to operate on a slave node. To solve this we must adopt public key authentication. Every node generates a pair of keys, one public and one private, and node_A can log in to node_B without password authentication only if node_B has a copy of node_A's public key. In a Hadoop cluster, all the slave nodes must have a copy of the master node's public key.

To do this, log in to each node and run the following command.
ssh-keygen -t rsa
When a question is asked, simply press Enter to continue. Two files, "id_rsa" and "id_rsa.pub", are then created under /home/username/.ssh/.

Now log in to the master node and run the following commands.

  • cat /home/username/.ssh/id_rsa.pub >> /home/username/.ssh/authorized_keys
  • scp /home/username/.ssh/id_rsa.pub ip_address_of_slavenode:/home/username/.ssh/master_key.pub
Then log in to each slave node and run the following command.
cat /home/username/.ssh/master_key.pub >> /home/username/.ssh/authorized_keys
Then log back in to the master node and run the following to test whether the master node can log in to a slave node without a password.
ssh ip_address_of_slave_node

In this step we have to install Hadoop on each slave node. Download Hadoop, extract it to a directory, and set the HADOOP_INSTALL variable.

Hadoop Configuration

Set the JAVA_HOME and HADOOP_INSTALL system variables.

Modify "hadoop-env.sh" in HADOOP_HOME/conf/. Delete the '#' at the beginning of the "The java implementation to use" line and fill in the appropriate JAVA_HOME path.

Modify hdfs-site.xml, mapred-site.xml, and core-site.xml as below.
Download Link :
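As a minimal sketch of the three configuration files (the hostname master, the ports, and the replication factor are assumptions; adjust them for your cluster):

```xml
<!-- core-site.xml: where the default file system (HDFS namenode) lives -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:9000</value>
  </property>
</configuration>

<!-- hdfs-site.xml: how many copies of each block HDFS keeps -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>

<!-- mapred-site.xml: where the JobTracker runs -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>master:9001</value>
  </property>
</configuration>
```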

Start Hadoop

First you have to format the namenode. To do this
hadoop namenode -format
Then start the cluster. With HADOOP_INSTALL/bin on your PATH, run start-dfs.sh to start HDFS and start-mapred.sh to start MapReduce (or start-all.sh to start both).

Some Useful links.


HBase is a distributed, column-oriented database built on top of the Hadoop Distributed File System. HBase is the Hadoop application to use when you require real-time read/write random access to very large datasets.

more info :


Download a stable release from the above page and extract it to somewhere on your file system.
For Linux users:
                 tar xzf hbase-x.y.z.tar.gz

First you have to set the JAVA_HOME environment variable.

After that set the HBASE_INSTALL environment variable.
For Linux users, you can edit your .bash_profile or simply type the lines in your console.
                     export HBASE_INSTALL=/home/hbase/hbase-x.y.z
                     export PATH=$PATH:$HBASE_INSTALL/bin 

To verify your installation, run the hbase command. It will print a list of options.

To start a temporary instance of HBase, go to your HBase bin directory and type
start-hbase.sh
Now it will start the HBase instance.

To administer your HBase instance, launch the HBase shell by typing hbase shell in the console. This will bring up an interpreter with some HBase-specific commands added to it.
For help, just type help in the console and you will see the list of help commands.

To create a table named test with a single column family named data, type
 create 'test','data'
and press Enter.

To list the tables, type the list command.

To insert data into the table
put 'test','row1','data:1','val1'

To view the data in the table.
scan 'test'
To drop the table, you first have to disable it and then drop it.
disable 'test'
drop 'test'

Shut down the HBase instance with
stop-hbase.sh

Using Java we can write a program to retrieve, insert, update, and delete data in HBase tables.

First create the table as below.
create 'DemoInsert', 'details'

To insert data, create a class called InsertData as follows.
import java.util.Random;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class InsertData {

public static void main(String[] args) throws Exception {

HBaseConfiguration hbaseConfig = new HBaseConfiguration();
HTable htable = new HTable(hbaseConfig, "DemoInsert");
htable.setWriteBufferSize(1024 * 1024 * 12);

int totalRecords = 100000;
int maxID = totalRecords / 1000;
Random rand = new Random();
System.out.println("importing " + totalRecords + " records ....");
for (int i = 0; i < totalRecords; i++) {
int userID = rand.nextInt(maxID) + 1;
byte[] rowkey = Bytes.add(Bytes.toBytes(userID), Bytes.toBytes(i));
String randomPage = "page" + rand.nextInt(1000); // a random value to store
Put put = new Put(rowkey);
put.add(Bytes.toBytes("details"), Bytes.toBytes("page"), Bytes.toBytes(randomPage));
htable.put(put);
}
htable.close(); // flush the write buffer and release the table
}
}

This is a basic example showing how to insert data into a table from Java.

You can find more similar examples in :


World's Smallest Projector

iGo introduces the UP-2020 palm-size pocket projector, based on the Digital Light Processing (DLP) system.

The projector features 854x480 native resolution and is able to project a viewable screen of up to 70 inches.
It also supports built-in media playback of MP4, JPG, and BMP files, and has a micro-SD card slot, a USB port, and a built-in speaker.


What is Zookeeper?

Zookeeper is a coordination service that is used to build general distributed applications.
In this tutorial we will look at how to build a sample distributed application.
First, look at the definition:
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which makes them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.

Source :

In Zookeeper we can handle partial failure. Partial failure means something like this.
Suppose we send a message across the network from one node to another node. If the network fails, the sender does not know whether the receiver got the message. The only way to find out is to reconnect to the receiver and ask it. This is called partial failure.
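To make the idea concrete, here is a small self-contained Java sketch of the scenario above. All names (unreliableSend, didReceiverGet) are hypothetical, and the "network" is simulated by an in-memory map: the point is that the sender's timeout alone cannot distinguish a lost message from a lost acknowledgement, so it must reconnect and ask the receiver.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;

public class PartialFailureDemo {

    // Simulated receiver state (stands in for the remote node's log).
    static final ConcurrentMap<String, String> receiverLog =
            new ConcurrentHashMap<String, String>();

    // Simulated send: the message actually arrives, but the acknowledgement
    // is "lost", so from the sender's point of view the call just times out.
    static void unreliableSend(String id, String msg) throws TimeoutException {
        receiverLog.put(id, msg);
        throw new TimeoutException("no ack received");
    }

    // Recovery step: reconnect to the receiver and ask whether it got the message.
    static boolean didReceiverGet(String id) {
        return receiverLog.containsKey(id);
    }

    public static void main(String[] args) {
        try {
            unreliableSend("msg-1", "hello");
        } catch (TimeoutException e) {
            // The timeout alone tells us nothing about delivery; we must ask.
            System.out.println("send timed out; querying receiver: "
                    + (didReceiverGet("msg-1") ? "delivered" : "lost"));
        }
    }
}
```

Coordination services like ZooKeeper exist so that every application does not have to hand-roll this kind of recovery logic itself.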

Zookeeper Characteristics
1. Simple
             Zookeeper offers a few valuable operations, such as ordering and notifications.
2. Expressive
             These operations can be used to build large data structures and protocols.
3. Highly available
             Runs on a collection of machines and is designed to be highly available.
4. Loosely coupled interaction
             Zookeeper participants do not need to know about one another.

Installing Zookeeper

Here are the steps to install Zookeeper.
1. Zookeeper requires Java 6 or a later version. You can find the latest Java version in

2. After installing the JDK, set the path.
   Windows : set the path in the environment variables.
   Unix : open a terminal, type vi ~/.bash_profile, and add export JAVA_HOME=/path/to/java/dir and export PATH=$PATH:$JAVA_HOME/bin

3. Download Zookeeper

4. Unzip it and set the Zookeeper environment variable.

5. Before running the Zookeeper service you have to create the zoo.cfg file at conf/zoo.cfg.
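A minimal standalone zoo.cfg can be sketched as follows (the dataDir path is an assumption; point it at any writable directory):

```
tickTime=2000
dataDir=/var/zookeeper
clientPort=2181
```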

6. Now everything is ready to start the Zookeeper server: zkServer.sh start

7. zkServer.sh status is used to check whether Zookeeper is running or not.

Group Membership

Create a Group

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class CreateGroup implements Watcher {
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
connectedSignal.await(); // block until the session is established
}

public void process(WatchedEvent event) { // Watcher interface
if (event.getState() == KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}

public void create(String groupName) throws KeeperException,
InterruptedException {
String path = "/" + groupName;
String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("Created " + createdPath);
}

public void close() throws InterruptedException {
zk.close();
}

public static void main(String[] args) throws Exception {
CreateGroup createGroup = new CreateGroup();
createGroup.connect(args[0]);
createGroup.create(args[1]);
createGroup.close();
}
}

Zookeeper API :

Join The Group

public class ConnectionWatcher implements Watcher {
private static final int SESSION_TIMEOUT = 5000;
protected ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
connectedSignal.await();
}

public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}

public void close() throws InterruptedException {
zk.close();
}
}

public class JoinGroup extends ConnectionWatcher {
public void join(String groupName, String memberName) throws KeeperException,
InterruptedException {
String path = "/" + groupName + "/" + memberName;
String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL); // ephemeral: the member disappears when the session ends
System.out.println("Created " + createdPath);
}

public static void main(String[] args) throws Exception {
JoinGroup joinGroup = new JoinGroup();
joinGroup.connect(args[0]);
joinGroup.join(args[1], args[2]);
// stay alive until process is killed or thread is interrupted
Thread.sleep(Long.MAX_VALUE);
}
}

Retrieve members in a group

public class ListGroup extends ConnectionWatcher {
public void list(String groupName) throws KeeperException,
InterruptedException {
String path = "/" + groupName;
try {
List<String> children = zk.getChildren(path, false);
if (children.isEmpty()) {
System.out.printf("No members in group %s\n", groupName);
System.exit(1);
}
for (String child : children) {
System.out.println(child);
}
} catch (KeeperException.NoNodeException e) {
System.out.printf("Group %s does not exist\n", groupName);
System.exit(1);
}
}

public static void main(String[] args) throws Exception {
ListGroup listGroup = new ListGroup();
listGroup.connect(args[0]);
listGroup.list(args[1]);
listGroup.close();
}
}

Znodes can be of two types, persistent or ephemeral. The type is set at creation time and may not be changed later. An ephemeral node is deleted when the client's session ends or the client exits the application, and ephemeral nodes cannot have children. A persistent node, by contrast, is not deleted when the client's session ends or the client exits the application.


Watches allow a client to get a notification when a node changes in some way. Watches are set by clients through read operations against the ZooKeeper service.

Useful Links


What is Failover and Clustering ?

First we go for the definition of Failover.
Failover is the capability to switch over automatically to a redundant or standby computer server, system, or network upon the failure or abnormal termination of the previously active application, server, system, or network.

source :
Simply put, failover is something like this. When a client requests something from a server and that particular server fails, the request automatically goes to another server and the response goes back to the client. In the background, the system keeps checking when the failed server recovers; after it recovers, client requests are automatically redirected to that server again.

The main advantages are availability and a high degree of reliability.

There are different types of failovers.

source :

Some failovers are not automatic; they require manual intervention.
A cold failover must be done manually.
A warm failover switches to another server automatically, but the current transaction may abort because of a data synchronization failure.
A hot failover is the most reliable failover, because it switches to another server automatically and the data is 100% synchronized, so there is no visible failure between the client and the server.

In practice, failover is very important to all of us. Suppose we request streaming video, streaming audio, VoIP, or a file download from the internet. If the particular server fails, the connection between the server and the client is aborted, and so the whole operation aborts. But with failover capability, the request is automatically redirected to another server and the operation continues.
So that's all about failover.

Now we will look into Clustering.
Here is the definition.
A computer cluster is a group of linked computers, working together closely thus in many respects forming a single computer. The components of a cluster are commonly, but not always, connected to each other through fast local area networks.

source :

Clustering means a group of computers working together to provide some enterprise service.
Clusters can be redundant and failover-capable.

All clusters provide two main advantages:
Scalability and High Availability.
Scalability means the application can support an increasing number of users, because clusters provide extra capacity by adding new servers.

Cluster Categorization
High Availability Clusters
Load Balancing Clusters
Compute Clusters

There are two types of clusters.
1. Shared nothing
Every application server has its own files. Updating and maintaining the files is very hard.
2. Shared disk
There is a storage disk that contains the files, and the application servers use those files. So updating and maintaining the files is easy.