python學習總結

第一個項目日志分析。(存在不足)

目錄

一、日志分析項目 1

1、概述 1

2、 分析的前提 1

1) 半結構化數據 1

2)結構化數據 1

3)結構化數據 1

3、 文本分析 1

4、 提取數據 2

1) 按照空格分隔 2

5、 類型轉換 3

1)時間轉換 3

2) 狀態碼和字節數 4

3) 請求信息的解析 4

4) 映射 4

6、 正則表達式提?。?6

7、 異常處理: 7

8、 異常處理: 8

1) 數據載入 8

9、 時間窗口分析: 9

1) 概念 9

2) 當width > interval 10

3) 當width = ?interval 10

4) 當width < interval 11

5) 時序數據 11

6) 數據分析基本程序結構 11

7) 窗口函數實現 12

10、 分發: 14

1) 生產者消費者模型 14

2) queue模塊–隊列 14

11、 分發器實現: 16

12、 分發器實現代碼: 17

13、 整合代碼 19

14、 完成分析功能 22

15、 狀態碼分析 22

16、 日志文件的加載 23

17、 完整代碼 23

18、 瀏覽器分析 27

19、 完整版代碼(最終版) 30

一、日志分析項目

1、概述

生產中會生成大量的系統日志、應用程序日志、安全日志等等日志,通過對日志的分析可以了解服務器的負載,健康情況,可以分析客服的分部情況、客戶行為,還可以做出預測等。

 

一般采集流程

日志產出——》采集(logstash、flume、scribe)——》存儲——》分析——》存儲(數據庫、NoSQL)——》可視化

 

開源實時日志分析ELK平臺

Logstash收集日志,并存放到ElasticSearch集群中,Kibana則是從ES集群中查詢數據生成圖表,返回到瀏覽器的端。

2、分析的前提

1)半結構化數據

日志是半結構化數據,是有組織的,有格式的數據,可以分割成行和列,就可以當做表理解和處理,當然也可以分析里面的數據。

2)結構化數據

結構化數據就是數據庫里面的數據,定義行列還可以定義數據的類型。Html。

3)結構化數據

非結構化數據,音頻和視頻等。

3、文本分析

日志是文本文件,需要依賴文件IO、字符串操作、正則表達式技術。

通過這些就可以將日志中的數據提取出來。

183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)”

 

里面的數據對后期的分析都是必須的。

4、提取數據

  • 按照空格分隔

 

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

for word in line.split():
print(word)

 

 

切割情況:

183.60.212.153

[19/Feb/2013:10:23:29

+0800]

“GET

/o2o/media.html?menu=3

HTTP/1.1”

200

16691

“-”

“Mozilla/5.0

(compatible;

EasouSpider;

+http://www.easou.com/search/spider.html)”

 

缺點:沒有按照要求的格式分隔好,所需要的數據多都是按照空格分隔開了。所以,定義的時候不選用在文件中出現的字符就可以省下好多事。

改進:依舊按照空格分隔,但是遇到雙引號、中括號特殊處理一下。

先按照空格切分,然后迭代一個個字符,如果發現是[ 或者”?,則就不判斷是是否是空格,直到發現] 或者”結尾等,這個區間獲取的就是時間等數據。

 

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(” ?\t”)

def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ : ??#遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’: ???#遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue

if skip:
continue

if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1

else:
if start < len(line):
yield line[start:]

print(list(makekey(line)))

 

 

[‘183.60.212.153’, ‘-‘, ‘-‘, ’19/Feb/2013:10:23:29 +0800’, ‘GET /o2o/media.html?menu=3 HTTP/1.1’, ‘200’, ‘16691’, ‘-‘, ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’]

 

 

5、類型轉換

文件中的數據是有類型的,例如時間、狀態嗎,對不同的文件進行不同的類型轉換.自定義轉換等。

1)時間轉換

19/Feb/2013:10:23:29 +0800 ?對應的格式是

%d/%b/%Y:%H:%M:%S ?%z

使用的函數應該是datetime類中的strptime方法。

import datetime
timestr = ’19/Feb/2013:10:23:29 +0800′
def conver_time(timestr):
return datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)
print(conver_time(timestr))

 

轉換結果:

2013-02-19 10:23:29+08:00

利用lanbda可以轉換為一行的函數。

lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)

 

2)狀態碼和字節數

都是int整型,使用int函數進行轉換。

3)請求信息的解析

‘GET /o2o/media.html?menu=3 HTTP/1.1′

request = ‘GET /o2o/media.html?menu=3 HTTP/1.1’
def get_request(request:str):
return dict(zip([‘method’,’url’,’protocol’],request.split()))

 

lambda request:dict(zip([‘method’,’url’,’protocol’],request.split()))

 

利用zip函數組建字典,三項利用split()空格進行分割。

輸出的結果:

{‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1′}

4)映射

對每一個字段進行命名,然后與值和類型轉換的方法對應,解析每一行是必須要有順序的。

import datetime
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(” ?\t”)

def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ : ??#遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’: ???#遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue

if skip:
continue

if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1

else:
if start < len(line):
yield line[start:]

# print(list(makekey(line)))
names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))

 

{‘remote’: ‘183.60.212.153’, ”: ‘-‘, ‘dateime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘request’: {‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1’}, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}

names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))

 

6、正則表達式提取:

1)構造一個正則表達式提取需要的字段,

pattern = ”'([\d.]{7,}) – ?– \[([/\w +:]+)\] “(\w+) (\S+) ([\w/\d.]+)” (\d+)(\d+).+”(.+)” ”’

2)進一步改造pattern分組,ops和名詞對象,不需要names了。

pattern = ”'(?P<remote>[\d.]{7,}) – ?– \[(?P<datetime>[/\w +:]+)\] “(?P<method>\w+) (?P<url>\S+) (?P<procotol>[\w/\d.]+)” (?P<status>\d+)(?P<size>\d+).+”(?P<useragent>.+)” ”’

命名分組:

ops = (
‘dateime’:lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
‘request’:lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’:int,
‘size’:int
)

3)完整代碼:

import datetime
import re

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

# mathcer = re.match(pattern,line)
# if mathcer:
# ????print(mathcer.groupdict())
regex = re.compile(pattern)

def extract(line:str):
matcher = regex.match(line)
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

 

{‘remote’: ‘183.60.212.153’, ‘datetime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘procotol’: ‘HTTP/1.1’, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}

7、異常處理:

 

import datetime
import re

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

# mathcer = re.match(pattern,line)
# if mathcer:
# ????print(mathcer.groupdict())
regex = re.compile(pattern)

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

 

日志中出現一些不匹配的行,需要處理。

regex.match()可能匹配不上,所以增加一個判斷,采用拋出異常等形式?;蛘叻祷匾粋€特殊值得方式,告知調用者沒有匹配。

8、異常處理:

1)?數據載入

對于本項目,數據就是日志的一行行記錄,載入數據就是文件IO的讀取,將或者數據的方法封裝成函數。

import datetime
import re

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

regex = re.compile(pattern)

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def load(path):
“””裝載日志文件”””
????with open(path)as f:
for line in f:
filds = extract(line)
if fields:
yield fields
else:
continue
 

9、時間窗口分析:

1)?概念

許多數據,例如日志,都是和時間相關的,都是按照時間順序產生的。

產生的數據分析的時候,要按照時間求值。

 

Interval表示每一次求值的時間間隔。

Width時間窗口的寬度,指的是一次求值的時間窗口寬度。

2)?當width > interval

 

 

數據求值的時候會有重疊

 

3)?當width = ?interval

 

4)?當width < interval

一般不采納這種方案,會有數據缺失。

 

5)?時序數據

運維環境中,日志、監控等產生的數據都是與時間相關的數據,按照時間的先后產生并記錄下來數據,所以一般按照時間對數據進行分析。

 

6)?數據分析基本程序結構

無限的生成隨機函數,產生時間相關的數據,返回時間和隨機數的字典。

每次取3個數據,求平均值。

import random
import datetime
import time

def source():
while True:
yield {‘value’:random.randint(1,100),’datetime’:datetime.datetime.now()}
time.sleep(1)
#獲取數據
s = source()
items = [next(s)for _ in range(3)]

#處理函數
def handler(iterable):
return sum(map(lambda item:item[‘value’],iterable)) / len(iterable)

print(items)
print(“{:.2f}”.format(handler(items)))

 

[{‘value’: 87, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 29, 556430)}, {‘value’: 32, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 30, 557127)}, {‘value’: 2, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 31, 557792)}]

40.33

上面代碼模擬一段時間內產生了數據,等一段固定時間取數據來計算平均值。

7)?窗口函數實現

將上面的獲取數據的程序拓展為window函數,使用重疊的方案。

 

import random
import datetime
import time

def source(second=1):
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #獲取數據
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))

def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval計算buffer中的數據一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數

def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

window(source(),handler,10,5)

 

 

時間計算:

 

 

10、分發:

1)生產者消費者模型

對于一個監控系統,需要處理很多數據,包括日志,對其中已有的數據采集、分析。被監控對象就是數據的生產者producer,數據的處理程序就是數據的消費者consumer。

生產者消費者傳統模型。

 

 

 

 

 

 

 

 

傳統的生產者消費者模式,生產者生產,消費者消費,這種模型存在問題,開發代碼的耦合性太高,如果生成規模擴大,不易擴展,生產和消費的速度很難匹配等。

 

解決的辦法就是——隊列queue

作用——解耦、緩沖。

 

 

 

 

 

 

 

 

 

 

 

 

生產者往往會部署好幾個程序,日志也會產生好多,而消費者也會有多個程序,去提取日志分析處理。

 

數據的生產是不穩定的,會造成短時間數據的”潮涌”,需要緩沖。

消費者的消費能力不一樣,有快有慢,消費者可以自己決定消費緩沖區中的數據。

 

單機可以使用queue內建的模塊構件進程內的隊列,滿足多個線程之間的消費需要。

大型系統可以使用第三方消息中間件:RabbitMQ ?RocketMQ ??Kafka.

2)queue模塊–隊列

queue模塊提供了一個先進先出的隊列Queue。

 

Queue.Queue(maxsize=0)

創建FIFO隊列,返回Queue對象。

maxsixe小于等于0,隊列長度沒有限制。

 

Queue.get(block=True,timeout=None)

從隊列中移除元素并返回這個元素。

Block為阻塞。Timeout為超時。

如果block為True,是阻塞,timeout為None就是一直阻塞。

如果block為True,是阻塞,timeout有值的話就會阻塞到一定秒數拋出異常。

Block為False,是非阻塞,timeout就被忽略,要么成功返回一個元素,嚴么拋出empty異常。

 

Queue.get_nowait()

等價于get(False),也就是說要么成功返回一個元素,要么拋出異常。

但是queue的這種阻塞效果,需要多線程的時候演示。

 

Queue.put(item,block=True,timeout=None)

把一個元素加入到隊列中去。

Block=True,timeout=None,一直阻塞至有空位置防元素。

Block=True,timeout=5,阻塞5秒就拋出full異常。

Block=True,timeout實效,立即返回,能塞進去就塞,不能塞就返回拋出異常。

 

Queue.put_nowait(item)

等價于put(item,False),也就是能塞進去就塞,不能就拋出full異常。

 

#Queue測試。
from queue import Queue
import random

q = Queue()
q.put(random.randint(1,100))
q.put(random.randint(1,100))
print(q.get())
print(q.get())
#print(q.get())
print(q.get(timeout=3))

 

第一個print ?: 68

第二個print:15

第三個print :阻塞

第四個print:超過timeout報錯,empty。

 

 

 

 

11、分發器實現:

生產者(數據源)生產數據,緩沖到消息隊列中。

 

數據處理流程:

數據加載-》 提取-》 分析(滑動窗口函數)

 

處理大量數據的時候,對于一個數據源來說,需要多個消費者處理,但是如果分配就是個問題了。

需要一個分發器(調度器),把數據分發給不同的消費者處理。

每一個消費者拿到數據后,有自己的處理函數,要有注冊機制。

 

數據加載——》提取——》分發——》 ?分析函數1

| —–》 分析函數2

分析1 和分析2是不同的handler,不同的窗口寬度,間隔時間。

 

如何分發?

輪詢策略。

一對多的副本發送,一個數據通過分發器,發送到n個消費者。

 

消息隊列

在生產者和消費者之間使用消費隊列,那么所有消費者公用一個消息隊列,還是各自擁有一個隊列呢?

共用一個隊列也是可以的,但是需要解決爭搶的問題,相對來說每個消費者自己擁有一個隊列,何為容易。

 

如何注冊;

在調度器內部記錄有哪些消費者,每一個消費者擁有自己的隊列。

 

線程。

由于一個數據會被多個不同的注冊過的handler處理,最好的方式就是線程。

線程使用舉例。

線程使用舉例:

import threading
#定義線程
# target線程中運行的函數,args這個啊哈雙女戶運行時候需要的實參元組。
t = threading.Thread(target=window,args=(src,handler,width,interval))

#啟動線程
t.start()

 

12、分發器實現代碼:

import random
import datetime
import time
import threading
from queue import Queue

def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #獲取數據
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))

def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””
????start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
delta = datetime.timedelta(seconds=width-interval)
while True:
#從數據源獲取數據
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval計算buffer中的數據一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數

def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

window(source(),handler,10,5)

def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start() ?#啟動線程處理數據

for item in src: ??#將數據源提取到的數據分發到所有隊列中。
for q in queues:
q.put(item)
return reg,run
reg,run = dispatcher(source())

reg(handler,10,5) ?#注冊
run() ??#運行

13、整合代碼

import random
import datetime
import time
import threading
from queue import Queue

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

regex = re.compile(pattern) ?#編譯

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
?????????‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def load(path):
“””裝載日志文件”””
????with open(path)as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
#數據處理
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)

#滑動窗口函數
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””

????start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
????delta = datetime.timedelta(seconds=width-interval)

while True:
#從數據源獲取數據
????????data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval計算buffer中的數據一次。
????????if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
????????????buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數

#隨機數平均數測試函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

def donothing_handler(iterable):
return iterable

def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
????handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start() ?#啟動線程處理數據

????????for item in src: ??#將數據源提取到的數據分發到所有隊列中。
????????????for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
????import sys
path = ‘test.log’

????reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #運行

reg(handler,10,5) ?#注冊
run() ??#運行

?14、完成分析功能

分析日志很重要,通過海量數據分析就能夠知道是否遭受了攻擊,是否被爬取及爬取高峰期,是否有盜鏈等。

 

百度(baidu)爬蟲名稱(baiduspider)

谷歌(goole)爬蟲名稱(Googlebot)

15、狀態碼分析

狀態碼分析:

304 ?服務器收到客戶端提交的請求參數,發現資源未變化,要求瀏覽器使用靜態資源的緩存。

404 服務器找不到請求的資源。

304占比大,說明靜態緩存效果明顯,404占比大,說明網站出現了問題?;蛘邍L試嗅探資源。

如果400、500占比突然開始增大,網站一定出問題了。

 

def status_hanler(iterable):
#時間窗口內的一批函數
????status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1

total = len(iterable)
return {k:status[k]/total for k,v in status.items()}

16、日志文件的加載

目前實現的代碼中,只能接受一個路徑,修改為一批路徑。

 

可以約定一下路徑下文件的存放方式:

如果送來的是一批路徑,就迭代其中的路徑。

如果路徑是一個普通文件,就按照行讀取文件。

如果路徑是一個目錄,就遍歷路徑下所有普通文件,每一個文件按照行處理,不遞歸處理子目錄:

 

from pathlib import Path

def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
????????????????
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
????????if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
????????elif p.is_file():
yield from openfile(str(p))

 

17、完整代碼

import random
import datetime
import time
import threading
from queue import Queue

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

regex = re.compile(pattern) ?#編譯

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
?????????‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue

def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
????????if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
????????elif p.is_file():
yield from openfile(str(p))

#數據處理
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)

#滑動窗口函數
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””

????start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
????delta = datetime.timedelta(seconds=width-interval)

while True:
#從數據源獲取數據
????????data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval計算buffer中的數據一次。
????????if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
????????????buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數

#隨機數平均數測試函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

def donothing_handler(iterable):
return iterable

def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
????handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start() ?#啟動線程處理數據

????????for item in src: ??#將數據源提取到的數據分發到所有隊列中。
????????????for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
????import sys
path = ‘test.log’

????reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #運行

reg(handler,10,5) ?#注冊
run() ??#運行

 

 

可以指定文件或目錄,對日志進行數據分析。

分析函數可以動態注冊

數據可以分發給不同的分析處理程序處理。

 

 

18、瀏覽器分析

1)Useragent

這里指的是,軟件按照一定的格式想遠端的服務器提供一個標識自己的字符串。

在HTTP協議中,使用user-agent字段傳送這個字符串。

2)信息提取

Pyyaml uaparser user-agent模塊

 

安裝 pip install Pyyaml uaparser user-agent

使用

from user_agents import parse

useragents = [
“Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)”\
“Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)”\
“Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0”
]

for uastring in useragents:
ua = parse(uastring)
print(ua.brower,ua.brower.family,ua.brower.version,ua.brower.version_string)

#運行結構

 

 

數據分析代碼:

from user_agents import parse

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)

}

增加瀏覽器分析函數:

 

from user_agents import parse

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)

}

from user_agents import parse

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda ua: parse(ua)
}

#瀏覽器分析
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]

key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
return browers

 

統計所有瀏覽器:

 

allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]

key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1

print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers

19、完整版代碼(最終版)

import random
import datetime
import time
import threading
from queue import Queue

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

regex = re.compile(pattern) ?#編譯

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
?????????‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue

def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
????????if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
????????elif p.is_file():
yield from openfile(str(p))

#數據處理
def source(second=1):
“””
????生成數據
????:param?second:
????:return:
????“””
????while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)

#滑動窗口函數
def window(iterator,handler,width:int,interval:int):
“””
?????窗口函數
????:param?iterator: 數據源,生成器,用來拿數據
????:param?handler: 數據處理函數
????:param?width: 時間窗口寬度,秒。
????:param?interval: 處理時間間隔,秒
????:return:
????“””

????start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待計算數據
????delta = datetime.timedelta(seconds=width-interval)

while True:
#從數據源獲取數據
????????data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval計算buffer中的數據一次。
????????if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width數據
????????????buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 處理函數

#隨機數平均數測試函數
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

def donothing_handler(iterable):
return iterable

def status_hanler(iterable):
#時間窗口內的一批函數
????status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1

total = len(iterable)
return {k:status[k]/total for k,v in status.items()}

#瀏覽器分析
allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]

key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1

print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers

def dispatcher(src):
#分發器中記錄handler,同時保存各自的隊列
????handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
????????注冊窗口處理函數
????????:param?handler:注冊的數據處理函數
????????:param?width: 時間窗口寬度
????????:param?interval: 時間間隔
????????:return:
????????“””
????q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start() ?#啟動線程處理數據

????????for item in src: ??#將數據源提取到的數據分發到所有隊列中。
????????????for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
????import sys
path = ‘test.log’

????reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #運行

reg(handler,10,5) ?#注冊
run() ??#運行

 

 

本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/97632

(1)
604603701@qq.com604603701@qq.com
上一篇 2018-05-06
下一篇 2018-05-06

相關推薦

  • 裝飾器

    裝飾器 需求 一個加法函數,想增強它的功能,能夠輸出被調用過以及調用的參數信息 def add(x, y): return x + y 增加信息輸出功能 def add(x, y): print(“call add, x + y”) ?# 日志輸出到控制臺 return x + y p 上面的加法函數是完成了需求,但是有以下的缺點 打…

    Python筆記 2018-04-23
  • 遞歸函數

    遞歸函數 def foo(b,b1=3):print(“foo1 called “,b,b1)def foo2(c):foo3(c)print(“foo2 called”,c)def foo3(d):print(“foo3 called”)def mian():print(“…

    2018-04-16
  • 基礎語法

    基礎語法、判斷、循環

    2018-03-26
  • 封裝與解構 集合

    封裝和解構 封裝:將多個值進行分割,結合在一起,本質上返回元組,只是省掉了小括號 ‘==‘意思為內容一致,‘=’意思為內存空間一致 解構:把線性結構的元素解開,并順序的賦值給其他變量,左邊接納的變量數要和左邊解開的元素數量一致 集合不是非線性 解構中使用*變量名接收,但不能單獨使用,被*變量名收集后組成一個列表 第一個下劃線為9,結果被第二個下劃線重新賦值為…

    Python筆記 2018-04-01
  • Python函數

    函數 數學函數 Python函數 若干語句塊、函數名稱、參數列表構成,組織代碼的最小單元 完成一定的功能 作用 結構化編程對代碼的最基本的封裝,一般按照功能組織一段代碼 復用,減少冗余代碼 簡潔美觀,可讀易懂 函數分類 內建函數,max()、reversed() 庫函數,math.ceil() 函數定義、調用 def語句定義函數 def 函數名(參數列表):…

    2018-04-16
  • 面向對象,魔術方法

    面向對象 一面向對象 什么是面向對象: 一種認識世界、分析世界的方法論。將萬事萬物抽象為類。 類class: 類是抽象的概念,是萬事萬物的抽象,是一類事物的共同集合的集合。 用計算機語言來描述類,就是屬性和方法的集合。 對象instance,object: 對象是類的具象,是一個實體。 每個個體都是抽象類的不同實體。 哲學 一切皆對象 對象是數據和操作的封裝…

    Python筆記 2018-05-14
欧美性久久久久