Using MapReduce to Batch Read and Write HBase Data

  package com.mr.test;

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.hbase.mapreduce.TableMapper;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  import org.apache.log4j.Logger;

  public class MRHbase {

      private static Logger log = Logger.getLogger(MRHbase.class);

      public static String family = "charactor";
      public static String col = "hobby";

      // Mapper that scans the "person" table and emits <column value, row key> pairs.
      public static class HMap extends TableMapper<Text, Text> {
          @Override
          protected void map(ImmutableBytesWritable key, Result value, Context context)
                  throws IOException, InterruptedException {
              // Alternative: read the latest KeyValue for the column instead of the raw value.
              // KeyValue kv = value.getColumnLatest(family.getBytes(), col.getBytes());
              // context.write(new Text(Bytes.toString(kv.getKey())),
              //         new Text(Bytes.toString(kv.getValue())));
              byte[] v = value.getValue(family.getBytes(), col.getBytes());
              byte[] r = value.getRow();
              context.write(new Text(Bytes.toString(v)), new Text(Bytes.toString(r)));
          }
      }

      // Identity reducer written against the old mapred API; it is not registered with the
      // job below, which therefore falls back to the default identity reducer of the new API.
      public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
          public void reduce(Text key, Iterator<Text> values,
                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
              while (values.hasNext()) {
                  output.collect(key, values.next());
              }
          }
      }

      public static void main(String[] args) {
          Configuration conf = HBaseConfiguration.create();
          try {
              Job job = new Job(conf, "hbase test");
              job.setJarByClass(MRHbase.class);
              Scan scan = new Scan();
              scan.addColumn(family.getBytes(), col.getBytes());
              TableMapReduceUtil.initTableMapperJob("person", scan, HMap.class,
                      Text.class, Text.class, job);
              job.setOutputFormatClass(TextOutputFormat.class);
              FileOutputFormat.setOutputPath(job, new Path(args[0]));
              job.waitForCompletion(true);
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
  }

  Note: zookeeper.jar must be copied into the hadoop/lib directory on both the master and the slaves.
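  The read job above assumes that the "person" table already exists and holds data in the "charactor" family. As a minimal sketch (not from the original post; the class name SeedPersonTable and the sample row are made-up illustrations), the following helper creates that table and inserts one row using the same generation of the HBase client API:

  package com.mr.test;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  // Hypothetical helper: seeds the "person" table so the MRHbase job has something to read.
  public class SeedPersonTable {

      public static void main(String[] args) throws Exception {
          Configuration conf = HBaseConfiguration.create();
          HBaseAdmin admin = new HBaseAdmin(conf);

          // Create "person" with the "charactor" column family if it does not exist yet.
          if (!admin.tableExists("person")) {
              HTableDescriptor htd = new HTableDescriptor("person");
              htd.addFamily(new HColumnDescriptor("charactor"));
              admin.createTable(htd);
          }

          // Insert one sample row: charactor:hobby = "reading" (illustrative data only).
          HTable table = new HTable(conf, "person");
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("charactor"), Bytes.toBytes("hobby"), Bytes.toBytes("reading"));
          table.put(put);
          table.close();
      }
  }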

  // Load the ntf_data set into HBase

  package com.ntf.data;

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.hbase.mapreduce.TableReducer;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  import org.apache.hadoop.util.GenericOptionsParser;

  public class BulkImportData {

      // Mapper: keys each "custid,score,date" line by "<custid>_<movieid>",
      // where the movie id is taken from the input file name (mv_<movieid>.txt).
      public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {

          public Text _key = new Text();
          public Text _value = new Text();

          public void map(Object key, Text value, Context context)
                  throws IOException, InterruptedException {
              String[] splits = value.toString().split(",");
              if (splits.length == 3) {
                  InputSplit inputSplit = (InputSplit) context.getInputSplit();
                  String filename = ((FileSplit) inputSplit).getPath().getName();
                  filename = filename.replace("mv_", "");
                  filename = filename.replace(".txt", "");
                  _key.set(splits[0] + "_" + filename);
                  context.write(_key, value);
              }
          }
      }

      // Reducer: writes each record into the "content" family of the ntf_data table.
      public static class IntSumReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

          public void reduce(Text key, Iterable<Text> values, Context context)
                  throws IOException, InterruptedException {
              Iterator<Text> itr = values.iterator();
              while (itr.hasNext()) {
                  Text t = itr.next();
                  String[] strs = t.toString().split(",");
                  if (strs.length != 3) continue;
                  // Use only the valid bytes of the key; Text.getBytes() may return a padded buffer.
                  byte[] row = Bytes.toBytes(key.toString());
                  Put put = new Put(row);
                  put.add(Bytes.toBytes("content"), Bytes.toBytes("score"), Bytes.toBytes(strs[1]));
                  put.add(Bytes.toBytes("content"), Bytes.toBytes("date"), Bytes.toBytes(strs[2]));
                  context.write(new ImmutableBytesWritable(row), put);
              }
          }
      }

      public static void main(String[] args) throws Exception {
          String tablename = "ntf_data";
          Configuration conf = HBaseConfiguration.create();

          // Recreate the target table with a single "content" column family.
          HBaseAdmin admin = new HBaseAdmin(conf);
          if (admin.tableExists(tablename)) {
              admin.disableTable(tablename);
              admin.deleteTable(tablename);
          }
          HTableDescriptor htd = new HTableDescriptor(tablename);
          HColumnDescriptor hcd = new HColumnDescriptor("content");
          htd.addFamily(hcd);
          admin.createTable(htd);

          String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
          if (otherArgs.length != 1) {
              System.err.println("Usage: BulkImportData <input path>");
              System.exit(2);
          }

          Job job = new Job(conf, "bulk import ntf_data");
          job.setMapperClass(TokenizerMapper.class);
          job.setJarByClass(BulkImportData.class);
          job.setNumReduceTasks(5);
          FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
          TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job);
          // The mapper emits Text/Text; since no explicit map output classes are set,
          // the job output key/value classes below also apply to the map output.
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }
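  Once the import job finishes, it can be useful to spot-check what landed in the table. The sketch below is an illustrative add-on, not part of the original post: the class name VerifyNtfData and the 10-row limit are arbitrary, while the table name "ntf_data" and the "content:score" / "content:date" columns follow the reducer above.

  package com.ntf.data;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.util.Bytes;

  // Hypothetical check: print the first few rows written by BulkImportData.
  public class VerifyNtfData {

      public static void main(String[] args) throws Exception {
          Configuration conf = HBaseConfiguration.create();
          HTable table = new HTable(conf, "ntf_data");

          Scan scan = new Scan();
          scan.addFamily(Bytes.toBytes("content"));

          ResultScanner scanner = table.getScanner(scan);
          int printed = 0;
          for (Result r : scanner) {
              String row = Bytes.toString(r.getRow());
              String score = Bytes.toString(r.getValue(Bytes.toBytes("content"), Bytes.toBytes("score")));
              String date = Bytes.toString(r.getValue(Bytes.toBytes("content"), Bytes.toBytes("date")));
              System.out.println(row + " -> score=" + score + ", date=" + date);
              if (++printed >= 10) break; // only sample a few rows
          }
          scanner.close();
          table.close();
      }
  }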


