RavenFS and NServiceBus’ Data Bus

time to read 5 min | 887 words

The NServiceBus data bus allows you to send very large messages by putting them on a shared resource and sending the reference to it. An obvious use case for this is using RavenFS. I took a few moments and wrote an implementation for that*.

public class RavenFSDataBus : IDataBus, IDisposable
{
private readonly FilesStore _filesStore;
private Timer _timer;


private object _locker = new object();
private void RunExpiration(object state)
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_locker, ref lockTaken);
if (lockTaken == false)
return;

using (var session = _filesStore.OpenAsyncSession())
{
var files = session.Query()
.WhereLessThan("Time-To-Be-Received", DateTime.UtcNow.ToString("O"))
.OrderBy("Time-To-Be-Received")
.ToListAsync();

files.Wait();

foreach (var fileHeader in files.Result)
{
session.RegisterFileDeletion(fileHeader);
}

session.SaveChangesAsync().Wait();
}
}
finally
{
if (lockTaken)
Monitor.Exit(_locker);
}
}

public RavenFSDataBus(string connectionString)
{
_filesStore = new FilesStore
{
ConnectionStringName = connectionString
};
}

public RavenFSDataBus(FilesStore filesStore)
{
_filesStore = filesStore;
}

public Stream Get(string key)
{
return _filesStore.AsyncFilesCommands.DownloadAsync(key).Result;
}

public string Put(Stream stream, TimeSpan timeToBeReceived)
{
var key = "/data-bus/" + Guid.NewGuid();
_filesStore.AsyncFilesCommands.UploadAsync(key, stream, new RavenJObject
{
{"Time-To-Be-Received", DateTime.UtcNow.Add(timeToBeReceived).ToString("O")}
}).Wait();

return key;
}

public void Start()
{
_filesStore.Initialize(ensureFileSystemExists: true);
_timer = new Timer(RunExpiration);
_timer.Change(TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
}

public void Dispose()
{
if (_timer != null)
_timer.Dispose();
if (_filesStore != null)
_filesStore.Dispose();
}
}

* This is written to check it out, hasn’t been tested very well yet.