python元编程

元编程和元类

对象是通过类创建的,类是通过元类创建的,元类提供了创建类的元信息。所有的类都直接或间接的继承自object,所有的元类都直接或间接的继承自type

例子:用元类实现单例模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading


class SingletonMeta(type):
"""自定义元类"""

def __init__(cls, *args, **kwargs):
cls.__instance = None
cls.__lock = threading.RLock()
super().__init__(*args, **kwargs)

def __call__(cls, *args, **kwargs):
if cls.__instance is None:
with cls.__lock:
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance


class President(metaclass=SingletonMeta):
"""总统(单例类)"""

pass
  • 面向对象设计原则

    • 单一职责原则 (SRP)- 一个类只做该做的事情(类的设计要高内聚)
    • 开放封闭原则 (OCP)- 对扩展开放,对修改关闭(类的设计要低耦合)
    • 里氏替换原则 (LSP)- 子类可以替换父类(子类的设计要高内聚)
    • 接口隔离原则 (ISP)- 接口要小而专(接口的设计要高内聚)
    • 依赖倒置原则 (DIP)- 高层模块不应该依赖低层模块,二者都应该依赖抽象(类的设计要高内聚)
  • GoF 设计模式

    • 创建型模式:单例、工厂、建造者、原型
    • 结构型模式:适配器、门面(外观)、代理
    • 行为型模式:观察者、策略、模板方法、迭代器、责任链、命令、备忘录、状态、访问者、中介者、解释器

      例子:可插拔的哈希算法(策略模式)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  class StreamHasher:
"""哈希摘要生成器"""

def __init__(self, alg='md5', size=4096):
self.size = size
alg = alg.lower()
self.hasher = getattr(__import__('hashlib'), alg.lower())()

def __call__(self, stream):
return self.to_digest(stream)

def to_digest(self, stream):
"""生成十六进制形式的摘要"""
for buf in iter(lambda: stream.read(self.size), b''):
self.hasher.update(buf)
return self.hasher.hexdigest()

def main():
"""主函数"""
hasher1 = StreamHasher()
with open('Python-3.7.6.tgz', 'rb') as stream:
print(hasher1.to_digest(stream))
hasher2 = StreamHasher('sha1')
with open('Python-3.7.6.tgz', 'rb') as stream:
print(hasher2(stream))


if __name__ == '__main__':
main()

迭代器和生成器

  • 迭代器是实现了迭代器协议的对象。
    • Python 中没有像protocolinterface这样的定义协议的关键字。
    • Python 中用魔术方法表示协议。
    • __iter____next__魔术方法就是迭代器协议。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Fib(object):
"""迭代器"""

def __init__(self, num):
self.num = num
self.a, self.b = 0, 1
self.idx = 0

def __iter__(self):
return self

def __next__(self):
if self.idx < self.num:
self.a, self.b = self.b, self.a + self.b
self.idx += 1
return self.a
raise StopIteration()
  • 生成器是语法简化版的迭代器。
1
2
3
4
5
6
def fib(num):
"""生成器"""
a, b = 0, 1
for _ in range(num):
a, b = b, a + b
yield a
  • 生成器进化为协程。
    生成器对象可以使用send()方法发送数据,发送的数据会成为生成器函数中通过yield表达式获得的值。这样,生成器就可以作为协程使用,协程简单的说就是可以相互协作的子程序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def calc_avg():
"""流式计算平均值"""
total, counter = 0, 0
avg_value = None
while True:
value = yield avg_value
total, counter = total + value, counter + 1
avg_value = total / counter


gen = calc_avg()
next(gen)
print(gen.send(10))
print(gen.send(20))
print(gen.send(30))

并发编程

Python 中实现并发编程的三种方案:多线程、多进程和异步 I/O。并发编程的好处在于可以提升程序的执行效率以及改善用户体验;坏处在于并发的程序不容易开发和调试,同时对其他程序来说它并不友好。

多线程

  • 多线程:Python 中提供了Thread类并辅以LockConditionEventSemaphoreBarrier。Python 中有 GIL 来防止多个线程同时执行本地字节码,这个锁对于 CPython 是必须的,因为 CPython 的内存管理并不是线程安全的,因为 GIL 的存在多线程并不能发挥 CPU 的多核特性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
"""
面试题:进程和线程的区别和联系?
进程 - 操作系统分配内存的基本单位 - 一个进程可以包含一个或多个线程
线程 - 操作系统分配CPU的基本单位
并发编程(concurrent programming)
1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行
2. 改善用户体验 - 让耗时间的操作不会造成程序的假死
"""
import glob
import os
import threading

from PIL import Image

PREFIX = 'thumbnails'


def generate_thumbnail(infile, size, format='PNG'):
"""生成指定图片文件的缩略图"""
file, ext = os.path.splitext(infile)
file = file[file.rfind('/') + 1:]
outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}'
img = Image.open(infile)
img.thumbnail(size, Image.ANTIALIAS)
img.save(outfile, format)


def main():
"""主函数"""
if not os.path.exists(PREFIX):
os.mkdir(PREFIX)
for infile in glob.glob('images/*.png'):
for size in (32, 64, 128):
# 创建并启动线程
threading.Thread(
target=generate_thumbnail,
args=(infile, (size, size))
).start()


if __name__ == '__main__':
main()
  • 多个线程竞争资源的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
"""
多线程程序如果没有竞争资源处理起来通常也比较简单
当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱
说明:临界资源就是被多个线程竞争的资源
"""
import time
import threading

from concurrent.futures import ThreadPoolExecutor


class Account(object):
"""银行账户"""

def __init__(self):
self.balance = 0.0
self.lock = threading.Lock()

def deposit(self, money):
# 通过锁保护临界资源
with self.lock:
new_balance = self.balance + money
time.sleep(0.001)
self.balance = new_balance


def main():
"""主函数"""
account = Account()
# 创建线程池
pool = ThreadPoolExecutor(max_workers=10)
futures = []
for _ in range(100):
future = pool.submit(account.deposit, 1)
futures.append(future)
# 关闭线程池
pool.shutdown()
for future in futures:
future.result()
print(account.balance)


if __name__ == '__main__':
main()

修改上面的程序,启动 5 个线程向账户中存钱,5 个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading模块的Condition来实现线程调度,该对象也是基于锁来创建的,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
"""
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep

import threading


class Account:
"""银行账户"""

def __init__(self, balance=0):
self.balance = balance
lock = threading.RLock()
self.condition = threading.Condition(lock)

def withdraw(self, money):
"""取钱"""
with self.condition:
while money > self.balance:
self.condition.wait()
new_balance = self.balance - money
sleep(0.001)
self.balance = new_balance

def deposit(self, money):
"""存钱"""
with self.condition:
new_balance = self.balance + money
sleep(0.001)
self.balance = new_balance
self.condition.notify_all()


def add_money(account):
while True:
money = randint(5, 10)
account.deposit(money)
print(threading.current_thread().name,
':', money, '====>', account.balance)
sleep(0.5)


def sub_money(account):
while True:
money = randint(10, 30)
account.withdraw(money)
print(threading.current_thread().name,
':', money, '<====', account.balance)
sleep(1)


def main():
account = Account()
with ThreadPoolExecutor(max_workers=15) as pool:
for _ in range(5):
pool.submit(add_money, account)
for _ in range(10):
pool.submit(sub_money, account)


if __name__ == '__main__':
main()

多进程

  • 多进程:多进程可以有效的解决 GIL 的问题,实现多进程主要的类是Process,其他辅助的类跟threading模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing模块中有一个Queue类,它基于管道和锁机制提供了多个进程共享的队列。下面是官方文档上关于多进程和进程池的一个示例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""
多进程和进程池的使用
多线程因为GIL的存在不能够发挥CPU的多核特性
对于计算密集型任务应该考虑使用多进程
time python3 example22.py
real 0m11.512s
user 0m39.319s
sys 0m0.169s
使用多进程后实际执行时间为11.512秒,而用户时间39.319秒约为实际执行时间的4倍
这就证明我们的程序通过多进程使用了CPU的多核特性,而且这台计算机配置了4核的CPU
"""
import concurrent.futures
import math

PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5


def is_prime(n):
"""判断素数"""
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


def main():
"""主函数"""
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))


if __name__ == '__main__':
main()

重点多线程和多进程的比较

以下情况需要使用多线程:

  1. 程序需要维护许多共享的状态(尤其是可变状态),Python 中的列表、字典、集合都是线程安全的,所以使用线程而不是进程维护共享状态的代价相对较小。
  2. 程序会花费大量时间在 I/O 操作上,没有太多并行计算的需求且不需占用太多的内存。
    以下情况需要使用多进程:
  3. 程序执行计算密集型任务或者要使用多 CPU 来执行。
  4. 程序的输入可以并行的分割,并且可以将分割的数据作为单独的任务提交给进程池。
  5. 程序的输出可以合并成一个单一的结果。
  6. 程序需要在共享内存空间中执行,通常是因为它们之间需要交换大量数据。
  • 异步处理:从调度程序的任务队列中挑选任务,该调度程序以交叉的形式执行这些任务,我们并不能保证任务将以某种顺序去执行,因为执行顺序取决于队列中的一项任务是否愿意将 CPU 处理时间让位给另一项任务。异步任务通常通过多任务协作处理的方式来实现,由于执行时间和顺序的不确定,因此需要通过回调式编程或者future对象来获取任务执行的结果。Python 3 通过asyncio模块和awaitasync关键字(在 Python 3.7 中正式被列为关键字)来支持异步处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""
异步I/O - async / await
"""
import asyncio


def num_generator(m, n):
"""指定范围的数字生成器"""
yield from range(m, n + 1)


async def prime_filter(m, n):
"""素数过滤器"""
primes = []
for i in num_generator(m, n):
flag = True
for j in range(2, int(i ** 0.5 + 1)):
if i % j == 0:
flag = False
break
if flag:
print('Prime =>', i)
primes.append(i)

await asyncio.sleep(0.001)
return tuple(primes)


async def square_mapper(m, n):
"""平方映射器"""
squares = []
for i in num_generator(m, n):
print('Square =>', i * i)
squares.append(i * i)

await asyncio.sleep(0.001)
return squares


def main():
"""主函数"""
loop = asyncio.get_event_loop()
future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
future.add_done_callback(lambda x: print(x.result()))
loop.run_until_complete(future)
loop.close()


if __name__ == '__main__':
main()

说明:上面的代码使用get_event_loop函数获得系统默认的事件循环,通过gather函数可以获得一个future对象,future对象的add_done_callback可以添加执行完成时的回调函数,loop对象的run_until_complete方法可以等待通过future对象获得协程执行结果。

Python 中有一个名为aiohttp的三方库,它提供了异步的 HTTP 客户端和服务器,这个三方库可以跟asyncio模块一起工作,并提供了对Future对象的支持。Python 3.6 中引入了asyncawait来定义异步执行的函数以及创建异步上下文,在 Python 3.7 中它们正式成为了关键字。下面的代码异步的从 5 个 URL 中获取页面并通过正则表达式的命名捕获组提取了网站的标题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import re

import aiohttp

PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')


async def fetch_page(session, url):
async with session.get(url, ssl=False) as resp:
return await resp.text()


async def show_title(url):
async with aiohttp.ClientSession() as session:
html = await fetch_page(session, url)
print(PATTERN.search(html).group('title'))


def main():
urls = ('https://www.python.org/',
'https://git-scm.com/',
'https://www.jd.com/',
'https://www.taobao.com/',
'https://www.douban.com/')
loop = asyncio.get_event_loop()
cos = [show_title(url) for url in urls]
loop.run_until_complete(asyncio.wait(cos))
loop.close()


if __name__ == '__main__':
main()

重点异步 I/O 与多进程的比较

当程序不需要真正的并发性或并行性,而是更多的依赖于异步处理和回调时,asyncio就是一种很好的选择。如果程序中有大量的等待与休眠时,也应该考虑asyncio,它很适合编写没有实时数据处理需求的 Web 应用服务器。

Python 还有很多用于处理并行任务的三方库,例如:joblibPyMP等。实际开发中,要提升系统的可扩展性和并发性通常有垂直扩展(增加单个节点的处理能力)和水平扩展(将单个节点变成多个节点)两种做法。可以通过消息队列来实现应用程序的解耦合,消息队列相当于是多线程同步队列的扩展版本,不同机器上的应用程序相当于就是线程,而共享的分布式消息队列就是原来程序中的 Queue。消息队列(面向消息的中间件)的最流行和最标准化的实现是 AMQP(高级消息队列协议),AMQP 源于金融行业,提供了排队、路由、可靠传输、安全等功能,最著名的实现包括:Apache 的 ActiveMQRabbitMQ 等。
要实现任务的异步化,可以使用名为Celery的三方库。Celery是 Python 编写的分布式任务队列,它使用分布式消息进行工作,可以基于 RabbitMQ 或 Redis 来作为后端的消息代理。

python 爬虫的beautifulsoup库的使用

BeatifulSoup 是一个可以从 HTML 或 XML 文件中提取数据的 Python 库。它能够通过你喜欢的转换器实现惯用的文档导航,查找,修改文档的方式。

BeatifulSoup 的解析器

1
2
3
4
soup = BeautifulSoup(resp, "html.parser") # html.parser 是默认的解析器
title_name = soup.findAll('p', attrs = {"class": "price_color"}) # 找到第一个p标签,参数class等于price_color
for name in title_name:
print(name.string)

BeatifulSoup 解析器有 4 种,分别是:

  • html.parser:Python 内置的 HTML 解析器
  • lxml:效率更高的 HTML/XML 解析器
  • html5lib:HTML5 解析器

解析成树状结构
树状

下载模块 bs4

1
pip3 install bs4

bs4 常用方法说明

方法/属性 作用
find_all(name, attrs, recursive, string, limit, **kwargs) 查找所有符合条件的标签,返回列表
soup.title 返回第一个符合条件的标签
soup.find(name) 找到第一个匹配的标签
soup.find_all(name) 找到所有匹配的标签
soup.find(id=”link2”) 通过属性查找
soup.find_all(attrs={“id”:”link2”}) 通过属性查找
tag.get(‘属性名’) 获取标签属性,例如 href、src 等
tag.text / tag.get_text() 获取标签的文本内容
soup.select(‘css 选择器’) 使用 CSS 选择器语法提取标签(更灵活)

python 爬虫的一些设置参数

python 爬虫

设置headers

1
2
3
4
5
6
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
}

zh-CN: 最高优先级,客户端最希望收到简体中文的内容。

zh;q=0.8: 如果服务器不能提供 zh-CN,那退而求其次接受普通中文,优先级稍低。

en-US;q=0.6: 如果没有中文版本,接受美式英文。

en;q=0.4: 最低优先级,接受其他形式的英文。

作用: 设置 HTTP 请求头,比如伪装成浏览器、添加认证信息等

设置proxies

1
2
3
4
5
proxies = {
"http": "http://10.10.1.10:3128",
"https": "http://10.10.1.10:1080",
}
r = requests.get("http://httpbin.org/ip", proxies=proxies)

设置请求代理服务器

设置params

1
2
3
4
5
params = {
"wd": "python",
}
response = requests.get(url, headers=headers, params=params)
url : https://www.baidu.com/s?wd=python

作用: 传递 URL 查询参数(即 ?key=value 形式)

设置cookies

1
2
3
4
cookies = {
"cookie": "name=germey",
}
response = requests.get(url, headers=headers, cookies=cookies)

作用: 传递 cookies,即 HTTP 请求中的 cookie 信息

设置auth

1
2
3
4
from requests.auth import HTTPBasicAuth
HTTPBasicAuth = requests.auth.HTTPBasicAuth('user', 'pass')
r = requests.get('https://httpbin.org/basic-auth/user/pass', auth=HTTPBasicAuth)
print(r.status_code)

作用: 提供 HTTP 基本认证

设置allow_redirects

1
response = requests.get(url, allow_redirects=False)

是否允许重定向(默认 True)

设置verify

1
2
r = requests.get('https://expired.badssl.com', verify=False)
print(r.status_code)

是否验证 SSL 证书(默认 True)

requests库

requests库

响应示例

1
2
3
4
5
6
7
8
9
10
# 发送 GET 请求
response = requests.get('https://httpbin.org/get')

# 响应状态码
print("状态码:", response.status_code)

# 响应内容(文本格式)
print("响应内容:", response.text)
# 响应内容(JSON 格式)
print("响应内容(JSON):", response.json())
属性 说明
response.status_code 响应状态码,如 200 表示成功
response.text 以字符串形式获取响应内容
response.content 以字节流形式获取响应内容
response.json() 以 JSON 格式解析响应内容
response.headers 响应头信息
response.cookies 响应中的 cookies
response.url 实际请求的 URL
  • response.text 获取响应内容
  • response.content 获取响应内容
  • response.json 获取响应内容
  • response.status_code 获取响应状态码
  • response.headers 获取响应头
  • response.url 获取响应url
  • response.raw 获取原始响应内容
  • response.cookies 获取响应cookies
  • response.encoding 获取响应编码
  • response.status_code 获取响应状态码
  • response.headers 获取响应头
  • response.url 获取响应url

✅ 进阶提示(可选参数):

1
2
3
4
5
6
# 添加请求参数和请求头
params = {'key': 'value'}
headers = {'User-Agent': 'my-app'}

response = requests.get('https://httpbin.org/get', params=params, headers=headers)
print(response.url) # 展示最终拼接后的 URL

Python 模块 DrissionPage

DrissionPage(ChromiumPage)简单用法总结

安装

1
pip install DrissionPage

ChromiumPage 基本用法总结(代码 + 注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from DrissionPage import ChromiumPage

# 创建 ChromiumPage 对象,启动内置 Chromium 浏览器(默认非无头模式)
page = ChromiumPage()

# 访问网页
page.get('https://example.com')

# 查找元素:使用 CSS 选择器
el = page.e('h1') # 获取第一个 <h1> 元素
print(el.text) # 输出文本内容

# 查找元素:使用 XPath 选择器(前面加 @ 表示 XPath)
el2 = page.e('@//p') # 获取第一个 <p> 元素
print(el2.text)

# 点击元素(如按钮)
page.e('button#submit').click() # 点击 id 为 submit 的按钮

# 输入文本
page.e('input[name="username"]').input('admin') # 在 name 为 username 的输入框输入 'admin'

# 等待元素出现,最多等待10秒
page.wait.e('div.result', timeout=10) # 等待 class 为 result 的 div 出现

# 获取整个页面 HTML 源码
html = page.html

# 执行 JavaScript 脚本,获取页面标题
title = page.run_js('return document.title;')
print(f'页面标题:{title}')

# 模拟截图并保存到当前目录
page.screenshot('screenshot.png')

# 保存当前 HTML 到本地文件
with open('page.html', 'w', encoding='utf-8') as f:
f.write(html)

# 设置移动端 User-Agent(模拟手机访问)
page.set_user_agent(mobile=True)

# 关闭浏览器(一定要在结束时关闭以释放资源)
page.close()

其他常用配置(可选)

1
2
3
4
5
6
# 启动时设置无头模式(不弹出浏览器窗口)
page = ChromiumPage(headless=True)

# 启动时设置代理(适用于需要翻墙或隐藏 IP)
page = ChromiumPage(proxy='http://127.0.0.1:1080')

使用优势总结

功能 DrissionPage Selenium Playwright
浏览器自动化
支持 requests 风格
CSS/XPath 查找封装 一般
简单截图、JS 执行等
易用性 ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐

Python 模块 csv

模块csv使用

一、读取 CSV 文件

1.1 使用 csv.reader()

1
2
3
4
5
with open('example.csv', newline='', encoding='utf-8', delimiter=',') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
print(row)

常用参数
参数 说明
delimiter 指定分隔符,默认为逗号,
quotechar 指定引号字符,默认为双引号””
skipinitialspace 是否跳过分隔符后多余空格
1
csv.reader(csvfile, delimiter=';', quotechar='"', skipinitialspace=True)

1.2 使用 csv.DictReader()

以字典形式读取,首行自动作为 key。

1
2
3
4
with open('example.csv', newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
print(row['name'], row['age'])

常用参数

参数 说明
fieldnames 指定字段名(可用于无表头的CSV)

二、写入 CSV 文件

2.1 使用 csv.writer()

1
2
3
4
5
6
7
8
9
with open('output.csv', 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['name', 'age', 'gender']) # 写入表头
writer.writerow(['Alice', 25, 'F']) # 写入一行
writer.writerows([
['Bob', 30, 'M'],
['Charlie', 35, 'M']
])

常用参数

方法 说明
writerow(row) 写入一行数据
writerows(rows) 写入多行数据

2.2 使用 csv.DictWriter()

以字典形式写入,推荐用于字段较多或数据为字典结构时。

1
2
3
4
5
6
7
8
9
10
11
with open('output.csv', 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['name', 'age', 'gender']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

writer.writeheader() # 写入表头
writer.writerow({'name': 'Alice', 'age': 25, 'gender': 'F'})
writer.writerows([
{'name': 'Bob', 'age': 30, 'gender': 'M'},
{'name': 'Charlie', 'age': 35, 'gender': 'M'}
])

常用参数

方法 说明
writeheader() 写入表头
writerow(dict) 写入一行数据
writerows(dicts) 写入多行数据

三、处理换行符问题(Windows注意)

使用 newline=’’ 打开文件是推荐方式,防止出现空行。

1
2
3
with open('file.csv', 'w', newline='', encoding='utf-8') as f:
...

四、特殊情况处理

4.1 自定义分隔符和引号

1
csv.writer(f, delimiter=';', quotechar="'", quoting=csv.QUOTE_MINIMAL)

quoting 参数选项:

说明
csv.QUOTE_ALL 所有字段都加引号
csv.QUOTE_MINIMAL 只在需要时加引号(默认)
csv.QUOTE_NONNUMERIC 非数字字段加引号
csv.QUOTE_NONE 不添加引号

五、完整案例

5.1 从 CSV 读取再写入新文件

1
2
3
4
5
6
7
8
9
10
11
12
13
import csv

with open('input.csv', newline='', encoding='utf-8') as infile, \
open('output.csv', 'w', newline='', encoding='utf-8') as outfile:

reader = csv.DictReader(infile)
fieldnames = reader.fieldnames
writer = csv.DictWriter(outfile, fieldnames=fieldnames)

writer.writeheader()
for row in reader:
# 可以对数据进行处理
writer.writerow(row)

Python re 正则

re.png

🔹 常用正则表达式字符和作用

符号 / 表达式 含义 示例 说明
^ 匹配字符串开头 ^abc 匹配以 abc 开头的字符串
$ 匹配字符串结尾 abc$ 匹配以 abc 结尾的字符串
. 匹配任意单个字符 a.b 匹配 a+b 中间是任意字符
[] 字符集合 [abc] 匹配 a 或 b 或 c
[^] 否定字符集 [^abc] 匹配不是 a b c 的字符
- 范围表示 [a-z] 匹配 a 到 z 的小写字母
* 匹配前一个 0 次或多次 a* 匹配 “”, a, aa, aaa 等
+ 匹配前一个 1 次或多次 a+ 匹配 a, aa, aaa 等
? 匹配前一个 0 或 1 次 a? 匹配 “” 或 a
{n} 恰好 n 次 a{3} 匹配 aaa
{n,} 至少 n 次 a{2,} 匹配 aa, aaa, aaaa…
{n,m} n 到 m 次 a{2,4} 匹配 aa, aaa, aaaa
或,匹配多个之一 abc
() 分组 (abc)+ 捕获 abc、abcabc 等
\ 转义字符 . 匹配一个点号 .
\d 匹配数字 \d+ 匹配任意数字字符(等价于 [0-9])
\D 匹配非数字 \D+ 匹配 abc、空格等
\w 匹配单词字符 \w+ 匹配任意字母、数字或下划线字符(等价于 [a-zA-Z0-9_])
\W 匹配非单词字符 \W+ 匹配 !@# 空格等
\s 匹配空白字符 \s+ 匹配空格、换行、Tab
\S 匹配非空白字符 \S+ 匹配所有非空格字符
(?P…) 命名分组 (?P\d{4}) 命名 year 可 group(‘year’)

🔹 特殊符号说明

符号 是否需要转义 用途
. ✅ 是 匹配任意单个字符
& ❌ 否 正则中无特殊含义,直接匹配字符 &
^ ✅ 是 表示开头(或在 [] 中表示非)
$ ✅ 是 表示结尾
[] ✅ 是 表示字符集合
() ✅ 是 分组和捕获
\ ✅ 是 转义特殊字符,如 \d, .
? ✅ 是 0 或 1 次匹配,或非贪婪匹配
* ✅ 是 0 次或多次匹配
+ ✅ 是 1 次或多次匹配
{} ✅ 是 匹配次数范围
✅ 是

🔹 常见正则表达式示例

场景 正则表达式 匹配内容
匹配邮箱 \w+@\w+.\w+ abc@xyz.com
匹配手机号 1[3-9]\d{9} 13812345678
匹配日期 \d{4}-\d{2}-\d{2} 2025-05-14
匹配中文 [\u4e00-\u9fa5]+ 你好世界
匹配IP地址 \d{1,3}(.\d{1,3}){3} 192.168.1.1

re用法

re 用法

🧪 示例:不使用 vs 使用 re.compile()

🧱 普通用法(不推荐重复用正则时)

1
2
3
4
import re

re.search(r"\d+", "编号123") # 每次都重新解析正则
re.search(r"\d+", "订单456")

🚀 推荐写法(使用 re.compile)

1
2
3
4
5
6
import re

pattern = re.compile(r"\d+") # 编译成正则对象

print(pattern.search("编号123")) # 匹配成功
print(pattern.search("订单456")) # 复用 pattern,不用重新编译

🎯 优势总结

优点 说明
✅ 提高性能 编译一次后多次使用,比每次都重新解析正则更高效
✅ 增强可读性 把正则表达式独立定义,更清晰
✅ 支持 flags 参数 可以传入 re.IGNORECASE 等模式开关

🎈 带 flags 示例(忽略大小写)

1
2
3
4
pattern = re.compile(r"hello", re.IGNORECASE)

match = pattern.search("HeLLo World")
print(match.group()) # 输出:HeLLo

re 函数的用法区别

方法 用途简介 匹配位置 返回类型
re.search 搜索字符串中第一次出现正则表达式的模式 从字符串开头开始匹配 match 对象
re.match 从字符串开头匹配正则表达式的模式 从字符串开头开始匹配 match 对象
re.findall 搜索字符串,以列表形式返回全部能匹配的子串 从字符串开头开始匹配 列表(字符串或元组)
re.split 以正则表达式为分隔符拆分字符串 从字符串开头开始匹配 列表(字符串或元组)
re.sub 使用正则表达式替换字符串中每一个匹配的子串后返回替换后的字符串 从字符串开头开始匹配 字符串

Match 对象# Match 对象是 re 模块中匹配操作返回的结果对象,包含匹配信息

以下是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import re

text = "Hello, my email is example@example.com"
pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"

match = re.search(pattern, text) # 返回一个 Match 对象

if match:
print(f"找到匹配: {match.group()}") # 输出匹配的字符串
print(f"匹配的起始位置: {match.start()}") # 输出匹配的起始索引
print(f"匹配的结束位置: {match.end()}") # 输出匹配的结束索引
print(f"匹配的字符串片段: {text[match.start():match.end()]}") # 输出匹配的字符串片段
else:
print("未找到匹配")

match

1
2
3
4
5
result = re.match(r'\d+', '123abc')  # 匹配开头数字
if result:
print(result.group()) # 输出: 123
else:
print("No match")
1
2
3
4
5
result = re.search(r'\d+', '123abc')  # 匹配任意位置数字
if result:
print(result.group()) # 输出: 123
else:
print("No match")

findall

1
2
3
4
5
result = re.findall(r'\d+', '123abc')  # 匹配所有数字
if result:
print(result) # 输出: ['123']
else:
print("No match")

split

1
2
3
4
5
result = re.split(r'\d+', '123abc')  # 以数字分割字符串
if result:
print(result) # 输出: ['abc']
else:
print("No match")

sub

1
2
3
4
5
result = re.sub(r'\d+', 'abc', '123abc')  # 用abc替换所有数字
if result:
print(result) # 输出: abcabc
else:
print("No match")

Python的函数

定义函数

定义函数
我们不仅可以调用函数,还可以定义自己的函数

1
2
3
4
5
6
7
8
9
10
11
12
扇形面积计算公式:圆心角度数除以360乘以pi乘以半径的平方
central_angle_1 = 160
radius_1 = 30
sector_area_1 = central_angle / 360 * 3.14 * radius_1 ** 2

central_angle_2 = 160
radius_2 = 30
sector_area_2 = central_angle_2 / 360 * 3.14 * radius_2 ** 2
print(sector_area_1, sector_area_2)

dry
don't repeat yourself
1
2
3
4
5
6
7
8
9
10
11
def calculator_sector_1(central_angle, radius):
sector_area = central_angle / 360 * 3.14 * radius ** 2
print(sector_area)

calculator_sector_1(160, 30)
# 通过参数,让函数变得通用

def favority_book(title):
print(f"One of my favority books is {title}")

favority_book("stray birds ")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# /前面的参数是强制位置参数
def make_judgement(a, b, c, /):
"""判断三条边的长度能否构成三角形"""
return a + b > c and b + c > a and a + c > b


# 下面的代码会产生TypeError错误,错误信息提示“强制位置参数是不允许给出参数名的”
# TypeError: make_judgement() got some positional-only arguments passed as keyword arguments
# print(make_judgement(b=2, c=3, a=1))

def make_judgement(a, b, c, /):
# / 表示位置参数,只能使用位置参数传递参数,不能使用关键字参数传递参数

"""判断三条边的长度能否构成三角形"""
return a + b > c and b + c > a and a + c > b

print(make_judgement(5, 3, 4))
print(make_judgement(a=5, b=3, c=4)) # 报错,不能使用关键字参数传递参数
# print(make_judgement(1, 2, 3, 4)) # 报错,不能使用关键字参数传递参数

# *后面的参数是命名关键字参数
def make_judgement(*, a, b, c):
# * 表示命名关键字参数,必须使用关键字参数传递参数,不能使用位置参数传递参数
"""判断三条边的长度能否构成三角形"""
return a + b > c and b + c > a and a + c > b
print(make_judgement(a=5, b=3, c=4))
# print(make_judgement(1, 2, 3)) # 报错,必须使用关键字参数传递参数

验证码

1
2
3
4
5
6
7
8
9
10
11
import random
import string # 用于生成随机字符串

ALL_CHARS = string.digits + string.ascii_letters # 生成所有字符
print(type(ALL_CHARS))
def generate_code(*, code_len=4):
"""生成指定长度的验证码"""
return ''.join(random.sample(ALL_CHARS, code_len)) # 随机抽取指定长度的字符串
# ''.join() 将列表中的字符连接成一个字符串 '' 表示连接的字符不用任何分隔符
print(generate_code()) # 生成4位验证码
# print(generate_code(code_len=6)) # 生成6位验证码

双色球

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import random

RED_BALLS = [i for i in range(1, 34)] # 红球号码范围
BLUE_BALLS = [i for i in range(1, 17)] # 蓝球号码范围


def choose():
"""
生成一组随机号码
:return: 保存随机号码的列表
"""
selected_balls = random.sample(RED_BALLS, 6)
selected_balls.sort()
selected_balls.append(random.choice(BLUE_BALLS))
return selected_balls


def display(balls):
"""
格式输出一组号码
:param balls: 保存随机号码的列表
"""
for ball in balls[:-1]: # 遍历前6个红球号码
print(f'\033[91m{ball:0>2d}\033[0m', end=' ')
print(f'\033[034m{balls[-1]:0>2d}\033[0m')


n = int(input('生成几注号码: '))
for _ in range(n):
display(choose())

偏函数
偏函数是指固定函数的某些参数,生成一个新的函数,这样就无需在每次调用函数时都传递相同的参数。在 Python 语言中,我们可以使用functools模块的partial函数来创建偏函数。例如,int函数在默认情况下可以将字符串视为十进制整数进行类型转换,如果我们修修改它的base参数,就可以定义出三个新函数,分别用于将二进制、八进制、十六进制字符串转换为整数,代码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import functools

int2 = functools.partial(int, base=2)
int8 = functools.partial(int, base=8)
int16 = functools.partial(int, base=16)

print(int('1001')) # 1001

print(int2('1001')) # 9
print(int8('1001')) # 513
print(int16('1001')) # 4097

a = bin(9)[2:] # 去掉前缀0b
b = oct(513)[2:] # 513是八进制数对应的十进制数为513,所以输出的是513的八进制表示形式,即1001
c = hex(4097)[2:]
print(a, b, c) # 1001 1001 1001

lambda匿名函数的用法

lambdaPython 中创建匿名函数的一种方式,常用于需要一个小函数但不想单独定义 def 函数的场景。

1
lambda 参数1, 参数2, ... : 表达式
  • lambda 是关键字,用于定义匿名函数。
  • 参数1, 参数2,... 是函数的参数列表,可以有多个参数,也可以没有参数。
  • : 是参数列表和函数体之间的分隔符。
  • 表达式 是函数的返回值,可以是任意合法的表达式。

相当于

1
2
def 函数名(参数1, 参数2, ...):
return 表达式

最简单的例子:加法

1
2
add = lambda x, y: x + y
print(add(2, 3)) # 输出 5

等价

1
2
def add(x, y):
return x + y

用作 sorted() 的 key 参数

1
2
3
data = [(1, 3), (2, 1), (4, 2)]
sorted_data = sorted(data, key=lambda x: x[1])
print(sorted_data) # 输出 [(2, 1), (4, 2), (1, 3)]

sorted

sorted() 是 Python 中用来对可迭代对象进行排序的函数,返回一个新的排序后的列表,不会改变原始数据。

✅ 一、基本语法

1
sorted(iterable, key=None, reverse=False)
  • iterable:要排序的可迭代对象,如列表、元组、集合等。
  • key:可选参数,用于指定排序的规则。可以是一个函数,也可以是一个 lambda 表达式。
  • reverse:可选参数,用于指定排序的顺序。默认为 False,表示升序;如果为 True,表示降序。

✅ 二、简单例子

1. 对列表进行排序

1
2
3
nums = [5, 2, 9, 1]
sorted_nums = sorted(nums)
print(sorted_nums) # 输出 [1, 2, 5, 9]

2. 字符串列表排序(按字母顺序)

1
2
words = ['banana', 'apple', 'cherry']
print(sorted(words)) # 输出 ['apple', 'banana', 'cherry']

3. 反向排序

1
2
nums = [3, 1, 4]
print(sorted(nums, reverse=True)) # 输出 [4, 3, 1]

4. 按字符串长度排序

1
2
words = ['banana', 'apple', 'kiwi']
print(sorted(words, key=len)) # 输出 ['kiwi', 'apple', 'banana']

5. 对字典列表按某个字段排序

1
2
3
4
5
6
7
8
9
10
11
12
13
people = [
{'name': 'Tom', 'age': 25},
{'name': 'Jerry', 'age': 20},
{'name': 'Bob', 'age': 30}
]

sorted_people = sorted(people, key=lambda x: x['age'])
for person in sorted_people:
print(person['name'], person['age'])
# 输出:
# Jerry 20
# Tom 25
# Bob 30