lock = threading.Lock():生成锁对象,全局唯一;lock.acquire():获取锁。如果没有获取到,程序就会被阻塞,直到获取到锁才会继续执行;lock.release():释放锁,返回后其他人也可以调用; 【注意】:lock.acquire() 和 lock.release()必须成对出现,否则可能会导致死锁。
为了避免这个问题,你可以使用上下文管理器来锁定。如下图:
导入线程
锁=线程.Lock()
带锁:
#这里写下你要实现的代码
pass 【说明】with语句会在该代码块执行前自动获取锁,执行后自动释放锁。
为何要“上”锁 ?
进口螺纹
导入时间
g_num=0
def test1(num):
全局g_num
对于范围(num): 内的i
mutex.acquire() # 锁定
g_num +=1
mutex.release() # 解锁
print("---test1---g_num=%d"%g_num)
def test2(num):
全局g_num
对于范围(num): 内的i
mutex.acquire() # 锁定
g_num +=1
mutex.release() # 解锁
print("---test2---g_num=%d"%g_num)
# 创建互斥锁
# 默认为解锁状态
互斥锁=threading.Lock()
#创建2个线程,让它们各自对g_num加1000000次
p1=threading.Thread(target=test1, args=(1000000,))
p1.start()
p2=threading.Thread(target=test2, args=(1000000,))
p2.start()
# 等待计算完成
而len(threading.enumerate()) !=1:
时间.睡眠(1)
print("两个线程操作同一个全局变量后最终结果为:%s" % g_num) 输出:
---test1---g_num=1909909
---test2---g_num=2000000
两个线程操作同一个全局变量后最终结果为:2000000 【总结】进入互斥锁后,结果符合预期。
关于死锁
【说明】线程间共享多个资源时,如果两个线程各自占用部分资源,同时等待对方的资源,就会发生死锁。
让我们看一个例子:
导入线程
导入时间
类MyThread1(threading.Thread):
def 运行(自我):
# 锁定互斥体A
mutexA.acquire()
# mutexA被锁定后,延迟1秒,等待另一个线程锁定mutexB
print(self.name+"----do1---up----")
时间.睡眠(1)
# 此时会被阻塞,因为mutexB已经被另一个线程锁定了。
mutexB.acquire()
print(self.name+"----do1---down----")
mutexB.release()
# 解锁互斥体A
mutexA.release()
类MyThread2(threading.Thread):
def 运行(自我):
# 锁定互斥体B
mutexB.acquire()
# mutexB被锁定后,延迟1秒,等待另一个线程锁定mutexA。
print(self.name+"----do2---up----")
时间.睡眠(1)
# 此时会被阻塞,因为mutexA已经被另一个线程锁定了。
mutexA.acquire()
print(self.name+"----do2---down----")
mutexA.release()
# 解锁互斥体B
mutexB.release()
mutexA=threading.Lock()
mutexB=threading.Lock()
如果__name__=="__main__":
t1=MyThread1()
t2=MyThread2()
t1.start()
t2.start() 【要点】标准的锁对象(threading.Lock)并不关心当前是哪个线程占有了该锁;如果该锁已经被占有了,那么任何其它尝试获取该锁的线程都会被阻塞,包括已经占有该锁的线程也会被阻塞。【获取锁和释放锁的语句也可以使用Python的with实现】
[知识改进] 如果一个线程修改了两个函数调用之间的共享资源,我们最终会得到不一致的数据。 【最直接的解决办法】就是在这个函数中也使用lock。然而,这是不可行的。里面的两个访问函数将会阻塞,因为外面的语句已经拥有锁。
饱受争议的GIL(全局锁)
什么是GIL呢?
【说明】任何Python线程在执行之前,必须首先获得GIL锁。然后,每执行100个字节码,解释器就会自动释放GIL锁,让其他线程有机会执行。这个GIL全局锁实际上锁住了所有线程的执行代码。因此,Python中多线程只能交替执行。即使100个核的CPU上运行100个线程,也只能使用1个核。
GIL执行过程
1).设置一个GIL; 2)。切换线程准备任务执行(Runnale就绪状态); 3)。跑步; 4).可能的状态:线程任务执行结束; time.sleep()需要获取其他信息才能继续执行(如:读取文件,需要从网上下载html网页)5)。将线程设置为休眠状态;
6).解锁GIL;
【重要】任何时候python解释器中都只有一个线程在执行;
I/O 密集型(输入、输出):
计算密集型(cpu始终被占用):
那么如何避免受到GIL的影响?
使用多处理而不是多线程。更改Python解释器,不要使用CPython
Queue队列
谈论多线程时,我们不得不谈论Queue队列,这是从一个线程向另一个线程发送数据的最安全的方式。创建一个由多个线程共享的Queue 对象,这些线程使用put() 和get() 操作从队列中添加或删除元素。
关于Queue队列的重要函数
从队列导入队列
# maxsize默认为0,无限制
# 一旦为0并且消息数量达到限制,q.put()也会阻塞
q=队列(最大大小=0)
# 阻塞程序并等待队列消息。
q.get()
# 获取消息并设置超时时间
q.get(超时=5.0)
# 发送消息
q.put()
# 等待所有消息被消费完
q.join()
# 只需知道以下三个方法,但不要在代码中使用它们。
#查询当前队列中的消息数量
q.qsize()
# 是否所有队列消息都已被消费,True/False
q.empty()
# 检查队列中的消息是否已满
q.full()
生产者-消费者模型(继承实现)
什么是生产者-消费者模型?
某个模块专门负责生产+数据,可以认为是生产者;
另一个模块负责处理产生的数据,可以被视为消费者。
在生产者和消费者之间添加一个缓冲区(队列实现),可以将其视为存储。
【生产者】===》【缓冲区】===》【消费者】
生产者和消费者概念图
生产者-消费者模型的优点
1).生产者和消费者之间的依赖减少,逻辑连接更少,代码简化; 2)。生产者和消费者是两个独立的个体,可以并发执行;
关于线程池
在Python3中,创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类实现的。
未来对象:是在未来某个时间完成操作的对象。提交方法可以返回一个future 对象。
我们先看例子:简单的线程池实现
#线程执行的函数
def 添加(n1,n2):
v=n1 + n2
print("添加:", v , ", tid:",threading.currentThread().ident)
时间.睡眠(n1)
返回v
#通过submit将需要执行的函数扔到线程池中。
#submit直接返回一个future对象
ex=ThreadPoolExecutor(max_workers=3) #设置运行N个线程
f1=ex.submit(add,2,3)
f2=ex.submit(add,2,2)
print("主线程运行")
print(f1.done()) #done 查看任务是否结束
print(f1.result()) #获取结果,阻塞方法的简单线程池实现
导入队列
导入线程
导入时间
"""
这个简单示例的想法是:
1.利用Queue特性在Queue中创建多个线程对象
2.当我执行代码时,我去队列中获取线程!
如果线程池中有可用的,则直接取。
如果线程池中没有可用的内容,则等待。
3、线程执行完毕后,返回到线程池中。
"""
class ThreadPool(object): #创建线程池类
def __init__(self,max_thread=20):#构造方法,设置最大线程数为20
self.queue=Queue.Queue(max_thread) #创建队列
for i in xrange(max_thread):#循环将线程对象添加到队列中
self.queue.put(线程.Thread)
#将线程的类名放入Queue中并执行完毕
def get_thread(self):#定义从队列中获取线程的方法
返回self.queue.get()
def add_thread(self):#定义将线程添加到队列的方法
self.queue.put(线程.Thread)
池=线程池(10)
def func(arg,p):
打印参数
时间.睡眠(2)
p.add_thread() #当前线程已经执行完毕,我将向队列中添加一个线程!
对于我在xrange(300):
thread=pool.get_thread() #线程池有10个线程,每个周期都会拿走一个!默认是queue.get(),如果队列中没有数据就会等待。
t=线程(目标=func,args=(i,池))
t.start()
"""
self.queue.put(threading.Thread) 添加一个类,而不是一个对象。如果同一个类在内存中只占用一块内存空间
而如果对象存放在这里的话,每次添加的时候,都要在内存中开辟一块内存空间。
而如果是对象的话:下面的语句就不能这样调用了!
对于我在xrange(300):
线程=pool.get_thread()
t=线程(目标=func,args=(i,池))
t.start()
通过查看源码可以知道,线程的构造函数中: self.__args=args self.__target=target 都是私有字段,所以调用应该这样写
对于我在xrange(300):
ret=pool.get_thread()
ret._Thread__target=func
ret._Thread__args=(i,池)
ret.start()【map方法】
返回值与提交的顺序一致。也就是说,它是有序的。
#下面是map方法的简单使用。
#注意:map返回一个生成器并且是有序的。
URLS=["http://www.baidu.com", "http://www.qq.com", "http://www.sina.com.cn"]
def get_html(url):
print("线程id:",threading.currentThread().ident,"访问过:",url)
#这里使用requests模块
返回请求.get(url)
ex=ThreadPoolExecutor(max_workers=3)
#内部迭代时,每个URL启动一个线程
res_iter=ex.map(get_html,URLS)
对于res_iter: 中的资源
#这会阻塞直到线程完成或者发生异常
print("url:%s ,len: %d"%(res.url,len(res.text)))【 as_completed 】
用于确定提交何时完成,以避免一次又一次调用future.done 或使用future.result。
并发.futures.as_completed(fs, timeout=None):返回一个在迭代过程中会阻塞的生成器。
【关联】map方法按顺序返回,as_completed表示哪个线程先完成/失败再返回。
【举个栗子】
#as_completed 返回一个迭代生成器,该生成器在线程完成(或失败)后返回
URLS=["http://www.baidu.com", "http://www.qq.com", "http://www.sina.com.cn"]
def get_html(url):
时间.睡眠(1)
print("线程id:",threading.currentThread().ident,"访问过:",url)
return requests.get(url) #这里使用了requests模块
ex=ThreadPoolExecutor(max_workers=3) #最多3个线程
future_tasks=[ex.submit(get_html,url) for url in URLS] #创建3个未来对象
for future in as_completed(future_tasks): #迭代生成器
尝试:
响应=future.result()
除了异常e:
打印("%s"%e)
否则:
print("%s 有%d 个字节!"%(resp.url, len(resp.text))) 输出:
"""
主题id: 5160 访问了: http://www.baidu.com
帖子id: 7752 访问了: http://www.sina.com.cn
帖子id: 5928 访问了: http://www.qq.com
http://www.qq.com/有240668字节!
http://www.baidu.com/有2381字节!
https://www.sina.com.cn/有577244字节!
"""【重点】关于回调函数add_done_callback(fn)
回调函数在调用线程完成后在同一线程中调用。
导入操作系统、系统、时间、请求、线程
来自同期进口期货
网址=[
"http://baidu.com",
"http://www.qq.com",
"http://www.sina.com.cn"
]
def load_url(url):
print("tid:",threading.currentThread().ident,",url:",url)
将requests.get(url) 作为resp:
返回响应内容
def call_back(obj):
print("-call_back , tid:",threading.currentThread().ident, ",obj:",obj)
将futures.ThreadPoolExecutor(max_workers=3) 作为ex:
# mp={ex.submit(load_url,url) : url for URLS}
mp=字典()
对于URLS: 中的url
f=ex.submit(load_url,url)
mp[f]=网址
f.add_done_callback(call_back)
对于futures.as_completed(mp): 中的f
网址=mp[f]
尝试:
数据=f.result()
except 异常为exc:
打印(例外,",url:",url)
否则:
print("url:", url, ",len:",len(数据),",数据[:20]:",数据[:20])
"""
tid: 7128,url: http://baidu.com
tid: 7892,url: http://www.qq.com
tid: 3712,url: http://www.sina.com.cn
-call_back , tid: 7892 ,obj:url: http://www.qq.com ,len: 251215 ,data[:20]: b"ncall_back , tid: 3712 ,obj:url: http://www.sina .com .cn ,len: 577333 ,data[:20]: b"n
【Python并发编程:深入解析锁、全局锁、队列与线程池】相关文章:
用户评论
终于可以开始学习多线程了!
有10位网友表示赞同!
感觉要用到这些东西以后,代码跑起来会快很多。
有13位网友表示赞同!
锁和全局锁的概念还是挺容易理解的。
有14位网友表示赞同!
Queue队列看起来很有用,应该可以用来传递数据吧?
有18位网友表示赞同!
线程池好像是一个很方便的方法,能快速创建和管理多个线程。
有14位网友表示赞同!
这个教程感觉很好入门,慢慢学习一遍吧!
有18位网友表示赞同!
我已经在自己的项目里碰到了多线程的问题,刚好看到这篇教程挺欣喜的。
有8位网友表示赞同!
代码实现的多线程功能应该很有帮助,能更快提高程序效率。
有18位网友表示赞同!
看了一些相关文档,感觉锁和队列都是很关键的概念!
有13位网友表示赞同!
需要好好学习一下Python的多线程机制,希望可以写出更优秀的代码。
有13位网友表示赞同!
多线程的应用场景很多,以后有机会好好研究一下。
有12位网友表示赞同!
之前一直用单线程开发,现在决定开始学习多线程,希望能提升项目性能!
有19位网友表示赞同!
看到这个主题感觉很有潜力,可以让我在编程上进一步进阶。
有19位网友表示赞同!
队列的使用方法还是需要仔细理解一下才能灵活运用。
有16位网友表示赞同!
线程池的管理机制很有趣,应该会让我对Python的多Threading有更深的理解。
有13位网友表示赞同!
期待学习这个Python多线程相关的知识内容!
有16位网友表示赞同!
这个标题包含了多个关键词,说明教程内容应该是很全面的。
有5位网友表示赞同!
学习完多线程后,我的编程能力应该会得到很大的提升。
有15位网友表示赞同!
多线程可以解决很多复杂的同步问题,很有必要学习。
有19位网友表示赞同!