Voron & time series data

One of the things that Voron does very well is reading a lot of data fast. An interesting scenario where this comes into play is time series data.

For example, let us say that we have a bunch of sensors reporting temperature metrics within an area (said while the heaviest storm in 5 decades is blowing outside). Every minute, we have some data coming in. For fun, we will make the following assumptions:

  • We have to deal with late writes (a sensor sending us updates from an hour ago because of a communication problem).
  • Dates aren’t unique.
  • All queries will be on a date range.

First, let me show you the full code for that, then we can talk about how it works:

   1: public class DateTimeSeries : IDisposable
   2: {
   3:     private readonly JsonSerializer _serializer = new JsonSerializer();
   4:     private readonly StorageEnvironment _storageEnvironment;
   5:     private long _last;
   6:     private readonly Slice _lastKey;
   7:  
   8:     public DateTimeSeries(string path)
   9:     {
  10:         _lastKey = "last-key";
  11:         _storageEnvironment = new StorageEnvironment(StorageEnvironmentOptions.ForPath(path));
  12:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
  13:         {
  14:             _storageEnvironment.CreateTree(tx, "data");
  15:             var read = tx.State.Root.Read(tx, _lastKey);
  16:  
  17:             _last = read != null ? read.Reader.ReadInt64() : 1;
  18:  
  19:             tx.Commit();
  20:         }
  21:     }
  22:  
  23:     public void AddRange<T>(IEnumerable<KeyValuePair<DateTime, T>> values)
  24:     {
  25:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
  26:         {
  27:             var data = tx.GetTree("data");
  28:             var buffer = new byte[16];
  29:             var key = new Slice(buffer);
  30:             var ms = new MemoryStream();
  31:             var writer = new StreamWriter(ms); // reused for every value, flushed after each one
  32:             foreach (var kvp in values)
  33:             {
  34:                 var date = kvp.Key;
  35:                 // key = [date, 8 bytes big endian][counter, 8 bytes big endian]
  36:                 EndianBitConverter.Big.CopyBytes(date.ToBinary(), buffer, 0);
  37:                 EndianBitConverter.Big.CopyBytes(_last++, buffer, 8);
  38:                 ms.SetLength(0);
  39:                 _serializer.Serialize(writer, kvp.Value);
  40:                 writer.Flush(); // StreamWriter buffers internally, so push the JSON into ms
  41:                 ms.Position = 0;
  42:  
  43:                 data.Add(tx, key, ms);
  44:             }
  45:  
  46:             tx.State.Root.Add(tx, _lastKey, new MemoryStream(BitConverter.GetBytes(_last)));
  47:             tx.Commit();
  48:         }
  49:     }
  50:  
  51:     public IEnumerable<T> ScanRange<T>(DateTime start, DateTime end)
  52:     {
  53:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
  54:         {
  55:             var data = tx.GetTree("data");
  56:             // start date followed by a zero counter: the smallest key for that date
  57:             var startBuffer = new byte[16];
  58:             EndianBitConverter.Big.CopyBytes(start.ToBinary(), startBuffer, 0);
  59:             var startKey = new Slice(startBuffer);
  60:  
  61:             using (var it = data.Iterate(tx))
  62:             {
  63:                 // end date followed by long.MaxValue: the largest key for that date
  64:                 var endBuffer = new byte[16];
  65:                 EndianBitConverter.Big.CopyBytes(end.ToBinary(), endBuffer, 0);
  66:                 EndianBitConverter.Big.CopyBytes(long.MaxValue, endBuffer, 8);
  67:  
  68:                 it.MaxKey = new Slice(endBuffer);
  69:                 if (it.Seek(startKey) == false)
  70:                     yield break;
  71:                 do
  72:                 {
  73:                     var reader = it.CreateReaderForCurrent();
  74:                     using (var stream = reader.AsStream())
  75:                     {
  76:                         yield return _serializer.Deserialize<T>(new JsonTextReader(new StreamReader(stream)));
  77:                     }
  78:                 } while (it.MoveNext());
  79:             }
  80:         }
  81:     }
  82:  
  83:     public void Dispose()
  84:     {
  85:         _storageEnvironment.Dispose();
  86:     }
  87: }

In line 14, we create the data tree, which will hold the actual time series data, and in line 15 we read the last-key value, which I’ll explain in a bit.

The AddRange method in line 23 is probably the most interesting. We create a key that is composed of the date of the entry and an incrementing number. Note that we use big endian encoding: Voron compares keys byte by byte, and big endian puts the most significant byte first, so the byte order follows the numeric order. The implication of this sort of key is that the values are sorted by date, but if we have multiple values with the same timestamp, we don’t overwrite the data. (This relies on all the dates having the same DateTimeKind, since ToBinary() stores the kind in the top two bits of the value.) Along with adding the actual data, we record the new value of the incrementing counter under last-key, so if we need to restart, we’ll continue from where we left off.
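To see why the big endian layout matters, here is a minimal C# sketch of the same 16-byte key built without Voron. It assumes a recent .NET (for `System.Buffers.Binary.BinaryPrimitives`), and the `TimeSeriesKey` class with its `Create` and `Compare` methods is hypothetical, standing in for Voron’s `Slice` and its byte-wise key comparison.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper (not a Voron API): builds the 16-byte key layout used by
// AddRange and compares keys the way a byte-string sorted tree would.
public static class TimeSeriesKey
{
    public static byte[] Create(DateTime date, long counter)
    {
        var key = new byte[16];
        // Big endian writes the most significant byte first, so for non-negative
        // values comparing bytes left to right matches comparing the numbers
        BinaryPrimitives.WriteInt64BigEndian(key.AsSpan(0, 8), date.ToBinary());
        BinaryPrimitives.WriteInt64BigEndian(key.AsSpan(8, 8), counter);
        return key;
    }

    // Unsigned, lexicographic byte comparison: the first differing byte decides
    public static int Compare(byte[] x, byte[] y)
    {
        int n = Math.Min(x.Length, y.Length);
        for (int i = 0; i < n; i++)
        {
            if (x[i] != y[i])
                return x[i].CompareTo(y[i]);
        }
        return x.Length.CompareTo(y.Length);
    }
}
```

Comparing a key for 12:00 with counter 6 against one for 12:01 with counter 1 gives a negative result: the date bytes differ first, so the counter never gets a say. Two keys for the same date differ only in the trailing counter bytes, which is what keeps duplicate dates from overwriting each other.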

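Finally, we have the actual ScanRange method. Here we seek to the smallest possible key for the start date (the start date followed by a zero counter), and set MaxKey to the largest possible key for the end date (the end date followed by long.MaxValue). That gives the iterator its stop condition and makes the end date inclusive. After that, it is just a matter of getting the values out.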
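The seek-then-stop pattern can be sketched without Voron as well. The following C# sketch is only an in-memory approximation: it assumes a `SortedList` with a byte-wise comparer behaves like the `data` tree, replaces `it.Seek` with a lower-bound binary search and `it.MaxKey` with an explicit comparison, and the `ByteKeyComparer` and `InMemorySeries` names are hypothetical.

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;

// Unsigned byte-wise ordering, assumed to match how the tree orders its keys
public sealed class ByteKeyComparer : IComparer<byte[]>
{
    public int Compare(byte[] x, byte[] y)
    {
        int n = Math.Min(x.Length, y.Length);
        for (int i = 0; i < n; i++)
        {
            if (x[i] != y[i])
                return x[i].CompareTo(y[i]);
        }
        return x.Length.CompareTo(y.Length);
    }
}

// Hypothetical in-memory stand-in for DateTimeSeries, storing strings instead of JSON
public sealed class InMemorySeries
{
    private readonly SortedList<byte[], string> _data =
        new SortedList<byte[], string>(new ByteKeyComparer());
    private long _last = 1;

    private static byte[] Key(DateTime date, long counter)
    {
        var key = new byte[16];
        BinaryPrimitives.WriteInt64BigEndian(key.AsSpan(0, 8), date.ToBinary());
        BinaryPrimitives.WriteInt64BigEndian(key.AsSpan(8, 8), counter);
        return key;
    }

    // The counter keeps keys unique even when dates repeat
    public void Add(DateTime date, string value) => _data.Add(Key(date, _last++), value);

    public IEnumerable<string> ScanRange(DateTime start, DateTime end)
    {
        var startKey = Key(start, 0);           // smallest key for the start date
        var maxKey = Key(end, long.MaxValue);   // largest key for the end date
        var keys = _data.Keys;
        var comparer = _data.Comparer;

        // "Seek": binary search for the first key >= startKey
        int lo = 0, hi = keys.Count;
        while (lo < hi)
        {
            int mid = lo + (hi - lo) / 2;
            if (comparer.Compare(keys[mid], startKey) < 0)
                lo = mid + 1;
            else
                hi = mid;
        }

        // Walk forward until we pass maxKey, mirroring the MaxKey stop condition
        for (int i = lo; i < keys.Count && comparer.Compare(keys[i], maxKey) <= 0; i++)
        {
            yield return _data.Values[i];
        }
    }
}
```

Adding readings for 12:00, 12:02, a late 12:01, and a second 12:00, then scanning 12:00 to 12:01, returns the two 12:00 values followed by the late 12:01 value. Arrival order does not matter, and the end date is inclusive because the stop key carries `long.MaxValue` as its counter.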

Pretty simple, I think.