類似度計算におけるMapReduceの応用と最適化
需要:ユーザーの類似度を計算し、ユーザーリストUと特徴リストF、およびユーザーと特徴の関係がある.U 1とU 2の類似度は、∩の交差数から判断される.
解決方法:
一、ユーザー次元のJoin
最も暴力的で非効率な方法は,ユーザ量が一般的に大きいためjoin効率が極めて低い.普通は考えない.
二、特徴次元
ユーザ対フィーチャーのマトリクスをフィーチャー対ユーザーのマトリクスに変換します.
1、特徴対ユーザーのマトリックスに転換する:F 1->U 1…Un
map: context.write(F, U)
reduce: context.write(F,List)
2、類似度の計算
次のようなソリューションがあります.
(1)直接出力UxUy pair(IO密集型)
map:user listを各user pairペアに分解して出力し、具体的には以下の例(コードは偽コードのみ):
reduce:user pairごとのvalue listを合計します.すなわち、この2つのユーザーの類似度です.
欠点:map端出力のuser pairが多く(O(N*N))、reducerのshuffleをボトルネックにする
(2)各userによる集約(計算集約型)
,(1)を押した出力は1>,1>, 1>, 1>, 1>,1>. (1は回数)
userによる重合の結果,u 2,u 3,u 5>,u 3,u 5>,u 5>,出力数N(U)−1であった.
このスキームはuser listをソートする必要があり、1つのuser listがu 2,u 3,u 5>を出力し、もう1つはu 3,u 5>を出力するように、reduceがu 1->u 2,u 3,u 3,u 5であるように、後でreduceがuseridで集約されるのを容易にする.
ソートしない場合は、u 2,u 3,u 5>,u 1,u 3,u 5>のようなjobをもう1つ操作する必要があります.これでと,jobによる統合和処理も必要である.
mapper:
reduce:u 1->u 2,u 3,u 5,u 3,u 5が得られ、valuesにおける各ユーザが現れる個数を計算し、count>を出力すればよい
このスキームのボトルネックはmapで自分で実現したソートであり,あるFではユーザ数が特に大きく,データの傾きをもたらす可能性があり,あるuser listは特に大きく,ソートに時間がかかり,タスク全体が遅くなる(計算密集型)ことがある.1つの考え方は,デフォルトの64 Mから8 MになるようにsplitSizeを小さくすることで,InputSplit数が多くなり,すなわちMapperが多くなり,各Mapperが処理するデータ量が小さくなり,並列の利点を十分に発揮することである.
具体的にはsplitSizeのコードを設定します.
(3)順序をReducer端に置く
転置特徴対ユーザのマトリクスのjobにおいてreduceが既に各Fのuser listを得ている場合、user listを直接ソートしてuserによる集約の結果を出力することができる.
a、reduceメソッドでuser listをソートする
(2)方式と同様のデータ傾きの問題が発生し,(2)のようにsplitSizeを減少させて各Mapperの処理データ量を減少させ,Mapper数を増大させることはできない.この案は一般的に死んでしまい、データ量が大きい場合は考慮しない.
b、Reducerを利用したSort機能
上書きするクラス:
上書きするクラス:
MapOutputKey:compareToがmap/reduceの各フェーズでソートされる根拠
Partitioner:partitionでどのゾーンに分けるか
GroupingComparator:reduce用のsort-->reduceでkey反復したときのグループ.
Reducerの各段階は,Shuffle/Merge,Sort,Reduce,Outputである.ここでSortはReduceと並行している.Sort反復ループで得られたレコードはgroupingされ、reduceメソッドのvaluesが得られる.
Sortは各ファイルをKey(MapOutputKey)のサイズで最小スタックを構築し、最小Keyの記録を1つ取るたびに、GroupingComparatorで判断します(具体的なソースコードは検討されていないが、この実装は前のレコードを保存するKeyであるべきであり、現在のレコードが前のKeyがGrouping Comparatorメソッドで得た結果と同じであれば、groupのレコードリストに現在のレコードを追加し、そのリスト要素順が挿入順である;異なる場合は、Key以前のリストのデータをreduce側に転送する法を用いてgroupのレコードリストを空にする)
独自のMapOutputKeyを作成できます
このGeoMapOutputKeyでは、Featureでソートしてからuserでソートするように保存できます.(map出力にはN行レコードがあり、ReducerでもデフォルトではこのN行レコードに対してcompareを行うため、性能にあまり影響はありません).
Mapperのmapフェーズの出力はパーティション化メソッドを呼び出してパーティションを決定します.デフォルトではfeature+userでパーティション化されます.Featureでパーティション化する必要がありますので、上書きします.
デフォルトではReducerのGroupingComparatorはKeyに従ってgrouping集約操作を行い、reduceメソッドのkeyがfeature_になります.u 1のように、あまり役に立たないので、GroupingComparatorをカスタマイズして、同じfeatureの集約、すなわちreduceメソッドのkeyがfeatureであるようにします.
この2つのクラスのコードを設定します.
このような設定により,Sort最小スタックは先にfeatureを再userでソートし,集約時にはfeatureで集約することができる.
reduceのkeyは,,のいずれか(最初の?)であり、valueは
このjobの出力はU 1−>u 2,u 3,u 5のような形式である.次のjobのmapperはcontext(「u 1」,「u 2,u 3,u 5」)のみであり、reducerは(2)のreducerと同様に動作する.
第(3)の方式が最適であり,処理速度が従来よりずっと効率的である.
解決方法:
一、ユーザー次元のJoin
最も暴力的で非効率な方法は,ユーザ量が一般的に大きいためjoin効率が極めて低い.普通は考えない.
二、特徴次元
ユーザ対フィーチャーのマトリクスをフィーチャー対ユーザーのマトリクスに変換します.
1、特徴対ユーザーのマトリックスに転換する:F 1->U 1…Un
map: context.write(F, U)
reduce: context.write(F,List)
2、類似度の計算
次のようなソリューションがあります.
(1)直接出力UxUy pair(IO密集型)
map:user listを各user pairペアに分解して出力し、具体的には以下の例(コードは偽コードのみ):
String data[] = value.toSting().split("\t");
String users[] = data[1].split(",");//user ,
for(int i = 0; i < users.length; i ++){
for(int j = i + 1; j < users.length; j ++){
// users[i] users[j]
context.write(users[i]+"_"+users[j], 1);// users[i] users[j] , , ,
}
}
reduce:user pairごとのvalue listを合計します.すなわち、この2つのユーザーの類似度です.
欠点:map端出力のuser pairが多く(O(N*N))、reducerのshuffleをボトルネックにする
(2)各userによる集約(計算集約型)
,(1)を押した出力は1>,
userによる重合の結果,u 2,u 3,u 5>,u 3,u 5>,u 5>,出力数N(U)−1であった.
このスキームはuser listをソートする必要があり、1つのuser listがu 2,u 3,u 5>を出力し、もう1つはu 3,u 5>を出力するように、reduceがu 1->u 2,u 3,u 3,u 5であるように、後でreduceがuseridで集約されるのを容易にする.
ソートしない場合は、u 2,u 3,u 5>,u 1,u 3,u 5>のようなjobをもう1つ操作する必要があります.これでと,jobによる統合和処理も必要である.
mapper:
StringBuilder uuidListStr = new StringBuilder();
String data[] = value.toString().split("\t");
String uuidArr[] = data[1].split(",");
Arrays.sort(uuidArr);
for(int i = 0; i < uuidArr.length; i ++){
for(int j = i + 1; j < uuidArr.length; j ++){
uuidListStr.append(uuidArr[j]).append(",");
}
if(uuidListStr.length() > 0){
uuidListStr.deleteCharAt(uuidListStr.length() - 1);
context.write(new Text(uuidArr[i]), new Text(uuidListStr.toString()));
uuidListStr = new StringBuilder();
}
}
reduce:u 1->u 2,u 3,u 5,u 3,u 5が得られ、valuesにおける各ユーザが現れる個数
// hashMap user
Map<String, Integer> countMap = new HashMap<String, Integer>();
for (Text v : values) { // v u1,u2,u3
uuids = v.toString().split(",");
for (int i = 0; i < uuids.length; i++) {
uuid = uuids[i];
tmp = countMap.get(uuid);
if (null == tmp) {
countMap.put(uuid, new Integer(1));
} else {
countMap.put(uuid, tmp + 1);
}
}
}
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
context.write(new Text(key.toString() + "_" + entry.getKey()), new IntWritable(entry.getValue()));
}
このスキームのボトルネックはmapで自分で実現したソートであり,あるFではユーザ数が特に大きく,データの傾きをもたらす可能性があり,あるuser listは特に大きく,ソートに時間がかかり,タスク全体が遅くなる(計算密集型)ことがある.1つの考え方は,デフォルトの64 Mから8 MになるようにsplitSizeを小さくすることで,InputSplit数が多くなり,すなわちMapperが多くなり,各Mapperが処理するデータ量が小さくなり,並列の利点を十分に発揮することである.
具体的にはsplitSizeのコードを設定します.
//splitSize = max(minSize, min(maxSize, blockSize))
conf.set("mapred.min.split.size", 8 * 1024 * 1024 + "");
conf.set("mapred.max.split.size", 8 * 1024 * 1024 + "");
(3)順序をReducer端に置く
転置特徴対ユーザのマトリクスのjobにおいてreduceが既に各Fのuser listを得ている場合、user listを直接ソートしてuserによる集約の結果を出力することができる.
a、reduceメソッドでuser listをソートする
(2)方式と同様のデータ傾きの問題が発生し,(2)のようにsplitSizeを減少させて各Mapperの処理データ量を減少させ,Mapper数を増大させることはできない.この案は一般的に死んでしまい、データ量が大きい場合は考慮しない.
b、Reducerを利用したSort機能
上書きするクラス:
上書きするクラス:
MapOutputKey:compareToがmap/reduceの各フェーズでソートされる根拠
Partitioner:partitionでどのゾーンに分けるか
GroupingComparator:reduce用のsort-->reduceでkey反復したときのグループ.
Reducerの各段階は,Shuffle/Merge,Sort,Reduce,Outputである.ここでSortはReduceと並行している.Sort反復ループで得られたレコードはgroupingされ、reduceメソッドのvaluesが得られる.
Sortは各ファイルをKey(MapOutputKey)のサイズで最小スタックを構築し、最小Keyの記録を1つ取るたびに、GroupingComparatorで判断します(具体的なソースコードは検討されていないが、この実装は前のレコードを保存するKeyであるべきであり、現在のレコードが前のKeyがGrouping Comparatorメソッドで得た結果と同じであれば、groupのレコードリストに現在のレコードを追加し、そのリスト要素順が挿入順である;異なる場合は、Key以前のリストのデータをreduce側に転送する法を用いてgroupのレコードリストを空にする)
独自のMapOutputKeyを作成できます
public class GeoMapOutputKey implements Writable,WritableComparable<GeoMapOutputKey> {
private Text geohash = new Text();// Feature
private Text uuid = new Text(); // user
public GeoMapOutputKey(){}
public GeoMapOutputKey(String geohash, String uuid){
this.geohash.set(geohash);
this.uuid.set(uuid);
}
public Text getGeohash() {
return geohash;
}
public void setGeohash(Text geohash) {
this.geohash = geohash;
}
public Text getUuid() {
return uuid;
}
public void setUuid(Text uuid) {
this.uuid = uuid;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
geohash.write(dataOutput);
uuid.write(dataOutput);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
geohash.readFields(dataInput);
uuid.readFields(dataInput);
}
@Override
// compareTo , geohash , uuid
public int compareTo(GeoMapOutputKey o) {
int compareValue = this.geohash.compareTo(o.geohash);
if(compareValue == 0){
compareValue = this.uuid.compareTo(o.uuid);
}
return compareValue;
}
@Override
public int hashCode() {
return geohash.hashCode() * 163 + uuid.hashCode() * 163;
}
@Override
public boolean equals(Object o) {
if (o instanceof GeoMapOutputKey) {
GeoMapOutputKey ok = (GeoMapOutputKey) o;
return geohash.equals(ok.getGeohash()) && uuid.equals(ok.getUuid());
}
return false;
}
@Override
public String toString() {
return geohash + "\t" + uuid;
}
}
このGeoMapOutputKeyでは、Featureでソートしてからuserでソートするように保存できます.(map出力にはN行レコードがあり、ReducerでもデフォルトではこのN行レコードに対してcompareを行うため、性能にあまり影響はありません).
Mapperのmapフェーズの出力はパーティション化メソッドを呼び出してパーティションを決定します.デフォルトではfeature+userでパーティション化されます.Featureでパーティション化する必要がありますので、上書きします.
public class GeoPartitioner extends Partitioner<GeoMapOutputKey, Text> {
@Override
public int getPartition(GeoMapOutputKey geoMapOutputKey, Text text, int numPartitions) {
// geohash feature
return Math.abs(geoMapOutputKey.getGeohash().hashCode()) % numPartitions;
}
}
デフォルトではReducerのGroupingComparatorはKeyに従ってgrouping集約操作を行い、reduceメソッドのkeyがfeature_になります.u 1のように、あまり役に立たないので、GroupingComparatorをカスタマイズして、同じfeatureの集約、すなわちreduceメソッドのkeyがfeatureであるようにします.
public class GeoGroupingComparator extends WritableComparator {
protected GeoGroupingComparator() {
super(GeoMapOutputKey.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
GeoMapOutputKey ok1 = (GeoMapOutputKey) a;
GeoMapOutputKey ok2 = (GeoMapOutputKey) b;
// feature , feature grouping
return ok1.getGeohash().compareTo(ok2.getGeohash());
}
}
この2つのクラスのコードを設定します.
job.setMapOutputKeyClass(GeoMapOutputKey.class);
job.setGroupingComparatorClass(GeoGroupingComparator.class);
このような設定により,Sort最小スタックは先にfeatureを再userでソートし,集約時にはfeatureで集約することができる.
reduceのkeyは
このjobの出力はU 1−>u 2,u 3,u 5のような形式である.次のjobのmapperはcontext(「u 1」,「u 2,u 3,u 5」)のみであり、reducerは(2)のreducerと同様に動作する.
第(3)の方式が最適であり,処理速度が従来よりずっと効率的である.