Ayende @ Rahien

Oren Eini aka Ayende Rahien CEO of Hibernating Rhinos LTD, which develops RavenDB, a NoSQL Open Source Document Database.

time to read 10 min | 1949 words

For a database engine, controlling the amount of work that is being executed is a very important factor for the stability of the system. If we can’t control the work that is being done, it is quite easy to get to the point where we are overwhelmed.

Consider the case of a(n almost) pure computational task, which is completely CPU bound. In some cases, those tasks can easily be parallelized. Here is one such scenario:
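
The original snippet isn’t preserved here; a minimal sketch of the kind of scenario described (GetArraysToSort is a made-up source of work) looks like this:

    // Many independent arrays that all need sorting, processed sequentially
    // on a single thread. This is the baseline, non-parallel version.
    int[][] arrayOfArrays = GetArraysToSort(); // hypothetical source of work
    foreach (int[] array in arrayOfArrays)
    {
        Array.Sort(array);
    }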

This example seems pretty obvious, right? This is completely CPU bound (sorting), and leaving aside that the sort itself can be done in parallel, we have many arrays that we need to sort. As you can imagine, I would much rather have this task done as quickly as possible.

One way to make this happen is to parallelize the work. Here is a pretty simple way to do that:

Parallel.ForEach(arrayOfArrays, array => Array.Sort(array));

A single one-liner and we are going to see much better performance from the system, right?

Yep, this particular scenario will run faster; depending on the array sizes, it may be much faster. However, that single one-liner is a nasty surprise for the stability of the system as a whole. What’s the issue?

Under the covers, this is going to use the .NET thread pool to do the work. In other words, this is going to be added to the global workload on the system. What else is using the same thread pool? Well, pretty much everything. For example, processing requests in Kestrel is done in the thread pool, all the async machinery uses the thread pool under the covers as well as pretty much everything else.

What is the impact of adding heavily CPU bound work to the thread pool, one may ask? Well, the thread pool will start executing it on its threads. This is heavy CPU work, so it will likely run for a while, displacing other work. If we consider a single instance of this code, there is a limit on the amount of concurrent work that is placed in the thread pool. However, if we consider what happens when we run the code above in parallel… we are going to be placing a lot of work on the thread pool. That is going to effectively starve the rest of the system. The thread pool will react by spawning more threads, but this is a slow process, and it is easy to get into a situation where all available threads are busy, leaving nothing for the rest of the application to run.

From the outside, it looks like 100% CPU usage, with the system entirely frozen. That isn’t actually what is going on; we are simply holding up all the threads and can’t prioritize the work between request handling (important) and speeding up background work (less important). The other problem is that you may be running the operation on an otherwise idle system, and the non-parallel code will utilize a single thread out of the many that are available.

In the context of RavenDB, we are talking here about indexing work. It turns out that there is a lot of work here that is purely computational. Analyzing and breaking apart text for full text search, sorting terms for more efficient access patterns, etc. The same problem remains: how can we balance the indexing speed and the overall workload on the system?

Basically, we have three scenarios that we need to consider:

  1. Busy processing requests – background work should be done in whatever free time the system has (avoiding starvation), with as little impact as possible.
  2. Busy working on many background tasks – concurrent background tasks should not compete with one another and step on each other’s toes.
  3. Busy working on a single large background task – which should utilize as much of the available computing resources as possible.

For the second and third options, we need to take into account the fact that the absence of current request processing work doesn’t matter if there is incoming work. In that case, the system should prioritize the request processing over background work.

Another important factor that I haven’t mentioned is that it would be really good for us to be able to tell what work is taking the CPU time. If we are running a set of tasks on multiple threads, it would be great to be able to see what tasks they are running in a debugger / profiler.

This sounds very much like a class in operating systems; in fact, scheduling work is a core task of an operating system. The question here is: how do we build a system that meets all the above requirements, given that we can’t actually schedule CPU work directly?

We cannot use the default thread pool, because there are too many existing users there that can interfere with what we want. For that matter, we don’t actually want to have a dynamic thread pool. We want to maximize the amount of work we do for computationally heavy workloads. Instead, for the entire process, we will define a set of threads to handle work offloading, like this:
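
The snippet itself isn’t preserved here; a minimal sketch of the setup described below (the OffloadWorkerLoop method is shown further down) would be:

    // One offload worker per core, running at the lowest thread priority so
    // that anything else that is ready to run wins the CPU.
    var offloadWorkers = new Thread[Environment.ProcessorCount];
    for (int i = 0; i < offloadWorkers.Length; i++)
    {
        offloadWorkers[i] = new Thread(OffloadWorkerLoop)
        {
            IsBackground = true,
            Name = "Offload Worker #" + i,
            Priority = ThreadPriority.Lowest
        };
        offloadWorkers[i].Start();
    }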

This creates a set of threads, one for each CPU core on the machine. It is important to note that these threads are running with the lowest priority; if there is anything else that is ready to run, it will get priority. In order to do some background work, such as indexing, we’ll use the following mechanism:
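
Again, the original code isn’t preserved; a sketch of the mechanism described below (IndexingWork and indexName are placeholders) is:

    // A dedicated background thread per index, below normal priority so that
    // request processing threads (normal priority) always win.
    var indexingThread = new Thread(IndexingWork)
    {
        IsBackground = true,
        Name = "Indexing of " + indexName,
        Priority = ThreadPriority.BelowNormal
    };
    indexingThread.Start();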

Because indexing is a background operation in RavenDB, we do that in a background thread and we set the priority to below normal. Request processing threads run at normal priority, which means that we can rely on the operating system to run them first, and at the same time, the starvation prevention that already exists in the OS scheduler will run our indexing code even if there is extreme load on the system.

So far, so good, but what about those offload workers? We need a way to pass work from the indexing to the offload workers, this is done in the following manner:
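
The actual code isn’t preserved here, so here is a sketch of the hand-off, sticking with the sort-an-array example (the bounded capacity and the omitted wait-for-in-flight-work bookkeeping are simplifications):

    // using System.Collections.Concurrent;
    // Process wide, bounded queue of per-index work queues.
    private static readonly BlockingCollection<ConcurrentQueue<int[]>> _globalWorkQueue =
        new BlockingCollection<ConcurrentQueue<int[]>>(boundedCapacity: 64);

    private readonly ConcurrentQueue<int[]> _localWork = new ConcurrentQueue<int[]>();

    public void SortArrays(int[][] arrayOfArrays)
    {
        foreach (var array in arrayOfArrays)
            _localWork.Enqueue(array);

        // Advertise our queue to the offload workers. If the global queue is
        // full, we simply skip it and do everything ourselves.
        _globalWorkQueue.TryAdd(_localWork);

        // The indexing thread drains its own queue as well, so the offload
        // workers are an optimization, never a dependency. The real code also
        // has to wait for items the workers are still processing.
        while (_localWork.TryDequeue(out var array))
            Array.Sort(array);
    }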

Note that the _globalWorkQueue is process wide. For now, I’m using the simple sort-an-array example because it makes things easier; the real code would need a bit more complexity, but that is not important for explaining the concept. The global queue contains queues for each task that needs to be run.

The index itself will have a queue of work that it needs to complete. Whenever it needs to start doing the work, it will add that to its own queue and try to publish to the global queue. Note that the size of the global queue is limited, so we won’t use too much memory there.

Once we have published the work we want to do, the indexing thread will start working on the work itself, occasionally trying to solicit the aid of the other workers. Finally, once the indexing thread is done processing all the remaining work, we need to wait for any pending work that is currently being executed by the workers. That done, we can use the results.

The workers all run very similar code:
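
A sketch of that loop, matching the queue definitions above:

    private static void OffloadWorkerLoop()
    {
        // Grab one index queue at a time and drain it to completion before
        // picking up the next one.
        foreach (var queue in _globalWorkQueue.GetConsumingEnumerable())
        {
            while (queue.TryDequeue(out var array))
            {
                Array.Sort(array);
            }
        }
    }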

In other words, they pull a queue of tasks from the global task queue and start working on that exclusively. Once they are done processing a single index queue to completion, the offload worker will try to pick another from the global queue, and so on.

The whole code is small and fairly simple, but there is a lot of behavior hidden here. Let me try to explore all of that now.

The indexing background work pushes all the work items to its local queue, and it will register the queue itself in the global queue for the offloading threads to process. The indexing queue may be registered multiple times, so multiple offloading threads will take part in this. The indexing code, however, does not rely on that and will also process its own queue. The idea is that if there are offloading threads available, they will help, but we do not rely on them.

The offloading thread, for its part, will grab an indexing thread queue and start processing all the items from the queue until it is done. For sorting arbitrary arrays, it doesn’t matter much, but for other workloads, we’ll likely get much better locality in terms of task execution by issuing the same operation over a large set of data.

The thread priority here is also important, mind you. If there is nothing else to do, the OS will schedule the offloading threads and give them work to do. If there is a lot of other things happening, they will not be scheduled often. This is fine, we are being assisted by the offloading threads, they aren’t mandatory.

Let’s consider the previous scenarios in light of this architecture and its impact.

If there are a lot of requests, the OS is going to mostly schedule the request processing threads, the indexing threads will also run, but it is mostly going to be when there is nothing else to do. The offload threads are going to get their chance, but mostly that will not have any major impact. That is fine, we want most of the processing power to be on the request processing.

If there is a lot of work, on the other hand, the situation is different. Let’s say that there are 25 indexes running and there are 16 cores available on the machine. In this case, the indexes are going to compete with one another. The offloading threads are again not going to get much of a chance to run, because there isn’t anything that adding more threads in this context will do; there is already competition between the indexing threads for the CPU resources. However, the offloading threads are going to be of some help. Indexing isn’t pure computation, there are enough cases where it needs to do I/O, in which case the CPU core is available. The offloading threads can then take advantage of the free cores (if there aren’t any other indexing threads that are ready to run instead).

It is the case of a single large task running on a mostly idle system where this really shines. The index is going to submit all its work to the global queue, at which point the offloading threads will kick in and help it complete the task much sooner than it would be able to otherwise.

There are other things that we need to take into account in this system:

  • It is, by design, not fair. An index that is able to put its queue into the global queue first may have all the offloading threads busy working on its behalf. All the other threads are on their own. That is fine; when that index completes all its work, the offloading threads will assist another index. We care about overall throughput, not fairness between indexes.
  • There is an issue with this design under certain loads. An offloading thread may be busy doing work on behalf of an index. That index has completed all other work and is now waiting for the last item to be processed. However, the offloading thread has the lower priority, so it will rarely get to run. I don’t think we’ll see a lot of cost here, to be honest; that requires the system to be at full capacity (rare, and admins usually hate that, so they prevent it) to actually be a real issue for a long time. If needed, we can play with thread priorities dynamically, but I would rather avoid it.

This isn’t something that we have implemented, rather something that we are currently playing with. I would love to hear your feedback on this design.

time to read 2 min | 363 words

A few days ago I posted about looking at GitHub projects for junior developer candidates. One of the things that is very common in such scenarios is to see them use string concatenation for queries, and I hate that. I just went to a random candidate’s GitHub profile right now and found this gem:
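
The candidate’s snippet isn’t reproduced here; a representative sketch of the pattern being described (hypothetical table and variable names) looks something like:

    // using Microsoft.Data.SqlClient;
    var connection = new SqlConnection(connectionString);
    connection.Open();
    var command = new SqlCommand(
        "SELECT * FROM Users WHERE Username = '" + username + "'", // string concatenation
        connection);
    var reader = command.ExecuteReader();
    if (reader.HasRows)                 // redundant check
    {
        while (reader.Read())
        {
            users.Add(MapUser(reader)); // MapUser / users are placeholders
        }
    }
    // nothing is closed or disposed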

The number of issues that I have with this code is legion.

  • Not closing the connection or disposing the command.
  • The if can be dropped entirely.
  • And, of course, the actual SQL INJECTION vulnerability in the code.

There is a reason that I have such a reaction to this type of code, even when looking at junior developer candidates. For them, this is acceptable, I guess. They are learning and focusing mostly on what is going on, not the myriad of taxes that you have to pay in order to get something to production. This was never meant to be production code (I hope, at least). I’m not judging it on that level. But I have to very consciously remind myself of this fact whenever I run into code like this (and as I said, this is all too common).

The reason I have such a visceral reaction to this type of code is that I see it in production systems all too often. And that leads to nasty stuff like this:

And this code led to a 70GB data leak on Gab. The killer for me is that this code was written by someone with 23 years of experience.

I actually had to triple check what I was seeing when I read the code the first time, because I wasn’t sure that this is actually possible. I thought maybe this is some fancy processing done to avoid SQL injection, not that this is basically string interpolation.

Some bugs are things that you can excuse. A memory leak or a double free are things that will happen to anyone who is writing in C, regardless of experience and how careful they write. They are often subtle and easy to miss, happening in corner cases of error handling.

This sort of bug is a big box of red flags. It is also on fire.

time to read 2 min | 398 words

I ran into an interesting scenario the other day. As part of the sign in process, the application will generate a random token that will be used to authenticate the following user requests. Basically, an auth cookie. Here is the code that was used to generate it:
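
The original code isn’t preserved here; a minimal sketch of the mechanism as described (User, SaveUser and the one hour window are stand-ins based on the description below) would be:

    // using System.Security.Cryptography;
    public string GetSecurityToken(User user)
    {
        // Reuse the token if it was issued within the last hour.
        if (user.TokenIssuedAt > DateTime.UtcNow.AddHours(-1))
            return user.SecurityToken;

        var buffer = new byte[32];
        RandomNumberGenerator.Fill(buffer); // cryptographically secure randomness

        user.SecurityToken = Convert.ToBase64String(buffer);
        user.TokenIssuedAt = DateTime.UtcNow;
        SaveUser(user); // hypothetical persistence call – this is where the race bites

        return user.SecurityToken;
    }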


This is a pretty nice mechanism. We use cryptographically secured random bytes as the token, so no one can guess what this will be.

There was a serious issue with this implementation, however. The problem was that on application startup, two different components would race to complete the signup. As you can see from the code, this is meant to be done in such a way that two such calls within the space of one hour will return the same result. In most cases, this is exactly what happens. In some cases, however, we got into a situation where the two calls would race each other. Both would load the document from the database at the same time, get a(n obviously) different security token and write it out, and then one of them would return the “wrong” security token.

At that point, it meant that we got an authentication attempt that was successful, but gave us the wrong token back. The first proposed solution was to handle that using a cluster wide transaction in RavenDB. That would allow us to ensure that in the case of racing operations, we’ll fail one of the transactions and then have to repeat it.

Another way to resolve this issue without the need for distributed transactions is to make the sign up operation idempotent. Concurrent calls at the same time will end up with the same result, like so:
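
A sketch of the idempotent version described below (ComputeArgon2i is a stand-in for whatever Argon2i implementation you use, and _pepper is a server side secret):

    // using System.Text;
    public string GetSecurityToken(User user)
    {
        var now = DateTime.UtcNow;
        var hour = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0, DateTimeKind.Utc);

        // Same hashed password + same hour + same pepper => same token,
        // no matter how many concurrent callers there are.
        var input = user.HashedPassword + "/" + hour.ToString("o") + "/" + _pepper;
        byte[] token = ComputeArgon2i(Encoding.UTF8.GetBytes(input)); // hypothetical helper

        return Convert.ToBase64String(token);
    }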

In this case, we generate the security token using the current time, rounded to an hour basis. We use Argon2i (a password hashing algorithm) to generate the required security token from the user’s own hashed password, the current time and some pepper to make it impossible for outsiders to guess what the security token is even if they know what the password is. By making the output predictable, we make the rest of the system easier.

Note that the code above is still not completely okay. If the two requests come within milliseconds of one another, but on different hours, we have the same problem. I’ll leave the problem of fixing that to you, dear reader.

time to read 9 min | 1604 words

I was talking to a developer recently and had a really interesting discussion around the notion of consistency. For simplicity’s sake, let’s imagine that we are talking about a game and we need to deal with awarding achievements.

The story in question begins with a seemingly innocent business requirement:

We want to announce a unique achievement in the game. The first player to kill 10,000 rabbits will get a unique class-appropriate item.

Conceptually, that means that we need to write the following code:
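
A naive sketch of that requirement (TryClaimWorldFirst, AwardUniqueItem and ItemForClass are made-up helpers):

    public void OnRabbitKilled(Player player)
    {
        player.RabbitsKilled++;

        // First player in the world to reach 10,000 gets the unique item.
        if (player.RabbitsKilled == 10_000 && TryClaimWorldFirst("10k-rabbits"))
        {
            AwardUniqueItem(player, ItemForClass(player.Class));
        }
    }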

The code is simple, easy and trivial. I wish we could end the post at this point, but as you can imagine, the situation is a bit more complex.

Consider the typical architecture of a game: we have the game world, which is composed of multiple servers, often located in different parts of the world. In this case, we can assume that we have three data centers, on separate continents.

Notice that we have a world first scenario here? That means that we need to synchronize the state across all of them in some manner, including when there are issues in the network, failures, etc.

For that matter, when we are talking about an achievement for a character, we can at least be sure that there is a single chain of events that we can follow, but what happens if we were to apply this achievement to a guild? In this case, multiple players may be competing to complete this achievement. If we allow it to also run on different servers (and regions), the situation now becomes quite complex.

Are there technical ways to resolve this? Of course there are.

We can use distributed transactions to handle this, at first glance. However, that introduces an utterly unworkable spanner in the works (see what I did there?). Games care a lot about performance and latency; forcing us to run a globally distributed transaction on every rabbit kill is not a possible solution. There are stiff performance costs associated with it. For that matter, in a partial failure scenario, you still want to allow players to kill rabbits, even if you lost some servers on another continent. That leads to several additional scenarios that you need to cover:

  • What happens if a player killed rabbits but wasn’t able to record that using the world scope state? Do we also store that in a character / server level state? What happens if we already awarded the achievement and then find out that there are delayed updates in the mix?
  • What happens when two players kill their 10,000th rabbit at times T1 and T2 (where T2 > T1), but this happens near enough to one another that the write about the 2nd player is committed first?
    • Note that this can happen on the same server, on the same region or across different regions.
  • What happens when two players kill their 10,000th rabbit at the same instant?
    • Note that this can happen on the same server, on the same region or across different regions.
    • Note that this can actually happen at different times, but clock divergence will say it happened at the same time
  • What happens when a player kills their 10,000th rabbit, but we had a network hiccup and couldn’t record the action in time?

In fact, there are a multitude of issues that we have to deal with, if we accept the scenario as is. And the impact on the entire system because of the unstated requirements of a single achievement are huge.

That is likely not what the intended result is. We can make some minimal changes to the system and get pretty much what we want at a much reduced cost. We start by giving up the implicit assumption that we have to award the achievement for the 10,000th kill within the same tick as the actual kill. If we give up this requirement, it means that we have a far more relaxed environment to deal with.

We can say that we’ll process the achievements at the end of the next hour, for example. That gives us enough time to get updates from the rest of the system, let the dust settle and avoid millisecond level decision making processes. In almost all businesses, there is no such thing as a race condition. The best example of that is “the check is in the mail”. The payment date for a check is not when you cashed it but when you posted it. My uncle used to go to the post office at 4:50 PM on a Friday to post checks. They would get the right time stamp, but would sit in the post office over the weekend before actually being delivered. That gave him 3 – 5 extra days before the check was cashed.

In the case of our game, giving things time to settle makes sense. It also makes sense from a marketing and community perspective. World wide announcements don’t just happen, they can be scheduled for a particular time frame to maximize impact. Delaying the announcement of such an achievement makes it a lot easier from a technical perspective but also gives us more business level benefits to work with.

In many situations, we jump to assumptions about the kind of requirements that we have to meet, but usually there is a lot more flexibility. And discovering where you can get some slack can be of tremendous value.

Possible reaction from the business in this scenario can be:

  • We don’t actually care if this is exact. We want to give the achievement when this happens but it is okay if there is a little fudge factor and the “wrong” person won if there is a tiny amount of time difference.
  • It is actually fine if two players get the achievement when it happens at the “same” time.
  • Oh, I didn’t realize that this is so hard. Can we make it a server-wide achievement, then? Would that be easier?

Any of those responses will translate to a great simplification of what we have to deliver. In the first case, we can track the state of bunny killing without the use of distributed transactions and only apply a distributed transaction when we get to the 10,000th rabbit. That is going to be a rare event, so it is fine to pay a little extra there.

For that matter, a good question to ask is how important the operation is. What happens when we have a failure in the distributed transaction as we record the 10,000th rabbit? I’m not talking about someone else getting there first, I’m talking about a network hiccup or a faulty wire that causes the operation to fail. Do we retry? Do we output an error (and if so, to where?) or just ignore this? Do we need to have a secondary mechanism for checking for errors in the process?

The answer is that it depends; you can get into an effectively infinite loop of trying to solve ever more unlikely scenarios. The question is what the impact on the system at large is, and whether it is worth the cost.

For a game achievement, the answer is probably no (but then again, I’m not a gamer). People usually draw the line at money as the place where they’ll do their utmost to avoid issues.  This is a strange scenario, because money specifically has so many ways that you can run compensating actions as part of the normal workflow that you shouldn’t bend yourself into a pretzel to avoid that.

Let’s say that we are offering three unique mounts in our game, selling them for $9.99. We obviously have to deal with race conditions here. There are only three mounts, but there are more users who want them and are willing to pay for the privilege. We are dealing with money here, so we can assume that we want to be careful about that; the question is, how careful?

We have three mounts, but more users. The payment process itself takes at least a few seconds, so how do you deal with it?

  • Throw the purchase attempts into a queue.
  • Pull the first three offers from the queue and attempt to charge them.
  • If charging failed, we mark the purchase attempt as errored and move on to the next item in the queue.
  • Once we successfully processed three purchases, we mark all the other purchase attempts as failed.

Simple, right?

What happens when a charge took longer than expected and a request timed out, but the charge actually happened? This can happen if you have SMS authentication in the process. So the card company will send a message to the card holder and they have to send an SMS back to approve the transaction. That can take long enough that the transaction will time out. But the user did authorize the transaction, funds were transferred, but you already sold the unique mount to someone else, with worse credit card security features.

What happens then? You can refund the money, provide another unique mount, etc.

The key here is that even for something that we may consider critical, the number of failure modes is too high. Trying to handle them all and ensure a consistent world is usually too expensive. It is far better to have the hooks in place to handle failures and apply compensations. They are far rarer and much easier to deal with in isolation.

When you start seeing that something happens on a frequent basis, that is when you want to figure out a way to automate that failure mode.

time to read 5 min | 925 words

Unless I get good feedback / questions on the other posts in the series, this is likely to be the last post on the topic. I was trying to show what kind of system and constraints you have to deal with if you wanted to build a social media platform without breaking the bank.

I talked about the expected numbers that we have for the system, and then set out to explain each part of it independently. Along the way, I was pretty careful not to mention any one particular technological solution. We are going to need:

  • Caching
  • Object storage (S3 compatible API)
  • Content Delivery Network
  • Key/value store
  • Queuing and worker infrastructure

Note that the whole thing is generic and there are very few constraints on the architecture. That is by design, because if your architecture can hit the lowest common denominator, you have a lot more freedom and aren’t tied to a particular provider. For that matter, you can likely set things up so you can have multiple disparate providers without too much of a hassle.

My goal with this system was to be able to accept 2,500 posts per second and to handle reads of 250,000 per second. This sounds like a lot, but most of the load is meant to be handled by the CDN and the infrastructure, not the core servers. Caching in a social network is somewhat problematic, since a lot of the work is obviously personalized. That said, there is still quite a lot that can be cached, especially the more popular posts and threads.

If we assume that only about 10% of the reading load hits our servers, that is 25,000 reads per second. If we have just 25 servers for handling this (assuming five each in five separate data centers), each server needs to accept a load of 1,000 requests per second. On the one hand, that is a lot, but on the other hand… most of the cost is supposed to be about authorization, minor logic, etc. We can also at this point add more application servers and scale linearly.

Just to give some indication of costs, a dedicated server with 8 cores & 32 GB disk will cost 100$ a month, and there is no charge for traffic. Assuming that I’m running 25 of these, that will cost me 2,500 USD a month. I can safely double or triple that amount without much trouble, I think.

Having to deal with 1,000 requests per server is something that requires paying attention to what you are doing, but it isn’t really that hard, to be frank. RavenDB can handle more than a million queries a second, for example.

One thing that I didn’t touch on, however, which is quite important, is the notion of whales. In this case, a whale is a user that has a lot of followers. Let’s take Mr. Beat as an example, he has 15 million followers and is a prolific poster. In our current implementation, we’ll need to add to the timeline of all his followers every time that he posts something. Mrs. Bold, on the other hand, has 12 million followers. At one time Mr. Beat and Mrs. Bold got into a post fight. This looks like this:

  1. Mr. Beat: I think that Mrs. Bold has a Broccoli’s bandana.
  2. Mrs. Bold: @mrBeat How dare you, you sniveling knave
  3. Mr. Beat: @boldMr2 I dare, you green teeth monster
  4. Mrs. Bold: @mrBeat You are a yellow belly deer
  5. Mr. Beat: @boldMr2 Your momma is a dear

This incredibly witty post exchange happened during a three minute span. Let’s consider what this will do, given the architecture that we outlined so far:

  • Post #1 – written to 15 million timelines.
  • Posts #2–5 – written to the timelines of everyone that follows both of them (mentions), let’s call that 10 million each.

That is 55 million timeline writes to process within the span of a few minutes. If other whales also join in (and they might), the number of writes we’ll have to process will skyrocket.

Instead, we are going to take advantage of the fact that only a small number of accounts are actually followed by many people. We’ll place the limit at 10,000 followers. At which point, we’ll no longer process writes for such accounts. Instead, we’ll place the burden at the client’s side. The code for showing the timeline will then become something like this:
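
The original snippet isn’t preserved; a sketch of the read-side merge (all the helper methods here are hypothetical) could look like this:

    public async Task<List<Post>> GetTimelineAsync(string userId, int count)
    {
        List<long> postIds = await GetPrecomputedTimelineAsync(userId, count);

        // Accounts above the follower threshold are not fanned out on write,
        // so pull their recent posts at read time and merge them in. Their
        // timelines are read by millions, so they cache extremely well.
        foreach (string whaleId in await GetFollowedHighProfileAccountsAsync(userId))
        {
            postIds.AddRange(await GetRecentPostIdsAsync(whaleId, count));
        }

        // Assuming post ids sort by creation time, newest first:
        return await GetPostsAsync(postIds.OrderByDescending(id => id).Take(count).ToList());
    }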

In other words, we record the high profile users in the system and instead of doing the work for them on write, we’ll do that on read. The benefit of doing it in this manner is that the high profile users’ timeline reads will have very high cache utilization.

Given that the number of high profile people you’ll follow is naturally limited, that can save quite a lot of work.

The code above can be improved, of course. There is usually a lot of variance in the timeline posts, so we may have a high profile user that has been quiet for a day or two; they shouldn’t show up in the current timeline and can be skipped entirely. You need to do a bit more work around the time frames as well, which means that the timeline should also allow us to query it by most recent post id, but that is also not too hard to implement.

And with that, we are at the end. I think that I covered quite a few edge cases and interesting details, and hopefully that was interesting for you to read.

As usual, I really appreciate any and all feedback.

time to read 3 min | 551 words

A social media platform has to deal with the concept of now and its history. For the most part, most users are interacting with the current state of the system. Looking at their timeline, watching current posts, etc.  At the same time, there is a wealth of information that you can get from looking at the past.

It isn’t out of the question that you’ll have users diving into the history of posts of another user going as far back as possible. That can be a parent whose kids just left the house, looking at baby pictures or it can be a new friend, trying to learn some interesting tidbits before a party (when we still had those).

It can also be automated processes, such as: “5 years ago you posted…”

The architecture that I presented in these posts is relatively agnostic to such a scenario. Given the timeline feature, going back in time means that you can fairly easily discriminate based on age. Older sections in the timeline can be moved to a lower class storage tier (basically, moved to HDD instead of NVMe, for example). They are still accessible, still available, but far cheaper to store.

I don’t believe that you can usually go with an archive tier level for the timelines, not unless you are willing to effectively be unable to access them if a user requests it, but a policy of moving old and rarely used timeline sections and posts to HDD is absolutely doable. Note that things like intelligent tiering are not a good solution for our needs. That would move items based on age and access, but while we want to move items by age, older items are still accessed, just far more rarely, so we don’t want a rare access to move them back into hot storage.

That said, certain posts are likely to generate activity for a long time, so we can’t send data to cold storage based on age alone. We also need to take recent access patterns into account. On the other hand, consider a post from a few years ago that talks about Broccoli, when people still did that. Mr. Beat discovers that Mrs. Bold has such a post and blasts it all over social media. Very quickly that old post becomes very active. That means that we should have a way to move data back to hot storage if there is enough access.
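
A sketch of such a tiering decision (the types and thresholds here are made up; the real policy would be tuned against actual access patterns):

    enum StorageTier { Hot, Cold }

    StorageTier ChooseTier(TimelineSection section) // TimelineSection is hypothetical
    {
        // A recent burst of reads keeps (or brings back) the section in hot storage.
        if (section.ReadsInLastWeek > 1_000)
            return StorageTier.Hot;

        // Old and quiet: ship it off to the HDD tier.
        return section.Age > TimeSpan.FromDays(365) ? StorageTier.Cold : StorageTier.Hot;
    }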

Ideally, we can rely on the underlying storage to do that for us, but we have to know how it actually works behind the scenes and understand what is actually going on there. The nice thing is that, unlike most of the details we discussed so far, this is something that we can punt down the road. We already have the architecture in place that will allow us to introduce this cost saving measure down the line; we don’t have to have it figured out from day one. Given the fact that we have multi level caches, we can probably just age out old information to cold storage and not usually have to think about it too much.

When we have enough data that this is a serious concern, on the other hand… we will have the time and resources to also handle it.

time to read 3 min | 555 words

Quite a few of the features that we consider native to social media actually came about as a result of users’ behavior, not pre-planned actions. For example, the #tagging and @mentions were both created by users and then adopted as an official action by the social media giants.

I already touched on how mentions are handled: as part of writing the document itself, we insert the post id into the timeline of the mentioned user. For tagging, the process is very similar. Each tag has a timeline, and we can insert posts into the tag’s timeline. From there on, the process is basically identical to what we already described.

I want to stop for a second and emphasize the coolness factor of a significant feature being handled (in the backend) via simply:
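
The snippet isn’t preserved here, but based on the description above it is essentially one loop (ExtractTags and AppendToTimelineAsync are hypothetical helpers over the timeline infrastructure from earlier in the series):

    foreach (string tag in ExtractTags(post.Text)) // e.g. "#broccoli"
        await AppendToTimelineAsync("tag/" + tag, post.Id);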

There is probably UI work to do here, but that is roughly all you’ll need to manage tags. Presumably you’ll want some better policies, but that is the core behavior you’ll need to support this sort of feature.

What about searching? Full text search indexes are nothing new and you can get an off-the-shelf solution to manage your searches easily enough. That is likely to be one of the annoying pieces of the system. Luckily, we can usually handle things by offering two tiers of searching. We have the first tier, which covers posts in the recent past (a month or two) and must have very high speed queries, and then we have full data search, for which we have a much longer SLA. By far most queries are going to be hitting the recent data set, which makes the task itself easier. The actual choice of indexing solution and its usage is fairly irrelevant at this point. You’ll need something that is distributed, but there is enough variety there that you can get away with selecting pretty much anything.

We aren’t going to need to provide sophisticated full text search features, we just want users to be able to find results by text queries.

You’ll note that throughout this series of posts, I’m not trying to find novel ways to get the best solution. I’m using practical options for the actual use case presented and in many cases, I can get away with a lot by changing the requirements just slightly. For that matter, a lot of the limitations that I accept are real limitations that you’ll find with other social media networks as well.

Finally, I just wanted to show how we can enable basic search capability using a minimal amount of code, given the infrastructure we have so far:
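
The original snippet isn’t preserved; a sketch of the idea (Tokenize, AppendToTimelineAsync and GetTimelinePostIdsAsync are hypothetical helpers) would be:

    // Indexing side: append the post id to a timeline per term.
    async Task IndexPostAsync(Post post)
    {
        foreach (string term in Tokenize(post.Text))
            await AppendToTimelineAsync("search/" + term.ToLowerInvariant(), post.Id);
    }

    // Query side: a search is just reading the term's timeline.
    Task<List<long>> SearchAsync(string term) =>
        GetTimelinePostIdsAsync("search/" + term.ToLowerInvariant());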

As you can see, we built a simple full text search here. To query it, you get the timeline for a particular term and get the list of posts that it has.

For tags and searches, as you can imagine, this can be a huge list, which is partly why timelines are built on the concept of sections that can be so easily distributed.

The solution above isn’t actually a good one for full text search. I can’t easily turn that into a search by phrase, and there are many other features that I’ll likely want to have, but it is a good example of how the infrastructure that we built for one part of the system can be utilized for a completely different purpose.

time to read 4 min | 623 words

I touched briefly on the issue of post statistics in a previous post, but it deserves its own post. There are all sorts of metrics that we want to track on a post: likes, replies, shares and so on.

Unlike most of the items that we discussed so far, these details are going to be very relevant for both reads and writes. In particular, it is very common for these numbers to be updated concurrently, especially when talking about popular posts. At the simplest level, these can be represented as a map<key, int64>. That gives us the maximum flexibility for our needs and can also be utilized in the future for additional use cases.

Given that this is effectively a distributed counter problem, there are all sorts of ways that we can handle it. At the client level, we send the increment operation to the server and manually update the value. That gets us 90% there in terms of the UX factors, but there is a lot to handle behind the scenes.

A good algorithm to use for this is the PN Counters model from the CRDT playbook. RavenDB implements these for you, for example. In essence, that means that we have the following data model:
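
The original document sample isn’t preserved; an illustrative shape, based on the description below (field names are made up):

    // illustrative only – not an actual RavenDB format
    {
        "PostId": "posts/1351081943163123854",
        "Likes":   { "DC-A": { "Value": 912, "Etag": 4021 },
                     "DC-B": { "Value": 311, "Etag": 3987 } },
        "Replies": { "DC-A": { "Value": 12,  "Etag": 4019 } }
    }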

The likes and replies object has a property for each node that increments a value. That contains the value that we have for that node as well as the etag for the change. It is easy to merge such a model between different versions, because we can always take the value with the higher etag to get the latest value. In this way, we can allow concurrent and distributed updates across the entire system and it will resolve itself in the end to the right value. Another option may be to push the commands all the way to the owning data center, where we’ll apply the operations, but that may add a high load on hot posts in the system. Better to distribute this globally and not really concern ourselves with the matter.

Looking at Twitter, there are about 200 billion tweets a year. That means that we have to be ready for quite a few of those values. Having that in a dedicated system is a good idea, since it has a far different read & write skew than other parts of our system. As part of reading posts, however, we’ll likely want to build some mechanism for pushing those counters to the post itself so we can remove that from the rest of the system. An easy way to handle that is to do so on an hourly basis. So instead of the format above, we’ll have:
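
Again, an illustrative shape only (made-up field names), with the counters bucketed by hour:

    // illustrative only
    {
        "PostId": "posts/1351081943163123854",
        "2021-07-26T14:00:00Z": {
            "Likes":   { "DC-A": { "Value": 41, "Etag": 5102 } }
        },
        "2021-07-26T15:00:00Z": {
            "Likes":   { "DC-A": { "Value": 18, "Etag": 5177 } },
            "Replies": { "DC-B": { "Value": 3,  "Etag": 5169 } }
        }
    }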

Here we have the last two hours of operations on the post. Once every hour we’ll consolidate all the updates from two hours ago and write them to the post itself. When we get to the point where we have no more updates in the post, we can safely delete the value.

The reason you want to add this complexity is that there is a big difference between all the posts in a social media platform and the active working set. That tends to be a far smaller value and can dramatically reduce the amount of data we need to keep and manage. Assuming that the working set is at 25 million posts or so across the network seems reasonable, and that amount of data can be easily handled by any server instance you care to use. Managing 200 billion per year, on the other hand, puts us in a different class of problem, and we’ll need more and more resources down the line.

time to read 5 min | 834 words

In the series so far, we talked about reading and writing posts, updating the timeline and distributing it, etc. I talked briefly about the challenges of caching data when we have to deal with updates in the background, but I haven’t really touched on that. Edits and updates are a pain to handle, because they invalidate the cache, which is one of the primary ways we can scale cheaply.

We need to consider a few different aspects of the problem. What sort of updates do we have in the context of a social media platform? We can easily disable editing of posts, of course, but you have to be able to support deletes. A user may post about Broccoli, which is verboten, and we have to be able to remove that. And of course, users will want to be able to delete their own posts, lest their salad tendencies come back to haunt them in the future. Another reason we need to handle updates is this:

We keep track of the interactions of the post and we need to update them as they change. In fact, in many cases we want to update them “live”. How are we going to handle this? I discussed the caching aspects of this earlier, but the general idea is that we have two caching layers in place.


A user getting data will first hit the CDN, which may cache the data, and then the API, which will get the data from the backend. The API endpoint will query its own local state, and other pieces of the puzzle are responsible for the data distribution.

When changes happen, we need to deal with them.

Each change means that we have to deal with a policy decision. For example, a deletion to a post means that we need to go and push an update to all the data centers to update the data. The same is relevant for updates to the post itself. In general, updating the content of the post or updating its view counts aren’t really that different. We’ll usually want to avoid editing the post content for non technical reasons, not for lack of ability to do so.

Another important aspect to take into account is latency and updates. Depending on the interaction model with the CDN, we likely have it setup to cache data based on duration, so API requests are cached for a period of a few seconds to a minute or two. That is usually good enough to reduce the load on our servers and still retain good enough level of updates.

Another advantage that we can use is the fact that when we get to high numbers, we can reduce the update rate. Consider:

We now need to update the post only once every 100,000 likes or shares or once for 10,000 replies. Depending on the rate of change, we can skip that if this happened recently enough. That is the kind of thing that can reduce the load curve significantly.
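
A sketch of that decision (the thresholds come from the numbers above; the helper shape is made up):

    bool ShouldPushUpdate(string metric, long newCount)
    {
        long step = metric == "replies" ? 10_000 : 100_000;

        // Only push a cache update when the counter crosses a threshold.
        return newCount % step == 0;
    }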

There is also the need to consider live updates. Typically, that means that we’ll have the client connected via web socket to a server and we need to be able to tell it that a post has been updated. We can do that using the same cache update mechanism. The update cache command is placed on a queue and the web socket servers process messages from there. A client will indicate what post ids it is interested in and the web socket server will notify it about such changes.

The idea is that we can completely separate the different pieces in the system. We have the posts storage and the timeline as one system and the live updates as a separate system. There is some complexity here about cache usage, but it is actually better to assume that stuff will not work than to try for cache coherency.

For example, a client may get an update that a particular post was updated. When it queries for the new post details, it gets a notice that it wasn’t modified. This is a classic race condition issue which can cause a lot of trouble for the backend people to eradicate. If we don’t try, we can simply state that on the client side, getting a not modified response after an update notice is not an error. Instead, we need to schedule (on the client) a query of the post again after the cache period has elapsed.

A core design tenet in the system is to assume failure and timing issues, to avoid having to force a unified view of the system, because that is hard. Punting the problem even just a little bit allows us a much better architecture.

time to read 12 min | 2245 words

In the series of posts so far, when discussing reads, I punted on the part where we know what to read. I mentioned that we get a whole batch of post ids from somewhere and discussed how that is going to work, but that was it. Now I want to talk about exactly how this works.

The timeline concept is a fairly simple one. We have a list of posts ids that the user goes through. As they are browsing through the list, items are added at the top, etc. This is basically the Twitter model. Another alternative is that as you scroll, if there are new items in the list, you are shown them before older values (the Facebook approach), but that is more complex.

Conceptually, the timeline is as simple as:
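
A minimal sketch of that concept (not the real implementation, just the mental model):

    public class Timeline
    {
        private readonly List<long> _postIds = new List<long>();

        public int Count => _postIds.Count;

        public void Append(long postId) => _postIds.Add(postId);

        // Read backward from a position the client remembered (e.g. 10,000).
        public IEnumerable<long> ReadBackwardFrom(int position, int count) =>
            _postIds.Take(position).Reverse().Take(count);
    }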

In other words, we just have a list of post ids, we add items at the end and as we scroll we keep track of where we started. That is sufficient to get us pretty much all the features that we want, surprisingly enough.

When you go to the home page and look at your timeline, you’ll typically start with whatever the latest value is there; we’ll record the last position we saw and then start scrolling backward in time. In other words, if we have 10,000 items in the timeline, we’ll record that the position we started at was 10,000 and then start going back toward zero. If there are new items, the size of the list will increase and we can jump back to the top, etc.

That is simple enough, but how does this actually help us? That may be good if I wanted to see the public timeline of the entire network, but what about the actual features? I don’t care that a restaurant in Prague is now offering discounted deliveries, for example. I care about the accounts that I follow.

The idea is that we don’t have just a single such timeline, but many. In fact, pretty much all operations in the social platform can be represented using the timeline abstraction.

Let’s consider typical usage of an account. I’m adding posts, but I also want to be able to see people talking to me or about tags that I follow. How is that going to work?

Well, I’m actually going to have two timelines:

  • Public Timeline – where we’ll add all the posts from the user, and maybe posts that mention / reply, etc.
  • Private Timeline – where we’ll add posts from users that you follow, mentions, replies to discussions the user took part in, etc.

In both cases, the behavior of the system is identical. We simply go through the list.  If you’ll recall, I left a lot unsaid when I discussed writing posts. In particular, how do I publish them to interested parties. This is where we start to apply policies. Part of the process of adding a post is to figure out what timelines it should go to.

By moving most of the cost to the write side, we drastically reduce the overall complexity. Furthermore, it also makes a lot more sense, given that most posts aren’t going to have a wide blast radius.

When posting a message, we need to consider the following:

  • Is this a high impact account? (Let’s say, > 50,000 followers) If so, we’ll have special behaviors.
  • Who is following this user?
  • Is this a reply to another post?
  • Are there mentions on this post? If so, need to apply the logic based on the mention policies.

As you can imagine, this is quite involved, but in general, the way it will work is something like this:
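
The actual snippet isn’t preserved; a sketch of the routing (all helpers hypothetical, the 50,000 threshold taken from the list above):

    public async Task PublishAsync(Post post)
    {
        var author = await GetUserAsync(post.AuthorId);

        // Always goes to the author's own public timeline.
        await AppendToTimelineAsync("public/" + post.AuthorId, post.Id);

        // A reply is appended to the post's own timeline.
        if (post.ReplyTo != null)
            await AppendToTimelineAsync("replies/" + post.ReplyTo, post.Id);

        // Mentions get routed according to the mentioned user's policies.
        foreach (var mentioned in post.Mentions)
            await ApplyMentionPolicyAsync(mentioned, post);

        // High impact accounts are not fanned out; followers merge them on read.
        if (author.FollowersCount > 50_000)
            return;

        foreach (var followerId in await GetFollowersAsync(post.AuthorId))
            await AppendToTimelineAsync("private/" + followerId, post.Id);
    }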

The key here is that the whole manner in which this works is done via selecting what we’ll publish to. Furthermore, you can see that a user has multiple timelines, and in the case of a mention, we can apply additional policies to see how the post gets routed. This is complex and often changing, but it also happens a lot less often than reads. So it is a net benefit to move all the costs to the write side.

Another thing to notice in this case is how we handle a reply. A reply is just appended to the timeline of the post. In other words, it is timelines all the way down. We want to have a single simple abstraction to handle as much of the system as we can. In this case, we need to handle replies on a post, and that can be anything from very few to hundreds of thousands. By generating a timeline for the post as well, we can reuse all the same behaviors and it just works.

As for the timeline itself? It is merely a queue of post ids, and it allows you to set a position in it in an efficient manner. The list of post ids above is how it works conceptually, but we have to think about the numbers here. How big can a timeline get?

  • The personal timeline of a user is limited to how many posts they can make. In general, even very heavy users will not top a few hundred a day and low thousands a month. That means that we have a reasonable upper bound on how big the personal timeline can grow. Ten years of posting 5,000 posts a month will get you over half a million, but I would assume that is the top rate for anything that isn’t an automated system.
  • Your Public Timeline is impacted by how many people you follow and how prolific they are. There is a natural limit to how many people an account can follow, so there is a bound here, but assuming that you follow 1,000 accounts that each post 1,000 posts a month, that adds up to a million posts a month. Over a ten year span, that would be 120 million posts. That said, we’ll discuss other properties of the public timeline below.
  • Post’s timeline is all the replies that were made to the post. Most posts have very few replies, but some will garner a lot. It took me a minute to find a Tweet on Twitter that had close to 400,000 replies, for example.

So a timeline may be big, potentially very big. However, there is an interesting issue here, how much do we actually need to keep?

The purpose of the Public Timeline, for example, is to show you the front page of the site, so how far back do we need to keep data? Is there a reason to keep your timeline from three years ago? The answer is probably no. We can keep the public timeline at a certain size and likely benefit from a lot of space savings. On the other hand, the replies for a post can be quite interesting, and while they can grow very big, it probably makes sense to never trim them.

So we have the concept of a timeline, but what is it actually going to be?

In terms of REST API, we are going to have the following endpoints:

  • GET /timelines?id=1351081943163123854
    GET /timelines/sections/F4BE2048BF51F3DCC69EA4CA4ED08F12A36BD6524C9F12018BA0CE6F7C076BB2

In other words, we can access the timeline or a section in that timeline. I would rather show the output and then discuss what it all means. The first endpoint gives us the timeline itself, and looks like this:
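
The actual response isn’t preserved here; an illustrative shape based on the description that follows:

    // illustrative only – not the real API contract
    {
        "Id": "1351081943163123854",
        "RecentPosts": [ 5373779394, 5373779389, 5373779381 ],
        "Sections": [
            "F4BE2048BF51F3DCC69EA4CA4ED08F12A36BD6524C9F12018BA0CE6F7C076BB2"
        ]
    }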

There are a few things to note here. When we ask for a timeline, we get the most recent posts in the timeline, as well as the past few sections. The way a timeline works, you can always append post ids to it, which works great, except that at some point the sheer size involved starts to be problematic.

If we consider a big timeline, one with 400,000 posts in it, that comes to about 3.2 MB used, just to store the post ids (8 bytes each). In practice, due to concurrency and distribution concerns, we can’t have an actual list of post ids, so we need some better management. Another factor is that you very rarely need or want to get the entire timeline, you want to start from the top and work your way down.

We can handle that easily enough using a two stage approach. First, all the new post ids appended to a timeline are written in a “loose” form, each one with its own entry. Once we hit a certain limit (128, for example), we know that this is likely to grow bigger. We can grab the loose post ids in the timeline and gather them into a section. A section is an independently addressed part of the timeline. The idea is that we gather all of the post ids currently loose in the timeline, write them into a single object and compress that. Then we use the hash of the resulting object as the key in an object store.
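
A sketch of that compaction step (the helpers and the object store API are hypothetical; SHA256.HashData and Convert.ToHexString assume .NET 5+):

    async Task CompactLooseIdsAsync(Timeline timeline)
    {
        long[] loose = timeline.TakeLooseIds();            // the ~128 ids accumulated so far

        byte[] compressed = Compress(Serialize(loose));     // e.g. Brotli or GZip
        string hash = Convert.ToHexString(SHA256.HashData(compressed));

        await _objectStore.PutAsync(hash, compressed);      // content addressed storage
        timeline.AddSectionReference(hash);
    }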

Side note: timeline sections are immutable. Once a section is created, it cannot change. You can add additional filtering on the timeline on read, on the other hand. The timeline also should handle the case where posts in it have been deleted, since we cannot modify it. For ease of implementation, we’ll also allow duplication in the post ids. Clients are expected to handle and ignore duplicate post ids that happen within a certain time range.

The reason that the timeline section is compressed is to reduce the size, obviously. In my testing, I was able to get a 65% reduction in size without taking any special efforts. Throwing the compressed data into object storage (S3 and the like) also means that it is much easier to scale reads on them. If we have a user who is very popular, we can move that timeline into a compressed section faster to reduce load. This design explicitly acknowledges the problems with distributed systems and concurrency. It is possible that a compressed section will have an id that also appears in the loose portion of the timeline. The responsibility to handle such a scenario is on the client code, which is able to do so far more easily than the server side portion.

After compressing the loose posts in the timeline, we record the new section hash and allow clients to access it. It might be easier to see how that would work in the following image:

Given a post id size of 8 bytes, and assuming that we can compress it by 65% (my naïve tests using Brotli & GZip says yes, can probably do better than that) we can state that every thousand post ids or so we can generate a new section (meaning that it would be about 2KB in size, in the end). Even a very big timeline with hundreds of thousands of entries would end up with just a few hundreds of sections at the top.

The entire mechanism is very limited, quite intentionally. The external operations we allow on a timeline are append and get, with the client expected to understand the manner in which they are going to go deeper into the timeline. The limitation and expectation from the client (like allowing duplicate post ids, handling post ids that point to deleted posts, etc) are all there to make it easy to handle scaling out the system.

Consider a typical use case, I go into a popular account and look at their posts. Effectively, I’m browsing their public timeline. My interactions with the server goes like this:

  • Get a list of the post ids in the timeline. The first step is: GET /timelines?id=1351081943163123854
    • This gets me the list of loose post ids and the recent segments.
    • Notice that this API call is open for caching as well, so we can get the scaling benefit of that as well.
  • Get the actual posts, which I can do with the batch post read API that we discussed earlier.

In many cases, the cost of getting the timeline for the first time will be amortized over the reading time of many posts. The bulk read API gets me 128 posts at a time, so as the user is reading, I can get the next batch ready and give them the next part immediately.

Once I’m done with the loose posts, I can go into the compressed sections and do the same there. If each section has about 1,000 post ids, that will be sufficient for quite some time. And because I’m driving this from the client, it is very easy for me to scale. Throwing the data into an object store like S3 means that I can get both CDN support easily and my scaling issue is now: “serve a lot of small files”, which is a very well understood problem.

I still have to take into account permissions, but that is already something that we handle in the batch read API. Notice that for the common case of public posts, pretty much the whole thing has caching and distribution baked and the amount of work that we can let the rest of the system handle is very high.

Hot spots in the system are going to be handled by the infrastructure, not by our own code, big machines or clever algorithms. They are going to be handled by the architecture of the system making it simple to manage them, giving plenty of room for caching and CDN to take charge and reduce our costs.

That is, after all, the whole point of this series of posts.
