事件#
警告
实验性的 Ray Workflows 库已弃用,并将在 Ray 的未来版本中移除。
为了允许事件触发工作流,Ray Workflows 支持可插拔的事件系统。使用事件框架提供了以下一些特性。
高效地等待事件(无需在等待时运行工作流任务)。
支持恰好一次事件传递语义,同时提供容错能力。
与其他工作流任务一样,事件通过检查点支持容错。当事件发生时,事件会被检查点保存,然后可选择地提交。
使用事件#
工作流事件是一种特殊类型的工作流任务。它们在事件发生时“完成”。workflow.wait_for_event(EventListenerType)
可用于创建事件任务。
import time
import ray
from ray import workflow
# Create an event which finishes after 2 seconds.
event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)
# Create another event which finishes after 1 seconds.
event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1)
@ray.remote
def gather(*args):
return args
# Gather will run after 2 seconds when both event1 and event2 are done.
workflow.run(gather.bind(event1_task, event2_task))
HTTP 事件#
工作流支持通过 HTTP 发送外部事件。工作流中的 HTTP 事件监听器用于连接到 HTTP 端点。下面是一个在工作流中使用 HTTP 事件的端到端示例。
HTTPListener
用于在工作流中监听 HTTP 事件。每个 HTTPListener
订阅唯一的 workflow_id
和 event_key
对。要向监听器发送事件,外部客户端的 HTTP 请求应在请求 URL 中指定 workflow_id
,并在 JSON 请求正文中指定 event_key
和 event_payload
键(见下文)。
# File name: wait_for_event_http.py
# Create a task waiting for an http event with a JSON message.
# The JSON message is expected to have an event_key field
# and an event_payload field.
event_task = workflow.wait_for_event(HTTPListener, event_key="my_event_key")
obj_ref = workflow.run_async(event_task, workflow_id="workflow_receive_event_by_http")
可以使用 http://hostname:port/event/send_event/<workflow_id>
的 HTTP 端点发送事件。在本地,可通过 http://127.0.0.1:8000/event/send_event/<workflow_id>
访问该端点。请注意,HTTP 请求必须包含相同的 workflow_id
。每个请求还应包含一个 JSON 正文,其中包含两个字段:event_key
和 event_payload
,如下例所示。event_key
字段应与监听器端传递给 workflow.wait_for_event()
的参数匹配。在工作流中,一旦接收到 HTTP 事件,事件任务将返回 event_payload
字段的值。
总而言之,要在工作流中触发 HTTP 事件,外部客户端应具备
HTTP 端点地址(例如
http://127.0.0.1:8000/event/send_event
)workflow_id
(例如 “workflow_receive_event_by_http”)包含
event_key
和event_payload
字段的有效 JSON 格式消息,其中event_key
与工作流中使用的相匹配
HTTP 请求一旦被工作流接收到事件后,将收到回复。返回的状态码可以是
200:事件成功处理。
500:事件处理失败。
404:找不到
workflow_id
或event_key
,可能是因为事件在目标工作流任务准备好之前收到。
下面的代码片段展示了外部客户端发送 HTTP 请求的示例。
# File name: wait_for_event_http.py
res = requests.post(
"http://127.0.0.1:8000/event/send_event/"
+ "workflow_receive_event_by_http",
json={"event_key": "my_event_key", "event_payload": "my_event_message"},
)
if res.status_code == 200:
print("event processed successfully")
elif res.status_code == 500:
print("request sent but workflow event processing failed")
elif res.status_code == 404:
print("request sent but either workflow_id or event_key is not found")
自定义事件监听器#
可以通过继承 EventListener 接口来编写自定义事件监听器。
from ray.workflow.common import Event
class EventListener:
def __init__(self):
"""Optional constructor. Only the constructor with no arguments will be
called."""
pass
async def poll_for_event(self, *args, **kwargs) -> Event:
"""Should return only when the event is received."""
raise NotImplementedError
async def event_checkpointed(self, event: Event) -> None:
"""Optional. Called after an event has been checkpointed and a transaction can
be safely committed."""
pass
listener.poll_for_events()
协程在事件完成后应该结束。传递给 workflow.wait_for_event
的参数会传递给 poll_for_events()
。例如,一个休眠直到某个时间戳的事件监听器可以这样编写
class TimerListener(EventListener):
async def poll_for_event(self, timestamp):
await asyncio.sleep(timestamp - time.time())
event_checkpointed
例程可以被重写,以支持具有恰好一次传递语义的系统,这类系统通常遵循以下模式:
等待事件。
处理事件。
提交事件。
工作流完成事件的检查点保存后,事件监听器将被调用并可以释放事件。例如,为了确保从 kafkaesque<https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits>
队列中消费事件
KafkaEventType = ...
class QueueEventListener:
def __init__(self):
# Initialize the poll consumer.
self.consumer = Consumer({'enable.auto.commit': False})
async def poll_for_event(self, topic) -> KafkaEventType:
self.consumer.subscribe(topic)
message = await self.consumer.poll()
return message
async def event_checkpointed(self, event: KafkaEventType) -> None:
self.consumer.commit(event, asynchronous=False)
(高级) 事件监听器语义#
在编写复杂的事件监听器时,作者应该注意以下几个特性。
事件监听器**定义**必须是可序列化的
事件监听器实例**不**会被序列化。
事件监听器应该是**无状态的**。