# Complete log-analysis script: parse access-log lines into dicts, then fan the
# parsed records out to time-window handlers that each run in their own thread.
import datetime
import random
import re
import sys
import threading
import time
from pathlib import Path
from queue import Queue

from user_agents import parse  # third-party user-agent string parser


# ---------------------------------------------------------------------------
# Data source: one access-log line -> dict of named, converted fields
# ---------------------------------------------------------------------------

# Raw-string pieces so regex escapes (\d, \s, \[ ...) reach `re` untouched.
# The brackets around the timestamp must be escaped: an unescaped '[' opens a
# character class, which made the previous pattern fail to compile.
PATTERN = (
    r'(?P<remote>[\d.]{7,})\s-\s-\s'
    r'\[(?P<datetime>[^\[\]]+)\]\s'
    r'"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s'
    r'(?P<status>\d{3})\s(?P<size>\d+)\s'
    r'"[^"]+"\s"(?P<useragent>[^"]+)"'
)
regex = re.compile(PATTERN)  # compiled once, reused for every line

# Per-field converters; fields without an entry are kept as the captured text.
ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda ua: parse(ua),
}


def extract(line: str) -> dict:
    """Parse one log line into a dict of converted fields.

    :param line: a single line from an access log
    :return: field name -> converted value, or None if the line does not match PATTERN
    :raises ValueError: if a matched timestamp does not fit the strptime format
    """
    matcher = regex.match(line)
    if matcher is None:
        return None
    # Apply each converter to the captured text (identity for unlisted fields).
    return {name: ops.get(name, lambda x: x)(data)
            for name, data in matcher.groupdict().items()}


# ---------------------------------------------------------------------------
# Loading files
# ---------------------------------------------------------------------------

def openfile(path: str):
    """Yield parsed records from one log file, skipping lines that do not match.

    :param path: path of the log file
    """
    # Explicit encoding instead of the platform default; errors='replace' keeps
    # a single undecodable byte from aborting the whole file.
    with open(path, encoding='utf-8', errors='replace') as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            # TODO: lines that fail to parse are dropped; consider logging them.


def load(*paths):
    """Yield parsed records from every given path.

    A directory is scanned one level deep (regular files only); paths that do
    not exist are skipped.
    """
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))


# ---------------------------------------------------------------------------
# Data processing
# ---------------------------------------------------------------------------

def source(second=1):
    """Generate test records forever, one every `second` seconds.

    Each record holds the current time in UTC+8 and a random int in [1, 100].
    """
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
            'value': random.randint(1, 100),
        }
        time.sleep(second)


# Put on every queue by dispatcher's run() once the source is exhausted, so the
# window threads stop instead of blocking on Queue.get() forever.
_SENTINEL = object()


def window(src: Queue, handler, width: int, interval: int):
    """Sliding time-window loop.

    Pull records from `src`. Whenever at least `interval` seconds of record time
    have passed since the last computation, call `handler` on the buffered
    records and print the result. Returns when the stop sentinel arrives.

    :param src: data source; a queue of record dicts, each with a tz-aware 'datetime'
    :param handler: data-processing function applied to the list of buffered records
    :param width: time-window width, in seconds
    :param interval: processing interval, in seconds
    """
    # Fixed reference point kept from the original script; the first record
    # more than `interval` seconds after it triggers a computation.
    start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
    buffer = []  # records waiting to be computed in the current window
    # After a computation keep only the last (width - interval) seconds, so once
    # another `interval` seconds arrive the buffer spans roughly `width` seconds.
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        data = src.get()
        if data is _SENTINEL:  # source exhausted
            break
        if not data:  # skip empty records rather than computing on stale state
            continue
        buffer.append(data)
        current = data['datetime']

        # Compute over the buffer once every `interval` seconds of record time.
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current
            # Drop records that have slid out of the window.
            buffer = [x for x in buffer if x['datetime'] > current - delta]


def handler(iterable):
    """Return the mean of the 'value' fields (test handler for `source`)."""
    return sum(map(lambda x: x['value'], iterable)) / len(iterable)


def donothing_handler(iterable):
    """Return the window contents unchanged (test handler)."""
    return iterable


def status_handler(iterable):
    """Return each status code's share of the records in one window.

    :param iterable: non-empty list of records carrying a 'status' field
    :return: {status: fraction of records with that status}
    """
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key, 0) + 1
    total = len(iterable)
    return {k: v / total for k, v in status.items()}


# Running (browser family, version) counts across all windows.
# NOTE(review): mutated without a lock; fine while only one window thread uses
# browser_handler, not if it is registered more than once.
allbrowsers = {}


def browser_handler(iterable):
    """Count (browser family, version) pairs in one window.

    Also adds the counts to the global `allbrowsers` tally and prints its ten
    most frequent entries.
    """
    browsers = {}
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
        allbrowsers[key] = allbrowsers.get(key, 0) + 1
    print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
    return browsers


# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------

def dispatcher(src):
    """Fan records from `src` out to registered window handlers.

    :param src: iterable of records
    :return: (reg, run): `reg` registers a handler; `run` starts one thread per
             handler, copies every record to each of them, then waits for all
             threads to finish
    """
    handlers = []  # one thread per registered handler
    queues = []    # the matching input queue of each thread

    def reg(handler, width: int, interval: int):
        """Register a window handler.

        :param handler: data-processing function to register
        :param width: time-window width, in seconds
        :param interval: processing interval, in seconds
        """
        q = Queue()
        queues.append(q)
        h = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()  # start the processing threads
        try:
            for item in src:  # copy each record to every handler's queue
                for q in queues:
                    q.put(item)
        finally:
            # Signal every window thread to stop; otherwise they block on
            # Queue.get() forever and the process never exits.
            for q in queues:
                q.put(_SENTINEL)
        for t in handlers:
            t.join()

    return reg, run


if __name__ == "__main__":
    # Log path from the command line, defaulting to test.log.
    path = sys.argv[1] if len(sys.argv) > 1 else 'test.log'
    reg, run = dispatcher(load(path))
    reg(status_handler, 10, 5)  # register handlers
    reg(browser_handler, 5, 5)
    run()  # run
本文來自投稿，不代表Linux運維部落立場，如若轉載，請註明出處：http://www.www58058.com/97752