Spark のカスタム Accumulator の高度な使い方(Java)
37238 ワード
public class SessionAggrStatAccumulator implements AccumulatorParam {
private static final long serialVersionUID = 6311074555136039130L;
/**
* zero ,
* , , , , , 0
* , key=value|key=value
*/
public String zero(String initialValue) {
return Constants.SESSION_COUNT + "=0|"
+ Constants.TIME_PERIOD_1s_3s + "=0|"
+ Constants.TIME_PERIOD_4s_6s + "=0|"
+ Constants.TIME_PERIOD_7s_9s + "=0|"
+ Constants.TIME_PERIOD_10s_30s + "=0|"
+ Constants.TIME_PERIOD_30s_60s + "=0|"
+ Constants.TIME_PERIOD_1m_3m + "=0|"
+ Constants.TIME_PERIOD_3m_10m + "=0|"
+ Constants.TIME_PERIOD_10m_30m + "=0|"
+ Constants.TIME_PERIOD_30m + "=0|"
+ Constants.STEP_PERIOD_1_3 + "=0|"
+ Constants.STEP_PERIOD_4_6 + "=0|"
+ Constants.STEP_PERIOD_7_9 + "=0|"
+ Constants.STEP_PERIOD_10_30 + "=0|"
+ Constants.STEP_PERIOD_30_60 + "=0|"
+ Constants.STEP_PERIOD_60 + "=0";
}
/**
* addInPlace addAccumulator
*
*
* , ,v1
* v2, session , session , Constants.TIME_PERIOD_1s_3s
* , ,
* v1 , v2 value, 1,
*
*/
public String addAccumulator(String v1, String v2) {
return add(v1,v2);
}
public String addInPlace(String v1, String v2) {
return add(v1,v2);
}
/**
* session
* @param v1
* @param v2
* @return
*/
private String add(String v1, String v2){
// :v1 , v2
if(StringUtils.isEmpty(v1)) {
return v2;
}
// StringUtils , v1 , v2 , 1
String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
if(oldValue != null) {
// , 1
int newValue = Integer.valueOf(oldValue) + 1;
// StringUtils , v1 ,v2 ,
return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));
}
return v1;
}
}
ユーティリティクラス:
/**
 * String helpers, mainly for reading and updating {@code name=value} fields
 * inside a delimited string such as {@code "a=1|b=2"}.
 *
 * @author Administrator
 */
public class StringUtils {

    /**
     * Returns whether {@code str} is null or the empty string.
     *
     * @param str string to test (may be null)
     * @return true if {@code str} is null or ""
     */
    public static boolean isEmpty(String str) {
        return str == null || str.isEmpty();
    }

    /**
     * Returns whether {@code str} is neither null nor the empty string.
     *
     * @param str string to test (may be null)
     * @return true if {@code str} has at least one character
     */
    public static boolean isNotEmpty(String str) {
        return !isEmpty(str);
    }

    /**
     * Removes at most one leading comma and at most one trailing comma.
     *
     * @param str string to trim (may be null)
     * @return the trimmed string, or null if {@code str} is null
     */
    public static String trimComma(String str) {
        if (str == null) {
            return null;
        }
        String result = str;
        if (result.startsWith(",")) {
            result = result.substring(1);
        }
        if (result.endsWith(",")) {
            result = result.substring(0, result.length() - 1);
        }
        return result;
    }

    /**
     * Left-pads a one-character string with "0" (e.g. "5" becomes "05").
     * Strings of length 2 or more are returned unchanged.
     *
     * @param str string to pad (may be null)
     * @return the padded string, or null if {@code str} is null
     */
    public static String fulfuill(String str) {
        if (str == null || str.length() >= 2) {
            return str;
        }
        return "0" + str;
    }

    /**
     * Looks up the value of field {@code field} in a delimited
     * {@code name=value} string.
     *
     * <p>Only segments that split into exactly two parts on '=' are considered.
     * For example, in {@code "searchKeywords=|clickCategoryIds=1,2,3"} the empty
     * {@code searchKeywords=} segment is skipped.
     *
     * @param str       the concatenated string (may be null)
     * @param delimiter regex separating the segments, e.g. {@code "\\|"}
     * @param field     field name to look for
     * @return the field's value, or null if it is absent or {@code str}/{@code delimiter} is null
     * @throws java.util.regex.PatternSyntaxException if {@code delimiter} is not a valid regex
     */
    public static String getFieldFromConcatString(String str,
            String delimiter, String field) {
        if (str == null || delimiter == null) {
            return null;
        }
        for (String concatField : str.split(delimiter)) {
            String[] parts = concatField.split("=");
            if (parts.length == 2 && parts[0].equals(field)) {
                return parts[1];
            }
        }
        return null;
    }

    /**
     * Replaces the value of the first segment whose name is {@code field}.
     *
     * <p>The result is always re-joined with "|", whatever {@code delimiter} is,
     * because {@code delimiter} is a regex and cannot be used as a literal
     * separator. Trailing empty segments are dropped by {@code String.split}.
     * If no segment matches, the re-joined string is returned unchanged.
     *
     * @param str           the concatenated string
     * @param delimiter     regex separating the segments, e.g. {@code "\\|"}
     * @param field         name of the field to update
     * @param newFieldValue new value for {@code field}
     * @return the updated concatenated string
     */
    public static String setFieldInConcatString(String str,
            String delimiter, String field, String newFieldValue) {
        String[] fields = str.split(delimiter);
        for (int i = 0; i < fields.length; i++) {
            // The name is the text before the first '='. Unlike split("=")[0], this
            // does not throw on segments such as "=" or "==".
            int eq = fields[i].indexOf('=');
            String fieldName = eq < 0 ? fields[i] : fields[i].substring(0, eq);
            if (fieldName.equals(field)) {
                fields[i] = fieldName + "=" + newFieldValue;
                break;
            }
        }
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                builder.append('|');
            }
            builder.append(fields[i]);
        }
        return builder.toString();
    }
}
テストクラス:
public class AccumuletorTest {
public static void main(String[] args) {
// Spark
SparkConf conf = new SparkConf()
.setAppName("AccumuletorTest")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
final Accumulator sessionAggrStatAccumulator = sc.accumulator(
"", new SessionAggrStatAccumulator());
List seq= new ArrayList();
for (int i = 0; i < 1000; i++) {
seq.add(Long.valueOf((int) (Math.random() * 1000)));
}
JavaRDD rdd=sc.parallelize(seq);
// RDD
rdd.foreach(new VoidFunction() {
public void call(Long aLong) throws Exception {
sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
calculateVisitLength(aLong);
calculateStepLength(aLong);
}
/**
*
* @param visitLength
*/
private void calculateVisitLength(long visitLength) {
if (visitLength >= 1 && visitLength <= 3) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
} else if (visitLength >= 4 && visitLength <= 6) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
} else if (visitLength >= 7 && visitLength <= 9) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
} else if (visitLength >= 10 && visitLength <= 30) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
} else if (visitLength > 30 && visitLength <= 60) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
} else if (visitLength > 60 && visitLength <= 180) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
} else if (visitLength > 180 && visitLength <= 600) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
} else if (visitLength > 600 && visitLength <= 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
} else if (visitLength > 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
}
}
/**
*
* @param stepLength
*/
private void calculateStepLength(long stepLength) {
if (stepLength >= 1 && stepLength <= 3) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
} else if (stepLength >= 4 && stepLength <= 6) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
} else if (stepLength >= 7 && stepLength <= 9) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
} else if (stepLength >= 10 && stepLength <= 30) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
} else if (stepLength > 30 && stepLength <= 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
} else if (stepLength > 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
}
});
System.out.println("accumuletor2:"+sessionAggrStatAccumulator.value());
}
}