When dealing with processes that can span hours, days or even months, full ACID transactions are impractical because of the locks they would have to hold for the whole duration.
The saga pattern is used to model such interactions. A saga breaks the interaction with the system into a series of small transactions and maintains consistency through timeouts and compensating actions. It relaxes the Atomicity and Isolation requirements to achieve greater scalability.
Service bus frameworks (such as NServiceBus) support sagas out of the box. Most of them (NServiceBus, MassTransit and Rhino Service Bus) infer which messages a saga orchestrates from the static structure of the saga type. For example, here is a saga implementation in NServiceBus:
public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedByMessages<OrderMessage>,
    IHandleMessages<OrderAuthorizationResponseMessage>,
    IHandleMessages<CancelOrderMessage>
{
    public void Handle(OrderMessage message){...}
    public void Handle(OrderAuthorizationResponseMessage message){...}
    public void Handle(CancelOrderMessage message){...}
}
When the bus starts, the framework finds all saga classes and uses reflection to determine which messages they process.
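The reflection step can be sketched roughly as follows. This is only an illustration of the idea, not NServiceBus' actual code: IHandle<T> is a hypothetical stand-in for IHandleMessages<T>, and SagaScanner and ToyOrderSaga are made-up names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for NServiceBus' IHandleMessages<T>.
public interface IHandle<T> { void Handle(T message); }

public class OrderMessage { }
public class CancelOrderMessage { }

// A toy "saga" that declares its messages through the interfaces it implements.
public class ToyOrderSaga : IHandle<OrderMessage>, IHandle<CancelOrderMessage>
{
    public void Handle(OrderMessage message) { }
    public void Handle(CancelOrderMessage message) { }
}

public static class SagaScanner
{
    // Returns the message types a class handles, read from its closed IHandle<T> interfaces.
    public static List<Type> GetHandledMessageTypes(Type sagaType)
    {
        return sagaType.GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandle<>))
            .Select(i => i.GetGenericArguments()[0])
            .ToList();
    }

    // Builds a saga-type -> message-types map for every concrete class in an assembly.
    public static Dictionary<Type, List<Type>> Scan(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .Select(t => new { Type = t, Messages = GetHandledMessageTypes(t) })
            .Where(x => x.Messages.Count > 0)
            .ToDictionary(x => x.Type, x => x.Messages);
    }
}
```

The important point is that the mapping is fixed once the classes are compiled: the set of messages is part of the type's declaration and cannot be changed by configuration at runtime.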
Discovering handlers by reflection greatly simplifies things and generally works very well, but recently I needed to implement a DSL like the following:
Expect.MessageSequence("BatchProcessVerifier1")
    .StartsWith<BatchProcessStart>()
    .ForEach(x => x.Node)
    .Then<BatchProcessEnd>()
    .In(TimeSpan.FromSeconds(300));
This definitely requires a saga, but NServiceBus sagas aren’t designed for such cases. NServiceBus decides which messages a saga orchestrates through reflection (it infers them from the static structure of the classes), while the DSL requires the saga type and the set of messages it orchestrates to be registered dynamically at runtime.
It turned out, however, that implementing Dynamic Sagas with NServiceBus is not that hard.
Dynamic Saga
First of all, let’s define the IDynamicSaga interface:
public interface IDynamicSaga
{
    void Handle(IMessage message);
    bool Completed { get; }
    IDynamicSagaEntity Data { get; set; }
    IDynamicSagaDescription SagaDescription { get; set; }
}

public interface IDynamicSaga<TSagaEntity> : IDynamicSaga
    where TSagaEntity : IDynamicSagaEntity
{
    new TSagaEntity Data { get; set; }
}
The mandatory Handle(IMessage message) method processes all incoming messages regardless of their type.
IDynamicSagaEntity is the saga’s state bag, which is persisted while the saga waits for messages to arrive (this is similar to NServiceBus’ ISagaEntity).
Completed is a way to ask the saga whether it has finished.
SagaDescription contains the runtime information for the saga; essentially, all the data set up in the DSL above goes into the SagaDescription.
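To make the link between the DSL and the description more concrete, here is a minimal sketch of how a fluent builder could collect that data. The method names (MessageSequence, StartsWith, ForEach, Then, In) come from the DSL above; everything else (SequenceSpec, ExpectedStep, SequenceStart, SequenceBuilder and the shape of the message classes) is an assumption made for illustration and is not the actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

// Hypothetical message types matching the DSL example.
public class BatchProcessStart { public string Node { get; set; } }
public class BatchProcessEnd { public string Node { get; set; } }

// Hypothetical: one expected follow-up message and the time allowed for it.
public class ExpectedStep
{
    public Type MessageType;
    public TimeSpan Within;
}

// Hypothetical runtime description filled in by the builder.
public class SequenceSpec
{
    public string Name;
    public Type FirstMessageType;
    public List<LambdaExpression> GroupBy = new List<LambdaExpression>();
    public List<ExpectedStep> Expectations = new List<ExpectedStep>();
}

public static class Expect
{
    public static SequenceStart MessageSequence(string name)
    {
        return new SequenceStart(new SequenceSpec { Name = name });
    }
}

public class SequenceStart
{
    private readonly SequenceSpec _spec;
    public SequenceStart(SequenceSpec spec) { _spec = spec; }

    // Fixes the first message type so that ForEach lambdas can be typed against it.
    public SequenceBuilder<TFirst> StartsWith<TFirst>()
    {
        _spec.FirstMessageType = typeof(TFirst);
        return new SequenceBuilder<TFirst>(_spec);
    }
}

public class SequenceBuilder<TFirst>
{
    public SequenceSpec Spec { get; private set; }
    public SequenceBuilder(SequenceSpec spec) { Spec = spec; }

    // Records a grouping expression; one saga instance exists per distinct value.
    public SequenceBuilder<TFirst> ForEach(Expression<Func<TFirst, object>> selector)
    {
        Spec.GroupBy.Add(selector);
        return this;
    }

    // Appends the next expected message type.
    public SequenceBuilder<TFirst> Then<TNext>()
    {
        Spec.Expectations.Add(new ExpectedStep { MessageType = typeof(TNext) });
        return this;
    }

    // Sets the time allowed for the most recently added expectation.
    public SequenceBuilder<TFirst> In(TimeSpan within)
    {
        if (Spec.Expectations.Count == 0)
            throw new InvalidOperationException("In() must follow Then<T>().");
        Spec.Expectations[Spec.Expectations.Count - 1].Within = within;
        return this;
    }
}
```

With a builder like this, the DSL statement shown earlier produces a SequenceSpec holding the name, the first message type, one grouping expression and one expectation with a 300 second limit.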
Saga Finder
The purpose of the Saga Finder is to find saga entities using the message being processed.
This part becomes a bit tricky because of the third line of the DSL:
.ForEach(x => x.Node)
The line states that there should be one saga per Node, and the finder has to know about this as well. For this reason IDynamicSagaFinder is asked to find the saga not only by the message being processed, but also by the saga description:
public interface IDynamicSagaFinder
{
    IDynamicSagaEntity FindSagas(IMessage message, IDynamicSagaDescription sagaDescription);
}
The IDynamicSagaDescription interface looks like this:
public interface IDynamicSagaDescription
{
    string GetCorrelationKey(IMessage message);
}
For the saga backing the DSL above, GetCorrelationKey simply concatenates the name of the message sequence with the values of the properties defined in the ForEach expressions. As a result, the correlation key looks something like “BatchProcessVerifier1 : Node1”. The only thing left for the finder to do is to look up the entity by its CorrelationKey.
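A finder along these lines could be sketched as below. The interfaces are redeclared locally so the example compiles on its own (the real IMessage comes from NServiceBus). The CorrelationKey property on IDynamicSagaEntity is an assumption, since the entity interface is not shown here, and InMemorySagaFinder, NodeMessage, NodeEntity and NodeDescription are hypothetical names. A real finder would query the saga persistence store rather than a dictionary.

```csharp
using System;
using System.Collections.Generic;

// Local stand-ins so the sketch is self-contained.
public interface IMessage { }
public interface IDynamicSagaDescription { string GetCorrelationKey(IMessage message); }

// Assumed shape: the entity exposes the key it was stored under.
public interface IDynamicSagaEntity { string CorrelationKey { get; set; } }

public interface IDynamicSagaFinder
{
    IDynamicSagaEntity FindSagas(IMessage message, IDynamicSagaDescription sagaDescription);
}

// Hypothetical in-memory finder, standing in for a lookup against saga storage.
public class InMemorySagaFinder : IDynamicSagaFinder
{
    private readonly Dictionary<string, IDynamicSagaEntity> _entities =
        new Dictionary<string, IDynamicSagaEntity>();

    public void Save(IDynamicSagaEntity entity)
    {
        _entities[entity.CorrelationKey] = entity;
    }

    // Returns the entity stored under the message's correlation key, or null if no saga exists yet.
    public IDynamicSagaEntity FindSagas(IMessage message, IDynamicSagaDescription sagaDescription)
    {
        var key = sagaDescription.GetCorrelationKey(message);
        IDynamicSagaEntity entity;
        return _entities.TryGetValue(key, out entity) ? entity : null;
    }
}

// Toy types used only to exercise the finder.
public class NodeMessage : IMessage { public string Node { get; set; } }
public class NodeEntity : IDynamicSagaEntity { public string CorrelationKey { get; set; } }
public class NodeDescription : IDynamicSagaDescription
{
    public string GetCorrelationKey(IMessage message)
    {
        return "BatchProcessVerifier1 : " + ((NodeMessage)message).Node;
    }
}
```

Because the description computes the key, the finder itself stays generic: the same finder can serve any DSL statement, whatever properties it groups by.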
Saga Example
The following saga can be used to process the DSL statement above:
using System;
using System.Linq;
using NServiceBus;

public class MessageSequenceDynamicSaga : DynamicSaga<MessageSequenceDynamicSagaEntity, MessageSequenceDescription>
{
    #region Overrides of DynamicSaga<MessageSequenceDynamicSagaEntity>

    public override void Handle(IMessage message)
    {
        var messageType = message.GetType();
        var messageTypesSequence = SagaDescription.GetMessageTypesSequence().ToList();
        var messageTypeIndex = messageTypesSequence.FindIndex(x => x == messageType);
        var expectedIndex = Data.ExpectedMessageIndex;

        // Ignore messages that are not part of the sequence or that were already seen.
        if (messageTypeIndex == -1 || messageTypeIndex < expectedIndex)
            return;

        // The message arrived ahead of its turn: put it back and retry it later.
        if (messageTypeIndex > expectedIndex)
        {
            Bus.HandleCurrentMessageLater();
            return;
        }

        // The expected message arrived: advance to the next position in the sequence.
        Data.ExpectedMessageIndex++;

        if (Data.ExpectedMessageIndex == messageTypesSequence.Count)
            MarkAsCompleted();
        else
            // Pass the awaited index as timeout state so that stale timeouts can be recognized.
            RequestTimeout(SagaDescription.GetTimeoutForNextMessageType(messageType), Data.ExpectedMessageIndex);
    }

    #endregion

    public override void Timeout(NServiceBus.Saga.TimeoutMessage message)
    {
        // The timeout matters only if the saga is still waiting for the same message.
        var index = Convert.ToInt32(message.State);
        if (index == Data.ExpectedMessageIndex)
        {
            Bus.Publish(new ExpectationViolated
            {
                ExpectationName = SagaDescription.Name,
                ViolationRegistered = DateTime.UtcNow,
            });
            MarkAsCompleted();
        }
    }

}
MarkAsCompleted and RequestTimeout are helper methods defined in the abstract DynamicSaga class to mimic the original NServiceBus saga experience.
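The base class itself is not shown in this post, so the following is only a guess at its general shape, stripped of every NServiceBus dependency. DynamicSagaSketch, RequestedTimeouts, CounterData and TwoMessageSaga are hypothetical names, and RequestTimeout here merely records the request, where a real implementation would send a timeout message through the bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, bus-free approximation of an abstract dynamic saga base class.
public abstract class DynamicSagaSketch<TData> where TData : class, new()
{
    private readonly List<KeyValuePair<TimeSpan, object>> _requestedTimeouts =
        new List<KeyValuePair<TimeSpan, object>>();

    protected DynamicSagaSketch() { Data = new TData(); }

    public TData Data { get; set; }
    public bool Completed { get; private set; }

    // Timeouts requested so far, kept in memory purely for illustration.
    public IList<KeyValuePair<TimeSpan, object>> RequestedTimeouts
    {
        get { return _requestedTimeouts; }
    }

    public abstract void Handle(object message);

    // Flags the saga as finished so that the host can remove its persisted state.
    protected void MarkAsCompleted() { Completed = true; }

    // Records a timeout request together with the state to hand back when it fires.
    protected void RequestTimeout(TimeSpan within, object state)
    {
        _requestedTimeouts.Add(new KeyValuePair<TimeSpan, object>(within, state));
    }
}

public class CounterData { public int Seen { get; set; } }

// Toy saga: completes on the second message and requests a timeout while waiting for it.
public class TwoMessageSaga : DynamicSagaSketch<CounterData>
{
    public override void Handle(object message)
    {
        Data.Seen++;
        if (Data.Seen >= 2)
            MarkAsCompleted();
        else
            RequestTimeout(TimeSpan.FromSeconds(300), Data.Seen);
    }
}
```

Keeping these helpers in the base class lets a concrete saga read almost exactly like a regular NServiceBus saga, which is the point of mimicking the original experience.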
The description for this saga looks like this:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using NServiceBus;

public class MessageSequenceDescription : IDynamicSagaDescription
{
    private readonly string _name;
    private readonly Type _firstMessageType;
    private readonly List<Expression> _groupByExpressions;
    private readonly List<MessageExpectation> _expectations;

    public MessageSequenceDescription(string name, Type firstMessageType, IEnumerable<Expression> groupByExpressions, params MessageExpectation[] expectations)
    {
        _name = name;
        _firstMessageType = firstMessageType;
        _groupByExpressions = new List<Expression>(groupByExpressions);
        _expectations = new List<MessageExpectation>(expectations);
    }

    public List<Expression> GroupByExpressions
    {
        get { return _groupByExpressions; }
    }

    public Type FirstMessageType
    {
        get { return _firstMessageType; }
    }

    public List<MessageExpectation> Expectations
    {
        get { return _expectations; }
    }

    public string Name
    {
        get { return _name; }
    }

    // The full sequence: the starting message type followed by each expected type.
    public IEnumerable<Type> GetMessageTypesSequence()
    {
        yield return _firstMessageType;
        foreach (var expectation in Expectations)
            yield return expectation.MessageType;
    }

    // Time allowed for the message that follows messageType.
    // Must not be called for the last type in the sequence: there is no next expectation.
    public TimeSpan GetTimeoutForNextMessageType(Type messageType)
    {
        if (messageType == _firstMessageType)
            return _expectations[0].RelativeTime;
        var index = _expectations.FindIndex(x => x.MessageType == messageType);
        return _expectations[index + 1].RelativeTime;
    }

    #region Implementation of IDynamicSagaDescription

    public string GetCorrelationKey(IMessage message)
    {
        var groupMembers = new List<KeyValuePair<string, object>>();
        foreach (var groupByExpression in GroupByExpressions)
        {
            // Each expression is expected to be a simple member access such as x => x.Node.
            var lambdaExpression = (LambdaExpression)groupByExpression;

            var memberExpression = (MemberExpression)lambdaExpression.Body;
            var member = memberExpression.Member;
            var memberName = member.Name;

            // Look the property up by name, so any message type in the sequence
            // that exposes a property with the same name contributes to the key.
            var property = message.GetType().GetProperty(memberName);
            var memberValue = property.GetValue(message, null);

            if (memberValue != null)
            {
                var memberStringValue = memberValue.ToString();
                groupMembers.Add(new KeyValuePair<string, object>(memberName, memberStringValue));
            }
        }

        // Sort by member name so the key does not depend on the order of the ForEach calls.
        return groupMembers.OrderBy(x => x.Key).Aggregate(_name, (c, e) => c + " : " + e.Value);
    }

    #endregion
}
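To see the key construction in isolation, here is a small self-contained sketch of the same idea. CorrelationKeySketch, its method names and the Shard property are hypothetical and exist only for this example. Unlike the class above, the sketch also unwraps the Convert node that the C# compiler inserts when a value-type member is boxed to object, and it skips group members that the message does not expose; both choices are assumptions about desirable behavior rather than something the original code does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical message with one reference-type and one value-type property.
public class BatchProcessStart
{
    public string Node { get; set; }
    public int Shard { get; set; }
}

public static class CorrelationKeySketch
{
    // Extracts "Member" from x => x.Member, unwrapping a boxing Convert node if present.
    public static string GetMemberName(LambdaExpression selector)
    {
        var body = selector.Body;
        var unary = body as UnaryExpression;
        if (unary != null && unary.NodeType == ExpressionType.Convert)
            body = unary.Operand;

        var member = body as MemberExpression;
        if (member == null)
            throw new ArgumentException("Selector must be a simple member access.", "selector");
        return member.Member.Name;
    }

    // Builds "<name> : <value1> : <value2>" with the values ordered by member name.
    public static string Build(string name, object message, IEnumerable<LambdaExpression> selectors)
    {
        var values = new List<KeyValuePair<string, string>>();
        foreach (var selector in selectors)
        {
            var memberName = GetMemberName(selector);
            var property = message.GetType().GetProperty(memberName);
            if (property == null)
                continue; // assumption: ignore group members this message type lacks

            var value = property.GetValue(message, null);
            if (value != null)
                values.Add(new KeyValuePair<string, string>(memberName, value.ToString()));
        }

        return values.OrderBy(x => x.Key, StringComparer.Ordinal)
                     .Aggregate(name, (key, e) => key + " : " + e.Value);
    }
}
```

For a message with Node set to "Node1" and a single x => x.Node selector, Build("BatchProcessVerifier1", ...) yields "BatchProcessVerifier1 : Node1", which matches the key format described earlier.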
Sagas can be registered at runtime with the following static method of the DynamicSagas class:
void RegisterSaga(Type sagaType, IDescriptionProvider descriptionProvider, Type[] sagaMessageTypes, Type finder)
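What such a registration might do internally can be sketched as a simple lookup table from message type to saga registrations. Only the RegisterSaga signature is taken from the method above; the empty IDescriptionProvider stand-in, SagaRegistration, DynamicSagasSketch and GetRegistrations are assumptions for illustration, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in; the members of the real IDescriptionProvider are not shown here.
public interface IDescriptionProvider { }

// Hypothetical record of one runtime registration.
public class SagaRegistration
{
    public Type SagaType;
    public IDescriptionProvider DescriptionProvider;
    public Type FinderType;
}

// Hypothetical registry keyed by message type.
public static class DynamicSagasSketch
{
    private static readonly Dictionary<Type, List<SagaRegistration>> ByMessageType =
        new Dictionary<Type, List<SagaRegistration>>();

    public static void RegisterSaga(Type sagaType, IDescriptionProvider descriptionProvider,
                                    Type[] sagaMessageTypes, Type finder)
    {
        var registration = new SagaRegistration
        {
            SagaType = sagaType,
            DescriptionProvider = descriptionProvider,
            FinderType = finder
        };

        // Index the registration under every message type the saga orchestrates.
        foreach (var messageType in sagaMessageTypes)
        {
            List<SagaRegistration> registrations;
            if (!ByMessageType.TryGetValue(messageType, out registrations))
            {
                registrations = new List<SagaRegistration>();
                ByMessageType[messageType] = registrations;
            }
            registrations.Add(registration);
        }
    }

    // Returns every saga registered for the runtime type of an incoming message.
    public static IList<SagaRegistration> GetRegistrations(Type messageType)
    {
        List<SagaRegistration> registrations;
        return ByMessageType.TryGetValue(messageType, out registrations)
            ? registrations
            : new List<SagaRegistration>();
    }
}
```

A single generic message handler could then call GetRegistrations(message.GetType()) and, for each registration, use the finder and the description to load or create the saga instance before passing the message to its Handle method.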
Conclusion
It was quite a bit of code, which still needs some testing and polishing, and it heavily replicates NServiceBus’ saga facility, but it works.
It’s also worth mentioning that NServiceBus’ sagas could be built on top of the Dynamic Sagas engine, but not vice versa.