Using Flink broadcast variables and the FlinkML mapWithBcVariable method
Method 1: Use the Flink DataSet API
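import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.DenseVector // assumption: FlinkML's DenseVector
import scala.collection.JavaConverters._

// Register currentCentroids as a broadcast set named "centroids" on the map operator.
points.map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")

final class SelectNearestCenter extends RichMapFunction[DenseVector, (Int, DenseVector)] {
  private var centroids: Traversable[DenseVector] = null
  // Read the broadcast set by name when the task starts, before any call to map.
  override def open(parameters: Configuration): Unit = {
    centroids = getRuntimeContext.getBroadcastVariable[DenseVector]("centroids").asScala
  }
  override def map(p: DenseVector): (Int, DenseVector) = {
    // use centroids ...
  }
}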
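The body of map is elided above. As a rough illustration only, the sketch below shows one way the broadcast centroids could be used: pick the index of the closest centroid and return it with the point. It is plain Scala with no Flink dependency. `NearestCenter`, `Vec`, `squaredDistance`, and `selectNearest` are hypothetical names, `Array[Double]` stands in for `DenseVector`, and squared Euclidean distance is an assumption, not something the original states.

```scala
// Hypothetical sketch: Array[Double] stands in for DenseVector.
object NearestCenter {
  type Vec = Array[Double]

  // Squared Euclidean distance; assumes both vectors have the same length.
  def squaredDistance(a: Vec, b: Vec): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Returns (index of the closest centroid, the point itself),
  // mirroring the (Int, DenseVector) result type of SelectNearestCenter.map.
  def selectNearest(p: Vec, centroids: Seq[Vec]): (Int, Vec) = {
    require(centroids.nonEmpty, "at least one centroid is required")
    val nearest = centroids.indices.minBy(i => squaredDistance(p, centroids(i)))
    (nearest, p)
  }
}
```

With centroids (0, 0) and (10, 10), the point (9, 8) would be assigned index 1 and the point (1, 2) index 0. Inside the RichMapFunction, the same logic would run over the `centroids` field populated in `open`.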
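Method 2: Use FlinkML's mapWithBcVariable
import org.apache.flink.ml._ // brings the mapWithBcVariable extension method into scope

points.mapWithBcVariable(currentCentroids) {
  (point, center) => {
    // center is the broadcast value; FlinkML passes a single element of the broadcast DataSet
  }
}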