如何在Reducer输出中对逗号分隔的键进行排序? [英] How to sort comma separated keys in Reducer ouput?

查看:96
本文介绍了如何在Reducer输出中对逗号分隔的键进行排序?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用MapReduce运行RFM分析程序。 OutputKeyClass是Text.class,我发出逗号分隔的R(新近性),F(频率),M(Monetory)作为Reducer的关键字,其中R = BigInteger,F = Binteger,M = BigDecimal,并且该值也是文本代表Customer_ID。我知道Hadoop根据键排序输出,但是我的最终结果有点奇怪。我希望输出键首先按R排序,然后按F然后按M排序。但由于未知原因,我得到以下输出排序顺序:

  545,1,7652 100000 
545,23,390159.402343750 100001
452,13,132586 100002
452,4,32202 100004
452,1,9310 100007
452,14057 100018
452,3,18970 100021

但我想要以下输出:

  545,23,390159.402343750 100001 
545,1,7652 100000
452,13,132586 100002
452,4,32202 100004
452,3,18970 100021
452,1,9310 100007
452,14057 100018

注意:customer_ID是Map阶段的关键,属于特定Customer_ID的所有RFM值都汇集在一起因此经过大量搜索后,我发现了一些有用的材料,我现在发布的这些材料的编译方式如下所示:

b
$ b


  1. 您必须从您的自定义数据类型开始。由于我有三个逗号分隔的值需要按降序排序,所以我必须在Hadoop中创建一个 TextQuadlet.java 数据类型。我创建一个quadlet的原因是因为该键的第一部分将是自然键,而其余三部分将是R,F,M:

      import java.io. *; 
    import org.apache.hadoop.io。*;
    public class TextQuadlet实现WritableComparable< TextQuadlet> {
    私人字符串customer_id;
    私有长期R;
    私人长F;
    私人双人M;
    public TextQuadlet(){
    }
    public TextQuadlet(String customer_id,long R,long F,double M){
    set(customer_id,R,F,M);
    }
    public void set(String customer_id2,long R2,long F2,double M2){
    this.customer_id = customer_id2;
    this.R = R2;
    this.F = F2;
    this.M = M2;
    }
    public String getCustomer_id(){
    return customer_id;
    }
    public long getR(){
    return R;
    }
    public long getF(){$​​ b $ b return F;
    }
    public double getM(){
    return M;
    }
    @Override
    public void write(DataOutput out)throws IOException {
    out.writeUTF(this.customer_id);
    out.writeLong(this.R);
    out.writeLong(this.F);
    out.writeDouble(this.M);
    }
    @Override
    public void readFields(DataInput in)throws IOException {
    this.customer_id = in.readUTF();
    this.R = in.readLong();
    this.F = in.readLong();
    this.M = in.readDouble();
    }
    //这个哈希码函数非常重要,因为它被这个类的自定义
    //分区器使用。
    @Override
    public int hashCode(){
    return(int)(customer_id.hashCode()* 163 + R + F + M);
    }
    @Override
    public boolean equals(Object o){
    if(o instanceof TextQuadlet){
    TextQuadlet tp =(TextQuadlet)o;
    返回customer_id.equals(tp.customer_id)&& R ==(tp.R)&& F ==(tp.F)&&中号==(tp.M);
    }
    返回false;

    @Override
    public String toString(){
    return customer_id +,+ R +,+ F +,+ M;
    }
    //条件语句中的LHS是当前键
    //条件语句中的RHS是前一个键
    //当您返回负值时,表示您正在交换
    //当前和前一个键值对的位置
    //返回0或正值意味着您保留
    //顺序,因为它是
    @Override
    public int compareTo(TextQuadlet tp){
    //这里我的自然是customer_id,我甚至没有把它带入
    //考虑。

    //所以你可能已经得出结论,我正在对R,F,M进行排序。
    if(this.R!= tp.R){
    if(this.R return 1;
    }
    else {
    return -1; (this.F!= tp.F){
    if(this.F< tp.F){
    return 1;
    }
    }
    if
    }
    else {
    return -1; (this.M!= tp.M){
    if(this.M return 1;
    }
    }
    if
    }
    else {
    return -1;
    }
    }
    返回0;
    }
    public static int compare(TextQuadlet tp1,TextQuadlet tp2){
    int cmp = tp1.compareTo(tp2);
    return cmp;
    }
    public static int compare(Text customer_id1,Text customer_id2){
    int cmp = customer_id1.compareTo(customer_id1);
    return cmp;


    $ / code $ / pre $接下来你需要一个定制的分区器这样所有具有相同密钥的值最终在一个reducer:

      import org.apache.hadoop.io.Text ; 
    import org.apache.hadoop.mapreduce.Partitioner;

    public class FirstPartitioner_RFM扩展了Partitioner< TextQuadlet,Text> {
    @Override
    public int getPartition(TextQuadlet key,Text value,int numPartitions){
    return(int)key.hashCode()%numPartitions;



  2. 第三,您需要一个自定义这样所有的值都通过它们的 customer_id 的自然键而不是组合键 customer_id,R,F,M

      import org.apache.hadoop.io.WritableComparable; 
    import org.apache.hadoop.io.WritableComparator;

    public class GroupComparator_RFM_N extends WritableComparator {
    protected GroupComparator_RFM_N(){
    super(TextQuadlet.class,true);

    @SuppressWarnings(rawtypes)
    @Override
    public int compare(WritableComparable w1,WritableComparable w2){
    TextQuadlet ip1 =(TextQuadlet)w1;
    TextQuadlet ip2 =(TextQuadlet)w2;
    //在这里,我们告诉hadoop按键的按键进行分组。
    return ip1.getCustomer_id()。compareTo(ip2.getCustomer_id());


    $ / code $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ $ <$>比较器,它将再次基于R,F,M对键进行排序并实现与 TextQuadlet.java 中使用的相同排序技术。由于我在编码时迷路了,所以稍微改变了我在这个函数中比较数据类型的方式,但其底层逻辑与 TextQuadlet.java 中相同:

      import org.apache.hadoop.io.WritableComparable; 
    import org.apache.hadoop.io.WritableComparator;

    public class KeyComparator_RFM extends WritableComparator {
    protected KeyComparator_RFM(){
    super(TextQuadlet.class,true);

    @SuppressWarnings(rawtypes)
    @Override
    public int compare(WritableComparable w1,WritableComparable w2){
    TextQuadlet ip1 =(TextQuadlet)w1;
    TextQuadlet ip2 =(TextQuadlet)w2;
    //条件语句中的LHS是当前键值对
    //条件语句中的RHS是前一个键值对
    //当您返回负值时,意味着您正在交换
    //当前键值对和前一个键值对
    的位置//如果您正在比较字符串,那么以
    结尾的字符串//为 compareTo`方法原来是前一个键,调用`compareTo`方法的
    //字符串变成
    //当前键。
    if(ip1.getR()== ip2.getR()){
    if(ip1.getF()== ip2.getF()){
    if(ip1.getM )== ip2.getM()){
    return 0;
    }
    else {
    if(ip1.getM()< ip2.getM())
    return 1;
    else
    return -1;


    else {
    if(ip1.getF()< ip2.getF())
    return 1;
    else
    return -1;
    }
    }
    else {
    if(ip1.getR() return 1;
    else
    return -1;
    }
    }
    }


  3. 最后,在你的驱动类中,你必须包含我们的自定义类。这里我用 TextQuadlet,Text 作为k-v对。但您可以根据您的需要选择任何其他类。

      job.setPartitionerClass(FirstPartitioner_RFM.class); 
    job.setSortComparatorClass(KeyComparator_RFM.class);
    job.setGroupingComparatorClass(GroupComparator_RFM_N.class);
    job.setMapOutputKeyClass(TextQuadlet.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(TextQuadlet.class);
    job.setOutputValueClass(Text.class);


如果技术上出错,在代码或解释中的某处,因为我完全根据我个人对互联网上阅读的内容的理解得出了这个答案,它完全适用于我。


I am running an RFM Analysis program using MapReduce. The OutputKeyClass is Text.class and I am emitting comma separated R (Recency), F (Frequency), M (Monetory) as the key from Reducer where R=BigInteger, F=Binteger, M=BigDecimal and the value is also a Text representing Customer_ID. I know that Hadoop sorts output based on keys but my final result is a bit wierd. I want the output keys to be sorted by R first, then F and then M. But I am getting the following output sort order for unknown reasons:

545,1,7652    100000
545,23,390159.402343750    100001
452,13,132586    100002
452,4,32202    100004
452,1,9310    100007
452,1,4057    100018
452,3,18970    100021

But I want the following output:

545,23,390159.402343750    100001
545,1,7652    100000
452,13,132586    100002
452,4,32202    100004
452,3,18970    100021
452,1,9310    100007
452,1,4057    100018

NOTE: The customer_ID was the key in Map phase and all the RFM values belonging to a particular Customer_ID are brought together at the Reducer for aggregation.

解决方案

So after a lot of searching I found some useful material the compilation of which I am posting now:

  1. You have to start with your custom data type. Since I had three comma separated values which needed to be sorted descendingly, I had to create a TextQuadlet.java data type in Hadoop. The reason I am creating a quadlet is because the first part of the key will be the natural key and the rest of the three parts will be the R, F, M:

    import java.io.*;
    import org.apache.hadoop.io.*;
    public class TextQuadlet implements WritableComparable<TextQuadlet> {
    private String customer_id;
    private long R;
    private long F;
    private double M;
    public TextQuadlet() {
    }
    public TextQuadlet(String customer_id, long R, long F, double M) {
        set(customer_id, R, F, M);
    }
    public void set(String customer_id2, long R2, long F2, double M2) {
        this.customer_id = customer_id2;
        this.R = R2;
        this.F = F2;
        this.M=M2;
    }
    public String getCustomer_id() {
        return customer_id;
    }
    public long getR() {
        return R;
    }
    public long getF() {
        return F;
    }
    public double getM() {
        return M;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.customer_id);
        out.writeLong(this.R);
        out.writeLong(this.F);
        out.writeDouble(this.M);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.customer_id = in.readUTF();
        this.R = in.readLong();
        this.F = in.readLong();
        this.M = in.readDouble();
    }
    // This hashcode function is important as it is used by the custom
    // partitioner for this class.
    @Override
    public int hashCode() {
        return (int) (customer_id.hashCode() * 163 + R + F + M);
    }
    @Override
    public boolean equals(Object o) {
        if (o instanceof TextQuadlet) {
            TextQuadlet tp = (TextQuadlet) o;
            return customer_id.equals(tp.customer_id) && R == (tp.R) && F==(tp.F) && M==(tp.M);
        }
        return false;
    }
    @Override
    public String toString() {
        return customer_id + "," + R + "," + F + "," + M;
    }
    // LHS in the conditional statement is the current key
    // RHS in the conditional statement is the previous key
    // When you return a negative value, it means that you are exchanging
    // the positions of current and previous key-value pair
    // Returning 0 or a positive value means that you are keeping the
    // order as it is
    @Override
    public int compareTo(TextQuadlet tp) {
    // Here my natural is is customer_id and I don't even take it into
    // consideration.
    
    // So as you might have concluded, I am sorting R,F,M descendingly.
        if (this.R != tp.R) {
            if(this.R < tp.R) {
                return 1;
            }
            else{
                return -1;
            }
        }
        if (this.F != tp.F) {
            if(this.F < tp.F) {
                return 1;
            }
            else{
                return -1;
            }
        }
        if (this.M != tp.M){
            if(this.M < tp.M) {
                return 1;
            }
            else{
                return -1;
            }
        }
        return 0;
    }
    public static int compare(TextQuadlet tp1, TextQuadlet tp2) {
        int cmp = tp1.compareTo(tp2);
        return cmp;
    }
    public static int compare(Text customer_id1, Text customer_id2) {
        int cmp = customer_id1.compareTo(customer_id1);
        return cmp;
    }
    }
    

  2. Next you'll need a custom partitioner so that all the values which have the same key end up at one reducer:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class FirstPartitioner_RFM extends Partitioner<TextQuadlet, Text> {
    @Override
    public int getPartition(TextQuadlet key, Text value, int numPartitions) {
        return (int) key.hashCode() % numPartitions;
       }
    }
    

  3. Thirdly, you'll need a custom group comparater so that all the values are grouped together by their natural key which is customer_id and not the composite key which is customer_id,R,F,M:

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class GroupComparator_RFM_N extends WritableComparator {
    protected GroupComparator_RFM_N() {
        super(TextQuadlet.class, true);
    }
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        TextQuadlet ip1 = (TextQuadlet) w1;
        TextQuadlet ip2 = (TextQuadlet) w2;
        // Here we tell hadoop to group the keys by their natural key.
        return ip1.getCustomer_id().compareTo(ip2.getCustomer_id());
        }
    }
    

  4. Fourthly, you'll need a key comparater which will again sort the keys based on R,F,M descendingly and implement the same sort technique which is used in TextQuadlet.java. Since I got lost while coding, I slightly changed the way I compared data types in this function but the underlying logic is the same as in TextQuadlet.java:

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class KeyComparator_RFM extends WritableComparator {
    protected KeyComparator_RFM() {
        super(TextQuadlet.class, true);
    }
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        TextQuadlet ip1 = (TextQuadlet) w1;
        TextQuadlet ip2 = (TextQuadlet) w2;
        // LHS in the conditional statement is the current key-value pair
        // RHS in the conditional statement is the previous key-value pair
        // When you return a negative value, it means that you are exchanging
        // the positions of current and previous key-value pair
        // If you are comparing strings, the string which ends up as the argument
        // for the `compareTo` method turns out to be the previous key and the
        // string which is invoking the `compareTo` method turns out to be the
        // current key.
        if(ip1.getR() == ip2.getR()){
            if(ip1.getF() == ip2.getF()){
                if(ip1.getM() == ip2.getM()){
                    return 0;
                }
                else{
                    if(ip1.getM() < ip2.getM())
                        return 1;
                    else
                        return -1;
                }
            }
            else{
                if(ip1.getF() < ip2.getF())
                    return 1;
                else
                    return -1;
            }
        }
        else{
            if(ip1.getR() < ip2.getR())
                return 1;
            else
                return -1;
            }
        }
    }
    

  5. And finally, in your driver class, you'll have to include our custom classes. Here I have used TextQuadlet,Text as k-v pair. But you can choose any other class depending on your needs.:

    job.setPartitionerClass(FirstPartitioner_RFM.class);
    job.setSortComparatorClass(KeyComparator_RFM.class);
    job.setGroupingComparatorClass(GroupComparator_RFM_N.class);
    job.setMapOutputKeyClass(TextQuadlet.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(TextQuadlet.class);
    job.setOutputValueClass(Text.class);
    

Do correct me if I am technically going wrong somewhere in the code or in the explanation as I have based this answer purely on my personal understanding from what I read on the internet and it works for me perfectly.

这篇关于如何在Reducer输出中对逗号分隔的键进行排序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆