Ayende @ Rahien



Challenge: The race condition in the TCP stack, answer

time to read 3 min | 410 words

In my previous post, I discussed a problem with missing data over a TCP connection that happened in a racy manner, only once every few hundred runs. As it turns out, there is a simple way to make the code run into the problem every single time.

The full code for the repro can be found here.

Change these lines:

[image: the modified client code]
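I don't have the exact diff preserved here, but the gist of the change (a sketch, with illustrative names) is to collapse the client's two separate writes into a single write, so both messages are guaranteed to arrive in the same packet:

// Before (racy): two writes, usually two separate packets
// await writer.WriteLineAsync("message one");
// await writer.WriteLineAsync("message two");

// After (fails every time): one write, one packet
await writer.WriteAsync("message one\r\nmessage two\r\n");
await writer.FlushAsync();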

And voila, you will consistently run into the problem. Wait, run that by me again, what is going on here?

As it turns out, the issue is in the server, more specifically, here and here. We use a StreamReader to read the first line from the client, do some processing, and then hand it to the ProcessConnection method, which also uses a StreamReader. More significantly, it uses a different StreamReader.

Why is that significant? Well, because of this, the StreamReader has buffers, by default, that are 1KB in size. So here is what happens in the case above: we send a single packet to the server, and when the first StreamReader reads from the stream, it fills the buffer with the two messages. But since there is a line break between them, when we call ReadLineAsync, we actually only get the first one.

Then, when we get to the ProcessConnection method, we have another StreamReader, which also reads from the stream, but the second message has already been read (and is waiting in the first StreamReader's buffer), so we are waiting for more information from the client, which will never come.
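The effect is easy to demonstrate deterministically even without a socket; here is a minimal sketch using a MemoryStream in place of the network stream:

using System;
using System.IO;
using System.Text;

var ms = new MemoryStream(Encoding.UTF8.GetBytes("first\nsecond\n"));

var reader1 = new StreamReader(ms);
Console.WriteLine(reader1.ReadLine()); // "first" -- but "second\n" now sits
                                       // in reader1's private buffer

var reader2 = new StreamReader(ms);
Console.WriteLine(reader2.ReadLine()); // null -- the underlying stream is
                                       // already drained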

So how come it sort of works if we do this in two separate calls? Well, it is all about speed. In most cases, when we split it into two separate calls, the server socket has only the first message in it when the first StreamReader runs, so the second StreamReader is successful in reading the second line. But in some cases, the client manages to be fast enough to send both messages to the server before the server reads them, and voila, we have the same behavior, only much more unpredictable.

The key problem was that it wasn’t obvious we were reading too much from the stream, and until we figured that one out, we were looking in a completely wrong direction. 

Challenge: The race condition in the TCP stack

time to read 3 min | 463 words

Occasionally, one of our tests hangs. Everything seems to be hunky dory, but it just freezes and does not complete. This is a new piece of code, and thus it is suspicious unless proven otherwise, but an exhaustive review of it looked fine. It took over two days of effort to narrow it down, but eventually we managed to point the finger directly at this line of code:

[image: the offending line of code]

In certain cases, this line would simply not read anything on the server. Even though the client most definitely sent the data. Now, given that TCP is being used, dropped packets might be expected. But we are actually testing on the loopback device, which I expect to be reliable.

We spent a lot of time investigating this, ending up with a very high degree of certainty that the problem was in the TCP stack somewhere. Somehow, on the loopback device, we were losing packets. Not always, and not consistently, but we were absolutely losing packets, which led the server to wait indefinitely for the client to send the message it already did.

Now, I’m as arrogant as the next developer, but even I don’t think I found that big a bug in TCP. I’m pretty sure that if it was this broken, I would have known about it. Besides, TCP is supposed to retransmit lost packets, so even if there were lost packets on the loopback device, we should have recovered from that.

Trying to figure out what was going on there sucked. It is hard to watch packets on the loopback device in Wireshark, and tracing just told me that a message was sent from the client to the server, but the server never got it.

But we continued, and we ended up with a small reproduction of the issue. Here is the code, and my comments are below:
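The embedded snippet did not survive this capture, so what follows is a hedged reconstruction of the shape of the repro, based on the description below and the answer post above (ports, messages, and method names are illustrative):

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

var listener = new TcpListener(IPAddress.Loopback, 9999);
listener.Start();

var server = Task.Run(async () =>
{
    using var client = await listener.AcceptTcpClientAsync();
    var stream = client.GetStream();

    var reader = new StreamReader(stream);
    Console.WriteLine(await reader.ReadLineAsync()); // read the first line

    await ProcessConnection(stream);                 // hand off the stream
});

var clientTask = Task.Run(async () =>
{
    using var tcp = new TcpClient();
    await tcp.ConnectAsync(IPAddress.Loopback, 9999);
    var writer = new StreamWriter(tcp.GetStream()) { AutoFlush = true };
    await writer.WriteLineAsync("hello");
    await writer.WriteLineAsync("world");
    await Task.Delay(-1); // keep the connection open
});

await server; // hangs once every few hundred runs

static async Task ProcessConnection(NetworkStream stream)
{
    var reader = new StreamReader(stream); // also uses a StreamReader
    Console.WriteLine(await reader.ReadLineAsync());
}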

This code is pretty simple. It starts a TCP server, and listens to it, and then it reads and writes to the client. Nothing much here, I think you’ll agree.

If you run it, however, it will mostly work, except that sometimes (anywhere between 10 runs and 500 runs on my machine) it will just hang. I’ll save you some time and let you know that there are no dropped packets; TCP is working properly in this case. But the code just doesn’t work. What is frustrating is that it is mostly working; it takes a lot of work to actually get it to fail.

Can you spot the bug? I’ll continue discussion of this in my next post.

The high level interview question: Proposed solution

time to read 2 min | 332 words

Here is the original post, and now let us get down to solving it…

The key part of solving this issue is knowing that if you wait for the actual cluster change, there is very little that you can do. Technically speaking, if you have the keys, you can try to figure out all the ranges that fall under the new nodes in the cluster, and then query on all those ranges. It will work, but anyone who answers that is going to be hit when there are multiple concurrent additions to the cluster (adding 1 node, then 3, then 5, etc.). That is something that is incredibly common when you start scaling up, and if you haven't moved all the data yet, you don't want to have to wait until you do before you start responding to the current workload. Then there are things like “what happens if we reboot midway through?”, etc.

A much simpler alternative is to move some of the work to write time for each data item. We already need to compute which node a particular item is going to reside on, after all. What we are also going to compute is when that is going to change, and we are going to record that. When the cluster grows above that size, we can simply query for all the data items that need to move at the new cluster size. This way, we gain a couple of interesting properties.

We don’t need to worry about adding multiple nodes concurrently; just doing “WHERE NeedToMoveWhenSizeIsGreaterThan < :ClusterSize” is going to be enough to find all the data items that need to be moved. It is resilient to restarts / errors, and can gracefully handle the multiple moves scenario.

Oh, and how do you find the next cluster size where this particular data item is going to have to move? Well, I've got to leave the future candidates with googling skillz something to actually handle.
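For the curious, here is one possible sketch (not necessarily the intended answer): the jump consistent hash loop from the original question already computes, at exit, the next bucket the key would jump to, which gives exactly the threshold to record at write time:

// Hypothetical helper: returns the cluster size J such that this key stays
// put for any cluster size <= J and moves once the cluster grows beyond J.
// This is the value to store as NeedToMoveWhenSizeIsGreaterThan.
public static long NeedToMoveWhenSizeIsGreaterThan(ulong key, int currentClusterSize)
{
    long b = -1, j = 0;
    while (j < currentClusterSize)
    {
        b = j;
        key = key * 2862933555777941757UL + 1;
        j = (long)((b + 1) * ((double)(1L << 31) / ((key >> 33) + 1)));
    }
    // The key lives in bucket b for every cluster size in (b, j];
    // it jumps (to bucket j) when the cluster size reaches j + 1.
    return j;
}

With that recorded per item, the “WHERE NeedToMoveWhenSizeIsGreaterThan < :ClusterSize” query from above finds exactly the items that need to move.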

The high level interview question

time to read 3 min | 410 words

The following is likely to end up in the list of questions we’ll ask candidates to answer when they apply to Hibernating Rhinos.

Imagine a sharded database. A sharded database is one where the data is split among multiple nodes. To make things simple, we will assume that each datum in the database has a 64-bit key associated with it, and we are interested in distributing the information evenly among the nodes. This can be done using Jump Consistent Hashing (see the paper for details), and can be implemented using the following simple C# function:
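The embedded function is missing from this capture; the reference implementation from the Lamping & Veach paper, ported to C#, looks like this:

// Maps a 64-bit key to a bucket in [0, numBuckets), such that growing the
// cluster from N to N+1 buckets moves only ~1/(N+1) of the keys.
public static int JumpConsistentHash(ulong key, int numBuckets)
{
    long b = -1, j = 0;
    while (j < numBuckets)
    {
        b = j;
        key = key * 2862933555777941757UL + 1; // 64-bit LCG step
        j = (long)((b + 1) * ((double)(1L << 31) / ((key >> 33) + 1)));
    }
    return (int)b;
}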

This function is responsible for taking a key and (given how many nodes there are in the cluster) providing the node this key resides on.

So far, so good, and this makes quite a lot of things much simpler. This function ensures that roughly 1/N of the data items in the database will require movement to a new node when it is introduced, which is pretty much exactly what we want in a sharded environment. However, this function doesn’t help us figure out what to move.

Assume that we already have a database that has 5 nodes, and ten billion data items spread across all 5 nodes, distributed according to the jump consistent hash function. Because of load, we need to add 2 additional nodes to the cluster, and we need to move 2/7 (about 2.8 billion data items) of the cluster's data to the new nodes. However, since moving the data items alone is going to be costly, we want to avoid scanning through all 10 billion items in all nodes to figure out which ones we need to ship to the new nodes, and which ones should remain with the current node.

Suggest a way that will allow the database to find out which data items need to be moved to the new nodes, without having to scan through all of them. In other words, anything that requires O(number of items in each node) is out.

You are rated on the impact of your suggestion on the overall cluster behavior. The cheaper your option, the better. You are free to store additional information (at cluster creation / modification, as data items are entered into the cluster / deleted, etc) if this can help you, but be aware that any impact on standard operations (reads & writes) should be minimal and well justified.

You only need to consider adding nodes to the cluster, removing nodes from the cluster is not required.

Proposed solution to the low level interview question

time to read 3 min | 578 words

For the actual question, see the original post.

So the first thing that we need to decide is what the data format on the trie will be. Since we have only 32KB to work with, we need to consider the actual storage.

32KB is small enough to fit in an unsigned short, so all the references we'll use will be shorts. We also need to store a bit of metadata, so we'll use the first 4 bytes as the header for just that.

  • ushort SpaceUsed;
  • ushort LastAllocation;

Now that we have this, we need to decide how to store the actual data. To make things easy, we are going to define the following way to allocate memory:
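(The embedded snippet is missing here; what follows is a sketch consistent with the description below, with illustrative names.)

using System;

// A minimal length-prefixed allocator over the 32KB buffer. The first
// 4 bytes are the header: ushort SpaceUsed, then ushort LastAllocation.
// Assumes a fresh buffer was initialized with LastAllocation = 4.
static class BufferAllocator
{
    public static ushort Allocate(byte[] buffer, byte size) // size <= 127
    {
        ushort last = BitConverter.ToUInt16(buffer, 2);           // LastAllocation
        if (last + 1 + size > buffer.Length)
            return 0;                                             // out of space

        buffer[last] = size;                                      // length prefix
        BitConverter.GetBytes((ushort)(last + 1 + size)).CopyTo(buffer, 2);

        ushort used = BitConverter.ToUInt16(buffer, 0);           // SpaceUsed
        BitConverter.GetBytes((ushort)(used + 1 + size)).CopyTo(buffer, 0);
        return (ushort)(last + 1);                                // data position
    }

    public static void Free(byte[] buffer, ushort position)
    {
        var size = (sbyte)buffer[position - 1];
        buffer[position - 1] = unchecked((byte)(-size));          // delete marker
        ushort used = BitConverter.ToUInt16(buffer, 0);
        BitConverter.GetBytes((ushort)(used - 1 - size)).CopyTo(buffer, 0);
    }
}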

This is about the simplest way that you can go about doing things. Note that we use a length-prefix value, and we limit allocations to a max of 127 bytes each. We use a negative size to indicate a delete marker.

So basically, now we have a pretty trivial way to allocate memory, and we can implement the trie as we would normally do. There are a few wrinkles, however.

Deleting the memory doesn’t actually make it eligible for reuse, and the buffer is quite likely to get fragmented easily. In order to handle that, we will track the amount of space that is used, and if we get to the end of the space, we’ll check the SpaceUsed value. If there is still too little free space, we can abort; there is no available space here. However, if we get to the end of the buffer but we do have free space available, we can do the following (a sketch of the first option follows the list):

  • Scan the buffer for available spots (find available locations that have negative size).
  • Failing that, we will copy the data to a temporary buffer, then re-add everything to the buffer from scratch. In other words, we defrag it.
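A sketch of the first option, scanning for a freed slot (again with illustrative names, matching the allocator sketch above):

// Walk the length-prefixed entries; a negative prefix marks a freed slot.
// Assumes no zero-length allocations. A real implementation would also
// deal with reused slots that are larger than needed.
public static ushort ScanForFreeSlot(byte[] buffer, byte size)
{
    ushort pos = 4;                                     // skip the header
    ushort last = BitConverter.ToUInt16(buffer, 2);     // LastAllocation
    while (pos < last)
    {
        var len = (sbyte)buffer[pos];
        if (len < 0 && -len >= size)
            return (ushort)(pos + 1);                   // reuse this slot
        pos += (ushort)(1 + Math.Abs((int)len));
    }
    return 0; // nothing suitable; fall back to defragmentation
}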

Another issue we have is that the maximum size we can allocate is 127 bytes. This value is big enough that most actual strings can fit into it nicely, and a trie already has the property that a large string might be broken into pieces, so we’ll just cap each node in the trie at a max size of 127. Actually, the max size is likely to be less than that, because there is also some information that we need to keep track of per entry.

  • byte NumberOfChildren;
  • byte Flags; // node type, (internal, leaf or both)
  • ushort ChildrenPosition;

So in practice we have about 123 bytes to work with for the key itself. Note that we don’t store the length of the node’s string value (we can get that from the allocation information), and that we store the actual children in an array that is stored separately. This allows us to easily add items to the trie as child nodes. If the node is a leaf node, we also need to store the actual value (which is 8 bytes); we store that information at the end of the value (giving us 115 bytes for that section of the value).
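As a sketch, the fixed part of an entry might be laid out like this (field names are illustrative; the key fragment, and for leaves the 8-byte value, live in the rest of the allocated entry):

using System.Runtime.InteropServices;

[StructLayout(LayoutKind.Sequential, Pack = 1)]
struct TrieNodeHeader               // 4 bytes of the (at most) 127-byte entry
{
    public byte NumberOfChildren;
    public byte Flags;              // node type: internal, leaf, or both
    public ushort ChildrenPosition; // offset of the children array in the buffer
    // ... followed by the key fragment, and for leaf nodes a long value
    //     stored in the last 8 bytes of the entry
}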

All in all, there is going to be a bit of pointer arithmetic and bit counting, but it is likely to be a pretty simple implementation.

Note that additional optimizations would be to try to align everything so it fits into a cache line, to place nodes near their children (which are more likely to be followed), etc.

The low level interview question

time to read 2 min | 234 words

The following is likely to end up in the list of questions we’ll ask candidates to answer when they apply to Hibernating Rhinos.

We need to store information about keys and their location on disk, so we need to implement a trie. We want the trie to store int64 values and unbounded UTF8 string keys.

Given that, we have the following interface:
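The interface itself is missing from this capture; judging from the constraints below, it was presumably something along these lines (names are assumptions):

using System;

public class Trie
{
    private readonly byte[] _buffer = new byte[32 * 1024]; // the only storage allowed

    public bool TryWrite(string key, long value)
        => throw new NotImplementedException(); // false when the array is full

    public bool TryRead(string key, out long value)
        => throw new NotImplementedException();

    public bool TryDelete(string key)
        => throw new NotImplementedException();
}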

We need to implement that with the following properties:

  • The trie will be represented by a single 32-kilobyte byte array.
    • You cannot store any additional information about the trie outside of the array.
  • The cost of searching in the trie is proportional to the size of the key.
  • There are no duplicates.
  • This is a single thread implementation.
  • If the array is full, it is fine to return false from TryWrite().
  • Unsafe code is fine (but not required).
  • You cannot use any in-memory structure other than the byte array, but it is fine to allocate memory during the processing of the operations (for example, to turn the string key into a byte array).

We will be looking at the following aspects of the implementation:

  • Correctness
  • Performance
  • Space used

The idea is that we pack as much as possible into as small a space as possible, while at the same time getting great performance.

Dependencies management in a crisis

time to read 3 min | 480 words

Typically when people talk about dependencies they talk about how easy it is to version them, deploy them, change & replace them, etc. There seems to be a very deep focus on the costs of dependencies during development.

Today I want to talk about another aspect of that: the cost of dependencies when you have a crisis. In particular, consider the case of a 2 AM support call that is rooted in one of your dependencies. What do you do then?

The customer sees a problem in your software, so they call you, and you are asked to resolve it. After you have narrowed the problem down to a particular dependency, you now need to check whether it is your usage of the dependency that is broken, or whether there is a genuine issue with the dependency itself.

Let us take as a case in point a recent support call we had. When running RavenDB on a Windows Cluster with both nodes sharing the same virtual IP, authentication doesn’t work. It took us a while to narrow it down to Windows authentication not working, and that is where we got stuck. Windows authentication is a wonderfully convenient tool, but if there is an error, just finding out about it requires specialized knowledge and skills. After verifying that our usage of the code looked correct, we ended up writing a minimal reproduction with about 20 lines of code, which also reproduced the issue.

At that point, we were able to escalate to Microsoft with the help of the customer. Apparently this is a Kerberos issue: you need to use NTLM, and there was a workaround with some network configuration (check our docs if you really care about the details). But the key point here is that we would have had absolutely no way to figure it out on our own. Our usage of Windows authentication was according to the published best practices, but in this scenario you had to do something different to get it to work.

The point here is that if we hadn’t been able to escalate that to Microsoft, we would have been in a pretty serious situation with the customer; “we can’t fix this issue” is something that no one wants to hear.

As much as possible, we try to make sure that any dependencies we take on fall into one of the following categories:

  • Stuff that we wrote and understand.
  • Open source* components that are well understood.
  • Components with a support contract that we can fall back on, with the SLA we require.
  • Non-essential components that can be disabled without major loss of functionality.

* Just taking an OSS component from some GitHub repo is a bad idea. You need to be able to trust it, which means that you need to be sure that you can go into the code and either fix things or understand why they are broken.

Code review challenge: The concurrent dictionary refactoring – answer

time to read 4 min | 612 words

Here is the full method that we refactored:

 public void ReturnMemory(byte* pointer)
 {
     var memoryDataForPointer = GetMemoryDataForPointer(pointer);

     _freeSegments.AddOrUpdate(memoryDataForPointer.SizeInBytes, x =>
     {
         var newQueue = new ConcurrentStack<AllocatedMemoryData>();
         newQueue.Push(memoryDataForPointer);
         return newQueue;
     }, (x, queue) =>
     {
         queue.Push(memoryDataForPointer);
         return queue;
     });
 }

And here is the allocation map for this method:

public unsafe void ReturnMemory(byte* pointer)
{
    <>c__DisplayClass9_0 CS$<>8__locals0 = new <>c__DisplayClass9_0();
    CS$<>8__locals0.memoryDataForPointer = this.GetMemoryDataForPointer(pointer);
    this._freeSegments.AddOrUpdate(CS$<>8__locals0.memoryDataForPointer.SizeInBytes,
        new Func<int, ConcurrentStack<AllocatedMemoryData>>(CS$<>8__locals0.<ReturnMemory>b__0),
        new Func<int, ConcurrentStack<AllocatedMemoryData>, ConcurrentStack<AllocatedMemoryData>>(CS$<>8__locals0.<ReturnMemory>b__1));
}

As you can see, we are actually allocating three objects here: the captured-variables class generated by the compiler (<>c__DisplayClass9_0) and two delegate instances. We do this regardless of whether we need to add or update.

The refactored code looks like this:

 public void ReturnMemory(byte* pointer)
 {
     var memoryDataForPointer = GetMemoryDataForPointer(pointer);

     var q = _freeSegments.GetOrAdd(memoryDataForPointer.SizeInBytes, size => new ConcurrentStack<AllocatedMemoryData>());
     q.Push(memoryDataForPointer);

 }

And what actually gets called is:

public unsafe void ReturnMemory(byte* pointer)
{
    Interlocked.Increment(ref this._returnMemoryCalls);
    AllocatedMemoryData memoryDataForPointer = this.GetMemoryDataForPointer(pointer);
    if(<>c.<>9__9_0 == null)
    {
        <>c.<>9__9_0 = new Func<int, ConcurrentStack<AllocatedMemoryData>>(this.<ReturnMemory>b__9_0);
    }
    this._freeSegments.GetOrAdd(memoryDataForPointer.SizeInBytes, <>c.<>9__9_0).Push(memoryDataForPointer);
}

The field (<>c.<>9__9_0) is actually a static field, so the delegate is only allocated once. Now we have a zero-allocation method.

Code review challenge: The concurrent dictionary refactoring

time to read 2 min | 216 words

In a recent code review, I modified the following code:

_freeSegments.AddOrUpdate(memoryDataForPointer.SizeInBytes, x =>
{
   var newQueue = new ConcurrentQueue<AllocatedMemoryData>();
   newQueue.Enqueue(memoryDataForPointer);
   return newQueue;
}, (x, queue) =>
{
   queue.Enqueue(memoryDataForPointer);
   return queue;
});

Into this code:

var q = _freeSegments.GetOrAdd(memoryDataForPointer.SizeInBytes, 
                         size => new ConcurrentQueue<AllocatedMemoryData>());
q.Enqueue(memoryDataForPointer);

Can you tell me why?
