Saturday, January 21, 2012

Events and Event-Driven Architecture: Part 3

In part 1 and part 2 of this series, I introduced Event Driven Architecture (EDA), compared it to Event Driven Programming, and then spent time distinguishing between intra-system and inter-system EDA. In this installment, I enhance the discussion around inter-system Domain Event Driven Architecture with a hodgepodge of follow-on topics:

  1. Domain Event Formats
  2. Performance Characteristics of EDA
  3. Complex Event Processing
  4. Guaranteed Delivery


/* ---[ Domain Event Formats ]--- */

Events in inter-system EDA, remember, are basically messages from one system to another that fully describe an occurrence in the system that issued them. Each event cascades to some number of subscribers, unknown to the publisher (aka the event generator).

What formats could be used for a Domain Event? Basically any data format with a header and body. Let's consider a few options here and criteria for selecting one appropriate to your domain requirements.

First, viable options include:

  1. XML
  2. JSON
  3. ASN.1
  4. Google Protocol Buffers
  5. CSV
  6. Serialized Objects
  7. Hessian
  8. Industry-specific options

Criteria by which these can be evaluated include:

  1. Is there a standard in my particular industry I should follow?
  2. Does it have support for a schema?
  3. Is it binary or text-based?
  4. Does it have cross-platform (cross-language) support?

If the answer to #1 is yes, then that is probably your default choice. Beyond that, though, some argue that there are definite advantages to choosing a format that has a schema.

First off, a schema obviously allows you to validate the structure as well as the content of an event. But probably its most important feature is enabling versioning of formats. In part 2 I described that a key advantage of inter-system EDA is that systems do not have to upgrade in lockstep to new functionality and versions.

If a consumer can only process version 1 and would choke on version 2, one way to handle this is to use the Message Translator Pattern to convert version 2 events back to version 1.

Another option is for the event generator to publish both versions either to different event channels (such as different JMS Topics) or to the same channel with different metadata, such as the version itself. This allows the subscribers to selectively choose which version to subscribe to and process.
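To make the metadata approach concrete, here is a minimal JMS sketch; the topic, property name and version values are my own illustrative choices, not anything from a standard:

    import javax.jms.*;

    // Publisher tags each event with a version property.
    public class VersionedEventPublisher {
        public static void publish(Session session, Topic topic, String payload)
                throws JMSException {
            MessageProducer producer = session.createProducer(topic);
            TextMessage msg = session.createTextMessage(payload);
            msg.setStringProperty("eventVersion", "2"); // version metadata
            producer.send(msg);
        }

        // A subscriber that can only handle version 1 uses a JMS message
        // selector, so the broker never even delivers version 2 to it.
        public static MessageConsumer version1Subscriber(Session session, Topic topic)
                throws JMSException {
            return session.createConsumer(topic, "eventVersion = '1'");
        }
    }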

Binary formats will be smaller and thus faster to send, and probably faster to parse. The documentation on Google Protocol Buffers, for example, claims that processing them is 20 to 100 times faster than processing a comparable XML document.

Lastly, if you work in an environment that needs to interoperate with multiple languages, you will need to use a format that is amenable to all those platforms.

Using the above criteria, good old XML is probably the best text-based format, as it supports schemas and is available on virtually every platform. For a binary protocol, Google Protocol Buffers is quite attractive, as it is schema-based, binary and efficient. It is not inherently universal, however, since you must have language bindings to serialize and deserialize objects to and from that format. Google itself supports Java, C++ and Python. There are community-supported open source bindings for other languages here, but you'll have to research how mature and production-ready any given one is.

The documentation on Protocol Buffers is very good and provides helpful advice on planning for changes to domain event content and format. For example, it recommends making most fields optional rather than required, especially new fields added in later versions, and adopting a convention for deprecating older non-required fields, such as an OBSOLETE_ prefix.
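As an illustration of why this matters, here is a hedged sketch of consumer-side code, assuming a hypothetical proto2-style OrderEvent message; the schema and all field names are inventions of mine, not from the Protocol Buffers docs:

    /*
     * Hypothetical proto2 schema behind the generated OrderEvent class:
     *
     *   message OrderEvent {
     *     required string order_id          = 1;
     *     optional string OBSOLETE_sku_code = 2;  // deprecated, kept for wire compatibility
     *     optional string discount_code     = 3;  // new in version 2 of the event
     *   }
     */
    import com.google.protobuf.InvalidProtocolBufferException;

    public class OrderEventConsumer {
        public void consume(byte[] wirePayload) throws InvalidProtocolBufferException {
            OrderEvent event = OrderEvent.parseFrom(wirePayload);
            // Because discount_code is optional, a version 1 producer that
            // never sets it does not break this version 2 consumer.
            if (event.hasDiscountCode()) {
                applyDiscount(event.getDiscountCode());
            }
        }
        private void applyDiscount(String code) { /* ... */ }
    }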

Stefan Norberg's presentation on Domain EDA, which I referenced heavily in part 2, has a more detailed breakdown of Domain Event formats using these criteria and a few more (see slides 47-49).

One option not considered in the Norberg presentation is Thrift, originally built by Facebook and now an Apache project. It has support for more than a dozen programming languages, is binary, and has support for versioning (which I assume means it has a schema, though I haven't been able to tell from the reading I've done on it). Unfortunately, I found complaints as far back as 2008 about its lack of documentation, and its Apache web site is still sadly lacking in that department. I wonder if it has lost out to Google Protocol Buffers? If any readers have experience using it, I'd be interested in hearing from you.


/* ---[ Performance Characteristics ]--- */

Event Driven Architecture gains loose coupling by basically passing data structures (event payloads) into generic, standardized queueing or messaging systems and using asynchrony. Since EDA is asynchronous by nature, it is amenable to high performance, high throughput and high scalability, with the tradeoff being that you have to live with eventual consistency between systems.

In a synchronous model, the highest throughput possible (or the lowest latency, if that is the measure you care most about) is set by the slowest system in the chain, whereas in an environment where systems are connected asynchronously, a system's throughput is not constrained by the slowest system it talks to.

To illustrate, suppose we have two systems A and B. System A by itself can process 500 transactions per second (tps), while system B can process 200 tps by itself. Next assume system A has to send some sort of message or RPC call to system B. If we make the interaction synchronous, system A can now process at most 200 tps (ignoring communication overhead between A and B). But if we can connect them with asynchronous messaging, system A can continue to process at 500 tps.
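In code, the difference between the two couplings looks something like the following sketch (all names are illustrative):

    import javax.jms.*;
    import java.io.Serializable;

    public class SystemA {
        public interface SystemBClient { void process(Serializable order); }

        // Synchronous: A's thread blocks on B, so A is capped at B's 200 tps.
        public void processSync(Serializable order, SystemBClient systemB) {
            systemB.process(order); // blocks until B finishes
        }

        // Asynchronous: A hands the event to the broker and moves on at its
        // own 500 tps; B drains the queue at its own 200 tps pace.
        public void processAsync(Serializable order, Session session, Queue ordersQueue)
                throws JMSException {
            session.createProducer(ordersQueue)
                   .send(session.createObjectMessage(order)); // fire-and-forget
        }
    }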

In isolation this is excellent, and often it works out very well. Of course, an architect looking at both systems will need to consider that system B may fall farther and farther behind and never catch up to system A, which could be a problem in some scenarios. Nevertheless, asynchronous connectivity allows an EDA to maximize throughput (and ideally minimize the latency of individual requests), reserving synchronous request-reply mode for only the parts that absolutely require it.

Related Idea: Overcoming SLA Inversion

A side note I want to throw in here that isn't related to performance, but follows the same line of reasoning as the above example, is that EDA can help overcome SLA Inversion, a term coined (as far as I know) by Michael Nygard in Release It! (which, if you haven't read it, should move to the top of your reading list).

SLA (Service Level Agreement) inversion is the problem of trying to meet an SLA for uptime (say 99.5%) when your system depends on other systems. If those dependencies are coupled to yours with synchronous call behavior and their SLAs are lower than yours, then you cannot meet your higher SLA. In fact, in synchronously coupled systems, the best-case SLA of your app is the product of the SLAs of all the systems involved.

For example, if system A's SLA is 99.5%, system B's is 99.5% and system C's is 98.0%, and system A is synchronously dependent on B and C, then at best its SLA is 0.995 * 0.995 * 0.98, which works out to an overall 97.0% SLA.
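Written generally, for a chain of n synchronously coupled systems (counting your own), the best-case SLA is the product of the individual SLAs:

    SLA_effective = SLA_1 * SLA_2 * ... * SLA_n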

One of Nygard's proposed solutions to SLA inversion is decoupling middleware, which is exactly what is provided by an Event Driven Architecture with an asynchronous messaging backbone between systems.


/* ---[ Complex Event Processing ]--- */

To define Complex Event Processing (CEP), let's first give an informal definition of "simple" or regular event processing. Simple processing is typically a situation where a single event from an event source causes a predictable, expected response that requires no fancy algorithms. Examples include a data warehouse that subscribes to events and simply consumes them into its own data store, or a "low threshold" event from, say, an inventory system that triggers a "restock" action in a downstream consumer.

When patterns over time in the event stream must be monitored, compared and evaluated, you have complex event processing. Let's return to the fraud detection system we outlined in part 2. In order to detect fraud, two types of analyses might be set up:

  1. Watch for abnormal purchase behavior against some relative scale, such as previous purchase amounts or purchase frequency.
  2. Watch for abnormal purchase behavior against some absolute scale, such as many repeated purchases in a short period of time.

The event streams can be monitored in real time and constantly compared to fraud rules and heuristics based on past behavior. Averages and other aggregations over a time series, either overall or on a per-user basis, can also be kept in the consumer's own data store. If a significant pattern is detected, the CEP system can take action or itself fire off a domain event to a system that handles it.

For the right need, CEP can be a very powerful use of event driven architecture. In addition, you don't have to build it from scratch. Tools like Esper, available for both Java and .NET, are designed to do Event Stream Processing and to match patterns in high-volume event streams using algorithms of your choice or design.
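To give a flavor of the programming model, here is a minimal sketch against the Esper Java API of roughly the second rule above (many repeated purchases in a short time); the event class, window size and threshold are all illustrative assumptions on my part:

    import com.espertech.esper.client.*;

    public class FraudMonitor {
        // A simple JavaBean-style event; Esper reads properties via getters.
        public static class Purchase {
            private final String cardNumber;
            private final double amount;
            public Purchase(String cardNumber, double amount) {
                this.cardNumber = cardNumber;
                this.amount = amount;
            }
            public String getCardNumber() { return cardNumber; }
            public double getAmount() { return amount; }
        }

        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.addEventType("Purchase", Purchase.class);
            EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(config);

            // Flag any card with more than five purchases in a ten-minute window.
            EPStatement stmt = engine.getEPAdministrator().createEPL(
                "select cardNumber, count(*) as purchases " +
                "from Purchase.win:time(10 min) " +
                "group by cardNumber having count(*) > 5");

            stmt.addListener(new UpdateListener() {
                public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                    // In a real system this would fire a 'possible fraud'
                    // domain event rather than print.
                    System.out.println("Possible fraud on card "
                            + newEvents[0].get("cardNumber"));
                }
            });

            engine.getEPRuntime().sendEvent(new Purchase("1234-5678", 25.00));
        }
    }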


/* ---[ Guaranteed Delivery ]--- */

In part 2, I discussed using asynchronous events from a source system, such as an inventory or point-of-sale system, to populate a reporting database. In many cases, this requires that the reporting database have complete data: no gaps due to lost messages. To do this via events placed onto a message bus, you need some form of guaranteed delivery, where each and every message sent is guaranteed to reach its consumers.

Guaranteed Delivery (my definition)

Once a message is placed into a message channel, it is guaranteed to ultimately reach all its targeted destinations, even if various parts of the system fail at any point between initial publication and final delivery. Or, if delivery does fail, it fails in a way that lets the broker/sender know the message needs to be resent.

Here I outline three potential ways of achieving guaranteed delivery from a messaging system. There are probably other options and I'd be eager to hear of them from readers.

  1. Synchronous transactional mode
  2. Asynchronous send + reply with acknowledgment
  3. Persistent messages with durable subscriptions (also asynchronous)

Note: I don't have much knowledge of the Microsoft/.NET side of the world, so in this discussion I will focus on JMS (Java Message Service) and some parts of AMQP (Advanced Message Queueing Protocol) that I have read about.

Option 1: Synchronous transactional mode

JMS has a synchronous mode which in theory could be used for guaranteed delivery, but obviously in a very limiting way that defeats the purpose of asynchronous messaging systems.

In AMQP (from my limited study so far), it appears that the typical way to get guaranteed delivery is via synchronous transactions, where the publisher must wait until the broker has processed the message before continuing. This is likely to be the option with the lowest throughput (see the note in the RabbitMQ section below).
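For the JMS side, a transacted send looks roughly like this sketch: commit() does not return until the broker has taken responsibility for the message, and a failed commit tells the sender to retry.

    import javax.jms.*;

    public class TransactedSender {
        public static void sendGuaranteed(Connection connection, Queue queue,
                                          String body) throws JMSException {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            try {
                session.createProducer(queue).send(session.createTextMessage(body));
                // Blocks until the broker has the message; if it throws,
                // the sender knows the message must be resent.
                session.commit();
            } finally {
                session.close();
            }
        }
    }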

Option 2: Asynchronous send + reply with acknowledgment

The next option is to set up an asynchronous request/reply-with-acknowledgment system to ensure that all messages delivered are received by all relevant subscribers. The ActiveMQ in Action book describes the model shown in Figure 1 below.

Figure 1: Request / reply model using asynchronous messaging

One option for doing this is to use header fields in the messages (JMSReplyTo and JMSCorrelationID) and create temporary destinations (queues) for the acknowledgment replies. Any message whose acknowledgment is not received within some time frame is considered undelivered and is resent. To handle the case where the consumer received the producer's message but the acknowledgment was lost, the consumer must be able to detect and ignore duplicate messages.
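A sketch of the sending side in JMS, with the timeout value as an illustrative assumption:

    import javax.jms.*;
    import java.util.UUID;

    public class AcknowledgedSender {
        private static final long ACK_TIMEOUT_MS = 5000; // illustrative

        // Returns true if the consumer acknowledged in time; on false the
        // caller resends, and consumers must tolerate the resulting duplicates.
        public static boolean sendWithAck(Session session, Destination dest,
                                          String body) throws JMSException {
            TemporaryQueue replyQueue = session.createTemporaryQueue();
            String correlationId = UUID.randomUUID().toString();

            TextMessage msg = session.createTextMessage(body);
            msg.setJMSReplyTo(replyQueue);
            msg.setJMSCorrelationID(correlationId);
            session.createProducer(dest).send(msg);

            // Wait for an acknowledgment carrying our correlation ID.
            MessageConsumer replies = session.createConsumer(
                    replyQueue, "JMSCorrelationID = '" + correlationId + "'");
            Message ack = replies.receive(ACK_TIMEOUT_MS);
            return ack != null;
        }
    }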

RabbitMQ, an AMQP-compliant broker (though not a JMS-compliant one), provides an extension to AMQP it calls Lightweight Publisher Confirms. It implements a form of asynchronous publishing followed by acknowledgment of receipt.

The RabbitMQ website claims this is 100 times faster than using the typical AMQP synchronous transaction model for guaranteed delivery.
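A sketch against the RabbitMQ Java client, with the exchange and routing key invented for illustration:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;

    public class ConfirmingPublisher {
        public static void publishBatch(Channel channel, byte[][] payloads)
                throws Exception {
            channel.confirmSelect(); // put the channel in confirm mode
            for (byte[] payload : payloads) {
                channel.basicPublish("events", "inventory.restock",
                        MessageProperties.PERSISTENT_BASIC, payload);
            }
            // Block until the broker confirms everything published so far;
            // false means at least one message was nacked and must be resent.
            if (!channel.waitForConfirms()) {
                throw new java.io.IOException("broker nacked one or more messages");
            }
        }
    }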

Option 3: Persistent messages with durable subscriptions (also asynchronous)

Finally, in JMS, the preferred option to get guaranteed delivery appears to be a model where:

  1. Messages are persisted to disk once they are delivered to a channel (broker)
  2. Subscribers have durable subscriptions

I describe this option in detail below.

-- Persisting Messages --

In JMS, message persistence is set by the "delivery mode" property on the message producer (rather than by the subscription type): DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT. If a message's delivery mode is persistent, the broker must save it to stable storage.
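In code, persistence is a one-line setting on the producer (a sketch):

    import javax.jms.*;

    public class PersistentProducer {
        public static void send(Session session, Topic topic, String body)
                throws JMSException {
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); // broker stores before the send completes
            producer.send(session.createTextMessage(body));
        }
    }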

Apache ActiveMQ, for example, offers two file-based persistence options (AMQ and KahaDB) as well as relational-database persistence via JDBC. The file-based options have higher throughput, and KahaDB is the recommended choice for recent releases.

As shown in Figure 2 below, you can choose to have a single central broker or you can set up a local (or embedded) broker on the sender computer (or VM) where the message is persisted before being sent to a remote broker.

Figure 2: Options for persistence of JMS messages


-- Message durability vs. message persistence --

Message durability refers to what JMS calls a durable subscription: the "durability" is an attribute of the messages with respect to the behavior of the subscriber. In the durable subscription model, if a subscriber disconnects, its messages are retained until it reconnects and fetches them. If the subscription is non-durable, the subscriber will miss any messages delivered while it is not connected.

Message persistence refers to the persistence of messages in the face of a stability problem in the broker or provider itself. If the broker goes down while there are undelivered messages, will the messages be there when it starts up again? If the messages are set to persistent mode and the persistence is to disk (rather than memory cache), then and only then is the answer "yes".
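The subscription side, sketched with an illustrative subscription name; note that JMS requires a client ID on the connection before a durable subscription can be created:

    import javax.jms.*;

    public class DurableReportingSubscriber {
        public static TopicSubscriber subscribe(Connection connection, Topic topic)
                throws JMSException {
            connection.setClientID("reporting-db"); // must be set before the connection is used
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Reconnecting with the same name resumes where this subscription
            // left off; the broker retains its messages in the meantime.
            return session.createDurableSubscriber(topic, "reporting-db-sub");
        }
    }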

With a durable subscription, the broker remembers what the last message delivered was on a per (durable) subscriber basis, as shown below:

Figure 3: Durable subscribers – the broker remembers which subscribers have received which messages. The arrows to the durable subscribers show the last message sent to that subscriber.

-- Performance Costs of Guaranteed Delivery with Message Persistence and Durable Subscriptions --

There is a performance hit to having message persistence and durable subscriptions. I was only able to locate a single source that had published any metrics – a 2008 report for Apache ActiveMQ. If anyone knows of others, please let me know.

I summarize the key findings of that paper with respect to the settings required for guaranteed delivery. The paper shows the following two graphs relevant to the performance hit of the "guaranteed delivery" options:

Figure 4: Performance (throughput) of ActiveMQ with non-persistent messages and non-durable subscriptions (Note: "1/1/1" refers to one producer / one consumer / one broker, i.e., the second, third or fourth options shown in Figure 2.)

Figure 5: Performance (throughput) of ActiveMQ with persistent messages and non-durable subscriptions (blue) and persistent messages with durable subscriptions (red). The tests here are otherwise the same as those shown in Figure 4.

The relative numbers work out as follows ("X > Y" means X is N times faster than Y):

  Comparison                                                                                        Times faster
  non-persistent msgs, non-durable subs > persistent msgs with durable producer only                       2.3
  non-persistent msgs, non-durable subs > persistent msgs with durable producer and consumer              15.6
  persistent msgs with durable producer only > persistent msgs with durable producer and consumer          6.7

It is not clearly laid out in this benchmarking document, but I believe “durable producer / non-durable consumer” means the second option shown in Figure 2, namely that messages are persisted in a single broker on the producer side. If that is correct, then “durable producer / durable consumer” means the first option in Figure 2 – that there are two brokers, one on the producer side and one on the consumer side as each has persistent messages and durable subscribers.


/* ---[ Other Uses of Events ]--- */

In this blog series we have covered events from the point of view of Event-Driven Programming (encompassing topics like "Evented I/O"), intra-system Staged Event Driven Architecture, and inter-system Event-Driven Architecture. There is at least one other major context that uses the concept of an event, and that is "Event Sourcing", which leads to topics like Retroactive Event Processing and CQRS. I encourage you to investigate those areas, as I believe they are becoming more popular. Greg Young has done a lot to popularize them and to convince people that they form a robust model for enterprise computing.

As a reminder, nothing in this short series was intended to be original work on my part. Rather, I have tried to survey the various meanings of "event" in programming and architecture. Someday I may do a piece on Event Sourcing and CQRS, but I have plans to blog on other topics in the near term. I can recommend this blog entry from Martin Krasser, which will hopefully be the first in a very interesting series giving a first-hand account of building an event-sourced web app in Scala.



Previous entries in this blog series:
Events and Event-Driven Architecture: Part 1
Events and Event-Driven Architecture: Part 2

