Thursday, July 2, 2015

An Exercise in Profiling a Go Program

Recently, while working on my current project ogonori, a Go client for the OrientDB database, I found that I had a defect in the code that encodes and decodes integer values in the way that the OrientDB binary network protocol requires (namely zigzag encoding, followed by encoding that output as a variable length integer).

After fixing the issue, first with the encoder and then with the decoder, I decided that I should do an exhaustive test of all 64 bit integers: start with MinInt64 (-9223372036854775808), zigzag-encode it, varint encode it, then varint decode it and zigzag-decode it and you should get back the number you started with. Increment by 1 and try it again, until you reach MaxInt64 (9223372036854775807).

(Note: I only have to use the Min/Max range of signed integers, since OrientDB is a Java database and only allows signed ints.)

I ran a small range of the possible 64-bit integer space and found that doing this exhaustive test was going to take a very long time. Since I have 8 CPUs on my system, I decided to first parallelize the test into 8 separate goroutines, each taking 1/8 of the total range:

With this code, I spawn 8 threads, running 10 goroutines. Eight of them do the encoding/decoding test and if any integer encode/decode fails the test it is written to "failure channel" of type chan string, which the main goroutine monitors.

A sync.WaitGroup (a counting semaphore) is created and shared among the goroutines. When each "range tester" finishes, it calls Done() on the WaitGroup to decrement the semaphore. The final (nameless) goroutine waits until all "range tester" goroutines have finished and then closes the single shared failure channel.

Closing of the failure channel, causes the loop over that channel in the main goroutine to exit and the whole program finishes.


/* ---[ Performance Baseline ]--- */

I fired this up with the following smaller testrange:

    ranges := []testrange{
        {100000001, 150000001},
        {200000001, 250000001},
        {300000001, 350000000},
        {400000001, 450000000},
        {500000001, 550000000},
        {600000001, 650000000},
        {700000001, 750000000},
        {800000001, 850000000},
    }

and ran top. To my surprise I was only using ~400% CPU, rather than ~800% (the max my system supports):

$ top -d1
 PID USER      PR  ... S  %CPU %MEM     TIME+ COMMAND                                                       
1736 midpete+  20  ... S 420.9  0.0   1:31.33 ogonori

I then looked at the CPU usage of each thread using the -H option to top and saw that my 8 range-tester goroutines were each using only about 50% CPU. And that there was a 9th thread that was also consistently using 40 to 50% CPU. My guess was that this was a GC thread.

$ top -d1 -H
 PID USER      PR  ...  S %CPU %MEM     TIME+ COMMAND                                                        
1740 midpete+  20  ...  S 50.1  0.0   0:21.47 ogonori                                                        
1744 midpete+  20  ...  R 50.1  0.0   0:21.52 ogonori                                                        
1742 midpete+  20  ...  S 49.2  0.0   0:21.38 ogonori                                                        
1736 midpete+  20  ...  S 47.2  0.0   0:21.53 ogonori                                                        
1738 midpete+  20  ...  S 46.2  0.0   0:22.11 ogonori                                                        
1745 midpete+  20  ...  R 46.2  0.0   0:20.37 ogonori                                                        
1741 midpete+  20  ...  S 45.2  0.0   0:21.41 ogonori                                                        
1743 midpete+  20  ...  R 42.3  0.0   0:21.26 ogonori                                                        
1739 midpete+  20  ...  S 40.3  0.0   0:21.35 ogonori                                                        
1737 midpete+  20  ...  S  3.9  0.0   0:02.07 ogonori                  

So I have an algorithm that should be trivially parallelizable with no shared memory and no contention (in theory), but it was only using half the CPU available to it. Hmmm...

Next I ran the test on my system several times to get a baseline performance metric:

$ time ./ogonori -z  # the -z switch tells the ogonori code to only this
                     # benchmark rather than the usual OrientDB tests

Run1: real    3m44.602s
Run2: real    3m42.818s
Run3: real    3m28.917s

Avg ± StdErr:  218.8 ± 5 sec

Then I remembered I had not turned off the CPU power saving throttling on my Linux system (it was set to ondemand), so I ran the following script and repeated the benchmarks:

#!/bin/bash
for i in /sys/devices/system/cpu/cpu[0-7]
do
    echo performance > $i/cpufreq/scaling_governor
done

$ time ./ogonori -z
Run1: real    2m12.605s
Run2: real    2m12.382s
Run3: real    2m13.172s
Run4: real    2m18.992s
Run5: real    2m17.538s
Run6: real    2m14.437s

Avg ± StdErr:  134.9 ± 1 sec

Wow, OK. So that alone gave me about a 60% improvement in throughput. Off to a good start.


/* ---[ Profiling the Code ]--- */

If you've never read Russ Cox's 2011 blog post on profiling a Go program, put it on your list - it is a treat to read.

Using what I learned there, I profiled the zigzagExhaustiveTest code to see how and where to improve it.

$ ./ogonori -z -cpuprofile=varint0.prof

I then opened the .prof file with golang's pprof tool and looked at the top 10 most heavily used functions:

$ rlwrap go tool pprof ogonori xvarint0.prof
  # Using rlwrap gives you bash-like behavior and history

(pprof) top 10
171.48s of 255.92s total (67.01%)
Dropped 171 nodes (cum <= 1.28s)
Showing top 10 nodes out of 36 (cum >= 8.78s)
      flat  flat%   sum%        cum   cum%
    45.98s 17.97% 17.97%     45.98s 17.97%  scanblock
    25.63s 10.01% 27.98%     33.58s 13.12%  runtime.mallocgc
    19.20s  7.50% 35.48%    111.35s 43.51%  g/q/o/o/b/varint.ReadVarIntToUint
    14.94s  5.84% 41.32%     15.62s  6.10%  bytes.(*Buffer).grow
    12.44s  4.86% 46.18%     12.44s  4.86%  runtime.MSpan_Sweep
    11.87s  4.64% 50.82%     15.93s  6.22%  bytes.(*Buffer).Read
    11.33s  4.43% 55.25%     21.56s  8.42%  bytes.(*Buffer).WriteByte
    11.18s  4.37% 59.62%     11.18s  4.37%  runtime.futex
    10.13s  3.96% 63.57%     19.16s  7.49%  bytes.(*Buffer).Write
     8.78s  3.43% 67.01%      8.78s  3.43%  runtime.memmove

(pprof) top10 -cum
110.32s of 255.92s total (43.11%)
Dropped 171 nodes (cum <= 1.28s)
Showing top 10 nodes out of 36 (cum >= 25.50s)
      flat  flat%   sum%        cum   cum%
         0     0%     0%    147.62s 57.68%  runtime.goexit
     2.94s  1.15%  1.15%    147.49s 57.63%  main.func·018
    19.20s  7.50%  8.65%    111.35s 43.51%  g/q/o/o/b/varint.ReadVarIntToUint
         0     0%  8.65%     77.81s 30.40%  GC
    45.98s 17.97% 26.62%     45.98s 17.97%  scanblock
     4.90s  1.91% 28.53%     38.48s 15.04%  runtime.newobject
    25.63s 10.01% 38.55%     33.58s 13.12%  runtime.mallocgc
     6.65s  2.60% 41.15%     31.39s 12.27%  g/q/o/o/b/varint.VarintEncode
         0     0% 41.15%     30.48s 11.91%  System
     5.02s  1.96% 43.11%     25.50s  9.96%  encoding/binary.Read

We can see that a significant percentage of time (>30%) is being spent in GC, so the program is generating a lot of garbage somewhere - plus the cost of generating new heap data, which the runtime.mallocgc figure tells me is at least 13% of the program run time.

Remember that there are four steps to my algorithm:

  1. zigzag encode (varint.ZigzagEncodeUInt64)
  2. varint encode (varint.VarintEncode)
  3. varint decode (varint.ReadVarIntToUint)
  4. zigzag decode (varint.ZigzagDecodeInt64)

The zigzag encode/decode steps are simple bit manipulations, so they are fast. Typing web at the pprof prompt launches an SVG graph of where time was spent. The zigzag functions don't even show up - they were dropped off as being too small (not shown here).

So I needed to focus on steps 2 and 3 which take (cumulatively) 43.5% and 12.3%, respectively.

Since varint.ReadVarIntToUint is the biggest offender let's look at it in detail in the pprof tool:

I've marked the biggest time sinks with an arrow on the left side. Generally one should start with the biggest bottleneck, so let's rank these by cumulative time (2nd col):

->  32.41s   111:   err = binary.Read(&buf, binary.LittleEndian, &u)
->  16.83s    73:   n, err = r.Read(ba[:])
->  15.93s   106:   buf.WriteByte(y | z)
->  14.82s    88:   var buf bytes.Buffer
->   8.53s   110:   padTo8Bytes(&buf)

First, it is very interesting how expensive creating a bytes.Buffer is. But first we need to deal with binary.Read.

Because I'm only ever passing in uint64's, the only real functionality I'm using in this function is:

*data = order.Uint64(bs)


/* ---[ Optimization #1 ]--- */

But it's even worse. If you look back at varint.ReadVarIntToUint you'll see that I'm creating a bytes.Buffer and copying bytes into it only so that I can pass that Buffer (as an io.Reader) into the binary.Read function:

err = binary.Read(buf, binary.LittleEndian, &u)

which then immediately copies all those bytes back out of the buffer:

if _, err := io.ReadFull(r, bs); err != nil {
    return err
}

So this is nothing but wasteful data copying and the heap allocations for it.

binary.Read also does a type switch where a good percentage of time is spent

     2.01s      4.49s    151:       switch data := data.(type) {

and, as stated, the only useful method ever called in it is:

 -->  460ms      2.76s    167:          *data = order.Uint64(bs)

So I should try just calling binary.LittleEndian.Uint64(bs) directly.

Here's the revised varint.ReadVarIntToUint function (with everything inlined for easier reading and profiling analysis):

This change also removes the padTo8Bytes method that wrote one byte at a time to the bytes.Buffer and took >3% of program time itself.

Now let's rerun the benchmarks:

Run 1: real  0m27.182s
Run 2: real  0m27.053s
Run 3: real  0m28.200s
Run 4: real  0m25.762s
Run 5: real  0m26.031s
Run 6: real  0m26.813s

Avg ± StdErr:  26.8 ± 0.4 sec

Outstanding! Throughput increased 5x (134.9/26.8). And using top, I see that the goroutines are consuming nearly all available CPU:

$ top -d1
  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
12983 midpete+  20   0  352496   5768   2736 R 763.7  0.0   1:35.64 ogonori

$ top -d1 -H
  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                       
13231 midpete+  20   0  286960   5772   2744 R 97.5  0.0   0:22.51 ogonori                                                       
13225 midpete+  20   0  286960   5772   2744 R 91.7  0.0   0:22.47 ogonori                                                       
13227 midpete+  20   0  286960   5772   2744 R 90.7  0.0   0:23.09 ogonori                                                       
13232 midpete+  20   0  286960   5772   2744 S 90.7  0.0   0:22.26 ogonori                                                       
13235 midpete+  20   0  286960   5772   2744 R 90.7  0.0   0:09.72 ogonori                                                       
13230 midpete+  20   0  286960   5772   2744 R 88.7  0.0   0:22.14 ogonori                                                       
13233 midpete+  20   0  286960   5772   2744 R 73.1  0.0   0:22.70 ogonori                                                       
13228 midpete+  20   0  286960   5772   2744 R 71.2  0.0   0:22.39 ogonori                                                       
13229 midpete+  20   0  286960   5772   2744 R 70.2  0.0   0:23.09 ogonori 

I also used pprof to profile this run, so let's examine compare the cumulative top10 before and after:

Before (reprinted from above):

(pprof) top10 -cum
110.32s of 255.92s total (43.11%)
Dropped 171 nodes (cum <= 1.28s)
Showing top 10 nodes out of 36 (cum >= 25.50s)
      flat  flat%   sum%        cum   cum%
         0     0%     0%    147.62s 57.68%  runtime.goexit
     2.94s  1.15%  1.15%    147.49s 57.63%  main.func·018
    19.20s  7.50%  8.65%    111.35s 43.51%  g/q/o/o/b/varint.ReadVarIntToUint
         0     0%  8.65%     77.81s 30.40%  GC
    45.98s 17.97% 26.62%     45.98s 17.97%  scanblock
     4.90s  1.91% 28.53%     38.48s 15.04%  runtime.newobject
    25.63s 10.01% 38.55%     33.58s 13.12%  runtime.mallocgc
     6.65s  2.60% 41.15%     31.39s 12.27%  g/q/o/o/b/varint.VarintEncode
         0     0% 41.15%     30.48s 11.91%  System
     5.02s  1.96% 43.11%     25.50s  9.96%  encoding/binary.Read

After:

(pprof) top15 -cum
63680ms of 65970ms total (96.53%)
Dropped 33 nodes (cum <= 329.85ms)
Showing top 15 nodes out of 18 (cum >= 930ms)
      flat  flat%   sum%        cum   cum%
    2280ms  3.46%  3.46%    64470ms 97.73%  main.func·018
         0     0%  3.46%    64470ms 97.73%  runtime.goexit
   17760ms 26.92% 30.38%    34190ms 51.83%  g/q/o/o/b/varint.ReadVarIntToUint
    5890ms  8.93% 39.31%    26370ms 39.97%  g/q/o/o/b/varint.VarintEncode
    8550ms 12.96% 52.27%    16360ms 24.80%  bytes.(*Buffer).Write
    9080ms 13.76% 66.03%    11500ms 17.43%  bytes.(*Buffer).Read
    1460ms  2.21% 68.24%     7550ms 11.44%  runtime.newobject
    4370ms  6.62% 74.87%     6090ms  9.23%  runtime.mallocgc
    5650ms  8.56% 83.43%     5650ms  8.56%  runtime.memmove
    4580ms  6.94% 90.37%     4580ms  6.94%  bytes.(*Buffer).grow
     680ms  1.03% 91.41%     1630ms  2.47%  bytes.(*Buffer).Reset
    1500ms  2.27% 93.68%     1500ms  2.27%  encoding/binary.littleEndian.Uint64
         0     0% 93.68%     1030ms  1.56%  GC
     950ms  1.44% 95.12%      950ms  1.44%  bytes.(*Buffer).Truncate
     930ms  1.41% 96.53%      930ms  1.41%  runtime.gomcache

More good news. In the previous version, GC was taking 30% of the total CPU time. Now, more than 90% of the time is now being spent in the two main workhorse methods: varint.ReadVarIntToUint and varint.VarintEncode. GC time has been reduced to 1.5%!

I suspect the reason that goroutines in the earlier code version only took 40-50% of a CPU is because GC was the contention point. Garbage Collection in golang is a stop-the-world affair, so all other threads are paused until it finishes. By reducing GC to only 1.5%, now the range-testing goroutines can spend far more time running - approaching 100%.


/* ---[ Optimization #2 ]--- */

Are there further improvements we can make? Since the program now spends 40% of its time in varint.VarintEncode, let's look at that function in detail:

Almost 75% of the time in this function is spent writing to the io.Writer (a bytes.Buffer). We write one byte at a time to it. Perhaps it would be better to write it all to a byte slice first and then issue one w.Write.

The new code is then:

And the next round of benchmarks are:

real    0m38.899s
real    0m45.135s
real    0m38.047s
real    0m42.377s
real    0m32.894s
real    0m37.962s
real    0m38.926s
real    0m37.870s

Avg ± StdErr: 39.0 ± 1.2

Hmm, not good. It looks like this second revision caused my code to go backwards in performance by 30%. To be sure, I reverted the change and re-ran the benchmarks with only optimization #1 again: they returned to the ~25s/run timeframe I saw before. So it is true: this second change made things worse.

And the analysis of top agreed: the goroutines were no long using 90%+ CPU:

$ top -d1
  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                      
22149 midpete+  20   0  286960   5776   2744 R 593.9  0.0   1:06.66 ogonori 

$ top -d1 -H
  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                       
22205 midpete+  20   0  229620   7812   2744 R 74.6  0.0   0:10.68 ogonori                                                       
22201 midpete+  20   0  229620   7812   2744 R 73.6  0.0   0:10.14 ogonori                                                       
22202 midpete+  20   0  229620   7812   2744 S 71.6  0.0   0:10.77 ogonori                                                       
22207 midpete+  20   0  229620   7812   2744 S 70.6  0.0   0:10.97 ogonori                                                       
22206 midpete+  20   0  229620   7812   2744 R 68.7  0.0   0:11.14 ogonori                                                       
22204 midpete+  20   0  229620   7812   2744 R 65.7  0.0   0:10.07 ogonori                                                       
22199 midpete+  20   0  229620   7812   2744 R 56.9  0.0   0:10.98 ogonori                                                       
22203 midpete+  20   0  229620   7812   2744 R 53.0  0.0   0:11.19 ogonori                                                       
22197 midpete+  20   0  229620   7812   2744 R 43.2  0.0   0:11.17 ogonori                                                       
22200 midpete+  20   0  229620   7812   2744 S 17.7  0.0   0:09.95 ogonori                                                       
22198 midpete+  20   0  229620   7812   2744 S  3.9  0.0   0:00.77 ogonori         

Let's look at the pprof data for revision #2:

(pprof) top10 -cum
59.31s of 86.44s total (68.61%)
Dropped 92 nodes (cum <= 0.43s)
Showing top 10 nodes out of 24 (cum >= 5.36s)
      flat  flat%   sum%        cum   cum%
     1.95s  2.26%  2.26%     74.02s 85.63%  main.func·018
         0     0%  2.26%     74.02s 85.63%  runtime.goexit
    20.71s 23.96% 26.21%     44.80s 51.83%  g/q/o/o/b/varint.ReadVarIntToUint
     5.57s  6.44% 32.66%     24.39s 28.22%  g/q/o/o/b/varint.VarintEncode
       15s 17.35% 50.01%     18.71s 21.65%  bytes.(*Buffer).Read
     5.64s  6.52% 56.54%     13.46s 15.57%  runtime.makeslice
         0     0% 56.54%      8.39s  9.71%  GC
     5.86s  6.78% 63.32%      8.23s  9.52%  runtime.mallocgc
     2.48s  2.87% 66.18%      7.82s  9.05%  runtime.newarray
     2.10s  2.43% 68.61%      5.36s  6.20%  bytes.(*Buffer).Write

Now GC is back up to nearly 10% of the total running time. So let's look at the profile of the VarintEncode function we changed:

(pprof) list VarintEncode
Total: 1.44mins
   5.57s     24.39s (flat, cum) 28.22% of Total
       .          .     40://
   290ms      290ms     41:func VarintEncode(w io.Writer, v uint64) error {
   550ms     14.01s     42: bs := make([]byte, 0, 10)
   170ms      170ms     43: for (v & 0xffffffffffffff80) != 0 {
   2.04s      2.04s     44:     bs = append(bs, byte((v&0x7f)|0x80))
   320ms      320ms     45:     v >>= 7
       .          .     46: }
   680ms      680ms     47: bs = append(bs, byte(v&0x7f))
       .          .     48:
   1.20s      6.56s     49: n, err := w.Write(bs)
   120ms      120ms     50: if err != nil {
       .          .     51:     return oerror.NewTrace(err)
       .          .     52: }
       .          .     53: if n != len(bs) {
       .          .     54:     return fmt.Errorf("Incorrect number of bytes written. Expected %d. Actual %d", len(bs), n)
       .          .     55: }
   200ms      200ms     56: return nil
       .          .     57:}

We can see that 58% of the time of this method is spent allocating new memory (the []byte slice on line 42), thereby causing GC to take longer. Here's why - if you look at the implementation of bytes.Buffer, you'll see that it has a fixed bootstrap array it allocates to handle small buffers and another fixed byte array (runeBytes) to handle writes to WriteByte; both of these allow it to avoid memory allocation for small operations.

Since my test code is reusing the same bytes.Buffer for each iteration, no new allocations were occurring during each call to varint.VarintEncode. But with this second revision I'm creating a new byte slice of capacity 10 in each round. So this change should be reverted.


/* ---[ Lessons Learned ]--- */

When you have an algorithm that you think should be CPU bound and your threads are not using ~100% CPU, then you have contention somewhere. In many scenarios that will be IO wait. But if you have no IO in that portion of your app, then you either have hidden thread contention (mutexes) and/or you may have a lot of garbage collection happening, which pauses all your worker threads/goroutines while GC is happening. Use the pprof tool to determine where time is being spent.

For performance sensitive algorithms, you will want to be garbage free in the main path as much as possible.

Once you know where the time is going, you should generally go after the largest bottleneck first. There's always a primary bottleneck somewhere. Removing the bottleneck in one place causes it to move to another. In my case, I wanted that bottleneck to just be CPU speed (or as is often the case, the time to get data from main memory or a CPU cache into a register).

A big lesson learned here is to be wary of convenience methods in Go's standard library. Many are provided for convenience, not performance. The binary.Read(buf, binary.LittleEndian, &u) call in my case is one such example. The third parameter to binary.Read is of type interface{}, so a type switch has to be done to detect the type. If your code is only ever passing in one type (uint64 in my case), then go read the stdlib code and figure out if there is a more direct method to call. That change contributed to a 5x throughput improvement in my case!

Next, be careful of too much data copying. While the io.Writer is a nice interface, if you are working with byte slices and want to pass it to some stdlib method that requires io.Writer, you will often copy the data into a bytes.Buffer and pass that in. If the function you call copies those bytes back out to yet another byte slice, then garbage is being generated and time is being wasted. So be aware of what's happening in the methods you call.

Finally, always measure carefully before and after any attempted optimizations. Intuition about where bottlenecks are and what will speed things up are often wrong. The only thing of value is to measure objectively. To end I'll quote "Commander" Pike:

Rule 1. You can't tell where a program is going to spend its time. Bottlenecks occur in surprising places, so don't try to second guess and put in a speed hack until you've proven that's where the bottleneck is. --Rob Pike's 5 Rules of Programming


/* ---[ Misc Appendix Notes ]--- */

The overall int64 space will still take too long to run even with these improvements, so I've settled for sampling from the state space instead.

All benchmark comparisons done were statistically significant (p<0.01) using Student's t-test, as analyzed with this tool: http://studentsttest.com. The mean and standard errors were also calculated here.

I notice that even with my best optimization (#1), there is still a ninth thread using >70% CPU. I used kill -QUIT on the program to get a stack dump of all the goroutines. I get 10 go routines - the 8 doing the fnRangeTester work, one waiting on the WaitGroup and the main goroutine which is waiting on the range failchan line. So I'm not sure what that 9th thread is doing churning up 50-70% CPU. Anyone know how to tell?


[Update - 08-July-2015]

In the comments, Carlos Torres asked for the pprof line-by-line output of the ReadVarIntToUint function after the first optimization. I did two profiling runs and compared the pprof outputs and they were both nearly identical. Here is one of them:

(pprof) list ReadVarIntToUint
Total: 1.13mins
ROUTINE ======================== g/q/o/o/b/varint.ReadVarIntToUint 
 18.20s     35.45s (flat, cum) 52.08% of Total
      .          .     25://
  480ms      480ms     26:func ReadVarIntToUint(r io.Reader) (uint64, error) {
      .          .     27:    var (
  270ms      270ms     28:        varbs []byte
  120ms      3.84s     29:        ba    [1]byte
      .          .     30:        u     uint64
      .          .     31:        n     int
  180ms      180ms     32:        err   error
      .          .     33:    )
      .          .     34:
  260ms      260ms     35:    varbs = make([]byte, 0, 10)
      .          .     36:
      .          .     37:    /* ---[ read in all varint bytes ]--- */
      .          .     38:    for {
  3.84s     15.72s     39:        n, err = r.Read(ba[:])
  530ms      530ms     40:        if err != nil {
      .          .     41:            return 0, oerror.NewTrace(err)
      .          .     42:        }
   10ms       10ms     43:        if n != 1 {
      .          .     44:            return 0, oerror.IncorrectNetworkRead{Expected: 1, Actual: n}
      .          .     45:        }
  3.21s      3.21s     46:        varbs = append(varbs, ba[0])
  980ms      980ms     47:        if IsFinalVarIntByte(ba[0]) {
  570ms      570ms     48:            varbs = append(varbs, byte(0x0))
      .          .     49:            break
      .          .     50:        }
      .          .     51:    }
      .          .     52:
      .          .     53:    /* ---[ decode ]--- */
      .          .     54:
      .          .     55:    var right, left uint
      .          .     56:
  620ms      620ms     57:    finalbs := make([]byte, 8)
      .          .     58:
      .          .     59:    idx := 0
  1.08s      1.08s     60:    for i := 0; i < len(varbs)-1; i++ {
  360ms      360ms     61:        right = uint(i) % 8
   20ms       20ms     62:        left = 7 - right
  230ms      230ms     63:        if i == 7 {
      .          .     64:            continue
      .          .     65:        }
  840ms      840ms     66:        vbcurr := varbs[i]
  900ms      900ms     67:        vbnext := varbs[i+1]
      .          .     68:
  120ms      120ms     69:        x := vbcurr & byte(0x7f)
  670ms      670ms     70:        y := x >> right
  670ms      670ms     71:        z := vbnext << left
  650ms      650ms     72:        finalbs[idx] = y | z
  780ms      780ms     73:        idx++
      .          .     74:    }
      .          .     75:
  540ms      2.19s     76:    u = binary.LittleEndian.Uint64(finalbs)
  270ms      270ms     77:    return u, nil
      .          .     78:}

If you compare it to the pprof before the optimization, the top half looks about the same, but the bottom half is dramatically different. For example, more than 30s was spent in binary.Read(&buf, binary.LittleEndian, &u) in the original version. The replacement code, binary.LittleEndian.Uint64(finalbs), only takes up about 2 seconds of processing time.

The only remaining spot I see for any further optimization is the 15s (out of 35s) spent in r.Read(ba[:]). The problem, however, is that with a varint you don't know how many bytes long it is in advance, so you have read and examine them one at a time. There is probably a way to optimize this, but I haven't attempted it yet.

Sunday, June 7, 2015

Merkle Tree

I hit upon the need this week to do checkpointing in a data processing system that has the requirement that no data event can ever be lost and no events can be processed and streamed out of order. I wanted a way to auto-detect this in production in real time.

There are a couple of ways to do this, but since our data events already have a signature attached to them (a SHA1 hash), I decided that a useful way to do the checkpoint is basically keep a hash of hashes. One could do this with a hash list, where a chain of hashes for each data element is kept and when a checkpoint occurs the hash of all those hashes in order is taken.

A disadvantage of this model is if the downstream system detects a hash mismatch (either due to a lost message or messages that are out-of-order) it would then have to iterate the full list to detect where the problem is.

An elegant alternative is a hash tree, aka a Merkle Tree named after its inventor Ralph Merkle.


/* ---[ Merkle Trees ]--- */

Merkle trees are typically implemented as binary trees where each non-leaf node is a hash of the two nodes below it. The leaves can either be the data itself or a hash/signature of the data.

Thus, if any difference at the root hash is detected between systems, a binary search can be done through the tree to determine which particular subtree has the problem. Thus typically only log(N) nodes need to be inspected rather than all N nodes to find the problem area.

Merkle trees are particularly effective in distributed systems where two separate systems can compare the data on each node via a Merkle tree and quickly determine which data sets (subtrees) are lacking on one or the other system. Then only the subset of missing data needs to be sent. Cassandra, based on Amazon's Dynamo, for example, uses Merkle trees as an anti-entropy measure to detect inconsistencies between replicas.

The Tree Hash EXchange format (THEX) is used in some peer-to-peer systems for file integrity verification. In that system the internal (non-leaf) nodes are allowed to have a different hashing algorithm than the leaf nodes. In the diagram below IH=InternalHashFn and LH=LeafHashFn.

The THEX system also defines a serialization format and format for dealing with incomplete trees. The THEX system ensures that all leaves are at the same depth from the root node. To do that it "promotes" nodes. That is when a parent only has one child, it cannot does not take a hash of the child hash; instead it just "inherits" it. If that is confusing, think of the Merkle tree as being built from the bottom up: all the leaves are present and hashes of hashes are built until a single root is present.

Notation: The first token is a node label, followed by a conceptual value for the hash/signature of the node. Note that E, H and J nodes all have the same signature, since they only have one child node.


/* ---[ Merkle Tree as Checkpoint Data ]--- */

I recently published my implementation of a Merkle Tree: https://github.com/quux00/merkle-tree

Before I describe the implementation, it will help to see the use case I'm targeting.

The scenario above is a data processing pipeline where messages flow in one direction. All the messages that come out of A go into B and are processed and transformed to some new value-added structure and sent on to C. In between are queues to decouple the systems.

Throughput needs to be as high as possible and every message that comes out of A must be processed by B and sent to C in the same order. No data events can be lost or reordered. System A puts a signature (a SHA1 hash) into the metadata of the event and that metadata is present on the message event that C receives.

To ensure that all messages are received and in the correct order, a checkpoint is periodically created by A, summarizing all the messages sent since the last checkpoint. That checkpoint message is put onto the Queue between A and B; B passes it downstream without alteration so that C can read it. Between checkpoints, system C keeps a running list of all the events it has received so that it can compute the signatures necessary to validate what it has received against the checkpoint message that periodically comes in from A.


/* ---[ My Implementation of a Merkle Tree ]--- */

The THEX Merkle Tree design was the inspiration for my implementation, but for my use case I made some simplifying assumptions. For one, I start with the leaves already having a signature. Since THEX is designed for file integrity comparisons, it assumes that you have segmented a file into fixed size chunks. That is not the use case I'm targeting.

The THEX algorithm "salts" the hash functions in order to ensure that there will be no collisions between the leaf hashes and the internal node hashes. It concatenates the byte 0x01 to the internal hash and the byte 0x00 to the leaf hash:

internal hash function = IH(X) = H(0x01, X)
leaf hash function = LH(X) = H(0x00, X)

It is useful to be able to distinguish leaf from internal nodes (especially when deserializing), so I morphed this idea into one where each Node has a type byte -- 0x01 identifies an internal node and 0x00 identifies a leaf node. This way I can leave the incoming leaf hashes intact for easier comparison by the downstream consumer.

So my MerkleTree.Node class is:

static class Node {
  public byte type;  // INTERNAL_SIG_TYPE or LEAF_SIG_TYPE
  public byte[] sig; // signature of the node
  public Node left;
  public Node right;
}


/* ---[ Hash/Digest Algorithm ]--- */

Since the leaf nodes are being passed in, my MerkleTree does not know (or need to know) what hashing algorithm was used on the leaves. Instead it only concerns itself with the internal leaf node digest algorithm.

The choice of hashing or digest algorithm is important, depending if you want to maximize performance or security. If one is using a Merkle tree to ensure integrity of data between peers that should not trust one another, then security is paramount and a cryptographically secure hash, such as SHA-256, Tiger, or SHA-3 should be used.

For my use case, I was not concerned with detecting malicious tampering. I only need to detect data loss or reordering, and have as little impact on overall throughput as possible. For that I can use a CRC rather than a full hashing algorithm.

Earlier I ran some benchmarks comparing the speed of Java implementations of SHA-1, Guava's Murmur hash, CRC32 and Adler32. Adler32 (java.util.zip.Adler32) was the fastest of the bunch. The typical use case for the Adler CRC is to detect data transmission errors. It trades off reliability for speed, so it is the weakest choice, but I deemed it sufficient to detect the sort of error I was concerned with.

So in my implementation the Adler32 checksum is hard-coded into the codebase. But if you want to change that we can either make the internal digest algorithm injectable or configurable or you can just copy the code and change it to use the algorithm you want.

The rest of the code is written to be agnostic of the hashing algorithm - all it deals with are the bytes of the signature.


/* ---[ Serialization / Deserialization ]--- */

My implementation has efficient binary serialization built into the MerkleTree and an accompanying MerkleDeserializer class that handles the deserialization.

I chose not to use the Java Serialization framework. Instead the serialize method just returns an array of bytes and deserialize accepts that byte array.

The serialization format is:

(magicheader:int)(numnodes:int)
[(nodetype:byte)(siglength:int)(signature:[]byte)]

where (foo:type) indicates the name (foo) and the type/size of the serialized element. I use a magic header of 0xcdaace99 to allow the deserializer to be certain it has received a valid byte array.

The next number indicates the number of nodes in the tree. Then follows an "array" of numnodes size where the elements are the node type (0x01 for internal, 0x00 for leaf), the length of the signature and then the signature as an array of bytes siglength long.

By including the siglength field, I can allow leaf nodes signatures to be "promoted" to the parent internal node when there is an odd number of leaf nodes. This allows the internal nodes to use signatures of different lengths.


/* ---[ Usage ]--- */

For the use case described above, you can imagine that system A does the following:

List<String> eventSigs = new ArrayList<>();

while (true) {
  Event event = receiveEvent();
  String hash = computeHash(event);
  // ... process and transmit the message to the downstream Queue
  sendToDownstreamQueue(hash, event);

  eventSigs.add(has);

  if (isTimeForCheckpoint()) {
    MerkleTree mtree = new MerkleTree(eventSigs);
    eventSigs.clear();
    byte[] serializedTree = mtree.serialize();
    sendToDownstreamQueue(serializedTree);
  }
}

And system C would then do something like:

List<String> eventSigs = new ArrayList<>();

while (true) {
  Event event = receiveEvent();

  if (isCheckpointMessage(event)) {
    MerkleTree mytree = new MerkleTree(eventSigs);
    eventSigs.clear();

    byte[] treeBytes = event.getDataAsBytes();
    MerkleTree expectedTree = MerkleDeserializer.deserialize(treeBytes);
    byte[] myRootSig = mytree.getRoot().sig;
    byte[] expectedRootSig = expectedTree.getRoot().sig;
    if (!signaturesAreEqual(myRootSig, expectedRootSig)) {
      evaluateTreeDifferences(mytree, expectedTree);
      // ... send alert
    }

  } else {
    String hash = event.getOriginalSignature();
    eventSigs.add(hash);
    // .. do something with event
  }
}