事件#

警告

实验性的 Ray Workflows 库已弃用,并将在 Ray 的未来版本中移除。

为了允许事件触发工作流,Ray Workflows 支持可插拔的事件系统。使用事件框架提供了以下一些特性。

  1. 高效地等待事件(无需在等待时运行工作流任务)。

  2. 支持恰好一次事件传递语义,同时提供容错能力。

与其他工作流任务一样,事件通过检查点支持容错。当事件发生时,事件会被检查点保存,然后可选择地提交。

使用事件#

工作流事件是一种特殊类型的工作流任务。它们在事件发生时“完成”。workflow.wait_for_event(EventListenerType) 可用于创建事件任务。

import time
import ray
from ray import workflow

# Create an event which finishes after 2 seconds.
event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)

# Create another event which finishes after 1 seconds.
event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1)

@ray.remote
def gather(*args):
    return args

# Gather will run after 2 seconds when both event1 and event2 are done.
workflow.run(gather.bind(event1_task, event2_task))

HTTP 事件#

工作流支持通过 HTTP 发送外部事件。工作流中的 HTTP 事件监听器用于连接到 HTTP 端点。下面是一个在工作流中使用 HTTP 事件的端到端示例。

HTTPListener 用于在工作流中监听 HTTP 事件。每个 HTTPListener 订阅唯一的 workflow_idevent_key 对。要向监听器发送事件,外部客户端的 HTTP 请求应在请求 URL 中指定 workflow_id,并在 JSON 请求正文中指定 event_keyevent_payload 键(见下文)。

# File name: wait_for_event_http.py
# Create a task waiting for an http event with a JSON message.
# The JSON message is expected to have an event_key field
# and an event_payload field.

event_task = workflow.wait_for_event(HTTPListener, event_key="my_event_key")

obj_ref = workflow.run_async(event_task, workflow_id="workflow_receive_event_by_http")

可以使用 http://hostname:port/event/send_event/<workflow_id> 的 HTTP 端点发送事件。在本地,可通过 http://127.0.0.1:8000/event/send_event/<workflow_id> 访问该端点。请注意,HTTP 请求必须包含相同的 workflow_id。每个请求还应包含一个 JSON 正文,其中包含两个字段:event_keyevent_payload,如下例所示。event_key 字段应与监听器端传递给 workflow.wait_for_event() 的参数匹配。在工作流中,一旦接收到 HTTP 事件,事件任务将返回 event_payload 字段的值。

总而言之,要在工作流中触发 HTTP 事件,外部客户端应具备

  • HTTP 端点地址(例如 http://127.0.0.1:8000/event/send_event

  • workflow_id(例如 “workflow_receive_event_by_http”)

  • 包含 event_keyevent_payload 字段的有效 JSON 格式消息,其中 event_key 与工作流中使用的相匹配

HTTP 请求一旦被工作流接收到事件后,将收到回复。返回的状态码可以是

  1. 200:事件成功处理。

  2. 500:事件处理失败。

  3. 404:找不到 workflow_idevent_key,可能是因为事件在目标工作流任务准备好之前收到。

下面的代码片段展示了外部客户端发送 HTTP 请求的示例。

# File name: wait_for_event_http.py
res = requests.post(
        "http://127.0.0.1:8000/event/send_event/"
        + "workflow_receive_event_by_http",
        json={"event_key": "my_event_key", "event_payload": "my_event_message"},
    )
if res.status_code == 200:
    print("event processed successfully")
elif res.status_code == 500:
    print("request sent but workflow event processing failed")
elif res.status_code == 404:
    print("request sent but either workflow_id or event_key is not found")

自定义事件监听器#

可以通过继承 EventListener 接口来编写自定义事件监听器。

from ray.workflow.common import Event

class EventListener:
    def __init__(self):
        """Optional constructor. Only the constructor with no arguments will be
          called."""
        pass

    async def poll_for_event(self, *args, **kwargs) -> Event:
        """Should return only when the event is received."""
        raise NotImplementedError

    async def event_checkpointed(self, event: Event) -> None:
        """Optional. Called after an event has been checkpointed and a transaction can
          be safely committed."""
        pass

listener.poll_for_events() 协程在事件完成后应该结束。传递给 workflow.wait_for_event 的参数会传递给 poll_for_events()。例如,一个休眠直到某个时间戳的事件监听器可以这样编写

class TimerListener(EventListener):
    async def poll_for_event(self, timestamp):
        await asyncio.sleep(timestamp - time.time())

event_checkpointed 例程可以被重写,以支持具有恰好一次传递语义的系统,这类系统通常遵循以下模式:

  1. 等待事件。

  2. 处理事件。

  3. 提交事件。

工作流完成事件的检查点保存后,事件监听器将被调用并可以释放事件。例如,为了确保从 kafkaesque<https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits> 队列中消费事件

KafkaEventType = ...

class QueueEventListener:
    def __init__(self):
        # Initialize the poll consumer.
        self.consumer = Consumer({'enable.auto.commit': False})

    async def poll_for_event(self, topic) -> KafkaEventType:
        self.consumer.subscribe(topic)

        message = await self.consumer.poll()
        return message

    async def event_checkpointed(self, event: KafkaEventType) -> None:
         self.consumer.commit(event, asynchronous=False)

(高级) 事件监听器语义#

在编写复杂的事件监听器时,作者应该注意以下几个特性。

  • 事件监听器**定义**必须是可序列化的

  • 事件监听器实例**不**会被序列化。

  • 事件监听器应该是**无状态的**。