Two Dots最新版
170.91MB · 2025-11-24
在分布式系统或大型应用中,事件驱动和解耦通信非常重要。Celery 的 Signal 类正是实现这一目标的工具,它采用了经典的观察者模式,允许你在系统的关键节点(如任务执行前后、worker 启动关闭等)注册和触发自定义回调函数。
简单来说,Signal 就是一个“事件广播器”,你可以注册多个器(回调),当事件发生时,Signal 会通知所有器。
Signal 维护一个接收器列表(receivers),每个接收器都是一个可调用对象(函数或方法)。当事件发生时,Signal 会遍历这个列表,依次调用每个接收器,并传递相关参数。
为了避免内存泄漏,Signal 支持弱引用注册接收器。这样,如果接收器对象被销毁,Signal 不会阻止其被垃圾回收。
Signal 内部使用锁(threading.Lock),保证在多线程环境下注册、注销和触发事件都是安全的。
connect(receiver, sender=None, weak=True, dispatch_uid=None, retry=False)disconnect(receiver, sender=None, dispatch_uid=None)send(sender, **named)has_listeners(sender=None)下面用一个简单的博客评论系统模拟 Signal 的用法:
from celery.utils.dispatch.signal import Signal
# 创建一个评论发布信号
comment_posted = Signal(providing_args=['user', 'content'])
# 定义接收器:发送通知
def notify_admin(signal, sender, user=None, content=None, **kwargs):
print(f"[通知] 管理员:用户 {user} 发布了评论:{content}")
# 定义接收器:自动审核
def auto_moderate(signal, sender, user=None, content=None, **kwargs):
if "spam" in content:
print(f"[审核] 评论被判定为垃圾:{content}")
else:
print(f"[审核] 评论通过:{content}")
# 注册接收器
comment_posted.connect(notify_admin)
comment_posted.connect(auto_moderate)
# 用户发布评论,触发信号
def post_comment(user, content):
print(f"用户 {user} 正在发布评论...")
comment_posted.send(sender='blog', user=user, content=content)
# 测试
post_comment("Alice", "这是一条正常评论")
post_comment("Bob", "spam spam spam")
输出:
用户 Alice 正在发布评论...
[通知] 管理员:用户 Alice 发布了评论:这是一条正常评论
[审核] 评论通过:这是一条正常评论
用户 Bob 正在发布评论...
[通知] 管理员:用户 Bob 发布了评论:spam spam spam
[审核] 评论被判定为垃圾:spam spam spam