小程序
传感搜
传感圈

如何在开源项目Cadence中实现轮询? 译文 精选

2022-07-16
关注

本指南适用于所有希望了解Cadence中轮询工作原理的开发人员和工程师。Cadence是相对较新(且完全开源)的容错状态代码平台,最初由Uber开发(现在得到了包括Instaclustr在内的更多公司的支持)。

Cadence的优势 

大量用例遍及单个请求-回复、复杂的状态追踪和异步事件响应,并与外部的不可靠依赖项进行通讯。构建此类应用程序的常用方法是将无状态服务、数据库、定时任务和队列系统像大杂烩一样整合在一起。

然而,这会对开发人员产生负面影响,因为大部分代码都是用于管道的——这掩盖了大量底层细节背后的实际业务逻辑。Cadence是一个完全开源的编排框架,可以帮助开发人员编写高容错且能够长时间运行的应用程序,这通常也被称为工作流。

从本质上讲,它提供了一个与特定进程无关联的虚拟内存,并保留了完整的应用程序状态,包括函数堆栈以及兼容各种主机和软件故障的局部变量。这使得开发人员在编写代码时能够充分利用编程语言的功能特性,Cadence则负责应用程序的持久性、可用性和可扩展性。由于繁忙等待通常会消耗大量非必要的CPU周期,因而应尽可能避免使用轮询,而是使用由事件触发的中断来进行实现,除非以下两者情况:

  • 只需要进行短时间的轮询
  • 能够接受在轮询过程中出现合理的等待

对于计算机而言,这相当于在长途旅行中每5分钟询问一次距离目的地还有多远。尽管如此,在很多情况下,这是唯一可用的选择。Cadence为持久计时器、长时间运行的活动和无限制重试提供强大的支持,使得其非常适合此类功能的实现。

使用Cadence轮询外部服务 

实现轮询机制有很多种方法。本文主要讲实现对外部服务的轮询,并分析这样做会从Cadence中获得怎样的收益。首先,我们来简单的解释一下Cadence的概念。Cadence的核心理念是一个无故障状态的工作流。这意味着工作流代码的状态,包括局部变量和它创建的任何线程,不受进程和Cadence服务故障的影响。这是一个非常强大的理念,因为它封装了状态、线程处理、持久计时器和事件处理程序。为了满足确定性的执行要求,工作流不允许直接调用任何外部API。相反,它们负责对活动的执行进行调度。活动是用来实现业务级功能的应用程序逻辑,例如调用服务或对媒体文件进行转码。当活动出现故障时,Cadence并不会恢复其运行状态。因此,活动函数可以包含任何代码,且不会受到任何限制。

轮询的实现 

代码本身非常简单——我们将逐行解释代码的作用:

State polledState = externalServiceActivities.getState();   while(!expectedState.equals(polledState)) {
Workflow.sleep(Duration.ofSeconds(30));
polledState = externalServiceActivities.getState();
}

左右滑动查看完整代码

我们首先调用一个活动,在这种情况下,外部服务可能是REST API。然后我们就需要进行条件判断。如果未达到所需的状态,会有10秒的等待。

这不是通常意义上的等待,而是一个持久的计时器。在这种情况下,轮询会执行周期性的等待,但时间可能会更长;而且,如果执行失败,我们一定不会希望浪费整个时间周期。Cadence通过将计时器以事件的方式进行持久化,并在完成后通知相应的工作服务(即管理工作流和活动实施的服务)来解决此问题。

这些计时器可以对从几秒到几分钟、几小时、几天甚至几个月或几年的时间间隔进行管理。最后,通过再次调用外部服务来刷新状态。 在继续进行操作之前,我们先快速了解一下Cadence究竟在后台做了哪些工作来避免潜在的问题。

重要提醒:Cadence历史记录和轮询注意事项 

Cadence是如何实现无故障状态工作流的呢?关键在于Cadence是如何坚持用工作流程执行实现的。工作流状态恢复利用事件溯源,而事件溯源对代码的编写方式施加了一些限制。事件溯源将一系列不断变化的事件转为持久化的状态。

每当工作流状态发生变化时,都会有一个新的事件追加到该工作流的事件历史记录中。然后,Cadence通过历史记录来进行操作重放,以重新建立工作流的当前状态。这就是为什么与外部环境的所有通信都应该通过活动进行,并且必须使用Cadence API来获取当前时间、等待和创建新线程。

1、谨慎使用轮询​

轮询需要根据判断条件不断地循环。由于每个活动调用和计时器事件都是持久的,因此即使是短的轮询间隔也可能会演变成不可接受的时间消耗。现在我们来研究轮询片段的历史记录会以怎样的方式呈现。

  • 首先建立轮询外部服务所需的活动。 
  • 活动由工作服务启动。 
  • 活动完成后返回其结果。  
  • 如果条件尚未满足,则启动计时器。 
  • 一旦超时,就会触发一个事件来唤醒工作流。 
  • 重复上面5个步骤,直到条件满足。 
  • 最终的轮询确认条件满足(无需设置定时器)。 
  • 工作流被标记为完成。


Cadence中轮询代码片段的事件历史记录

如果工作流在中间某个地方失败,且必须重放其历史操作记录,这可能会导致大量的事件清单被执行。有一些方法可以避免这些操作脱离掌控:避免使用较短的轮询周期,在工作流中设置合理的超时时间,限制轮询的次数。

记住所有操作都是持久的,可能需要由人重放操作。

2、配置活动重试次数​

如果外部服务由于某些原因失败了怎么办?我们需要尝试,尝试,再尝试!Cadence存在一种机制,可以让Cadence记录活动结果并能够完美地恢复工作流状态,同时还提供了对类似重试逻辑等额外功能的支持。 下面是启用重试选项的活动配置示例:

private final ExternalServiceActivities externalServiceActivities =    Workflow.newActivityStub(ExternalServiceActivities.class,   new ActivityOptions.Builder()
.setRetryOptions(new RetryOptions.Builder() .setInitialInterval(Duration.ofSeconds(10)) .setMaximumAttempts(3)
.build())
.setScheduleToCloseTimeout(Duration.ofMinutes(5)) .build());

​​左右滑动查看完整代码

通过这样的操作,我们告诉Cadence,在ExternalServiceActivities中的操作最多可以重试3次,且每次重试的间隔为10秒。这样,每个对外部服务活动的调用都可以轻松的实现重试功能,且无需编写任何重试逻辑。

用例示例:Instafood和MegaBurgers 

为了展示这种模式的实际效果,我们将在示例项目中集成一个虚构的轮询。

1、Instafood简介​

Instafood是一个基于在线应用的送餐服务。客户可以通过Instafood的移动应用从他们当地最喜欢的餐厅中订购食物。订单可以是自取或外卖。

如果选择外卖,Instafood将通知其外卖司机从餐厅取餐并将其送到客户手中。Instafood为每个餐厅提供一个展示屏或平板电脑,用于Instafood和餐厅之间的通信。客户下单后,Instafood就会通知餐厅,然后餐厅可以接受订单、提供预计完成时间或将其标记为已完成等。对于外送订单,Instafood将根据预计完成时间协调外卖司机取餐。

2、轮询"MegaBurgers"​

MegaBurgers是一家大型跨国快餐汉堡连锁店。他们有自己的移动应用程序和网站,并使用REST API作为后端为客户提供订单服务。Instafood和MegaBurgers已达成协议,Instafood客户可以通过Instafood的应用程序在MegaBurger下单,并可选择自取和外卖。与通用方案不同的是,MegaBurger并未选择在所有店面安装Instafood展示屏,而是同意将Instafood的订餐系统以集成的方式与自身基于REST的订餐系统进行对接,以完成下单和接收更新。

MegaBurger的订单状态机

MegaBurger的REST API没有推送机制(WebSockets、WebHooks等),无法接收订单状态更新。

相反,其需要定期发送GET请求来确定订单状态,这些轮询可能会导致订单工作流在Instafood端反复执行(例如安排外卖司机取餐)。

建立Instafood项目 

你需要配置一个Cadence集群来运行示例项目。在此示例中,我们将使用Instaclustr平台来执行操作。

第1步:创建Instaclustr托管集群​

Cadence集群需要连接Apache Cassandra集群作为持久层。为了成功配置Cadence和Cassandra集群,我们将遵循“创建Cadence集群”文档的操作指导。

  • 以下操作会自动进行初始化,无需人为干预:
  • 防火墙规则将在Cassandra集群上为Cadence节点自动生成配置。
  • Cadence和Cassandra之间的身份验证(包括客户端加密)将会自动生成配置。
  • Cadence的默认配置和键空间的可见性将在Cassandra中自动创建。
  • 两个集群之间会建立关联关系,以确保不会在停止Cadence集群之前因意外而删除Cassandra集群。
  • 将创建一个负载均衡器。建议通过负载均衡器地址来连接和访问集群。

第2步:配置Cadence域​

Cadence的后端由多租户服务提供支持,其中隔离单元被称为域。为了让Instafood应用程序运行,我们首先需要为它注册一个域。

1、为了与Cadence集群进行交互,我们需要安装其命令行界面客户端。

macOS​

如果使用macOS,可以通过Homebrew安装Cadence CLI,如下所示:

brew install cadence-workflow
# run command line client
cadence <command> <arguments>

其他操作系统​

可以通过Docker Hub镜像仓库ubercadence/cli来运行和使用CLI:

# run command line client
docker run --network=host --rm ubercadence/cli:master <command> <arguments>

左右滑动查看完整代码

在以后的步骤中,我们将使用cadence来指代客户端。

2、为了连接的稳定性,建议通过负载均衡器地址来连接和访问集群。可以在“连接信息”选项卡的顶部找到负载均衡器地址,如下所示:

“ab-cd12ef23-45gh-4baf-ad99-df4xy-azba45bc0c8da111.elb.us-east 1.amazonaws.com”

左右滑动查看完整代码

我们将其称为<cadence_host>。

3、现在可以通过列出当前域来测试连接:

cadence --ad <cadence_host>:7933 admin domain list

4、添加instafood域:

cadence --ad <cadence_host>:7933 --do instafood domain register --global_domain=false

​左右滑动查看完整代码

5、检查它是否已经注册:

cadence --ad <cadence_host>:7933 --do instafood domain describe

第3步:运行Instafood示例项目​

1、从Instafood项目的Git代码仓库中克隆Gradle项目。

2、打开位于instafood/src/main/resources/instafood.properties路径的配置文件,将cadenceHost的值替换为自己的负载均衡器地址:

cadenceHost=<cadence_host>

​3、通过以下方式运行该应用程序:

cadence-cookbooks-instafood/instafood$ ./gradlew run

或从IDE中执行InstafoodApplication的main class:

​4、查看终端输出以确认其是否已经正常运行:

​了解MegaBurger的API 

在了解Instafood如何与MegaBurger集成之前,让我们先快速了解一下他们的API。

1、运行MegaBurger服务​

让我们从运行服务开始。通过以下命令来启动服务:

cadence-cookbooks-instafood/megaburger$ ./gradlew run

或者在IDE中运行MegaburgerRestApplication。这是一个以内存作为持久层的Spring Boot Rest API演示示例。当应用程序关闭时,所有数据都会丢失。

2、MegaBurger的订单API​

MegaBurger发布其Orders API以便跟踪和更新每个食品订单的状态。

POST /orders

创建一个订单并返回其ID。

Request:

curl -X POST localhost:8080/orders -H “Content-Type: application/json” --data ‘{“meal”: “Vegan Burger”, “quantity”: 1}’

左右滑动查看完整代码

Response:

{
“id”: 1,
“meal”: “Vegan Burger”,
“quantity”: 1,
“status”: “PENDING”,
“eta_minutes”: null
}

GET /orders​

返回一个包含所有订单信息的列表。

Request:

curl -X GET localhost:8080/orders

​Response:

[
{
“id”: 0,
“meal”: “Vegan Burger”,
“quantity”: 1,
“status”: “PENDING”,
“eta_minutes”: null
},
{
“id”: 1,
“meal”: “Onion Rings”,
“quantity”: 2,
“status”: “PENDING”,
“eta_minutes”: null
}
]


GET /orders / {orderId}

返回ID与orderId一致的订单。

Request:

curl -X GET localhost:8080/orders/1

​Response:

{
“id”: 1,
“meal”: “Onion Rings”,
“quantity”: 2,
“status”: “PENDING”,
“eta_minutes”: null
}

PATCH /orders/{orderId}

更新ID与orderId一致的订单

Request:

curl -X PATCH localhost:8080/orders/1 -H “Content-Type: application/ json” --data ‘{“status”:“ACCEPTED”}’

​左右滑动查看完整代码

Response:

{
“id”: 1,
“meal”: “Onion Rings”,
“quantity”: 2,
“status”: “ACCEPTED”,
“eta_minutes”: null
}

 MegaBurger轮询集成项目回顾 

现在已经完成了所有配置的初始化,让我们看看Instafood和MegaBurger之间集成的实际效果如何。

1、轮询工作流​

首先定义新的工作流MegaBurgerOrderWorkflow:

public interface MegaBurgerOrderWorkflow {
@WorkflowMethod
void orderFood(FoodOrder order);
// ...
}

此工作流有一个orderFood方法,该方法将通过与MegaBurger集成来发送和跟踪相应的FoodOrder。

现在来看看它的实现方式:

public class MegaBurgerOrderWorkflowImpl implements MegaBurgerOrderWork flow {
// ...
@Override
public void orderFood(FoodOrder order) {
OrderWorkflow parentOrderWorkflow = getParentOrderWorkflow();
Integer orderId = megaBurgerOrderActivities.createOrder(mapMega BurgerFoodOrder(order));
updateOrderStatus(parentOrderWorkflow, OrderStatus.PENDING);
// Poll until Order is accepted/rejected
updateOrderStatus(parentOrderWorkflow, pollOrderStatusTransition(orderId, OrderStatus.
PENDING));
if (OrderStatus.REJECTED.equals(currentStatus)) {
throw new RuntimeException(“Order with id “ + orderId + “ was rejected”);
}
// Send ETA to parent workflow
parentOrderWorkflow.updateEta(getOrderEta(orderId)); // Poll until Order is cooking
updateOrderStatus(parentOrderWorkflow, pollOrderStatusTransition(orderId, OrderStatus.ACCEPTED)); // Poll until Order is ready
updateOrderStatus(parentOrderWorkflow, pollOrderStatusTransition(orderId, OrderStatus.COOKING)); // Poll until Order is delivered
updateOrderStatus(parentOrderWorkflow,
pollOrderStatusTransition(orderId, OrderStatus.READY)); }
// ...
}

左右滑动查看完整代码

该工作流首先获取其父工作流。MegaBurgerOrderWorkflow只处理与MegaBurger的集成,将订单交付给由独立工作流管理的客户端处理;这意味着我们使用的是子工作流。然后,通过活动来创建订单,并获得订单ID。

活动只是API客户端的装饰器,该API客户端负责发送POST请求到/orders。创建订单后,父工作流会收到一个订单现在处于PENDING状态的信号(这是一个发送给工作流的,来自外部的异步请求)。

现在我们必须等待订单从PENDING转变为ACCEPTED或REJECTED。这就是轮询发挥作用的地方。现在看看我们的函数pollOrderStatusTransition做了什么:

private OrderStatus pollOrderStatusTransition(Integer orderId,  OrderStatus orderStatus) { OrderStatus polledStatus =
megaBurgerOrderActivities.getOrderById(orderId).getStatus(); while (orderStatus.equals(polledStatus)) {
Workflow.sleep(Duration.ofSeconds(30));
polledStatus = megaBurgerOrderActivities.
getOrderById(orderId).getStatus();
}
return polledStatus;
}

左右滑动查看完整代码

这与本文介绍的其他轮询循环非常相似。唯一的区别是它用一个轮询的特定状态代替等待,直到订单状态发生变化。同样的,用于通过ID获取订单的真实API调用隐藏在活动的后面,该活动启用了重试功能。如果订单被拒绝,则会引发运行状态异常,使工作流失败。如果订单被接受,则将MegaBurger的预计完成时间返回给父工作流(父工作流使用预计完成时间来完成交付调度)。最后,图3中所示的状态将会被转换,直到订单被标记为已交付。

2、运行正常的场景​

最后,让完成一个完整的订单场景。

这个场景是示例项目中测试套件的一部分。唯一的要求是同时运行Instafood和MegaBurger服务器,然后按照前文中的步骤操作。

测试用例描述了客户端通过Instafood下单MegaBurger的新素食汉堡,并且来店面取餐:

cadence-cookbooks-instafood/instafood$ ./gradlew test

或在IDE中运行InstafoodApplicationTest:

class InstafoodApplicationTest {
// ...
@Test
public void
givenAnOrderItShouldBeSentToMegaBurgerAndBeDeliveredAccordingly() { FoodOrder order = new FoodOrder(Restaurant.MEGABURGER, “Vegan Burger”, 2, “+54 11 2343-2324”, “Díaz velez 433, La lucila”, true);
// Client orders food
WorkflowExecution workflowExecution= WorkflowClient start(orderWorkflow::orderFood, order);
// Wait until order is pending Megaburger’s acceptance await().until(() -> OrderStatus.PENDING.equals(orderWorkflow. getStatus()));
// Megaburger accepts order and sends ETA
megaBurgerOrdersApiClient.updateStatusAndEta(getLastOrderId(), “ACCEPTED”, 15);
await().until(() -> OrderStatus.ACCEPTED.equals(orderWorkflow. getStatus()));
// Megaburger starts cooking order
megaBurgerOrdersApiClient.updateStatus(getLastOrderId(), “COOKING”);
await().until(() -> OrderStatus.COOKING.equals(orderWorkflow. getStatus()));
// Megaburger signals order is ready
megaBurgerOrdersApiClient.updateStatus(getLastOrderId(), “READY”);
await().until(() -> OrderStatus.READY.equals(orderWorkflow. getStatus()));
// Megaburger signals order has been picked-up
megaBurgerOrdersApiClient.updateStatus(getLastOrderId(), “RESTAURANT_DELIVERED”);
await().until(() -> OrderStatus.RESTAURANT_DELIVERED.
equals(orderWorkflow.getStatus()));
await().until(() -> workflowHistoryHasEvent(workflowClient, workflowExecution, EventType.WorkflowExecutionCompleted)): }
}

左右滑动查看完整代码

在这个场景中,有3个参与者:Instafood、MegaBurger和客户端。

1. 客户端将订单发送到Instafood。 

2. 一旦订单到达MegaBurger(订单状态为PENDING),MegaBurgers将其标记为ACCEPTED并返回预计完成时间。 

3. 然后我们看下整个状态更新序列: 

  • MegaBurger将订单标记为COOKING。 
  • MegaBurger将订单标记为READY(这意味着它已准备好外送或自取)。
  • MegaBurger将订单标记为RESTAURANT_DELIVERED 。 

4. 由于该订单是以取餐的形式交付,因此一旦客户端完成交付,整个工作流程就结束了。

总结 

在本文中,我们学习了如何使用Cadence实现轮询。我们展示了如何让Cadence集群在Instaclustr平台上运行,以及让应用程序连接到它是多么容易。参考链接:https://dzone.com/articles/how-to-use-open-source-cadence-for-polling

译者介绍

仇凯,51CTO社区编辑,目前就职于北京宅急送快运股份有限公司,职位为信息安全工程师。主要负责公司信息安全规划和建设(等保,ISO27001),日常主要工作内容为安全方案制定和落地、内部安全审计和风险评估以及管理。

  • cadence
  • 工作流
您觉得本篇内容如何
评分

评论

您需要登录才可以回复|注册

提交评论

51CTO

这家伙很懒,什么描述也没留下

关注

点击进入下一篇

一站式EDA云高性能计算平台解决方案

提取码
复制提取码
点击跳转至百度网盘