#!/usr/bin/env python
# -*- coding: utf-8 -*-
# minyt 2018.9.1
# Count how often each module (sname) appeared in the last 24 hours.
# The script uses the Elasticsearch Python client to run a terms aggregation
# over ``apistatus.sname.keyword`` and writes each bucket's document count
# into the MySQL table ``appname``.
import MySQLdb
from elasticsearch import Elasticsearch
from elasticsearch import helpers

# Elasticsearch index pattern to search.
index_name = "logstash-nginxlog-*"

# Elasticsearch client for the three cluster nodes. The request timeout is
# raised to 180 seconds (the client default is 10); increase it further if
# the aggregation has to scan a very large amount of data.
es = Elasticsearch(['elasticsearch01', 'elasticsearch02', 'elasticsearch03'], timeout=180)

# Query DSL: the top 100 values of apistatus.sname.keyword ordered by
# document count, restricted to @timestamp in [now-24h rounded down to the
# hour, start of the current hour). "size": 0 means no hits are returned,
# only the aggregation buckets. The remaining keys are left as generated.
data_sname = {
    "aggs": {
        "2": {
            "terms": {
                "field": "apistatus.sname.keyword",
                "size": 100,
                "order": {
                    "_count": "desc"
                }
            }
        }
    },
    "size": 0,
    "_source": {
        "excludes": []
    },
    "stored_fields": [
        "*"
    ],
    "script_fields": {},
    "docvalue_fields": [
        "@timestamp"
    ],
    "query": {
        "bool": {
            "must": [
                {
                    "match_all": {}
                },
                {
                    "range": {
                        "@timestamp": {
                            "gte": "now-24h/h",
                            "lt": "now/h"
                        }
                    }
                }
            ],
            "filter": [],
            "should": [],
            "must_not": []
        }
    }
}
# Run the DSL query above and return the raw Elasticsearch response.
def get_original_data(client=None, index=None, body=None):
    """Run the sname terms aggregation against Elasticsearch.

    Args:
        client: Elasticsearch client to use; defaults to the module-level ``es``.
        index: index pattern to search; defaults to ``index_name``.
        body: query DSL; defaults to ``data_sname``.

    Returns:
        The raw search response, or None if the search raised. The error is
        printed rather than propagated.
    """
    if client is None:
        client = es
    if index is None:
        index = index_name
    if body is None:
        body = data_sname
    try:
        # size=0: only the aggregation buckets are used, not the hits.
        return client.search(index=index, size=0, body=body)
    except Exception as e:
        # Catch Exception (not a bare except) so Ctrl-C / SystemExit still
        # propagate, and report why the search failed.
        print("get original data failure: %r" % (e,))
        return None
def _connect():
    """Open a new MySQLdb connection to the ``mydb`` database on localhost."""
    # NOTE(review): credentials are hard-coded; consider moving them to config.
    return MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8')


def _execute(sql, args=None, db=None):
    """Execute one write statement and commit it.

    On failure the transaction is rolled back and the error is printed
    (not raised). If ``db`` is None a fresh connection is opened and closed
    here; otherwise the caller's connection is used and left open.
    """
    own_db = db is None
    if own_db:
        db = _connect()
    try:
        cursor = db.cursor()
        try:
            # Parameters are passed separately so the driver escapes them.
            cursor.execute(sql, args)
            db.commit()
        except Exception as e:
            db.rollback()
            print("mysql update failed: %r" % (e,))
        finally:
            cursor.close()
    finally:
        # Close the connection even if cursor() or commit() raised.
        if own_db:
            db.close()


def init_mysql(db=None):
    """Reset ``count`` to 0 for every row of table ``appname``.

    Args:
        db: optional open MySQLdb connection to reuse; a new one is opened
            and closed when omitted.
    """
    _execute("update appname set count=0", db=db)


def updata_mysql(sname_count, sname_list, db=None):
    """Set ``appname.count`` to ``sname_count`` for the row whose sname matches.

    Args:
        sname_count: document count to store.
        sname_list: the sname value to match (a single value despite the name).
        db: optional open MySQLdb connection to reuse; a new one is opened
            and closed when omitted.
    """
    # The sname comes from log data, so it must not be formatted into the SQL
    # text; MySQLdb uses the %s paramstyle for bound parameters.
    _execute("update appname set count=%s where sname = %s",
             (sname_count, sname_list), db=db)
# Fetch per-sname counts from Elasticsearch and store them in MySQL.
def import_process_data():
    """Refresh ``appname.count`` in MySQL from the last 24h of sname counts.

    Fetches the sname terms aggregation from Elasticsearch, resets every
    ``count`` in ``appname`` to 0, then writes each bucket's doc_count for
    its sname, so snames absent from the returned buckets end up at 0.
    If Elasticsearch returns no usable response, MySQL is left untouched.
    Any other error is printed rather than raised.
    """
    try:
        res = get_original_data()
        if res is None:
            # get_original_data() returns None on failure; do not wipe the
            # existing counts when there is nothing to replace them with.
            print("no elasticsearch response, mysql not updated")
            return

        aggregation = (res.get('aggregations') or {}).get('2') or {}
        buckets = aggregation.get('buckets')
        if buckets is None:
            print("no 'buckets' in elasticsearch response, mysql not updated")
            return

        # NOTE(review): the reset and the per-sname updates are committed as
        # separate transactions, so a reader may briefly see all counts at 0.
        init_mysql()

        for bucket in buckets:
            sname = bucket.get('key')
            doc_count = bucket.get('doc_count')
            print("%s %s" % (sname, doc_count))
            updata_mysql(doc_count, sname)
    except Exception as e:
        # Top-level boundary of the script: report and exit normally.
        print(repr(e))
# Script entry point.
if __name__ == "__main__":
    import_process_data()