一、線程同步
-
線程同步:線程間協同,通過某種技術,讓一個線程訪問某些數據時,其他線程不能訪問這些數據,直到該線程完成對數據的操作不同操作系統實現技術有所不同,有臨界區(Critical?Section)、互斥量(Mutex)、信號量(Semaphore)、事件(Event)等
-
Event?是使用一個內部的標記?flag,通過?flag?的?True?或?False?的變化來進行操作?? ?? ? set():標記設置為?True?? ?? ? clear():標記設置為?False?? ?? ? is_set():標記是否為?True?? ?? ? wait(timeout=None):設置等待標記為?True?的時長,None?為無限等待,等到返回?True,未等到超時了返回?False
-
wait?的使用from threading import Event, Threadimport logginglogging.basicConfig(level=logging.INFO)def do(event, interval):????while not event.wait(interval):????????logging.info(‘do sth’)e = Event()Thread(target=do, args=(e, 3)).start()e.wait(10)e.set()print(‘main exit’)Event?的?wait?優于 time.sleep,它會更快的切換到其他線程,提高并發效率
-
實現?Timer,延時執行的線程,延時計算?add(x, y)from threading import Event, Threadimport loggingimport datetimelogging.basicConfig(level=logging.INFO)def add(x, y):????logging.info(x + y)class Timer:????def __init__(self, interval, fn, *args, **kwargs):????????self.interval = interval????????self.fn = fn????????self.args = args????????self.kwargs = kwargs????????self.event = Event()????def start(self):????????Thread(target=self.__run).start()????def cancel(self):????????self.event.set()????def __run(self):????????start = datetime.datetime.now()????????logging.info(‘waiting’)????????self.event.wait(self.interval)????????if not self.event.is_set():????????????self.fn(*self.args, **self.kwargs)????????delta = (datetime.datetime.now() – start).total_seconds()????????logging.info(‘finished {}’.format(delta))????????self.event.set()t = Timer(10, add, 4, 50)t.start()e = Event()e.wait(4)print(‘=========’)
-
鎖:凡是存在共享資源爭搶的地方都可以使用鎖,從而保證只有一個使用者可以完全使用這個資源,一旦線程獲得鎖,其它師徒獲取鎖的線程將被阻塞acquire(blocking=True, timeout=-1):默認阻塞,阻塞可以設置超時時間。非阻塞時,timeout?禁止設置。成功獲取鎖,返回?True,否則返回?Falserelease():釋放鎖,可以從任何線程調用釋放,已上鎖的鎖,會被重置為?unlocked,未上鎖的鎖被調用,會拋出?RuntimeError?異常
-
死鎖:一般來說,加鎖就需要解鎖,但是加鎖后解鎖前,還要有一些代碼執行,就有可能拋異常,一旦出現異常,鎖是無法釋放,但是當前線程可能因為這個異常被終止了,這就產生了死鎖。加鎖、解鎖常用語句:?? ?? ? 使用?try …?finally?語句保證鎖的釋放?? ?? ? with?上下文管理,鎖對象支持上下文管理
-
鎖適用于訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候?? ?? 如果全部都是讀取同一個共享資源,那就不需要鎖,因為這時可以認為共享資源是不可變的,每一次服務它都是一樣的值使用鎖的注意事項:?? ?? ? 少用鎖,必要時用鎖,使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊執行,要么爭搶執行?? ?? ? 加鎖時間越短越好,不需要就立即釋放鎖?? ?? ? 一定要避免死鎖
-
可重入鎖(RLock),是線程相關的鎖,可在一個線程中獲取鎖,并可繼續在同一線程中不阻塞獲取鎖。當鎖未釋放完,其它線程獲取鎖就會阻塞,直到當前持有鎖的線程釋放完鎖。
-
構造方法?Condition(lock=None),可以傳入一個?Lock?或?RLock?對象,默認是?RLockacquire:獲取鎖wait(self, timeout=None):等待或超時notify(n=1):喚醒至多指定數目個數的等待的線程,沒有等待的線程就沒有任何操作notify_all():喚醒所有等待的線程總結:Condition?用于生產者消費者模型中,解決生產者消費者速度匹配的問題,采用了通知機制,非常有效率使用方式:使用?Condition,必須先?acquire,用完了要?release,因為內部使用了鎖,默認使用?RLock?鎖,最好的方式是使用?with?上下文?? ?? ? ?? ??? ?消費者?wait,等待通知?? ?? ? ?? ??? ?生產者生產后對消費者發通知,可以使用?notify?或者?notify_all?方法
二、并發和線程
-
并行(parallel):同時做某些事,可以互不干擾的同一時刻做幾件事(同時發生的概念)并發(concurrency):同時做某些事,但是強調,一個時段內有是事情要處理(眾多車輛在這一段要通過路面的事件就是并發,車很多就是高并發)
-
并發的解決:隊列和緩沖區(緩沖區、優先隊列)、爭搶(鎖機制)、預處理、并行(水平擴展)、提速(垂直擴展)、消息中間件(站外的九曲回腸的走廊)
-
線程:是操作系統能夠進行運算調度的最小單位,被包含在進程之中,是進程中的實際運算單位?? ??? ??? ?? ? 一個程序的執行實例就是一個進程?? ??? ?? ? ? ? 有時被稱為輕量級進程,是程序執行流的最小單元?? ??? ??? ?? ? 一個標準的線程有線程?ID、當前指令指針、寄存器集合和堆棧組成?? ??? ??? ?? ? 在許多系統中,創建一個線程比創建一個進程快 10 – 100?倍進程:是計算機中的程序關于某數據集合上的一次運行活動,是系統運行資源分配和調度的基本單位,是操作系統結構的基礎?? ?? ? ? 現代操作系統提出進程的概念,每一個進程都認為自己獨占所有的計算機硬件資源?? ??? ?? 進程就是獨立的王國,進程間不可以隨便的共享數據關系:程序是源代碼編譯后的文件,而這些文件存放在磁盤上,當程序被操作系統加載到內核中,就是進程,進程中存放著指令和數據(資源),它也是線程的容器?? ??? ?? 線程就是省份,同一個進程內的線程可以共享進程的資源,每一個線程擁有自己獨立的堆棧
-
線程的狀態?? ?? ? 就緒(Ready):線程能夠運行,但在等待被調度,可能線程剛剛創建啟動,或剛剛從阻塞中恢復,或者被其他線程槍占?? ?? ? 運行(Running):線程正在運行?? ?? ? 阻塞(Blocked):線程等待外部事件發生而無法運行,如?I/O?操作?? ?? ? 終止(Terminated):線程完成,或退出,或被取消
-
Thread?類簽名:def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)target:線程調用的對象,就是目標函數name:為線程起個名字args:為目標函數傳遞參數,元祖kwargs:為目標函數關鍵字傳參,字典線程之所以執行函數,是因為線程中就是執行代碼的,而最簡單的封裝就是函數,所以還是函數調用
-
線程退出:線程函數內語句執行完畢的時候?或者?線程函數中拋出未處理的異常
-
threading?的屬性和方法current_thread():返回當前線程對象main_thread:返回主線程對象active_count():當前出來?alive?狀態的線程個數,包括主線程enumerate():返回所有或者的線程的列表,不包括已經終止的線程和未開始的線程,包括主線程get_ident():返回當前線程帶的 ID,非 0?整數
-
Thread?實例的屬性和方法name:只是一個名字,只是個標識,名稱可以重名,getName(),setName()?獲取、設置這個名詞ident:線程?ID,它是非 0?整數,線程啟動后才會有?ID,否則為?None,線程退出,此?ID?依舊可以訪問,此?ID?可以重復使用is_alive:返回線程是否活著注意:線程的?name?是一個名稱,可以重復;ID?必須唯一,但可以在線程退出后再利用
-
start()?方法會調用?run()?方法,而?run()?方法可以運行函數使用?start?方法啟動線程,啟動了一個新的線程,名字叫做?worker?運行使用?run?方法并沒有啟動新的線程,就是在主線程中調用一個普通的函數而已因此,啟動線程請使用?start?方法,才能啟動多個線程
-
一個進程中至少有一個線程,并作為程序的入口,這個線程就是主線程,一個進程至少有一個主線程,其他線程稱為工作線程
-
print?函數是?線程不安全?的,兩個解決辦法:不讓?print?打印換行;使用?loggingimport threadingdef worker():????for x in range(100):????????print(“{} is runnning.\n”.format(threading.current_thread().name), end=”)for x in range(1, 5):????name = “worker{}”.format(x)????t =threading.Thread(name=name, target=worker)????t.start()字符串是不可變的類型,它可以作為一個整體不可分割輸出,end=”?就不再讓?print?輸出換行了import threadingimport loggingdef worker():????for x in range(100):????????logging.warning(“{} is running.”.format(threading.current_thread().name))for x in range(1, 5):????name = “worker{}”.format(x)????t =threading.Thread(name=name, target=worker)????t.start()標準庫里面的?logging?模塊,日志處理模塊,線程安全的,生成環境代碼都是用?logging
-
Python?中,構造線程的時候,可以設置?daemon?屬性,這個屬性必須在?start?方法前設置好?? ??? ??? ?? ? 線程?daemon?屬性,如果設定就是用戶的設置,否則就取消當前線程的?daemon?值?? ??? ??? ?? ? 主線程是?non-daemon?線程,即?daemon = False應用場景:?? ?? ? 后臺任務,如發送心跳包、監控,這種場景最多?? ?? ? 主線程工作才有用的線程,如主線程中維護著公共的資源,主線程已經清理了,準備退出,而工作線程使用這些資源工作也沒有意義了,一起退出最合適?? ?? ? 隨時可以被終止的線程daemon?屬性:表示線程是否是?daemon?線程,這個值必須在?start()?之前設置,否則引發?RuntimeError?異常isDaemon():是否是?daemon?線程setDaemon:設置為?daemon?線程,必須在?start方法之前設置總結:?? ?? ? 線程具有一個?daemon?屬性,可以顯示設置為?True?或?False,也可以不設置,則取默認值?None?? ?? ? 如果不設置?daemon,就取當前線程的?daemon?來設置它?? ?? ? 主線程是?non-daemon?線程,即?daemon =?False?? ?? ? 從主線程創建的所有線程都不設置?daemon?屬性,則默認都是?daemon =?False,也就是?non-daemon線程?? ?? ? Python?程序在沒有活著的?non-daemon?線程運行時退出,也就是剩下的只能是?daemon?線程,主線程才能退出,否則主線程就只能等待
-
join(timeout=None),是線程的標準方法之一?? ?? ? 一個線程中調用另一個線程的?join?方法,調用者將被阻塞,直到被調用線程終止?? ?? ? 一個線程可以被?join?多次?? ?? ? timeout?參數指定調用者等待多久,沒有設置超時,就一直等到被調用線程結束?? ?? ? 調用誰的?join?方法,就是?join?誰,就要等誰
-
Python?提供?threading.local?類,將這個類實例化得到一個全局對象,但是不同的線程使用這個對象存儲的數據其他線程看不見import threadingimport timeglobal_data = threading.local()def worker():????global_data.x = 0????for i in range(100):????????time.sleep(0.001)????????global_data.x += 1????print(threading.current_thread(), global_data.x)for i in range(10):????threading.Thread(target=worker).start()threading.local?類構建了一個大字典,其元素是每一線程實例的地址為?key?和線程對象引用線程單獨的字典的映射,如:{ id(Thread) -> (ref(Thread), thread-local dict) },通過?threading.local?實例就可在不同的線程中,安全地使用線程獨有的數據,做到了線程間數據隔離,如同本地變量一樣安全
-
threading.Timer?繼承自?Thread,這個類用來定義多久執行一個函數?? ?? ? class threading.Timer(interval, function, args=None, kwargs=None)?? ?? ? start?方法執行之后,Timer?對象會處于等待狀態,等待了?interval?之后,開始執行?function?函數?? ?? ? 如果在執行函數之前的等待階段,使用了?cancel?方法,就會跳過執行函數結束?? ?? ? 可以用?setDaemon?來設置?daemon,如果設置成?True 就算?Timer?有?interval?也不管了總結:Timer?是線程?Thread?的子類,就是線程類,具有線程的能力和特征?? ?? ? ? 它的實例是能夠延時執行目標函數的線程,在真正執行目標函數之前,都可以?cancle?它
本文來自投稿,不代表Linux運維部落立場,如若轉載,請注明出處:http://www.www58058.com/99642