SparkStreaming統合Redis
3027 ワード
需要:Spark RDD演算子でRedisにアクセスしたい.
構想:Broadcast変数によってRedis構成情報をすべての計算ノードにブロードキャストする.Redis接続の遅延作成をlazyキーワードで実現します.
具体的な手順:
1.Redisクライアントパッケージクラスの定義
2.サービス初期化時にRedisSinkを作成する
3.redisにアクセスしたい場所(計算ノードを含む)で、開梱して使用します.
構想:Broadcast変数によってRedis構成情報をすべての計算ノードにブロードキャストする.Redis接続の遅延作成をlazyキーワードで実現します.
具体的な手順:
1.Redisクライアントパッケージクラスの定義
package
xxx
import
redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, Protocol}
import
org.apache.commons.pool2.impl.GenericObjectPoolConfig
import
org.slf4j.LoggerFactory
import
com.typesafe.scalalogging.slf4j.Logger
/**
* Redis ,
*
*/
class
RedisSink(makeJedisPool : () => JedisPool)
extends
Serializable {
lazy val pool = makeJedisPool()
}
/**
* Redis ,
*
*/
object RedisSink{
def apply(redisHost: String, redisPort: Int,
password: String, database: Int): RedisSink = {
val createJedisPoolFunc = () => {
val poolConfig =
new
GenericObjectPoolConfig()
val pool =
new
JedisPool(poolConfig, redisHost, redisPort, Protocol.DEFAULT_TIMEOUT, password, database)
val hook =
new
Thread {
override def run = {
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
pool
}
new
RedisSink(createJedisPoolFunc)
}
}
2.サービス初期化時にRedisSinkを作成する
// Redis
if
( redisPassword.isEmpty() ){
redisPassword =
null
// , null,
}
var redisSink : Broadcast[RedisSink] = {
sparkSession.sparkContext.broadcast(RedisSink(redisHost, redisPort, redisPassword, redisDatabase))
}
appContext.redisSink = redisSink
// redisSink
3.redisにアクセスしたい場所(計算ノードを含む)で、開梱して使用します.
val jedisPool = appContext.redisSink.value.pool
val jedis = jedisPool.getResource
jedis.hset(RedisKeys.XXX,
"XXX"
,
"YYY"
)
jedis.close()