说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gr99123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
多进程间的数据共享确实是一个重要的话题。在多进程和多线程编程中,数据共享是一个重要的问题,需要小心处理以避免数据不一致或竞争条件。
适用于共享简单的数值或数组。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.14
for i in range(len(a)):
a[i] = a[i] * 2
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(5))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value) # 输出: 3.14
print(arr[:]) # 输出: [0, 2, 4, 6, 8]
将大型等差数列的和再求总和:
from multiprocessing import Process, Value
import time
start_time = time.time()
# 进程要执行的代码
def task_func(pid, num, total):
print(f'{pid=}开始执行({num=})...')
n = sum(range(num))
print(f'{pid=}执行完成, 结果为{n=}')
# with total.get_lock():
total.value += n
if __name__=='__main__':
nums = [123**4, 124**4, 125**4, 126**4]
print(f'程序开始执行...')
procs = []
# 创建共享变量
total = Value('q', 0) # 'q' 表示长整型
# 用参数实例化
for pid, n in enumerate(nums):
p = Process(target=task_func, args=(pid, n, total))
procs.append(p)
p.start()
# 完成执行
for i in procs:
i.join()
print(f'全部完成!总和为:{total.value}')
end_time = time.time()
print(f"\n执行时间: {end_time - start_time:.2f} seconds")
不求和,返回列表:
from multiprocessing import Process, Manager
import time
start_time = time.time()
# 进程要执行的代码
def task_func(pid, num, shared_list):
print(f'{pid=}开始执行({num=})...')
n = sum(range(num))
print(f'{pid=}执行完成, 结果为{n=}')
shared_list.append(n)
if __name__=='__main__':
nums = [123**4, 124**4, 125**4, 126**4]
print(f'程序开始执行...')
procs = []
# 创建一个共享列表
manager = Manager()
shared_list = manager.list()
# 用参数实例化
for pid, n in enumerate(nums):
p = Process(target=task_func, args=(pid, n, shared_list))
procs.append(p)
p.start()
# 完成执行
for i in procs:
i.join()
print(f'全部完成!总和为:{list(shared_list)}')
end_time = time.time()
print(f"\n执行时间: {end_time - start_time:.2f} seconds")
适用于需要共享更复杂的Python对象,如字典或列表。
from multiprocessing import Process, Manager
def f(d, l):
d['key'] = 1
l.append('item')
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list()
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d) # 输出: {'key': 1}
print(l) # 输出: ['item']
适用于进程间需要传递数据或实现生产者-消费者模式。
from multiprocessing import Process, Queue
def producer(q):
q.put('Hello')
def consumer(q):
msg = q.get()
print(f"Got: {msg}")
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
适用于两个进程之间需要双向通信。
from multiprocessing import Process, Pipe
def f(conn):
conn.send('Hello')
print(conn.recv())
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
parent_conn.send('Goodbye')
p.join()
适用于需要共享大量数据且要求高性能的场景。
from multiprocessing import shared_memory
import numpy as np
def create_shm():
a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
b[:] = a[:]
return shm.name
def read_shm(name):
existing_shm = shared_memory.SharedMemory(name=name)
c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
print(c)
existing_shm.close()
if __name__ == '__main__':
name = create_shm()
p = Process(target=read_shm, args=(name,))
p.start()
p.join()
# 清理共享内存
shm = shared_memory.SharedMemory(name=name)
shm.close()
shm.unlink()
适用于需要持久化数据或数据量较大的情况。
import os
from multiprocessing import Process
def write_to_file(filename, data):
with open(filename, 'w') as f:
f.write(data)
def read_from_file(filename):
with open(filename, 'r') as f:
print(f.read())
if __name__ == '__main__':
filename = 'shared_data.txt'
p1 = Process(target=write_to_file, args=(filename, "Hello, World!"))
p2 = Process(target=read_from_file, args=(filename,))
p1.start()
p1.join()
p2.start()
p2.join()
os.remove(filename) # 清理文件
选择哪种方法取决于您的具体需求:
在实际应用中,可能需要结合使用多种方法来实现最优的数据共享策略。同时,要注意在使用共享资源时正确处理同步问题,以避免数据竞争和其他并发问题。
更新时间:2024-07-02 10:51:47 标签:python 多进程 数据 共享