InfluxDBノート(Tornadoベース)

11580 ワード

from influxdb import DataFrameClient


class InfluxClient(object):
    """
    Convenience wrapper around ``influxdb.DataFrameClient``.

    Offers generic helpers for running raw InfluxQL statements plus a set of
    fixed time-range queries against the ``app_stat``, ``req_stat``,
    ``memory_stat``, ``cpu_stat``, ``disk_stat`` and ``net_stat`` measurements.

    Instances can be used as context managers; the underlying client is
    closed when the ``with`` block exits.
    """

    def __init__(self, host, port, dbname, username, password, **setting):
        """
        Create the underlying ``DataFrameClient``.

        :param host: InfluxDB host name
        :param port: InfluxDB port
        :param dbname: database passed to the client as ``database``
        :param username: user name used for authentication
        :param password: password used for authentication
        :param setting: extra keyword arguments forwarded unchanged to ``DataFrameClient``
        """
        self._df = DataFrameClient(host=host, port=port, username=username,
                                   database=dbname, password=password, **setting)

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying client; exceptions from the block are not suppressed."""
        self.close()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_text(statement):
        """Return ``statement`` as ``str``, decoding it as UTF-8 when it is not already one."""
        return statement if isinstance(statement, str) else str(statement, 'utf8')

    @staticmethod
    def _quote(value):
        """
        Render ``value`` as a single-quoted InfluxQL string literal.

        Backslashes and single quotes are backslash-escaped so that a value
        containing them can neither break the statement nor inject InfluxQL.
        """
        escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return "'" + escaped + "'"

    @staticmethod
    def _first_frame(result):
        """Return the first value of a query result mapping, or ``None`` when it is empty."""
        if not result:
            return None
        return next(iter(result.values()))

    def _query_range(self, fields, measurement, tag_key, tag_value, bg_millis, ed_millis, suffix=''):
        """
        Run ``SELECT fields FROM measurement`` restricted to one tag value and
        the half-open time range ``[bg_millis, ed_millis)``.

        :param fields: field list placed verbatim after ``SELECT``
        :param measurement: measurement name placed verbatim after ``FROM``
        :param tag_key: name of the column compared against ``tag_value``
        :param tag_value: value to match; escaped and quoted as a string literal
        :param bg_millis: inclusive lower time bound in milliseconds
        :param ed_millis: exclusive upper time bound in milliseconds
        :param suffix: optional trailing clause (e.g. ``GROUP BY uri``)
        :return: whatever ``DataFrameClient.query`` returns for the statement
        :raises TypeError, ValueError: if a time bound cannot be converted to ``int``
        """
        statement = (
            "SELECT {fields} FROM {measurement}"
            " WHERE {tag_key} = {tag_value} AND time >= {bg}ms AND time < {ed}ms"
        ).format(
            fields=fields,
            measurement=measurement,
            tag_key=tag_key,
            tag_value=self._quote(tag_value),
            # int() guarantees a valid integer duration literal (a float such
            # as 10.0 would otherwise render as the invalid "10.0ms") and
            # rejects arbitrary text in the time bounds.
            bg=int(bg_millis),
            ed=int(ed_millis),
        )
        if suffix:
            statement += ' ' + suffix
        return self._df.query(statement)

    # ------------------------------------------------------------------
    # generic queries
    # ------------------------------------------------------------------
    def query_all(self, statement):
        """
        Run a statement and return every row of the first result frame.

        :param statement: InfluxQL statement as ``str`` or UTF-8 encoded bytes
        :return: list of row dicts (``to_dict('records')`` of the first frame),
                 or an empty list when the query returns nothing
        """
        frame = self._first_frame(self._df.query(self._to_text(statement)))
        if frame is None:
            return []
        return frame.to_dict('records')

    def query_one(self, statement):
        """
        Run a statement and return the first row of the first result frame.

        :param statement: InfluxQL statement as ``str`` or UTF-8 encoded bytes
        :return: the first row as a dict
        :raises Exception: ``'return no value'`` when the query yields no rows
        """
        records = self.query_all(statement)
        # Also covers a result whose first frame has no rows, which would
        # otherwise surface as a bare IndexError.
        if not records:
            raise Exception('return no value')
        return records[0]

    def query_exist(self, statement):
        """
        Tell whether a statement returns any result.

        :param statement: InfluxQL statement as ``str`` or UTF-8 encoded bytes
        :return: ``True`` when the query result is non-empty, ``False`` otherwise
        """
        return len(self._df.query(self._to_text(statement))) != 0

    # ------------------------------------------------------------------
    # application statistics
    # ------------------------------------------------------------------
    def query_app_consume(self, app_name, bg_millis, ed_millis):
        """
        Fetch the ``_100`` ... ``_9223372036854775807`` columns of ``app_stat``
        for one application (presumably response-time buckets -- confirm).

        :param app_name: value matched against ``app_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the first frame of the query result, or ``None`` when empty
        """
        return self._first_frame(self._query_range(
            "_100,_500,_1000,_2000,_3000,_4000,_5000,_10000,_20000,_9223372036854775807",
            "app_stat", "app_name", app_name, bg_millis, ed_millis))

    def query_app_summary(self, app_name, bg_millis, ed_millis):
        """
        Fetch the request/error/concurrency summary columns of ``app_stat``
        for one application.

        :param app_name: value matched against ``app_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the first frame of the query result, or ``None`` when empty
        """
        return self._first_frame(self._query_range(
            "request_count,error_count,concurrent_max,running_count,request_time_mills",
            "app_stat", "app_name", app_name, bg_millis, ed_millis))

    def query_app_interface(self, app_name, bg_millis, ed_millis):
        """
        Fetch all columns of ``req_stat`` for one application, grouped by ``uri``.

        :param app_name: value matched against ``app_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the complete query result mapping (one entry per group),
                 or ``None`` when empty
        """
        groups = self._query_range("*", "req_stat", "app_name", app_name,
                                   bg_millis, ed_millis, "GROUP BY uri")
        return groups if groups else None

    # ------------------------------------------------------------------
    # host statistics
    # ------------------------------------------------------------------
    def query_host_memory(self, host_name, bg_millis, ed_millis):
        """
        Fetch the memory and swap usage columns of ``memory_stat`` for one host.

        :param host_name: value matched against ``host_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the first frame of the query result, or ``None`` when empty
        """
        return self._first_frame(self._query_range(
            "used,actual_used,actual_free,total,used_percent,used_swap,total_swap",
            "memory_stat", "host_name", host_name, bg_millis, ed_millis))

    def query_host_cpu(self, host_name, bg_millis, ed_millis):
        """
        Fetch the CPU usage columns of ``cpu_stat`` for one host.

        :param host_name: value matched against ``host_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the first frame of the query result, or ``None`` when empty
        """
        # USER is a reserved InfluxQL keyword, so the "user" field has to be
        # double-quoted to be read as an identifier.
        return self._first_frame(self._query_range(
            "combined, irq, idle, nice, soft_irq, stolen, sys, \"user\", wait",
            "cpu_stat", "host_name", host_name, bg_millis, ed_millis))

    def query_host_disk(self, host_name, bg_millis, ed_millis):
        """
        Fetch the disk usage columns of ``disk_stat`` for one host, grouped by
        ``dir_name``.

        :param host_name: value matched against ``host_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the complete query result mapping (one entry per group),
                 or ``None`` when empty
        """
        groups = self._query_range("avail,free,used,use_percent,total", "disk_stat",
                                   "host_name", host_name, bg_millis, ed_millis,
                                   "GROUP BY dir_name")
        return groups if groups else None

    def query_host_net(self, host_name, bg_millis, ed_millis):
        """
        Fetch the rx/tx counter columns of ``net_stat`` for one host, grouped
        by ``if_name``.

        :param host_name: value matched against ``host_name``
        :param bg_millis: inclusive start of the time range, in milliseconds
        :param ed_millis: exclusive end of the time range, in milliseconds
        :return: the complete query result mapping (one entry per group),
                 or ``None`` when empty
        """
        groups = self._query_range(
            "rx_bytes,rx_dropped,rx_errors,rx_packets, tx_bytes,tx_dropped, tx_errors, tx_packets",
            "net_stat", "host_name", host_name, bg_millis, ed_millis,
            "group by if_name")
        return groups if groups else None

    def close(self):
        """
        Close the underlying ``DataFrameClient``.

        :return: None
        """
        self._df.close()