數據載入
def load(path:str):
with open(path) as f:
for line in f:
tmp = extract(line)
if tmp:
yield tmp
else:
# TODO 解析失敗就拋棄,或者打印日志
continue
時間窗口分析
概念
- 很多數據,例如日志,都和時間相關的,都是按照時間順序產生的。
- 產生的數據分析的時候,要按照時間求值
- interval 表示每一次求值的時間間隔
- width 時間窗口寬度,指的一次求值的時間窗口寬度
當width > interval
- 數據求值是會有重疊
當width = interval
- 數據求值沒有重疊
當width < interval
- 一般不采納,因為這樣會有數據流失
時序數據
- 運行環境中,日志、監控等產生的數據都是與時間相關的數據,按照時間先后產生并記錄下來的數據,所以一般按照時間對數據進行分析
時序數據分析的節本程序結構
- 隨機生成幾個數,產生時間相關的數據,返回 時間 + 隨機數
- 每次取三個值,求平均值
import random import datetime import time def f(): while True: yield {'value':random.randrange(100), 'time':datetime.datetime.now()} time.sleep(1) src = f() items = [next(src) for _ in range(3)] def handler(iterable): vals = [x['value'] for x in iterable] return sum(vals) / len(vals) print(items) print(handler(items))
窗口函數實現
import random
import datetime
import time
# 數據源函數
def f():
while True:
yield {'value':random.randrange(100), 'time':datetime.datetime.now()}
time.sleep(5)
def window(src, handler, width:int, interval:int):
"""
窗口函數
:param src: 數據源,生成器,用來拿數據
:param handler: 數據處理函數
:param width: 時間窗口寬度,秒
:param interval: 處理時間間隔,秒
"""
# 初始兩個時間段
start = datetime.datetime.strptime('20170101 00:00:00', '%Y%m%d %H:%M:%S')
current = datetime.datetime.strptime('20170101 00:01:00', '%Y%m%d %H:%M:%S')
buffer = [] # 窗口中待計算的數據
delta = datetime.timedelta(seconds = width - interval)
while True:
# 從數據源獲取數據
data = next(src)
# 存入臨時緩沖等待計算
if data: # 篩掉不符合的數據
buffer.append(data)
current = data['time']
# 進入循環開始操作
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{:.2f}'.format(ret))
start = current
# 處理重疊的數據
buffer = [x for x in buffer if x['time'] > current - delta]
def handler(iterable):
vals = [x['value'] for x in iterable]
return sum(vals) / len(vals)
- 第41行current – delta是因為現在的current還沒有更新,而current的時間值到當前current時間值之間的數據正好是重疊的數據
- 當width和interval給一樣的時候,那么delta為0,所以不會有重復數據
- 相當于用給定的width往后滑動,一下走這么多interval
- 比如這個,是時間寬為4往下走,兩個兩個的往后走,所以每次會有兩個重復的數據
本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/88218