RavenFS and NServiceBus’ Data Bus
The NServiceBus data bus allows you to send very large messages by putting the payload on a shared resource and sending only a reference to it. RavenFS is an obvious fit for that shared resource, so I took a few moments and wrote an implementation for it*.
/// <summary>
/// An <see cref="IDataBus"/> implementation that keeps data bus payloads as files in a
/// RavenFS <see cref="FilesStore"/>. Each uploaded file is tagged with an expiry instant,
/// and a timer started by <see cref="Start"/> periodically deletes the files whose expiry
/// instant has passed.
/// </summary>
public class RavenFSDataBus : IDataBus, IDisposable
{
    // Key under which Put records the expiry instant of a file and which the
    // expiration query filters and sorts on.
    private const string TimeToBeReceivedKey = "Time-To-Be-Received";

    // Every generated file key is this prefix followed by a fresh GUID.
    private const string KeyPrefix = "/data-bus/";

    // Delay before the first expiration run and the period between subsequent runs.
    private static readonly TimeSpan ExpirationInterval = TimeSpan.FromMinutes(1);

    private readonly FilesStore _filesStore;

    // Ensures only one expiration run executes at a time and lets Dispose wait for it.
    private readonly object _locker = new object();

    private Timer _timer;

    // Only read and written while holding _locker.
    private bool _disposed;

    /// <summary>
    /// Creates a data bus backed by a new <see cref="FilesStore"/>.
    /// </summary>
    /// <param name="connectionString">
    /// Value assigned to <c>FilesStore.ConnectionStringName</c>. Note that, despite the
    /// parameter name, it is used as the connection string <em>name</em>.
    /// </param>
    public RavenFSDataBus(string connectionString)
    {
        _filesStore = new FilesStore
        {
            ConnectionStringName = connectionString
        };
    }

    /// <summary>
    /// Creates a data bus on top of an existing <see cref="FilesStore"/>.
    /// The store is initialized by <see cref="Start"/> and disposed by <see cref="Dispose"/>.
    /// </summary>
    /// <param name="filesStore">The files store to use; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="filesStore"/> is null.</exception>
    public RavenFSDataBus(FilesStore filesStore)
    {
        if (filesStore == null)
        {
            throw new ArgumentNullException("filesStore");
        }

        _filesStore = filesStore;
    }

    /// <summary>
    /// Downloads the file stored under <paramref name="key"/>, blocking until the
    /// download call completes.
    /// </summary>
    /// <param name="key">A key previously returned by <see cref="Put"/>.</param>
    /// <returns>The stream produced by the download.</returns>
    public Stream Get(string key)
    {
        // IDataBus is a synchronous contract, so the async client call is blocked on here.
        return _filesStore.AsyncFilesCommands.DownloadAsync(key).Result;
    }

    /// <summary>
    /// Uploads <paramref name="stream"/> under a newly generated key and records when it expires.
    /// </summary>
    /// <param name="stream">The payload to upload; must not be null.</param>
    /// <param name="timeToBeReceived">
    /// How long from now (UTC) the payload stays valid; afterwards the expiration run may delete it.
    /// </param>
    /// <returns>The generated key, to be passed to <see cref="Get"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public string Put(Stream stream, TimeSpan timeToBeReceived)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        var key = KeyPrefix + Guid.NewGuid();

        // The expiry is stored as a UTC round-trip ("O") string; RunExpiration compares
        // against a string produced the same way.
        var expiresAt = DateTime.UtcNow.Add(timeToBeReceived).ToString("O");

        _filesStore.AsyncFilesCommands.UploadAsync(key, stream, new RavenJObject
        {
            {TimeToBeReceivedKey, expiresAt}
        }).Wait();

        return key;
    }

    /// <summary>
    /// Initializes the files store (asking it to ensure the file system exists) and starts
    /// the timer that runs expiration once a minute.
    /// </summary>
    public void Start()
    {
        _filesStore.Initialize(ensureFileSystemExists: true);

        // A repeated Start must not leak the previously created timer.
        if (_timer != null)
        {
            _timer.Dispose();
        }

        _timer = new Timer(RunExpiration);
        _timer.Change(ExpirationInterval, ExpirationInterval);
    }

    /// <summary>
    /// Stops the expiration timer and disposes the files store. If an expiration run is in
    /// progress, this call blocks until that run has finished.
    /// </summary>
    public void Dispose()
    {
        if (_timer != null)
        {
            _timer.Dispose();
        }

        // Timer.Dispose() does not wait for a running or already queued callback, so take
        // the expiration lock: this waits out a run in progress, and the flag makes any
        // later callback return before it touches the disposed store.
        lock (_locker)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            if (_filesStore != null)
            {
                _filesStore.Dispose();
            }
        }
    }

    /// <summary>
    /// Timer callback: deletes every file whose recorded expiry instant is earlier than the
    /// current UTC time. Skips the run if a previous run still holds the lock or the
    /// instance has been disposed.
    /// </summary>
    /// <param name="state">Timer state; not used.</param>
    private void RunExpiration(object state)
    {
        bool lockTaken = false;
        try
        {
            // Non-blocking attempt: overlapping timer ticks are dropped rather than queued.
            Monitor.TryEnter(_locker, ref lockTaken);
            if (lockTaken == false || _disposed)
            {
                return;
            }

            using (var session = _filesStore.OpenAsyncSession())
            {
                var expiredFiles = session.Query()
                    .WhereLessThan(TimeToBeReceivedKey, DateTime.UtcNow.ToString("O"))
                    .OrderBy(TimeToBeReceivedKey)
                    .ToListAsync()
                    .Result;

                foreach (var fileHeader in expiredFiles)
                {
                    session.RegisterFileDeletion(fileHeader);
                }

                session.SaveChangesAsync().Wait();
            }
        }
        catch (Exception e)
        {
            // This runs on a timer (thread pool) thread, where an unhandled exception would
            // terminate the process. Expiration is best-effort cleanup, so report the
            // failure and let the next tick retry.
            System.Diagnostics.Trace.TraceError("RavenFSDataBus expiration run failed: {0}", e);
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(_locker);
            }
        }
    }
}
* This was written just to try the idea out; it hasn’t been tested very well yet.
Comments
Comment preview