Saturday, January 7, 2012

Events and Event-Driven Architecture: Part 1

The terms "event" and "event-driven" in software design are somewhat overloaded, so I endeavored to do a little research on their various meanings. In this blog entry I start by defining Event-Driven Programming (EDP), contrasting it with Event-Driven Architecture (EDA), and then dive into some parts of the latter. Later blog entries will delve into additional areas of EDA.

There isn't any original research or design here - rather, this is a summary of key usages I have found.

First, let's make an initial distinction between event-driven programming and event-driven architecture.


/*---[ Event Driven: Programming vs. Architecture ]---*/

Event Driven Architecture in general describes an architecture whose layers are partitioned and decoupled from each other to allow better or easier composition, asynchronous performance characteristics, loose coupling of APIs and dependencies, and easier profiling and monitoring. Its goal is a highly reliable design that allows different parts of the system to fail independently while still achieving eventual consistency. Layers in an Event Driven Architecture typically communicate by means of message queues (or some other messaging backbone).

Event Driven Programming is a programming model in which the programmer processes incoming events one after the other in a single thread. Events originate outside the main thread - from a user action, or an asynchronous server or system reply - and are put onto an event queue for the main processing thread to handle. This is the model of many GUI programming environments, such as Java AWT/Swing. The most prominent example of EDP I'm aware of is the JavaScript model in the browser, and now on the server side with Node.js.
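For concreteness, here is a minimal sketch of that model in Python (the event names and `handlers` table are purely illustrative, not from any real framework): a single thread pulls events off a queue and dispatches them one at a time.

```python
import queue

# A minimal sketch of the event-driven programming model: one thread
# pulls events off a queue and dispatches them to registered handlers.
events = queue.Queue()
results = []

handlers = {
    "click":    lambda payload: results.append(f"clicked {payload}"),
    "keypress": lambda payload: results.append(f"typed {payload}"),
}

# In a real system, other threads (GUI toolkit, network callbacks)
# would enqueue events; here we enqueue them directly for illustration.
events.put(("click", "button1"))
events.put(("keypress", "a"))
events.put(("quit", None))

# The single-threaded event loop: process one event at a time.
while True:
    kind, payload = events.get()
    if kind == "quit":
        break
    handlers[kind](payload)
```

The key property is that handlers never run concurrently with each other, so they need no locking among themselves.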

EDP and EDA have no special co-dependent relationship to each other. One can do either, both or neither. However, they do share in common the central idea that events come in, typically from "outside" (the meaning of which is context dependent), are communicated in some known format, and are put onto queues to be processed one at a time.

I won't speak any more of Event Driven Programming here, but a nice introductory tutorial to Node.js is The Node Beginner Book by Manuel Kiessling.


/*---[ Staged Event Driven Architecture ]---*/

The next major divide I will consider is between inter-system EDA and intra-system EDA.

Staged Event-Driven Architecture, or SEDA, is a form of intra-system EDA. The term was first coined by Matt Welsh for his Ph.D. work. It models a system by breaking it down into serial processing stages connected by queues. Welsh calls this a "well-conditioned" system, meaning one that can handle web-scale loads without dropping requests on the floor: when incoming demand exceeds processing capacity, the system simply queues up requests in its internal buffer queues.

The processing stages of a SEDA system can either be low-level or high-level subsystems. The Sandstorm web server built by Welsh for his thesis used very low-level subsystems - things like HTTP parsers, socket readers and writers, and cache handlers.
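The stage-plus-queue structure can be sketched in a few lines. This is a toy illustration in Python, not Sandstorm's design: each stage is a worker thread with its own input queue, the output of one stage feeds the next, and bursts accumulate in the queues instead of being dropped.

```python
import queue
import threading

def stage(name, inbox, outbox, work):
    """Run one SEDA-style stage: a thread draining its own input queue."""
    def run():
        while True:
            item = inbox.get()
            if item is None:              # sentinel: shut the stage down
                if outbox is not None:
                    outbox.put(None)      # propagate shutdown downstream
                break
            out = work(item)
            if outbox is not None:
                outbox.put(out)
    t = threading.Thread(target=run, name=name)
    t.start()
    return t

# Two stages chained by queues: "parse" feeds "handle".
q1, q2, done = queue.Queue(), queue.Queue(), queue.Queue()
t1 = stage("parse",  q1, q2,   lambda req: req.upper())
t2 = stage("handle", q2, done, lambda req: f"handled:{req}")

for req in ["get /a", "get /b"]:
    q1.put(req)
q1.put(None)
t1.join(); t2.join()

results = []
while not done.empty():
    item = done.get()
    if item is not None:
        results.append(item)
```

Each stage can be scaled, monitored, or throttled independently, which is precisely the appeal of the design.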

In SEDA, events are pieces of information to be processed or acted upon at a given stage in the processing workflow. This definition of event is more compatible with the EDP definition than with the typical EDA definition (which will be covered in the next blog entry).

The Actor model is a good fit for doing SEDA. I'll speak more about this in a section below. First, let's do a quick assessment of the benefits and drawbacks of SEDA.


Proposed Benefits of SEDA Design

  • Support high demand volume with high concurrency

    • The system can be configured to perform at a fixed maximum level for a given number of simultaneous requests. If overload levels are reached, additional requests pile up on the queue, rather than degrading performance by having the system process too many requests simultaneously.
  • Handle stage level "backpressure" and load management

    • Without backpressure, requests arriving faster than they can be processed will eventually cause Out-of-Memory errors; SEDA's queues give each stage a place to absorb and push back on load
  • Isolate well-conditioned modular services and simplify their construction

    • Simple here is Rich Hickey's objective definition: unentwined or unentangled with other systems.

    • Well-conditioned refers to a service acting like a simple pipeline, where the depth of the pipeline is determined by the number of processing stages

  • Enable introspection by watching the queues between modules - allows a system to tune responses to conditions

    • It allows self-tuning, dynamic resource management, such as adjusting thread pool and connection pool sizes

    • Queues can incorporate filtering and throttling

    • Note: I haven't seen it mentioned in any reading/examples yet, but one could also implement the queues as priority queues that push higher priority requests to the front of the line.
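The backpressure idea above can be illustrated with a bounded queue. This is a hedged sketch - the capacity and the shed-on-full policy are arbitrary illustrative choices, not from Welsh's work; a real stage might instead block producers or return a "try later" response.

```python
import queue

# Give a stage a bounded input queue and shed (or delay) new work once
# it fills up, instead of letting unbounded buffering exhaust memory.
inbox = queue.Queue(maxsize=2)   # capacity chosen arbitrarily for the demo

accepted, rejected = [], []
for req in ["r1", "r2", "r3", "r4"]:
    try:
        inbox.put(req, block=False)   # a real system might block to throttle producers
        accepted.append(req)
    except queue.Full:
        rejected.append(req)          # shed load rather than run out of memory
```

A priority queue (as speculated in the note above) would be a drop-in variation: Python's `queue.PriorityQueue` orders items instead of rejecting them, but the bounded-capacity idea is the same.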

However, those interested in pursuing a SEDA design for systems that will need to handle heavy loads, particularly when needing predictable latency, should read Welsh's post-thesis note and the retrospective he wrote a few years later on SEDA.

It turns out that SEDA designs can be extremely finicky - they can require a lot of tuning to get the throughput desired. In many cases, the SEDA designs do not outperform alternative designs. Welsh points out that the primary intent of the SEDA model was to handle backpressure load (when requests come in faster than they can be processed), not necessarily be more performant.

The reason for the finicky performance, I believe, was uncovered by the LMAX team in London: queues are actually very poor data structures for high-concurrency environments. When many readers and writers try to access a queue simultaneously, the system bogs down, since both readers and writers have to "write" to the queue (in terms of where head and tail point), and most of the time is spent acquiring, managing and waiting on locks rather than doing productive work. There are better high-volume concurrency data structures, such as ring buffers, that should be considered instead of queues.
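To make the contrast concrete, here is a stripped-down single-producer/single-consumer ring buffer sketch. It is far simpler than the actual LMAX Disruptor, and in Python it is not lock-free in any rigorous sense; the point it illustrates is the structure: slots are preallocated, and the single writer and single reader advance head and tail independently rather than contending on shared queue pointers.

```python
class RingBuffer:
    """Toy single-producer/single-consumer ring buffer with preallocated slots."""

    def __init__(self, size):
        self.size = size
        self.slots = [None] * size   # storage allocated once, up front
        self.head = 0                # next slot to write (touched by producer only)
        self.tail = 0                # next slot to read (touched by consumer only)

    def offer(self, item):
        if self.head - self.tail == self.size:
            return False             # buffer full: caller must back off
        self.slots[self.head % self.size] = item
        self.head += 1               # only the single writer advances head
        return True

    def poll(self):
        if self.tail == self.head:
            return None              # buffer empty
        item = self.slots[self.tail % self.size]
        self.tail += 1               # only the single reader advances tail
        return item

rb = RingBuffer(4)
for i in range(5):
    rb.offer(i)                      # the fifth offer returns False: buffer is full
drained = [rb.poll() for _ in range(4)]
```

Because the producer only writes `head` and the consumer only writes `tail`, neither invalidates the other's view of its own counter - the property the Disruptor exploits on real hardware with memory barriers instead of locks.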

Two side notes:

  1. I wonder if anyone is considering building an Actor model on a ring buffer or the LMAX Disruptors? I haven't studied the Akka library carefully enough to know whether this would be a good idea and potentially make the Actor model even more performant - I'd love to hear from people with experience on this.

  2. There are other data structures being designed for the "multicore age". One article worth reviewing was published in the ACM in March 2011. Currently you have to pay to get the article, but there is a version available on scribd last time I checked.


/*---[ Using The Actor Model to Implement SEDA ]---*/

An alternative that Matt Welsh (originator of the SEDA model) never discusses in his writings (at least the ones I've read) is the Erlang Actor model. In Erlang, actors communicate across processes by message passing. They can be arranged in any orientation desired, but one useful orientation would certainly be a staged pipeline.

There are a number of Actor libraries that bring the Erlang model to the JVM using threads rather than Erlang-style processes. The libraries likely have differences between them. I will focus here on the Akka library (http://akka.io/), since that is the one I am familiar with. Akka is written in Scala, but is fully usable from Java.

Five of the key strengths of the Erlang/Akka model are:

  1. Each Actor is fully isolated from other actors (and other non-actor objects), even more so than regular objects. The only way to communicate with an Actor is via message passing, so it is a shared-nothing environment except for the messages, which must (or at least should) be immutable.

  2. Because all communication happens through queues (mailboxes), the model encourages designing your application in terms of workflows, determining where stage boundaries are and where concurrent processing is needed or useful.

    • Akka allows you to do message passing like you would in a traditional ESB but with speed!
  3. It is built on a philosophy that embraces failure, rather than fighting it and losing. This is Erlang’s "Let It Crash" fault-tolerant philosophy using a worker/supervisor mode of programming: http://blogs.teamb.com/craigstuntz/2008/05/19/37819/

    • In this model you don’t try to carefully program to prevent every possible thing that can go wrong. Rather, you accept that failures and crashes of actors/threads/processes (even whole servers in the Erlang model) will occur, and you let the framework (through supervisors) restart them for you

    • This is the key to the “5 nines” of reliability in the software of the telecom industry that Erlang was built for

  4. The Actor model is built for distributed computing. In Erlang’s model, Actors are separate processes. In Akka’s model they can either be in separate processes or in the same process.

    • Akka Actors are extremely lightweight, using only 600 bytes of memory (when bare) and consuming no resources when not actively processing a message. They typically are configured to share threads with other Actors, so many thousands, even millions of Actors can be spun up in memory on modern server architecture.
  5. Finally, Actors bring a lock-free approach to concurrency. Mutable state kept inside an Actor itself is never shared across threads, avoiding the evil of shared mutable state.
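A home-grown sketch of the first and fifth points (this is not Akka's API; `CounterActor` and `tell` are just illustrative names): the mailbox is the only way in, and the actor's mutable counter is touched only by its own single processing loop.

```python
import queue
import threading

class CounterActor:
    """Toy actor: a mailbox plus one thread; mutable state stays inside."""

    def __init__(self):
        self._mailbox = queue.Queue()
        self._count = 0                     # mutable state, never shared directly
        self.replies = queue.Queue()        # stand-in for reply/future machinery
        threading.Thread(target=self._loop, daemon=True).start()

    def tell(self, message):
        self._mailbox.put(message)          # message passing is the only entry point

    def _loop(self):
        while True:
            msg = self._mailbox.get()       # messages processed one at a time
            if msg == "stop":
                break
            elif msg == "increment":
                self._count += 1
            elif msg == "get":
                self.replies.put(self._count)

actor = CounterActor()
for _ in range(3):
    actor.tell("increment")
actor.tell("get")
count = actor.replies.get(timeout=5)
actor.tell("stop")
```

Even though many threads may call `tell` concurrently, the counter itself is only ever read and written by the actor's own loop, so no locking of the state is needed.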

Programming with message passing to queues (or mailboxes, as they are called in Akka) raises the level of abstraction. You can now think in terms of modules or module-like subsystems that can be designed to have a deterministic pathway. The internals of a system become easier to reason about, especially compared to hand-rolled multi-threaded code using shared mutable state.

In addition, the Akka Actor model:

  • applies well to transactional systems, CPU-bound operations, or I/O bound operations (by using multiple concurrent Actors).
  • applies to either fully isolated actions (say number crunching or the "map" part of a MapReduce function) or to coordinated actions, but using isolated mutability, rather than shared mutability.
  • has load balancing built in (in two ways):
    • an explicit load balancer you can create and send all messages to (optional)
    • a dispatcher (required) behind the scenes that determines how events are passed and shared between Actors. For example, one Dispatcher is “work stealing”: heavily loaded actors give work to actors that are less heavily loaded (in terms of items in their mailbox)

With these features, hopefully you can see how the Actor model fits a SEDA approach. Next I describe one way to do so.


(One Model For) Applying a SEDA Design to Actors

  1. Define one Actor class per Stage (http://www.slideshare.net/bdarfler/akka-developing-seda-based-applications)
  2. Use a shared dispatcher and load balancer
  3. Grab a reference to the Actors (or Load Balancer) of the next stage by looking it up in the Actor registry (set up automatically by the Akka framework).
  4. In cases where you need to throttle load by applying “backpressure” to requests, you can configure Actor mailboxes
    • Note: I need more information on exactly what this entails and when it would be useful, as I haven't researched this enough yet - feedback from readers welcome!
  5. The Actor benchmarks I have seen are very impressive - hundreds of thousands of messages per second - but as with SEDA in general, Akka provides many knobs and dials to adjust nearly everything
    • As with SEDA, this can be a two-edged sword: you have many knobs to tweak to get better throughput, but you may also have to do a fair amount of tweaking to reach the performance you would get by default from a more traditional programming model, which doesn't require so much fiddling.
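Steps 1-3 above can be sketched with the same home-grown actor idea (again, not real Akka code; `StageActor` is a hypothetical name, and real Akka would use the actor registry and dispatchers rather than direct references): each stage actor holds a reference to the next stage and forwards its output there.

```python
import queue
import threading

class StageActor:
    """Toy 'one actor per stage' wiring: each actor forwards to the next stage."""

    def __init__(self, work, next_stage=None):
        self.work = work
        self.next_stage = next_stage        # reference to the next stage in the pipeline
        self.mailbox = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def tell(self, msg):
        self.mailbox.put(msg)

    def _loop(self):
        while True:
            msg = self.mailbox.get()
            out = self.work(msg)
            if self.next_stage is not None:
                self.next_stage.tell(out)   # hand off to the downstream stage

# Wire a two-stage pipeline back to front: enrich -> render -> sink.
sink = queue.Queue()
render = StageActor(lambda s: sink.put(f"<p>{s}</p>"))
enrich = StageActor(lambda s: s.title(), next_stage=render)

enrich.tell("hello seda")
result = sink.get(timeout=5)
```

Bounding each mailbox (step 4) would be a matter of giving `StageActor` a `queue.Queue(maxsize=...)`, which is the same backpressure idea discussed earlier.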




I'll stop here for today. I have not yet hit on what I think is the main meaning of Event-Driven Architecture - that of inter-system EDA.

Another use of events is for "Event-Sourcing", which often fits hand-in-glove with the Command Query Responsibility Segregation (CQRS) pattern.

I will definitely do a write-up on EDA for another blog entry and possibly will dive into Event Sourcing and CQRS at a later time.

Feedback, disagreements, builds from readers with experience in these domains are welcome!




Key Links/Sources:

SEDA
http://www.eecs.harvard.edu/~mdw/proj/seda/
http://matt-welsh.blogspot.com/2010/07/retrospective-on-seda.html
http://www.theserverside.com/news/1363672/Building-a-Scalable-Enterprise-Applications-Using-Asynchronous-IO-and-SEDA-Model
http://www.slideshare.net/soaexpert/actors-model-asynchronous-design-with-non-blocking-io-and-seda
http://www.infoq.com/articles/SEDA-Mule


Akka
http://akka.io/
http://www.slideshare.net/bdarfler/akka-developing-seda-based-applications


Next entries in this blog series:
Events and Event-Driven Architecture: Part 2
Events and Event-Driven Architecture: Part 3

8 comments:

  1. SEDA is not something new, and neither is the Actor model. Both are easy ways to think about how things run concurrently. Another is STM. As you mentioned, they will have performance issues.
    GC! Lock-free data structures create lots of garbage too; only primitive-type arrays can help.
    The Disruptor allocates the ring space at the beginning. Think about there being millions of actors. And I'm not sure it supports multiple producers and consumers.
    And you need to make sure NIO is used everywhere.
    In one word: the JVM won't fix every multi-core-age problem.

  2. > I wonder if anyone is considering building an Actor model on a ring buffer or the LMAX Disruptors?

    That would be exactly what the LMAX people tried to circumvent. Actors and SEDA use queueing, with lots of synchronization overhead.

    The Disruptor does not really support multiple heterogeneous consumers.

    IO in a Disruptor system is better done with normal IO; the Disruptor is all about keeping a thread running continuously out of the CPU cache.

    @iron9light - Your comment on lock-free data structures creating a lot of garbage reminds me of a comment made by Rich Hickey (creator of Clojure) at the end of one of his talks - namely, that using immutable data structures (even his memory-efficient ones using tries) with STM puts a lot of pressure on the garbage collector. The topic of garbage collection is a very important one. We are now in the era when "memory is the new disk". 512GB to 2TB servers are feasible now. But unfortunately JVM (and .NET?) heap sizes of hundreds of gigabytes are not, because the garbage collectors available with HotSpot will have stop-the-world pauses as long as a couple of minutes at these heap sizes. If you're interested, take a look at Gil Tene's presentations on garbage collection on InfoQ. I may do a write-up on these soon.

    For the Disruptor/RingBuffer, however, I'm not sure it would create any more garbage than a queue - it is a long-lived, reused (in-memory) data structure. The data being pushed onto and pulled off it would of course create high levels of garbage, but that would be true of queues as well.

    On whether the Disruptor/RingBuffer supports multiple producers and consumers: yes, to a point. You can have any number of consumers, though with too many your throughput will be slowed. The RingBuffer can only efficiently support as many producers as you have cores on your machine; beyond that you need to make a "network" of RingBuffers. But with core counts now on a Moore's-law increase path, this makes RingBuffers a very viable concurrency data structure for the future of computing (IMHO) :-)

    @Erik van Oosten - I definitely agree that for the throughputs the LMAX guys need (6 million transactions per second!), you don't go with an Actor model, but opt instead for Disruptor/RingBuffers as your programming model.

    However, what I was thinking is: since they have shown that queues perform poorly in high-concurrency environments, would it be worth considering RingBuffers rather than queues as the Actor mailbox data structure? So in situations where using Actors is fine (i.e., you don't need 6 million tps), but you still want the best possible throughput and lowest latency, perhaps replacing queues with RingBuffers would be a good idea?

  5. Hello Michael. IMO, the Disruptor can only support a few stable channels (the RingBuffer is GC-efficient, but allocates its space up front like a full queue), whereas you should feel free to create and destroy new actors. For a web server, every connection will create a new actor, or an event for the Disruptor. They are different models.

  6. @iron9light - That's a good point, but I think it depends on how you employ the Actor model. If you employ it in a SEDA fashion, then you could have Actors that are long-lived, continually processing new messages coming into the system, or rather into their part of the SEDA pipeline. In the system my team is currently building, we have servlet threads hand off work to a stable pool of Actors that form the second stage of the SEDA pipeline (and there is another pool of Actors in the third stage). In that case, I could see doing this with Disruptors rather than Actors.

  7. Hey Michael. Glad you liked my Akka/SEDA slides. The basic point to realize for implementing SEDA with Akka is that you need to configure a blocking mailbox with an infinite timeout. From there almost everything else falls in place. You just have to watch out for places that assume non blocking mailboxes (like the work stealing dispatcher).
