pysnmpカスタムMIB作成SNMPエージェント(python開発snmp)

8491 ワード

概要
PySNMPは、クロスプラットフォームで動作する純粋なPython製のSNMPエンジン実装です。このSNMPエンジンはエージェント/マネージャ/プロキシのいずれのロールでも動作でき、IPv4/IPv6およびその他のネットワークトランスポート上でSNMP v1/v2c/v3の各プロトコルバージョンを扱えます。
その名前(Simple)に反して、SNMPは決して単純なプロトコルではありません。たとえばバージョン3では、複雑で拡張可能なセキュリティフレームワーク、複数プロトコルバージョンの併用(multilingual)機能、リモート構成、その他の機能が導入されています。PySNMPはこうしたシステムの複雑さに忠実に追従し、ユーザーに最大限の機能と柔軟性を提供することを目指しています。-- pysnmp公式サイトより
  • ソフトウェアのインストール
  • yum install net-snmp net-snmp-python  \
    libsmi-devel libsmi
    
    pip install pyasn1==0.4.4 \
        pysnmp==4.4.2 \
        pysmi==0.3.4
    
  • MIB(MY-MIB)の作成
  • -- MY-MIB: Pythonエージェントが提供するカスタムMIBの定義。
    MY-MIB DEFINITIONS ::= BEGIN
    
    IMPORTS
            OBJECT-TYPE, Integer32, NOTIFICATION-TYPE, enterprises
                         FROM SNMPv2-SMI
    ;
    
    myCompany       OBJECT IDENTIFIER ::= {enterprises 42}
    
    testCount OBJECT-TYPE
        SYNTAX Integer32
        MAX-ACCESS read-only
        STATUS current
        DESCRIPTION "A sample count of something."
        ::= {myCompany 1}
    
    testDescription OBJECT-TYPE
        SYNTAX OCTET STRING
        MAX-ACCESS read-only
        STATUS current
        DESCRIPTION "A description of something"
        ::= {myCompany 2}
    
    testTrap NOTIFICATION-TYPE
        STATUS current
        DESCRIPTION "Test notification"
        ::= {myCompany 3}
    
    END
    
  • MIBファイルを、pysnmpが読み込めるPythonモジュール(コード)に変換
    ※ 出力するファイル名(MY-MIB.py)は、MIBファイル内の「MY-MIB DEFINITIONS ::= BEGIN」で宣言したモジュール名(MY-MIB)と同じにします。
    build-pysnmp-mib  -o MY-MIB.py MY-MIB
    

    これにより、新しいファイルMY-MIB.pyが生成されます。net-snmpのコマンドからエージェントに問い合わせる場合は、net-snmpがMIBを検索する場所にもMIBのコピーを追加する必要があります。ここでは、MY-MIBファイルを/usr/lib/python2.7/site-packages/pysnmp/smi/mibsに追加しました。
  • コード
  • from pysnmp.entity import engine, config
    from pysnmp import debug
    from pysnmp.entity.rfc3413 import cmdrsp, context, ntforg
    from pysnmp.carrier.asynsock.dgram import udp
    from pysnmp.smi import builder
    
    import threading
    import collections
    import time
    
    #can be useful
    #debug.setLogger(debug.Debug('all'))
    
    # Describes one object the agent serves: the MIB module name, the name of
    # the object inside that module, and a zero-argument callable that returns
    # the value to report for it.
    MibObject = collections.namedtuple(
        'MibObject', 'mibName objectType valueFunc')
    
    class Mib(object):
        """Holds the values that the agent serves.

        The counter is guarded by a re-entrant lock so that it can be read
        and written from different threads.
        """

        def __init__(self):
            self._test_count = 0
            self._lock = threading.RLock()

        def getTestCount(self):
            """Return the current counter value."""
            with self._lock:
                current = self._test_count
            return current

        def setTestCount(self, value):
            """Replace the counter value with ``value``."""
            with self._lock:
                self._test_count = value

        def getTestDescription(self):
            """Return the fixed description string."""
            return "My Description"
    
    
    def createVariable(SuperClass, getValue, *args):
        """Build an exportable instance whose value is computed on each read.

        ``SuperClass`` is subclassed on the fly and the subclass is
        instantiated with ``*args``.  The subclass overrides ``readGet`` so
        that it returns ``(name, self.syntax.clone(getValue()))``;
        ``getValue`` is a zero-argument callable invoked on every read to
        retrieve the value of the scalar.
        """
        class Var(SuperClass):
            def readGet(self, name, *_unused):
                # Look up clone first, then fetch the live value.
                clone = self.syntax.clone
                return name, clone(getValue())

        scalar = Var(*args)
        return scalar
    
    
    class SNMPAgent(object):
        """Implements an Agent that serves the custom MIB and
        can send a trap.
        """

        def __init__(self, mibObjects, listenAddress=('', 161)):
            """Set up the SNMP engine, access control and the served objects.

            mibObjects - a list of MibObject tuples that this agent
            will serve
            listenAddress - (host, port) tuple the UDP transport is opened
            on; defaults to all interfaces on port 161, which matches the
            previous hard-coded behaviour
            """

            # each SNMP-based application has an engine
            self._snmpEngine = engine.SnmpEngine()

            # open a UDP socket to listen for snmp requests
            config.addSocketTransport(
                self._snmpEngine, udp.domainName,
                udp.UdpTransport().openServerMode(listenAddress))

            # add a v2 user with the community string public
            config.addV1System(self._snmpEngine, "agent", "public")
            # let anyone accessing 'public' read anything in the subtree below,
            # which is the enterprises subtree that we defined our MIB to be in
            config.addVacmUser(self._snmpEngine, 2, "agent", "noAuthNoPriv",
                               readSubTree=(1, 3, 6, 1, 4, 1))

            # each app has one or more contexts
            self._snmpContext = context.SnmpContext(self._snmpEngine)

            # the builder is used to load mibs. tell it to look in the
            # current directory for our new MIB. We'll also use it to
            # export our symbols later
            mibBuilder = self._snmpContext.getMibInstrum().getMibBuilder()
            mibSources = (mibBuilder.getMibSources()
                          + (builder.DirMibSource('.'),))
            mibBuilder.setMibSources(*mibSources)

            # our variables will subclass this since we only have scalar types
            # can't load this type directly, need to import it
            MibScalarInstance, = mibBuilder.importSymbols('SNMPv2-SMI',
                                                          'MibScalarInstance')
            # export our custom mib
            for mibObject in mibObjects:
                nextVar, = mibBuilder.importSymbols(mibObject.mibName,
                                                    mibObject.objectType)
                # instance index (0,) because every served object is a scalar
                instance = createVariable(MibScalarInstance,
                                          mibObject.valueFunc,
                                          nextVar.name, (0,),
                                          nextVar.syntax)
                # need to export as Instance
                instanceDict = {str(nextVar.name) + "Instance": instance}
                mibBuilder.exportSymbols(mibObject.mibName,
                                         **instanceDict)

            # tell pysnmp to respond to get, getnext, and getbulk
            cmdrsp.GetCommandResponder(self._snmpEngine, self._snmpContext)
            cmdrsp.NextCommandResponder(self._snmpEngine, self._snmpContext)
            cmdrsp.BulkCommandResponder(self._snmpEngine, self._snmpContext)

        def setTrapReceiver(self, host, community, port=162):
            """Send traps to the host using community string community

            host - address of the trap receiver
            community - community string used for the notifications
            port - UDP port of the trap receiver; defaults to 162, which
            matches the previous hard-coded behaviour
            """
            config.addV1System(self._snmpEngine, 'nms-area', community)
            config.addVacmUser(self._snmpEngine, 2, 'nms-area', 'noAuthNoPriv',
                               notifySubTree=(1, 3, 6, 1, 4, 1))
            config.addTargetParams(self._snmpEngine,
                                   'nms-creds', 'nms-area', 'noAuthNoPriv', 1)
            config.addTargetAddr(self._snmpEngine, 'my-nms', udp.domainName,
                                 (host, port), 'nms-creds',
                                 tagList='all-my-managers')
            # set last parameter to 'notification' to have it send
            # informs rather than unacknowledged traps
            config.addNotificationTarget(
                self._snmpEngine, 'test-notification', 'my-filter',
                'all-my-managers', 'trap')

        def sendTrap(self):
            """Send the MY-MIB testTrap to the 'test-notification' target.

            The target name is the one registered by setTrapReceiver().
            """
            # print("...") with a single string prints the same text on
            # Python 2 and Python 3
            print("Sending trap")
            ntfOrg = ntforg.NotificationOriginator(self._snmpContext)
            ntfOrg.sendNotification(
                self._snmpEngine,
                'test-notification',
                ('MY-MIB', 'testTrap'),
                ())

        def serve_forever(self):
            """Run the transport dispatcher until it stops or an error occurs.

            On any exception the dispatcher is closed and the exception is
            re-raised to the caller.
            """
            print("Starting agent")
            self._snmpEngine.transportDispatcher.jobStarted(1)
            try:
                self._snmpEngine.transportDispatcher.runDispatcher()
            except:  # noqa: E722 - deliberate: also covers KeyboardInterrupt
                # close the dispatcher whatever went wrong, then re-raise
                self._snmpEngine.transportDispatcher.closeDispatcher()
                raise
    
    class Worker(threading.Thread):
        """Just to demonstrate updating the MIB
        and sending traps

        Runs as a daemon thread that, every ``interval`` seconds,
        increments the test counter of the given MIB store and asks the
        agent to send a trap.
        """

        def __init__(self, agent, mib, interval=3):
            """
            agent - object providing sendTrap()
            mib - object providing getTestCount() / setTestCount(value)
            interval - seconds to sleep between iterations (default 3,
            the previously hard-coded value)
            """
            threading.Thread.__init__(self)
            self._agent = agent
            self._mib = mib
            self._interval = interval
            # daemon thread, so it does not keep the process alive on exit
            self.daemon = True

        def run(self):
            """Loop forever: sleep, bump the counter, send a trap."""
            while True:
                time.sleep(self._interval)
                # use the instance's own MIB store, not a module-level name
                self._mib.setTestCount(self._mib.getTestCount() + 1)
                self._agent.sendTrap()
    
    if __name__ == '__main__':
        mib = Mib()
        # Each MibObject names the MIB module ('MY-MIB'), the object inside
        # that module (e.g. 'testDescription') and the callable that supplies
        # the value served for that object (e.g. mib.getTestDescription).
        objects = [MibObject('MY-MIB', 'testDescription', mib.getTestDescription),
                   MibObject('MY-MIB', 'testCount', mib.getTestCount)]
        agent = SNMPAgent(objects)
        # address of the trap receiver and the community string used for traps
        agent.setTrapReceiver('192.168.1.14', 'traps')
        Worker(agent, mib).start()
        try:
            agent.serve_forever()
        except KeyboardInterrupt:
            # print("...") with a single string prints the same text on
            # Python 2 and Python 3
            print("Shutting down")
    

    開始
    python snmpAgent.py
    

    テスト
    snmpget -v2c -c public {snmpエージェントのIP} {oid}