哪些Python包提供独立的事件系统?

我知道pydispatcher,但是Python周围一定有其他与事件相关的包。

哪些库是可用的?

我对作为大型框架一部分的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型骨架解决方案。

307292 次浏览

我使用zope.event。这是你能想象到的最简单的骨架。: -) 事实上,这里是完整的源代码:

subscribers = []


def notify(event):
for subscriber in subscribers:
subscriber(event)

例如,请注意不能在进程之间发送消息。它不是一个消息系统,只是一个事件系统,仅此而已。

如果我在pyQt中做代码,我使用QT套接字/信号范式,django也是如此

如果我正在做异步I/O,我使用本机选择模块

如果我使用SAX python解析器,我使用SAX提供的事件API。所以看起来我是底层API的受害者:-)

也许你应该问问自己,你对事件框架/模块有什么期望。我个人的偏好是使用套接字/信号范式从qt。更多的信息可以找到在这里

我们使用Michael ford在事件模式中建议的EventHook:

只需添加EventHooks到你的类:

class MyBroadcaster()
def __init__():
self.onChange = EventHook()


theBroadcaster = MyBroadcaster()


# add a listener to the event
theBroadcaster.onChange += myFunction


# remove listener from the event
theBroadcaster.onChange -= myFunction


# fire event
theBroadcaster.onChange.fire()

我们添加了从对象中移除所有侦听器到Michaels类的功能,最终得到了这样的结果:

class EventHook(object):


def __init__(self):
self.__handlers = []


def __iadd__(self, handler):
self.__handlers.append(handler)
return self


def __isub__(self, handler):
self.__handlers.remove(handler)
return self


def fire(self, *args, **keywargs):
for handler in self.__handlers:
handler(*args, **keywargs)


def clearObjectHandlers(self, inObject):
for theHandler in self.__handlers:
if theHandler.im_self == inObject:
self -= theHandler

我在价值的教训上找到了这个小脚本。它似乎具有我所追求的简单性/功率比。Peter Thatcher是以下代码的作者(未提及许可)。

class Event:
def __init__(self):
self.handlers = set()


def handle(self, handler):
self.handlers.add(handler)
return self


def unhandle(self, handler):
try:
self.handlers.remove(handler)
except:
raise ValueError("Handler is not handling this event, so cannot unhandle it.")
return self


def fire(self, *args, **kargs):
for handler in self.handlers:
handler(*args, **kargs)


def getHandlerCount(self):
return len(self.handlers)


__iadd__ = handle
__isub__ = unhandle
__call__ = fire
__len__  = getHandlerCount


class MockFileWatcher:
def __init__(self):
self.fileChanged = Event()


def watchFiles(self):
source_path = "foo"
self.fileChanged(source_path)


def log_file_change(source_path):
print "%r changed." % (source_path,)


def log_file_change2(source_path):
print "%r changed!" % (source_path,)


watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

这里有另一个模块供考虑。对于要求更高的应用程序,这似乎是一个可行的选择。

Py-notify是一个Python包 提供实现工具 观察者编程模式。这些 工具包括信号、条件和 变量。< / p > 信号是处理程序的列表 当信号发出时调用。 条件基本上是布尔型的 变量与信号的耦合表明 在条件状态时发射 的变化。它们可以组合使用 标准逻辑运算符(不是,和, 等)转化为复合条件。 与条件不同,变量可以保持不变 任何Python对象,而不仅仅是布尔值,

我一直是这样做的:

class Event(list):
"""Event subscription.


A list of callable objects. Calling an instance of this will cause a
call to each item in the list in ascending order by index.


Example Usage:
>>> def f(x):
...     print 'f(%s)' % x
>>> def g(x):
...     print 'g(%s)' % x
>>> e = Event()
>>> e()
>>> e.append(f)
>>> e(123)
f(123)
>>> e.remove(f)
>>> e()
>>> e += (f, g)
>>> e(10)
f(10)
g(10)
>>> del e[0]
>>> e(2)
g(2)


"""
def __call__(self, *args, **kwargs):
for f in self:
f(*args, **kwargs)


def __repr__(self):
return "Event(%s)" % list.__repr__(self)

然而,就像我看到的其他东西一样,没有自动生成的pydoc,也没有签名,这真的很糟糕。

PyPI包

截至2022年10月,这些是PyPI上可用的事件相关软件包,

有更多的

有很多库可供选择,使用非常不同的术语(事件、信号、处理程序、方法分派、钩子等等)。

我试图对上述软件包以及这里的答案中提到的技术进行概述。

首先,一些术语……

观察者模式

事件系统最基本的风格是'处理程序方法的包',这是一个 观察者模式.

. 0的简单实现

基本上,处理程序方法(可调用对象)存储在一个数组中,每个方法在事件“触发”时被调用。

发布-订阅

观察者事件系统的缺点是你只能在实际事件上注册处理程序 对象(或处理程序列表)。所以在注册时,事件需要已经存在 这就是为什么存在第二种类型的事件系统 发布-订阅模式。 这里,处理程序不在事件对象(或处理程序列表)上注册,而是在中央调度程序上注册。 此外,通知器只与调度程序通信。该听什么,该发布什么 由'signal'决定,它只不过是一个名称(字符串)

调停者模式

也许你也会感兴趣:调停者模式

钩子

钩子系统通常用于应用程序插件的上下文中。的 应用程序包含固定的集成点(钩子),每个插件都可以

.连接到该钩子并执行某些操作

其他“事件”

线程。事件不是一个“事件系统” 在上述意义上。这是一个线程同步系统,其中一个线程等待,直到另一个线程“信号”事件对象

网络消息传递库也经常使用术语“事件”;有时它们在概念上是相似的;有时不是。 它们当然可以跨越线程、进程和计算机边界。如见。 pyzmq pymq, 扭曲的龙卷风geventeventlet.

弱引用

在Python中,保留对方法或对象的引用可确保它不会被删除 被垃圾收集器。这可能是可取的,但也可能导致内存泄漏: 链接的处理程序从来都不是 清理。< / p >

一些事件系统使用弱引用而不是常规引用来解决这个问题。

一些关于各种图书馆的词汇

观察者风格的事件系统:

  • zope.event显示了它如何工作的基本原理(参见Lennart的回答)。注意:这个例子甚至不支持处理程序参数。
  • LongPoke的“可调用列表”的实现表明,这样的事件系统可以通过子类化list来实现。
  • Felk的变体EventHook也确保了被调用者和调用者的签名。
  • spassig的EventHook (Michael ford的事件模式)是一个简单的实现。
  • 约瑟普的重要课程事件类基本相同,但使用set而不是list来存储包,并实现了__call__,这两个都是合理的添加。
  • PyNotify在概念上类似,并且还提供了变量和条件的附加概念('变量更改事件')。主页不正常。
  • 阿克塞尔基本上是一个处理程序包,具有更多与线程、错误处理、…
  • python-dispatch要求从pydispatch.Dispatcher派生偶数源类。
  • buslane是基于类的,支持单个或多个处理程序,并方便广泛的类型提示。
  • Pithikos的观察者/事件是一个轻量级的设计。

发布-订阅库:

  • 信号灯有一些漂亮的功能,如自动断开连接和基于发送者的过滤。
  • PyPubSub是一个稳定的包,并承诺“高级功能,方便调试和维护主题和消息”。
  • pymitter是Node.js EventEmitter2的Python端口,提供名称空间、通配符和TTL。
  • PyDispatcher似乎强调了多对多发布等方面的灵活性。支持弱引用。
  • 路易是一个重做的PyDispatcher,应该“在各种各样的上下文中”工作。
  • pypydispatcher是基于(你猜对了…)PyDispatcher也可以在PyPy中工作。
  • django.dispatch是一个重写的PyDispatcher“接口更有限,但性能更高”。
  • pyeventdispatcher基于PHP的Symfony框架的事件分派器。
  • 调度程序是从django中提取的。但是已经相当老了。
  • Cristian Garcia的EventManger是一个非常简短的实现。

其他:

  • pluggy包含一个钩子系统,由pytest插件使用。
  • RxPy3实现了可观察模式,允许合并事件,重试等。
  • Qt的信号和插槽可从PyQtPySide2。当在同一个线程中使用时,它们作为回调工作, 或者作为两个不同线程之间的事件(使用事件循环)。信号和槽有它们的限制 仅适用于派生自QObject的类的对象

我创建了一个EventManager类(代码在最后)。语法如下:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )


#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )


#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )


#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun


#Delete an event
del EventManager.eventName


#Fire the event
EventManager.eventName()

下面是一个例子:

def hello(name):
print "Hello {}".format(name)
    

def greetings(name):
print "Greetings {}".format(name)


EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello


print "\nInitial salute"
EventManager.salute('Oscar')


print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

输出:

初次礼炮
奥斯卡
你好奥斯卡

现在删除问候语
你好奥斯卡< / p >

EventManger代码:

class EventManager:
    

class Event:
def __init__(self,functions):
if type(functions) is not list:
raise ValueError("functions parameter has to be a list")
self.functions = functions
            

def __iadd__(self,func):
self.functions.append(func)
return self
            

def __isub__(self,func):
self.functions.remove(func)
return self
            

def __call__(self,*args,**kvargs):
for func in self.functions : func(*args,**kvargs)
            

@classmethod
def addEvent(cls,**kvargs):
"""
addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
creates events using **kvargs to create any number of events. Each event recieves a list of functions,
where every function in the list recieves the same parameters.
        

Example:
        

def hello(): print "Hello ",
def world(): print "World"
        

EventManager.addEvent( salute = [hello] )
EventManager.salute += world
        

EventManager.salute()
        

Output:
Hello World
"""
for key in kvargs.keys():
if type(kvargs[key]) is not list:
raise ValueError("value has to be a list")
else:
kvargs[key] = cls.Event(kvargs[key])
        

cls.__dict__.update(kvargs)

这里是一个最小的设计,应该工作得很好。你要做的就是在类中继承Observer,然后使用observe(event_name, callback_fn)监听特定的事件。当特定事件在代码中的任何地方被触发时(例如。Event('USB connected')),相应的回调将触发。

class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})




class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)

例子:

class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")


# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)


# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
你可以看看pymitter (pypi)。它是一个小的单文件(~250 loc)方法

. "提供命名空间,通配符和TTL"

这里有一个基本的例子:

from pymitter import EventEmitter


ee = EventEmitter()


# decorator usage
@ee.on("myevent")
def handler1(arg):
print "handler1 called with", arg


# callback usage
def handler2(arg):
print "handler2 called with", arg
ee.on("myotherevent", handler2)


# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"


ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

我对Longpoke的极简主义方法做了一个变化,也确保了呼叫者和呼叫者的签名:

class EventHook(object):
'''
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.


>>> event = EventHook(id=int, data=dict)
>>> event += lambda id, data: print("%d %s" % (id, data))
>>> event(id=5, data={"foo": "bar"})
5 {'foo': 'bar'}


>>> event = EventHook(id=int)
>>> event += lambda wrong_name: None
Traceback (most recent call last):
...
ValueError: Listener must have these arguments: (id=int)


>>> event = EventHook(id=int)
>>> event += lambda id: None
>>> event(wrong_name=0)
Traceback (most recent call last):
...
ValueError: This EventHook must be called with these arguments: (id=int)
'''
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []


def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())


def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self


def __isub__(self, handler):
self._handlers.remove(handler)
return self


def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str())
for handler in self._handlers[:]:
handler(**kwargs)


def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()

如果你想做更复杂的事情,比如合并事件或重试,你可以使用Observable模式和一个成熟的库来实现它。https://github.com/ReactiveX/RxPY。可观察对象在Javascript和Java中非常常见,在一些异步任务中使用非常方便。

from rx import Observable, Observer




def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()




class PrintObserver(Observer):


def on_next(self, value):
print("Received {0}".format(value))


def on_completed(self):
print("Done!")


def on_error(self, error):
print("Error Occurred: {0}".format(error))


source = Observable.create(push_five_strings)


source.subscribe(PrintObserver())

输出:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

你可以尝试buslane模块。

这个库使基于消息的系统的实现更容易。它支持命令(单个处理程序)和事件(0或多个处理程序)方法。Buslane使用Python类型注释来正确注册处理程序。

简单的例子:

from dataclasses import dataclass


from buslane.commands import Command, CommandHandler, CommandBus




@dataclass(frozen=True)
class RegisterUserCommand(Command):
email: str
password: str




class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):


def handle(self, command: RegisterUserCommand) -> None:
assert command == RegisterUserCommand(
email='john@lennon.com',
password='secret',
)




command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
email='john@lennon.com',
password='secret',
))

安装巴士道,只需使用pip:

$ pip install buslane
前段时间我写了一个库,可能对你有用。 它允许您拥有本地和全局侦听器、多种不同的注册方式、执行优先级等等。< / p >
from pyeventdispatcher import register


register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)


dispatch(Event("foo.bar", {"id": 1}))
# first second

看看pyeventdispatcher

如果你需要一个跨进程或网络边界的事件总线,你可以尝试PyMQ。它目前支持发布/订阅、消息队列和同步RPC。默认版本工作在一个Redis后端,所以你需要一个运行的Redis服务器。还有一个用于测试的内存后端。您还可以编写自己的后端。

import pymq


# common code
class MyEvent:
pass


# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
print('event received')


# publisher code
pymq.publish(MyEvent())


# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

初始化系统。

from pymq.provider.redis import RedisConfig


# starts a new thread with a Redis event loop
pymq.init(RedisConfig())


# main application control loop


pymq.shutdown()

声明:我是这个库的作者

另一个方便的包是事件。它封装了事件订阅和事件触发的核心,感觉很“自然”。语言的一部分。它似乎类似于c#语言,后者提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,一个事件是一个“槽”;其中可以附加回调函数(事件处理程序)—称为订阅事件的流程。

# Define a callback function
def something_changed(reason):
print "something changed because %s" % reason


# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed

触发事件时,将按顺序调用所有附加的事件处理程序。要触发该事件,请对槽位执行调用:

events.on_change('it had to happen')

这将输出:

'something changed because it had to happen'

更多的文档可以在github回购文档中找到。