如何在 Microservice/事件驱动架构中处理 HTTP 请求?

背景:

我正在构建一个应用程序,所提出的架构是基于微服务架构的事件/消息驱动架构。

做事情的单一方式是,我有一个 User/HTTP request,它的行动一些命令,有一个直接的 synchronous response。因此,响应相同的用户/HTTP 请求是“免费的”。

enter image description here

问题是:

用户向 用户界面服务(有多个 UI 服务)发送一个 HTTP request,它会向队列(Kafka/RabbitMQ/any)发送一些事件。一个 N 的服务拾起事件/消息做一些魔术一路上和 然后在某个时刻,同一个 UI 服务应该获取响应,并将其返回给发出 HTTP 请求的用户。请求处理是 ASYNC,但是 User/HTTP REQUEST->RESPONSESYNC根据您的典型 HTTP 交互。

问题: 在这个不可知/事件驱动的世界中,我如何向发起动作的同一 UI 服务(通过 HTTP 与用户交互的服务)发送响应?

目前为止我的研究 我一直在观察,似乎有些人正在用 WebSocket 解决这个问题。

但是复杂性层是需要一些映射 (RequestId->Websocket(Client-Server))的表,这些表用于“发现”网关中哪个节点具有针对某些特定响应的 websocket 连接。但是即使我理解了这个问题和复杂性,我仍然找不到任何文章来告诉我如何在实现层解决这个问题。这仍然不是一个可行的选择,因为第三方集成,如支付提供商(世界支付) ,期望 REQUEST->RESPONSE-特别是在3DS 验证。

所以我不愿意认为 WebSockets 是一个选项。但是,即使 WebSocket 适用于面向 Web 的应用程序,对于连接到外部系统的 API 来说,也不是一个很好的架构。

我不知道你在说什么

即使长轮询是使用 202 Accepted a Location headerretry-after header的 WebService API 的一种可能的解决方案,它也不适用于高并发性和高能力的网站。 想象一下,大量的人试图获得每个请求的事务状态更新,你不得不使 CDN 缓存失效(现在就去解决这个问题吧!哈)。

但是最重要的和我的案例相关的是我的第三方 API,比如支付系统,3DS 系统有自动重定向,由支付提供商系统处理,他们期望一个典型的 REQUEST/RESPONSE flow,因此这个模型对我不起作用,套接字模型也不起作用。

因为这个用例,HTTP REQUEST/RESPONSE应该按照典型的方式进行处理,其中我有一个哑客户机,它期望在后端处理进程的复杂性。

因此,我正在寻找一种解决方案,其中外部我有一个典型的 Request->Response(SYNC) ,并且状态的复杂性(系统的 ASYNCrony)在内部 处理

这是一个长轮询的例子,但是这个模型不适用于第三方 API,比如 3DS Redirects上不在我控制范围内的支付提供商。

 POST /user
Payload {userdata}
RETURNs:
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Date: Mon, 27 Nov 2018 17:25:55 GMT
Location: https://mydomain/user/transaction/status/:transaction_id
Retry-After: 10


GET
https://mydomain/user/transaction/status/:transaction_id

enter image description here

15766 次浏览

不幸的是,我相信您可能不得不使用长轮询或 web-socket 来完成类似的工作。您需要向用户“推送”某些内容,或者在某些内容返回之前保持 http 请求处于打开状态。

为了处理将数据返回给实际用户的问题,您可以使用类似于 Socket.io的工具。当用户连接时,socket.io 创建一个 id。任何时候用户进行连接时,都需要将 userid 映射到 id socket.io 给出的。 一旦每个请求都附加了一个 userid,您就可以将结果发送回正确的客户机。流程是这样的:

Web 请求订单(POST,带有数据和 userId)

Ui 服务对队列进行排序(这个排序应该包含 userId)

按顺序运行的服务数量(每次传递 userId)

Ui 服务从主题消费。在某个时候,数据出现在主题上。它使用的数据具有 userId,ui 服务查找映射以确定向哪个套接字发出。

无论在 UI 上运行什么代码,都需要是事件驱动的,因此它将处理没有原始请求上下文的数据推送。你可以使用类似 复制的东西。从本质上讲,您可以让服务器在客户机上创建 redux 操作,它工作得非常好!

希望这个能帮上忙。

好问题。我对此的回答是,在系统中引入同步流。

我使用 rabbitMq,所以我不知道卡夫卡,但你应该搜索卡夫卡的同步流。

WebSockets 似乎有点夸张。

希望能帮上忙。

如果你想要实时,Socket.io 也是一个解决方案。

再看看 CQRS,这个架构模式适合事件驱动模型和微服务架构。

那更好了,看看 这个

下面是一个非常简单的示例,说明如何实现 用户界面服务,使其能够与普通的 HTTP 请求/响应流一起工作。它使用 node.js events.EventEmitter类将响应“路由”到正确的 HTTP 处理程序。

执行大纲:

  1. 连接生产者/消费者客户端到卡夫卡

    1. 生成器用于将请求数据发送到内部微服务
    2. 使用者用于侦听来自微服务的数据,这意味着请求已经被处理,我假设这些 Kafka 条目也包含应该返回给 HTTP 客户端的数据。
  2. EventEmitter类创建一个全局 事件调度程序事件调度程序

  3. 注册一个 HTTP 请求处理程序
    1. 为请求创建一个 UUID,并将其包含在推送到 Kafka 的有效负载中
    2. 向事件调度程序注册事件侦听器,其中 UUID 用作它侦听的事件名
  4. 开始使用 Kafka 主题,检索 HTTP 请求处理程序正在等待的 UUID,并为其发出一个事件。在示例代码中,我没有在发出的事件中包含任何有效负载,但是您通常希望包含来自 Kafka 数据的一些数据作为参数,以便 HTTP 处理程序可以将其返回给 HTTP 客户机。

请注意,我试图保持代码尽可能小,省略了错误和超时处理等!

还要注意,为了简化测试,kafkaProduceTopickafkaConsumTopic是相同的主题,不需要另一个服务/功能来产生 用户界面服务消费主题。

代码假设 kafka-nodeuuid包已经安装了 npm,并且可以在 localhost:9092上访问卡夫卡

const http = require('http');
const EventEmitter = require('events');
const kafka = require('kafka-node');
const uuidv4 = require('uuid/v4');


const kafkaProduceTopic = "req-res-topic";
const kafkaConsumeTopic = "req-res-topic";


class ResponseEventEmitter extends EventEmitter {}


const responseEventEmitter = new ResponseEventEmitter();


var HighLevelProducer = kafka.HighLevelProducer,
client = new kafka.Client(),
producer = new HighLevelProducer(client);


var HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client(),
consumer = new HighLevelConsumer(
client,
[
{ topic: kafkaConsumeTopic }
],
{
groupId: 'my-group'
}
);


var s = http.createServer(function (req, res) {
// Generate a random UUID to be used as the request id that
// that is used to correlated request/response requests.
// The internal micro-services need to include this id in
// the "final" message that is pushed to Kafka and consumed
// by the ui service
var id = uuidv4();


// Send the request data to the internal back-end through Kafka
// In real code the Kafka message would be a JSON/protobuf/...
// message, but it needs to include the UUID generated by this
// function
payloads = [
{ topic: kafkaProduceTopic, messages: id},
];
producer.send(payloads, function (err, data) {
if(err != null) {
console.log("Error: ", err);
return;
}
});


responseEventEmitter.once(id, () => {
console.log("Got the response event for ", id);
res.write("Order " + id + " has been processed\n");
res.end();
})
});


s.timeout = 10000;
s.listen(8080);


// Listen to the Kafka topic that streams messages
// indicating that the request has been processed and
// emit an event to the request handler so it can finish.
// In this example the consumed Kafka message is simply
// the UUID of the request that has been processed (which
// is also the event name that the response handler is
// listening to).
//
// In real code the Kafka message would be a JSON/protobuf/... message
// which needs to contain the UUID the request handler generated.
// This Kafka consumer would then have to deserialize the incoming
// message and get the UUID from it.
consumer.on('message', function (message) {
responseEventEmitter.emit(message.value);
});

从一个更普遍的角度来看——在接收请求时,你可以在当前请求的上下文中注册一个订阅者(意味着当请求对象在作用域内时) ,该订阅者在负责的服务完成其作业时接收到一个确认(就像一个状态机,它维护总操作数的进度)。当达到终止状态时,它返回响应并删除侦听器。我认为这在任何发布/订阅样式的消息队列中都适用。下面是我的建议的一个过于简化的演示。

// a stub for any message queue using the pub sub pattern
let Q = {
pub: (event, data) => {},
sub: (event, handler) => {}
}
// typical express request handler
let controller = async (req, res) => {
// initiate saga
let sagaId = uuid()
Q.pub("saga:register-user", {
username: req.body.username,
password: req.body.password,
promoCode: req.body.promoCode,
sagaId: sagaId
})
// wait for user to be added
let p1 = new Promise((resolve, reject) => {
Q.sub("user-added", ack => {
resolve(ack)
})
})
// wait for promo code to be applied
let p2 = new Promise((resolve, reject) => {
Q.sub("promo-applied", ack => {
resolve(ack)
})
})


// wait for both promises to finish successfully
try {
    

var sagaComplete = await Promise.all([p1, p2])
// respond with some transformation of data
res.json({success: true, data: sagaComplete})


} catch (e) {
logger.error('saga failed due to reasons')
// rollback asynchronously
Q.pub('rollback:user-added', {sagaId: sagaId})
Q.pub('rollback:promo-applied', {sagaId: sagaId})
// respond with appropriate status
res.status(500).json({message: 'could not complete saga. Rolling back side effects'})
}
  

}

正如您可能看到的,这看起来像是一个通用模式,可以将其抽象为一个框架,以减少代码重复并管理横切关注点。这就是 长篇故事模式的本质。客户机将只等待完成所需操作所需的时间(即使所有操作都是同步的,也会发生这种情况) ,再加上由于服务间通信而增加的延迟。如果使用基于事件循环的系统(如 NodeJS 或 Python 旋风) ,请确保不要阻塞线程。

简单地使用基于 web-socket 的推送机制并不一定能提高系统的效率或性能。然而,建议您使用套接字连接将消息推送到客户端,因为它使您的体系结构更加通用(甚至您的客户端的行为也与您的服务一样)、一致并允许更好的关注点分离。它还允许您独立地扩展推送服务,而不必担心业务逻辑。可以扩展 saga 模式,以便在出现部分故障或超时时启用回滚,从而使系统更易于管理。

正如我所期望的那样,人们试图把每件事都融入到一个概念中,即使它并不适合那里。这不是批评,这是根据我的经验和阅读你的问题和其他答案后的观察。

是的,你是对的,微服务架构是基于异步消息传递模式的。然而,当我们谈论 UI 的时候,在我的脑海中有两种可能的情况:

  1. 用户界面需要立即响应(例如读操作或那些用户希望立即响应的命令)。这些不一定是异步的.如果立即在屏幕上需要响应,为什么要增加消息传递和异步的开销呢?没道理啊。微服务体系结构应该解决问题,而不是通过增加开销来创建新的问题。

  2. 可以对 UI 进行重构以容忍延迟响应(例如,在准备响应时,UI 可以直接提交命令、接收确认并让用户做其他事情,而不必等待结果)。在这种情况下,可以引入异步。入口服务(UI 直接与之交互)可以协调异步处理(等待完成事件等等) ,准备好后,它可以与 UI 进行通信。我见过 UI 在这种情况下使用 SignalR,网关服务是一个接受套接字连接的 API。如果浏览器不支持套接字,理想情况下应该回到轮询。无论如何,重点是,这只能与一个偶然事件: 用户界面可以容忍延迟回答工作。

如果微服务确实与您的情况相关(情况2) ,那么相应地构建 UI 流,并且后端的微服务中不应该存在挑战。在这种情况下,你的问题可以归结为对服务集合应用事件驱动架构(edge 是连接事件驱动和 UI 交互的网关微服务)。这个问题(事件驱动的服务)是可以解决的,您知道这一点。您只需要决定是否可以重新考虑 UI 的工作方式。

问: 在这个不可知/事件驱动的世界中,如何向发起操作(通过 HTTP 与用户交互的服务)的同一 UI 服务发送响应?

因此,我正在寻找一种解决方案,其中外部有一个典型的 Request-> Response (SYNC) ,而状态的复杂性(系统的 ASYNCrony)在内部处理

我认为重要的是要注意以下几点。

简而言之:

  • 系统的边缘可以是同步的,同时它以异步的基于事件的方式在内部处理内容。参见: 插图倒车

  • UI 服务(Web 服务器或 API 网关)可以启动一个异步/等待函数(用 Node.js 编写) ,然后继续处理其他请求。然后,当 UI 服务从后端的另一个微服务接收到“ OrderConfirmed”事件(例如,通过监听 Kafka 日志)时,它将再次拾取用户请求(回调)并向客户端发送指定的响应。

  • 试试 反应式交互网关(RIG),它会为您处理这个问题。

长话短说:

  • 客户端可以同步服务,而后端可以异步处理其内部事件。客户不知道,或者需要知道。但客户必须等待。因此,要小心处理时间过长。因为客户端的 HTTP 请求可能会超时(对于 HTTP 请求,通常是 30秒至2分钟)。或者,如果 UI 服务在 云端功能中运行,它也可能超时; 默认值为1分钟后,但可以扩展到9分钟)。但是,如果请求是同步的端到端请求,超时实际上也是一个问题。异步事件驱动架构并没有改变这一点,它只是提升了人们心中的担忧。

  • 由于客户端发送了一个同步请求,它被阻塞了(用户必须在 UI 中等待)。但是,这并不意味着 UI 服务(Web 服务器或 API 网关)必须被阻塞。它只需启动一个异步/等待函数(用 Node.js 编写) ,然后继续处理其他请求。然后,当 UI 服务接收到“ OrderConfirmed”事件时,它将再次拾取该函数(回调)并将指定的响应发送给客户端。

  • 任何位于后端边缘的(微)服务都可以通过典型的 HTTP 请求/响应流与第三方系统进行同步交互(尽管通常您希望在这里也进行异步/等待,以便在第三方处理时释放您自己的微服务资源)。当它同步地从第三方接收到响应时,它就可以向后端的其余部分发送一个异步事件(例如‘ StockResupied’事件)。如果 UI 服务被设计为在给出响应之前等待这样的事件,那么它可以传播回客户端。

灵感来源: Brad Irby 对相关问题的回答

除上述方法外,另一种方法是:

设计客户端的用户界面的方式,使用户不必阻塞/等待(可能已经给出了一个乐观的成功响应,符合“乐观的用户界面”原则) ,但随后异步接收一个 服务器发送事件(SSE)

  • 服务器发送事件(Server-Sent Events,SSE)是一种技术,它使浏览器(客户端)能够通过 HTTP 连接从服务器接收自动更新,如基于文本的事件数据

  • SSE 在后台“只使用一个长期存在的 HTTP 连接”。

你可能还想知道:

  • “很高兴知道 SSE 受到最大打开连接数量的限制,这在打开各种标签时尤其痛苦,因为每个浏览器的限制是6个。”

  • “与 SSE 相比,WebSocket 的设置要复杂得多,而且任务要求高得多。”

资料来源: https://www.telerik.com/blogs/websockets-vs-server-sent-events

此外:

使用 带有 ServiceWorker 的 Web 推送 API 是 SSE 的另一种选择会给用户带来烦人的弹出窗口,而且更适合在浏览器外和/或 web 应用关闭时发送通知。

你可能还想看看:

反应交互网关(RIG) ,对于一些不错的 建筑结构图和一个 自述一个 href = “ https://accenture.github.io/抽象-交互-网关/docs/Feature ures.html # sync ously——-asnysync-response”rel = “ nofollow noReferrer”> 说明了这个答案中的主要观点。RIG 是一个针对微服务的可伸缩的免费开源 API 网关,可以处理 SSE、 WebSocket、长轮询等。“订阅卡夫卡主题,同时保持与所有活跃前端的连接,将事件转发给它们所寻找的用户,所有这些都以可伸缩的方式进行。除此之外,它还处理授权,所以你的服务也不必关心这个。”