Why the values collected from the Iterable in a Hadoop reduce are all the same


Version:
$ hadoop version
Hadoop 0.20.2-cdh3u4
Subversion git://ubuntu-slave01/var/lib/jenkins/workspace/CDH3u4-Full-RC/build/cdh3/hadoop20/0.20.2-cdh3u4/source -r 214dd731e3bdb687cb55988d3f47dd9e248c5690
Compiled by jenkins on Mon May  7 13:01:39 PDT 2012
From source with checksum a60c9795e41a3248b212344fb131c12c

Problem:
When running a Reducer, I kept the values from the Iterable in a Map as follows:
protected void reduce(Text key, Iterable<VectorWritable> values, Context context)
	        throws IOException, InterruptedException {
	Map<String, VectorWritable> map = new HashMap<String, VectorWritable>();
	for (VectorWritable vw : values) {
		NamedVector nv = (NamedVector) vw.get();
		Item itemi = Item.toInstance(nv.getName());
		// vw is the framework's reused instance, so every put stores the same object
		map.put(itemi.getItemID(), vw);
	}
}

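After the loop, every value in the Map is the same.
Cause:
The value handed out on each iteration is not a new object. For the whole reduce call there is a single value instance in memory, held in a field of ReduceContext, and each record is read into it. The relevant source is: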
public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private RawKeyValueIterator input;
  private Counter inputKeyCounter;
  private Counter inputValueCounter;
  private RawComparator<KEYIN> comparator;
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  // ...
}

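Each call to next() on the value iterator advances with nextKeyValue(), which reads the next record into that same value field, and then returns the field: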
@Override
public VALUEIN next() {
	// if this is the first record, we don't need to advance
	if (firstValue) {
		firstValue = false;
		return value;
	}
	// if this isn't the first record and the next key is different, they
	// can't advance it here.
	if (!nextKeyIsSame) {
		throw new NoSuchElementException("iterate past last value");
	}
	// otherwise, go to the next key/value pair
	try {
		nextKeyValue();
		return value;
	} catch (IOException ie) {
		throw new RuntimeException("next value iterator failed", ie);
	} catch (InterruptedException ie) {
		// this is bad, but we can't modify the exception list of java.util
		throw new RuntimeException("next value iterator interrupted", ie);        
	}
}

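Every element the loop produces is therefore the same object, so every map entry references it, and after the loop they all show the last value read. That is the cause of the problem above.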
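To see the effect without a cluster, here is a minimal plain-Java sketch of the same pattern. MutableValue and reusingValues are hypothetical stand-ins (for VectorWritable and the reduce-side value iterator); they are not Hadoop code and only mimic the behaviour described above: one object is filled in place and the same reference is returned from every next().

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

public class ReuseDemo {

    // Hypothetical mutable value standing in for a Writable such as VectorWritable.
    static final class MutableValue {
        String id;
        int payload;
    }

    // Hypothetical Iterable mimicking the reduce-side value iterator:
    // it overwrites ONE shared instance and returns that same reference every time.
    static Iterable<MutableValue> reusingValues(String[] ids, int[] payloads) {
        return () -> new Iterator<MutableValue>() {
            private final MutableValue shared = new MutableValue();
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < ids.length;
            }

            @Override
            public MutableValue next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("iterate past last value");
                }
                shared.id = ids[i];           // overwrite fields in place
                shared.payload = payloads[i];
                i++;
                return shared;                // same object on every call
            }
        };
    }

    // The buggy pattern: stores the iterator's object directly, without copying.
    static Map<String, MutableValue> collectWithoutCopy(Iterable<MutableValue> values) {
        Map<String, MutableValue> map = new HashMap<>();
        for (MutableValue v : values) {
            // The key is captured as it is now, but the value is the shared object.
            map.put(v.id, v);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, MutableValue> map = collectWithoutCopy(
                reusingValues(new String[] {"a", "b", "c"}, new int[] {1, 2, 3}));
        map.forEach((k, v) -> System.out.println(k + " -> " + v.id + "/" + v.payload));
    }
}
```

The keys stay distinct (they are immutable Strings captured at put time), but all three values are the one shared object and therefore show the last record, which matches the symptom in the reducer.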
Solution:
protected void reduce(Text key, Iterable<VectorWritable> values, Context context)
	        throws IOException, InterruptedException {
	Map<String, VectorWritable> map = new HashMap<String, VectorWritable>();
	for (VectorWritable vectorWritable : values) {
		// Copy the value before keeping a reference to it beyond this iteration
		VectorWritable vw = WritableUtils.clone(vectorWritable, context.getConfiguration());
		NamedVector nv = (NamedVector) vw.get();
		Item itemi = Item.toInstance(nv.getName());
		map.put(itemi.getItemID(), vw);
	}
}

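That is, clone each value (here with WritableUtils.clone) before storing it, so each map entry holds its own copy.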
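The idea behind the clone can be sketched without Hadoop as well: write the reused object out and read the bytes back into a fresh instance, which is roughly what WritableUtils.clone does (it creates the new instance of the same class by reflection and copies the state over). SimpleWritable, MutableValue, cloneViaSerialization and collectWithCopy are all hypothetical names for illustration; SimpleWritable only imitates the shape of org.apache.hadoop.io.Writable (write/readFields).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CloneDemo {

    // Hypothetical minimal interface shaped like Hadoop's Writable.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical mutable value standing in for VectorWritable.
    static final class MutableValue implements SimpleWritable {
        String id = "";
        int payload;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(id);
            out.writeInt(payload);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readUTF();
            payload = in.readInt();
        }
    }

    // Copies a value by serializing it and reading the bytes into a fresh instance.
    static <T extends SimpleWritable> T cloneViaSerialization(T orig, Supplier<T> factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                orig.write(out);
            }
            T copy = factory.get();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates the fixed loop: one shared instance is overwritten in place,
    // but a copy is stored, so each map entry keeps its own state.
    static Map<String, MutableValue> collectWithCopy(String[] ids, int[] payloads) {
        MutableValue shared = new MutableValue(); // plays the role of the reused value
        Map<String, MutableValue> map = new HashMap<>();
        for (int i = 0; i < ids.length; i++) {
            shared.id = ids[i];                   // overwritten in place, as the framework does
            shared.payload = payloads[i];
            map.put(shared.id, cloneViaSerialization(shared, MutableValue::new));
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, MutableValue> map = collectWithCopy(new String[] {"a", "b", "c"}, new int[] {1, 2, 3});
        map.forEach((k, v) -> System.out.println(k + " -> " + v.id + "/" + v.payload));
    }
}
```

The copy has a cost: every value for the key is now held in memory at once, so for keys with very many values it may be cheaper to store only the fields you actually need rather than whole copies.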