SparkStreaming統合Redis

3027 ワード

需要:Spark RDD演算子でRedisにアクセスしたい.
構想:Broadcast変数によってRedis構成情報をすべての計算ノードにブロードキャストする.Redis接続の遅延作成をlazyキーワードで実現します.
具体的な手順:
1.Redisクライアントパッケージクラスの定義package   xxx   import   redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, Protocol} import   org.apache.commons.pool2.impl.GenericObjectPoolConfig import   org.slf4j.LoggerFactory import   com.typesafe.scalalogging.slf4j.Logger /**   * Redis ,   *   */ class   RedisSink(makeJedisPool : () => JedisPool)  extends   Serializable {    lazy val pool = makeJedisPool() } /**   * Redis ,   *   */ object RedisSink{      def apply(redisHost: String, redisPort: Int,                 password: String, database: Int): RedisSink = {      val createJedisPoolFunc = () => {        val poolConfig =  new   GenericObjectPoolConfig()        val pool =  new   JedisPool(poolConfig, redisHost, redisPort, Protocol.DEFAULT_TIMEOUT, password, database)                 val hook =  new   Thread {          override def run = {            pool.destroy()          }        }        sys.addShutdownHook(hook.run)        pool      }      new   RedisSink(createJedisPoolFunc)    } }
2.サービス初期化時にRedisSinkを作成する// Redis if ( redisPassword.isEmpty() ){    redisPassword =  null     // , null, } var redisSink : Broadcast[RedisSink] = {    sparkSession.sparkContext.broadcast(RedisSink(redisHost, redisPort, redisPassword, redisDatabase)) } appContext.redisSink = redisSink    // redisSink
3.redisにアクセスしたい場所(計算ノードを含む)で、開梱して使用します.val jedisPool = appContext.redisSink.value.pool val jedis = jedisPool.getResource jedis.hset(RedisKeys.XXX,  "XXX" "YYY" ) jedis.close()