Python第十二周學習總結

并行,并發,多線程

一、并發

 

1、基本概念

并發和并行的區別:

1)并行,parallel

同時做某些事,可以互不干擾的同一時刻做幾件事。(解決并發的一種方法)

高速公路多個車道,車輛都在跑。同一時刻。

2)并發 concurrency

同時做某些事,一個時段內有事情要處理。(遇到的問題)

高并發,同一時刻內,有很多事情要處理。

2、并發的解決

1)隊列、緩沖區

排隊就是把人排成隊列,先進先出,解決了資源使用的問題。

排成的隊列,其實就是一個緩沖地帶,就是緩沖區。

Queue模塊的類queue、lifoqueue、priorityqueue。

2)爭搶的

會有一個人占據窗口,其他人會繼續爭搶,可以鎖定窗口,窗口不在為其他人服務,這就是鎖機制。(鎖的概念,排他性鎖,非排他性鎖)。

 

3)預處理

一種提前加載用戶需要的數據的思路,預處理思想,緩存常用。

 

4)并行

日??梢酝ㄟ^購買更多的服務器,或者開多線程,實現并行處理,來解決并發問題。

水平擴展思想。

如果在但CPU上處理,就不是并行了。

但是多數服務都是多CPU的,服務的部署就是多機、分布式的,都是并行處理。

(串行比并行快)

5)提速

提高單個CPU性能,或單個服務器安裝更多的CPU

這就是一種垂直擴展思想。

 

6)消息中間件

例如地跌站外的九曲回腸的走廊,緩沖人流。

常見的消息中間件有RabbitMQ,ActiveMQ(Apache)、RocketMQ(Apache)。

 

3、進程和線程

在實現了線程的操作系統中,線程是操作系統能夠進行運算調度的最小單位。他包含在進程中,是進程中的實際運作單位。一個程序執行實例就是一個進程。

 

進程(process)是計算機中的程序關于某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。

 

進程和程序的關系

程序是源代碼編譯后的文件,而這些文件存放在磁盤上。當程序被操作系統加載到內存

中,就是進程,進程中存放著指令和數據(資源),也是線程的容器。

 

Linux進程有父進程、子進程,Windows的進程是平等關系。

 

線程,有時被稱為輕量級進程,是程序執行流的最小單元,一個標準的線程由線程ID,當前指令指針(pc),寄存器集合堆棧組成。每個線程有自己獨立的棧。

在許多系統中,創建一個線程比創建一個進程快10-100倍。

 

進程、線程的理解

現代操作系統提出的進程的概念,每一個進程都認為自己是獨占所有的計算機硬件資源。

進程就是獨立的王國,進程間不可以隨便的共享數據。

線程就是省份,同一個進程內的線程可以共享進程的資源,每一個線程擁有自己獨立的堆棧。

 

4、線程狀態

狀態 含義
就緒(ready) 線程能夠運行,但在等待被調度,可能線程剛剛創建啟動,或剛剛從阻塞恢復,或者被其他線程搶占。
運行(running) 線程正在運行
阻塞(Blocked) 線程等待外部事件發生而無法運行,如I/O操作。
終止(Terminated) 線程完成,或退出,或被取消。

圖片19

5、Python中的線程和進程

進程會啟動一個解釋器進程,線程會共享一個解釋器進程。

 

1)Python的線程開發

Python的線程開發使用標準庫threading

 

2)Thread類

簽名:

def __init__(self, group: None = …,
target: Optional[Callable[…, None]] = …,
name: Optional[str] = …,
args: Iterable = …,
kwargs: Mapping[str, Any] = …,
*, daemon: Optional[bool] = …) -> None: …

參數名 含義
target 線程調用對象,就是目標函數
name 為線程起名字
args 為目標函數傳遞實參,元組
Kwargs 為目標函數關鍵詞傳參,字典

?

3)線程啟動

import threading
import time

def worker():
print(‘before’)
time.sleep(3)
print(‘finished’)

t = threading.Thread(target=worker) ?#線程對象
t.start() ??#啟動

 

通過threading.Thread創建一個線程對象,target是目標函數,name可以指定名稱。

需要調用start方法啟動函數。

線程之所以執行函數,是因為線程中就是用來執行代碼的,所以還是函數調用。

 

函數執行完畢后,線程也就退出了。

如果想讓一個線程一直工作,不讓線程退出就要利用到while循環。

import threading
import time

def worker():
count = 0
while True:
count += 1
print(‘before’)
time.sleep(3)
if count >5:
print(‘finished’)
break

t = threading.Thread(target=worker) ?#線程對象
t.start() ??#啟動

 

4)線程退出

Python中沒有提供終止線程的方法。線程在下面情況下退出。

  • 線程函數內語句執行完畢
  • 線程函數中拋出未處理的異常。

import threading
import time

def worker():
count = 0
while True:
if count >5:
break
#return
#raise RuntimeError(count)
time.sleep(3)
print(‘before’)
count += 1
print(‘finished’)

t = threading.Thread(target=worker) ?#線程對象
t.start() ??#啟動
print(‘end’)

 

線程沒有優先級,沒有線程組的概念。也不能被銷毀、停止、掛起,那么就是沒有恢復和中斷了。

5)線程的傳參

import threading
import time

def add(x,y):
print(‘{}+{}={}’.format(x,y,x+y))

t1 = threading.Thread(target=add,name=’1′,args=(4,5))
t1.start()
time.sleep(2)

t2 = threading.Thread(target=add,name = ‘2’,args=(4,),kwargs={‘y’:6})
t2.start()
time.sleep(2)
t3 = threading.Thread(target=add,name=’3′,kwargs={‘x’:4,’y’:7})
t3.start()

 

線程中的傳參,和函數傳參沒有什么區別,本質上就是函數傳承。

 

6)threading的屬性和方法

名稱 含義
current_thread() 返回當前主線程
main_thread() 返回主線程對象
active_count() 當前處于alive狀態的線程個數
enumerate() 返回所有活著的線程的列表,不包括已經終止的線程和未開始的線程
git_ident() 返回當前線程的ID,非0整數。

active_count、enumerate方法返回的值還包括主線程。

import threading
import time

def showinfo():
print(‘currentthread = {}’.format(threading.current_thread()))
print(‘main thread = {}’.format(threading.main_thread()))
print(‘active count = {}’.format(threading.active_count()))

def worker():
count = 0
showinfo()
while True:
if count>5:
break
time.sleep(5)
count += 1
print(‘finsh’)

t = threading.Thread(target=worker,name=’work’)
showinfo()
t.start()

print(‘end’)

 

 

currentthread = <_MainThread(MainThread, started 4048)>

main thread = <_MainThread(MainThread, started 4048)>

active count = 1

currentthread = <Thread(work, started 9084)>

end

main thread = <_MainThread(MainThread, stopped 4048)>

active count = 2

finsh

finsh

finsh

finsh

finsh

Finsh

 

名稱 含義
Name 他只是一個名字,只是一個標識符,名字可以重名,getname()獲取,setname()設置這個名詞
Ident 線程id,是非0的整數,線程啟動后才會有ID,否則為None,線程退出,此id依舊可以訪問,此id可以重復訪問。
Is_alive() 返回線程是否或者

線程的name只是一個名稱,可以重復;id必須唯一,但可以在線程退出后在利用。

 

import threading

import time

 

 

def worker():

count = 0

while True:

if count > 5:

break

time.sleep(2)

count += 1

print(threading.current_thread().name)

 

t = threading.Thread(name=’work’,target=worker)

print(t.ident)

t.start()

 

while True:

time.sleep(1)

if t.is_alive():

print(‘{}{}alive’.format(t.name,t.ident))

else:

print(‘{}{}dead’.format(t.name,t.ident))

名稱 含義
Start() 啟動線程,每一個線程必須且只能執行該方法一次
Run() 運行線程函數

 

Start()啟動線程,只能執行一次。操作系統。開辟新的線程。

Run()直接做的是主線程。函數調用。

(1)start()

import threading
import time

def worker():
count = 0
while True:
if count > 5:
break
time.sleep(3)
count += 1
print(‘running’)
class Mythread(threading.Thread):
def start(self):
print(‘start—-‘)
super().start()

def run(self):
print(‘run—-‘)
super().run()

t = Mythread(target=worker,name=’work’)

t.start()

start方法運行結果是start—-

run—-

Running

按照線程進行執行。

(2)run()

import threading
import time

def worker():
count = 0
while True:
if count>3:
break
time.sleep(2)
count += 1
print(‘runing’)
class Mythread(threading.Thread):
def start(self):
print(‘start—-‘)
super().start()

def run(self):
print(‘run—-‘)
super().run()

t = Mythread(target=worker,name=’work1’)
t.run()

# run—-
# runing

總結:run()執行結果就是直接是函數,調用,調用run函數。

Start()方法會調用run()方法,而run()方法可以運行函數。

 

  • start和run的區別

Start方法啟動線程,啟動了一個新的線程,名字叫做worker運行,但是run方法,并沒有啟動新的線程,只是在主線程內調用了一個普通的函數。

 

7)多線程

 

多線程,一個進程中如果有多個線程,就是多線程,是先一種并發。

import threading
import time

def worker():
count = 0
while True:
if count>3:
break
time.sleep(2)
count += 1
print(‘runing’)
print(threading.current_thread().name,threading.current_thread().ident)
class Mythread(threading.Thread):
def start(self):
print(‘start—-‘)
super().start()

def run(self):
print(‘run—-‘)
super().run()

t1 = Mythread(target=worker,name=’work1′)
t2 = Mythread(target=worker,name=’work2’)

# t1.run()
# t2.run()
####runing
# MainThread 1380
# runing
# MainThread 1380
# runing
# MainThread 1380
t1.start()
t2.start()
# start—-
# run—-
# start—-
# run—-
# runing
# work2 5048
# runing
# work1 9048

 

Start()方法work1和work2交替執行。啟動線程后,進程內多個活動的線程并行工作,就是多線程。

Run()方法中沒有開啟新的線程,就是普通函數調用,所以執行完t1.run()

,然后執行t2.run(),run()方法就不是多線程。

 

一個進程中至少有一個線程,并作為程序的入口,這個線程就是主線程,一個線程必須有一個主線程。

其他線程成為工作線程。

 

8)線程安全

import threading

def worker():
for x in range(100):
print(‘{}is running’.format(threading.current_thread().name))

for x in range(1,4):
name = ‘worker{}’.format(x)
t = threading.Thread(name=name,target=worker)
t.start()

 

利用ipython執行的結果是不是一行行的打印,而是很多字符串打印在了一起。

這樣說明了print函數被打斷了,被線程切換打斷了,print函數分為兩步,第一步是打印字符串,第二部是換行,就在這個期間,發生了線程的切換,說明了print函數是線程不安全的。

 

線程安全:線程執行一段代碼,不會產生不確定的結果,那么這段代碼是線程安全的。

 

解決上面打印的問題:

(1)不讓print打印換行

import threading

 

def worker():

for x in range(100):

print(‘{} is running.\n’.format(threading.current_thread().name),end=”)

 

for x in range(1,5):

name = ‘worker{}’.format(x)

t = threading.Thread(name=name,target=worker)

t.start()

利用字符串是不可變類型,可以作為一個整體不可分割輸出,end=’’就不在print輸出換行了。

 

  • 使用logging

標準庫里面的logging模塊,是日志處理模塊,線程安全的,生產環境代碼都使用logging。

import threading
import logging

def worker():
for x in range(100):
# print(‘{} is running.\n’.format(threading.current_thread().name),end=”)
logging.warning(‘{}is running’.format(threading.current_thread().name))
for x in range(1,5):
name = ‘worker{}’.format(x)
t = threading.Thread(name=name,target=worker)
t.start()

 

 

 

9)daemon線程和non-daemon線程

daemon不是Linux里面的守護進程。

 

進程靠線程執行代碼,至少有一個主線程,其他線程是工作線程。

主線程是第一個啟動的線程。

父線程:如果A中啟動了一個線程B,那么A就是B的父線程。

子線程:B就是A的子線程。

 

源碼Thread的__init__ 方法中。

If deamon is not None:

Self._daemonic = daemon

else:

Self._daemonic = current_thread().daemon

Self._ident = None

線程daemon屬性,如果設定就是用戶的設置,否則,就取當前線程的daemon的值。

主線程是non-daemon線程,即daemon = False。

import time
import threading

def foo():
time.sleep(5)
for i in range(20):
print(i)

t = threading.Thread(target=foo,daemon=False)
t.start()
print(‘end’)

 

daemon設置False值,主線程執行完畢后,等待工作線程。

import time
import threading

def foo():
time.sleep(5)
for i in range(20):
print(i)

t = threading.Thread(target=foo,daemon=True)
t.start()
print(‘end’)

Daemon值改為true,主線程執行完畢后直接退出。

名稱 含義
Daemon 表示線程是否是daemon,這個值必須在start()之前設置,否則引發RuntimeError異常
IsDaemon() 是否是daemon線程
SetDaemon 設置daemon線程,必須在start方法之前設置。

總結:線程阿具有一個daemon屬性,可以顯示設置為True或者False,也可以不設置,則取默認值None。

 

如果不設置daemon,就取當前線程的daemon來設置他。

主線程是non-daemon線程,即daemon = False。

從主線程創建的所有線程的不設置daemon屬性,則默認daemon = False,也就是non-daemon線程。

程序在沒有活著的non-daemon線程運行時推出,也是就剩下的只是daemon線程,主線程才能推出。否則主線程只能等待。

 

構造線程的時候,可以設置daemon屬性,這個屬性必須在start方法前設置好。

daemon=True主線程不等。工作線程

daemon=False主線程等。只要有一個non-daemon就會等待。

控制一個屬性的。

在start之前。

 

只是有一個non-daemon就會等待,沒有的話直接不等,直接結束線程。

總結:

線程具有daemon屬性,可以設置為True或者False。

(激活的non-daemon,主線程才會等待工作線程。)

import time
import threading

def bar():
time.sleep(10)
print(‘bar’)

def foo():
for i in range(20):
print(i)
t = threading.Thread(target=bar,daemon=False)
t.start()
t = threading.Thread(target=foo,daemon=True)
t.start()

print(‘end’)

 

這樣不會執行bar的,因為主線程的daemon設置的值為True,改為False就好了。

活著讓主線程sleep幾秒。

import time
import threading

def bar():
time.sleep(10)
print(‘bar’)

def foo(n):
for i in range(n):
print(i)
t1 = threading.Thread(target=foo,args=(10,),daemon=True)
t1.start()
t2 = threading.Thread(target=foo,args=(20,),daemon=False)
t2.start()

time.sleep(6)
print(‘end’)

 

如果non-daemon線程的時候,主線程退出,也不會結束所有的daemon線程,直到所有的non-daemon線程全部結束,如果還有daemon線程,主線程需要退出,會結束所有的daemon線程,退出。

 

主線程是non-daemon。其他線程靠傳參。

決定的是是否需要等待。如果有激活的non-daemon,就需要等待,沒有激活的,主線程直接退出。

 

  • join方法

import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)

t1 = threading.Thread(target=foo,args=(10,),daemon=True)
t1.start()
t1.join()

 

 

利用join,主線程被迫等待他。把當前線程阻塞住了,x.join就等待誰。保證代碼的執行順序。

 

使用了join方法后,daemon線程執行完了,主線程才退出了。

 

Join(timeout= None),是線程的標準方法之一。

一個線程中調用另一個線程的join方法,調用者將被阻塞,直到被調用線程終止。

一個線程可以被join多次。

Timeout參數指定調用者等待多久,沒有設置超時的,就會一直等待到調用的線程結束。

調用誰的join方法,就是join誰,就要等睡。

 

 

 

 

  • daemon線程應用場景

簡單來說,本來并沒有daemon Thread,這個概念唯一的作用是,當把一個線程設置為daemon,他會隨著主線程的退出而退出。

主要應用場景為:

  • 后臺任務。發送心跳包,監控。
  • 主線程工作才有用的線程。如主線程中維護著公共的資源,主線程已經清理了,準備退出,而工作線程使用這些資源工作沒有意義了,一起退出最合適。
  • 隨時可以被終止的線程。

 

如果主線程退出,想所有其他工作線程一起退出,就使用daemon=True來創建工作線程。

import time
import threading

def bar():
while True:
time.sleep(1)
print(‘bar’)

def foo():
print(‘t1 daemon = {}’.format(threading.current_thread().isDaemon()))
t2 = threading.Thread(target=bar)
t2.start()

print(‘t2 daemon = {}’.format(t2.isDaemon()))

t1 = threading.Thread(target=foo,daemon=True)
t1.start()

time.sleep(3)
print(‘Main end’)

 

改造成一直執行的:

import time
import threading

def bar():
while True:
time.sleep(1)
print(‘bar’)

def foo():
print(‘t1 daemon = {}’.format(threading.current_thread().isDaemon()))
t2 = threading.Thread(target=bar)
t2.start()
t2.join()
print(‘t2 daemon = {}’.format(t2.isDaemon()))

t1 = threading.Thread(target=foo,daemon=True)
t1.start()
t1.join()

time.sleep(3)
print(‘Main end’)

 

Daemon線程,簡化了手動關閉線程的工作。

 

12)threading.local 類

 

局部變量的實現:

import threading
import time

def worker():
x = 0
for i in range(10):
time.sleep(0.01)
x += 1
print(threading.current_thread(),x)

for i in ?range(10):
threading.Thread(target=worker).start()

 

利用全局變量實現:

import threading
import time

globals_data = threading.local()

def worker():
globals_data.x = 0
for i in range(10):
time.sleep(0.01)
globals_data.x += 1
print(threading.current_thread(),globals_data.x)

for i in ?range(10):
threading.Thread(target=worker).start()

 

import threading

X = ‘abc’
ctx = threading.local()
ctx.x = 123

print(ctx,type(ctx),ctx.x)

def worker():
print(X)
print(ctx)
print(ctx.x) ??#打印的時候出錯,表示x不能跨線程
print(‘working’)

worker()
print()
threading.Thread(target=worker).start() #另一個線程啟動

 

threading.local類構建了一個大字典,其元素是每一線程實例地址為key和線程對象引用線程單獨的字典的映射。

 

通過threading.local實例就可在不同的線程中,安全的使用線程獨有的數據,做到了線程間數據隔離,如同本地變量一樣安全。

 

Local和線程相關的大字典,每次利用的時候利用線程的小字典來頂替local實例的大字典。

不利用的話,全局變量的話直接就是threading.local和本地線程相關的數據。

 

 

13)定時器timer延遲執行

Threading.Timer繼承自thread,這個類用來另一多久執行一個函數。

Class threading.Timer(interval,function,args=None,kwargs=None)

Start方法執行以后,Timer對象會處于等待狀態,等待了interval之后,開始執行function函數的。如果在執行函數之前的等待階段,使用了cancel方法,就會跳過執行函數結果。

 

本質上就是一個Thread,只是沒有提供name,daemon。

import threading
import logging
import time

def worker():
logging.info(‘in worker’)
time.sleep(2)

t = threading.Timer(5,worker)
t.start() ?#啟動
print(threading.enumerate())
t.cancel() ??#取消
time.sleep(1)
print(threading.enumerate())

 

[<_MainThread(MainThread, started 7512)>, <Timer(Thread-1, started 6644)>]

[<_MainThread(MainThread, started 7512)>]

 

import threading
import logging
import time

def worker():
logging.info(‘in worker’)
time.sleep(2)

t = threading.Timer(5,worker)
t.cancel() ??#取消
t.start() ?#啟動
print(threading.enumerate())
time.sleep(1)
print(threading.enumerate())

 

[<_MainThread(MainThread, started 7512)>]

[<_MainThread(MainThread, started 7512)>]

 

 

 

二、線程同步

1、概念

線程同步,線程間協同,通過某種技術,讓一個線程訪問某些數據時候,其他線程不能訪問這些數據,直到該線程完成對數據的操作。

 

不同操作系統實現技術有所不同,有臨界區、互斥量、信號量、事件Event。

 

2、Event

Event事件,是線程間通信機制中最簡單的實現,使用一個內部的標記flag,通過flag的True或False的變化來進行操作。

名稱 含義
set() 標記為True
clear() 標記為False
is_set() 標記是否為True
Wait(timeout=None) 設置等待標記為True的時長,None為無限等待,等到返回True,未等到超時了返回False。

 

課堂練習:老板雇傭了一個工人,讓他生產杯子,老板一直等著這個工人,直到上產了十個杯子。

1)利用join

import threading
import time
import logging

def worker(count=10):
cups = []
while len(cups)<count:
logging.info(‘wprking’)
time.sleep(0.01)
cups.append(1)
print(len(cups))
logging.info(‘I am finished’)
w = threading.Thread(target=worker)
w.start()
w.join()

 

 

 

 

2)利用event

import threading
import logging
import time

def boss(event:threading.Event):
logging.info(‘I am boss,waiting’)
event.wait()
logging.info(‘good job’)

def worker(event:threading.Event,count=10):
logging.info(‘I am working for u’)
cups = []
while True:
logging.info(‘makeing’)
time.sleep(1)
cups.append(1)
if len(cups) >= count:
print(len(cups))
event.set()
break
logging.info(‘finished my job.cups={}’.format(cups))

event = threading.Event()
w = threading.Thread(target=worker,args=(event,))
b = threading.Thread(target=boss,args=(event,))
w.start()
b.start()

 

 

 

 

 

 

  • wait的應用

import threading
import logging
logging.basicConfig(level=logging.INFO)

def do(event:threading.Event,interval:int):
while not event.wait(interval): ?#沒有置set,所以是False。 ??不是False的時候就不能進入循環了。
logging.info(‘do sth’) ???#沒三秒打印一次。 ??not False執行此語句

e = threading.Event()
threading.Thread(target=do,args=(e,10)).start()

e.wait(12) ?#整體停留了十秒。
e.set() ???#重置為True。
print(‘end’)

 

  • 練習,實現timer。

 

總結:

使用同一個Event用來做標記。

Event的wait優于time.sleep,更快的切換到其他線程,提高并發效率。

 

 

import threading
import time

class MyTimer:
def __init__(self,interval,function,args,kwargs):
self.interval = interval
self.target = function
self.args = args
self.kwargs = kwargs
self.event = threading.Event()
self.thread = threading.Thread(target=self.target,args=self.args,kwargs=self.kwargs)

def start(self):
self.event.wait(self.interval)
if not self.event.is_set(): ??#如果沒有置False,那么就是False,not False為True,執行run語句。
self.run()

def run(self):
self.start()

self.event.set()

def cancel(self):
self.event.set()

 

 

三、Lock鎖

1)鎖,凡是存在共享資源爭搶的地方都可以使用鎖。從而保證只有一個使用者可以完全使用這個資源。

 

lock.acquire ?上鎖 ???lock.release ?解鎖

import threading
import logging
import time
FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups = []
def worker(count=10):
logging.info(‘i am work’)
while len(cups) < count:
time.sleep(0.1)
cups.append(1)
logging.info(‘i am finsh.cups={}’.format(len(cups)))

for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()

 

2018-05-26 15:38:25,913 Thread-1 32 i am work

2018-05-26 15:38:25,913 Thread-2 4332 i am work

2018-05-26 15:38:25,913 Thread-3 9992 i am work

2018-05-26 15:38:25,914 Thread-4 8464 i am work

2018-05-26 15:38:25,914 Thread-5 9968 i am work

2018-05-26 15:38:25,915 Thread-6 8712 i am work

2018-05-26 15:38:25,915 Thread-7 4412 i am work

2018-05-26 15:38:25,915 Thread-8 8456 i am work

2018-05-26 15:38:25,915 Thread-9 8316 i am work

2018-05-26 15:38:25,915 Thread-10 9772 i am work

2018-05-26 15:38:35,925 Thread-8 8456 i am finsh.cups=1000

2018-05-26 15:38:36,023 Thread-7 4412 i am finsh.cups=1001

2018-05-26 15:38:36,023 Thread-1 32 i am finsh.cups=1002

2018-05-26 15:38:36,023 Thread-6 8712 i am finsh.cups=1003

2018-05-26 15:38:36,024 Thread-5 9968 i am finsh.cups=1004

2018-05-26 15:38:36,024 Thread-4 8464 i am finsh.cups=1005

2018-05-26 15:38:36,024 Thread-10 9772 i am finsh.cups=1006

2018-05-26 15:38:36,024 Thread-2 4332 i am finsh.cups=1007

2018-05-26 15:38:36,025 Thread-3 9992 i am finsh.cups=1008

2018-05-26 15:38:36,025 Thread-9 8316 i am finsh.cups=1009

 

運行結果來看,多線程調度,導致了判斷失誤,多生產了杯子只有用到了鎖。

Lock,鎖,一旦線程獲得鎖,其他要獲得鎖的線程將被阻塞。

名稱 含義
acquire(blocking=True,timeout=-1) 默認阻塞,阻塞可以設置超時時間,非阻塞時,timeout禁止設置,成果獲取鎖,返回True,否則返回None
Release 釋放鎖,可以從任何線程調用釋放,

已上鎖的鎖,會被重置到unlocked未上鎖的鎖上調用,拋出RuntimeError異常。

import threading
import logging
import time
FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups = []
lock = threading.Lock()
def worker(count=10):
logging.info(‘i am work’)
lock.acquire()
while len(cups) < count:
print(threading.current_thread(),len(cups))
time.sleep(0.000001)
cups.append(1)
logging.info(‘i am finsh.cups={}’.format(len(cups)))
lock.release()

for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()

 

上鎖位置不對,由一個線程搶占,并獨自占鎖并完成任務。

import threading
import logging
import time
FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups = []
lock = threading.Lock()
def worker(count=10):
logging.info(‘i am work’)
flag= False
while True:
lock.acquire() #獲取鎖

if len(cups) >= count:
flag = True
# print(threading.current_thread(),len(cups))
time.sleep(0.000001)
if not flag:
cups.append(1)
print(threading.current_thread(),len(cups))
lock.release() ??#追加后釋放鎖
if flag:
break
logging.info(‘i am finsh.cups={}’.format(len(cups)))

for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()

 

鎖保證了數據完整性,但是性能下降好多。

If flag:break是為了保證release方法被執行,否則就出現了死鎖,得到鎖的永遠沒有釋放。

 

計數器類,可以加可以減。

2)加鎖、解鎖

一般加鎖就需要解鎖,但是加鎖后解鎖前,還要有一些代碼執行,就有可能拋出異常,一旦出現異常鎖是無法釋放的,但是當前線程可能因為這個就異常終止了,這就產生了死鎖。

 

加鎖。解鎖常用語句:

  • 使用..finally語句保證鎖的釋放。
  • With上下文管理,鎖對象支持上下文管理。

import threading
import time

class Counter:
def __init__(self):
self._val = 0
self.__lock = threading.Lock()

@property
def value(self):
return self._val

def inc(self):
try:
self.__lock.acquire()
self._val += 1
finally:
self.__lock.release()

def dec(self):
with self.__lock:
self._val -= 1

def run(c:Counter,count=1000):
for _ in range(10):
for i in range(-50,50):
if i<0:
c.dec()
else:
c.inc()

c = Counter()
c1 = 10
c2 = 10
for i in range(c1):
threading.Thread(target=run,args=(c,c2)).start()

while True:
time.sleep(1)
if threading.active_count() == 1:
print(threading.enumerate())
print(c.value)
break
else:
print(threading.enumerate())

 

 

不影響其他線程的切換,但是上鎖后其他線程被阻塞了。只能等待。

 

 

 

3)鎖的應用場景

適用于訪問和修改同一個共享資源的時候,讀寫同一個資源的時候。

 

全部是讀取同一個共享資源需要鎖嗎?

因為共享資源是不可變的,每一次讀取都是一樣的值,所以不用加鎖。

 

使用鎖的注意事項:

 

少用鎖必要時用鎖,使用了鎖,多線程訪問被鎖的資源時候,就成了串行,要么排隊執行,要么爭搶執行。

 

加鎖時間越短越好,不需要拍就立即釋放鎖。

一定要避免死鎖。

 

不使用鎖,有了效率,但是結果是錯的。

使用了鎖,效率低下,但是結果是對的。

 

4)非阻塞鎖使用

import threading
import logging
import time

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

def worker(tasks):
for task in tasks:
time.sleep(0.01)
if task.lock.acquire(False):
logging.info(‘{}{}begin to start’.format(threading.current_thread(),task.name))
else:
logging.info(‘{}{}is working’.format(threading.current_thread(),task.name))

class Task:
def __init__(self,name):
self.name = name
self.lock = threading.Lock()

tasks = [Task(‘task-{}’.format(x))for x in range(10)]

for i in range(5):
threading.Thread(target=worker,name=’worker-{}’.format(i),args=(tasks,)).start()

 

 

5)可重入鎖RLock:

是線程相關的鎖

線程A可重復鎖,并可以多次成功獲取,不會阻塞 ,最后要在線程A中做和acquire次數相同的release。

 

拿到這把鎖的線程可以多次使用。

別的線程拿到的話也是被阻塞的。

一個線程占用鎖的時候,其他線程不能拿到,只能的是阻塞。直到當前線程次有的鎖全部釋放完,其他線程才可以獲取。

 

可重入鎖,與線程相關,可在一個線程中獲取鎖,并可繼續在同一線程中不阻塞獲取鎖,當鎖未釋放完,其他線程獲取鎖就會阻塞。直到當前持有鎖的線程釋放完了鎖。

 

四、Condition

構造方法:condition(lock=None),可以傳入一個lock對象或Rlock對象,默認是Rlock。

名稱 含義
Acquire(*args) 獲取鎖
Wait(self,timeout=None) 等待超時
Notify(n=1) 喚醒之多指定書目個數的等待的線程,沒有等待的線程就沒有任何操作
Notify_all() 喚醒所有等待的線程。

 

用于生產者、消費者模型,為了解決生產者消費者速度匹配的問題:

 

import threading
import logging
import random

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()

def produce(self,total):
for _ in range(total):
data = random.randint(0,100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()

def consume(self):
while not self.event.is_set():
data = self.data
logging.info(‘recieved{}’.format(data))
self.data = None
self.event.wait(0.5)

d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name=’producer’)
c = threading.Thread(target=d.consume,name=’consume’)
c.start()
p.start()

 

消費者采用主動消費,消費者浪費了大量的時間,主動來查看有沒有數據。換成通知的機制。

import threading
import logging
import random

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
self.cond = threading.Condition()

def produce(self,total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1)
self.event.set()

def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait()
logging.info(‘recieved{}’.format(self.data))
self.data = None
self.event.wait(0.5)

d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name=’producer’)
c = threading.Thread(target=d.consume,name=’consume’)
c.start()
p.start()

 

如果是一個生產者,多個消費者呢:

import threading
import logging
import random

FORMAT = ‘%(asctime)s %(threadName)s %(thread)d %(message)s’
logging.basicConfig(format=FORMAT,level=logging.INFO)

class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
self.cond = threading.Condition()

def produce(self,total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) ?#模擬生產速度
self.event.set()

def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait() ?#阻塞等通知
logging.info(‘recieved{}’.format(self.data))
self.data = None
self.event.wait(0.5) ?#模擬消費 的速度

d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name=’producer’)

for i in range(5):
c = threading.Thread(target=d.consume, name=’consume{}’.format(i))
c.start()
p.start()

 

 

Self.cond.notify_all()發通知:

修改為self.cond.notify(n=2) ?隨機通知兩個消費者。

Condition總結:

用于生產者消費者模型中,解決生產者,消費者速度匹配的問題。

采用了通知機制,非常有效率。

 

使用方式:

使用condition,必須先acquire,用完了要release。因為內部實現了鎖,默認使用了RLock鎖。最好的方式就是使用上下文。

消費者wait,等待通知。

生產者生產好消息,對消費者發出通知,可以使用notify或者notify_all方法。

 

本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/99570

(0)
604603701@qq.com604603701@qq.com
上一篇 2018-05-27
下一篇 2018-05-27

相關推薦

  • Centtos7搭建ftp服務

    Centtos7搭建ftp服務 下載安裝軟件包 yum -y install vsftpd ? 開啟啟用ftp服務 systemctl start vsftpd ???#設置立即啟用該服務 systemctl status vsftpd ??#查看該服務當前運行狀態 systemctl enable vsftpd ??#設置開機自動啟用該服務 systemc…

    Python筆記 2018-07-07
  • ss

    sas

    Python筆記 2018-05-10
  • Python 部分知識點總結(七)

    此篇博客只是記錄第九周未掌握或不熟悉的知識點,用來加深印象。

    Python筆記 2018-05-06
  • PYTHON類型注解

    PYTHON類型注解 函數定義的弊端 Python是動態語言,變量隨時可以被賦值,且能賦值為不同的類型 Python不是靜態編譯型語言,變量類型是在運行器決定的 動態語言很靈活,但是這種特性也是弊端 def add(x, y):return x + yprint(add(4, 5))print(add(‘hello’, ‘…

    Python筆記 2018-05-02
  • Python函數

    函數 數學函數 Python函數 若干語句塊、函數名稱、參數列表構成,組織代碼的最小單元 完成一定的功能 作用 結構化編程對代碼的最基本的封裝,一般按照功能組織一段代碼 復用,減少冗余代碼 簡潔美觀,可讀易懂 函數分類 內建函數,max()、reversed() 庫函數,math.ceil() 函數定義、調用 def語句定義函數 def 函數名(參數列表):…

    2018-04-16
欧美性久久久久