

                A Simple Example of Reading and Writing HBase Data from MapReduce

                Date: January 29, 2015  Author: zhjw  Source: Internet  Views: 861

                We'll use the classic word-count example. The text to be counted is stored in the HBase table word; the MapReduce job reads its input from word and, once the count is finished, writes the results into the HBase table stat.
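                Before wiring anything into MapReduce, the core computation can be sketched in plain Java with no HBase involved: tokenize each row's text on whitespace and accumulate per-word counts. This is exactly what the Mapper/Reducer pair later in this post implements; the class and method names here are illustrative only.

```java
import java.util.*;

public class WordCountSketch {
    // Tokenize each row on whitespace and tally occurrences,
    // mirroring what the HBase-backed MapReduce job computes.
    static Map<String, Integer> count(List<String> rows) {
        Map<String, Integer> stat = new TreeMap<String, Integer>();
        for (String row : rows) {
            StringTokenizer st = new StringTokenizer(row);
            while (st.hasMoreTokens()) {
                String word = st.nextToken();
                Integer prev = stat.get(word);
                stat.put(word, prev == null ? 1 : prev + 1);
            }
        }
        return stat;
    }

    public static void main(String[] args) {
        // The same five rows that are loaded into the word table below
        List<String> rows = Arrays.asList(
            "The Apache Hadoop software library is a framework",
            "The common utilities that support the other Hadoop modules",
            "Hadoop by reading the documentation",
            "Hadoop from the release page",
            "Hadoop on the mailing list");
        Map<String, Integer> stat = count(rows);
        System.out.println("Hadoop=" + stat.get("Hadoop")); // Hadoop=5
        System.out.println("the=" + stat.get("the"));       // the=4
    }
}
```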

                 

                  1. Create a Hadoop project in Eclipse, then add the following jars from the HBase release package:

                hbase-0.94.13.jar
                zookeeper-3.4.5.jar
                protobuf-java-2.4.0a.jar
                guava-11.0.2.jar

                 

                  2. Create the tables in HBase and initialize the test data

                package cn.luxh.app;
                
                import java.io.IOException;
                import java.util.ArrayList;
                import java.util.List;
                
                import org.apache.hadoop.hbase.client.HTable;
                import org.apache.hadoop.hbase.client.Put;
                
                public class InitData {
                    
                    public static void main(String[] args) throws IOException {
                        // Create the word table with a single column family, content
                        HBaseUtil.createTable("word", "content");
                        
                        // Get a handle to the word table
                        HTable htable = HBaseUtil.getHTable("word");
                        // Buffer the puts client-side so they are sent in one batch
                        htable.setAutoFlush(false);
                        
                        // Create the test data
                        List<Put> puts = new ArrayList<Put>();
                        
                        Put put1 = HBaseUtil.getPut("1", "content", null, "The Apache Hadoop software library is a framework");
                        Put put2 = HBaseUtil.getPut("2", "content", null, "The common utilities that support the other Hadoop modules");
                        Put put3 = HBaseUtil.getPut("3", "content", null, "Hadoop by reading the documentation");
                        Put put4 = HBaseUtil.getPut("4", "content", null, "Hadoop from the release page");
                        Put put5 = HBaseUtil.getPut("5", "content", null, "Hadoop on the mailing list");
                        
                        puts.add(put1);
                        puts.add(put2);
                        puts.add(put3);
                        puts.add(put4);
                        puts.add(put5);
                        
                        // Submit the test data
                        htable.put(puts);
                        htable.flushCommits();
                        htable.close();
                        
                        // Create the stat table with a single column family, result
                        HBaseUtil.createTable("stat", "result");
                    }
                }

                  1) For the HBaseUtil utility class used in the code above, see: http://www.cnblogs.com/luxh/archive/2013/04/16/3025172.html

                  2) After running the program above, check in HBase that the tables were created:

                hbase(main):012:0> list
                TABLE 
                stat 
                word 
                2 row(s) in 0.4730 seconds

                  3) View the test data in the word table:

                hbase(main):005:0> scan 'word'
                ROW                    COLUMN+CELL                                                     
                 1                     column=content:, timestamp=1385447676510, value=The Apache Hadoo
                                       p software library is a framework                               
                 2                     column=content:, timestamp=1385447676510, value=The common utili
                                       ties that support the other Hadoop modules                      
                 3                     column=content:, timestamp=1385447676510, value=Hadoop by readin
                                       g the documentation                                             
                 4                     column=content:, timestamp=1385447676510, value=Hadoop from the 
                                       release page                                                    
                 5                     column=content:, timestamp=1385447676510, value=Hadoop on the ma
                                       iling list                                                      
                5 row(s) in 5.7810 seconds

                 

                  3. The MapReduce program

                package cn.luxh.app;
                
                import java.io.IOException;
                import java.util.StringTokenizer;
                
                import org.apache.hadoop.conf.Configuration;
                import org.apache.hadoop.hbase.HBaseConfiguration;
                import org.apache.hadoop.hbase.client.Put;
                import org.apache.hadoop.hbase.client.Result;
                import org.apache.hadoop.hbase.client.Scan;
                import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
                import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
                import org.apache.hadoop.hbase.mapreduce.TableMapper;
                import org.apache.hadoop.hbase.mapreduce.TableReducer;
                import org.apache.hadoop.hbase.util.Bytes;
                import org.apache.hadoop.io.IntWritable;
                import org.apache.hadoop.io.Text;
                import org.apache.hadoop.mapreduce.Job;
                
                public class WordStat {
                    
                    /**
                     * TableMapper<Text,IntWritable>  Text: output key type, IntWritable: output value type
                     */
                    public static class MyMapper extends TableMapper<Text,IntWritable>{
                        
                        private static IntWritable one = new IntWritable(1);
                        private static Text word = new Text();
                        
                        @Override
                        protected void map(ImmutableBytesWritable key, Result value,
                                Context context)
                                throws IOException, InterruptedException {
                            // The table has only one column family and one cell per row,
                            // so read the value of the first cell directly
                            String words = Bytes.toString(value.list().get(0).getValue());
                            StringTokenizer st = new StringTokenizer(words);
                            while (st.hasMoreTokens()) {
                                String s = st.nextToken();
                                word.set(s);
                                context.write(word, one);
                            }
                        }
                    }
                    
                    /**
                     * TableReducer<Text,IntWritable,ImmutableBytesWritable>  Text: input key type,
                     * IntWritable: input value type, ImmutableBytesWritable: output key type
                     */
                    public static class MyReducer extends TableReducer<Text,IntWritable,ImmutableBytesWritable>{
                        
                        @Override
                        protected void reduce(Text key, Iterable<IntWritable> values,
                                Context context)
                                throws IOException, InterruptedException {
                            
                            int sum = 0;
                            for (IntWritable val : values) {
                                sum += val.get();
                            }
                            // Add one row per word, using the word itself as the row key
                            Put put = new Put(Bytes.toBytes(key.toString()));
                            // In the column family result, add a qualifier num holding the word's count.
                            // String.valueOf(sum) converts the number to a string before serializing;
                            // otherwise the raw int bytes show up in HBase as \x00\x00\x00\x05-style values.
                            put.add(Bytes.toBytes("result"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(sum)));
                            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
                        }
                    }
                    
                    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                        
                        Configuration conf = HBaseConfiguration.create();
                        Job job = new Job(conf, "wordstat");
                        job.setJarByClass(WordStat.class);
                        
                        Scan scan = new Scan();
                        // Restrict the scan to the content column family (empty qualifier)
                        scan.addColumn(Bytes.toBytes("content"), null);
                        // The Mapper reads from the word table
                        TableMapReduceUtil.initTableMapperJob("word", scan, MyMapper.class, Text.class, IntWritable.class, job);
                        // The Reducer writes to the stat table
                        TableMapReduceUtil.initTableReducerJob("stat", MyReducer.class, job);
                        System.exit(job.waitForCompletion(true) ? 0 : 1);
                    }
                }
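                The comment in the reducer about String.valueOf(sum) can be checked with the standard library alone: an int serialized as four big-endian bytes (what HBase's Bytes.toBytes(int) produces) shows up in the shell as \x00\x00\x00\x05, while the decimal string is plain ASCII. This sketch uses java.nio.ByteBuffer in place of the HBase Bytes class to illustrate the difference.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class EncodingDemo {
    // Big-endian 4-byte encoding, the same layout Bytes.toBytes(int) uses
    static byte[] intBytes(int n) {
        return ByteBuffer.allocate(4).putInt(n).array();
    }

    // Decimal-string encoding, what Bytes.toBytes(String.valueOf(n)) stores
    static byte[] stringBytes(int n) {
        return String.valueOf(n).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 5 as a raw int: [0, 0, 0, 5] -> rendered \x00\x00\x00\x05 by the HBase shell
        System.out.println(java.util.Arrays.toString(intBytes(5)));
        // 5 as a string: [53] -> the single ASCII digit '5', readable in scan output
        System.out.println(java.util.Arrays.toString(stringBytes(5)));
    }
}
```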

                  Wait for the job to finish, then check the stat table:

                hbase(main):014:0> scan 'stat'
                ROW                    COLUMN+CELL                                                     
                 Apache                column=result:num, timestamp=1385449492309, value=1             
                 Hadoop                column=result:num, timestamp=1385449492309, value=5             
                 The                   column=result:num, timestamp=1385449492309, value=2             
                 a                     column=result:num, timestamp=1385449492309, value=1             
                 by                    column=result:num, timestamp=1385449492309, value=1             
                 common                column=result:num, timestamp=1385449492309, value=1             
                 documentation         column=result:num, timestamp=1385449492309, value=1             
                 framework             column=result:num, timestamp=1385449492309, value=1             
                 from                  column=result:num, timestamp=1385449492309, value=1             
                 is                    column=result:num, timestamp=1385449492309, value=1             
                 library               column=result:num, timestamp=1385449492309, value=1             
                 list                  column=result:num, timestamp=1385449492309, value=1             
                 mailing               column=result:num, timestamp=1385449492309, value=1             
                 modules               column=result:num, timestamp=1385449492309, value=1             
                 on                    column=result:num, timestamp=1385449492309, value=1             
                 other                 column=result:num, timestamp=1385449492309, value=1             
                 page                  column=result:num, timestamp=1385449492309, value=1             
                 reading               column=result:num, timestamp=1385449492309, value=1             
                 release               column=result:num, timestamp=1385449492309, value=1             
                 software              column=result:num, timestamp=1385449492309, value=1             
                 support               column=result:num, timestamp=1385449492309, value=1             
                 that                  column=result:num, timestamp=1385449492309, value=1             
                 the                   column=result:num, timestamp=1385449492309, value=4             
                 utilities             column=result:num, timestamp=1385449492309, value=1             
                24 row(s) in 0.7970 seconds
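                Note the row order in this scan: HBase returns rows sorted by the raw bytes of the row key, which is why the uppercase keys Apache, Hadoop, and The come before a, by, and the rest. For ASCII keys this ordering matches plain String.compareTo, as this small sketch shows:

```java
import java.util.*;

public class RowOrderDemo {
    // HBase stores rows sorted lexicographically by row-key bytes;
    // for ASCII keys this is the same ordering as String.compareTo.
    static List<String> rowOrder(List<String> keys) {
        List<String> sorted = new ArrayList<String>(keys);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // Uppercase letters have smaller byte values than lowercase ones,
        // so "The" sorts before "a" in the scan above.
        System.out.println(rowOrder(Arrays.asList("the", "The", "a", "Apache", "Hadoop")));
        // -> [Apache, Hadoop, The, a, the]
    }
}
```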

                 
