说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gairuo123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
Python threading 模块中的 Semaphore 对象是一种同步原语,用于控制对共享资源的访问。它维护一个内部计数器,用来限制可以同时访问资源的线程数量。
信号量是一种用于线程同步的计数器,用来限制可以访问某资源或代码块的线程数量。它允许多个线程同时访问共享资源,但对同时访问的线程数加以限制。
信号量维护一个计数器,初始值为指定的值(默认为1)。每次线程获取信号量时,计数器减1;释放信号量时,计数器加1。当计数器为0时,尝试获取信号量的线程将被阻塞,直到其他线程释放信号量。
信号量维护着一个内部计数器。当线程调用 acquire 方法时,如果计数器大于 0 ,则计数器减 1 并且线程继续执行;如果计数器为 0 ,则线程会被阻塞,直到有其他线程调用 release 方法使计数器增加,从而唤醒被阻塞的线程。
构造方法:
semaphore = threading.Semaphore(value=1)
创建一个信号量对象。value 指定信号量的初始计数值,默认是 1。
常用方法:
acquire(blocking=True, timeout=None)
: 获取信号量。如果信号量计数大于零,则计数减一并返回 True。否则,线程阻塞直到信号量计数大于零或超时。release()
: 释放信号量,计数加一,并唤醒一个阻塞的线程(如果有)。使用场景:
简单的示例代码为:
import threading
import time
# 创建一个信号量对象,初始值为2
semaphore = threading.Semaphore(2)
def task(n):
with semaphore:
print(f"线程 {n} 获取信号量")
time.sleep(1) # 模拟任务
print(f"线程 {n} 释放信号量")
threads = []
for i in range(5):
t = threading.Thread(target=task, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
再如:
import threading
import time
semaphore = threading.Semaphore(2) # 最多允许 2 个线程同时访问
def worker(name):
with semaphore:
print(f"{name} 获得资源访问权限")
time.sleep(2)
print(f"{name} 完成操作,释放资源")
threads = [
threading.Thread(target=worker, args=("线程 1",)),
threading.Thread(target=worker, args=("线程 2",)),
threading.Thread(target=worker, args=("线程 3",)),
threading.Thread(target=worker, args=("线程 4",))
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
这个案例模拟了一个简单的网络爬虫,使用信号量来限制并发请求数,以避免对目标服务器造成过大压力。
import threading
import time
import random
import queue
class WebCrawler:
def __init__(self, max_concurrent_requests=3):
self.semaphore = threading.Semaphore(max_concurrent_requests)
self.queue = queue.Queue()
self.results = []
self.result_lock = threading.Lock()
def add_url(self, url):
self.queue.put(url)
def crawl(self):
while True:
try:
url = self.queue.get(block=False)
except queue.Empty:
break
with self.semaphore:
result = self.fetch_url(url)
with self.result_lock:
self.results.append(result)
self.queue.task_done()
def fetch_url(self, url):
# 模拟网络请求
print(f"Fetching: {url}")
time.sleep(random.uniform(0.5, 2))
return f"Content of {url}"
def run(self, num_threads):
threads = []
for _ in range(num_threads):
thread = threading.Thread(target=self.crawl)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
def get_results(self):
return self.results
def main():
crawler = WebCrawler(max_concurrent_requests=3)
# 添加要爬取的URL
for i in range(10):
crawler.add_url(f"http://example.com/page{i}")
# 开始爬取
start_time = time.time()
crawler.run(num_threads=5)
end_time = time.time()
# 打印结果
print("\nCrawling Results:")
for result in crawler.get_results():
print(result)
print(f"\nTotal time: {end_time - start_time:.2f} seconds")
print(f"Total pages crawled: {len(crawler.get_results())}")
if __name__ == "__main__":
main()
这个综合案例展示了以下几点:
这个例子很好地说明了 Semaphore 在实际应用中的重要性:
Semaphore 在需要控制并发访问资源数量的场景中非常有用。它提供了一种简单而有效的方式来管理共享资源,特别适合在网络编程、数据库连接池、线程池等领域使用。
使用 Semaphore 时需要注意:
在更复杂的系统中,Semaphore 常常与其他同步原语(如 Lock, Condition)结合使用,以实现更复杂的并发控制策略。
更新时间:June 30, 2024, 5:28 p.m. 标签:python threading 线程 信号量对象