python元编程

元编程和元类

对象是通过类创建的,类是通过元类创建的,元类提供了创建类的元信息。所有的类都直接或间接的继承自object,所有的元类都直接或间接的继承自type

例子:用元类实现单例模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading


class SingletonMeta(type):
"""自定义元类"""

def __init__(cls, *args, **kwargs):
cls.__instance = None
cls.__lock = threading.RLock()
super().__init__(*args, **kwargs)

def __call__(cls, *args, **kwargs):
if cls.__instance is None:
with cls.__lock:
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance


class President(metaclass=SingletonMeta):
"""总统(单例类)"""

pass
  • 面向对象设计原则

    • 单一职责原则 (SRP)- 一个类只做该做的事情(类的设计要高内聚)
    • 开放封闭原则 (OCP)- 对扩展开放,对修改关闭(类的设计要低耦合)
    • 里氏替换原则 (LSP)- 子类可以替换父类(子类的设计要高内聚)
    • 接口隔离原则 (ISP)- 接口要小而专(接口的设计要高内聚)
    • 依赖倒置原则 (DIP)- 高层模块不应该依赖低层模块,二者都应该依赖抽象(类的设计要高内聚)
  • GoF 设计模式

    • 创建型模式:单例、工厂、建造者、原型
    • 结构型模式:适配器、门面(外观)、代理
    • 行为型模式:观察者、策略、模板方法、迭代器、责任链、命令、备忘录、状态、访问者、中介者、解释器

      例子:可插拔的哈希算法(策略模式)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  class StreamHasher:
"""哈希摘要生成器"""

def __init__(self, alg='md5', size=4096):
self.size = size
alg = alg.lower()
self.hasher = getattr(__import__('hashlib'), alg.lower())()

def __call__(self, stream):
return self.to_digest(stream)

def to_digest(self, stream):
"""生成十六进制形式的摘要"""
for buf in iter(lambda: stream.read(self.size), b''):
self.hasher.update(buf)
return self.hasher.hexdigest()

def main():
"""主函数"""
hasher1 = StreamHasher()
with open('Python-3.7.6.tgz', 'rb') as stream:
print(hasher1.to_digest(stream))
hasher2 = StreamHasher('sha1')
with open('Python-3.7.6.tgz', 'rb') as stream:
print(hasher2(stream))


if __name__ == '__main__':
main()

迭代器和生成器

  • 迭代器是实现了迭代器协议的对象。
    • Python 中没有像protocolinterface这样的定义协议的关键字。
    • Python 中用魔术方法表示协议。
    • __iter____next__魔术方法就是迭代器协议。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Fib(object):
"""迭代器"""

def __init__(self, num):
self.num = num
self.a, self.b = 0, 1
self.idx = 0

def __iter__(self):
return self

def __next__(self):
if self.idx < self.num:
self.a, self.b = self.b, self.a + self.b
self.idx += 1
return self.a
raise StopIteration()
  • 生成器是语法简化版的迭代器。
1
2
3
4
5
6
def fib(num):
"""生成器"""
a, b = 0, 1
for _ in range(num):
a, b = b, a + b
yield a
  • 生成器进化为协程。
    生成器对象可以使用send()方法发送数据,发送的数据会成为生成器函数中通过yield表达式获得的值。这样,生成器就可以作为协程使用,协程简单的说就是可以相互协作的子程序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def calc_avg():
"""流式计算平均值"""
total, counter = 0, 0
avg_value = None
while True:
value = yield avg_value
total, counter = total + value, counter + 1
avg_value = total / counter


gen = calc_avg()
next(gen)
print(gen.send(10))
print(gen.send(20))
print(gen.send(30))

并发编程

Python 中实现并发编程的三种方案:多线程、多进程和异步 I/O。并发编程的好处在于可以提升程序的执行效率以及改善用户体验;坏处在于并发的程序不容易开发和调试,同时对其他程序来说它并不友好。

多线程

  • 多线程:Python 中提供了Thread类并辅以LockConditionEventSemaphoreBarrier。Python 中有 GIL 来防止多个线程同时执行本地字节码,这个锁对于 CPython 是必须的,因为 CPython 的内存管理并不是线程安全的,因为 GIL 的存在多线程并不能发挥 CPU 的多核特性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
"""
面试题:进程和线程的区别和联系?
进程 - 操作系统分配内存的基本单位 - 一个进程可以包含一个或多个线程
线程 - 操作系统分配CPU的基本单位
并发编程(concurrent programming)
1. 提升执行性能 - 让程序中没有因果关系的部分可以并发的执行
2. 改善用户体验 - 让耗时间的操作不会造成程序的假死
"""
import glob
import os
import threading

from PIL import Image

PREFIX = 'thumbnails'


def generate_thumbnail(infile, size, format='PNG'):
"""生成指定图片文件的缩略图"""
file, ext = os.path.splitext(infile)
file = file[file.rfind('/') + 1:]
outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}'
img = Image.open(infile)
img.thumbnail(size, Image.ANTIALIAS)
img.save(outfile, format)


def main():
"""主函数"""
if not os.path.exists(PREFIX):
os.mkdir(PREFIX)
for infile in glob.glob('images/*.png'):
for size in (32, 64, 128):
# 创建并启动线程
threading.Thread(
target=generate_thumbnail,
args=(infile, (size, size))
).start()


if __name__ == '__main__':
main()
  • 多个线程竞争资源的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
"""
多线程程序如果没有竞争资源处理起来通常也比较简单
当多个线程竞争临界资源的时候如果缺乏必要的保护措施就会导致数据错乱
说明:临界资源就是被多个线程竞争的资源
"""
import time
import threading

from concurrent.futures import ThreadPoolExecutor


class Account(object):
"""银行账户"""

def __init__(self):
self.balance = 0.0
self.lock = threading.Lock()

def deposit(self, money):
# 通过锁保护临界资源
with self.lock:
new_balance = self.balance + money
time.sleep(0.001)
self.balance = new_balance


def main():
"""主函数"""
account = Account()
# 创建线程池
pool = ThreadPoolExecutor(max_workers=10)
futures = []
for _ in range(100):
future = pool.submit(account.deposit, 1)
futures.append(future)
# 关闭线程池
pool.shutdown()
for future in futures:
future.result()
print(account.balance)


if __name__ == '__main__':
main()

修改上面的程序,启动 5 个线程向账户中存钱,5 个线程从账户中取钱,取钱时如果余额不足就暂停线程进行等待。为了达到上述目标,需要对存钱和取钱的线程进行调度,在余额不足时取钱的线程暂停并释放锁,而存钱的线程将钱存入后要通知取钱的线程,使其从暂停状态被唤醒。可以使用threading模块的Condition来实现线程调度,该对象也是基于锁来创建的,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
"""
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep

import threading


class Account:
"""银行账户"""

def __init__(self, balance=0):
self.balance = balance
lock = threading.RLock()
self.condition = threading.Condition(lock)

def withdraw(self, money):
"""取钱"""
with self.condition:
while money > self.balance:
self.condition.wait()
new_balance = self.balance - money
sleep(0.001)
self.balance = new_balance

def deposit(self, money):
"""存钱"""
with self.condition:
new_balance = self.balance + money
sleep(0.001)
self.balance = new_balance
self.condition.notify_all()


def add_money(account):
while True:
money = randint(5, 10)
account.deposit(money)
print(threading.current_thread().name,
':', money, '====>', account.balance)
sleep(0.5)


def sub_money(account):
while True:
money = randint(10, 30)
account.withdraw(money)
print(threading.current_thread().name,
':', money, '<====', account.balance)
sleep(1)


def main():
account = Account()
with ThreadPoolExecutor(max_workers=15) as pool:
for _ in range(5):
pool.submit(add_money, account)
for _ in range(10):
pool.submit(sub_money, account)


if __name__ == '__main__':
main()

多进程

  • 多进程:多进程可以有效的解决 GIL 的问题,实现多进程主要的类是Process,其他辅助的类跟threading模块中的类似,进程间共享数据可以使用管道、套接字等,在multiprocessing模块中有一个Queue类,它基于管道和锁机制提供了多个进程共享的队列。下面是官方文档上关于多进程和进程池的一个示例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""
多进程和进程池的使用
多线程因为GIL的存在不能够发挥CPU的多核特性
对于计算密集型任务应该考虑使用多进程
time python3 example22.py
real 0m11.512s
user 0m39.319s
sys 0m0.169s
使用多进程后实际执行时间为11.512秒,而用户时间39.319秒约为实际执行时间的4倍
这就证明我们的程序通过多进程使用了CPU的多核特性,而且这台计算机配置了4核的CPU
"""
import concurrent.futures
import math

PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5


def is_prime(n):
"""判断素数"""
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


def main():
"""主函数"""
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))


if __name__ == '__main__':
main()

重点多线程和多进程的比较

以下情况需要使用多线程:

  1. 程序需要维护许多共享的状态(尤其是可变状态),Python 中的列表、字典、集合都是线程安全的,所以使用线程而不是进程维护共享状态的代价相对较小。
  2. 程序会花费大量时间在 I/O 操作上,没有太多并行计算的需求且不需占用太多的内存。
    以下情况需要使用多进程:
  3. 程序执行计算密集型任务或者要使用多 CPU 来执行。
  4. 程序的输入可以并行的分割,并且可以将分割的数据作为单独的任务提交给进程池。
  5. 程序的输出可以合并成一个单一的结果。
  6. 程序需要在共享内存空间中执行,通常是因为它们之间需要交换大量数据。
  • 异步处理:从调度程序的任务队列中挑选任务,该调度程序以交叉的形式执行这些任务,我们并不能保证任务将以某种顺序去执行,因为执行顺序取决于队列中的一项任务是否愿意将 CPU 处理时间让位给另一项任务。异步任务通常通过多任务协作处理的方式来实现,由于执行时间和顺序的不确定,因此需要通过回调式编程或者future对象来获取任务执行的结果。Python 3 通过asyncio模块和awaitasync关键字(在 Python 3.7 中正式被列为关键字)来支持异步处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""
异步I/O - async / await
"""
import asyncio


def num_generator(m, n):
"""指定范围的数字生成器"""
yield from range(m, n + 1)


async def prime_filter(m, n):
"""素数过滤器"""
primes = []
for i in num_generator(m, n):
flag = True
for j in range(2, int(i ** 0.5 + 1)):
if i % j == 0:
flag = False
break
if flag:
print('Prime =>', i)
primes.append(i)

await asyncio.sleep(0.001)
return tuple(primes)


async def square_mapper(m, n):
"""平方映射器"""
squares = []
for i in num_generator(m, n):
print('Square =>', i * i)
squares.append(i * i)

await asyncio.sleep(0.001)
return squares


def main():
"""主函数"""
loop = asyncio.get_event_loop()
future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
future.add_done_callback(lambda x: print(x.result()))
loop.run_until_complete(future)
loop.close()


if __name__ == '__main__':
main()

说明:上面的代码使用get_event_loop函数获得系统默认的事件循环,通过gather函数可以获得一个future对象,future对象的add_done_callback可以添加执行完成时的回调函数,loop对象的run_until_complete方法可以等待通过future对象获得协程执行结果。

Python 中有一个名为aiohttp的三方库,它提供了异步的 HTTP 客户端和服务器,这个三方库可以跟asyncio模块一起工作,并提供了对Future对象的支持。Python 3.6 中引入了asyncawait来定义异步执行的函数以及创建异步上下文,在 Python 3.7 中它们正式成为了关键字。下面的代码异步的从 5 个 URL 中获取页面并通过正则表达式的命名捕获组提取了网站的标题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import re

import aiohttp

PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')


async def fetch_page(session, url):
async with session.get(url, ssl=False) as resp:
return await resp.text()


async def show_title(url):
async with aiohttp.ClientSession() as session:
html = await fetch_page(session, url)
print(PATTERN.search(html).group('title'))


def main():
urls = ('https://www.python.org/',
'https://git-scm.com/',
'https://www.jd.com/',
'https://www.taobao.com/',
'https://www.douban.com/')
loop = asyncio.get_event_loop()
cos = [show_title(url) for url in urls]
loop.run_until_complete(asyncio.wait(cos))
loop.close()


if __name__ == '__main__':
main()

重点异步 I/O 与多进程的比较

当程序不需要真正的并发性或并行性,而是更多的依赖于异步处理和回调时,asyncio就是一种很好的选择。如果程序中有大量的等待与休眠时,也应该考虑asyncio,它很适合编写没有实时数据处理需求的 Web 应用服务器。

Python 还有很多用于处理并行任务的三方库,例如:joblibPyMP等。实际开发中,要提升系统的可扩展性和并发性通常有垂直扩展(增加单个节点的处理能力)和水平扩展(将单个节点变成多个节点)两种做法。可以通过消息队列来实现应用程序的解耦合,消息队列相当于是多线程同步队列的扩展版本,不同机器上的应用程序相当于就是线程,而共享的分布式消息队列就是原来程序中的 Queue。消息队列(面向消息的中间件)的最流行和最标准化的实现是 AMQP(高级消息队列协议),AMQP 源于金融行业,提供了排队、路由、可靠传输、安全等功能,最著名的实现包括:Apache 的 ActiveMQRabbitMQ 等。
要实现任务的异步化,可以使用名为Celery的三方库。Celery是 Python 编写的分布式任务队列,它使用分布式消息进行工作,可以基于 RabbitMQ 或 Redis 来作为后端的消息代理。