類似度計算におけるMapReduceの応用と最適化


需要:ユーザーの類似度を計算し、ユーザーリストUと特徴リストF、およびユーザーと特徴の関係がある.U 1とU 2の類似度は、の交差数から判断される.
解決方法:
一、ユーザー次元のJoin
最も暴力的で非効率な方法は,ユーザ量が一般的に大きいためjoin効率が極めて低い.普通は考えない.
二、特徴次元
ユーザ対フィーチャーのマトリクスをフィーチャー対ユーザーのマトリクスに変換します.
1、特徴対ユーザーのマトリックスに転換する:F 1->U 1…Un  
map: context.write(F, U)
reduce: context.write(F,List)
2、類似度の計算
次のようなソリューションがあります.
(1)直接出力UxUy pair(IO密集型)
map:user listを各user pairペアに分解して出力し、具体的には以下の例(コードは偽コードのみ):
String data[] = value.toSting().split("\t");
String users[] = data[1].split(",");//user  ,  
for(int i = 0; i < users.length; i ++){
    for(int j = i + 1; j < users.length; j ++){
        //  users[i] users[j]   
        context.write(users[i]+"_"+users[j], 1);//     users[i] users[j]     ,   ,   ,       
    }
}

 
reduce:user pairごとのvalue listを合計します.すなわち、この2つのユーザーの類似度です.
欠点:map端出力のuser pairが多く(O(N*N))、reducerのshuffleをボトルネックにする
(2)各userによる集約(計算集約型)
,(1)を押した出力は1>,  1>,  1>, 1>, 1>,1>. (1は回数)
userによる重合の結果,u 2,u 3,u 5>,u 3,u 5>,u 5>,出力数N(U)−1であった.
このスキームはuser listをソートする必要があり、1つのuser listがu 2,u 3,u 5>を出力し、もう1つはu 3,u 5>を出力するように、reduceがu 1->u 2,u 3,u 3,u 5であるように、後でreduceがuseridで集約されるのを容易にする.
ソートしない場合は、u 2,u 3,u 5>,u 1,u 3,u 5>のようなjobをもう1つ操作する必要があります.これで,jobによる統合和処理も必要である.
mapper:
StringBuilder uuidListStr = new StringBuilder();
String data[] = value.toString().split("\t");
String uuidArr[] = data[1].split(",");
Arrays.sort(uuidArr);
for(int i = 0; i < uuidArr.length; i ++){
    for(int j = i + 1; j < uuidArr.length; j ++){
        uuidListStr.append(uuidArr[j]).append(",");
    }
    if(uuidListStr.length() > 0){
        uuidListStr.deleteCharAt(uuidListStr.length() - 1);
        context.write(new Text(uuidArr[i]), new Text(uuidListStr.toString()));
        uuidListStr = new StringBuilder();
    }
}

 
reduce:u 1->u 2,u 3,u 5,u 3,u 5が得られ、valuesにおける各ユーザが現れる個数を計算し、count>を出力すればよい
//  hashMap    user   
Map<String, Integer> countMap = new HashMap<String, Integer>();
for (Text v : values) { //  v  u1,u2,u3     
    uuids = v.toString().split(",");
    for (int i = 0; i < uuids.length; i++) {
        uuid = uuids[i];
        tmp = countMap.get(uuid);
        if (null == tmp) {
            countMap.put(uuid, new Integer(1));
        } else {
            countMap.put(uuid, tmp + 1);
        }
    }
}
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
    context.write(new Text(key.toString() + "_" + entry.getKey()), new IntWritable(entry.getValue()));
}

 
このスキームのボトルネックはmapで自分で実現したソートであり,あるFではユーザ数が特に大きく,データの傾きをもたらす可能性があり,あるuser listは特に大きく,ソートに時間がかかり,タスク全体が遅くなる(計算密集型)ことがある.1つの考え方は,デフォルトの64 Mから8 MになるようにsplitSizeを小さくすることで,InputSplit数が多くなり,すなわちMapperが多くなり,各Mapperが処理するデータ量が小さくなり,並列の利点を十分に発揮することである.
具体的にはsplitSizeのコードを設定します.
//splitSize = max(minSize, min(maxSize, blockSize))
conf.set("mapred.min.split.size", 8 * 1024 * 1024 + "");
conf.set("mapred.max.split.size", 8 * 1024 * 1024 + "");

 
(3)順序をReducer端に置く
転置特徴対ユーザのマトリクスのjobにおいてreduceが既に各Fのuser listを得ている場合、user listを直接ソートしてuserによる集約の結果を出力することができる.
a、reduceメソッドでuser listをソートする
(2)方式と同様のデータ傾きの問題が発生し,(2)のようにsplitSizeを減少させて各Mapperの処理データ量を減少させ,Mapper数を増大させることはできない.この案は一般的に死んでしまい、データ量が大きい場合は考慮しない.
b、Reducerを利用したSort機能
上書きするクラス:
上書きするクラス:
MapOutputKey:compareToがmap/reduceの各フェーズでソートされる根拠
Partitioner:partitionでどのゾーンに分けるか
GroupingComparator:reduce用のsort-->reduceでkey反復したときのグループ.
 
Reducerの各段階は,Shuffle/Merge,Sort,Reduce,Outputである.ここでSortはReduceと並行している.Sort反復ループで得られたレコードはgroupingされ、reduceメソッドのvaluesが得られる.
Sortは各ファイルをKey(MapOutputKey)のサイズで最小スタックを構築し、最小Keyの記録を1つ取るたびに、GroupingComparatorで判断します(具体的なソースコードは検討されていないが、この実装は前のレコードを保存するKeyであるべきであり、現在のレコードが前のKeyがGrouping Comparatorメソッドで得た結果と同じであれば、groupのレコードリストに現在のレコードを追加し、そのリスト要素順が挿入順である;異なる場合は、Key以前のリストのデータをreduce側に転送する法を用いてgroupのレコードリストを空にする)
独自のMapOutputKeyを作成できます
public class GeoMapOutputKey implements Writable,WritableComparable<GeoMapOutputKey> {
    private Text geohash = new Text();//   Feature
    private Text uuid = new Text();  //   user
    public GeoMapOutputKey(){}
    public GeoMapOutputKey(String geohash, String uuid){
        this.geohash.set(geohash);
        this.uuid.set(uuid);
    }
    public Text getGeohash() {
        return geohash;
    }
    public void setGeohash(Text geohash) {
        this.geohash = geohash;
    }
    public Text getUuid() {
        return uuid;
    }
    public void setUuid(Text uuid) {
        this.uuid = uuid;
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        geohash.write(dataOutput);
        uuid.write(dataOutput);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        geohash.readFields(dataInput);
        uuid.readFields(dataInput);
    }
    @Override
    //     compareTo  ,    geohash    ,   uuid    
    public int compareTo(GeoMapOutputKey o) {
        int compareValue = this.geohash.compareTo(o.geohash);
        if(compareValue == 0){
            compareValue = this.uuid.compareTo(o.uuid);
        }
        return compareValue;
    }
    @Override
    public int hashCode() {
        return geohash.hashCode() * 163 + uuid.hashCode() * 163;
    }
    @Override
    public boolean equals(Object o) {
        if (o instanceof GeoMapOutputKey) {
            GeoMapOutputKey ok = (GeoMapOutputKey) o;
            return geohash.equals(ok.getGeohash()) && uuid.equals(ok.getUuid());
        }
        return false;
    }
    @Override
    public String toString() {
        return geohash + "\t" + uuid;
    }
}

 
このGeoMapOutputKeyでは、Featureでソートしてからuserでソートするように保存できます.(map出力にはN行レコードがあり、ReducerでもデフォルトではこのN行レコードに対してcompareを行うため、性能にあまり影響はありません).
Mapperのmapフェーズの出力はパーティション化メソッドを呼び出してパーティションを決定します.デフォルトではfeature+userでパーティション化されます.Featureでパーティション化する必要がありますので、上書きします.
public class GeoPartitioner extends Partitioner<GeoMapOutputKey, Text> {
    @Override
    public int getPartition(GeoMapOutputKey geoMapOutputKey, Text text, int numPartitions) {
   //   geohash  feature
        return Math.abs(geoMapOutputKey.getGeohash().hashCode()) % numPartitions;
    }
}

 
デフォルトではReducerのGroupingComparatorはKeyに従ってgrouping集約操作を行い、reduceメソッドのkeyがfeature_になります.u 1のように、あまり役に立たないので、GroupingComparatorをカスタマイズして、同じfeatureの集約、すなわちreduceメソッドのkeyがfeatureであるようにします.
public class GeoGroupingComparator extends WritableComparator {
    protected GeoGroupingComparator() {
        super(GeoMapOutputKey.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        GeoMapOutputKey ok1 = (GeoMapOutputKey) a;
        GeoMapOutputKey ok2 = (GeoMapOutputKey) b;
        //    feature    ,   feature  grouping  
        return ok1.getGeohash().compareTo(ok2.getGeohash());
    }
}

 
この2つのクラスのコードを設定します.
job.setMapOutputKeyClass(GeoMapOutputKey.class);
job.setGroupingComparatorClass(GeoGroupingComparator.class);

 
このような設定により,Sort最小スタックは先にfeatureを再userでソートし,集約時にはfeatureで集約することができる.
reduceのkeyは,,のいずれか(最初の?)であり、valueは
このjobの出力はU 1−>u 2,u 3,u 5のような形式である.次のjobのmapperはcontext(「u 1」,「u 2,u 3,u 5」)のみであり、reducerは(2)のreducerと同様に動作する.
 
第(3)の方式が最適であり,処理速度が従来よりずっと効率的である.