pysnmpカスタムMIB作成SNMPエージェント(python開発snmp)
8491 ワード
概要
PySNMPはプラットフォームにまたがる純粋なPython SNMPエンジン実装である.SNMPエンジンは、プロキシ/マネージャ/プロキシロールで動作し、IPv 4/IPv 6および他のネットワーク転送によってSNMP v 1/v 2 c/v 3プロトコルバージョンを議論できる機能を備えています.
その名前にもかかわらず、SNMPは簡単なプロトコルではありません.たとえば、3番目のバージョンでは、複雑なオープン・セキュリティ・フレームワーク、マルチ言語機能、リモート構成、その他の機能が導入されています.PySNMPは複雑なシステムの詳細と機能に緊密に従い、ユーザーに最大の機能と柔軟性をもたらす.--pysnmp公式サイトインストールソフトウェア 作成mib,MY-MIB MIBファイルをPythonモジュールpysnmpで読み取れるコード に変換
ps:ここMY-MIB.pyのファイル名はmibのファイル
これにより、新しいファイルコード
開始
テスト
PySNMPはプラットフォームにまたがる純粋なPython SNMPエンジン実装である.SNMPエンジンは、プロキシ/マネージャ/プロキシロールで動作し、IPv 4/IPv 6および他のネットワーク転送によってSNMP v 1/v 2 c/v 3プロトコルバージョンを議論できる機能を備えています.
その名前にもかかわらず、SNMPは簡単なプロトコルではありません.たとえば、3番目のバージョンでは、複雑なオープン・セキュリティ・フレームワーク、マルチ言語機能、リモート構成、その他の機能が導入されています.PySNMPは複雑なシステムの詳細と機能に緊密に従い、ユーザーに最大の機能と柔軟性をもたらす.--pysnmp公式サイト
yum install net-snmp net-snmp-python \
libsmi-devel libsmi
pip pyasn1==0.4.4
pysnmp==4.4.2
pysmi==0.3.4
-- MY-MIB python 。
MY-MIB DEFINITIONS ::= BEGIN
IMPORTS
OBJECT-TYPE, Integer32, NOTIFICATION-TYPE, enterprises
FROM SNMPv2-SMI
;
myCompany OBJECT IDENTIFIER ::= {enterprises 42}
testCount OBJECT-TYPE
SYNTAX Integer32
MAX-ACCESS read-only
STATUS current
DESCRIPTION "A sample count of something."
::= {myCompany 1}
testDescription OBJECT-TYPE
SYNTAX OCTET STRING
MAX-ACCESS read-only
STATUS current
DESCRIPTION "A description of something"
::= {myCompany 2}
testTrap NOTIFICATION-TYPE
STATUS current
DESCRIPTION "Test notification"
::= {myCompany 3}
END
ps:ここMY-MIB.pyのファイル名はmibのファイル
MY-MIB DEFINITIONS ::= BEGIN
の名前と同じにしますbuild-pysnmp-mib -o MY-MIB.py MY-MIB
これにより、新しいファイル
MY-MIB.py
がエクスポートされます.エージェントを問い合わせる場合は、net-snmp検索MIBの場所にMIBのコピーを追加する必要があります.MY-MIBファイル/usr/lib/python2.7/site-packages/pysnmp/smi/mibs
を追加しました.from pysnmp.entity import engine, config
from pysnmp import debug
from pysnmp.entity.rfc3413 import cmdrsp, context, ntforg
from pysnmp.carrier.asynsock.dgram import udp
from pysnmp.smi import builder
import threading
import collections
import time
#can be useful
#debug.setLogger(debug.Debug('all'))
MibObject = collections.namedtuple('MibObject', ['mibName',
'objectType', 'valueFunc'])
class Mib(object):
"""Stores the data we want to serve.
"""
def __init__(self):
self._lock = threading.RLock()
self._test_count = 0
def getTestDescription(self):
return "My Description"
def getTestCount(self):
with self._lock:
return self._test_count
def setTestCount(self, value):
with self._lock:
self._test_count = value
def createVariable(SuperClass, getValue, *args):
"""This is going to create a instance variable that we can export.
getValue is a function to call to retreive the value of the scalar
"""
class Var(SuperClass):
def readGet(self, name, *args):
return name, self.syntax.clone(getValue())
return Var(*args)
class SNMPAgent(object):
"""Implements an Agent that serves the custom MIB and
can send a trap.
"""
def __init__(self, mibObjects):
"""
mibObjects - a list of MibObject tuples that this agent
will serve
"""
#each SNMP-based application has an engine
self._snmpEngine = engine.SnmpEngine()
#open a UDP socket to listen for snmp requests
config.addSocketTransport(self._snmpEngine, udp.domainName,
udp.UdpTransport().openServerMode(('', 161)))
#add a v2 user with the community string public
config.addV1System(self._snmpEngine, "agent", "public")
#let anyone accessing 'public' read anything in the subtree below,
#which is the enterprises subtree that we defined our MIB to be in
config.addVacmUser(self._snmpEngine, 2, "agent", "noAuthNoPriv",
readSubTree=(1,3,6,1,4,1))
#each app has one or more contexts
self._snmpContext = context.SnmpContext(self._snmpEngine)
#the builder is used to load mibs. tell it to look in the
#current directory for our new MIB. We'll also use it to
#export our symbols later
mibBuilder = self._snmpContext.getMibInstrum().getMibBuilder()
mibSources = mibBuilder.getMibSources() + (builder.DirMibSource('.'),)
mibBuilder.setMibSources(*mibSources)
#our variables will subclass this since we only have scalar types
#can't load this type directly, need to import it
MibScalarInstance, = mibBuilder.importSymbols('SNMPv2-SMI',
'MibScalarInstance')
#export our custom mib
for mibObject in mibObjects:
nextVar, = mibBuilder.importSymbols(mibObject.mibName,
mibObject.objectType)
instance = createVariable(MibScalarInstance,
mibObject.valueFunc,
nextVar.name, (0,),
nextVar.syntax)
#need to export as Instance
instanceDict = {str(nextVar.name)+"Instance":instance}
mibBuilder.exportSymbols(mibObject.mibName,
**instanceDict)
# tell pysnmp to respotd to get, getnext, and getbulk
cmdrsp.GetCommandResponder(self._snmpEngine, self._snmpContext)
cmdrsp.NextCommandResponder(self._snmpEngine, self._snmpContext)
cmdrsp.BulkCommandResponder(self._snmpEngine, self._snmpContext)
def setTrapReceiver(self, host, community):
"""Send traps to the host using community string community
"""
config.addV1System(self._snmpEngine, 'nms-area', community)
config.addVacmUser(self._snmpEngine, 2, 'nms-area', 'noAuthNoPriv',
notifySubTree=(1,3,6,1,4,1))
config.addTargetParams(self._snmpEngine,
'nms-creds', 'nms-area', 'noAuthNoPriv', 1)
config.addTargetAddr(self._snmpEngine, 'my-nms', udp.domainName,
(host, 162), 'nms-creds',
tagList='all-my-managers')
#set last parameter to 'notification' to have it send
#informs rather than unacknowledged traps
config.addNotificationTarget(
self._snmpEngine, 'test-notification', 'my-filter',
'all-my-managers', 'trap')
def sendTrap(self):
print "Sending trap"
ntfOrg = ntforg.NotificationOriginator(self._snmpContext)
errorIndication = ntfOrg.sendNotification(
self._snmpEngine,
'test-notification',
('MY-MIB', 'testTrap'),
())
def serve_forever(self):
print "Starting agent"
self._snmpEngine.transportDispatcher.jobStarted(1)
try:
self._snmpEngine.transportDispatcher.runDispatcher()
except:
self._snmpEngine.transportDispatcher.closeDispatcher()
raise
class Worker(threading.Thread):
"""Just to demonstrate updating the MIB
and sending traps
"""
def __init__(self, agent, mib):
threading.Thread.__init__(self)
self._agent = agent
self._mib = mib
self.setDaemon(True)
def run(self):
while True:
time.sleep(3)
self._mib.setTestCount(mib.getTestCount()+1)
self._agent.sendTrap()
if __name__ == '__main__':
mib = Mib()
# MY-MIB mib ,testDescription oid, mib.getTestDescription oid 。
objects = [MibObject('MY-MIB', 'testDescription', mib.getTestDescription),
MibObject('MY-MIB', 'testCount', mib.getTestCount)]
agent = SNMPAgent(objects)
agent.setTrapReceiver('192.168.1.14', 'traps')
Worker(agent, mib).start()
try:
agent.serve_forever()
except KeyboardInterrupt:
print "Shutting down"
開始
python snmpAgent.py
テスト
snmpget -v2c -c public {snmp ip} {oid}