说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gr99123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
多进程间的数据共享确实是一个重要的话题。在多进程和多线程编程中,数据共享是一个重要的问题,需要小心处理以避免数据不一致或竞争条件。
适用于共享简单的数值或数组。
from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = 3.14
    for i in range(len(a)):
        a[i] = a[i] * 2
if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(5))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)  # 输出: 3.14
    print(arr[:])     # 输出: [0, 2, 4, 6, 8]
将大型等差数列的和再求总和:
from multiprocessing import Process, Value
import time
start_time = time.time()
# 进程要执行的代码
def task_func(pid, num, total):
    print(f'{pid=}开始执行({num=})...')
    n = sum(range(num))
    print(f'{pid=}执行完成, 结果为{n=}')
    # with total.get_lock():
    total.value += n
if __name__=='__main__':
    nums = [123**4, 124**4, 125**4, 126**4]
    print(f'程序开始执行...')
    procs = []
    
    # 创建共享变量
    total = Value('q', 0)  # 'q' 表示长整型
    
    # 用参数实例化
    for pid, n in enumerate(nums):
        p = Process(target=task_func, args=(pid, n, total))
        procs.append(p)
        p.start()
    
    # 完成执行
    for i in procs:
        i.join()
    print(f'全部完成!总和为:{total.value}')
    end_time = time.time()
    print(f"\n执行时间: {end_time - start_time:.2f} seconds")
不求和,返回列表:
from multiprocessing import Process, Manager
import time
start_time = time.time()
# 进程要执行的代码
def task_func(pid, num, shared_list):
    print(f'{pid=}开始执行({num=})...')
    n = sum(range(num))
    print(f'{pid=}执行完成, 结果为{n=}')
    shared_list.append(n)
if __name__=='__main__':
    nums = [123**4, 124**4, 125**4, 126**4]
    print(f'程序开始执行...')
    procs = []
    
    # 创建一个共享列表
    manager = Manager()
    shared_list = manager.list()
    
    # 用参数实例化
    for pid, n in enumerate(nums):
        p = Process(target=task_func, args=(pid, n, shared_list))
        procs.append(p)
        p.start()
    
    # 完成执行
    for i in procs:
        i.join()
    print(f'全部完成!总和为:{list(shared_list)}')
    end_time = time.time()
    print(f"\n执行时间: {end_time - start_time:.2f} seconds")
适用于需要共享更复杂的Python对象,如字典或列表。
from multiprocessing import Process, Manager
def f(d, l):
    d['key'] = 1
    l.append('item')
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        print(d)  # 输出: {'key': 1}
        print(l)  # 输出: ['item']
适用于进程间需要传递数据或实现生产者-消费者模式。
from multiprocessing import Process, Queue
def producer(q):
    q.put('Hello')
def consumer(q):
    msg = q.get()
    print(f"Got: {msg}")
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
适用于两个进程之间需要双向通信。
from multiprocessing import Process, Pipe
def f(conn):
    conn.send('Hello')
    print(conn.recv())
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    parent_conn.send('Goodbye')
    p.join()
适用于需要共享大量数据且要求高性能的场景。
from multiprocessing import shared_memory
import numpy as np
def create_shm():
    a = np.array([1, 1, 2, 3, 5, 8])
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    b[:] = a[:]
    return shm.name
def read_shm(name):
    existing_shm = shared_memory.SharedMemory(name=name)
    c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
    print(c)
    existing_shm.close()
if __name__ == '__main__':
    name = create_shm()
    p = Process(target=read_shm, args=(name,))
    p.start()
    p.join()
    # 清理共享内存
    shm = shared_memory.SharedMemory(name=name)
    shm.close()
    shm.unlink()
适用于需要持久化数据或数据量较大的情况。
import os
from multiprocessing import Process
def write_to_file(filename, data):
    with open(filename, 'w') as f:
        f.write(data)
def read_from_file(filename):
    with open(filename, 'r') as f:
        print(f.read())
if __name__ == '__main__':
    filename = 'shared_data.txt'
    p1 = Process(target=write_to_file, args=(filename, "Hello, World!"))
    p2 = Process(target=read_from_file, args=(filename,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    os.remove(filename)  # 清理文件
选择哪种方法取决于您的具体需求:
在实际应用中,可能需要结合使用多种方法来实现最优的数据共享策略。同时,要注意在使用共享资源时正确处理同步问题,以避免数据竞争和其他并发问题。
更新时间:2024-07-02 10:51:47 标签:python 多进程 数据 共享