python实战 Python⽂件、⽇期和多线程 爱吃窝窝头 2023-11-30 2023-11-30 三、Python⽂件、⽇期和多线程 [TOC]
Python⽂件IO操作涉及⽂件读写操作,获取⽂件 后缀名 ,修改后缀名,获取⽂件修改时间, 压缩⽂ 件, 加密⽂件等操作。
Python⽇期章节,由表⽰⼤⽇期的 calendar, date模块,逐渐过渡到表⽰时间刻度更⼩的模块: datetime, time模块,按照此逻辑展开。
Python 多线程 希望透过5个⼩例⼦,帮助你对多线程模型编程本质有些更清晰的认识。
m⼀共总结最常⽤的 26个关于⽂件和时间处理模块的例⼦。
1 获取后缀名 import osfile_ext = os.path.splitext('./demo1.py' ) front,ext = file_ext
2 ⽂件读操作 import osdef mkdir (path ): isexists = os.path.exists(path) if not isexists: os.mkdir(path) def openfile (filename ): f = open (filename) fllist = f.read() f.close() return fllist
3 ⽂件写操作 f = open (r"./test.txt" , "w" , encoding="utf-8" ) print (f.write("测试⽂件写⼊" ))f.close f = open (r"./test.txt" , "a" , encoding="utf-8" ) print (f.write("测试⽂件写⼊" ))f.close with open (r"./test.txt" , "w" ) as f: f.write("hello world!" )
4 路径中的⽂件名 import osfile_ext = os.path.split('./demo1.py' ) ipath,ifile = file_ext
5 批量修改⽂件后缀 批量修改⽂件后缀
本例⼦使⽤Python的 os模块和 argparse模块,将⼯作⽬录 work_dir下所有后缀名为 old_ext的⽂ 件修改为后缀名为 new_ext
通过本例⼦,⼤家将会⼤概清楚 argparse模块的主要⽤法。
导⼊模块
定义脚本参数
def get_parser (): parser = argparse.ArgumentParser(description='⼯作⽬录中⽂件后缀名修改' ) parser.add_argument('work_dir' , metavar='WORK_DIR' , type =str , nargs=1 ,help ='修改后缀名的⽂件⽬录' ) parser.add_argument('old_ext' , metavar='OLD_EXT' ,type =str , nargs=1 , help ='原来的后缀' ) parser.add_argument('new_ext' , metavar='NEW_EXT' ,type =str , nargs=1 , help ='新的后缀' ) return parser
后缀名批量修改
def batch_rename (work_dir, old_ext, new_ext ): """ 传递当前⽬录,原来后缀名,新的后缀名后,批量重命名后缀 """ for filename in os.listdir(work_dir): split_file = os.path.splitext(filename) file_ext = split_file[1 ] if old_ext == file_ext: newfile = split_file[0 ] + new_ext os.rename(os.path.join(work_dir, filename),os.path.join(work_dir, newfile)) print ("完成重命名" ) print (os.listdir(work_dir))
实现Main
def main (): """ main函数 """ parser = get_parser() args = vars (parser.parse_args()) work_dir = args['work_dir' ][0 ] old_ext = args['old_ext' ][0 ] if old_ext[0 ] != '.' : old_ext = '.' + old_ext new_ext = args['new_ext' ][0 ] if new_ext[0 ] != '.' : new_ext = '.' + new_ext batch_rename(work_dir, old_ext, new_ext)
6 xls批量转换成xlsx import osdef xls_to_xlsx (work_dir ): """ 传递当前⽬录,原来后缀名,新的后缀名后,批量重命名后缀 """ old_ext, new_ext = '.xls' , '.xlsx' for filename in os.listdir(work_dir): split_file = os.path.splitext(filename) file_ext = split_file[1 ] if old_ext == file_ext: newfile = split_file[0 ] + new_ext os.rename(os.path.join(work_dir, filename),os.path.join(work_dir, newfile)) print ("完成重命名" ) print (os.listdir(work_dir)) xls_to_xlsx('./data' )
7 定制⽂件不同⾏ ⽐较两个⽂件在哪些⾏内容不同,返回这些⾏的编号,⾏号编号从1开始。
定义统计⽂件⾏数的函数
def statLineCnt (statfile ): print ('⽂件名:' +statfile) cnt = 0 with open (statfile, encoding='utf-8' ) as f: while f.readline(): cnt += 1 return cnt
统计⽂件不同之处的⼦函数:
def diff (more, cnt, less ): difflist = [] with open (less, encoding='utf-8' ) as l: with open (more, encoding='utf-8' ) as m: lines = l.readlines() for i, line in enumerate (lines): if line.strip() != m.readline().strip(): difflist.append(i) if cnt - i > 1 : difflist.extend(range (i + 1 , cnt)) return [no+1 for no in difflist]
主函数:
def file_diff_line_nos (fileA, fileB ): try : cntA = statLineCnt(fileA) cntB = statLineCnt(fileB) if cntA > cntB: return diff(fileA, cntA, fileB) return diff(fileB, cntB, fileA) except Exception as e: print (e)
⽐较两个⽂件A和B,拿相对较短的⽂件去⽐较,过滤⾏后的换⾏符 \n和空格。
暂未考虑某个⽂件最后可能有的多⾏空⾏等特殊情况
使⽤ file_diff_line_nos 函数:
if __name__ == '__main__' : import os print (os.getcwd()) ''' 例⼦: fileA = "'hello world!!!!''\ 'nice to meet you'\ 'yes'\ 'no1'\ 'jack'" fileB = "'hello world!!!!''\ 'nice to meet you'\ 'yes' " ''' diff = file_diff_line_nos('./data/a.txt' , './data/b.txt' ) print (diff)
关于⽂件⽐较的,实际上,在Python中有对应模块 difflib , 提供更多其他格式的⽂件更详细的⽐ 较,⼤家可参考:
https://docs.python.org/zh-cn/3.10/library/difflib.html
8 获取指定后缀名的⽂件 import osdef find_file (work_dir,extension='jpg' ): lst = [] for filename in os.listdir(work_dir): print (filename) splits = os.path.splitext(filename) ext = splits[1 ] if ext == '.' +extension: lst.append(filename) return lst r = find_file('.' ,'py' ) print (r)
9 批量获取⽂件修改时间 import osfrom datetime import datetimeprint (f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S' )} " )def get_modify_time (indir ): for root, _, files in os.walk(indir): for file in files: absfile = os.path.join(root, file) modtime = datetime.fromtimestamp(os.path.getmtime(absfile)) now = datetime.now() difftime = now-modtime if difftime.days < 20 : print (f"""{absfile} 修改时间 [{modtime.strftime('%Y-%m-%d %H:%M:%S' )} ] 距今[{difftime.days:3d} 天{difftime.seconds//3600 :2d} 时 {difftime.seconds%3600 //60 :2d} 分]""" ) get_modify_time('./' )
10 批量压缩⽂件 import zipfile import osimport timedef batch_zip (start_dir ): start_dir = start_dir file_news = start_dir + '.zip' z = zipfile.ZipFile(file_news, 'w' , zipfile.ZIP_DEFLATED) for dir_path, dir_names, file_names in os.walk(start_dir): f_path = dir_path.replace(start_dir, '' ) f_path = f_path and f_path + os.sep for filename in file_names: z.write(os.path.join(dir_path, filename), f_path + filename) z.close() return file_news batch_zip('./data' )
11 32位加密 import hashlibdef hash_cry32 (s ): m = hashlib.md5() m.update((str (s).encode('utf-8' ))) return m.hexdigest() print (hash_cry32(1 )) print (hash_cry32('hello' ))
12 年的⽇历图 import calendarfrom datetime import datemydate = date.today() year_calendar_str = calendar.calendar(2023 ) print (f"{mydate.year} 年的⽇历图:\n{year_calendar_str} \n" )
13 判断是否为闰年 import calendarfrom datetime import datemydate = date.today() is_leap = calendar.isleap(mydate.year) print_leap_str = "%s年是闰年" if is_leap else "%s年不是闰年\n" print (print_leap_str % mydate.year)
3 ⽉的⽇历图 import calendarfrom datetime import datemydate = date.today() month_calendar_str = calendar.month(mydate.year, mydate.month) print (f"{mydate.year} 年-{mydate.month} ⽉的⽇历图:\n{month_calendar_str} \n" )
14 ⽉有⼏天 import calendarfrom datetime import datemydate = date.today() weekday, days = calendar.monthrange(mydate.year, mydate.month) print (f'{mydate.year} 年-{mydate.month} ⽉的第⼀天是那⼀周的第{weekday} 天\n' )print (f'{mydate.year} 年-{mydate.month} ⽉共有{days} 天\n' )
15 ⽉第⼀天 from datetime import datemydate = date.today() month_first_day = date(mydate.year, mydate.month, 1 ) print (f"当⽉第⼀天:{month_first_day} \n" )
16 ⽉最后⼀天 from datetime import datemydate = date.today() _, days = calendar.monthrange(mydate.year, mydate.month) month_last_day = date(mydate.year, mydate.month, days) print (f"当月的最后一天:{month_last_day} \n" )
17 获取当前时间 from datetime import date, datetimefrom time import localtimefrom time import strftimetoday_date = date.today() print (today_date) today_time = datetime.today() print (today_time) local_time = localtime() print (strftime("%Y-%m-%d %H:%M:%S" , local_time))
18 字符时间转时间 from time import strptimestruct_time = strptime('2023-08-03 17:42:08' , "%Y-%m-%d %H:%M:%S" ) print (struct_time)
19 时间转字符时间 from time import strftime, strptime, localtimeprint (localtime()) print (strftime("转化后的时间: %Y-%m-%d %H:%M:%S" , localtime()))
20 默认启动主线程 ⼀般的,程序默认执⾏只在⼀个线程,这个线程称为主线程,例⼦演⽰如下:
导⼊线程相关的模块 threading:
threading的类⽅法 current_thread()返回当前线程:
t = threading.current_thread() print (t)
所以,验证了程序默认是在 MainThead中执⾏。
t.getName()获得这个线程的名字,其他常⽤⽅法, getName()获得线程 id, is_alive()判断线程是 否存活等。
print (t.getName()) print (t.ident) print (t.is_alive())
以上这些仅是介绍多线程的 背景知识 ,因为到⽬前为⽌,我们有且仅有⼀个”⼲活”的主线程
21 创建线程 创建⼀个线程:
my_thread = threading.Thread()
创建⼀个名称为 my_thread的线程:
my_thread = threading.Thread(name='my_thread' )
创建线程的⽬的是告诉它帮助我们做些什么,做些什么通过参数 target传⼊,参数类型为 callable,函数就是可调⽤的:
def print_i (i ): print ('打印i:%d' %(i,)) my_thread = threading.Thread(target=print_i,args=(1 ,))
my_thread线程已经全副武装,但是我们得按下发射按钮,启动start(),它才开始真正起飞。
打印结果如下,其中 args指定函数 print_i需要的参数i,类型为元祖。
⾄此,多线程相关的核⼼知识点,已经总结完毕。但是,仅仅知道这些,还不够!光纸上谈兵,当然远 远不够。
接下来,聊聊应⽤多线程编程,最本质的⼀些东西。
21.1 交替获得CPU时间⽚ 为了更好解释,假定计算机是单核的,尽管对于 cpython,这个假定有些多余。 开辟3个线程,装到 threads中:
import timefrom datetime import datetimeimport threadingdef print_time (): for _ in range (5 ): time.sleep(0.1 ) print ('当前线程%s,打印结束时间为:%s' %(threading.current_thread().getName(), datetime.today())) threads = [threading.Thread(name='t%d' % (i,), target=print_time) for i in range (3 )]
启动3个线程:
[t.start() for t in threads]
打印结果如下, t0, t1, t2三个线程,根据操作系统的调度算法,轮询获得CPU时间⽚,注意观察, t2线程可能被连续调度,从⽽获得时间⽚。
当前线程t2,打印结束时间为:2023 -08-03 18 :05:11.951443 当前线程t0,打印结束时间为:2023 -08-03 18 :05:11.951443 当前线程t1,打印结束时间为:2023 -08-03 18 :05:11.951443 当前线程t0,打印结束时间为:2023 -08-03 18 :05:12.057807 当前线程t1,打印结束时间为:2023 -08-03 18 :05:12.057807 当前线程t2,打印结束时间为:2023 -08-03 18 :05:12.057807 当前线程t2,打印结束时间为:2023 -08-03 18 :05:12.168408 当前线程t0,打印结束时间为:2023 -08-03 18 :05:12.168408 当前线程t1,打印结束时间为:2023 -08-03 18 :05:12.168408 当前线程t2,打印结束时间为:2023 -08-03 18 :05:12.277193 当前线程t0,打印结束时间为:2023 -08-03 18 :05:12.277193 当前线程t1,打印结束时间为:2023 -08-03 18 :05:12.277193 当前线程t2,打印结束时间为:2023 -08-03 18 :05:12.386099 当前线程t1,打印结束时间为:2023 -08-03 18 :05:12.386099 当前线程t0,打印结束时间为:2023 -08-03 18 :05:12.386099
22 多线程抢夺同⼀个变量 多线程编程,存在抢夺同⼀个变量的问题。
⽐如下⾯例⼦,创建的10个线程同时竞争全局变量 a :
import threadinga = 0 def add1 (): global a a += 1 print ('%s adds a to 1: %d' %(threading.current_thread().getName(),a)) threads = [threading.Thread(name='t%d' %(i,),target=add1) for i in range (10 )] [t.start() for t in threads]
执⾏结果:
t0 adds a to 1 : 1 t1 adds a to 1 : 2 t2 adds a to 1 : 3 t3 adds a to 1 : 4 t4 adds a to 1 : 5 t5 adds a to 1 : 6 t6 adds a to 1 : 7 t7 adds a to 1 : 8 t8 adds a to 1 : 9 t9 adds a to 1 : 10
结果⼀切正常,每个线程执⾏⼀次,把 a 的值加1,最后 a 变为10,⼀切正常。
运⾏上⾯代码⼗⼏遍,⼀切也都正常。
所以,我们能下结论:这段代码是线程安全的吗?
NO!
多线程中,只要存在同时读取和修改⼀个全局变量的情况,如果不采取其他措施,就⼀定不是安全的线程。
尽管,有时,某些情况的资源竞争,暴露出问题的概率 极低极低 :
本例中,如果线程0 在修改a后,其他某些线程还是get到的是没有修改前的值,就会暴露问题。
但是在本例中, a = a + 1 这种修改操作,花费的时间太短了,短到我们⽆法想象。所以,线程间轮询执⾏时,都能get到最新的a值。所以,暴露问题的概率就变得微乎其微。
23 代码稍作改动,叫问题暴露出来 只要弄明⽩问题暴露的原因,叫问题出现还是不困难的。
想象数据库的写⼊操作,⼀般需要耗费我们可以感知的时间。
为了模拟这个写⼊动作,简化期间,我们只需要延长修改变量 a 的时间,问题很容易就会还原出来。
import threadingimport timea = 0 def add1 (): global a tmp = a + 1 time.sleep(0.2 ) a = tmp print ('%s adds a to 1: %d' %(threading.current_thread().getName(),a)) threads = [threading.Thread(name='t%d' %(i,),target=add1) for i in range (10 )] [t.start() for t in threads]
重新运⾏代码,只需⼀次,问题⽴马完全暴露,结果如下:
t6 adds a to 1 : 1 t2 adds a to 1 : 1 t9 adds a to 1 : 1 t1 adds a to 1 : 1 t8 adds a to 1 : 1 t0 adds a to 1 : 1 t3 adds a to 1 : 1 t4 adds a to 1 : 1 t7 adds a to 1 : 1 t5 adds a to 1 : 1
看到,10个线程全部运⾏后, a 的值只相当于⼀个线程执⾏的结果。
下⾯分析,为什么会出现上⾯的结果:
这是⼀个很有说服⼒的例⼦,因为在修改a前,有0.2秒的休眠时间,某个线程延时后,CPU⽴即分配计
算资源给其他线程。直到分配给所有线程后,根据结果反映出,0.2秒的休眠时长还没耗尽,这样每个线程get到的a值都是0,所以才出现上⾯的结果。
以上最核⼼的三⾏代码:
tmp = a + 1 time.sleep(0.2 ) a = tmp
24 加上⼀把锁,避免以上情况出现 知道问题出现的原因后,要想修复问题,也没那么复杂。
通过python中提供的锁机制,某段代码只能单线程执⾏时,上锁,其他线程等待,直到释放锁后,其他
线程再争锁,执⾏代码,释放锁,重复以上。
创建⼀把锁 locka:
import threadingimport timelocka = threading.Lock()
通过 locka.acquire() 获得锁,通过 locka.release()释放锁,它们之间的这些代码,只能单线程 执⾏。
a = 0 def add1 (): global a try : locka.acquire() tmp = a + 1 time.sleep(0.2 ) a = tmp finally : locka.release() print ('%s adds a to 1: %d' %(threading.current_thread().getName(),a)) threads = [threading.Thread(name='t%d' %(i,),target=add1) for i in range (10 )] [t.start() for t in threads]
执⾏结果如下:
t0 adds a to 1 : 1 t1 adds a to 1 : 2 t2 adds a to 1 : 3 t3 adds a to 1 : 4 t4 adds a to 1 : 5 t5 adds a to 1 : 6 t6 adds a to 1 : 7 t7 adds a to 1 : 8 t8 adds a to 1 : 9 t9 adds a to 1 : 10
⼀起正常,其实这已经是单线程顺序执⾏了,就本例⼦⽽⾔,已经失去多线程的价值,并且还带来了因为线程创建开销,浪费时间的副作⽤。
程序中只有⼀把锁,通过 try…finally还能确保不发⽣死锁。但是,当程序中启⽤多把锁,还是很容易发⽣死锁。
注意使⽤场合,避免死锁,是我们在使⽤多线程开发时需要注意的⼀些问题。
25 1 分钟掌握 time 模块 time 模块提供时间相关的类和函数
记住⼀个类: struct_time,9 个整数组成的元组
记住下⾯ 5 个最常⽤函数
⾸先导⼊ time模块
1 此时此刻时间浮点数 seconds = time.time() seconds
2 时间数组 local_time = time.localtime(seconds) local_time
3 时间字符串 str_time = time.asctime(local_time) str_time
4 格式化时间字符串 time.strftime 语义: string format time
format_time = time.strftime("%Y-%m-%d %H:%M:%S" ,local_time) format_time
5 字符时间转时间数组 str_to_struct = time.strptime(format_time,'%Y-%m-%d %H:%M:%S' ) str_to_struct
最后再记住常⽤字符串格式
常⽤字符串格式
%Y Year with century as a decimal number. %m Month as a decimal number [01,12 ]. %d Day of the month as a decimal number [01,31 ]. %H Hour (24 -hour clock) as a decimal number [00 ,23 ]. %M Minute as a decimal number [00 ,59 ]. %S Second as a decimal number [00 ,61 ]. %z Time zone offset from UTC. %a Locale's abbreviated weekday name. %A Locale' s full weekday name.%b Locale's abbreviated month name.
26 4G 内存处理 10G ⼤⼩的⽂件 4G 内存处理 10G ⼤⼩的⽂件,单机怎么做?
下⾯的讨论基于的假定:可以单独处理⼀⾏数据,⾏间数据相关性为零。
⽅法⼀:
仅使⽤ Python 内置模板,逐⾏读取到内存。
使⽤ yield,好处是解耦读取操作和处理操作:
def python_read (filename ): with open (filename,'r' ,encoding='utf-8' ) as f: while True : line = f.readline() if not line: return yield line
以上每次读取⼀⾏,逐⾏迭代,逐⾏处理数据
if __name__ == '__main__' : g = python_read('./data/clean.dat' ) for c in g: print (c)
⽅法⼆:
⽅法⼀有缺点,逐⾏读⼊,频繁的 IO 操作拖累处理效率。是否有⼀次 IO ,读取多⾏的⽅法?
pandas 包 read_csv 函数,参数有 38 个之多,功能⾮常强⼤。
关于单机处理⼤⽂件, read_csv 的 chunksize 参数能做到,设置为 5 , 意味着⼀次读取 5 ⾏。
import pandas as pddef pandas_read (filename,sep=',' ,chunksize=5 ): reader = pd.read_csv(filename,sep,chunksize=chunksize) while True : try : yield reader.get_chunk() except StopIteration: print ('---Done---' ) break
使⽤如同⽅法⼀:
if __name__ == '__main__' : g = pandas_read('./data/clean.dat' ,sep="::" ) for c in g: print (c)
以上就是单机处理⼤⽂件的两个⽅法,推荐使⽤⽅法⼆,更加灵活。除了⼯作中会⽤到,⾯试中也有时 被问到。