Ayende @ Rahien

Hi!
My name is Ayende Rahien
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969

, @ Q c

Posts: 5,949 | Comments: 44,548

filter by tags archive

1st April Post – Sending Data


Since it appears to be customary, I decided that I need to make a few posts for April 1st. Here is the first of them.

/// <summary>
/// Iterator that sends the serialized <c>Messages</c> to <c>Destination</c> over TCP,
/// one asynchronous step at a time.
/// </summary>
/// <remarks>
/// Steps, in order:
/// <list type="number">
/// <item><description>Connect to <c>Destination.Host</c> / <c>Destination.Port</c>.</description></item>
/// <item><description>Write the payload length (<c>BitConverter.GetBytes(buffer.Length)</c>).</description></item>
/// <item><description>Write the payload itself.</description></item>
/// <item><description>Read the receiver's confirmation and compare it with
/// <c>ProtocolConstants.QueueDoesNotExists</c> and <c>ProtocolConstants.Recieved</c>.</description></item>
/// <item><description>Write <c>ProtocolConstants.AcknowledgedBuffer</c> and call <c>Success()</c>.</description></item>
/// <item><description>Best effort: read a possible revert message; if it equals
/// <c>ProtocolConstants.Revert</c>, call <c>Failure(null)</c>.</description></item>
/// </list>
/// Any failure in steps 1-5 is logged as a warning, reported through <c>Failure</c>, and ends
/// the iteration. <c>SendCompleted</c> (when it has subscribers) is raised from the
/// <c>finally</c> block once the iterator finishes or is disposed.
/// NOTE(review): each <c>yield return 1</c> presumably tells the driving
/// <c>AsyncEnumerator</c> to resume after one pending operation completes - confirm against
/// the library documentation.
/// </remarks>
/// <param name="ae">The enumerator driving this iterator; supplies the completion callbacks
/// (<c>ae.End()</c>) and the completed results (<c>ae.DequeueAsyncResult()</c>).</param>
/// <returns>An iterator that yields <c>1</c> after every asynchronous operation it starts.</returns>
private IEnumerator<int> SendInternal(AsyncEnumerator ae)
{
    // Log formats shared by the Begin*/End* halves of each step; {0} = Destination, {1} = exception.
    const string connectFailed = "Failed to connect to {0} because {1}";
    const string writeFailed = "Could not write to {0} because {1}";
    const string confirmationFailed = "Could not read confirmation from {0} because {1}";
    const string acknowledgeFailed = "Failed to write acknowledgement to reciever {0} because {1}";

    try
    {
        using (var client = new TcpClient())
        {
            // Step 1: connect.
            if (!TryRunSendStep(
                    () => client.BeginConnect(Destination.Host, Destination.Port, ae.End(), null),
                    connectFailed))
            {
                yield break;
            }

            yield return 1;

            if (!TryRunSendStep(() => client.EndConnect(ae.DequeueAsyncResult()), connectFailed))
            {
                yield break;
            }

            logger.DebugFormat("Successfully connected to {0}", Destination);

            using (var stream = client.GetStream())
            {
                var buffer = Messages.Serialize();
                var bufferLenInBytes = BitConverter.GetBytes(buffer.Length);

                // Step 2: write the length prefix.
                logger.DebugFormat("Writing length of {0} bytes to {1}", buffer.Length, Destination);

                if (!TryRunSendStep(
                        () => stream.BeginWrite(bufferLenInBytes, 0, bufferLenInBytes.Length, ae.End(), null),
                        writeFailed))
                {
                    yield break;
                }

                yield return 1;

                if (!TryRunSendStep(() => stream.EndWrite(ae.DequeueAsyncResult()), writeFailed))
                {
                    yield break;
                }

                // Step 3: write the payload.
                logger.DebugFormat("Writing {0} bytes to {1}", buffer.Length, Destination);

                if (!TryRunSendStep(
                        () => stream.BeginWrite(buffer, 0, buffer.Length, ae.End(), null),
                        writeFailed))
                {
                    yield break;
                }

                yield return 1;

                if (!TryRunSendStep(() => stream.EndWrite(ae.DequeueAsyncResult()), writeFailed))
                {
                    yield break;
                }

                logger.DebugFormat("Successfully wrote to {0}", Destination);

                // Step 4: read the receiver's confirmation through a nested enumerator.
                var recieveBuffer = new byte[ProtocolConstants.RecievedBuffer.Length];
                var readConfirmationEnumerator = new AsyncEnumerator();

                if (!TryRunSendStep(
                        () => readConfirmationEnumerator.BeginExecute(
                            StreamUtil.ReadBytes(recieveBuffer, stream, readConfirmationEnumerator, "recieve confirmation"),
                            ae.End()),
                        confirmationFailed))
                {
                    yield break;
                }

                yield return 1;

                if (!TryRunSendStep(
                        () => readConfirmationEnumerator.EndExecute(ae.DequeueAsyncResult()),
                        confirmationFailed))
                {
                    yield break;
                }

                var recieveRespone = Encoding.Unicode.GetString(recieveBuffer);
                if (recieveRespone == ProtocolConstants.QueueDoesNotExists)
                {
                    logger.WarnFormat(
                        "Response from reciever {0} is that queue does not exists",
                        Destination);
                    Failure(new QueueDoesNotExistsException());
                    yield break;
                }

                if (recieveRespone != ProtocolConstants.Recieved)
                {
                    logger.WarnFormat(
                        "Response from reciever {0} is not the expected one, unexpected response was: {1}",
                        Destination, recieveRespone);
                    Failure(null);
                    yield break;
                }

                // Step 5: acknowledge the confirmation.
                if (!TryRunSendStep(
                        () => stream.BeginWrite(ProtocolConstants.AcknowledgedBuffer, 0,
                                                ProtocolConstants.AcknowledgedBuffer.Length, ae.End(), null),
                        acknowledgeFailed))
                {
                    yield break;
                }

                yield return 1;

                if (!TryRunSendStep(() => stream.EndWrite(ae.DequeueAsyncResult()), acknowledgeFailed))
                {
                    yield break;
                }

                Success();

                // Step 6 (best effort): the receiver may still send a revert message.
                var revertBuffer = new byte[ProtocolConstants.RevertBuffer.Length];
                var readRevertMessage = new AsyncEnumerator(ae.ToString());

                try
                {
                    readRevertMessage.BeginExecute(
                        StreamUtil.ReadBytes(revertBuffer, stream, readRevertMessage, "revert"), ae.End());
                }
                catch (Exception)
                {
                    // Success() has already been reported; failing to start the revert read
                    // is handled like a failed revert read below - nothing to report.
                    yield break;
                }

                yield return 1;

                try
                {
                    readRevertMessage.EndExecute(ae.DequeueAsyncResult());
                    var revert = Encoding.Unicode.GetString(revertBuffer);
                    if (revert == ProtocolConstants.Revert)
                    {
                        Failure(null);
                    }
                }
                catch (Exception)
                {
                    // expected, there is nothing to do here, the
                    // reciever didn't report anything for us
                }
            }
        }
    }
    finally
    {
        var completed = SendCompleted;
        if (completed != null)
        {
            completed();
        }
    }
}

/// <summary>
/// Runs one Begin*/End* half of a send step and converts any exception into a logged,
/// reported failure.
/// </summary>
/// <remarks>
/// Exists because C# does not allow <c>yield return</c> inside a <c>try</c> block that has a
/// <c>catch</c> clause, so every half-step needs its own guarded region.
/// </remarks>
/// <param name="step">The operation to execute.</param>
/// <param name="failureFormat">Warning format; <c>{0}</c> receives <c>Destination</c> and
/// <c>{1}</c> the exception.</param>
/// <returns><c>true</c> when <paramref name="step"/> completed without throwing; <c>false</c>
/// after the exception was logged and passed to <c>Failure</c>.</returns>
private bool TryRunSendStep(Action step, string failureFormat)
{
    try
    {
        step();
        return true;
    }
    catch (Exception exception)
    {
        logger.WarnFormat(failureFormat, Destination, exception);
        Failure(exception);
        return false;
    }
}

And yes, it is from real code, and it is going to production soon.


Comments

q
q

"[...] And yes, it is from real code [...]"

  • Is it from some kind of new open source software you are about to move to? ;)
Yitzchok

This looks like it uses Jeffrey Richter’s Power Threading Library ;)

Rafal

How did you know it's about sending data?

Comment preview

Comments have been closed on this topic.

FUTURE POSTS

No future posts left, oh my!

RECENT SERIES

  1. The RavenDB Comic Strip (3):
    28 May 2015 - Part III – High availability & sleeping soundly
  2. Special Offer (2):
    27 May 2015 - 29% discount for all our products
  3. RavenDB Sharding (3):
    22 May 2015 - Adding a new shard to an existing cluster, splitting the shard
  4. Challenge (45):
    28 Apr 2015 - What is the meaning of this change?
  5. Interview question (2):
    30 Mar 2015 - fix the index
View all series

RECENT COMMENTS

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats