Spark カスタム Accumulator の高度な活用(Java)

37238 ワード

public class SessionAggrStatAccumulator implements AccumulatorParam {
    private static final long serialVersionUID = 6311074555136039130L;
    /**
     * zero     *   ,    ,      ,      ,         ,  0
     * key=value|key=value       
     */
    public String zero(String initialValue) {
        return Constants.SESSION_COUNT + "=0|"
                + Constants.TIME_PERIOD_1s_3s + "=0|"
                + Constants.TIME_PERIOD_4s_6s + "=0|"
                + Constants.TIME_PERIOD_7s_9s + "=0|"
                + Constants.TIME_PERIOD_10s_30s + "=0|"
                + Constants.TIME_PERIOD_30s_60s + "=0|"
                + Constants.TIME_PERIOD_1m_3m + "=0|"
                + Constants.TIME_PERIOD_3m_10m + "=0|"
                + Constants.TIME_PERIOD_10m_30m + "=0|"
                + Constants.TIME_PERIOD_30m + "=0|"
                + Constants.STEP_PERIOD_1_3 + "=0|"
                + Constants.STEP_PERIOD_4_6 + "=0|"
                + Constants.STEP_PERIOD_7_9 + "=0|"
                + Constants.STEP_PERIOD_10_30 + "=0|"
                + Constants.STEP_PERIOD_30_60 + "=0|"
                + Constants.STEP_PERIOD_60 + "=0";
    }
    /**
     * addInPlace addAccumulator
     *          
     *
     *      ,        ,v1               
     * v2sessionsessionConstants.TIME_PERIOD_1s_3s
     *   ,  ,       
     *  v1v2   value1     *
     */
    public String addAccumulator(String v1, String v2) {
        return add(v1,v2);
    }

    public String addInPlace(String v1, String v2) {
        return add(v1,v2);
    }
    /**
     * session      
     * @param v1    
     * @param v2     
     * @return         
     */
    private String add(String v1, String v2){
        // v1v2
        if(StringUtils.isEmpty(v1)) {
            return v2;
        }

        //   StringUtilsv1v21
        String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
        if(oldValue != null) {
            // 1
            int newValue = Integer.valueOf(oldValue) + 1;
            //   StringUtilsv1v2            return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));
        }
        return v1;
    }


}
ユーティリティクラス:
/**
 * String helper methods, including access to fields in
 * {@code key=value<delimiter>key=value} concatenated strings.
 * @author Administrator
 *
 */
public class StringUtils {

    /**
     * Checks whether a string is null or empty.
     * @param str the string to check
     * @return {@code true} if {@code str} is null or {@code ""}
     */
    public static boolean isEmpty(String str) {
        return str == null || "".equals(str);
    }

    /**
     * Checks whether a string is neither null nor empty.
     * @param str the string to check
     * @return {@code true} if {@code str} is non-null and not {@code ""}
     */
    public static boolean isNotEmpty(String str) {
        return str != null && !"".equals(str);
    }

    /**
     * Removes a single leading comma and a single trailing comma, if present.
     * @param str the string to trim; null is returned unchanged
     * @return the trimmed string
     */
    public static String trimComma(String str) {
        if (str == null) {
            return null;
        }
        if (str.startsWith(",")) {
            str = str.substring(1);
        }
        if (str.endsWith(",")) {
            str = str.substring(0, str.length() - 1);
        }
        return str;
    }

    /**
     * Left-pads a string with '0' to a length of two (e.g. "5" becomes "05").
     * Strings that are already two or more characters long are returned unchanged.
     * @param str the string to pad; null is returned unchanged
     * @return the padded string
     */
    public static String fulfuill(String str) {
        if (str == null || str.length() >= 2) {
            return str;
        }
        // length is 0 or 1 here: prepend exactly enough zeros to reach two characters
        return "00".substring(str.length()) + str;
    }

    /**
     * Extracts the value of a field from a concatenated string such as
     * {@code "a=1|b=2"}.
     *
     * <p>Fields whose value is empty (e.g. {@code "searchKeywords="}) or that contain
     * more than one '=' are skipped, so they yield {@code null}.
     *
     * @param str       the concatenated string
     * @param delimiter regular expression separating the fields (e.g. {@code "\\|"})
     * @param field     the field name to look up
     * @return the field's value, or {@code null} if any argument is null or the field
     *         is not found
     * @throws java.util.regex.PatternSyntaxException if {@code delimiter} is not a valid regex
     */
    public static String getFieldFromConcatString(String str,
            String delimiter, String field) {
        if (str == null || delimiter == null || field == null) {
            return null;
        }
        for (String concatField : str.split(delimiter)) {
            // e.g. searchKeywords=|clickCategoryIds=1,2,3
            String[] parts = concatField.split("=");
            if (parts.length == 2 && parts[0].equals(field)) {
                return parts[1];
            }
        }
        return null;
    }

    /**
     * Replaces the value of the first field named {@code field} in a concatenated
     * string such as {@code "a=1|b=2"}.
     *
     * <p>NOTE: the result is always re-joined with {@code "|"}, whatever
     * {@code delimiter} is. Trailing empty fields are dropped, as with
     * {@link String#split(String)}.
     *
     * @param str           the concatenated string; null is returned unchanged
     * @param delimiter     regular expression separating the fields (e.g. {@code "\\|"})
     * @param field         the field name to update
     * @param newFieldValue the new value for that field
     * @return the updated string; if the field is not present, the fields are
     *         re-joined without modification
     */
    public static String setFieldInConcatString(String str,
            String delimiter, String field, String newFieldValue) {
        if (str == null) {
            return null;
        }
        String[] fields = str.split(delimiter);

        for (int i = 0; i < fields.length; i++) {
            // indexOf instead of split("=")[0]: "==".split("=") is empty and would throw
            int eq = fields[i].indexOf('=');
            String fieldName = eq < 0 ? fields[i] : fields[i].substring(0, eq);
            if (fieldName.equals(field)) {
                fields[i] = fieldName + "=" + newFieldValue;
                break;
            }
        }

        StringBuilder buffer = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                buffer.append('|');
            }
            buffer.append(fields[i]);
        }
        return buffer.toString();
    }

}

テストクラス:
public class AccumuletorTest {
    public static void main(String[] args) {
        //   Spark   
        SparkConf conf = new SparkConf()
                .setAppName("AccumuletorTest")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        final Accumulator sessionAggrStatAccumulator = sc.accumulator(
                "", new SessionAggrStatAccumulator());
        List seq= new ArrayList();
        for (int i = 0; i < 1000; i++) {
            seq.add(Long.valueOf((int) (Math.random() * 1000)));
        }
        JavaRDD rdd=sc.parallelize(seq);
        //   RDD  
        rdd.foreach(new VoidFunction() {
            public void call(Long aLong) throws Exception {
                sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
                calculateVisitLength(aLong);
                calculateStepLength(aLong);
            }
            /**
             *         
             * @param visitLength
             */
            private void calculateVisitLength(long visitLength) {
                if (visitLength >= 1 && visitLength <= 3) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
                } else if (visitLength >= 4 && visitLength <= 6) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
                } else if (visitLength >= 7 && visitLength <= 9) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
                } else if (visitLength >= 10 && visitLength <= 30) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
                } else if (visitLength > 30 && visitLength <= 60) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
                } else if (visitLength > 60 && visitLength <= 180) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
                } else if (visitLength > 180 && visitLength <= 600) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
                } else if (visitLength > 600 && visitLength <= 1800) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
                } else if (visitLength > 1800) {
                    sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
                }
            }

            /**
             *         
             * @param stepLength
             */
            private  void calculateStepLength(long stepLength) {
                if (stepLength >= 1 && stepLength <= 3) {
                    sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
                } else if (stepLength >= 4 && stepLength <= 6) {
                    sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
                } else if (stepLength >= 7 && stepLength <= 9) {
                    sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
                } else if (stepLength >= 10 && stepLength <= 30) {
                    sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
                } else if (stepLength > 30 && stepLength <= 60) {
                    sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
                } else if (stepLength > 60) {
                    sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
                }
            }
        });
        System.out.println("accumuletor2:"+sessionAggrStatAccumulator.value());
    }

}