Sagas模式

Sagas模式Sagas 属于一个错误管理模式 也同时用于控制复杂事务的执行和回滚等 同时 Compensating Transaction 模式的的实现也是也是类似于 Sagas 策略的 可以对比参考一下 Sagas 的最开始的出现是因为一些长时间的事务的实现 最开始的时候仅仅是因为数据内的事务

大家好,我是讯享网,很高兴认识大家。

Sagas属于一个错误管理模式,也同时用于控制复杂事务的执行和回滚等。同时,Compensating-Transaction模式的的实现也是也是类似于Sagas策略的,可以对比参考一下。

这里写图片描述
讯享网

上图展示了一个简单的Saga。如果有旅客预订了行程,需要预订汽车,航班以及旅店。如果无法获取全部的信息,可能最好就是不要出发。对开发者来说,肯定无法将所有的服务都定义为分布式的ACID事务。这时可以将租车行为定义为一个整体,其中包含如何去预订以及如何取消,当然,机票和酒店也提供同样的服务。

然后这些行为可以组合在一起构成了一个行为链。开发者还可以将整个行为链加密,这样只有该行为链的接收者才能够操控这个行为链。当一个行为完成后,会将完成的信息记录到一个集合(比如说,是一个队列)中,之后可以通过这个集合访问到对应的行为。当一个行为失败的实收,行为将本地清理完毕,然后将消息发送给该集合,从而路由到之前执行成功的行为,然后回滚所有的事务。

如果开发者对于旅行行程了解一些的话,就会知道上面的行为链其实是有风险的。一般来说,提前预订租车服务几乎都会成功,因为租车公司都会有足够的时间来帮助你安排车辆。但是预订宾馆就有一些风险了,一般无押金的情况下,只能提前24小时预订,而航班的退改一般情况下还要收费的,所以最后预订是对的。

使用举例

下面的程序作为样例可以帮助我们更好的了解Sagas策略

程序会生成一个典型的集合用来访问对应的行为链中的行为,会创建3个独立的进程,每一个进程都会负责一个指定的任务。分别是租车,预订酒店以及预订机票三个独立的任务。

static ActivityHost[] processes; static void Main(string[] args) { var routingSlip = new RoutingSlip(new WorkItem[] { new WorkItem<ReserveCarActivity>(new WorkItemArguments), new WorkItem<ReserveHotelActivity>(new WorkItemArguments), new WorkItem<ReserveFlightActivity>(new WorkItemArguments) }); // imagine these being completely separate processes with queues between them processes = new ActivityHost[] { new ActivityHost<ReserveCarActivity>(Send), new ActivityHost<ReserveHotelActivity>(Send), new ActivityHost<ReserveFlightActivity>(Send) }; // hand off to the first address Send(routingSlip.ProgressUri, routingSlip); } static void Send(Uri uri, RoutingSlip routingSlip) { // this is effectively the network dispatch foreach (var process in processes) { if (process.AcceptMessage(uri, routingSlip)) { break; } } }

讯享网

其中的AcitivityHost就是对外部服务的一个抽象,RoutingSlip是对前面说的集合的抽象。
下面是具体的一个服务的简化实现,ReserveHotelActivity以及ReserveFlightActivity的实现就不在此处列出了,下面是ReserveCarActivity的实现。其中主要包括的几个方法:

DoWork以及Compensate方法是Activity抽象出来的用来执行实际操作以及回滚的补偿方法。
WorkItemQueueAddress以及CompensationQueueAddress都是用来索引到对应服务的。参考如下代码:

讯享网class ReserveCarActivity : Activity { static Random rnd = new Random(2); public override WorkLog DoWork(WorkItem workItem) { Console.WriteLine("Reserving car"); var car = workItem.Arguments["vehicleType"]; var reservationId = rnd.Next(); Console.WriteLine("Reserved car {0}", reservationId); return new WorkLog(this, new WorkResult { { "reservationId", reservationId } }); } public override bool Compensate(WorkLog item, RoutingSlip routingSlip) { var reservationId = item.Result["reservationId"]; Console.WriteLine("Cancelled car {0}", reservationId); return true; } public override Uri WorkItemQueueAddress { get { return new Uri("sb://./carReservations"); } } public override Uri CompensationQueueAddress { get { return new Uri("sb://./carCancellactions"); } } }

RoutingSlip是对成功行为集合的抽象,用来索引到对应的服务,包含了两个队列,一个是完成的任务,一个是等待执行的任务。RoutingSlip主要用来控制连接多个行为。如果成功就会将任务向前执行,如果失败就会向后执行
RoutingSlip使用队列来向前执行,使用栈来向后执行。

class RoutingSlip { readonly Stack<WorkLog> completedWorkLogs = new Stack<WorkLog>(); readonly Queue<WorkItem> nextWorkItem = new Queue<WorkItem>(); public RoutingSlip() { } public RoutingSlip(IEnumerable<WorkItem> workItems) { foreach (var workItem in workItems) { this.nextWorkItem.Enqueue(workItem); } } public bool IsCompleted { get { return this.nextWorkItem.Count == 0; } } public bool IsInProgress { get { return this.completedWorkLogs.Count > 0; } } public bool ProcessNext() { if (this.IsCompleted) { throw new InvalidOperationException(); } var currentItem = this.nextWorkItem.Dequeue(); var activity = (Activity) Activator.CreateInstance( currentItem.ActivityType); try { var result = activity.DoWork(currentItem); if (result != null) { this.completedWorkLogs.Push(result); return true; } } catch (Exception e) { Console.WriteLine("Exception {0}", e.Message); } return false; } public Uri ProgressUri { get { if (IsCompleted) { return null; } else { return ((Activity)Activator.CreateInstance(this.nextWorkItem.Peek().ActivityType)). WorkItemQueueAddress; } } } public Uri CompensationUri { get { if (!IsInProgress) { return null; } else { return ((Activity) Activator.CreateInstance( this.completedWorkLogs.Peek().ActivityType)). CompensationQueueAddress; } } } public bool UndoLast() { if (!this.IsInProgress) { throw new InvalidOperationException(); } var currentItem = this.completedWorkLogs.Pop(); var activity = (Activity) Activator.CreateInstance( currentItem.ActivityType); try { return activity.Compensate(currentItem, this); } catch (Exception e) { Console.WriteLine("Exception {0}", e.Message); throw; } } }

ActivityHost会调用RoutingSlip上面的ProcessNext方法来解析下一个行为并正向调用DoWork()或者反向调用Compensate()方法.

讯享网abstract class ActivityHost { Action<Uri, RoutingSlip> send; public ActivityHost(Action<Uri, RoutingSlip> send) { this.send = send; } public void ProcessForwardMessage(RoutingSlip routingSlip) { if (!routingSlip.IsCompleted) { // if the current step is successful, proceed // otherwise go to the Unwind path if (routingSlip.ProcessNext()) { // recursion stands for passing context via message // the routing slip can be fully serialized and passed // between systems. this.send(routingSlip.ProgressUri, routingSlip); } else { // pass message to unwind message route this.send(routingSlip.CompensationUri, routingSlip); } } } public void ProcessBackwardMessage(RoutingSlip routingSlip) { if (routingSlip.IsInProgress) { // UndoLast can put new work on the routing slip // and return false to go back on the forward // path if (routingSlip.UndoLast()) { // recursion stands for passing context via message // the routing slip can be fully serialized and passed // between systems this.send(routingSlip.CompensationUri, routingSlip); } else { this.send(routingSlip.ProgressUri, routingSlip); } } } public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip); }

相关的其他模式

  • Compensating-Transaction模式。Compensating-Transaction通常被用来回滚需要实现最终一致性模型的功能,其中针对于数据一致性以及补偿失败的考虑更为详细。
  • Data Consistency PrimerData Consistency Primer中的内容提供了更多关于最终一致性的特性的说明。
小讯
上一篇 2025-03-16 13:14
下一篇 2025-02-06 20:42

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/41804.html