日志分析項目

日志分析

123456

# 日志分析完整代碼

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path

# 數據源
PATTERN = '''(?P<remote>[\d\.]{7,})\s-\s-\s-[(?P<datetime>[^\[\]]+)]\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s\
(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''

regex = re.compile(PATTERN)  # 編譯
from user_agents import parse

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr , '%d/%b/%Y:%H:%M:%S %z') ,
    'status': int ,
    'size': int ,
    'useragent': lambda ua: parse(ua)
}


def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name , lambda x: x) for name , data in matcher.groupdict().items()}


# 裝載文件
def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失敗則拋棄或記錄日志


def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
                elif p.is_file():
                    yield from openfile(str(p))


# 數據處理
def source(second=1):
    """生成數據"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))) ,
            'value': random.randint(1 , 100)
        }
        time.sleep(second)


# 滑動窗口函數
def window(src: Queue , handler , width: int , interval: int):
    """
    窗口函數

    :param src: 數據源,緩存隊列,用來拿數據
    :param handler: 數據處理函數
    :param width: 時間窗口寬度,秒
    :param interval: 處理時間間隔,秒
    """

    start = datetime.datetime.strptime('20170101 000000 +0800' , '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800' , '%Y%m%d %H%M%S %z')
    buffer = []  # 窗口中的待計算數據
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 從數據源獲取數據
        data = src.get()
        if data:
            buffer.append(data)  # 存入臨時緩沖等待計算
            current = data['datetime']

        # 每隔interval計算buffer中的數據一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            # 清除超出width的數據
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# 隨機數平均數測試函數
def handler(iterable):
    return sum(map(lambda x: x['value'] , iterable)) / len(iterable)


# 測試函數
def donothing_handler(iterable):
    return iterable


# 狀態碼占比
def status_handler(iterable):
    # 時間窗口內的一批數據
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key , 0) + 1
    # total = sum(status.values())
    total = len(iterable)
    return {k: status[k] / total for k , v in status.items()}


allbrowsers = {}


# 瀏覽器分析
def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family , ua.browser.version_string)
        browsers[key] = browsers.get(key , 0) + 1
        allbrowsers[key] = allbrowsers.get(key , 0) + 1

    print(sorted(allbrowsers.items() , key=lambda x: x[1] , reverse=True)[:10])
    return browsers


# 分發器
def dispatcher(src):
    # 分發器中記錄handler, 同時保存各自的隊列
    handlers = []
    queues = []

    def reg(handler , width: int , interval: int):
        """
        注冊窗口處理函數

        :param handler: 注冊的數據處理函數
        :param width: 時間窗口寬度
        :param interval: 時間間隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window , args=(q , handler , width , interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()  # 啟動線程處理數據

        for item in src:  # 數據源取到的數據分發到所有隊列中
            for q in queues:
                q.put(item)

    return reg , run


if __name__ == "__main__":
    import sys

    # path = sys.argv[1]
    path = 'test.log'

    reg , run = dispatcher(load(path))
    reg(status_handler , 10 , 5)  # 注冊
    reg(browser_handler , 5 , 5)
    run()  # 運行

本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/97752

(2)
JacoJaco
上一篇 2018-05-07
下一篇 2018-05-07

相關推薦

  • 高階函數

    高階函數 Frist class object 函數在python中是一等公民 函數也是對象,可調用的對象 函數可以作為普通變量,參數,返回值等等 數學概念y=g(f(x)) 在數學和計算機科學中,高階函數應當是至少滿足下面一個條件的函數 接受一個或者多個函數作為參數 輸出一個函數 計數器: def counter(base):def inc(step=1)…

    Python筆記 2018-04-23
  • Python文件操作

    計算機體系架構 運算器:完成各種算術運算、邏輯運算、出具傳輸等數據加工處理 控制器:控制程序的執行 CPU = 運算器 + 控制器 存儲器:用于記憶程序的數據,內存 輸入設備:將數據或程序輸入到計算機中 輸出設備:將數據或程序的處理結果展示給用戶 文件IO常用操作 open 打開 read 讀取 write 寫入 close 關閉 readline 行讀取 …

    Python筆記 2018-05-02
  • StringIO

    StringIOio模塊中的類From io import StringIO內存中,開辟的一個文本模式的buffer,可以像文件對象一樣操作它當close方法被調用的時候,這個buffer會被釋放StringIO操作getvalue() 獲取全部內容。跟文件指針沒有關系from io import StringIO# 內存中構建sio = StringIO(…

    Python筆記 2018-05-07
  • Centtos7搭建ftp服務

    Centtos7搭建ftp服務 下載安裝軟件包 yum -y install vsftpd ? 開啟啟用ftp服務 systemctl start vsftpd ???#設置立即啟用該服務 systemctl status vsftpd ??#查看該服務當前運行狀態 systemctl enable vsftpd ??#設置開機自動啟用該服務 systemc…

    Python筆記 2018-07-07
  • Python 部分知識點總結(七)

    此篇博客只是記錄第九周未掌握或不熟悉的知識點,用來加深印象。

    Python筆記 2018-05-06
欧美性久久久久