Voron & time series data
One of the things that Voron does very well is read a lot of data fast. One of the interesting scenarios that keeps coming up is handling time series data.
For example, let us say that we have a bunch of sensors reporting on the temperature metrics within an area (said while the heaviest storm in 5 decades is blowing outside). Every minute, we have some data coming in. For fun, we will make the following assumptions:
- We have to deal with late writes (a sensor sending us updates from 1 hour ago because of a communication outage).
- Dates aren’t unique.
- All queries will take into account the dates.
First, let me show you the full code for that, then we can talk about how it works:
 1: public class DateTimeSeries : IDisposable
 2: {
 3:     private readonly JsonSerializer _serializer = new JsonSerializer();
 4:     private readonly StorageEnvironment _storageEnvironment;
 5:     private long _last;
 6:     private readonly Slice _lastKey;
 7:
 8:     public DateTimeSeries(string path)
 9:     {
10:         _lastKey = "last-key";
11:         _storageEnvironment = new StorageEnvironment(StorageEnvironmentOptions.ForPath(path));
12:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
13:         {
14:             _storageEnvironment.CreateTree(tx, "data");
15:             var read = tx.State.Root.Read(tx, _lastKey);
16:
17:             _last = read != null ? read.Reader.ReadInt64() : 1;
18:
19:             tx.Commit();
20:         }
21:     }
22:
23:     public void AddRange<T>(IEnumerable<KeyValuePair<DateTime, T>> values)
24:     {
25:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
26:         {
27:             var data = tx.GetTree("data");
28:             var buffer = new byte[16];
29:             var key = new Slice(buffer);
30:             var ms = new MemoryStream();
31:             foreach (var kvp in values)
32:             {
33:                 // key layout: 8 bytes of big endian date, then 8 bytes of
34:                 // big endian counter, so duplicate dates don't overwrite
35:                 var date = kvp.Key;
36:                 EndianBitConverter.Big.CopyBytes(date.ToBinary(), buffer, 0);
37:                 EndianBitConverter.Big.CopyBytes(_last++, buffer, 8);
38:                 ms.SetLength(0);
39:                 var writer = new StreamWriter(ms);
40:                 _serializer.Serialize(writer, kvp.Value);
41:                 writer.Flush(); // flush, or the serialized bytes never reach the stream
42:                 ms.Position = 0;
43:
44:                 data.Add(tx, key, ms);
45:             }
46:
47:             // persist the counter, so a restart resumes where we left off
48:             tx.State.Root.Add(tx, _lastKey, new MemoryStream(BitConverter.GetBytes(_last)));
49:             tx.Commit();
50:         }
51:     }
52:
53:     public IEnumerable<T> ScanRange<T>(DateTime start, DateTime end)
54:     {
55:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
56:         {
57:             var data = tx.GetTree("data");
58:             var startBuffer = new byte[16];
59:             EndianBitConverter.Big.CopyBytes(start.ToBinary(), startBuffer, 0);
60:             var startKey = new Slice(startBuffer);
61:
62:             using (var it = data.Iterate(tx))
63:             {
64:                 // the end key pairs the end date with long.MaxValue, a counter
65:                 // value no real entry can have, so it safely bounds the range
66:                 var endBuffer = new byte[16];
67:                 EndianBitConverter.Big.CopyBytes(end.ToBinary(), endBuffer, 0);
68:                 EndianBitConverter.Big.CopyBytes(long.MaxValue, endBuffer, 8);
69:
70:                 it.MaxKey = new Slice(endBuffer);
71:                 if (it.Seek(startKey) == false)
72:                     yield break;
73:                 do
74:                 {
75:                     var reader = it.CreateReaderForCurrent();
76:                     using (var stream = reader.AsStream())
77:                     {
78:                         yield return _serializer.Deserialize<T>(new JsonTextReader(new StreamReader(stream)));
79:                     }
80:                 } while (it.MoveNext());
81:             }
82:         }
83:     }
84:
85:     public void Dispose()
86:     {
87:         _storageEnvironment.Dispose();
88:     }
89: }
In line 14, we create the data tree, which will hold the actual time series data; right after it, we read the last-key value, which I'll explain in a bit.
The AddRange method in line 23 is probably the most interesting. We create a key that is composed of the date of the entry and an incrementing number. Note that we use big endian encoding, because that allows easy byte string sorting. The implication of this sort of key is that the values are actually sorted by date, but if we have multiple values for the same millisecond, we don't overwrite the data. Along with adding the actual data, we record the new value of the incrementing counter, so if we need to restart, we'll continue from where we left off.
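To make the key layout concrete, here is a minimal standalone sketch of the ordering argument. It uses the same EndianBitConverter as the listing (from Jon Skeet's MiscUtil); the Compare helper merely stands in for the byte-by-byte comparison the storage does, and the dates and counters are invented:

using System;
using MiscUtil.Conversion; // the EndianBitConverter used in the listing

class KeyLayoutDemo
{
    // Builds the same 16 byte key as AddRange: 8 bytes of big endian date,
    // then 8 bytes of big endian counter.
    static byte[] MakeKey(DateTime date, long counter)
    {
        var buffer = new byte[16];
        EndianBitConverter.Big.CopyBytes(date.ToBinary(), buffer, 0);
        EndianBitConverter.Big.CopyBytes(counter, buffer, 8);
        return buffer;
    }

    // Stands in for the byte-by-byte key comparison the storage performs.
    static int Compare(byte[] x, byte[] y)
    {
        for (var i = 0; i < x.Length; i++)
            if (x[i] != y[i])
                return x[i] < y[i] ? -1 : 1;
        return 0;
    }

    static void Main()
    {
        // all keys use the same DateTimeKind; ToBinary() folds the kind into
        // the top bits, so mixing kinds would break the ordering argument
        var t = new DateTime(2013, 12, 12, 10, 30, 0, DateTimeKind.Utc);

        var a = MakeKey(t, 17);                    // first write in this millisecond
        var b = MakeKey(t, 18);                    // second write, same millisecond
        var c = MakeKey(t.AddMilliseconds(1), 19); // a later write
        var end = MakeKey(t, long.MaxValue);       // the upper bound ScanRange builds

        Console.WriteLine(Compare(a, b));   // -1: same date, lower counter first
        Console.WriteLine(Compare(b, c));   // -1: earlier date first
        Console.WriteLine(Compare(b, end)); // -1: no real counter reaches long.MaxValue
    }
}

The big endian encoding exists precisely so that this naive byte comparison agrees with chronological order.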
Finally, we have the actual ScanRange method. Here we seek to the smallest possible key for the start date, and set MaxKey, the iterator's stop condition, to the largest possible key for the end date. After that, it is just a matter of pulling the values out.
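Putting the two together, usage could look something like this (a hypothetical sketch; the path, the SensorReading type, and the readings are all invented for illustration):

using System;
using System.Collections.Generic;

// Invented payload type for the illustration.
public class SensorReading
{
    public string SensorId;
    public double Temperature;
}

class Program
{
    static void Main()
    {
        using (var series = new DateTimeSeries("timeseries"))
        {
            var now = DateTime.UtcNow;
            series.AddRange(new[]
            {
                new KeyValuePair<DateTime, SensorReading>(now,
                    new SensorReading { SensorId = "roof-1", Temperature = -3.2 }),
                // a late write: the entry is an hour old, but because the key
                // starts with the date, it still sorts into the right place
                new KeyValuePair<DateTime, SensorReading>(now.AddHours(-1),
                    new SensorReading { SensorId = "roof-1", Temperature = -1.7 }),
            });

            // scans come back in date order, regardless of insertion order
            foreach (var r in series.ScanRange<SensorReading>(now.AddHours(-2), now))
                Console.WriteLine("{0}: {1}", r.SensorId, r.Temperature);
        }
    }
}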
Pretty simple, I think.
Comments
What is the "_lastKey" member about? I see it added to the transaction state, but don't know enough about "why" you want to do that.
Also, I'm not sure, but it looks like there is a bug in the ScanRange method. If "it.MaxKey" is inclusive then it is fine; otherwise, you need to figure out the end by adding a millisecond (or the smallest denomination) to it and then using that as the MaxKey.
Khalid, This is just the last value of the counter in that transaction. We use this to create unique keys for duplicate date time values. So if you have two inputs at the exact same instant, they won't overwrite one another.
And there isn't a bug there, because it is a bit tricky. Note that the end key is actually a concat of the date + long.MaxValue; you are never going to get that counter value in real data, so there is no issue with that.
@Khalid - also, if the process _restarts_, _lastKey saves, well, the last key that was used so that the process can resume properly.
As someone who has worked extensively with process control historians, I'm curious how you would handle storing data in Voron where you have multiple named (or at least identified) streams of time series data that you want to be able to query by identifier, start time, and end time.
Conceivably you could either have multiple trees, or make the name part of the key, or make it part of the value. I'm just wondering which would be best, and what the trade-offs of each approach are, especially in the context of having 10,000s of different streams.
Sam, You would probably handle that using different trees, yes. The question here is how much data you have per named stream. If you have a relatively small number of streams, you would use a named tree. We're actually working to ensure that you can have tens of thousands of trees in Voron without issue, but I am still getting used to that. The major issue here is how much data you have per stream, I think.
Thanks for the response. I realise that Voron was probably not intended primarily as a time series data store, but I often see vendors pushing time series data into a relational database (which never strikes me as a good idea for a primary data store). So I find it interesting to see what the implications on the storage system / structure / format are for different storage layers.
For reference, the system I am currently working with has just over 100,000 streams, each holding about 5 years of 30 second interval values (so around 525,600,000,000 values in total).
Sam, Oh, I think that Voron would do pretty well as a time series db. I just finished making sure that there aren't any structural reasons why we couldn't have as many streams as we wanted. If you can share your data, I can try to do some benchmarks on that. It should be fun to do.
Ah. I was going to ask what would be the impact of creating 100k trees. Is that a non-issue now?
Sorry Ayende, I didn't mean to imply that Voron could not handle time series data. It was simply a recognition that it wasn't its primary target.
Unfortunately I won't be able to share that dataset with you, as it is commercially sensitive. If it would be of help though, I’m happy to do some more analysis of the data set and try to produce an equivalent (in terms of streams, data types, frequency of data etc) sample dataset. Let me know if that would be of assistance.
Kijana, We haven't tested this thoroughly yet, but yes, that is the idea.
Sam, Yes, that would be very helpful
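For reference, the "make the name part of the key" option discussed above could look something like the sketch below. Everything in it, including NameWidth and MakeStreamKey, is invented for illustration and is not Voron API:

using System;
using System.Text;
using MiscUtil.Conversion;

static class StreamKeys
{
    // Hypothetical composite key: [fixed width stream name][date][counter].
    // Padding the name to a fixed width keeps the date bytes aligned, so the
    // byte-wise comparison groups by stream first, then sorts by date.
    const int NameWidth = 32; // assumed maximum name length

    public static byte[] MakeStreamKey(string stream, DateTime date, long counter)
    {
        var name = Encoding.UTF8.GetBytes(stream);
        if (name.Length > NameWidth)
            throw new ArgumentException("stream name too long", "stream");

        var buffer = new byte[NameWidth + 16]; // zero padded after the name
        Array.Copy(name, buffer, name.Length);
        EndianBitConverter.Big.CopyBytes(date.ToBinary(), buffer, NameWidth);
        EndianBitConverter.Big.CopyBytes(counter, buffer, NameWidth + 8);
        return buffer;
    }
}

A per-stream scan would then seek to [name][start date] and stop at [name][end date][long.MaxValue], just as ScanRange does above. Compared to one tree per stream, this keeps a single structure at the cost of wider keys and a name comparison on every seek; separate trees keep keys short and each stream's pages dense, but multiply the tree metadata, which is why the answer above turns on how much data each stream holds.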