PythonはRedisを用いてジョブスケジューリングシステムを実現する(超簡単)
4886 ワード
概要
Redisはオープンソースで、先進的なkey-valueストレージであり、高性能で拡張性の高いWebアプリケーションを構築するための完璧なソリューションです.
Redisはその多くの競争から受け継いだ3つの主な特徴です.
Redisデータベースは完全にメモリにあり、ディスクを使用すると永続性のみに使用されます.
多くのキー値データストレージに比べて、Redisは豊富なデータ型を持っています.
Redisは、任意の数のスレーブサーバにデータをコピーすることができる.
Redisのメリット
異常高速:Redisの速度は非常に速く、毎秒約11万セット、毎秒約81000+レコードを実行できます.
豊富なデータ型をサポート:Redisは、リスト、集合、秩序化集合、ハッシュデータ型のような多くの開発者をサポートしています.これにより、データ型を介して処理できる問題を知っているため、さまざまな問題を解決しやすくなります.
操作はすべてアトミックです.すべてのRedis操作はアトミックです.これにより、2つのクライアントが同時にアクセスしているRedisサーバが更新された値を得ることが保証されます.
マルチファンクションユーティリティ:Redisは、キャッシュ、メッセージ、キュー使用(Redisオリジナルサポートパブリケーション/サブスクリプション)、Webアプリケーションセッション、Webアプリケーションヒットカウントなどの短いデータ、アプリケーションを使用できるマルチユーティリティです.
トピックに進む:
Redisはメモリデータベースの典型的な代表として、多くのアプリケーションシーンで使用されているが、ここではRedisのpub/sub機能について、この機能によって簡単なジョブスケジューリングシステムを実現する方法についてのみ説明する.ここでは単純な考えを示したいだけなので、この例には、エラー処理、持続化など、考慮すべきことがたくさんあります.
次は実現上のアイデアです
MyMaster:クラスタのmasterノードプログラムで、ジョブの生成、ジョブの配布、実行結果の取得を担当します.
MySlave:クラスタの計算ノードプログラムで、各計算ノードが1つずつ、ジョブを取得して実行し、結果をmasterノードに送信します.
channel CHANNEL_DISPATCH:slaveノードごとにチャネルを購読します.たとえば、「CHANNEL_DISPATCH_[idxまたはマシン名]」など、masterはこのチャネル内のpublishがdispatchされるジョブを購読します.
channel CHANNEL_RESULT:ジョブの結果を保存するためのchannelで、masterとslaveはこのchannelを共有し、masterはこのchannelを購読してジョブの実行結果を取得し、各slaveはジョブの実行結果をこのchannelに公開する責任を負う.
Masterコード
説明
MyMasterクラス-masterメインプログラム、dispatchとresulthandlerのスレッドを起動するために使用
MyServer DispatchThreadクラス-ジョブスレッドを配布し、ジョブを生成してコンピューティングノードに配布
MyServerResultHandleThreadクラス-ジョブ実行結果処理スレッド、channelからジョブ結果を取得して表示
Slaveコード
説明
MySlaveクラス-slaveノードメインプログラムMyJobWorkerThreadのスレッドを起動する
MyJobWorkerThreadクラス-channelから配布されたジョブを取得し、実行結果をmasterに送信
テスト
まずMySlaveを実行して配布ジョブchannelを定義します.
その後、MyMaster配布ジョブを実行し、実行結果を表示します.
PythonがRedisを使ってジョブスケジューリングシステムを実現することについて(超簡単)、編集者は皆さんにこんなにたくさん紹介して、みんなに役に立つことを望みます!
Redisはオープンソースで、先進的なkey-valueストレージであり、高性能で拡張性の高いWebアプリケーションを構築するための完璧なソリューションです.
Redisはその多くの競争から受け継いだ3つの主な特徴です.
Redisデータベースは完全にメモリにあり、ディスクを使用すると永続性のみに使用されます.
多くのキー値データストレージに比べて、Redisは豊富なデータ型を持っています.
Redisは、任意の数のスレーブサーバにデータをコピーすることができる.
Redisのメリット
異常高速:Redisの速度は非常に速く、毎秒約11万セット、毎秒約81000+レコードを実行できます.
豊富なデータ型をサポート:Redisは、リスト、集合、秩序化集合、ハッシュデータ型のような多くの開発者をサポートしています.これにより、データ型を介して処理できる問題を知っているため、さまざまな問題を解決しやすくなります.
操作はすべてアトミックです.すべてのRedis操作はアトミックです.これにより、2つのクライアントが同時にアクセスしているRedisサーバが更新された値を得ることが保証されます.
マルチファンクションユーティリティ:Redisは、キャッシュ、メッセージ、キュー使用(Redisオリジナルサポートパブリケーション/サブスクリプション)、Webアプリケーションセッション、Webアプリケーションヒットカウントなどの短いデータ、アプリケーションを使用できるマルチユーティリティです.
トピックに進む:
Redisはメモリデータベースの典型的な代表として、多くのアプリケーションシーンで使用されているが、ここではRedisのpub/sub機能について、この機能によって簡単なジョブスケジューリングシステムを実現する方法についてのみ説明する.ここでは単純な考えを示したいだけなので、この例には、エラー処理、持続化など、考慮すべきことがたくさんあります.
次は実現上のアイデアです
MyMaster:クラスタのmasterノードプログラムで、ジョブの生成、ジョブの配布、実行結果の取得を担当します.
MySlave:クラスタの計算ノードプログラムで、各計算ノードが1つずつ、ジョブを取得して実行し、結果をmasterノードに送信します.
channel CHANNEL_DISPATCH:slaveノードごとにチャネルを購読します.たとえば、「CHANNEL_DISPATCH_[idxまたはマシン名]」など、masterはこのチャネル内のpublishがdispatchされるジョブを購読します.
channel CHANNEL_RESULT:ジョブの結果を保存するためのchannelで、masterとslaveはこのchannelを共有し、masterはこのchannelを購読してジョブの実行結果を取得し、各slaveはジョブの実行結果をこのchannelに公開する責任を負う.
Masterコード
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MyMaster():
def __init__(self):
pass
def start(self):
MyServerResultHandleThread().start()
MyServerDispatchThread().start()
class MyServerDispatchThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
for i in range(1, 100):
channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
print("Dispatch job %s to %s" % (str(i), channel))
ret = r.publish(channel, str(i))
if ret == 0:
print("Dispatch job %s failed." % str(i))
time.sleep(5)
class MyServerResultHandleThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(CHANNEL_RESULT)
for message in p.listen():
if message['type'] != 'message':
continue
print("Received finished job %s" % message['data'])
if __name__ == "__main__":
MyMaster().start()
time.sleep(10000)
説明
MyMasterクラス-masterメインプログラム、dispatchとresulthandlerのスレッドを起動するために使用
MyServer DispatchThreadクラス-ジョブスレッドを配布し、ジョブを生成してコンピューティングノードに配布
MyServerResultHandleThreadクラス-ジョブ実行結果処理スレッド、channelからジョブ結果を取得して表示
Slaveコード
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MySlave():
def __init__(self):
pass
def start(self):
for i in range(1, 4):
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
class MyJobWorkerThread(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.channel = channel
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(self.channel)
for message in p.listen():
if message['type'] != 'message':
continue
print("%s: Received dispatched job %s " % (self.channel, message['data']))
print("%s: Run dispatched job %s " % (self.channel, message['data']))
time.sleep(2)
print("%s: Send finished job %s " % (self.channel, message['data']))
ret = r.publish(CHANNEL_RESULT, message['data'])
if ret == 0:
print("%s: Send finished job %s failed." % (self.channel, message['data']))
if __name__ == "__main__":
MySlave().start()
time.sleep(10000)
説明
MySlaveクラス-slaveノードメインプログラムMyJobWorkerThreadのスレッドを起動する
MyJobWorkerThreadクラス-channelから配布されたジョブを取得し、実行結果をmasterに送信
テスト
まずMySlaveを実行して配布ジョブchannelを定義します.
その後、MyMaster配布ジョブを実行し、実行結果を表示します.
PythonがRedisを使ってジョブスケジューリングシステムを実現することについて(超簡単)、編集者は皆さんにこんなにたくさん紹介して、みんなに役に立つことを望みます!