Oren Eini, CEO of RavenDB, a NoSQL Open Source Document Database

time to read 7 min | 1376 words

As I mentioned, Tail/Feather is a weekend project to test out how stuff works for real. After creating the highly available distributed key/value store, we now need to actually build a client API for it.

Externally, that API is going to look like this:

public class TailFeatherClient : IDisposable
{
    public TailFeatherClient(params Uri[] nodes);

    public Task Set(string key, JToken value);

    public Task<JToken> Get(string key);

    public Task Remove(string key);

    public void Dispose();
}

If this wasn’t a weekend project, I would add batch support, but that isn’t important for our purposes right now. The API itself is pretty stupid, which is great, but what about the actual behavior?

We want it to be able to handle dynamic cluster changes, and we need it to be smart about it. A lot of that is shared among all operations, so the next layer of the API is:

public Task Set(string key, JToken value)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/set?key={0}&val={1}",
        Uri.EscapeDataString(key), Uri.EscapeDataString(value.ToString(Formatting.None)))));
}

public async Task<JToken> Get(string key)
{
    var reply = await ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/read?key={0}",
        Uri.EscapeDataString(key))));
    var result = JObject.Load(new JsonTextReader(new StreamReader(await reply.Content.ReadAsStreamAsync())));

    if (result.Value<bool>("Missing"))
        return null;

    return result["Value"];
}

public Task Remove(string key)
{
    return ContactServer(client => client.GetAsync(string.Format("tailfeather/key-val/del?key={0}",
        Uri.EscapeDataString(key))));
}

The actual behavior is in ContactServer:

private readonly ConcurrentDictionary<Uri, HttpClient> _cache = new ConcurrentDictionary<Uri, HttpClient>();
private Task<TailFeatherTopology> _topologyTask;

public TailFeatherClient(params Uri[] nodes)
{
    _topologyTask = FindLatestTopology(nodes);
}

private HttpClient GetHttpClient(Uri node)
{
    return _cache.GetOrAdd(node, uri => new HttpClient { BaseAddress = uri });
}

private async Task<TailFeatherTopology> FindLatestTopology(IEnumerable<Uri> nodes)
{
    var tasks = nodes.Select(node => GetHttpClient(node).GetAsync("tailfeather/admin/flock")).ToArray();

    await Task.WhenAny(tasks);
    var topologies = new List<TailFeatherTopology>();
    foreach (var task in tasks)
    {
        var message = task.Result;
        if (message.IsSuccessStatusCode == false)
            continue;

        topologies.Add(new JsonSerializer().Deserialize<TailFeatherTopology>(
            new JsonTextReader(new StreamReader(await message.Content.ReadAsStreamAsync()))));
    }

    return topologies.OrderByDescending(x => x.CommitIndex).FirstOrDefault();
}

private async Task<HttpResponseMessage> ContactServer(Func<HttpClient, Task<HttpResponseMessage>> operation, int retries = 3)
{
    if (retries < 0)
        throw new InvalidOperationException("Cluster is not reachable, or no leader was selected. Out of retries, aborting.");

    var topology = (await _topologyTask ?? new TailFeatherTopology());

    var leader = topology.AllVotingNodes.FirstOrDefault(x => x.Name == topology.CurrentLeader);
    if (leader == null)
    {
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // now we have a leader, we need to try calling it...
    var httpResponseMessage = await operation(GetHttpClient(leader.Uri));
    if (httpResponseMessage.IsSuccessStatusCode == false)
    {
        // we were sent to a different server, let us try that...
        if (httpResponseMessage.StatusCode == HttpStatusCode.Redirect)
        {
            var redirectUri = httpResponseMessage.Headers.Location;
            httpResponseMessage = await operation(GetHttpClient(redirectUri));
            if (httpResponseMessage.IsSuccessStatusCode)
            {
                // we successfully contacted the redirected server, this is probably the leader, let us ask it for the topology,
                // it will be there for the next time we access it
                _topologyTask = FindLatestTopology(new[] { redirectUri }.Union(topology.AllVotingNodes.Select(x => x.Uri)));

                return httpResponseMessage;
            }
        }

        // we couldn't get to the server, and we didn't get redirected, we'll check in the cluster in general
        _topologyTask = FindLatestTopology(topology.AllVotingNodes.Select(x => x.Uri));
        return await ContactServer(operation, retries - 1);
    }

    // happy path, we are done
    return httpResponseMessage;
}

There is quite a bit going on here, but the basic idea is simple. Starting from the initial list of nodes we have, we contact all of them and find the topology with the highest commit index. That means it is the freshest, and so most likely to be the current one. From the topology, we take the leader and send all queries to the leader.

If there is any sort of error, we’ll contact all the other servers to find out who we are supposed to be using now. If we can’t find the leader after three tries, we give up and let the caller sort it out, probably by retrying once the cluster is in a steady state again.
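To make the intended use concrete, here is a minimal usage sketch of the client. The node URIs and the sample key are taken from the cluster setup example later on this page; this is an illustration, not code from the actual repository:

// minimal usage sketch, assuming three local Tail/Feather nodes on ports 9077-9079
public static async Task Example()
{
    using (var client = new TailFeatherClient(
        new Uri("http://localhost:9079"),
        new Uri("http://localhost:9078"),
        new Uri("http://localhost:9077")))
    {
        // writes go to the current leader; the client follows redirects if we hit a follower
        await client.Set("ravendb", JToken.Parse("{ \"Url\": \"http://live-test.ravendb.net\", \"Database\": \"Sample\" }"));

        var value = await client.Get("ravendb");   // returns null if the key is missing

        await client.Remove("ravendb");
    }
}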

Now, this is really nice, but it falls under the heading of weekend code. That means it is quite far from what I would call production code. What is missing?

  • Caching the topology locally in a persistent manner, so that when the known servers are down we can restart from the last known good topology (see the sketch after this list).
  • Proper error handling, and in particular error reporting, to make sure that we can understand what is actually going on.
  • Features such as allowing reads from non-leaders, testing, etc.
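A minimal sketch of that first item, assuming we simply serialize the last known good TailFeatherTopology to a local file. The file name and the wiring into FindLatestTopology are assumptions for illustration, not the actual Tail/Feather code:

// hypothetical sketch: persist the last known good topology next to the client
private const string TopologyCacheFile = "tailfeather.topology.json";

private void CacheTopology(TailFeatherTopology topology)
{
    if (topology == null)
        return;
    File.WriteAllText(TopologyCacheFile, JObject.FromObject(topology).ToString());
}

private TailFeatherTopology TryLoadCachedTopology()
{
    if (File.Exists(TopologyCacheFile) == false)
        return null;
    return new JsonSerializer().Deserialize<TailFeatherTopology>(
        new JsonTextReader(new StringReader(File.ReadAllText(TopologyCacheFile))));
}

On startup we would try the cached topology’s nodes first, and fall back to the user-supplied node list only if none of them answer.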

But overall, I’m quite happy with this.

time to read 20 min | 3999 words

Weekend project means just that: I’m trying some things out, and writing something real is the best way to exercise. This isn’t going to be a full-blown project, but it should be functional and usable.

The basic idea: I’m going to build a distributed key/value configuration store. Similar to etcd, this will allow me to explore how to handle full-blown Rachis usage from both the server and client sides.

We want this to be a full-blown implementation, which means persistence, snapshots, network API, the works.

In terms of the data model, we’ll go for the simplest possible one: a key/value store. A key is a string of up to 128 characters. A value is a JSON-formatted value of up to 16KB. Persistence will be handled by Voron, which does most of the heavy lifting for us, so what we are left with is the following:

public enum KeyValueOperationTypes
{
    Add,
    Del
}

public class KeyValueOperation
{
    public KeyValueOperationTypes Type;
    public string Key;
    public JToken Value;
}

public class OperationBatchCommand : Command
{
    public KeyValueOperation[] Batch { get; set; }
}

This gives us the background for the actual state machine:

public class KeyValueStateMachine : IRaftStateMachine
{
    readonly StorageEnvironment _storageEnvironment;

    public KeyValueStateMachine(StorageEnvironmentOptions options)
    {
        _storageEnvironment = new StorageEnvironment(options);
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            _storageEnvironment.CreateTree(tx, "items");
            var metadata = _storageEnvironment.CreateTree(tx, "$metadata");
            var readResult = metadata.Read("last-index");
            if (readResult != null)
                LastAppliedIndex = readResult.Reader.ReadLittleEndianInt64();
            tx.Commit();
        }
    }

    public event EventHandler<KeyValueOperation> OperatonExecuted;

    protected void OnOperatonExecuted(KeyValueOperation e)
    {
        var handler = OperatonExecuted;
        if (handler != null) handler(this, e);
    }

    public JToken Read(string key)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
        {
            var items = tx.ReadTree("items");

            var readResult = items.Read(key);

            if (readResult == null)
                return null;

            return JToken.ReadFrom(new JsonTextReader(new StreamReader(readResult.Reader.AsStream())));
        }
    }

    public long LastAppliedIndex { get; private set; }

    public void Apply(LogEntry entry, Command cmd)
    {
        var batch = (OperationBatchCommand)cmd;
        Apply(batch.Batch, cmd.AssignedIndex);
    }

    private void Apply(IEnumerable<KeyValueOperation> ops, long commandIndex)
    {
        using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
        {
            var items = tx.ReadTree("items");
            var metadata = tx.ReadTree("$metadata");
            metadata.Add("last-index", EndianBitConverter.Little.GetBytes(commandIndex));
            var ms = new MemoryStream();
            foreach (var op in ops)
            {
                switch (op.Type)
                {
                    case KeyValueOperationTypes.Add:
                        ms.SetLength(0);

                        var streamWriter = new StreamWriter(ms);
                        op.Value.WriteTo(new JsonTextWriter(streamWriter));
                        streamWriter.Flush();

                        ms.Position = 0;
                        items.Add(op.Key, ms);
                        break;
                    case KeyValueOperationTypes.Del:
                        items.Delete(op.Key);
                        break;
                    default:
                        throw new ArgumentOutOfRangeException();
                }
                OnOperatonExecuted(op);
            }

            tx.Commit();
        }
    }

    public void Dispose()
    {
        if (_storageEnvironment != null)
            _storageEnvironment.Dispose();
    }
}

As you can see, there isn’t much here. Not surprising, since we are storing a key/value data structure. I’m also ignoring snapshots for now. That is good enough for now, let us go for the network portion of the work. We are going to be using Web API for the network stuff. And we’ll be initializing it like so:

var nodeName = options.NodeName ?? (Environment.MachineName + ":" + options.Port);

var kvso = StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "KeyValue"));
using (var statemachine = new KeyValueStateMachine(kvso))
{
    using (var raftEngine = new RaftEngine(new RaftEngineOptions(
        new NodeConnectionInfo
        {
            Name = nodeName,
            Url = new Uri("http://" + Environment.MachineName + ":" + options.Port),
        },
        StorageEnvironmentOptions.ForPath(Path.Combine(options.DataPath, "Raft")),
        new HttpTransport(nodeName),
        statemachine
        )))
    {
        using (WebApp.Start(new StartOptions
        {
            Urls = { "http://+:" + options.Port + "/" }
        }, builder =>
        {
            var httpConfiguration = new HttpConfiguration();
            RaftWebApiConfig.Register(httpConfiguration);
            httpConfiguration.Properties[typeof(HttpTransportBus)] = new HttpTransportBus(nodeName);
            httpConfiguration.Properties[typeof(RaftEngine)] = raftEngine;
            builder.UseWebApi(httpConfiguration);
        }))
        {
            Console.WriteLine("Ready & processing requests, press ENTER to stop");

            Console.ReadLine();
        }
    }
}

Note that we need to initialize both the state machine and the Raft engine, then wire up the Raft engine controllers. Now we are pretty much done with setup, and we can turn to the actual semantics of running the cluster. The first thing that I want to do is to set up the baseline, so we create this base controller:

public abstract class TailFeatherController : ApiController
{
    public KeyValueStateMachine StateMachine { get; private set; }
    public RaftEngine RaftEngine { get; private set; }

    public override async Task<HttpResponseMessage> ExecuteAsync(HttpControllerContext controllerContext, CancellationToken cancellationToken)
    {
        RaftEngine = (RaftEngine)controllerContext.Configuration.Properties[typeof(RaftEngine)];
        StateMachine = (KeyValueStateMachine)RaftEngine.StateMachine;
        try
        {
            return await base.ExecuteAsync(controllerContext, cancellationToken);
        }
        catch (NotLeadingException)
        {
            var currentLeader = RaftEngine.CurrentLeader;
            if (currentLeader == null)
            {
                return new HttpResponseMessage(HttpStatusCode.PreconditionFailed)
                {
                    Content = new StringContent("{ 'Error': 'No current leader, try again later' }")
                };
            }
            var leaderNode = RaftEngine.CurrentTopology.GetNodeByName(currentLeader);
            if (leaderNode == null)
            {
                return new HttpResponseMessage(HttpStatusCode.InternalServerError)
                {
                    Content = new StringContent("{ 'Error': 'Current leader " + currentLeader + " is not found in the topology. This should not happen.' }")
                };
            }
            return new HttpResponseMessage(HttpStatusCode.Redirect)
            {
                Headers =
                {
                    Location = leaderNode.Uri
                }
            };
        }
    }
}

That is a lot of error handling, but basically it just gets the right values from the configuration and exposes them to the controller actions, plus handling for the case where a command that requires a leader hits a follower.

Next step, actually managing the cluster, here we go:

public class AdminController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/admin/fly-with-us")]
    public async Task<HttpResponseMessage> Join([FromUri] string url, [FromUri] string name)
    {
        var uri = new Uri(url);
        name = name ?? uri.Host + (uri.IsDefaultPort ? "" : ":" + uri.Port);

        await RaftEngine.AddToClusterAsync(new NodeConnectionInfo
        {
            Name = name,
            Uri = uri
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }

    [HttpGet]
    [Route("tailfeather/admin/fly-away")]
    public async Task<HttpResponseMessage> Leave([FromUri] string name)
    {
        await RaftEngine.RemoveFromClusterAsync(new NodeConnectionInfo
        {
            Name = name
        });
        return new HttpResponseMessage(HttpStatusCode.Accepted);
    }
}

So now we have a way to add and remove nodes from the cluster, which is all the admin stuff that we need to handle right now. Next, we need to actually wire up the operations, which is done here:

public class KeyValueController : TailFeatherController
{
    [HttpGet]
    [Route("tailfeather/key-val/read")]
    public HttpResponseMessage Read([FromUri] string key)
    {
        var read = StateMachine.Read(key);
        if (read == null)
        {
            return Request.CreateResponse(HttpStatusCode.NotFound, new
            {
                RaftEngine.State,
                Key = key,
                Missing = true
            });
        }
        return Request.CreateResponse(HttpStatusCode.OK, new
        {
            RaftEngine.State,
            Key = key,
            Value = read
        });
    }

    [HttpGet]
    [Route("tailfeather/key-val/set")]
    public Task<HttpResponseMessage> Set([FromUri] string key, [FromUri] string val)
    {
        JToken jVal;
        try
        {
            jVal = JToken.Parse(val);
        }
        catch (JsonReaderException)
        {
            jVal = val;
        }

        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Add,
            Value = jVal
        };

        return Batch(new[] { op });
    }

    [HttpGet]
    [Route("tailfeather/key-val/del")]
    public Task<HttpResponseMessage> Del([FromUri] string key)
    {
        var op = new KeyValueOperation
        {
            Key = key,
            Type = KeyValueOperationTypes.Del,
        };

        return Batch(new[] { op });
    }

    [HttpPost]
    [Route("tailfeather/key-val/batch")]
    public async Task<HttpResponseMessage> Batch()
    {
        var stream = await Request.Content.ReadAsStreamAsync();
        var operations = new JsonSerializer().Deserialize<KeyValueOperation[]>(new JsonTextReader(new StreamReader(stream)));

        return await Batch(operations);
    }

    private async Task<HttpResponseMessage> Batch(KeyValueOperation[] operations)
    {
        var taskCompletionSource = new TaskCompletionSource<object>();
        RaftEngine.AppendCommand(new OperationBatchCommand
        {
            Batch = operations,
            Completion = taskCompletionSource
        });
        await taskCompletionSource.Task;

        return Request.CreateResponse(HttpStatusCode.Accepted);
    }
}

And we are pretty much set.

Note that I’ve been writing this post while writing the code, so I’ve made some small changes; you can see the actual code here.

Anyway, we are pretty much done. Now we can compile and try testing what is going on.

First, we seed the cluster by running:

.\TailFeather.exe --port=9079 --DataPath=One --Name=One --Bootstrap

This tells the node that it is allowed to become a leader without having to pre-configure a cluster. This command runs and exits, so now we’ll run three such copies:

  • start .\TailFeather.exe "--port=9079 --DataPath=One --Name=One"
  • start .\TailFeather.exe "--port=9078 --DataPath=Two --Name=Two"
  • start .\TailFeather.exe "--port=9077 --DataPath=Three --Name=Three"

We have all three nodes up and running, so now is the time to actually make use of it:

http://localhost:9079/tailfeather/key-val/set?key=ravendb&val={ 'Url': 'http://live-test.ravendb.net', 'Database': 'Sample' }

In this case, you can see that we are setting a configuration value to point to a RavenDB database on the first node. Note that at this point, we have a single node cluster, and the other two are waiting to join it, but are taking no action.

We can get the value back using:

[screenshot: reading the value back from the first node]
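Presumably that read goes through the read endpoint defined above, something along these lines (the URL is inferred from the route and the key we just set):

http://localhost:9079/tailfeather/key-val/read?key=ravendb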

So far, so good. Now, let us add a second node in by inviting it to fly with our cluster. We do that using the following command:

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9078&name=Two

Which will give us:

[screenshot: the cluster topology after the second node joins]

Note that we are using the just-added node to look at this.

Next, we can add the third node.

http://localhost:9079/tailfeather/admin/fly-with-us?url=http://localhost:9077&name=Three

I would put the image in, but I think you get the point.

This is it for now. We have a highly available, persistent & distributed key/value store. Next, we need to tackle the idea of snapshots and the client API, but I’ll deal with that in another post.

time to read 6 min | 1030 words

Rachis, def: Spinal column, also the distal part of the shaft of a feather that bears the web.

Rachis is the name we picked for RavenDB’s Raft implementation. Raft is a consensus protocol that can actually be understood without resorting to Greek philosophy. You can read all about it in here (there is a very cool interactive visualization there). I would also like to thank Diego Ongaro for both the Raft paper and a lot of help while I tried to understand the finer points of it.

Why Raft?

Raft is a distributed consensus protocol. It allows you to agree on an ordered set of operations across your entire cluster. This means that you can apply a set of operations on a state machine, and have the same final state machine in all nodes in the cluster. It is also drastically simpler to understand than Paxos, which is the better known alternative.

What is Rachis?

Well, it is a Raft implementation. To be rather more exact, it is a Raft implementation with the following features:

  • (Obviously) the ability to manage a distributed set of state machines and reliably commit updates to said state machines.
  • Dynamic topology (nodes can join and leave the cluster on the fly, including state sync).
  • Large state machines (snapshots, efficient transfers, non voting members).
  • ACID local log using Voron.
  • Support for in memory and persistent state machines.
  • Support for voting & non voting members.
  • A lot of small tweaks for best behavior in strange situations (forced step down, leader timeout and many more).

What are you going to use this for?

To reach a consensus, of course. More to the point, we have some really nice ideas about what this is going to allow us to do. In particular, we want to use it as the backbone for the events and time series replication models.

But I’m getting ahead of myself. Before we do that, I want to build a simple reliable distributed service. We’ll call it Tail/Feather and it will be awesome, in a weekend project kind of way. I’ll post full details about this in my next post.

Where can I look at it?

The current version is here, note that you’ll need to pull Voron as well (from the ravendb repository) to get it working.

How does this work?

You can read the Raft paper and the full thesis, of course, but there are some subtleties that I had to work through (with great help from Diego), so it is worth going into a bit more detail.

Clusters are typically composed of an odd number of servers (3, 5, or 7), which can communicate freely with one another. The startup process for a cluster requires us to designate a single server as the seed. This is the only server that can become the cluster leader during the cluster bootstrap process. This is done to make sure that during startup, before we have had the chance to tell the servers about the cluster topology, they won’t each consider themselves a single-node cluster and start accepting requests before we add them to the cluster.

Only the seed server will become the leader, and all the others will wait for instructions. We can then let the seed server know about the other nodes in the cluster. It will initiate a join operation, which will reach out to the other node and set up the appropriate cluster topology. At that point, all the servers are on an equal footing, and there is no longer any meaningful distinction between them. The notion of a seed node is only relevant for cluster bootstrap; once that is done, all servers have the same configuration, and there isn’t any difference between them.

Dynamically adding and removing nodes from the cluster

Removing a node from the cluster is a simple process. All we need to do is update the cluster topology, and we are done. The removed server will get a notification to let it know that it has been disconnected from the cluster, and will move itself to a passive state (note that it is okay if it doesn’t get this notification, we are just being nice about it).

The process of adding a server is a bit more complex. Not only do we have to add a new node, we need to make sure that it has the same state as all the other nodes in the cluster. In order to do that, we handle it in multiple stages. A node added to the cluster can be in one of three states: Voting (a full member of the cluster, able to become a leader), Non Voting (just listening to what is going on, can’t be a leader), or Promotable (getting up to speed with the cluster state). Non voting members are a unique case; they are there to enable some advanced scenarios (cross data center communication, as we currently envision it).

Promotable is a lot more interesting. Adding a node to an existing cluster can be a long process, especially if we are managing a lot of data. In order to handle that, we add the server to the promotable category, at which point we start sending it the state it needs to catch up with the rest of the cluster. Once it has caught up with the cluster (it has all the committed entries in the cluster), we will automatically move it to the voting members of the cluster.
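A hypothetical sketch of that promotion check, just to make the flow concrete. The type and member names here are illustrative, not Rachis’ actual API:

public enum MembershipState
{
    Voting,      // full member, can become leader
    NonVoting,   // receives entries, never votes or leads
    Promotable   // catching up before being promoted to Voting
}

public class ClusterNode
{
    public string Name;
    public MembershipState State;
    public long LastAppliedIndex;
}

// Hypothetical: run by the leader after appending entries to a promotable node.
// Once the node has every committed entry, it is safe to count its vote.
private void TryPromote(ClusterNode node, long clusterCommitIndex)
{
    if (node.State != MembershipState.Promotable)
        return;

    if (node.LastAppliedIndex >= clusterCommitIndex)
        node.State = MembershipState.Voting; // the topology change itself also goes through the log
}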

Note that it is fine for crashes to happen throughout this process. The cluster leader can crash during this, and we’ll recover and handle this properly.

Normal operations

During normal operations, there is going to be a leader that accepts all the requests for the cluster and handles committing them cluster-wide. During those operations, you can spread reads across the members of the cluster for better performance.

Now, if you don’t mind, I’m going to go write Tail/Feather, and see how long it takes.

time to read 6 min | 1115 words

It may not get enough attention, but we have been working on the profilers as well during the past few months.

TLDR; You can get the next generation of NHibernate Profiler and Entity Framework Profiler now, lots of goodies to look at!

I’m sure that a lot of people would be thrilled to hear that we dropped Silverlight in favor of going back to a WPF UI. The idea was that we would be able to deploy anywhere, including in production. But Silverlight just made things harder all around, and customers didn’t like the production profiling mode.

Production Profiling

We have changed how we profile in production. You can now make the following call in your code:

NHibernateProfiler.InitializeForProduction(port, password);

And then connect to your production system:

[screenshot: the profiler connecting to a production system]

At which point you can profile what is going on in your production system safely and easily. The traffic between your production server and the profiler is SSL encrypted.

NHibernate 4.x and Entity Framework vNext support

The profilers now support the latest versions of NHibernate and Entity Framework. That includes profiling async operations, better suitability for modern apps, and more.

New SQL Paging Syntax

We now properly support the SQL Server paging syntax:

select * from Users
order by Name
offset 0 /* @p0 */ rows fetch next 250 /* @p1 */ rows only

This is great for NHibernate users, who finally can have a sane paging syntax as well as beautiful queries in the profiler.

At a glance view

A lot of the time, you don’t want the profiler to be front and center, you want to just run it and have it there to glance at once in a while. The new compact view gives you just that:

[screenshot: the compact at-a-glance view]

You can park it somewhere on your screen and work normally, glancing over to see if it found anything. This is much less distracting than the full profiler for normal operations.

Scopes and groups

When we started working on the profilers, we followed the “one session per request” rule, and that was pretty good. But a lot of people, especially in the Entity Framework group, are using multiple sessions or data contexts in a single request, and they still want to be able to see the operations in a request at a glance. We now allow you to group things, like this:

[screenshot: operations grouped by request]

By default, we use the current request to group things, but we also give you the ability to define your own scopes. So if you are profiling an NServiceBus application, you can scope things to your message handling by setting ProfilerIntegration.GetCurrentScopeName or explicitly calling ProfilerIntegration.StartScope whenever you want.
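For example, something along these lines. The exact signatures are assumptions on my part (I’m treating GetCurrentScopeName as a string-returning delegate hook and StartScope as returning a disposable scope), so check the profiler documentation for the real shapes:

// assumed shapes, for illustration only
ProfilerIntegration.GetCurrentScopeName = () => currentMessage.GetType().Name;

// or, explicitly around a unit of work:
using (ProfilerIntegration.StartScope("Handle " + currentMessage.GetType().Name))
{
    // statements issued here show up under this scope in the profiler
    handler.Handle(currentMessage);
}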

Customized profiling

You can now surface troublesome issues directly from your code. If you have an issue with a query, you can mark it for attention using CustomQueryReporting.ReportError(), which will flag it in the UI for further investigation.

You can also just mark interesting pieces in the UI without an error, like so:

using (var db = new Entities(conStr))
{
    var post1 = db.Posts.FirstOrDefault();

    using (ProfilerIntegration.StarStatements("Blue"))
    {
        var post2 = db.Posts.FirstOrDefault();
    }

    var post3 = db.Posts.FirstOrDefault();
    
    ProfilerIntegration.StarStatements();
    var post4 = db.Posts.FirstOrDefault();
    ProfilerIntegration.StarStatementsClear();

    var post5 = db.Posts.FirstOrDefault();
}

Which will result in:

[screenshot: the starred statements highlighted in the profiler]

Disabling profiler from configuration

You can now disable the profiler by setting:

<add key="HibernatingRhinos.Profiler.Appender.NHibernate" value="Disabled" />

This will avoid initializing the profiler, obviously. The intent is that you can set up production profiling, disable it by default, and enable it selectively if you need to actually figure things out.

Odds & ends

We moved to WebActivatorEx from the deprecated WebActivator, added an XML file for the appender, and fixed a whole bunch of small bugs.


Linq to SQL, Hibernate and LLBLGen Profilers, RIP

You might have noticed that I talked only about the NHibernate and Entity Framework Profilers. The sales for the rest weren’t what we hoped they would be, and we are no longer going to sell them.

Go get them, there is a new release discount

You can get the NHibernate Profiler and Entity Framework Profiler for a 15% discount for the next two weeks.

time to read 2 min | 285 words

This post isn’t so much about this particular problem, but about the way we solved this.

We have a number of ways to track performance problems, but this is a good example: we can see that, for some reason, this test has failed because it took too long to run:

[screenshot: the failing test, flagged for taking too long]

To handle that, I don’t want to re-run the test itself; I don’t actually care that much about it. So I wanted to be able to run this independently.

To do that, I added:

[code screenshot]

This opens the studio with all the data that we have for this test, which is great, since it means that we can export the data.

[screenshot: exporting the test data from the studio]

That done, we can import it into an instance that we control and start testing the performance. In particular, we can run it under a profiler to see what it is doing.

The underlying reason ended up being an issue with how we flush things to disk, which was easily fixed once we could narrow it down. The problem was just getting it to reproduce reliably. This approach, being able to just stop midway through a test and capture the full state of the system, is invaluable in troubleshooting what is going on.

time to read 2 min | 388 words

This post is in response to a few comments here. In particular, I get the sense that people expect businesses and systems to behave as if transactions are a real thing. The only problem here is that this is quite far from the truth.

Let me define what I mean by a transaction here. I’m not talking about database transactions, ACID or any such thing. I’m talking about the notion that any interaction between a user and a business, or between two businesses can actually be modeled with the notion of a transaction similar to what we see in databases.

That is, we have an interaction that will either be all there, or not be there at all. The most obvious example is the notion of a financial transaction: we debit one account and then we credit another account, and we have to do that in such a way that either both accounts are modified or neither of them is. That is the classic example for database transactions, and it is wrong, as anyone who has ever written a check or sent a wire transfer can tell you. A good discussion of how that actually works can be found here. Note that in this case, the way money transfer works in the real world is that you upload a file over FTP, then wait three to five days to see if the orders you sent were rejected.

Another example is the notion of accepting an order in a transactional manner. If I accepted your order after verifying that I have reserved everything, and then my warehouse burns down, what do I do with your order? I can hardly roll it back.

To move away from businesses, let us consider doing something as unimportant as voting in a national election. Logically speaking, this is a pretty simple process: identify the voter, record their vote, total the votes, select the winner. Except that you can go back and force a re-election in a particular district if needed, or you might find a box of lost votes, or any of a hundred evil little things that crop up in the real world.

Any attempt to model such systems in neat transactional boxes with “all in or none at all” is going to break.

time to read 6 min | 1006 words

I had a very interesting discussion with Kelly Sommers on Twitter, but 140 characters isn’t really enough to explain things. Also, it is an interesting topic in general.

Kelly disagreed with this post: http://www.shopify.ca/technology/14909841-kafka-producer-pipeline-for-ruby-on-rails

[tweet screenshot]

You can read the full discussion here.

The basic premise is: there is a highly reliable distributed queue that is used to process messages, but because they didn’t have operational experience with it, they used a local queue to store the messages before sending them over the network. Kelly seems to think that this decreases reliability. I disagree.

The underlying question is simple: when do you consider it acceptable to lose a message? If returning an error to the client is fine, sure, go ahead and do that if you can’t reach the cluster. But if you are currently processing a 10 million dollar order, that is going to kinda suck, and anything that you can do to reduce the chance of that happening is good. Note the key part in this statement: we can only reduce the chance of this happening, we can’t eliminate it.

One way to try that is to get a guaranteed SLA from the distributed queue. Once we have that, we can rely on it working. This seems to be what Kelly is aiming at:

[tweet screenshot]

And that is true, if you could rely on SLAs. Just this week we had a multi-hour, multi-region Azure outage. In fact, outages, including outages that violate SLAs, are unfortunately common.

In fact, if we look at recent history, we have:

There are actually more of them, but I think that 5 outages in 2 years is enough to show a pattern.

And note specifically that I’m talking about global outages like the ones above. Most people don’t realize that complex systems operate in a constant mode of partial failure. If you ever read an accident investigation report, you’ll know that there is almost never just a single cause of failure. For example, the road was slippery, and the driver was speeding, and the ABS system failed, and the road barrier foundation had rotted since being installed. A single change in any one of those could downgrade the accident from a fatal crash to a “didn’t happen” or to a “honey, we need a new car”.

You can try to rely on the distributed queue in this case, because it has an SLA. And Toyota also promises that your car won’t suddenly accelerate into a wall, but if you had a Toyota Camry in 2010… well, you know…

From my point of view, saving the data locally before sending it over the network makes a lot of sense. In general, the local state of the machine is much more stable than the network. And if there is an internal failure in the machine, it is usually too hosed to do anything about anyway. I might try to write to disk, and still write to the network even if I can’t do that, because I want to do my utmost to not lose the message.

Now, let us consider the possible failure scenarios. I’m starting all of them from the notion that I just got a message for a 10 million dollar order, and I need to send it to the backend for processing (a small sketch of this store-and-forward flow follows the list).

  1. We can’t communicate with the distributed queue. That can be because it is down, hopefully that isn’t the case, but from our point of view, if our node became split from the network, this has the same effect. We are writing this down to disk, so when we become available again, we’ll be able to forward the stored message to the distributed queue.
  2. We can’t communicate with the disk, maybe it is full, or there is an error, or something like that. We can still talk to the network, so we place it in the distributed queue, and we go on with our lives.
  3. We can’t communicate with the disk, and we can’t communicate with the network. We can’t keep it in memory (we might overflow the memory), and if we are out of disk and network, we are probably going to be rebooted soon anyway. SOL; there is nothing else we can do at this point.
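A minimal sketch of that store-and-forward flow, under the assumptions above. The queue client and local store types here are stand-ins, not any particular library’s API:

// Stand-in interfaces; the real ones would be your queue client and a durable, disk-backed journal.
public interface IRemoteQueue { Task SendAsync(byte[] message); }
public interface ILocalStore { void Append(byte[] message); }

public class StoreAndForwardSender
{
    private readonly ILocalStore _localStore;
    private readonly IRemoteQueue _remoteQueue;

    public StoreAndForwardSender(ILocalStore localStore, IRemoteQueue remoteQueue)
    {
        _localStore = localStore;
        _remoteQueue = remoteQueue;
    }

    public async Task Send(byte[] message)
    {
        var storedLocally = false;
        try
        {
            _localStore.Append(message);   // scenario 2: the disk may fail, we keep going
            storedLocally = true;
        }
        catch (IOException) { }

        try
        {
            await _remoteQueue.SendAsync(message);   // scenario 1: the queue/network may be unreachable
        }
        catch (Exception)
        {
            if (storedLocally == false)
                throw;   // scenario 3: no disk and no network, nothing left to do
            // scenario 1: a background forwarder retries from the local store once the queue is reachable again
        }
    }
}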

Note that the first case assumes that we actually do come back up. If the admin just blew this node away, then the data on that node isn’t coming back, obviously. But since the admin knows that we are storing things locally, s/he will at least try to restore the data from that machine.

We are safer (not safe, but safer than without it). The question is whether this is worth it. If your messages aren’t actually carrying financial information, you can probably just drop a few messages, as long as you let the end user know about it so they can retry. If you really care about each individual message, if it is important enough to go the extra few miles for it, then the store and forward model gives you a measurable level of extra safety.

time to read 3 min | 437 words

This is bloody strange. I have a test failing in our unit tests, which isn’t an all too uncommon occurrence after a big piece of work. The only problem is that this test shouldn’t fail; no one touched this part.

For reference, here is the commit where this is failing. You can reproduce this by running the Raven.Tryouts console project.

Note that it has to be done in Release mode. When that happens, we consistently get the following error:

Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object.
at Raven.Client.Connection.MultiGetOperation.<TryResolveConflictOrCreateConcurrencyException>d__b.MoveNext() in c:\Work\ravendb\Raven.Client.Lightweight\Connection\MultiGetOperation.cs:line 156
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)

Here is the problem with this stack trace:

[code screenshot: the code at MultiGetOperation.cs line 156]

This only happens in Release mode, but it happens there consistently. I’ve verified that this isn’t an issue of me running an old version of the code, so that isn’t possible. There is no concurrency going on; all the data this method is touching is only touched by this thread.

What is more, the exception is not thrown from inside the foreach loop at line 139. I’ve verified that by putting a try/catch around the inside of the loop and still getting the NRE thrown outside it. In fact, I have tried to figure it out in pretty much every way I can. Attaching a debugger makes the problem disappear, as does disabling optimizations, or anything like that.

In fact, at this point I’m going to throw up my hands in disgust, because this is not something that I can figure out. Here is why: this is my “fix” for this issue. I replaced the following failing code:

[code screenshot: the failing version]

With the following passing code:

[code screenshot: the passing version]

And yes, this should make zero difference to the actual behavior, but it does. I’m suspecting a JIT issue.

time to read 4 min | 800 words

We are pretty much done with RavenDB 3.0; we are waiting for fixes to the internal apps we use to process orders and support customers, and then we can actually make a release. In the meantime, that means we need to start looking beyond the 3.0 release. We have had a few people internally focused on post-3.0 work for the past few months, and we have a rough outline of what we have done there. Primarily, we are talking about better distribution and storage models.

Storage models – the polyglot database

Under this umbrella we put dedicated database engines to support specific needs. We are talking about distributed counters (high scale-out, rapid throughput), time series, and an event store as the primary areas that we are focused on. For example, the counters stuff is pretty much complete, but we didn’t have time to actually make it into a fully mature product.

I talked about this several times in the past, so I’ll not get into too many details here.

Distribution models

We have been working on a Raft implementation for the past few months, and it is now at the stage where we are starting to integrate it into the rest of our software. Raft is planned to be the core replication protocol for the time series and events databases. But you are probably going to see it first as a topology super-layer for RavenDB and RavenFS.

Distributed topology management

Replication support in RavenDB and RavenFS follows the multi-master model. You can write to any node, and your write will be distributed by the server to all the nodes. This has several advantages, in particular the fact that we can operate in a disconnected or partially disconnected manner, and that we need little coordination between clients to get everything working. It also has the disadvantage of allowing conflicts. In fact, if you are writing to multiple replicating nodes and aren’t careful about how you are splitting writes, you are pretty much guaranteed to have conflicts. We have repeatedly heard that this is both a good thing and something that customers really don’t want to deal with.

It is a good thing because we don’t have data loss; it is a bad thing because if you aren’t ready to handle it, some of your data is inaccessible until the conflict is resolved.

Because of that, we are considering implementing a server side topology management system. The actual replication mechanics are going to remain the same. But the difference is how we are going to decide how to work with it.

A cluster (in this case, a set of RavenDB servers and all the databases and file systems on them) is composed of cooperating nodes. The cluster is managed via Raft, which is used to store the topology information of the cluster. The topology includes each of the nodes in the system, as well as all of the databases and file systems on the cluster. The cluster will select a leader, and that leader will also be the primary node for writes for all databases. In other words, assume we have a 3-node cluster and 5 databases in the cluster. All the databases are replicated to all three nodes, and a single node is going to serve as the write primary for all operations.

During normal operations, clients will query any server for the replication topology (and cache it) every 5 minutes or so. If a node is down, we’ll switch over to an alternative node. If the leader is down, we’ll query all the other nodes to try to find out who the new leader is, then continue using that leader’s topology from then on. This gives us the advantage that a down server causes clients to switch over and stay switched over. That avoids an operational hazard when you bring a down node back up again.

Clients will include the topology version they have in all communication with the server. If the topology version doesn’t match, the server will return an error, and the client will query all the nodes it knows about to find the current topology version. It will always choose the latest topology version and continue from there.
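Roughly, the client-side loop could look something like the sketch below. The type names and the specific error code are illustrative assumptions, not the actual RavenDB client code, but it captures the version check and refresh described above:

// Illustrative sketch of the described client behavior, not actual RavenDB client code.
public class TopologyAwareClient
{
    private readonly IReadOnlyList<Uri> _knownNodes;
    private readonly Func<Uri, Task<ReplicationTopology>> _fetchTopology; // HTTP GET against a node, null on failure
    private ReplicationTopology _topology; // cached, refreshed every ~5 minutes and on errors

    public TopologyAwareClient(IReadOnlyList<Uri> knownNodes,
        Func<Uri, Task<ReplicationTopology>> fetchTopology, ReplicationTopology initial)
    {
        _knownNodes = knownNodes;
        _fetchTopology = fetchTopology;
        _topology = initial;
    }

    public async Task<HttpResponseMessage> Execute(Func<Uri, long, Task<HttpResponseMessage>> operation)
    {
        // every request carries the topology version the client believes is current
        var response = await operation(_topology.LeaderUrl, _topology.Version);

        if (response.StatusCode == HttpStatusCode.PreconditionFailed) // assumed "stale topology" error
        {
            _topology = await RefreshTopology();
            response = await operation(_topology.LeaderUrl, _topology.Version);
        }
        return response;
    }

    private async Task<ReplicationTopology> RefreshTopology()
    {
        // ask every node we know about and keep the highest topology version we hear back
        var answers = await Task.WhenAll(_knownNodes.Select(node => _fetchTopology(node)));
        return answers.Where(t => t != null).OrderByDescending(t => t.Version).First();
    }
}

public class ReplicationTopology
{
    public long Version;
    public Uri LeaderUrl;
}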

Note that there is still a chance for conflicts: a leader may become disconnected from the network, but not be aware of that, and keep accepting writes to the database. Another node will take over as the cluster leader and clients will start writing to it. There is a gap where a conflict can occur, but it is a pretty small one, and we have good mechanisms to deal with conflicts anyway.

We are also thinking about exposing a system similar to the topology for clients directly. Basically, a small distributed and consistent key/value store. Mostly meant for configuration.

Thoughts?
