MapReduceインスタンス-グループウェイト(groupby distinct)
11189 ワード
1 public class GroupComparator implements RawComparator<MyBinaryKey> {
2
3 @Override
4 public int compare(MyBinaryKey o1, MyBinaryKey o2) {
5 return o1.toString().compareTo(o2.toString());
6 }
7
8 @Override
9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
10 return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2, Long.SIZE / 8 + Integer.SIZE / 8 * 3);
11 }
12
13 }
14
15 public abstract class UVBinaryKey extends BinaryComparable implements WritableComparable<BinaryComparable>{
16 // NOTE(review): the field declarations appear to have been elided from this listing.
17 @Override
18 public void readFields(DataInput in) throws IOException {
19 // Body not shown in this listing; as written, nothing is read from `in`.
20 }
21
22 @Override
23 public byte[] getBytes() {
24 // Body not shown in this listing; as written there is no return statement, so this method cannot compile.
25 }
26
27 }
28
29 public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> {
30
31 /**
32 * uv/ip , uv/ip
33 */
34 @Override
35 public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) {
36
37 int k=0;
38 for(byte b : key.getAttr()){
39 k+=b&0xff;
40 }
41 return k%numPartitions;
42 }
43
44 }
45
46
47
48 job.setMapOutputKeyClass(UVBinaryKey.class); // map output keys are UVBinaryKey instances
49 job.setGroupingComparatorClass(GroupComparator.class); // GroupComparator decides which keys are grouped into one reduce() call
50 job.setPartitionerClass(MyPartitioner.class); // MyPartitioner chooses the partition for each map output record
51
52 map
1 combiner( )
2 reduce :
3 @Override
4 protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context)
5 throws IOException,
6 InterruptedException {
7 long count = 0;
8 byte[] tbsign = null;
9 for (NullWritable nullWritable : values) {
10 byte[] attr = key.getAttr();
11 if (tbsign == null) {
12 tbsign = attr;
13 count++;
14 }
15 if (tbsign != null) {
16 if (tbsign.length != attr.length) {
17 count++;
18 tbsign = attr;
19 } else {
20 for (int i = 0; i < tbsign.length; i++) {
21 if (tbsign[i] != attr[i]) {
22 count++;
23 tbsign = attr;
24 break;
25 }
26 }
27 }
28 }
29
30 }
31 StringBuffer out = new StringBuffer();
32 out.append(new String(key.getCity()))
33 .append(Constants.FIELDS_TERMINATED).append(count);
34 context.write(new Text(out.toString()), NullWritable.get());
35
36 }