Capturing and Parsing Serial Data with Reactive Extensions

Tags: software-engineering, software engineering, development, .net, reactive-extensions, rx, ASCOM, C#

I’m currently working on a project to refactor a telescope driver and I’ve decided to use the Reactive Extensions for .NET (Rx for short). It is proving to be very interesting.

The two core interfaces of Rx are IObservable<T> and IObserver<T>, in other words, the Observer Pattern. These are sort of the evil twins of IEnumerable<T> and IEnumerator<T>, except they are push instead of pull. In the Enumerable world, you either have to already have all the data, or you are blocked waiting for the next item in the sequence. In the Rx world, you subscribe to a sequence and you’re done; the sequence notifies you when there is some data available. This is a subtle yet radical paradigm shift that really makes it easy to write massively asynchronous code. So let’s look at a real-world example from a telescope driver that uses a Meade-like protocol.
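To make the push/pull difference concrete before we get to the driver, here is a minimal sketch that uses only the two framework interfaces, with no Rx library at all. CharSource and CollectingObserver are made-up names for illustration; real code would lean on the Rx operators rather than a hand-written observable like this one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push source: only IObservable<T> and IObserver<T> come from the framework.
public sealed class CharSource : IObservable<char>
{
    private readonly List<IObserver<char>> observers = new List<IObserver<char>>();

    // Subscribing registers the observer; disposing the returned token removes it again.
    public IDisposable Subscribe(IObserver<char> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    // Push one character to every current subscriber. Nobody has to ask for it.
    public void Push(char c)
    {
        foreach (var observer in observers.ToArray())
            observer.OnNext(c);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action dispose;
        public Unsubscriber(Action dispose) { this.dispose = dispose; }
        public void Dispose() { dispose(); }
    }
}

// Hypothetical observer that simply records whatever it is handed.
public sealed class CollectingObserver : IObserver<char>
{
    public readonly List<char> Received = new List<char>();
    public void OnNext(char value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Calling source.Subscribe(observer) and then source.Push(':') delivers the character to the observer without the observer ever asking for it, and disposing the subscription stops the deliveries. Contrast that with a foreach over an IEnumerable<char>, where the consumer drives and blocks until the next item is ready.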

Our telescope communicates over a serial port. We send it a command and it sends back a response, in the format :response#. The ‘:’ and ‘#’ are delimiters and are (almost) always present. So what we’d like to do is parse our response stream out into a series of strings beginning with ‘:’ and ending with ‘#’. Dealing with a serial port is never that straightforward though. Data is usually delivered in little ‘chunks’ that may contain a whole response or just part of a response. Sorting that all out and staying in sync is a bit of a nightmare that can quickly lead to spaghetti code. If you’re organized, you might write a state machine that handles a single character at a time and occasionally emits well-formed strings. We can do this job with Rx in a much more declarative way by composing LINQ queries. Rx is sometimes known as LINQ to Events, because it lets you query data in motion, over time. It’s kind of hard to explain, so let’s just see the code…
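First, for contrast, here is roughly what the hand-written state machine might look like. This is only a sketch: ResponseFramer and Feed are hypothetical names, and it assumes every response starts with ‘:’ and ends with ‘#’.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical hand-rolled framer, shown only for contrast with the Rx queries.
public sealed class ResponseFramer
{
    private readonly StringBuilder current = new StringBuilder();
    private bool insideResponse;

    // Feed one chunk as delivered by the port; returns the responses that chunk completed.
    public IList<string> Feed(string chunk)
    {
        var completed = new List<string>();
        foreach (var c in chunk)
        {
            if (!insideResponse)
            {
                if (c != ':') continue;   // discard anything between responses
                insideResponse = true;
                current.Clear();
            }

            current.Append(c);

            if (c == '#')
            {
                completed.Add(current.ToString());
                insideResponse = false;
            }
        }
        return completed;
    }
}
```

Feeding it the chunks ":ab", "c#:d" and "e#" in turn yields nothing, then ":abc#", then ":de#". It works, but the framing state lives in mutable fields and every new rule means another branch in the loop, which is exactly what the Rx version avoids.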

OK, the first job is to capture the serial port’s DataReceived event and use that as the source of our data. We’ll define some extension methods to help with capturing our data source and building up useful queries; it is good practice to use extension methods for this, since they chain together just like the built-in LINQ operators.

    /// <summary>
    /// Captures the <see cref="System.IO.Ports.SerialPort.DataReceived" /> event of a serial port and returns an
    /// observable sequence of the events.
    /// </summary>
    /// <param name="port">The serial port that will act as the event source.</param>
    /// <returns>An observable sequence of <see cref="SerialDataReceivedEventArgs" /> event patterns.</returns>
    public static IObservable<EventPattern<SerialDataReceivedEventArgs>> ObservableDataReceivedEvents(
        this ISerialPort port)
    {
        var portEvents = Observable.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
            handler =>
            {
                // Conversion: runs once per subscription to wrap Rx's handler in the event's delegate type.
                log.Debug("Event: SerialDataReceived");
                return handler.Invoke;
            },
            handler =>
            {
                // We must discard stale data when subscribing or it will pollute the first element of the sequence.
                port.DiscardInBuffer();
                port.DataReceived += handler;
                log.Debug("Listening to DataReceived event");
            },
            handler =>
            {
                port.DataReceived -= handler;
                log.Debug("Stopped listening to DataReceived event");
            });
        return portEvents;
    }

This method is a little opaque, but basically it uses Observable.FromEventPattern to convert a .NET event into an observable sequence of event args. The method takes 3 delegates: one that converts Rx’s generic handler into the delegate type the event expects, one to subscribe to the event and another to unsubscribe from it. Note that we haven’t subscribed to anything yet, so the DataReceived event will not even have a handler installed at this stage.

This gives us a sequence of events, each containing a SerialDataReceivedEventArgs object, which tells us why the event was fired, but no data. Our sequence is of the wrong type and contains no data. Not very useful. What we really want is an observable sequence of the characters that have been received. Our next extension method:

    /// <summary>
    /// Gets an observable sequence of all the characters received by a serial port.
    /// </summary>
    /// <param name="port">The port that is to be the data source.</param>
    /// <returns><see cref="IObservable{char}" /> - an observable sequence of characters.</returns>
    public static IObservable<char> ReceivedCharacters(this ISerialPort port)
    {
        var observableEvents = port.ObservableDataReceivedEvents();
        var observableCharacterSequence = from args in observableEvents
                                          where args.EventArgs.EventType == SerialData.Chars
                                          from character in port.ReadExisting()
                                          select character;
        return observableCharacterSequence;
    }

Here we build upon our first extension method. Each time we get an event in our sequence, we check that it’s for received data (the where clause) and if it is, we read all the data that the serial port has in its buffer (the second from clause, which calls ReadExisting). We then iterate over the received characters, projecting a new type of sequence, IObservable<char>, and produce an element for each character. So now we’ve got the serial data and we’ve projected it into a new sequence of characters. Note that we were able to use a LINQ query to filter and transform data that we don’t even have yet. Pretty cool!

So, having obtained all the characters in an observable sequence, what we really need to do is chop it up into response packets and return each valid response as an individual string. No problem! A bit more LINQ and the use of Rx’s Buffer operator does the trick:

    /// <summary>
    /// Buffers a sequence of characters based on a pair of delimiters.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="initiator">The character that opens a new buffer.</param>
    /// <param name="terminator">The character that closes an open buffer.</param>
    /// <returns><see cref="IObservable{IList{char}}" /> - an observable sequence of buffers.</returns>
    static IObservable<IList<char>> BufferByDelimiters(IObservable<char> source, char initiator, char terminator)
    {
        return source.Buffer(source.Where(c => c == initiator), x => source.Where(c => c == terminator));
    }

The Buffer operator is quite a sophisticated operator that can buffer by time, or by a count of the items, or (as we are doing here) by using a start trigger and a stop trigger. This gives us an observable sequence of lists of characters, not quite the strings we wanted, so another query will be used to project the sequence into strings:

    /// <summary>
    /// Parses a sequence of characters into a sequence of strings, based on delimiters.
    /// The delimiters are included in the strings. Characters that do not occur between delimiters are discarded from the
    /// sequence.
    /// </summary>
    /// <param name="source">The source sequence of characters.</param>
    /// <param name="initiator">The initiator character that triggers the start of a new string.</param>
    /// <param name="terminator">The terminator character that marks the end of a string.</param>
    /// <returns>
    /// <see cref="IObservable{string}" />, an observable sequence of strings each delimited by an initiator character
    /// and a terminator character.
    /// </returns>
    public static IObservable<string> DelimitedMessageStrings(
        this IObservable<char> source,
        char initiator = ':',
        char terminator = '#')
    {
        var buffers = source.Publish(s => BufferByDelimiters(s, initiator, terminator));
        var strings = from buffer in buffers
                      select new string(buffer.ToArray());
        return strings;
    }

Here, we’ve included the initiator and the terminator as optional parameters and defaulted them to characters that are suitable for a Meade style protocol, which is very common in the world of amateur telescopes. Now all we need to do is apply our methods to a serial port, something like this:

    var observableReceiveSequence = Port.ReceivedCharacters()
        .Do(
            c => Log.Debug("Rx={0}", c),
            ex => Log.Error("Rx exception", ex),
            () => Log.Warn("Rx OnCompleted"))
        .Publish();
    var messages = observableReceiveSequence.DelimitedMessageStrings();
    messages.Subscribe(...);
    // Publish() returns a connectable sequence; nothing flows until Connect() is called.
    observableReceiveSequence.Connect();

This is starting to look easy! The Do() operator simply adds a ‘side effect’ to the sequence. It accepts 3 delegates that get called whenever a new element is produced, an error occurs, or the sequence completes. Very handy for debugging.

It turns out that this sequence doesn’t work very well for our protocol. Some responses don’t have the initial ‘:’ character, while other responses contain a time and therefore have embedded colons; our query fails in both cases, so we’ll need a slightly different one:

    public static IObservable<string> TerminatedResponses(this IObservable<char> source, char terminator)
    {
        var terminatorString = terminator.ToString();
        var sequence = source.Scan("", (agg, c) => agg.EndsWith(terminatorString) ? c.ToString() : agg + c)
                             .Where(s => s.EndsWith(terminatorString));
        return sequence;
    }
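The Scan operator is doing the heavy lifting here, so it may help to trace it over an in-memory string. The sketch below assumes a hypothetical pull-based Scan helper over IEnumerable (it is not part of LINQ or Rx) so that it runs without a serial port or the Rx package; the accumulator lambda is the same one used in TerminatedResponses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanTrace
{
    // Hypothetical pull-based stand-in for Rx's Scan: yields every intermediate accumulator value.
    public static IEnumerable<TAcc> Scan<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> step)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = step(acc, item);
            yield return acc;
        }
    }

    // Same fold as the Rx query: grow the string one character at a time,
    // start afresh after a terminator, and keep only the completed strings.
    public static IList<string> TerminatedResponses(string received, char terminator)
    {
        var terminatorString = terminator.ToString();
        return received
            .Scan("", (agg, c) => agg.EndsWith(terminatorString) ? c.ToString() : agg + c)
            .Where(s => s.EndsWith(terminatorString))
            .ToList();
    }
}
```

For the input "10:30:00#1#" this yields "10:30:00#" and then "1#". The embedded colons are just ordinary characters to the accumulator, and the second response needs no leading ‘:’ at all, which covers both of the cases that tripped up the Buffer-based query.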

Another problem with our particular device is that sometimes it generates status notifications that we didn’t ask for. They are useful data, but here we want just the responses to our commands, so we can filter out the ‘event messages’ that would otherwise get in the way. First let’s define what we mean by an ‘event message’ in the form of a predicate:

    /// <summary>
    /// A predicate that evaluates to true if the message is an event message.
    /// Event messages are messages that were generated autonomously by the drive system, not in response to any command.
    /// Events can arrive at any time but they are guaranteed not to interrupt another response message on the serial channel.
    /// </summary>
    /// <param name="message">The message to be tested.</param>
    /// <returns><c>true</c> if the specified message is an event; otherwise, <c>false</c>.</returns>
    static bool IsEvent(this string message)
    {
        IList<string> eventHeaders = new[] { ":P", ":S", ":X", ":V", ":W", ":F", ":R", ":L" }; // ToDo - maybe use a RegEx?
        return eventHeaders.Any(initiator => message.StartsWith(initiator));
    }
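The ToDo in that code wonders about a regular expression. One way that might look, as a sketch only: EventMessagePattern and IsEventByRegex are hypothetical names, and the pattern simply mirrors the eight headers in the list.

```csharp
using System;
using System.Text.RegularExpressions;

public static class EventMessagePattern
{
    // A ':' at the start of the message followed by one of the eight event header letters.
    private static readonly Regex EventHeader =
        new Regex("^:[PSXVWFRL]", RegexOptions.CultureInvariant);

    // Hypothetical regex-based equivalent of the list-based predicate.
    public static bool IsEventByRegex(string message)
    {
        return EventHeader.IsMatch(message);
    }
}
```

Whether this is any clearer than the list is a matter of taste; the list is probably easier to extend when the headers stop being single letters.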

Armed with this predicate, we can now filter out the unwanted messages with a LINQ query:

    /// <summary>
    /// Filters a message sequence to remove 'event' messages.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <returns><see cref="IObservable{String}"/> which contains the source sequence with events removed.</returns>
    public static IObservable<string> IgnoreEventResponses(this IObservable<string> source)
    {
        var sequence = from message in source
                       where !message.IsEvent()
                       select message;
        return sequence;
    }

What could be simpler?

That’s enough for this article. I have a little console app and some more code that makes ‘transactions’ that allow me to send commands to the device and get the responses back and match the responses up with the commands. As a hint on where this is going, here is what the output looks like (more on the rest of the code in another article):

