herb

我要靠谱→hello world!
随笔 - 14, 文章 - 0, 评论 - 0, 引用 - 0
数据加载中……

hadoop二次排序中对组合键,分区,分组的一点应用

    这两天看Hadoop的排序方面的问题,看到了下面这篇文章,觉得挺好,结合自己已经了解的知识,将排序、对组合键的使用等方面的知识有个更清楚的认识,将文章摘下来放在这儿,有空再看看。
      摘自http://www.fuzhijie.me/?p=34
     我想涉及到文件的Join操作应该都要使用到二次排序吧,之前我用字符串拼接的方法显得太不专业了,本来在reduce过程中是不需要保存这些数据的,遍历一次便可以将记录全部collect好。Hadoop 0.20包里面有一个SecondarySort的例子程序,结合公司牛人写的一个ppt,终于搞明白了。呵呵,刚好也用上了,所以总结一下。

Hadoop提供了几种默认类型如果Text,LongWritable等,它们都实现了WritableComparable接口,因此具有比较和序列化的能力。要实现二次排序,我想大概有两种办法。第一种是使用自定义类型,该类型实现WritableComparable接口,给原始数据添加一个权值,通过权值来改变排序的结果;第二种方法是给记录的key做一些不同的标记,比如有些在最前面加上一个’+'前缀,另一些前面加上’-'前缀,通过这些前缀来决定排序的规则。这两种办法都要实现自己的分区函数和分组函数,因为key已经被改变了,显然第一种方法感觉要专业一点,但是第二种方法感觉要高效一些,因为不需要类来封装。

我使用了第一种方法来实现二次排序,需求是做一个一对多的文件连接。来一个形象的比喻,比如一个人去商场买东西,他推着购物车,每个商品都有自己唯一的编号。因此数据有两部分组成:
1、用户对商品编号,这是一对多的。数据保存在base.dat文件中。
2、商品编号对商品的信息,其中包括商品的价格,这是一对一的。数据保存在spu.dat文件中。

最后要生成用户对应商品价格记录,这样就可以统计出用户购买商品的总价格。这两个文件通过商品的编号连接。

程序很简单,自己定义了一个UserKey类,在这个类封装了原始数据,另外添加一个权重属性,排序时需要将商品对商品价格排到最前面去。注意这个compareTo方法返回值的涵义,返回-1表示自己要排在比较的记录之前,返回1表示自己要排在比较的记录之前,之前我一直以为返回1表示大于的意思,结果程序就出现了奇怪的现象。Hadoop没有使用Java默认的序列化方式,用户必须负责自定义类型的序列化接口的实现,我感觉下面的程序写得不够专业,这是我比较惯用的序列化手段,如果使用SequenceFileOutputFormat保存输出结果,可以看到对象序列化后的数据的保存方式,不过Java虚拟机统一了数据格式,因此不能使用C/C++的思维来观察这些数据,但是也差不多了。

全部源代码如下:

package taobao; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.mapred.lib.InverseMapper; 
import org.apache.hadoop.mapred.lib.MultipleOutputs; 
import org.apache.hadoop.util.*; 

public class CK { 
     public static class Key implements WritableComparable<Key> { 
         public String id = ""; 
         public short weight; 

         public Key() { 
         } 

         public Key(String i, short j) { 
             id = i; 
         } 
         @Override
         public void readFields(DataInput in) throws IOException { 
             // 先写字符串的长度信息 
             int length = in.readInt(); 
             byte[] buf = new byte[length];     
             in.readFully(buf, 0, length);    

             // 得到id号 
             id = new String(buf);     

             // 得到权值 
             weight = in.readShort(); 
         } 

         @Override
         public void write(DataOutput out) throws IOException { 
             System.out.println("in write");
            
             String str = id.toString(); 
             int length = str.length(); 
             byte[] buf = str.getBytes(); 

             // 先写字符串长度 
             // WritableUtils.writeVInt(out, length); 
             out.writeInt(length); 

             // 再写字符串数据 
             out.write(buf, 0, length);     

             // 接着是权值 
             out.writeShort(weight); 
         } 

         @Override
         public int hashCode() { 
             return id.hashCode(); 
         } 

         @Override
         public String toString() { 
             return id; 
         } 

         @Override
         public boolean equals(Object right) { 
             System.out.println("in equals"); 
             // 只要id相等就认为两个key相等 
             if (right instanceof Key) { 
                 Key r = (Key) right; 
                 return r.id.equals(id); 
             } else { 
                 return false; 
             } 
         } 
         @Override
         public int compareTo(Key k) { 
             System.out.println("in compareTo, key=" + k.toString()); 
             // 先比较value id 
             int cmp = id.compareTo(k.id); 
             if (cmp != 0) { 
                 return cmp; 
             } 
             // 如果value id相等,再比较权值 
             if (weight > k.weight) 
                 return -1; 
             else if (weight < k.weight) 
                 return 1; 
             else
                 return 0; 
         } 
     } 
    
    public static class Map extends MapReduceBase implements
             Mapper<LongWritable, Text, Key, Text> { 
         public void map(LongWritable l, Text value, 
                 OutputCollector<Key, Text> output, Reporter reporter) 
                 throws IOException { 
             String line = value.toString(); 
             String[] pair = line.split("\t"); 

             if (pair.length == 3) { 
                 //key->商品编号,value->购买者 
                 Key k = new Key(pair[1], (short) 0);     
                 output.collect(k, new Text(pair[0])); 
             } else { 
                 //key->商品编号,value->商品价格 
                 Key k = new Key(pair[0], (short) 1);    
                 output.collect(k, new Text(pair[1])); 
             } 
         } 
     } 
     public static class Reduce extends MapReduceBase implements
             Reducer<Key, Text, Text, Text> { 
         public void reduce(Key key, Iterator<Text> values, 
                 OutputCollector<Text, Text> output, Reporter reporter) 
                 throws IOException { 

             // 此处一定要new出一个新对象来,否则结果不会正确 
             // 这都是Java引用导致的问题 
             Text second =  new Text(values.next()); 
             while (values.hasNext()) { 
                 output.collect(values.next(), second); 
             } 
         } 
     }     

     public static class FirstGroupingComparator implements RawComparator<Key> { 
         @Override
         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
             return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, 
                     b2, s2, Integer.SIZE / 8); 
         } 

         // 完全根据id来分区 
         @Override
         public int compare(Key o1, Key o2) { 
             System.out.println("in group compare"); 
             String l = o1.id.toString(); 
             String r = o2.id.toString(); 
             int res = l.compareTo(r); 
             System.out.println("res=" + res); 
             return res; 
         } 
     } 


     public static class FirstPartitioner implements Partitioner<Key, Text> { 

         @Override
         public void configure(JobConf job) { 
             // TODO Auto-generated method stub 
         } 

         @Override
         public int getPartition(Key key, Text value, int numPartitions) { 
             System.out.println("in FirstPartitioner"); 
             return Math.abs(key.id.hashCode()) % numPartitions; 
         } 
     } 
    
     public static void main(String[] args) throws Exception { 
         JobConf conf = new JobConf(CK.class); 
         conf.setJobName("Composite key"); 

         // 设置Map输出的key和value的类型 
         conf.setMapOutputKeyClass(Key.class); 
         conf.setMapOutputValueClass(Text.class);     

         // 设置Reduce输出的key和value的类型 
         conf.setOutputKeyClass(Text.class); 
         conf.setOutputValueClass(Text.class); 

         // 设置Mapper和Reducer 
         conf.setMapperClass(Map.class); 
         conf.setReducerClass(Reduce.class); 

         // 设置group函数和分区函数 
         conf.setOutputValueGroupingComparator(FirstGroupingComparator.class); 
         conf.setPartitionerClass(FirstPartitioner.class);     

         conf.setInputFormat(TextInputFormat.class); 
         conf.setOutputFormat(TextOutputFormat.class); 

         // conf.setOutputFormat(SequenceFileOutputFormat.class); 

         // 如果输出目录已经存在,那么先将其删除 
         FileSystem fstm = FileSystem.get(conf); 
         Path outDir = new Path(args[1]); 
         fstm.delete(outDir, true);    

         // 设置输入输出目录 
         FileInputFormat.setInputPaths(conf, new Path(args[0])); 
         FileOutputFormat.setOutputPath(conf, outDir);     

         JobClient.runJob(conf); 
     } 
}

posted on 2010-12-08 11:13 herb 阅读(1763) 评论(0)  编辑 收藏 引用 所属分类: Hadoop


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理