一、日志分析項目
1、概述
生產中會生成大量的系統日志、應用程序日志、安全日志等等日志,通過對日志的分析可以了解服務器的負載,健康情況,可以分析客服的分部情況、客戶行為,還可以做出預測等。
一般采集流程
日志產出——》采集(logstash、flume、scribe)——》存儲——》分析——》存儲(數據庫、NoSQL)——》可視化
開源實時日志分析ELK平臺
Logstash收集日志,并存放到ElasticSearch集群中,Kibana則是從ES集群中查詢數據生成圖表,返回到瀏覽器的端。
2、分析的前提
1)半結構化數據
日志是半結構化數據,是有組織的,有格式的數據,可以分割成行和列,就可以當做表理解和處理,當然也可以分析里面的數據。
2)結構化數據
結構化數據就是數據庫里面的數據,定義行列還可以定義數據的類型。Html。
3)結構化數據
非結構化數據,音頻和視頻等。
3、文本分析
日志是文本文件,需要依賴文件IO、字符串操作、正則表達式技術。
通過這些就可以將日志中的數據提取出來。
183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)”
里面的數據對后期的分析都是必須的。
4、提取數據
- 按照空格分隔
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
for word in line.split():
print(word)
切割情況:
183.60.212.153
–
–
[19/Feb/2013:10:23:29
+0800]
“GET
/o2o/media.html?menu=3
HTTP/1.1”
200
16691
“-”
“Mozilla/5.0
(compatible;
EasouSpider;
+http://www.easou.com/search/spider.html)”
缺點:沒有按照要求的格式分隔好,所需要的數據多都是按照空格分隔開了。所以,定義的時候不選用在文件中出現的字符就可以省下好多事。
改進:依舊按照空格分隔,但是遇到雙引號、中括號特殊處理一下。
先按照空格切分,然后迭代一個個字符,如果發現是[ 或者”?,則就不判斷是是否是空格,直到發現] 或者”結尾等,這個區間獲取的就是時間等數據。
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(” ?\t”)
def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ : ??#遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’: ???#遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
print(list(makekey(line)))
[‘183.60.212.153’, ‘-‘, ‘-‘, ’19/Feb/2013:10:23:29 +0800’, ‘GET /o2o/media.html?menu=3 HTTP/1.1’, ‘200’, ‘16691’, ‘-‘, ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’]
5、類型轉換
文件中的數據是有類型的,例如時間、狀態嗎,對不同的文件進行不同的類型轉換.自定義轉換等。
1)時間轉換
19/Feb/2013:10:23:29 +0800 ?對應的格式是
%d/%b/%Y:%H:%M:%S ?%z
使用的函數應該是datetime類中的strptime方法。
import datetime
timestr = ’19/Feb/2013:10:23:29 +0800′
def conver_time(timestr):
return datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)
print(conver_time(timestr))
轉換結果:
2013-02-19 10:23:29+08:00
利用lanbda可以轉換為一行的函數。
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)
2)狀態碼和字節數
都是int整型,使用int函數進行轉換。
3)請求信息的解析
‘GET /o2o/media.html?menu=3 HTTP/1.1′
request = ‘GET /o2o/media.html?menu=3 HTTP/1.1’
def get_request(request:str):
return dict(zip([‘method’,’url’,’protocol’],request.split()))
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split()))
利用zip函數組建字典,三項利用split()空格進行分割。
輸出的結果:
{‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1′}
4)映射
對每一個字段進行命名,然后與值和類型轉換的方法對應,解析每一行是必須要有順序的。
import datetime
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(” ?\t”)
def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ : ??#遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’: ???#遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
# print(list(makekey(line)))
names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))
{‘remote’: ‘183.60.212.153’, ”: ‘-‘, ‘dateime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘request’: {‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1’}, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}
names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))
6、正則表達式提取:
1)構造一個正則表達式提取需要的字段,
pattern = ”'([\d.]{7,}) – ?– \[([/\w +:]+)\] “(\w+) (\S+) ([\w/\d.]+)” (\d+)(\d+).+”(.+)” ”’
2)進一步改造pattern分組,ops和名詞對象,不需要names了。
pattern = ”'(?P<remote>[\d.]{7,}) – ?– \[(?P<datetime>[/\w +:]+)\] “(?P<method>\w+) (?P<url>\S+) (?P<procotol>[\w/\d.]+)” (?P<status>\d+)(?P<size>\d+).+”(?P<useragent>.+)” ”’
命名分組:
ops = (
‘dateime’:lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
‘request’:lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’:int,
‘size’:int
)
3)完整代碼:
import datetime
import re
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
# mathcer = re.match(pattern,line)
# if mathcer:
# ????print(mathcer.groupdict())
regex = re.compile(pattern)
def extract(line:str):
matcher = regex.match(line)
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
{‘remote’: ‘183.60.212.153’, ‘datetime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘procotol’: ‘HTTP/1.1’, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}
7、異常處理:
import datetime
import re
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
# mathcer = re.match(pattern,line)
# if mathcer:
# ????print(mathcer.groupdict())
regex = re.compile(pattern)
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
日志中出現一些不匹配的行,需要處理。
regex.match()可能匹配不上,所以增加一個判斷,采用拋出異常等形式?;蛘叻祷匾粋€特殊值得方式,告知調用者沒有匹配。
8、異常處理:
1)?數據載入
對于本項目,數據就是日志的一行行記錄,載入數據就是文件IO的讀取,將或者數據的方法封裝成函數。
import datetime
import re
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
regex = re.compile(pattern)
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def load(path):
“””裝載日志文件”””
????with open(path)as f:
for line in f:
filds = extract(line)
if fields:
yield fields
else:
continue
9、時間窗口分析:
1)?概念
許多數據,例如日志,都是和時間相關的,都是按照時間順序產生的。
產生的數據分析的時候,要按照時間求值。
Interval表示每一次求值的時間間隔。
Width時間窗口的寬度,指的是一次求值的時間窗口寬度。
2)?當width > interval
數據求值的時候會有重疊
3)?當width = ?interval
4)?當width < interval
一般不采納這種方案,會有數據缺失。
5)?時序數據
運維環境中,日志、監控等產生的數據都是與時間相關的數據,按照時間的先后產生并記錄下來數據,所以一般按照時間對數據進行分析。
6)?數據分析基本程序結構
無限的生成隨機函數,產生時間相關的數據,返回時間和隨機數的字典。
每次取3個數據,求平均值。
import random
import datetime
import time
def source():
while True:
yield {‘value’:random.randint(1,100),’datetime’:datetime.datetime.now()}
time.sleep(1)
#獲取數據
s = source()
items = [next(s)for _ in range(3)]
#處理函數
def handler(iterable):
return sum(map(lambda item:item[‘value’],iterable)) / len(iterable)
print(items)
print(“{:.2f}”.format(handler(items)))
[{‘value’: 87, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 29, 556430)}, {‘value’: 32, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 30, 557127)}, {‘value’: 2, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 31, 557792)}]
40.33
上面代碼模擬一段時間內產生了數據,等一段固定時間取數據來計算平均值。
7)?窗口函數實現
將上面的獲取數據的程序拓展為window函數,使用重疊的方案。
import random
import datetime
import time
def source(second=1):
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #獲取數據
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval計算buffer中的數據一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
window(source(),handler,10,5)
時間計算:
10、分發:
1)生產者消費者模型
對于一個監控系統,需要處理很多數據,包括日志,對其中已有的數據采集、分析。被監控對象就是數據的生產者producer,數據的處理程序就是數據的消費者consumer。
生產者消費者傳統模型。
傳統的生產者消費者模式,生產者生產,消費者消費,這種模型存在問題,開發代碼的耦合性太高,如果生成規模擴大,不易擴展,生產和消費的速度很難匹配等。
解決的辦法就是——隊列queue
作用——解耦、緩沖。
生產者往往會部署好幾個程序,日志也會產生好多,而消費者也會有多個程序,去提取日志分析處理。
數據的生產是不穩定的,會造成短時間數據的”潮涌”,需要緩沖。
消費者的消費能力不一樣,有快有慢,消費者可以自己決定消費緩沖區中的數據。
單機可以使用queue內建的模塊構件進程內的隊列,滿足多個線程之間的消費需要。
大型系統可以使用第三方消息中間件:RabbitMQ ?RocketMQ ??Kafka.
2)queue模塊–隊列
queue模塊提供了一個先進先出的隊列Queue。
Queue.Queue(maxsize=0)
創建FIFO隊列,返回Queue對象。
maxsixe小于等于0,隊列長度沒有限制。
Queue.get(block=True,timeout=None)
從隊列中移除元素并返回這個元素。
Block為阻塞。Timeout為超時。
如果block為True,是阻塞,timeout為None就是一直阻塞。
如果block為True,是阻塞,timeout有值的話就會阻塞到一定秒數拋出異常。
Block為False,是非阻塞,timeout就被忽略,要么成功返回一個元素,嚴么拋出empty異常。
Queue.get_nowait()
等價于get(False),也就是說要么成功返回一個元素,要么拋出異常。
但是queue的這種阻塞效果,需要多線程的時候演示。
Queue.put(item,block=True,timeout=None)
把一個元素加入到隊列中去。
Block=True,timeout=None,一直阻塞至有空位置防元素。
Block=True,timeout=5,阻塞5秒就拋出full異常。
Block=True,timeout實效,立即返回,能塞進去就塞,不能塞就返回拋出異常。
Queue.put_nowait(item)
等價于put(item,False),也就是能塞進去就塞,不能就拋出full異常。
#Queue測試。
from queue import Queue
import random
q = Queue()
q.put(random.randint(1,100))
q.put(random.randint(1,100))
print(q.get())
print(q.get())
#print(q.get())
print(q.get(timeout=3))
第一個print ?: 68
第二個print:15
第三個print :阻塞
第四個print:超過timeout報錯,empty。
11、分發器實現:
生產者(數據源)生產數據,緩沖到消息隊列中。
數據處理流程:
數據加載-》 提取-》 分析(滑動窗口函數)
處理大量數據的時候,對于一個數據源來說,需要多個消費者處理,但是如果分配就是個問題了。
需要一個分發器(調度器),把數據分發給不同的消費者處理。
每一個消費者拿到數據后,有自己的處理函數,要有注冊機制。
數據加載——》提取——》分發——》 ?分析函數1
| —–》 分析函數2
分析1 和分析2是不同的handler,不同的窗口寬度,間隔時間。
如何分發?
輪詢策略。
一對多的副本發送,一個數據通過分發器,發送到n個消費者。
消息隊列
在生產者和消費者之間使用消費隊列,那么所有消費者公用一個消息隊列,還是各自擁有一個隊列呢?
共用一個隊列也是可以的,但是需要解決爭搶的問題,相對來說每個消費者自己擁有一個隊列,何為容易。
如何注冊;
在調度器內部記錄有哪些消費者,每一個消費者擁有自己的隊列。
線程。
由于一個數據會被多個不同的注冊過的handler處理,最好的方式就是線程。
線程使用舉例。
線程使用舉例:
import threading
#定義線程
# target線程中運行的函數,args這個啊哈雙女戶運行時候需要的實參元組。
t = threading.Thread(target=window,args=(src,handler,width,interval))
#啟動線程
t.start()
12、分發器實現代碼:
import random
import datetime
import time
import threading
from queue import Queue
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #獲取數據
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval計算buffer中的數據一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
window(source(),handler,10,5)
def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() ?#啟動線程處理數據
for item in src: ??#將數據源提取到的數據分發到所有隊列中。
for q in queues:
q.put(item)
return reg,run
reg,run = dispatcher(source())
reg(handler,10,5) ?#注冊
run() ??#運行
13、整合代碼
import random
import datetime
import time
import threading
from queue import Queue
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
regex = re.compile(pattern) ?#編譯
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
?????????‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def load(path):
“””裝載日志文件”””
????with open(path)as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
#數據處理
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
#滑動窗口函數
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
????delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
????????data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval計算buffer中的數據一次。
????????if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
????????????buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數
#隨機數平均數測試函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
def donothing_handler(iterable):
return iterable
def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
????handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() ?#啟動線程處理數據
????????for item in src: ??#將數據源提取到的數據分發到所有隊列中。
????????????for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
????import sys
path = ‘test.log’
????reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #運行
reg(handler,10,5) ?#注冊
run() ??#運行
?14、完成分析功能
分析日志很重要,通過海量數據分析就能夠知道是否遭受了攻擊,是否被爬取及爬取高峰期,是否有盜鏈等。
百度(baidu)爬蟲名稱(baiduspider)
谷歌(goole)爬蟲名稱(Googlebot)
15、狀態碼分析
狀態碼分析:
304 ?服務器收到客戶端提交的請求參數,發現資源未變化,要求瀏覽器使用靜態資源的緩存。
404 服務器找不到請求的資源。
304占比大,說明靜態緩存效果明顯,404占比大,說明網站出現了問題?;蛘邍L試嗅探資源。
如果400、500占比突然開始增大,網站一定出問題了。
def status_hanler(iterable):
#時間窗口內的一批函數
????status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1
total = len(iterable)
return {k:status[k]/total for k,v in status.items()}
16、日志文件的加載
目前實現的代碼中,只能接受一個路徑,修改為一批路徑。
可以約定一下路徑下文件的存放方式:
如果送來的是一批路徑,就迭代其中的路徑。
如果路徑是一個普通文件,就按照行讀取文件。
如果路徑是一個目錄,就遍歷路徑下所有普通文件,每一個文件按照行處理,不遞歸處理子目錄:
from pathlib import Path
def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
????????????????
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
????????if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
????????elif p.is_file():
yield from openfile(str(p))
17、完整代碼
import random
import datetime
import time
import threading
from queue import Queue
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
regex = re.compile(pattern) ?#編譯
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
?????????‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
????????if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
????????elif p.is_file():
yield from openfile(str(p))
#數據處理
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
#滑動窗口函數
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
????delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
????????data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval計算buffer中的數據一次。
????????if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
????????????buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數
#隨機數平均數測試函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
def donothing_handler(iterable):
return iterable
def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
????handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() ?#啟動線程處理數據
????????for item in src: ??#將數據源提取到的數據分發到所有隊列中。
????????????for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
????import sys
path = ‘test.log’
????reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #運行
reg(handler,10,5) ?#注冊
run() ??#運行
可以指定文件或目錄,對日志進行數據分析。
分析函數可以動態注冊
數據可以分發給不同的分析處理程序處理。
18、瀏覽器分析
1)Useragent
這里指的是,軟件按照一定的格式想遠端的服務器提供一個標識自己的字符串。
在HTTP協議中,使用user-agent字段傳送這個字符串。
2)信息提取
Pyyaml uaparser user-agent模塊
安裝 pip install Pyyaml uaparser user-agent
使用
from user_agents import parse
useragents = [
“Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)”\
“Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)”\
“Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0”
]
for uastring in useragents:
ua = parse(uastring)
print(ua.brower,ua.brower.family,ua.brower.version,ua.brower.version_string)
#運行結構
數據分析代碼:
from user_agents import parse
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)
}
增加瀏覽器分析函數:
from user_agents import parse
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)
}
from user_agents import parse
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda ua: parse(ua)
}
#瀏覽器分析
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]
key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
return browers
統計所有瀏覽器:
allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]
key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1
print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers
19、完整版代碼(最終版)
import random
import datetime
import time
import threading
from queue import Queue
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
regex = re.compile(pattern) ?#編譯
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
?????????‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
????????if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
????????elif p.is_file():
yield from openfile(str(p))
#數據處理
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
#滑動窗口函數
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
????delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
????????data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval計算buffer中的數據一次。
????????if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
????????????buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數
#隨機數平均數測試函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
def donothing_handler(iterable):
return iterable
def status_hanler(iterable):
#時間窗口內的一批函數
????status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1
total = len(iterable)
return {k:status[k]/total for k,v in status.items()}
#瀏覽器分析
allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]
key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1
print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers
def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
????handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() ?#啟動線程處理數據
????????for item in src: ??#將數據源提取到的數據分發到所有隊列中。
????????????for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
????import sys
path = ‘test.log’
????reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #運行
reg(handler,10,5) ?#注冊
run() ??#運行
本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/97632