用MapReduce读HBase写MongoDB样例

1、版本信息:

Hadoop 版本:2.7.1

HBase 版本:1.2.1

MongoDB 版本:3.4.14

 

2、HBase 表名及数据:

 

3、Maven 依赖:

<!-- Hadoop client libraries (org.apache.hadoop.* classes used by the job). -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>
<!-- MongoDB Java driver (com.mongodb.BasicDBObject). -->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.4.3</version>
</dependency>
<!-- mongo-hadoop connector (com.mongodb.hadoop.* classes such as MongoOutputFormat and BSONWritable). -->
<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>2.0.2</version>
</dependency>
<!--
  HBase artifacts (org.apache.hadoop.hbase.* classes used by the job).
  NOTE(review): these are pinned to 1.1.1 while the article states the HBase
  version is 1.2.1; confirm the mismatch is intended.
-->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.1.1</version>
</dependency>

 

4、MapReduce 程序:

package mapreduce;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.UUID;

public class HBaseToMongo {
public static void main(String[] args) throws Exception {
Long st
= System.currentTimeMillis();

    Configuration config </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Configuration();
    config.set(</span>"dfs.socket.timeout", "180000"<span style="color: rgba(0, 0, 0, 1)">);
    config.set(</span>"hbase.zookeeper.property.clientPort", "2181"<span style="color: rgba(0, 0, 0, 1)">);
    config.set(</span>"hbase.zookeeper.quorum", "10.11.2.4,10.11.2.5,10.11.2.6"<span style="color: rgba(0, 0, 0, 1)">);

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> The format of the URI is:
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> mongodb:</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]</span>
    String uri = "mongodb://10.11.2.15:27017,10.11.2.16:27017,10.11.2.17:27017/postal.qch_test"<span style="color: rgba(0, 0, 0, 1)">;
    MongoConfigUtil.setOutputURI(config, uri);

    Job job </span>=<span style="color: rgba(0, 0, 0, 1)"> Job.getInstance(config);
    job.setJobName(</span>"HBaseToMongo"<span style="color: rgba(0, 0, 0, 1)">);
    job.setJarByClass(FilterMapper.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">);
    job.setOutputFormatClass(MongoOutputFormat.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">);
    job.setNumReduceTasks(</span>0<span style="color: rgba(0, 0, 0, 1)">);
    TableMapReduceUtil.initTableMapperJob(</span>"qch_t1", <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Scan(),
            FilterMapper.</span><span style="color: rgba(0, 0, 255, 1)">class</span>, ImmutableBytesWritable.<span style="color: rgba(0, 0, 255, 1)">class</span>, BSONWritable.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">, job);

    System.exit( job.waitForCompletion( </span><span style="color: rgba(0, 0, 255, 1)">true</span> ) ? 0 : 1<span style="color: rgba(0, 0, 0, 1)"> );
    System.out.println(</span>"HBaseToMongo:" + (System.currentTimeMillis() -<span style="color: rgba(0, 0, 0, 1)"> st));
}

</span><span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">class</span> FilterMapper <span style="color: rgba(0, 0, 255, 1)">extends</span> TableMapper&lt;Text, BSONWritable&gt;<span style="color: rgba(0, 0, 0, 1)"> {
    @Override
    </span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> map(ImmutableBytesWritable key,
                       Result value, Context context) </span><span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> IOException, InterruptedException {
        String col </span>= getStrByByte(value.getValue("if".getBytes(), "col1"<span style="color: rgba(0, 0, 0, 1)">.getBytes()));
        BSONWritable bsonWritable </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> BSONWritable();
        BasicDBObject doc </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> BasicDBObject();
        doc.put(</span>"_id"<span style="color: rgba(0, 0, 0, 1)">, UUID.randomUUID().toString());
        doc.put(</span>"col"<span style="color: rgba(0, 0, 0, 1)">, col);
        bsonWritable.setDoc(doc);
        context.write(</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Text(key.toString()), bsonWritable);
    }

    </span><span style="color: rgba(0, 0, 255, 1)">private</span> String getStrByByte(<span style="color: rgba(0, 0, 255, 1)">byte</span><span style="color: rgba(0, 0, 0, 1)">[] by) {
        String str </span>= ""<span style="color: rgba(0, 0, 0, 1)">;
        </span><span style="color: rgba(0, 0, 255, 1)">if</span> (by != <span style="color: rgba(0, 0, 255, 1)">null</span> &amp;&amp; by.length &gt; 0<span style="color: rgba(0, 0, 0, 1)">) {
            str </span>=<span style="color: rgba(0, 0, 0, 1)"> Bytes.toString(by);
        }
        </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> str;
    }
}

}

 

5、运行结果:

 

6、程序源码:

https://github.com/quchunhui/tod-train-1.0/blob/master/hadoop/src/main/java/mapreduce/HBaseToMongo.java