企业宣传,产品推广,广告招商,广告投放联系seowdb

如何在开源项目Cadence中实现轮询

本指南适用于所有希望了解Cadence中轮询工作原理的开发人员和工程师。Cadence是相对较新(且完全开源)的容错状态代码平台,最初由Uber开发(现在得到了包括Instaclustr在内的更多公司的支持)。

大量用例遍及单个请求-回复、复杂的状态追踪和异步事件响应,并与外部的不可靠依赖项进行通讯。构建此类应用程序的常用方法是将无状态服务、数据库、定时任务和队列系统像大杂烩一样整合在一起。

然而,这会对开发人员产生负面影响,因为大部分代码都是用于管道的——这掩盖了大量底层细节背后的实际业务逻辑。Cadence是一个完全开源的编排框架,可以帮助开发人员编写高容错且能够长时间运行的应用程序,这通常也被称为工作流。

从本质上讲,它提供了一个与特定进程无关联的虚拟内存,并保留了完整的应用程序状态,包括函数堆栈以及兼容各种主机和软件故障的局部变量。这使得开发人员在编写代码时能够充分利用编程语言的功能特性,Cadence则负责应用程序的持久性、可用性和可扩展性。由于繁忙等待通常会消耗大量非必要的CPU周期,因而应尽可能避免使用轮询,而是使用由事件触发的中断来进行实现,除非以下两者情况:

对于计算机而言,这相当于在长途旅行中每5分钟询问一次距离目的地还有多远。尽管如此,在很多情况下,这是唯一可用的选择。Cadence为持久计时器、长时间运行的活动和无限制重试提供强大的支持,使得其非常适合此类功能的实现。

实现轮询机制有很多种方法。本文主要讲实现对外部服务的轮询,并分析这样做会从Cadence中获得怎样的收益。首先,我们来简单的解释一下Cadence的概念。Cadence的核心理念是一个无故障状态的工作流。这意味着工作流代码的状态,包括局部变量和它创建的任何线程,不受进程和Cadence服务故障的影响。这是一个非常强大的理念,因为它封装了状态、线程处理、持久计时器和事件处理程序。为了满足确定性的执行要求,工作流不允许直接调用任何外部API。相反,它们负责对活动的执行进行调度。活动是用来实现业务级功能的应用程序逻辑,例如调用服务或对媒体文件进行转码。当活动出现故障时,Cadence并不会恢复其运行状态。因此,活动函数可以包含任何代码,且不会受到任何限制。

代码本身非常简单——我们将逐行解释代码的作用:

State polledState = externalServiceActivities.getState();while(!expectedState.equals(polledState)) {Workflow.sleep(Duration.ofSeconds(30));polledState = externalServiceActivities.getState();}

左右滑动查看完整代码

我们首先调用一个活动,在这种情况下,外部服务可能是REST API。然后我们就需要进行条件判断。如果未达到所需的状态,会有10秒的等待。

这不是通常意义上的等待,而是一个持久的计时器。在这种情况下,轮询会执行周期性的等待,但时间可能会更长;而且,如果执行失败,我们一定不会希望浪费整个时间周期。Cadence通过将计时器以事件的方式进行持久化,并在完成后通知相应的工作服务(即管理工作流和活动实施的服务)来解决此问题。

这些计时器可以对从几秒到几分钟、几小时、几天甚至几个月或几年的时间间隔进行管理。最后,通过再次调用外部服务来刷新状态。在继续进行操作之前,我们先快速了解一下Cadence究竟在后台做了哪些工作来避免潜在的问题。

Cadence历史记录和轮询注意事项

Cadence是如何实现无故障状态工作流的呢?关键在于Cadence是如何坚持用工作流程执行实现的。工作流状态恢复利用事件溯源,而事件溯源对代码的编写方式施加了一些限制。事件溯源将一系列不断变化的事件转为持久化的状态。

每当工作流状态发生变化时,都会有一个新的事件追加到该工作流的事件历史记录中。然后,Cadence通过历史记录来进行操作重放,以重新建立工作流的当前状态。这就是为什么与外部环境的所有通信都应该通过活动进行,并且必须使用Cadence API来获取当前时间、等待和创建新线程。

1、谨慎使用轮询​

轮询需要根据判断条件不断地循环。由于每个活动调用和计时器事件都是持久的,因此即使是短的轮询间隔也可能会演变成不可接受的时间消耗。现在我们来研究轮询片段的历史记录会以怎样的方式呈现。

Cadence中轮询代码片段的事件历史记录

如果工作流在中间某个地方失败,且必须重放其历史操作记录,这可能会导致大量的事件清单被执行。有一些方法可以避免这些操作脱离掌控:避免使用较短的轮询周期,在工作流中设置合理的超时时间,限制轮询的次数。

记住所有操作都是持久的,可能需要由人重放操作。

2、配置活动重试次数​

如果外部服务由于某些原因失败了怎么办?我们需要尝试,尝试,再尝试!Cadence存在一种机制,可以让Cadence记录活动结果并能够完美地恢复工作流状态,同时还提供了对类似重试逻辑等额外功能的支持。下面是启用重试选项的活动配置示例:

private final ExternalServiceActivities externalServiceActivities =Workflow.newActivityStub(ExternalServiceActivities.class,new ActivityOptions.Builder().setRetryOptions(new RetryOptions.Builder().setInitialInterval(Duration.ofSeconds(10)).setMaximumAttempts(3).build()).setScheduleToCloseTimeout(Duration.ofMinutes(5)).build());

​左右滑动查看完整代码

通过这样的操作,我们告诉Cadence,在ExternalServiceActivities中的操作最多可以重试3次,且每次重试的间隔为10秒。这样,每个对外部服务活动的调用都可以轻松的实现重试功能,且无需编写任何重试逻辑。

为了展示这种模式的实际效果,我们将在示例项目中集成一个虚构的轮询。

1、Instafood简介​

Instafood是一个基于在线应用的送餐服务。客户可以通过Instafood的移动应用从他们当地最喜欢的餐厅中订购食物。订单可以是自取或外卖。

如果选择外卖,Instafood将通知其外卖司机从餐厅取餐并将其送到客户手中。Instafood为每个餐厅提供一个展示屏或平板电脑,用于Instafood和餐厅之间的通信。客户下单后,Instafood就会通知餐厅,然后餐厅可以接受订单、提供预计完成时间或将其标记为已完成等。对于外送订单,Instafood将根据预计完成时间协调外卖司机取餐。

2、轮询"MegaBurgers"​

MegaBurgers是一家大型跨国快餐汉堡连锁店。他们有自己的移动应用程序和网站,并使用REST API作为后端为客户提供订单服务。Instafood和MegaBurgers已达成协议,Instafood客户可以通过Instafood的应用程序在MegaBurger下单,并可选择自取和外卖。与通用方案不同的是,MegaBurger并未选择在所有店面安装Instafood展示屏,而是同意将Instafood的订餐系统以集成的方式与自身基于REST的订餐系统进行对接,以完成下单和接收更新。

MegaBurger的REST API没有推送机制(WebSockets、WebHooks等),无法接收订单状态更新。

相反,其需要定期发送GET请求来确定订单状态,这些轮询可能会导致订单工作流在Instafood端反复执行(例如安排外卖司机取餐)。

你需要配置一个Cadence集群来运行示例项目。在此示例中,我们将使用Instaclustr平台来执行操作。

第1步:创建Instaclustr托管集群​

Cadence集群需要连接Apache Cassandra集群作为持久层。为了成功配置Cadence和Cassandra集群,我们将遵循“创建Cadence集群”文档的操作指导。

第2步:配置Cadence域​

Cadence的后端由多租户服务提供支持,其中隔离单元被称为域。为了让Instafood应用程序运行,我们首先需要为它注册一个域。

1、为了与Cadence集群进行交互,我们需要安装其命令行界面客户端。

如果使用macOS,可以通过Homebrew安装Cadence CLI,如下所示:

brew install cadence-workflow# run command line clientcadence <command> <arguments>

其他操作系统​

可以通过Docker Hub镜像仓库ubercadence/cli来运行和使用CLI:

# run command line clientdocker run --network=host --rm ubercadence/cli:master <command><arguments>

左右滑动查看完整代码

在的步骤中,我们将使用cadence来指代客户端。

2、为了连接的稳定性,建议通过负载均衡器地址来连接和访问集群。可以在“连接信息”选项卡的顶部找到负载均衡器地址,如下所示:

“ab-cd12ef23-45gh-4baf-ad99-df4xy-azba45bc0c8da111.elb.us-east 1.amazonaws.com”

左右滑动查看完整代码

3、现在可以通过列出当前域来测试连接:

cadence --ad <cadence_host>:7933 admin domain list
cadence --ad <cadence_host>:7933 --do instafood domain register --global_domain=false

左右滑动查看完整代码

cadence --ad <cadence_host>:7933 --do instafood domain describe

第3步:运行Instafood示例项目​

1、从Instafood项目的Git代码仓库中克隆Gradle项目。

2、打开位于instafood/src/main/resources/instafood.properties路径的配置文件,将cadenceHost的值替换为自己的负载均衡器地址:

cadenceHost=<cadence_host>

​3、通过以下方式运行该应用程序:

cadence-cookbooks-instafood/instafood$ ./gradlew run

​4、查看终端输出以确认其是否已经正常运行:

在了解Instafood如何与MegaBurger集成之前,让我们先快速了解一下他们的API。

1、运行MegaBurger服务​

让我们从运行服务开始。通过以下命令来启动服务:

cadence-cookbooks-instafood/megaburger$ ./gradlew run

或者在IDE中运行MegaburgerRestApplication。这是一个以内存作为持久层的Spring Boot Rest API演示示例。当应用程序关闭时,所有数据都会丢失。

2、MegaBurger的订单API​

MegaBurger发布其Orders API以便跟踪和更新每个食品订单的状态。

POST /orders

创建一个订单并返回其ID。

curl -X POST localhost:8080/orders -H “Content-Type: application/json” --data ‘{“meal”: “Vegan Burger”, “quantity”: 1}’

左右滑动查看完整代码

{“id”: 1,“meal”: “Vegan Burger”,“quantity”: 1,“status”: “PENDING”,“eta_minutes”: null}

GET /orders​

返回一个包含所有订单信息的列表。

curl -X GET localhost:8080/orders
[{“id”: 0,“meal”: “Vegan Burger”,“quantity”: 1,“status”: “PENDING”,“eta_minutes”: null},{“id”: 1,“meal”: “Onion Rings”,“quantity”: 2,“status”: “PENDING”,“eta_minutes”: null}]

GET /orders / {orderId}

curl -X GET localhost:8080/orders/1
{“id”: 1,“meal”: “Onion Rings”,“quantity”: 2,“status”: “PENDING”,“eta_minutes”: null}

PATCH /orders/{orderId}

curl -X PATCH localhost:8080/orders/1 -H “Content-Type: application/ json” --data ‘{“status”:“ACCEPTED”}’

左右滑动查看完整代码

{“id”: 1,“meal”: “Onion Rings”,“quantity”: 2,“status”: “ACCEPTED”,“eta_minutes”: null}

现在已经完成了所有配置的初始化,让我们看看Instafood和MegaBurger之间集成的实际效果如何。

1、轮询工作流​

首先定义新的工作流MegaBurgerOrderWorkflow:

public interface MegaBurgerOrderWorkflow {@WorkflowMethodvoid orderFood(FoodOrder order);// ...}

此工作流有一个orderFood方法,该方法将通过与MegaBurger集成来发送和跟踪相应的FoodOrder。

现在来看看它的实现方式:

public class MegaBurgerOrderWorkflowImpl implements MegaBurgerOrderWork flow {// ...@Overridepublic void orderFood(FoodOrder order) {OrderWorkflow parentOrderWorkflow = getParentOrderWorkflow();Integer orderId = megaBurgerOrderActivities.createOrder(mapMegaBurgerFoodOrder(order)); updateOrderStatus(parentOrderWorkflow, OrderStatus.PENDING);// Poll until Order is accepted/rejectedupdateOrderStatus(parentOrderWorkflow,pollOrderStatusTransition(orderId, OrderStatus.PENDING));if (OrderStatus.REJECTED.equals(currentStatus)) { throw new RuntimeException(“Order with id “ + orderId + “was rejected”);}// Send ETA to parent workflowparentOrderWorkflow.updateEta(getOrderEta(orderId));// Poll until Order is cooking updateOrderStatus(parentOrderWorkflow,pollOrderStatusTransition(orderId, OrderStatus.ACCEPTED)); //Poll until Order is readyupdateOrderStatus(parentOrderWorkflow,pollOrderStatusTransition(orderId, OrderStatus.COOKING)); //Poll until Order is deliveredupdateOrderStatus(parentOrderWorkflow,pollOrderStatusTransition(orderId, OrderStatus.READY)); }// ...}

左右滑动查看完整代码

该工作流首先获取其父工作流。MegaBurgerOrderWorkflow只处理与MegaBurger的集成,将订单交付给由独立工作流管理的客户端处理;这意味着我们使用的是子工作流。然后,通过活动来创建订单,并获得订单ID。

活动只是API客户端的装饰器,该API客户端负责发送POST请求到/orders。创建订单后,父工作流会收到一个订单现在处于PENDING状态的信号(这是一个发送给工作流的,来自外部的异步请求)。

现在我们必须等待订单从PENDING转变为ACCEPTED或REJECTED。这就是轮询发挥作用的地方。现在看看我们的函数pollOrderStatusTransition做了什么:

private OrderStatus pollOrderStatusTransition(Integer orderId,OrderStatus orderStatus) { OrderStatus polledStatus =megaBurgerOrderActivities.getOrderById(orderId).getStatus();while (orderStatus.equals(polledStatus)) {Workflow.sleep(Duration.ofSeconds(30)); polledStatus = megaBurgerOrderActivities. getOrderById(orderId).getStatus();}return polledStatus;}

左右滑动查看完整代码

这与本文介绍的其他轮询循环非常相似。唯一的区别是它用一个轮询的特定状态代替等待,直到订单状态发生变化。同样的,用于通过ID获取订单的真实API调用隐藏在活动的后面,该活动启用了重试功能。如果订单被拒绝,则会引发运行状态异常,使工作流失败。如果订单被接受,则将MegaBurger的预计完成时间返回给父工作流(父工作流使用预计完成时间来完成交付调度)。最后,图3中所示的状态将会被转换,直到订单被标记为已交付。

2、运行正常的场景​

最后,让完成一个完整的订单场景。

这个场景是示例项目中测试套件的一部分。唯一的要求是同时运行Instafood和MegaBurger服务器,然后按照前文中的步骤操作。

测试用例描述了客户端通过Instafood下单MegaBurger的新素食汉堡,并且来店面取餐:

cadence-cookbooks-instafood/instafood$ ./gradlew test
class InstafoodApplicationTest {// ...@Testpublic voidgivenAnOrderItShouldBeSentToMegaBurgerAndBeDeliveredAccordingly() {FoodOrder order = new FoodOrder(Restaurant.MEGABURGER,“Vegan Burger”, 2, “+54 11 2343-2324”, “Díaz velez 433, Lalucila”, true);// Client orders foodWorkflowExecution workflowExecution= WorkflowClientstart(orderWorkflow::orderFood, order);// Wait until order is pending Megaburger’s acceptance await().until(() -> OrderStatus.PENDING.equals(orderWorkflow.getStatus()));// Megaburger accepts order and sends ETAmegaBurgerOrdersApiClient.updateStatusAndEta(getLastOrderId(),“ACCEPTED”, 15);await().until(() -> OrderStatus.ACCEPTED.equals(orderWorkflow.getStatus()));// Megaburger starts cooking ordermegaBurgerOrdersApiClient.updateStatus(getLastOrderId(),“COOKING”);await().until(() -> OrderStatus.COOKING.equals(orderWorkflow.getStatus()));// Megaburger signals order is readymegaBurgerOrdersApiClient.updateStatus(getLastOrderId(),“READY”);await().until(() -> OrderStatus.READY.equals(orderWorkflow.getStatus()));// Megaburger signals order has been picked-upmegaBurgerOrdersApiClient.updateStatus(getLastOrderId(),“RESTAURANT_DELIVERED”);await().until(() -> OrderStatus.RESTAURANT_DELIVERED.equals(orderWorkflow.getStatus()));await().until(() -> workflowHistoryHasEvent(workflowClient,workflowExecution, EventType.WorkflowExecutionCompleted)):} }

左右滑动查看完整代码

在这个场景中,有3个参与者:Instafood、MegaBurger和客户端。

2. 一旦订单到达MegaBurger(订单状态为PENDING),MegaBurgers将其标记为ACCEPTED并返回预计完成时间。

3. 然后我们看下整个状态更新序列:

4. 由于该订单是以取餐的形式交付,因此一旦客户端完成交付,整个工作流程就结束了。

在本文中,我们学习了如何使用Cadence实现轮询。我们展示了如何让Cadence集群在Instaclustr平台上运行,以及让应用程序连接到它是多么容易。

译者介绍

仇凯,社区编辑,目前就职于北京宅急送快运股份有限公司,职位为信息安全工程师。主要负责公司信息安全规划和建设(等保,ISO27001),日常主要工作内容为安全方案制定和落地、内部安全审计和风险评估以及管理。

© 版权声明
评论 抢沙发
加载中~
每日一言
不怕万人阻挡,只怕自己投降
Not afraid of people blocking, I'm afraid their surrender