Ayende @ Rahien

Hi!
My name is Oren Eini
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969

, @ Q c

Posts: 18 | Comments: 65

filter by tags archive

1st April Post – Sending Data

time to read 19 min | 3673 words

Since it appears to be customary, I decided that I need to make a few posts for April 1st. Here is the first of them.

private IEnumerator<int> SendInternal(AsyncEnumerator ae)
{
    try
    {
        using (var client = new TcpClient())
        {
            try
            {
                client.BeginConnect(Destination.Host, Destination.Port,
                                    ae.End(),
                                    null);
            }
            catch (Exception exception)
            {
                logger.WarnFormat("Failed to connect to {0} because {1}", Destination, exception);
                Failure(exception);
                yield break;
            }

            yield return 1;

            try
            {
                client.EndConnect(ae.DequeueAsyncResult());
            }
            catch (Exception exception)
            {
                logger.WarnFormat("Failed to connect to {0} because {1}", Destination, exception);
                Failure(exception);
                yield break;
            }

            logger.DebugFormat("Successfully connected to {0}", Destination);

            using (var stream = client.GetStream())
            {
                var buffer = Messages.Serialize();

                var bufferLenInBytes = BitConverter.GetBytes(buffer.Length);

                logger.DebugFormat("Writing length of {0} bytes to {1}", buffer.Length, Destination);

                try
                {
                    stream.BeginWrite(bufferLenInBytes, 0, bufferLenInBytes.Length, ae.End(), null);
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Could not write to {0} because {1}", Destination,
                                      exception);
                    Failure(exception);
                    yield break;
                }
            
                yield return 1;

                try
                {
                    stream.EndWrite(ae.DequeueAsyncResult());
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Could not write to {0} because {1}", Destination,
                                      exception);
                    Failure(exception);
                    yield break;
                }

                logger.DebugFormat("Writing {0} bytes to {1}", buffer.Length, Destination);

                try
                {
                    stream.BeginWrite(buffer, 0, buffer.Length, ae.End(), null);
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Could not write to {0} because {1}", Destination,
                                    exception);
                    Failure(exception);
                    yield break;
                }
            
                yield return 1;

                try
                {
                    stream.EndWrite(ae.DequeueAsyncResult());
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Could not write to {0} because {1}", Destination,
                                      exception);
                    Failure(exception);
                    yield break;
                }

                logger.DebugFormat("Successfully wrote to {0}", Destination);

                var recieveBuffer = new byte[ProtocolConstants.RecievedBuffer.Length];
                var readConfirmationEnumerator = new AsyncEnumerator();

                try
                {
                    readConfirmationEnumerator.BeginExecute(
                        StreamUtil.ReadBytes(recieveBuffer, stream, readConfirmationEnumerator, "recieve confirmation"), ae.End());
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Could not read confirmation from {0} because {1}", Destination,
                                      exception);
                    Failure(exception);
                    yield break;
                }

                yield return 1;

                try
                {
                    readConfirmationEnumerator.EndExecute(ae.DequeueAsyncResult());
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Could not read confirmation from {0} because {1}", Destination,
                                      exception);
                    Failure(exception);
                    yield break;
                }

                var recieveRespone = Encoding.Unicode.GetString(recieveBuffer);
                if (recieveRespone == ProtocolConstants.QueueDoesNotExists)
                {
                    logger.WarnFormat(
                        "Response from reciever {0} is that queue does not exists",
                        Destination);
                    Failure(new QueueDoesNotExistsException());
                    yield break;
                }
                else if(recieveRespone!=ProtocolConstants.Recieved)
                {
                    logger.WarnFormat(
                        "Response from reciever {0} is not the expected one, unexpected response was: {1}",
                        Destination, recieveRespone);
                    Failure(null);
                    yield break;
                }

                try
                {
                    stream.BeginWrite(ProtocolConstants.AcknowledgedBuffer, 0,
                                      ProtocolConstants.AcknowledgedBuffer.Length, ae.End(), null);
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Failed to write acknowledgement to reciever {0} because {1}",
                                      Destination, exception);
                    Failure(exception);
                    yield break;
                }

                yield return 1;

                try
                {
                    stream.EndWrite(ae.DequeueAsyncResult());
                }
                catch (Exception exception)
                {
                    logger.WarnFormat("Failed to write acknowledgement to reciever {0} because {1}",
                                      Destination, exception);
                    Failure(exception);
                    yield break;
                }
                Success();

                buffer = new byte[ProtocolConstants.RevertBuffer.Length];
                var readRevertMessage = new AsyncEnumerator(ae.ToString());
                readRevertMessage.BeginExecute(
                    StreamUtil.ReadBytes(buffer, stream, readRevertMessage, "revert"), ae.End());
                yield return 1;
                try
                {
                    readRevertMessage.EndExecute(ae.DequeueAsyncResult());
                    var revert = Encoding.Unicode.GetString(buffer);
                    if (revert == ProtocolConstants.Revert)
                    {
                        Failure(null);
                    }
                }
                catch (Exception)
                {
                    // expected, there is nothing to do here, the
                    // reciever didn't report anything for us
                }

            }
        }
    }
    finally
    {
        var completed = SendCompleted;
        if (completed != null)
            completed();
    }
}

And yes, it is from real code, and it is going to production soon.


Comments

q
q

"[...] And yes, it is from real code [...]"

  • Is it from some kind of new open source software you are about to move to? ;)
Yitzchok

This looks like it uses Jeffrey Richter’s Power Threading Library ;)

Rafal

How did you know its about sending data?

Comment preview

Comments have been closed on this topic.

FUTURE POSTS

  1. RavenDB 3.0 New Stable Release - 11 hours from now
  2. Production postmortem: The case of the lying configuration file - about one day from now
  3. Production postmortem: The industry at large - 2 days from now
  4. The insidious cost of allocations - 3 days from now
  5. Buffer allocation strategies: A possible solution - 6 days from now

And 4 more posts are pending...

There are posts all the way to Sep 11, 2015

RECENT SERIES

  1. Find the bug (5):
    20 Apr 2011 - Why do I get a Null Reference Exception?
  2. Production postmortem (10):
    31 Aug 2015 - The case of the memory eater and high load
  3. What is new in RavenDB 3.5 (7):
    12 Aug 2015 - Monitoring support
  4. Career planning (6):
    24 Jul 2015 - The immortal choices aren't
View all series

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats