

Streaming with WCF - Part 2: Plugging a custom stream into a WCF service



Last time, I introduced this topic and gave an example of a custom stream implementation. This time, I'm going to make a few enhancements to this implementation and get into the topic of using it in a WCF Service.

 


So, I've made a couple of modifications to the CustomStreamImplementation: I added a couple of knobs to control when it stops streaming. Here's the complete code:

using System;

using System.IO;

using System.Threading;

 

namespace CustomStreamImplementation

{

    public class ContinuousStream : Stream

    {

        // Used to control when Read will return 0.

        public bool StopStreaming { get; set; }

 

        public TimeSpan ReadThrottle { get; set; }

 

        // Only set this if you don't want to manually control when

        // the stream stops.

        // Keep it low - less than 1 second. The server can send bytes very quickly (without a throttle), so

        // sending a continuous stream will easily blow the MaxReceivedMessageSize buffer.

        public TimeSpan StreamDuration { get; set; }

 

        DateTime readStartedTime;

        long totalBytesRead = 0;

 

        public override bool CanRead

        {

            get { return !StopStreaming; }

        }

 

        public override bool CanSeek

        {

            get { return false; }

        }

 

        public override bool CanWrite

        {

            get { return false; }

        }

 

        public override long Length

        {

            get { throw new NotImplementedException(); }

        }

 

        public override long Position

        {

            get

            {

                return totalBytesRead;

            }

            set

            {

                totalBytesRead = value;

            }

        }

 

        public override void Flush()

        {

        }

 

        public override int Read(byte[] buffer, int offset, int count)

        {

            // Duration-based streaming logic: Control the "StopStreaming" flag based on a Duration

            if (StreamDuration != TimeSpan.Zero)

            {

                if (readStartedTime == DateTime.MinValue)

                {

                    readStartedTime = DateTime.Now;

                }

                if (DateTime.Now - readStartedTime >= StreamDuration)

                {

                    StopStreaming = true;

                }

            }

 

            if (StopStreaming)

            {

                // Returning 0 signals end of stream; leave the caller's buffer untouched.

                return 0;

            }

 

            // Allow Read to continue as long as StopStreaming is false.

            // Just fill buffer with as many random bytes as necessary.

            int seed = DateTime.Now.Millisecond;

            Random rand = new Random(seed);

            byte[] randomBuffer = new byte[count];

            rand.NextBytes(randomBuffer);

 

            randomBuffer.CopyTo(buffer, offset);

            totalBytesRead += count;

 

            if (ReadThrottle != TimeSpan.Zero)

            {

                Thread.Sleep(ReadThrottle);

            }

            return count;

        }

 

        public override long Seek(long offset, SeekOrigin origin)

        {

            throw new NotImplementedException();

        }

 

        public override void SetLength(long value)

        {

            throw new NotImplementedException();

        }

 

        public override void Write(byte[] buffer, int offset, int count)

        {

            throw new NotImplementedException();

        }

    }

}

 

Downloading the stream from the service

First, I'll create the WCF service. I'll make it self-hosted to simplify things a bit and to give me full control over the code I add.

The service is a new Console Application in my solution, with a reference to the CustomStreamImplementation project. The first time I created a WCF service that was supposed to return a custom stream, I thought I'd end up with a contract that returned something of type "CustomStream." But then I couldn't figure out how to stream something from within the service operation and still return from the operation while the stream was still going. It didn't make sense until I asked around and learned what really happens. The contract simply returns a Stream. Your Stream object is passed down to the channel layer, which invokes Read on it when the request to download the stream comes in. Bytes from the stream are loaded into a buffer and sent on the wire. As long as Read doesn't return 0, the channel layer keeps filling buffers and sending them on the wire. I'll go into high-level details about how this works in a separate post.
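To make the pull model concrete, here is a minimal sketch of the idea. It is not WCF's actual implementation: `Pump` and the `wire` stream are hypothetical stand-ins for the channel layer and the network, and a MemoryStream stands in for the stream returned by the operation.

```csharp
using System;
using System.IO;

static class PullModelSketch
{
    // Hypothetical stand-in for the channel layer: keeps calling Read on the
    // stream the operation returned and forwards each chunk to 'wire' until
    // Read returns 0. Returns the total number of bytes forwarded.
    public static long Pump(Stream source, Stream wire, int chunkSize)
    {
        byte[] chunk = new byte[chunkSize];
        long total = 0;
        int bytesRead;
        while ((bytesRead = source.Read(chunk, 0, chunk.Length)) > 0)
        {
            wire.Write(chunk, 0, bytesRead);
            total += bytesRead;
        }
        return total;
    }

    static void Main()
    {
        // The "operation" only creates and returns the stream; nothing is read yet.
        Stream returnedByOperation = new MemoryStream(new byte[1000]);

        using (MemoryStream wire = new MemoryStream())
        {
            long sent = Pump(returnedByOperation, wire, 256);
            Console.WriteLine("Forwarded {0} bytes", sent);
        }
    }
}
```

The point of the sketch is that the operation returns immediately, and all of the reading happens afterwards in whoever holds the Stream reference.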

 

Service Implementation

I'll just paste the entire code; it's pretty simple.

 

Contract:

using System.IO;

using System.ServiceModel;

 

namespace CustomStreamService

{

    [ServiceContract]

    public interface ICustomStreamService

    {

        [OperationContract]

        Stream DownloadStream();

    }

}

 

Implementation:

using System;

using System.IO;

using System.ServiceModel;

using CustomStreamImplementation;

 

namespace CustomStreamService

{

    [ServiceBehavior]

    class CustomStreamService : ICustomStreamService

    {

        [OperationBehavior]

        public Stream DownloadStream()

        {

            ContinuousStream continuousStream = new ContinuousStream();

            continuousStream.StreamDuration = TimeSpan.FromSeconds(30);

            continuousStream.ReadThrottle = TimeSpan.FromSeconds(1);

            return continuousStream;

        }

    }

}

 

Driver - The service host and client code:

using System;

using System.IO;

using System.ServiceModel;

 

namespace CustomStreamService

{

    class Program

    {

        static void Main(string[] args)

        {

            // Service code

            string address = "http://localhost/CustomStreamService";

            BasicHttpBinding binding = new BasicHttpBinding();

            binding.TransferMode = TransferMode.Streamed;

            ServiceHost host = new ServiceHost(typeof(CustomStreamService), new Uri(address));

           

            host.AddServiceEndpoint(typeof(ICustomStreamService), binding, address);

            host.Open();

            Console.WriteLine("Service is {0}, Press ENTER to continue.", host.State);

            Console.ReadLine();

 

            // Client code

            // For ad-hoc testing, I'll just re-use the binding, address, and contract that's defined in this project.

            // If I wanted to test the client cross-machine, I would have to either generate it using wsdl and svcutil, or

            // simply copy-paste the contract (ICustomStreamService) into the client project. Then make sure the binding and address are correct.

 

            ChannelFactory<ICustomStreamService> factory = new ChannelFactory<ICustomStreamService>(binding, address);

            ICustomStreamService client = factory.CreateChannel();

            Stream streamFromService = client.DownloadStream(); // Note that my Client knows nothing about the stream implementation.

 

            DateTime beginReadTime = DateTime.Now;

            Console.WriteLine("{0}: starting to read.", beginReadTime);

           

            // My oddball way of reading the bytes from the stream and writing them to the console.

            int bytesRead;

            int bufferSize = 256;

            byte[] buffer = new byte[bufferSize];

            do

            {

                bytesRead = streamFromService.Read(buffer, 0, buffer.Length);

                // Only the first bytesRead bytes of the buffer hold new data.

                for (int i = 0; i < bytesRead; i++)

                {

                    Console.Write(buffer[i]);

                    Console.Write(' ');

                }

            } while (bytesRead != 0);

            Console.WriteLine("{0}: done reading. Elapsed time is {1}", DateTime.Now, DateTime.Now - beginReadTime);

 

            Console.ReadLine();

        }

    }

}

 

Troubleshooting

If you were to add all this code to a console app and run it, you'd see the client downloading bytes for a few seconds, and then the streamFromService.Read call would throw an IOException ("An exception has been thrown when reading the stream."). Examining the exception shows the following inner exception:

"The maximum message size quota for incoming messages (65536) has been exceeded. To increase the quota, use the MaxReceivedMessageSize property on the appropriate binding element."

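If you'd rather see that inner exception without attaching a debugger, the read loop can be wrapped in a try/catch that walks the InnerException chain. Here's a minimal sketch. The `Describe` helper is my own name, and the exception thrown in `Main` is simulated rather than produced by WCF, so the inner exception type shown is an assumption.

```csharp
using System;
using System.IO;
using System.Text;

static class ExceptionChainSketch
{
    // Builds one line per exception in the chain, outermost first.
    public static string Describe(Exception ex)
    {
        StringBuilder sb = new StringBuilder();
        for (Exception e = ex; e != null; e = e.InnerException)
        {
            sb.AppendLine(e.GetType().Name + ": " + e.Message);
        }
        return sb.ToString();
    }

    static void Main()
    {
        try
        {
            // Simulated failure standing in for streamFromService.Read(...).
            throw new IOException(
                "An exception has been thrown when reading the stream.",
                new Exception("quota exceeded (simulated inner exception)"));
        }
        catch (IOException ex)
        {
            Console.Write(Describe(ex));
        }
    }
}
```

In the real client, the try block would surround the do/while loop that calls streamFromService.Read.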
But I thought I was Streaming?

Don't worry, you still are. But MaxReceivedMessageSize is a mechanism for "time-boxing" the reading of the stream: it caps the total number of bytes the receiver will accept for a single message, streamed or not. Generally, you'd want this on a service that reads a stream from a client. You don't want a client to be able to tie up the service forever with an infinite stream, so this property takes care of that. The same property applies to clients. If you want to read a larger stream, simply do what the exception says. In this scenario, I'm just going to open the floodgates and let the client read for as long as it can:

            binding.MaxReceivedMessageSize = int.MaxValue;

 

With that, my client could read the entire 30 seconds of the stream from the service:

17/2011 10:12:51 AM: done reading. Elapsed time is 00:00:34.6390000

 

If you're wondering about that extra 4.639 seconds of processing time, increasing the buffer size used by the read operation can improve the performance of my ad-hoc client. I suggest you play with it: there's certainly an optimal value for this buffer, based on the size of the buffer used by the wire stream on the service. Set bufferSize to something larger, like 4k, and you'll see what I mean. Of course, logging each byte to the console has the largest impact on performance.
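As a rough illustration of why the buffer size matters, the sketch below counts how many Read calls it takes to drain the same payload with two buffer sizes. It uses a MemoryStream as a stand-in, which always fills the buffer when data is available. A stream coming off the wire may return fewer bytes per call, so treat the counts as a best case, not as a measurement of the WCF client. `CountReads` is a hypothetical helper name.

```csharp
using System;
using System.IO;

static class BufferSizeSketch
{
    // Counts how many Read calls return data before the stream is drained.
    public static int CountReads(Stream source, int bufferSize)
    {
        byte[] buffer = new byte[bufferSize];
        int calls = 0;
        while (source.Read(buffer, 0, buffer.Length) > 0)
        {
            calls++;
        }
        return calls;
    }

    static void Main()
    {
        byte[] payload = new byte[64 * 1024];

        foreach (int size in new[] { 256, 4096 })
        {
            int calls = CountReads(new MemoryStream(payload), size);
            Console.WriteLine("bufferSize {0}: {1} reads", size, calls);
        }
        // Prints:
        // bufferSize 256: 256 reads
        // bufferSize 4096: 16 reads
    }
}
```

Fewer calls means less per-call overhead, which is where a larger bufferSize can help the ad-hoc client.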

 

Still can't get streaming to work?

Leave me a comment. I'll try to troubleshoot any issues with streaming and WCF that I can. I'm considering writing a brief post about the things in WCF that don't support streaming. Off the top of my head, I can think of two: MessageSecurity and streaming on a duplex callback. There are probably a handful of other streaming-not-supported scenarios.