日志分析項目

日志分析

123456

# 日志分析完整代碼

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path

# 數據源
PATTERN = '''(?P<remote>[\d\.]{7,})\s-\s-\s-[(?P<datetime>[^\[\]]+)]\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s\
(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''

regex = re.compile(PATTERN)  # 編譯
from user_agents import parse

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr , '%d/%b/%Y:%H:%M:%S %z') ,
    'status': int ,
    'size': int ,
    'useragent': lambda ua: parse(ua)
}


def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name , lambda x: x) for name , data in matcher.groupdict().items()}


# 裝載文件
def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失敗則拋棄或記錄日志


def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
                elif p.is_file():
                    yield from openfile(str(p))


# 數據處理
def source(second=1):
    """生成數據"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))) ,
            'value': random.randint(1 , 100)
        }
        time.sleep(second)


# 滑動窗口函數
def window(src: Queue , handler , width: int , interval: int):
    """
    窗口函數

    :param src: 數據源,緩存隊列,用來拿數據
    :param handler: 數據處理函數
    :param width: 時間窗口寬度,秒
    :param interval: 處理時間間隔,秒
    """

    start = datetime.datetime.strptime('20170101 000000 +0800' , '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800' , '%Y%m%d %H%M%S %z')
    buffer = []  # 窗口中的待計算數據
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 從數據源獲取數據
        data = src.get()
        if data:
            buffer.append(data)  # 存入臨時緩沖等待計算
            current = data['datetime']

        # 每隔interval計算buffer中的數據一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            # 清除超出width的數據
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# 隨機數平均數測試函數
def handler(iterable):
    return sum(map(lambda x: x['value'] , iterable)) / len(iterable)


# 測試函數
def donothing_handler(iterable):
    return iterable


# 狀態碼占比
def status_handler(iterable):
    # 時間窗口內的一批數據
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key , 0) + 1
    # total = sum(status.values())
    total = len(iterable)
    return {k: status[k] / total for k , v in status.items()}


allbrowsers = {}


# 瀏覽器分析
def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family , ua.browser.version_string)
        browsers[key] = browsers.get(key , 0) + 1
        allbrowsers[key] = allbrowsers.get(key , 0) + 1

    print(sorted(allbrowsers.items() , key=lambda x: x[1] , reverse=True)[:10])
    return browsers


# 分發器
def dispatcher(src):
    # 分發器中記錄handler, 同時保存各自的隊列
    handlers = []
    queues = []

    def reg(handler , width: int , interval: int):
        """
        注冊窗口處理函數

        :param handler: 注冊的數據處理函數
        :param width: 時間窗口寬度
        :param interval: 時間間隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window , args=(q , handler , width , interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()  # 啟動線程處理數據

        for item in src:  # 數據源取到的數據分發到所有隊列中
            for q in queues:
                q.put(item)

    return reg , run


if __name__ == "__main__":
    import sys

    # path = sys.argv[1]
    path = 'test.log'

    reg , run = dispatcher(load(path))
    reg(status_handler , 10 , 5)  # 注冊
    reg(browser_handler , 5 , 5)
    run()  # 運行

本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/97752

(2)
JacoJaco
上一篇 2018-05-07 21:01
下一篇 2018-05-07 21:31

相關推薦

  • 高階函數和裝飾器

    高階函數和裝飾器 高階函數 : 滿足以下條件之一的稱為高階函數 接受一個或多個函數作為參數 輸出一個函數 高階函數舉例: def counter(base): def inc(step=1): nonlocal base base += step return base return inc 1)自定義sort函數 def sort(itertable): …

    Python筆記 2018-04-23
  • Python 部分知識點總結(八)

    此篇博客只是記錄第十周未掌握或不熟悉的知識點,用來加深印象。

    Python筆記 2018-05-13
  • 函數

    函數 數學定義:y=f(x),y是x的函數,x是自變量 Python函數 有若干個語句塊,函數名稱,參數列表構成,它是組織代碼的最小單元 完成一定作用 函數的作用 結構化編程對代碼的最基本的封裝,一般按照功能組織一段代碼 封裝的目的為了復用,減少了冗余代碼 代碼更加簡潔美觀,更加易讀 函數的分類 內建函數,如max(),reversed()等 庫函數,如ma…

    2018-04-16
  • 遞歸函數

    遞歸函數 def foo(b,b1=3):print(“foo1 called “,b,b1)def foo2(c):foo3(c)print(“foo2 called”,c)def foo3(d):print(“foo3 called”)def mian():print(“…

    2018-04-16
  • 函數執行過程和遞歸函數練習題

    函數執行過程和遞歸函數練習題

    2018-04-16
  • python中 ‘is’ 和 ‘==’ 區別

    id –> 唯一身份標識符,?is比較的是id, ==比較的是value?

    2018-04-16
欧美性久久久久