下面是一個測試solib庫中調用函數的測試腳本,但該腳本還存在這一些問題,我目前無法理解和解決;
問題:
1.我定義了logging采用日志滾動的方式,寫日志,并且每個日志的大小是20M,但測試結果發現日志連1M都沒到就開始輪轉了,并且在輪轉過程中,還出現logging寫日志,卻發現,日志輪轉了,結果竟然報了,輪轉日志不存在。
Traceback (most recent call last):
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
self.doRollover()
Traceback (most recent call last):
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
self.doRollover()
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 135, in doRollover
os.remove(dfn)
OSError: [Errno 2] No such file or directory: '/tmp/mysofuns_test.log.5'
2.在進行多線程測試時,發現寫到文件中的日志和直接打到屏幕上的日志竟然不一樣。但經過分析,發現寫到文件中的日志,出現了重復,而打到屏幕上的日志卻是正常的。不能理解,暫時我也不知道如何避免。
#!/usr/bin/python27 #coding:utf-8 import threading from os.path import exists,basename from os import mkdir,getcwd,sep import sys import time import logging logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-2s) %(message)s',) class fileso(): conf_dir = getcwd() + '/conf' conf_filename = "config.py" def init(self,conf_dir=None,conf_filename=None): if conf_dir: self.conf_dir = conf_dir conf_dir = self.conf_dir sys.path.append(conf_dir) if conf_filename: self.conf_filename = conf_filename conf_filename = conf_dir + sep + self.conf_filename if exists(conf_dir): if exists(conf_filename): return True else: print "沒有在",conf_dir,"中找到測試主配置文件:",conf_filename else: mkdir(conf_dir) with open(conf_filename,'w+') as conf_file: print "開始初始化主配置文件...." conf_contant = """ #coding:utf-8 # 設置輸出日志的格式,默認只輸出消息. # 如: -f fn 或 -f f lineno,dt,tn,pid # 可使用的格式符號: # ln 顯示日志級別名稱. # fp 顯示此程序的絕對路徑. # fn 顯示當前執行程序名. # funcn 顯示打印日志的函數名. # lineno 顯示日志的行號. # dt 顯示日期時間. # tn 顯示進程名. # pid 顯示進程pid. logformat = "" # 設置輸出日志的文件名: logfilename = "/tmp/mytest.log" # so庫函數的位置: so_file = "/tmp/libPSBCSM3JDLL.so" # C語言類型與測試工具需要的類型的對應表 # #-------------------------------------------------- # C type =============== TEST type #-------------------------------------------------- # # char ---------------- strchar (1-character string) # char* ---------------- strchar* # # char ----------------- char (byte char) # unsigned char -------- unsigned char (byte char) # unsigned char* ------- unsigned char* (byte char) # wchar_t -------------- c_wchar (1-character unicode string) # wchar_t* ------------- c_wchar_p (unicode or None) # # short ---------------- short # unsigned short ------- unsigned short # int ------------------ int # int8 ----------------- int8 (8bit int) # int16 ---------------- int16 (16bit int) # int32 ---------------- int32 (32bit int) # int64 or __int64 ----- int64 (64bit int) # unsigned int8 -------- unsigned int8 (8bit unsigned int) # unsigned int16 ------- unsigned int16 (16bit unsigned int) # unsigned int32 ------- unsigned int32 (32bit unsigned int) # unsigned int64 ------- unsigned int64 (64bit unsigned int) # long ----------------- long # unsigned long -------- unsigned long # long long ------------ long long # # float ---------------- float # double --------------- double # # bool ----------------- bool # void* ---------------- void* # #-------------------------------------------------- # # so庫函數所有的參數類型 so_args = ( #['FunctionsName','Func_return_type','Func_type1','Func_type2',...] ["TestFuncsName1", "int", "unsigned char*", "unsigned char*", "unsigned char*", "unsigned char*", "int", "int", "int", "__int64", "int"], # <--- Note:"," ["TestFuncsName2", "int", "unsigned char*", "unsigned char*", "unsigned char*", "unsigned char*", "int", "int", "int", "__int64", "int"], # <--- Note:"," ) # 給so庫函數提供的所有測試數據. # 下面是為了更清晰而分行寫了,它們是可以寫成一行的. # 寫成一行的格式: # so_value = [ (["函數名",'第一個參數','第二個參數','第三個參數',....]), ] # 請注意書寫格式: 特別是逗號; 如------------------------------------- ^ ,這個。 so_value = [ #(["FunctionName",'Func_parameter1','Func_parameter2','Func_parameter3',....]), ("TestFuncsName1", ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, #<---- 注:若整數類型的,可直接寫. 2, 6, 20160101000000, 0 ], # <--- 注意:"," ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, 2, 6, 20160101000000, 0 ], # <--- Note:"," ), # <--- Note:"," ("TestFuncsName2", ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, 2, 6, 20160101000000, 0 ], # <--- Note:"," ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, 2, 6, 20160101000000, 0 ], ), # <--- Note:"," ] """ conf_file.write(conf_contant) conf_file.flush() conf_file.close() print "初始化完成,現在可以修改主配置文件:",conf_filename, \ '了\n修改完成后,直接執行',sys.argv[0],'即可.\n查看幫助可輸入:',sys.argv[0],' -h' \ '若需要重新初始化,則直接刪除:',conf_filename sys.exit(0) osFuncs = {} conf_fobj = None logformat = None logfilename = None multiThread = False # 注: 我對C語言不是太懂,以前學了皮毛,都還老師了, #故:這里的C類型對應,是按PythonDOC中的說明寫的。其中char存在歧義,故做了一點修改, #還有int64不是太確定以下定義是否正確。 dict_ctype = {"strchar":'c_char',"strchar*":'c_char_p', "wchar_t":'c_wchar',"wchar_t*":'c_wchar_p', "char":'c_byte', "unsigned char":'c_ubyte',"unsigned char*":'c_char_p', "short":'c_short', "unsigned short":'c_ushort', "int":'c_int',"int8":'c_uint8',"int16":'c_uint16', "int32":'c_uint32',"int64":'c_uint64', "unsigned int8":'c_uint8',"unsigned int16":'c_uint16', "unsigned int32":'c_uint32',"unsigned int64":'c_uint64', "long":'c_long', "unsigned long":'c_ulong', "__int64":'c_longlong',"long long":'c_longlong', "unsigned __int64":'c_ulonglong',"unsigned long long":'c_ulonglong', "float":'c_float', "double":'c_double', "bool":'c_bool', "void*":'c_void_p', } def __init__(self,conf_dir=None,conf_filename=None): self.init(conf_dir,conf_filename) self.conf_fobj = __import__(self.conf_filename.rstrip('.py')) self.logfilename = self.conf_fobj.logfilename self.logformat = self.conf_fobj.logformat.split(',') def setMultiThread(self,mBool): self.multiThread = mBool def convertType_c2t(self): """The function is c type convert test type. """ fargs = self.conf_fobj.so_args funcs = {} paras = "" retargs = {} for l in fargs: # Second paras: return type of the function retargs[l[0]]=(self.dict_ctype[l[1]]) for i in range(len(l[2:])): paras = paras + "," + self.dict_ctype[l[i+2]] # First paras: function name of os file in funcs[l[0]] = paras.lstrip(',') paras = "" return funcs,retargs def handleValue_t2c(self,values,paras): """The function is test value convert c value.""" count = 0 vStr = "" intlong = ("c_byte","c_ubyte","c_short","c_ushort","c_int","c_long", \ "c_ulong","c_longlong","c_ulonglong","c_uint8","c_uint16", \ "c_uint32","c_uint64",) for p in paras.split(','): if p == "c_bool": # c type:_Bool = py type:bool(1) vStr = vStr + ',' + str(values[count]) elif p == "c_char": # c type:char = py type: 1-character string vStr = vStr + ',"' + values[count] + '"' elif p == "c_wchar": # c type:wchar_t = py type: 1-character unicode string vStr = vStr + ',"' + values[count] + '"' elif p in intlong: # c type:intlong = py type: int/long vStr = vStr + ',' + str(values[count]) elif p in ('c_float','c_double','c_longdouble'): # c type:() = py type: float vStr = vStr + ',' + str(values[count]) elif p == 'c_char_p': # c type:char* = py type:string or None vStr = vStr + ',"' + values[count] + '"' elif p == 'c_wchar_p': # c type:wchar_t* = py type:unicode or None vStr = vStr + ',"' + values[count] + '"' elif p == 'c_void_p': # c type:void* = py type:int/long or None vStr = vStr + ',"' + str(values[count]) + '"' count += 1 return vStr.lstrip(',') # 注:這里不是注釋,預覽看這里變成注釋了; def content_join(self,mainContent): imp_section_content = """ #coding:utf-8 from ctypes import * import logging from logging.handlers import RotatingFileHandler """ format = "" for f in self.logformat: if f == 'ln': format = format + ' %(levelname)s' elif f == 'fp': format = format + ' %(pathname)s' elif f == 'fn': format = format + ' %(filename)s' elif f == 'funcn': format = format + ' %(funcName)s' elif f == 'lineno': # output: log row number. format = format + ' %(lineno)d' elif f == 'dt': # output: data time. format = format + ' %(asctime)s' elif f == 'tn': format = format + ' %(threadName)s' elif f == 'pid': format = format + ' %(process)d' format = format + ' %(message)s' log_section_content = 'format="' + format + '"\n' + 'logfilename="' + self.logfilename + '"' + """ log = logging.getLogger() Rthandler = RotatingFileHandler(logfilename,maxBytes=20*1024*1024,backupCount=5) Rthandler.setFormatter(logging.Formatter(format)) log.addHandler(Rthandler) log.setLevel(logging.INFO) """ main_section_content = "fso='" + self.conf_fobj.so_file + "'" + ''' cdll.LoadLibrary(fso) openFso = CDLL(fso) ''' + mainContent content = imp_section_content + '\n' + log_section_content + '\n' + main_section_content return content def constructor(self): '''功能: 合成調用so庫函數的語句; 語句有三種: 1.指定so庫函數的參數都是什么類型; 2.指定so庫函數的返回值是什么類型; 3.指定so庫函數需要的所有參數值. ''' # convertType_c2t return 2 value: # funcs={"funcName":"funcParaTypes"} # retargs={funcName:returnParaType} funcs,retargs = self.convertType_c2t() mainContentAll = "" if self.multiThread: compile_main = {} compile_common = compile(self.content_join(""),'','exec') self.osFuncs[compile_common] = compile_main # paraValues=('funcName',['paras1','paras2',...],[...],...) # so_value=[paraValues,paraValues,...] for paraValues in self.conf_fobj.so_value: for k_funcName,v_funcStrParaTypes in funcs.items(): if paraValues[0] == k_funcName: count = 0 for groupval in paraValues[1:]: funcStrParaValues = self.handleValue_t2c(groupval,v_funcStrParaTypes) #指定so庫函數的參數都是什么類型; funcName_type_join = 'openFso.' + k_funcName + '.argtpes=[' + v_funcStrParaTypes + ']' #指定so庫函數的返回值是什么類型; funcName_retType_join = 'openFso.' + k_funcName + '.restype=' + retargs[k_funcName] #指定so庫函數需要的所有參數值. call_func_join = 'openFso.' + k_funcName + '(' + funcStrParaValues + ')' output_msg_c1 = "log.info('測試:" + str(count) + "')" output_msg_c2 = "log.info('測試內容:" + call_func_join + "')" exec_func_join = "exec_func = " + call_func_join output_msg_c3 = "log.info('測試結果:'+str(exec_func))" mainContent = funcName_type_join + '\n' + \ funcName_retType_join + '\n' + \ output_msg_c1 + '\n' + \ exec_func_join + '\n' + \ output_msg_c2 + '\n' + \ output_msg_c3 + '\n' # 生成一個文件中包含so庫的所有函數 mainContentAll = mainContentAll + '\n' + mainContent if self.multiThread: # 每個元素是一個文件,一個文件中只測試一個so庫函數 if count == 0: compile_custom_test = [] #print 'k_funcName=',k_funcName,'\nmainContent=',mainContent compile_custom_test.append(compile(mainContent,'','exec')) compile_main[k_funcName]=compile_custom_test count += 1 return self.content_join(mainContentAll) def starttest(self): ''' 單線程測試. ''' exec self.constructor() print "提示:請查看輸出日志文件:",self.logfilename def mstarttest(self): ''' 根據測試函數的個數啟動相應數量的線程. ''' self.setMultiThread(True) self.constructor() self.run() def num_starttest(self,num): """啟動指定倍數的線程""" self.setMultiThread(True) self.constructor() for i in range(num): self.run('--'+str(i)+"輪") def run(self,mark=""): if self.osFuncs: # self.osFuncs=[compile_common_code1,{"funcName":[compile_custom_test_para1,...]},{..},..] k_common_code, = self.osFuncs all_paras_code = self.osFuncs[k_common_code] # condition: 它必須是全局條件鎖,否則它將失去它的作用. condition = threading.Condition() worker = [] i = 0 for k_funcName,v_execcode in all_paras_code.items(): funcCode = (k_common_code,v_execcode,) w = threading.Thread(name='Worker'+str(i)+mark,target=self.worker,args=(condition,funcCode,)) worker.append(w) i += 1 b = threading.Thread(name='Boss',target=self.boss,args=(condition,)) for t in worker: t.start() b.start() def boss(self,condition): logging.debug('多線程測試開始...') # 這里用with語句寫的,但其實,我對它并不是很了解,只是照書上寫的。如果有了解的,也可以多交流,呵呵。。 with condition: #這里調用notifyAll同時通知所有已經處于wait狀態的子線程,使它們同時啟動起來。實現一個簡單的并行壓測。 condition.notifyAll() def worker(self,condition,codefile): #with condition就類似于if condition.acquire():,來判斷是否獲取了鎖; #with語句的優勢: 似乎是可以通過__exit__()來釋放鎖;這個我不確定,可能理解錯誤。 #這篇文章,介紹with語句的用法:http://blog.csdn.net/suwei19870312/article/details/23258495 with condition: #線程執行wait()方法時,會處于等待狀態,它只有接受到notify通知后,才能獲得鎖。 condition.wait() exec codefile[0] for codeObj in codefile[1]: #print 'codeObj=',codeObj exec codeObj help_msg = '使用格式:' + sys.argv[0] + ' [-h | -f | -mf | -mn]\n' + \ '參數:\n' + \ '\t-h 顯示幫助信息.\n\t-mf 啟動線程數=測試函數的個數\n' + \ '\t-mn 啟動線程數=測試函數的個數 x 指定的數.如: -mn 3 就是啟動3倍的函數個數.\n' def handle_paras(): fso = fileso() count = 1 for args in sys.argv[1:]: if args in ('/h','-h','--help'): print help_msg sys.exit(0) elif args == '-mf': fso.mstarttest() return elif args == '-mn': Num = int(sys.argv[count+1]) fso.num_starttest(Num) return fso.starttest() if __name__ == '__main__': handle_paras()
原創文章,作者:Wn1m,如若轉載,請注明出處:http://www.www58058.com/10758