飞码网-免费源码博客分享网站

点击这里给我发消息

实践中的反应式,第8单元:CQRS,第3部分事务的ReadSideProcessor |-Java教程

飞码网-免费源码博客分享网站 爱上飞码网—https://www.codefrees.com— 飞码网-matlab-python-C++ 爱上飞码网—https://www.codefrees.com— 飞码网-免费源码博客分享网站

介绍

本单元探讨如何管理长期运行的事务流。为了证明这一点,我们将研究如何在Reactive Stock Trader中对“电汇”有界上下文进行建模和实现。这将使您了解如何在自己的系统中实现类似功能。

本单元还介绍了如何使用ReadSideProcessor来处理电汇事件。电汇是长时间运行的业务流程的一个很好的例子,因为它们不是单个原子操作,而是一个涉及多个步骤的长时间运行的业务流程。

注意:这是Lagom仍在变化中的区域。最佳解决方案是内置在Lagom中的流程管理器类型,但是Lagom团队仍在考虑最终方法。就目前而言,我们建议本单元中概述的方法是可行的,简单易行的,并且如果/当Lagom中提供了流程管理器类型时,可以重构。

读取侧处理器进行交易

假设有客户要求电汇,将10,000美元从外部储蓄帐户转入您的交易平台。现在,假设几天后,源银行向您的系统发送一条错误消息,提示资金不可​​用。最后,假设客户已经在您的系统中执行了价值$ 10,000的交易,因为您让他们立即获得了资金。

这会对您的业务产生什么影响?您是否需要撤消此错误,还是要通过延长客户临时信用额度来进行补偿?这些业务决策将确定解决系统一致性问题的最佳方法。

根据您的业务需求,有几种方法可以处理这种情况。

乐观的方法

乐观的方法是编码,就像所有传输最终都将成功完成一样。因此,一旦发出转帐请求,\ $ 10,000的资金将在客户的交易帐户中可用。这就是现实世界中有多少个支票帐户。假设我们将支票存入我们的支票帐户。如果我们是一个好的客户,我们的帐户可能会“被取消保留”,这意味着我们第二次存入了我们可以使用资金的支票,即使支票可能需要几天或几周才能清除。幸运的是,我们是一个良好的客户,并且只存有清晰的支票,因此“免除保留”是一件好事:作为客户,我们可以立即使用资金,而且我们与银行的关系仍然很牢固,因此我们可能会做得更多与他们开展业务。

悲观的方法

悲观的做法是假设由于资金不足或欺诈导致支票无法清除。这意味着我们将需要在业务流程中采取多个步骤来在存入支票后“清除支票”。第一步将涉及客户存入支票,这仅仅是从一家银行到另一家银行的花哨转账请求。下一步将涉及目标银行与源银行启动幕后支票清算流程,以验证资金是否存在并实际转移资金。为了完成存款,所涉及的现金实际上必须从源银行转手到目标银行的清算帐户。最后,支票不会被“清算”,并且在目标银行确认其拥有现金“托管”之前可用资金。现金保管一经核实,

“清算帐户”与您的个人支票帐户有很大不同。在银行系统中,您的银行不会使您的资金与其他客户的资金隔离。银行的钱更像是钱池。您的帐户无非是保证银行欠您一定数额的资金。银行的大部分资金被投资和借出,这就是银行如何获利的方式。事实上,如果所有客户都试图同时提款,那么大多数银行就会用光现金!这被称为“银行挤兑”,发生在经济危机期间。即使在现实世界中,也没有太多证据表明“严格的一致性”。请勿尝试避免系统中在实际业务流程中不存在的边缘情况!

让我们跳过乐观方法,对悲观方法进行建模,以演示一种转移方法,该方法包括使资金可用之前清除转移我们想演示这种方法,因为大多数事件源和微服务教程都采用了简便的方法,并实现了幸福路径最终的一致性,从而完全绕过了事务。这几乎完全避免了分布式系统体系结构中最棘手的问题,即在一致性,便利性,可靠性和性能之间找到了最佳结合点。

首先,我们将重新研究读取侧处理器以及它们如何帮助我们为电汇实现事务性“清算”功能。首先,让我们首先查看UI。

创建新的投资组合后,您需要采取的第一步是将资金从假设的储蓄帐户转入您的投资组合。转移完成后,现金将在您的投资组合中可用,您可以开始进行一些交易。

让我们来看一下如何建模和执行此传输流。

业务需求

我们此过程的主要目标是避免一些悲伤的状态。例如,我们不希望在电汇途中出现错误,从而从根本上复制金钱,甚至更糟的是抹去金钱。我们也不想为客户提供从未进入我们托管账户的资金。

考虑到这一点,我们将使用Lagom的ReadSideProcessor语义创建一个主管来处理传输流程。

转移过程将:

  • 从来源帐户中提取资金
  • 将资金汇到目标帐户
  • 用资金贷记客户的投资组合

让我们假设两个关键的业务需求:如果第一步失败,则应取消转移。如果第二步失败,则应取消转帐并将资金退还给发件人。

回顾读取侧处理器

如上一单元所述,ReadSideProcessor通过订阅日志中的事件(默认情况下为Cassandra)并使用特定标签处理每个事件来进行工作。

注意:还有其他方法可以执行类似的处理,例如使用消息代理API(我们将在单元9中讨论)。现在,我们将重点放在ReadSideProcessor上,因为它是实现此逻辑的最简单方法。我们应该已经对上一个单元中的ReadSideProcessor的工作方式有一个很好的了解。

首先,我们需要通过扩展ReadSideProcessor来定义一个新的读取侧处理器,它“使用PersistentEntity实例产生的事件,并更新一些针对查询进行了优化的读取侧数据存储。” 我们将其称为TransferProcess

TransferEvent

在无功交易者中,我们有六种不同类型的转移事件:

  • TransferInitiated
  • FundsReceived
  • CouldNotSecureFunds
  • DeliveryConfirmed
  • DeliveryFailed
  • RefundDelivered

您将能够在下面的代码中清楚地看到“访客模式”,该代码提供了编译时保证,我们将在处理过程中处理所有潜在的转移事件。

TransferEvent.java:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Void.class)
@JsonSubTypes({
    @JsonSubTypes.Type(TransferEvent.TransferInitiated.class),
    @JsonSubTypes.Type(TransferEvent.FundsRetrieved.class),
    @JsonSubTypes.Type(TransferEvent.CouldNotSecureFunds.class),
    @JsonSubTypes.Type(TransferEvent.DeliveryConfirmed.class),
    @JsonSubTypes.Type(TransferEvent.DeliveryFailed.class),
    @JsonSubTypes.Type(TransferEvent.RefundDelivered.class)
})
public abstract class TransferEvent implements AggregateEvent<TransferEvent>, Jsonable {

    public abstract <T> T visit(Visitor<T> visitor);

    public interface Visitor<T> {
        T visit(TransferInitiated transferInitiated);

        T visit(FundsRetrieved fundsRetrieved);

        T visit(CouldNotSecureFunds couldNotSecureFunds);

        T visit(DeliveryConfirmed deliveryConfirmed);

        T visit(DeliveryFailed deliveryFailed);

        T visit(RefundDelivered refundDelivered);
    }
}

汇总标签

我们在上一单元中介绍了标签,但是在这种情况下,我们也会对其进行重新介绍。我们需要考虑四个主要流程:

  • 成功
  • 不充足的资金
  • 退款失败
  • 无法提供退款
成功

transfer initiated → funds retrieved → delivery confirmed → transfer complete

这是我们将资金转移到投资组合中时所期望的主要流程。一切顺利,钱从用户的银行帐户中提取,并添加到他们的投资组合中。

不充足的资金

transfer initiated → could not secure funds → transfer complete

当用户没有足够的钱来完成转帐时,就会发生此流程。我们只需结束传输并向用户显示错误消息即可。

退款失败

transfer initiated → funds retrieved → delivery failed → refund delivered → transfer complete

在这种情况下,我们可以从用户的银行帐户中取款,但是由于某些原因,无法将其存入他们的投资组合。在实际系统中,我们很可能会先尝试进行手动修复,然后再退款。

无法提供退款

transfer initiated → funds retrieved → delivery failed → ???

这是炼狱。

与之前的流程类似,我们无法将资金转移到投资组合中,但是在这种情况下,我们也无法在合理的时间内退款,或者在退款交付过程中遇到失败。在合理的时间内没有密封的“转移完成”事件的“交付失败”事件将需要人工干预。

标记事件

要标记将由该处理器处理的事件,我们需要确保每种事件类型都可以扩展AggregateEvent和实现public AggregateEventTagger<TransferEvent> aggregateTag()正如我们在上一单元中介绍的那样,标签是写入Cassandra中每个事件的元数据,这使Lagom能够将具有相同标签的所有事件视为可以处理的事件流。

在上面的四个主要流程中,我们可以看到在“传输已启动”和“传输完成”之间的许多事件。我们只想处理适用于此业务流程的事件。好消息是,我们已经使用中的visitor模式实现了所有相关事件TransferEvent,这意味着in中的所有事件类型都TransferEvent将被标记为具有相同的值,并因此由我们的处理器处理。

请记住,标记事件是(某种程度上)永久的。每个事件都会有一个与之关联的标签,它是原始字符串类型。在应用程序中重命名类将不会更改日志中现有事件的标记。在重构过程中,无论代码库中的名称更改如何,您都需要保留标记名称,或者更改日志的内容。

buildHandler()

首先,我们将创建一个延伸的新的处理器ReadSideProcessor<T>TransferEvent,因为我们需要处理类型的所有事件TransferEvent

我们不会再讨论构建单元ReadSideProcessor<T>,因为我们已经在上一单元中对其进行了深入介绍。相反,我们将专注于buildHandler()处理电汇所需的特定功能。但是,仅为了适应自己的需要,以下是TransferProcess我们将要使用类定义:

TransferProcess.java:

public class TransferProcess extends ReadSideProcessor<TransferEvent> {
    // implementation goes here
}

接下来,我们将创建自己的ReadSideHandler<T>类型,当我们使用访问者模式实现事件时,该类型在处理需要访问的转移事件时将为我们提供灵活性。buildHandler()然后,在该方法中,我们将传递自定义HandleEvent类,而不是使用开箱即用的功能。

@Override
public ReadSideHandler<TransferEvent> buildHandler() {
    return new HandleEvent();
}

class HandleEvent extends ReadSideHandler<TransferEvent> {
    @Override
    public Flow<Pair<TransferEvent, Offset>, Done, ?> handle() { // 1
        return Flow.<Pair<TransferEvent, Offset>>create()
            .log("transferEvent")
            .withAttributes(
                Attributes.createLogLevels(
                        Attributes.logLevelInfo(),
                        Attributes.logLevelInfo(),
                        Attributes.logLevelInfo()
                )
            )
            .mapAsyncUnordered(concurrentSteps,
                e -> e.first().visit(transferEventVisitor)); // 2
    }
}

ReadSideHandler<TransferEvent>将流TransferEvent与沿着类型Offset (1) 我们使用mapAsyncUnordered (2) “映射”传输事件的实时流我们将“访问”每个事件,这将利用访问者模式并确保我们无法处理转移事件类型。

对于那些尚未使用流,特别是未使用Akka进行流处理的用户,某些语言可能有些新。mapAsyncUnordered意味着我们可以完全异步地处理每个事件。这使我们有机会并行运行多个“访客”。唯一的缺点是事件可能会乱序处理。请记住,顺序只能按顺序维护,这很有帮助,因此,每次引入异步处理时,顺序都会丢失。这与其他技术(例如Kafka)没有什么不同。concurrentSteps(上方)是我们希望同时运行多少个异步处理程序的可配置值。

接下来,我们将实现访问者本身,称为transferEventVisitor (2)这是一个回调,将在处理每个事件时执行。

TransferEventVisitor

每个传输事件都有一个关联的“访问”行为,该行为将在由读取侧处理器处理时执行。

每种visit(...)方法中都有很多代码,因此我们将逐步介绍以下代码。

我们将从“转移启动”反应开始,这将启动电汇转移处理流程。首先,在开始处理之前,我们需要确保转移已由投资组合发起(1)接下来,我们将构建FundsTransfer.Withdrawal命令(2),获得投资组合ID (3),并在投资组合服务(4)上调用取款命令在悲伤的情况(5)中,我们默认接受转移,因为这是一个演示系统。但是,在现实世界的生产系统中,您显然不希望获得免费资金!

TransferProcess.java的内部类:

class TransferEventVisitor implements TransferEvent.Visitor<CompletionStage<Done>> {
    // ...
    @Override
    public CompletionStage<Done> visit(TransferEvent.TransferInitiated transferInitiated) {
        val transferEntity = transferRepository.get(transferInitiated.getTransferId());
        if (transferInitiated
                .getTransferDetails()
                .getSource() instanceof Account.Portfolio) { // 1
            val transfer = FundsTransfer.Withdrawl.builder()
                .transferId(transferInitiated.getTransferId())
                .funds(transferInitiated.getTransferDetails().getAmount())
                .build(); // 2
            val portfolioId =
                ((Account.Portfolio) transferInitiated.getTransferDetails()
                    .getSource())
                    .getPortfolioId(); // 3
            return portfolioService
                .processTransfer(portfolioId)
                .invoke(transfer)
                .thenApply(done -> transferEntity.ask(
                    TransferCommand.RequestFundsSuccessful.INSTANCE))
                .exceptionally(ex -> transferEntity.ask(
                    TransferCommand.RequestFundsFailed.INSTANCE))
                .thenCompose(Function.identity()); // 4
        } else {
            // Any other accounts are out of scope. This means they will
            // freely accept and transfer money.
            // You don't actually want sources of free money in a production system!
            return transferEntity
                .ask(TransferCommand.RequestFundsSuccessful.INSTANCE); // 5
        }
    }
    // ...
}

注意在我们上面的描述中术语“反应”的使用。从本质上讲,以上代码是事件猛冲时反应实现如果您在本系列开始时的事件讨论会中回顾过,我们有两种发出命令的机制:

  • 用户界面输入(例如按钮单击),以及
  • 系统命令作为对事件的反应。

我们在Lagom中使用ReadSideProcessor作为我们的反应实现。这些可以帮助我们建模在没有直接用户输入的情况下如何在系统内部生成系统命令

接下来,让我们看一下“已取回资金”的事件。在这种情况下,我们经历了成功的资金交付,因此我们可以表示交易流程已成功完成,并通过完成事件对其进行“密封”。

@Override
public CompletionStage<Done> visit(TransferEvent.FundsRetrieved evt) {
    val transferEntity = transferRepository.get(evt.getTransferId());
    if (evt.getTransferDetails().getDestination() instanceof Account.Portfolio) {
        val transfer = FundsTransfer.Deposit.builder()
                .transferId(evt.getTransferId())
                .funds(evt.getTransferDetails().getAmount())
                .build();
        val portfolioId =
            ((Account.Portfolio) evt.getTransferDetails().getDestination())
            .getPortfolioId();
        return portfolioService
                .processTransfer(portfolioId)
                .invoke(transfer)
                .thenApply(done ->
                    transferEntity.ask(
                        TransferCommand.DeliverySuccessful.INSTANCE))
                .exceptionally(ex ->
                        transferEntity.ask(TransferCommand.DeliveryFailed.INSTANCE))
                .thenCompose(Function.identity());
    } else {
        // As above, any unimplemented account type just freely accepts transfers
        return transferEntity
                .ask(TransferCommand.DeliverySuccessful.INSTANCE);
    }
}

最后,我们将处理“交付失败”的情况,这是我们的不幸案例。在这种情况下,我们需要退款。仅当资金已经从来源帐户转移到我们的系统(“托管人”)时,我们才进入这种状态,但是由于某种原因,它不能记入它所指向的投资组合中。

@Override
public CompletionStage<Done> visit(TransferEvent.DeliveryFailed deliveryFailed) {
    val transferEntity = transferRepository.get(deliveryFailed.getTransferId());

    if (deliveryFailed.getTransferDetails().getSource() instanceof Account.Portfolio) {

        val portfolioId = ((Account.Portfolio) deliveryFailed.getTransferDetails()
            .getSource())
            .getPortfolioId();

        val refund = FundsTransfer.Refund.builder()
            .transferId(deliveryFailed.getTransferId())
            .funds(deliveryFailed.getTransferDetails().getAmount())
            .build();

        return portfolioService
            .processTransfer(portfolioId)
            .invoke(refund)
            .thenCompose(done ->
                transferEntity
                        .ask(TransferCommand.RefundSuccessful.INSTANCE));
    } else {
        return transferEntity
                .ask(TransferCommand.RefundSuccessful.INSTANCE);
    }
}

在现实世界的银行系统中,这并不少见。例如,如果您的银行收到了一笔电汇到您的帐户,但是该转帐没有说明资金用途,那么您的银行可能会决定拒绝该电汇并退还资金。这样的实现可以使该工作流程自动化。毕竟,即使是人工审核和拒绝电汇,也可能会成为另一个包含在访问者逻辑中的事件,因此,此过程可以扩展为处理现实世界业务过程中的各种复杂性。

结论

读取侧处理器可用于多种类型的操作。在本单元中,我们使用它们来启动从读取侧处理器到服务的调用,以帮助我们通过多个步骤处理长时间运行的事务。另外,在本单元中,我们已经完成了本系列的事件源和CQRS部分。让我们花时间来讨论一般的反应式编程。

我们认为CQRS,事件源和响应系统将变得更加流行。例如,Axon框架在2018年末下载了100万次,而Lagom继续受到关注。Vlingo是其开发周期早期的另一个有前途的框架,它围绕事件源,CQRS和DDD原则构建。

我们强烈建议高级开发人员,技术负责人和架构师对本系列中的模式有广泛的了解,以便他们准备解决明天的业务问题。例如,机器学习和AI只会增加反应式系统的吸引力。开发人员将要处理的数据量呈指数级增长,而不是呈下降趋势,因此提高读取,写入和处理数据效率的技术是解决高质量企业系统复杂性的明确方法。

我们必须指出,数据库供应商正在出现事件外包和CQRS的替代方案。例如,FaunaDB和Google Cloud Spanner是崭新的数据库,它们具有强大的一致性和水平扩展性。FaunaDB基于Calvin论文,该产品的遗产来自Twitter。Google Cloud Spanner基于Google的Spanner论文,仅在GCP中可用。两者看起来都非常有前途,因为它们声称可以提供。但是,在数据库之间存在一些折衷,这些承诺不能在规模,弹性和一致性方面做出任何妥协。例如,FaunaDB和Google Cloud Spanner是专有的,严格的商业活动。Fauna建议在自己的“无服务器云”中托管FaunaDB,而Google Cloud Spanner仅在GCP中工作。在大多数组织中,这不会满足多云要求。他们也不做任何事情来提供分析和重播事件的能力。但是,如果您不希望构建基于事件的服务,而是希望保留CRUD和OLTP语义,那么这些选项值得探讨。

最后,请记住,您可以在微服务架构中同时使用事件源/ CQRS基于OLTP的服务。可以针对每个服务优化和设计微服务体系结构,而不是对整个系统应用“千篇一律”的方法。为每种服务选择最佳方法和工具是创建高效,可扩展系统的合理途径。

 

 

飞码网-免费源码博客分享网站 爱上飞码网—https://www.codefrees.com— 飞码网-matlab-python-C++ 爱上飞码网—https://www.codefrees.com— 飞码网-免费源码博客分享网站
赞 ()
内容页底部广告位3
留言与评论(共有 0 条评论)
   
验证码: