Tri-state waiting with async TCP streams
We recently needed to develop a feature that requires a client to hold a connection to the server and listen for a certain event. Imagine, for example, a new document arriving in the database.
This led to a very simple design:
- Open a TCP connection and let the server know which IDs you care about.
- Wait for any of those IDs to change.
- Let the client know about it.
Effectively, it was a single loop: read an ID, wait for it to change, notify the client.
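To make that starting point concrete, here is a minimal C# sketch of the one-ID-per-connection loop. It is not the original code. An in-memory reader and writer stand in for the TCP stream, and waitForChange is a hypothetical hook for whatever completes when the document changes.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Sketch only: one connection watches exactly one document ID.
// waitForChange is a hypothetical hook that completes when the document changes.
async Task WatchSingleAsync(TextReader fromClient, TextWriter toClient,
                            Func<string, Task> waitForChange)
{
    // 1. The client tells us which ID it cares about (one line).
    var id = await fromClient.ReadLineAsync();
    if (id == null)
        return; // the client went away before sending an ID

    // 2. Wait for that document to change.
    await waitForChange(id);

    // 3. Let the client know.
    await toClient.WriteLineAsync("changed: " + id);
    await toClient.FlushAsync();
}

// Usage: an in-memory reader and writer stand in for the TCP stream,
// and the "change" has already happened.
var output = new StringWriter();
await WatchSingleAsync(new StringReader("users/1\n"), output, _ => Task.CompletedTask);
Console.Write(output.ToString()); // prints "changed: users/1" followed by a newline
```

Each watched document costs a whole connection here, which is the expense described next.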
Unfortunately, this simple design didn’t quite work. As it turns out, a dedicated TCP connection per ID is very expensive, so we want to use a single TCP connection to watch multiple documents. We also don’t know about all of them up front, so the client needs to be able to talk to the server throughout the lifetime of the connection. Another issue is the steady state. If none of the documents we care about change for a long time, nothing goes over the network, and eventually the TCP connection fails with a timeout.
In theory, a TCP connection that passes no packets is perfectly fine, but it still requires resources that need to be maintained. On systems that are not busy, it will probably work. But the moment it reaches the real world, the proxies, firewalls, and other network appliances along the way apply a very brutal policy: “if I’m not seeing packets, I’m not maintaining the connection.” The connection is dropped, usually without even a RST packet, which makes debugging this sort of thing interesting.
So our new requirements are:
- Continue to accept IDs to watch throughout the lifetime of the connection.
- Let the client know about any changes.
- Make sure that we send the occasional heartbeat to keep the TCP connection alive.
To be honest, this is much more fun to write, and the code we ended up with was pretty.
There are a couple of things to note here. We start an async read from the TCP stream without waiting for it to complete. Then we go into a loop that waits for one of three things to happen:
- A watched document has changed (we are notified through the completion of the documentChanged task), in which case we notify the client. Note that we first replace the documentChanged task and only then drain the queue of all pending document changes for this collection, after which we go back to waiting. In the document-changed event handler, we first enqueue the changed document and then complete the task. This ordering ensures that we won’t miss anything: a change enqueued after the drain completes the new task, so it is picked up on the next iteration.
- New data is available from the client. In this case we read it, add it to the IDs we are watching, and start another async read (which we’ll wait on in the next loop iteration). I’m creating a new instance of the IDs collection here to avoid threading issues, and also because the number of items is likely to be very small and to change rarely. If there were a lot of changes, I would probably go with a concurrent data structure, but I don’t think that is warranted at this time.
- A simple timeout, in which case we send a heartbeat to keep the connection alive.
Then, based on which task completed, we select the appropriate behavior: notify the client, accept a new document ID to watch, or send a heartbeat.
The nice thing about this code is that errors are also handled well. If the client disconnects, the read fails, so we know it happened and can exit gracefully. We might instead get the error just as we are writing data to the client, but that is pretty much the same thing as far as our error handling is concerned.
The really nice thing about this code is what happens in the common case, where there is nothing to do except maintain the TCP connection. There, the code is almost never in a runnable state, so we can support a very large number of clients with very few resources.
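Putting the three cases together, here is a minimal sketch of such a loop. It is not the production code. It reads line-delimited IDs from a TextReader and writes to a TextWriter, both standing in for the TCP stream. The names OnDocumentChanged, NotifyChangesAsync, ServeAsync and pendingChanges, and the "changed:"/"heartbeat" messages, are hypothetical. Two details differ from the description above. The sketch keeps the heartbeat Task.Delay until it fires instead of creating a new one on every iteration. It also treats a null line from ReadLineAsync as an orderly shutdown.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Per-connection state (hypothetical names). idsToWatch is replaced, never mutated.
var idsToWatch = new HashSet<string>();
var pendingChanges = new ConcurrentQueue<string>();
var documentChanged = NewSignal();

// RunContinuationsAsynchronously: completing the task must not run the
// connection loop inline on the thread that raised the change.
TaskCompletionSource<bool> NewSignal() =>
    new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

// Raised by the database side, one thread at a time.
void OnDocumentChanged(string id)
{
    if (!Volatile.Read(ref idsToWatch).Contains(id))
        return;
    pendingChanges.Enqueue(id);                            // enqueue first...
    Volatile.Read(ref documentChanged).TrySetResult(true); // ...then complete the task
}

// Replace the signal first, then drain: a change enqueued after the drain
// completes the new signal, so the next loop iteration picks it up.
async Task NotifyChangesAsync(TextWriter toClient)
{
    Interlocked.Exchange(ref documentChanged, NewSignal());
    while (pendingChanges.TryDequeue(out var id))
        await toClient.WriteLineAsync("changed: " + id);
}

async Task ServeAsync(TextReader fromClient, TextWriter toClient, TimeSpan heartbeatInterval)
{
    var read = fromClient.ReadLineAsync();         // started, not awaited
    var heartbeat = Task.Delay(heartbeatInterval); // re-armed only after it fires
    while (true)
    {
        Task changed = Volatile.Read(ref documentChanged).Task;
        Task done = await Task.WhenAny(changed, read, heartbeat);

        if (done == changed)
        {
            await NotifyChangesAsync(toClient);
        }
        else if (done == read)
        {
            var newId = await read;  // rethrows if the read faulted (e.g. a reset)
            if (newId == null)
                return;              // orderly shutdown: null forever, so stop
            var updated = new HashSet<string>(idsToWatch) { newId };
            Interlocked.Exchange(ref idsToWatch, updated); // publish a new instance
            read = fromClient.ReadLineAsync();             // awaited on a later iteration
        }
        else
        {
            await toClient.WriteLineAsync("heartbeat");
            heartbeat = Task.Delay(heartbeatInterval);
        }
        await toClient.FlushAsync();
    }
}
```

Cancellation and write failures are left out. A write to a dead client simply throws out of ServeAsync, which is the same error path as a failed read.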
Comments
This is really nice.
Task.Delay(1000) is a resource leak, though: it leaks a timer for 1 second. Note that the CPU time spent processing timer completions is proportional to the number of timers currently running (it is O(N) internally), and that processing can happen once every 15 ms. At 100k tasks per second, you can see 6M such operations per second burning CPU time. See the latest comment at https://github.com/dotnet/coreclr/issues/6155.
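Spelled out, the estimate above is a simple product. It assumes each Task.Delay(1000) keeps its timer registered for the full second and that the timer list is walked about once every 15 ms:

$$
N_{\text{live}} = 10^{5}\,\text{timers/s} \times 1\,\text{s} = 10^{5},
\qquad
f_{\text{tick}} \approx \frac{1000\,\text{ms}}{15\,\text{ms}} \approx 67\,\text{s}^{-1}
$$

$$
N_{\text{live}} \cdot f_{\text{tick}} \approx 10^{5} \times 67 \approx 6.7 \times 10^{6}\ \text{timer visits per second}
$$

That is in line with the rough 6M figure.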
I don't understand how disconnects are supposed to work. When the client gracefully disconnects, newIdToWatch would be null forever and ReadLineAsync would complete instantly, turning this code into an infinite busy loop. Also, a null is added to idsToWatch.
Tobi, yes, I'm aware of the issue with the timer; see the next couple of posts :-)
A disconnect will throw an error, which will be caught. We assume here that the stream will actually throw, not just return null (which is the case for a network stream if you try to read from it after a disconnect).
Are you sure NetworkStream will throw? Normally, streams are defined to return 0 forever after an orderly shutdown. The word "disconnect" is ambiguous: a reset will throw, a shutdown will not.
Prints 0,0,0,0 after Google gives up with a timeout.
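A minimal loopback reproduction of the orderly-shutdown case follows. It uses a local TcpListener instead of a remote server. The server side sends a FIN with Socket.Shutdown, and every later read on the client returns 0 instead of throwing. StreamReader.ReadLineAsync surfaces the same condition as null, again and again.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

// Loopback pair standing in for the client/server connection (assumption: no remote host).
var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
int port = ((IPEndPoint)listener.LocalEndpoint).Port;

using var client = new TcpClient();
await client.ConnectAsync(IPAddress.Loopback, port);
using var server = await listener.AcceptTcpClientAsync();

// Orderly shutdown from the server side: this sends a FIN, not a RST.
server.Client.Shutdown(SocketShutdown.Send);

// Every read after the FIN completes with 0 bytes instead of throwing.
var stream = client.GetStream();
var buffer = new byte[16];
var results = new int[4];
for (int i = 0; i < results.Length; i++)
    results[i] = await stream.ReadAsync(buffer, 0, buffer.Length);
Console.WriteLine(string.Join(",", results)); // prints 0,0,0,0

// At the StreamReader level the same condition shows up as null, repeatedly,
// so a loop that only expects an exception would spin.
var reader = new StreamReader(stream);
string first = await reader.ReadLineAsync();
string second = await reader.ReadLineAsync();
Console.WriteLine(first == null && second == null); // prints True

listener.Stop();
```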
Is OnDocumentChanged called on one thread at a time?
Is there a technical reason for choosing Interlocked.Exchange over Volatile.Read + Volatile.Write here? I like the Interlocked.Exchange version because the code looks cleaner and is easier to maintain if you don't have to make every read volatile, but I'm curious whether there's a perf or correctness difference between these options.
OmariO, yes, it is only called on a single thread.
HellBrick, Volatile would require me to do that at every call site, while Interlocked allows me to do it only when I'm modifying the value. It is mentally cheaper for me this way.
Tobi, you are correct, interesting. In my tests, I must have aborted the connection every time. The real code already handles this with an end-of-stream exception, so nothing changes in this regard.
Theoretically, you need to read idsToWatch with an acquire barrier. Without one, it would be legal to always observe the first value that was set on that thread, which might be null or the initial set. The set contents might also be observed in a weird, undefined state. Without an acquire, the release store is meaningless.
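A minimal sketch of the publication pattern described here follows. It assumes a single writer (the connection loop) and readers on other threads. AddWatch and IsWatched are hypothetical names. The writer builds the complete new set before publishing it. Readers use Volatile.Read to get the matching acquire.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Copy-on-write set of watched IDs (hypothetical names). Assumes a single
// writer (the connection loop); readers may run on any thread.
var idsToWatch = new HashSet<string>();

void AddWatch(string id)
{
    // Build the complete new set first, then publish it. Interlocked.Exchange
    // is a full fence, so the set's contents are written before the reference.
    var updated = new HashSet<string>(Volatile.Read(ref idsToWatch)) { id };
    Interlocked.Exchange(ref idsToWatch, updated);
}

bool IsWatched(string id)
{
    // Acquire read: pairs with the publishing store above, so a reader that sees
    // the new reference also sees a fully constructed set.
    return Volatile.Read(ref idsToWatch).Contains(id);
}

// Usage: publish from a thread-pool thread, read from this one.
await Task.Run(() => AddWatch("users/1"));
Console.WriteLine(IsWatched("users/1")); // prints True
Console.WriteLine(IsWatched("users/2")); // prints False
```

With more than one writer, this read-copy-exchange sequence could lose updates, and a compare-and-swap loop (Interlocked.CompareExchange) would be needed instead.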