Python⽂件、⽇期和多线程

三、Python⽂件、⽇期和多线程

[TOC]

Python⽂件IO操作涉及⽂件读写操作,获取⽂件 后缀名 ,修改后缀名,获取⽂件修改时间, 压缩⽂
件, 加密⽂件等操作。

Python⽇期章节,由表⽰⼤⽇期的 calendar, date模块,逐渐过渡到表⽰时间刻度更⼩的模块:
datetime, time模块,按照此逻辑展开。

Python 多线程 希望透过5个⼩例⼦,帮助你对多线程模型编程本质有些更清晰的认识。

m⼀共总结最常⽤的 26个关于⽂件和时间处理模块的例⼦。

1 获取后缀名

import os
file_ext = os.path.splitext('./demo1.py')
front,ext = file_ext
front
ext

2 ⽂件读操作

import os
# 创建⽂件夹
def mkdir(path):
isexists = os.path.exists(path)
if not isexists:
os.mkdir(path)
# 读取⽂件信息
def openfile(filename):
f = open(filename)
fllist = f.read()
f.close()
return fllist # 返回读取内容

3 ⽂件写操作

# 写⼊⽂件信息
# example1
# w写⼊,如果⽂件存在,则清空内容后写⼊,不存在则创建
f = open(r"./test.txt", "w", encoding="utf-8")
print(f.write("测试⽂件写⼊"))
f.close
# example2
# a写⼊,⽂件存在,则在⽂件内容后追加写⼊,不存在则创建
f = open(r"./test.txt", "a", encoding="utf-8")
print(f.write("测试⽂件写⼊"))
f.close
# example3
# with关键字系统会⾃动关闭⽂件和处理异常
with open(r"./test.txt", "w") as f:
f.write("hello world!")

4 路径中的⽂件名

import os
file_ext = os.path.split('./demo1.py')
ipath,ifile = file_ext
ipath
ifile

5 批量修改⽂件后缀

批量修改⽂件后缀

本例⼦使⽤Python的 os模块和 argparse模块,将⼯作⽬录 work_dir下所有后缀名为 old_ext的⽂
件修改为后缀名为 new_ext

通过本例⼦,⼤家将会⼤概清楚 argparse模块的主要⽤法。

导⼊模块

import argparse
import os

定义脚本参数

def get_parser():
parser = argparse.ArgumentParser(description='⼯作⽬录中⽂件后缀名修改')
parser.add_argument('work_dir', metavar='WORK_DIR', type=str, nargs=1,help='修改后缀名的⽂件⽬录')
parser.add_argument('old_ext', metavar='OLD_EXT',type=str, nargs=1, help='原来的后缀')
parser.add_argument('new_ext', metavar='NEW_EXT',type=str, nargs=1, help='新的后缀')
return parser

后缀名批量修改

def batch_rename(work_dir, old_ext, new_ext):
"""
传递当前⽬录,原来后缀名,新的后缀名后,批量重命名后缀
"""
for filename in os.listdir(work_dir):
# 获取得到⽂件后缀
split_file = os.path.splitext(filename)
file_ext = split_file[1]
# 定位后缀名为old_ext 的⽂件
if old_ext == file_ext:
# 修改后⽂件的完整名称
newfile = split_file[0] + new_ext
# 实现重命名操作
os.rename(os.path.join(work_dir, filename),os.path.join(work_dir, newfile))
print("完成重命名")
print(os.listdir(work_dir))

实现Main

def main():
"""
main函数
"""
# 命令⾏参数
parser = get_parser()
args = vars(parser.parse_args())
# 从命令⾏参数中依次解析出参数
work_dir = args['work_dir'][0]
old_ext = args['old_ext'][0]
if old_ext[0] != '.':
old_ext = '.' + old_ext
new_ext = args['new_ext'][0]
if new_ext[0] != '.':
new_ext = '.' + new_ext
batch_rename(work_dir, old_ext, new_ext)

6 xls批量转换成xlsx

import os
def xls_to_xlsx(work_dir):
"""
传递当前⽬录,原来后缀名,新的后缀名后,批量重命名后缀
"""
old_ext, new_ext = '.xls', '.xlsx'
for filename in os.listdir(work_dir):
# 获取得到⽂件后缀
split_file = os.path.splitext(filename)
file_ext = split_file[1]
# 定位后缀名为old_ext 的⽂件
if old_ext == file_ext:
# 修改后⽂件的完整名称
newfile = split_file[0] + new_ext
# 实现重命名操作
os.rename(os.path.join(work_dir, filename),os.path.join(work_dir, newfile))
print("完成重命名")
print(os.listdir(work_dir))
xls_to_xlsx('./data')

7 定制⽂件不同⾏

⽐较两个⽂件在哪些⾏内容不同,返回这些⾏的编号,⾏号编号从1开始。

定义统计⽂件⾏数的函数

# 统计⽂件个数
def statLineCnt(statfile):
print('⽂件名:'+statfile)
cnt = 0
with open(statfile, encoding='utf-8') as f:
while f.readline():
cnt += 1
return cnt

统计⽂件不同之处的⼦函数:

# more表⽰含有更多⾏数的⽂件
def diff(more, cnt, less):
difflist = []
with open(less, encoding='utf-8') as l:
with open(more, encoding='utf-8') as m:
lines = l.readlines()
for i, line in enumerate(lines):
if line.strip() != m.readline().strip():
difflist.append(i)
if cnt - i > 1:
difflist.extend(range(i + 1, cnt))
return [no+1 for no in difflist]

主函数:

# 返回的结果⾏号从1开始
# list表⽰fileA和fileB不同的⾏的编号
def file_diff_line_nos(fileA, fileB):
try:
cntA = statLineCnt(fileA)
cntB = statLineCnt(fileB)
if cntA > cntB:
return diff(fileA, cntA, fileB)
return diff(fileB, cntB, fileA)
except Exception as e:
print(e)

⽐较两个⽂件A和B,拿相对较短的⽂件去⽐较,过滤⾏后的换⾏符 \n和空格。

暂未考虑某个⽂件最后可能有的多⾏空⾏等特殊情况

使⽤ file_diff_line_nos 函数:

if __name__ == '__main__':
import os
print(os.getcwd())
'''
例⼦:
fileA = "'hello world!!!!''\
'nice to meet you'\
'yes'\
'no1'\
'jack'"
fileB = "'hello world!!!!''\
'nice to meet you'\
'yes' "
'''
diff = file_diff_line_nos('./data/a.txt', './data/b.txt')
print(diff) # [4, 5]

关于⽂件⽐较的,实际上,在Python中有对应模块 difflib , 提供更多其他格式的⽂件更详细的⽐
较,⼤家可参考:

https://docs.python.org/zh-cn/3.10/library/difflib.html

8 获取指定后缀名的⽂件

import os
def find_file(work_dir,extension='jpg'):
lst = []
for filename in os.listdir(work_dir):
print(filename)
splits = os.path.splitext(filename)
ext = splits[1] # 拿到扩展名
if ext == '.'+extension:
lst.append(filename)
return lst
r = find_file('.','py')
print(r) # 返回所有⽬录下的py⽂件

9 批量获取⽂件修改时间

# 获取⽬录下⽂件的修改时间
import os
from datetime import datetime
print(f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
def get_modify_time(indir):
for root, _, files in os.walk(indir): # 循环⽬录和⼦⽬录
for file in files:
absfile = os.path.join(root, file)
modtime = datetime.fromtimestamp(os.path.getmtime(absfile))
now = datetime.now()
difftime = now-modtime
if difftime.days < 20: # 条件筛选超过指定时间的⽂件
print(f"""{absfile}修改时间
[{modtime.strftime('%Y-%m-%d %H:%M:%S')}]
距今[{difftime.days:3d}{difftime.seconds//3600:2d}
{difftime.seconds%3600//60:2d}分]"""
) # 打印相关信息
get_modify_time('./')

10 批量压缩⽂件

import zipfile  # 导⼊zipfile,这个是⽤来做压缩和解压的Python模块;
import os
import time


def batch_zip(start_dir):
start_dir = start_dir # 要压缩的⽂件夹路径
file_news = start_dir + '.zip' # 压缩后⽂件夹的名字
z = zipfile.ZipFile(file_news, 'w', zipfile.ZIP_DEFLATED)
for dir_path, dir_names, file_names in os.walk(start_dir):
# 这⼀句很重要,不replace的话,就从根⽬录开始复制
f_path = dir_path.replace(start_dir, '')
f_path = f_path and f_path + os.sep # 实现当前⽂件夹以及包含的所有⽂件的压缩
for filename in file_names:
z.write(os.path.join(dir_path, filename), f_path + filename)
z.close()
return file_news


batch_zip('./data')

11 32位加密

import hashlib
# 对字符串s实现32位加密
def hash_cry32(s):
m = hashlib.md5()
m.update((str(s).encode('utf-8')))
return m.hexdigest()
print(hash_cry32(1)) # c4ca4238a0b923820dcc509a6f75849b
print(hash_cry32('hello')) # 5d41402abc4b2a76b9719d911017c592

12 年的⽇历图

import calendar
from datetime import date
mydate = date.today()
year_calendar_str = calendar.calendar(2023)
print(f"{mydate.year}年的⽇历图:\n{year_calendar_str}\n")

13 判断是否为闰年

import calendar
from datetime import date
mydate = date.today()
is_leap = calendar.isleap(mydate.year)
print_leap_str = "%s年是闰年" if is_leap else "%s年不是闰年\n"
print(print_leap_str % mydate.year)

3 ⽉的⽇历图

import calendar
from datetime import date
mydate = date.today()
month_calendar_str = calendar.month(mydate.year, mydate.month)
print(f"{mydate.year}年-{mydate.month}⽉的⽇历图:\n{month_calendar_str}\n")

14 ⽉有⼏天

import calendar
from datetime import date

mydate = date.today()
weekday, days = calendar.monthrange(mydate.year, mydate.month)
print(f'{mydate.year}年-{mydate.month}⽉的第⼀天是那⼀周的第{weekday}天\n')
print(f'{mydate.year}年-{mydate.month}⽉共有{days}天\n')

15 ⽉第⼀天

from datetime import date

mydate = date.today()
month_first_day = date(mydate.year, mydate.month, 1)
print(f"当⽉第⼀天:{month_first_day}\n")

16 ⽉最后⼀天

from datetime import date

mydate = date.today()
_, days = calendar.monthrange(mydate.year, mydate.month)
month_last_day = date(mydate.year, mydate.month, days)
print(f"当月的最后一天:{month_last_day}\n")

17 获取当前时间

from datetime import date, datetime
from time import localtime
from time import strftime

today_date = date.today()
print(today_date) # 2019-12-22

today_time = datetime.today()
print(today_time) # 2019-12-22 18:02:33.398894

local_time = localtime()
print(strftime("%Y-%m-%d %H:%M:%S", local_time))

18 字符时间转时间

from time import strptime

# parse str time to struct time
struct_time = strptime('2023-08-03 17:42:08', "%Y-%m-%d %H:%M:%S")
print(struct_time) # struct_time类型就是time中的⼀个类

19 时间转字符时间

from time import strftime, strptime, localtime
print(localtime()) #这是输⼊的时间

print(strftime("转化后的时间: %Y-%m-%d %H:%M:%S", localtime()))

20 默认启动主线程

⼀般的,程序默认执⾏只在⼀个线程,这个线程称为主线程,例⼦演⽰如下:

导⼊线程相关的模块 threading:

import threading

threading的类⽅法 current_thread()返回当前线程:

t = threading.current_thread()
print(t)

所以,验证了程序默认是在 MainThead中执⾏。

t.getName()获得这个线程的名字,其他常⽤⽅法, getName()获得线程 id, is_alive()判断线程是
否存活等。

print(t.getName()) 
print(t.ident)
print(t.is_alive())

以上这些仅是介绍多线程的 背景知识 ,因为到⽬前为⽌,我们有且仅有⼀个”⼲活”的主线程

21 创建线程

创建⼀个线程:

my_thread = threading.Thread()

创建⼀个名称为 my_thread的线程:

my_thread = threading.Thread(name='my_thread')

创建线程的⽬的是告诉它帮助我们做些什么,做些什么通过参数 target传⼊,参数类型为
callable,函数就是可调⽤的:

def print_i(i):
print('打印i:%d'%(i,))
my_thread = threading.Thread(target=print_i,args=(1,))

my_thread线程已经全副武装,但是我们得按下发射按钮,启动start(),它才开始真正起飞。

my_thread.start()

打印结果如下,其中 args指定函数 print_i需要的参数i,类型为元祖。

打印i:1

⾄此,多线程相关的核⼼知识点,已经总结完毕。但是,仅仅知道这些,还不够!光纸上谈兵,当然远
远不够。

接下来,聊聊应⽤多线程编程,最本质的⼀些东西。

21.1 交替获得CPU时间⽚

为了更好解释,假定计算机是单核的,尽管对于 cpython,这个假定有些多余。
开辟3个线程,装到 threads中:

import time
from datetime import datetime
import threading


def print_time():
for _ in range(5): # 在每个线程中打印5次
time.sleep(0.1) # 模拟打印前的相关处理逻辑
print('当前线程%s,打印结束时间为:%s' %(threading.current_thread().getName(), datetime.today()))

threads = [threading.Thread(name='t%d' % (i,), target=print_time) for i in range(3)]

启动3个线程:

[t.start() for t in threads]

打印结果如下, t0, t1, t2三个线程,根据操作系统的调度算法,轮询获得CPU时间⽚,注意观察,
t2线程可能被连续调度,从⽽获得时间⽚。

当前线程t2,打印结束时间为:2023-08-03 18:05:11.951443
当前线程t0,打印结束时间为:2023-08-03 18:05:11.951443
当前线程t1,打印结束时间为:2023-08-03 18:05:11.951443
当前线程t0,打印结束时间为:2023-08-03 18:05:12.057807
当前线程t1,打印结束时间为:2023-08-03 18:05:12.057807
当前线程t2,打印结束时间为:2023-08-03 18:05:12.057807
当前线程t2,打印结束时间为:2023-08-03 18:05:12.168408
当前线程t0,打印结束时间为:2023-08-03 18:05:12.168408
当前线程t1,打印结束时间为:2023-08-03 18:05:12.168408
当前线程t2,打印结束时间为:2023-08-03 18:05:12.277193
当前线程t0,打印结束时间为:2023-08-03 18:05:12.277193
当前线程t1,打印结束时间为:2023-08-03 18:05:12.277193
当前线程t2,打印结束时间为:2023-08-03 18:05:12.386099
当前线程t1,打印结束时间为:2023-08-03 18:05:12.386099
当前线程t0,打印结束时间为:2023-08-03 18:05:12.386099

22 多线程抢夺同⼀个变量

多线程编程,存在抢夺同⼀个变量的问题。

⽐如下⾯例⼦,创建的10个线程同时竞争全局变量 a :

import threading
a = 0
def add1():
global a
a += 1
print('%s adds a to 1: %d'%(threading.current_thread().getName(),a))
threads = [threading.Thread(name='t%d'%(i,),target=add1) for i in range(10)]
[t.start() for t in threads]

执⾏结果:

t0 adds a to 1: 1
t1 adds a to 1: 2
t2 adds a to 1: 3
t3 adds a to 1: 4
t4 adds a to 1: 5
t5 adds a to 1: 6
t6 adds a to 1: 7
t7 adds a to 1: 8
t8 adds a to 1: 9
t9 adds a to 1: 10

结果⼀切正常,每个线程执⾏⼀次,把 a 的值加1,最后 a 变为10,⼀切正常。

运⾏上⾯代码⼗⼏遍,⼀切也都正常。

所以,我们能下结论:这段代码是线程安全的吗?

NO!

多线程中,只要存在同时读取和修改⼀个全局变量的情况,如果不采取其他措施,就⼀定不是安全的线程。

尽管,有时,某些情况的资源竞争,暴露出问题的概率 极低极低 :

本例中,如果线程0 在修改a后,其他某些线程还是get到的是没有修改前的值,就会暴露问题。

但是在本例中, a = a + 1 这种修改操作,花费的时间太短了,短到我们⽆法想象。所以,线程间轮询执⾏时,都能get到最新的a值。所以,暴露问题的概率就变得微乎其微。

23 代码稍作改动,叫问题暴露出来

只要弄明⽩问题暴露的原因,叫问题出现还是不困难的。

想象数据库的写⼊操作,⼀般需要耗费我们可以感知的时间。

为了模拟这个写⼊动作,简化期间,我们只需要延长修改变量 a 的时间,问题很容易就会还原出来。

import threading
import time
a = 0
def add1():
global a
tmp = a + 1
time.sleep(0.2) # 延时0.2秒,模拟写⼊所需时间
a = tmp
print('%s adds a to 1: %d'%(threading.current_thread().getName(),a))
threads = [threading.Thread(name='t%d'%(i,),target=add1) for i in range(10)]
[t.start() for t in threads]

重新运⾏代码,只需⼀次,问题⽴马完全暴露,结果如下:

t6 adds a to 1: 1
t2 adds a to 1: 1
t9 adds a to 1: 1
t1 adds a to 1: 1
t8 adds a to 1: 1
t0 adds a to 1: 1
t3 adds a to 1: 1
t4 adds a to 1: 1
t7 adds a to 1: 1
t5 adds a to 1: 1

看到,10个线程全部运⾏后, a 的值只相当于⼀个线程执⾏的结果。

下⾯分析,为什么会出现上⾯的结果:

这是⼀个很有说服⼒的例⼦,因为在修改a前,有0.2秒的休眠时间,某个线程延时后,CPU⽴即分配计

算资源给其他线程。直到分配给所有线程后,根据结果反映出,0.2秒的休眠时长还没耗尽,这样每个线程get到的a值都是0,所以才出现上⾯的结果。

以上最核⼼的三⾏代码:

tmp = a + 1
time.sleep(0.2) # 延时0.2秒,模拟写⼊所需时间
a = tmp

24 加上⼀把锁,避免以上情况出现

知道问题出现的原因后,要想修复问题,也没那么复杂。

通过python中提供的锁机制,某段代码只能单线程执⾏时,上锁,其他线程等待,直到释放锁后,其他

线程再争锁,执⾏代码,释放锁,重复以上。

创建⼀把锁 locka:

import threading
import time
locka = threading.Lock()

通过 locka.acquire() 获得锁,通过 locka.release()释放锁,它们之间的这些代码,只能单线程
执⾏。

a = 0
def add1():
global a
try:
locka.acquire() # 获得锁
tmp = a + 1
time.sleep(0.2) # 延时0.2秒,模拟写⼊所需时间
a = tmp
finally:
locka.release() # 释放锁
print('%s adds a to 1: %d'%(threading.current_thread().getName(),a))
threads = [threading.Thread(name='t%d'%(i,),target=add1) for i in range(10)]
[t.start() for t in threads]

执⾏结果如下:

t0 adds a to 1: 1
t1 adds a to 1: 2
t2 adds a to 1: 3
t3 adds a to 1: 4
t4 adds a to 1: 5
t5 adds a to 1: 6
t6 adds a to 1: 7
t7 adds a to 1: 8
t8 adds a to 1: 9
t9 adds a to 1: 10

⼀起正常,其实这已经是单线程顺序执⾏了,就本例⼦⽽⾔,已经失去多线程的价值,并且还带来了因为线程创建开销,浪费时间的副作⽤。

程序中只有⼀把锁,通过 try…finally还能确保不发⽣死锁。但是,当程序中启⽤多把锁,还是很容易发⽣死锁。

注意使⽤场合,避免死锁,是我们在使⽤多线程开发时需要注意的⼀些问题。

25 1 分钟掌握 time 模块

time 模块提供时间相关的类和函数

记住⼀个类: struct_time,9 个整数组成的元组

记住下⾯ 5 个最常⽤函数

⾸先导⼊ time模块

import time

1 此时此刻时间浮点数

seconds = time.time()
seconds

2 时间数组

local_time = time.localtime(seconds)
local_time

3 时间字符串

str_time = time.asctime(local_time)
str_time

4 格式化时间字符串

time.strftime 语义: string format time

format_time = time.strftime("%Y-%m-%d %H:%M:%S",local_time)
format_time

5 字符时间转时间数组

str_to_struct = time.strptime(format_time,'%Y-%m-%d %H:%M:%S')
str_to_struct

最后再记住常⽤字符串格式

常⽤字符串格式

%Y Year with century as a decimal number.
%m Month as a decimal number [01,12].
%d Day of the month as a decimal number [01,31].
%H Hour (24-hour clock) as a decimal number [00,23].
%M Minute as a decimal number [00,59].
%S Second as a decimal number [00,61].
%z Time zone offset from UTC.
%a Locale's abbreviated weekday name.
%A Locale's full weekday name.
%b Locale's abbreviated month name.

26 4G 内存处理 10G ⼤⼩的⽂件

4G 内存处理 10G ⼤⼩的⽂件,单机怎么做?

下⾯的讨论基于的假定:可以单独处理⼀⾏数据,⾏间数据相关性为零。

⽅法⼀:

仅使⽤ Python 内置模板,逐⾏读取到内存。

使⽤ yield,好处是解耦读取操作和处理操作:

def python_read(filename):
with open(filename,'r',encoding='utf-8') as f:
while True:
line = f.readline()
if not line:
return
yield line

以上每次读取⼀⾏,逐⾏迭代,逐⾏处理数据

if __name__ == '__main__':
g = python_read('./data/clean.dat')
for c in g:
print(c)
# process c

⽅法⼆:

⽅法⼀有缺点,逐⾏读⼊,频繁的 IO 操作拖累处理效率。是否有⼀次 IO ,读取多⾏的⽅法?

pandas 包 read_csv 函数,参数有 38 个之多,功能⾮常强⼤。

关于单机处理⼤⽂件, read_csv 的 chunksize 参数能做到,设置为 5 , 意味着⼀次读取 5 ⾏。

import pandas as pd
def pandas_read(filename,sep=',',chunksize=5):
reader = pd.read_csv(filename,sep,chunksize=chunksize)
while True:
try:
yield reader.get_chunk()
except StopIteration:
print('---Done---')
break

使⽤如同⽅法⼀:

if __name__ == '__main__':
g = pandas_read('./data/clean.dat',sep="::")
for c in g:
print(c)
# process c

以上就是单机处理⼤⽂件的两个⽅法,推荐使⽤⽅法⼆,更加灵活。除了⼯作中会⽤到,⾯试中也有时
被问到。