用MapReduce读HBase写MongoDB样例
1、版本信息:
Hadoop 版本:2.7.1
HBase 版本:1.2.1
MongoDB 版本:3.4.14
2、HBase 表名及数据:表名为 qch_t1,程序读取列族 if 下的列 col1。
3、Maven 依赖:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.4.3</version>
</dependency>
<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>2.0.2</version>
</dependency>
<!-- NOTE: the cluster above runs HBase 1.2.1; consider aligning the hbase-* versions below with it. -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.1.1</version>
</dependency>
4、MapReduce 程序:
package mapreduce;import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;import java.io.IOException;
import java.util.UUID;public class HBaseToMongo {
public static void main(String[] args) throws Exception {
Long st = System.currentTimeMillis();Configuration config </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Configuration(); config.set(</span>"dfs.socket.timeout", "180000"<span style="color: rgba(0, 0, 0, 1)">); config.set(</span>"hbase.zookeeper.property.clientPort", "2181"<span style="color: rgba(0, 0, 0, 1)">); config.set(</span>"hbase.zookeeper.quorum", "10.11.2.4,10.11.2.5,10.11.2.6"<span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> The format of the URI is: </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> mongodb:</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]</span> String uri = "mongodb://10.11.2.15:27017,10.11.2.16:27017,10.11.2.17:27017/postal.qch_test"<span style="color: rgba(0, 0, 0, 1)">; MongoConfigUtil.setOutputURI(config, uri); Job job </span>=<span style="color: rgba(0, 0, 0, 1)"> Job.getInstance(config); job.setJobName(</span>"HBaseToMongo"<span style="color: rgba(0, 0, 0, 1)">); job.setJarByClass(FilterMapper.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">); job.setOutputFormatClass(MongoOutputFormat.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">); job.setNumReduceTasks(</span>0<span style="color: rgba(0, 0, 0, 1)">); TableMapReduceUtil.initTableMapperJob(</span>"qch_t1", <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Scan(), FilterMapper.</span><span style="color: rgba(0, 0, 255, 1)">class</span>, ImmutableBytesWritable.<span style="color: rgba(0, 0, 255, 1)">class</span>, BSONWritable.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">, job); 
System.exit( job.waitForCompletion( </span><span style="color: rgba(0, 0, 255, 1)">true</span> ) ? 0 : 1<span style="color: rgba(0, 0, 0, 1)"> ); System.out.println(</span>"HBaseToMongo:" + (System.currentTimeMillis() -<span style="color: rgba(0, 0, 0, 1)"> st)); } </span><span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">class</span> FilterMapper <span style="color: rgba(0, 0, 255, 1)">extends</span> TableMapper<Text, BSONWritable><span style="color: rgba(0, 0, 0, 1)"> { @Override </span><span style="color: rgba(0, 0, 255, 1)">protected</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> map(ImmutableBytesWritable key, Result value, Context context) </span><span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> IOException, InterruptedException { String col </span>= getStrByByte(value.getValue("if".getBytes(), "col1"<span style="color: rgba(0, 0, 0, 1)">.getBytes())); BSONWritable bsonWritable </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> BSONWritable(); BasicDBObject doc </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> BasicDBObject(); doc.put(</span>"_id"<span style="color: rgba(0, 0, 0, 1)">, UUID.randomUUID().toString()); doc.put(</span>"col"<span style="color: rgba(0, 0, 0, 1)">, col); bsonWritable.setDoc(doc); context.write(</span><span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Text(key.toString()), bsonWritable); } </span><span style="color: rgba(0, 0, 255, 1)">private</span> String getStrByByte(<span style="color: rgba(0, 0, 255, 1)">byte</span><span style="color: rgba(0, 0, 0, 1)">[] by) { String str </span>= ""<span style="color: rgba(0, 0, 0, 1)">; </span><span style="color: rgba(0, 0, 255, 1)">if</span> (by != <span style="color: rgba(0, 0, 255, 1)">null</span> && by.length > 0<span 
style="color: rgba(0, 0, 0, 1)">) { str </span>=<span style="color: rgba(0, 0, 0, 1)"> Bytes.toString(by); } </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> str; } }
}
5、运行结果: