Reactive ASCOM

Tags: ASCOM, software-engineering, reactive-extensions, rx, astronomy

Using the Reactive Extensions for .NET to Build a Better Communications Model for ASCOM Drivers

[Update 2015-05-30] The code snippets in this blog post are already a little out of date as we update the codebase based on feedback and practical experience. We suggest checking out the Beta-0.0.0 branch from the Git repository and looking at the code there. https://bitbucket.org/tigranetworks/ta.ascom.reactivecommunications/branch/beta/0.0.0

Recently I’ve been experimenting with different approaches to handling device communications in ASCOM drivers. My reasons for doing this were driven by the following ‘pain points’.

  • A naive serial port handler can quickly get into difficulties with multi-threading and re-entrancy issues, particularly when it is loaded into a Single Threaded Apartment (STA Thread), which is typically the case for a VB6 application or anything that uses a Windows Forms based GUI. Some solutions were proposed for this which mainly involve some form of hand-off to a worker thread, but then you have all the problems of multi-threading to deal with and the issue still might not be solved.
  • There is a general assumption of a strict “command-response” protocol. In most of the drivers I have worked on, that assumption hasn’t held true.
  • Every driver is currently ‘reinventing the wheel’. At best, the serial port helper in ASCOM.Utilities can be used, but there are still potential threading/reentrancy issues with that and it does nothing to guarantee correct sequencing of commands and responses.
  • Modern devices are increasingly using Ethernet and I wanted a solution that worked equally well for multiple types of communications channel.

Initially I began looking at ways to solve the re-entrancy issues by handing off transactions to a worker thread. Then I hit upon the idea of using the Reactive Extensions (abbreviated “Rx”) and once I put that together with the concept of transactions, a new design began to take shape which would make it possible to produce a stand-alone library that could be re-used easily with little or no work. I have produced the library and retrofitted it to one of my telescope drivers and the results are very encouraging. I am presenting this library, which I call “Reactive ASCOM”, to the public domain in the hope that someone will find it useful.

What’s this Reactive Extensions thing?

In all honesty, the concept is a little bit hard to explain, even though it is rather simple. In .NET there are collections and they can be queried using LINQ (Language INtegrated Query). This has been an amazingly successful technology. The core interfaces in LINQ are IEnumerable<T> and IEnumerator<T> which allow data to be enumerated and processed by LINQ queries. In the world of IEnumerable, you have a dataset and you run a query over every element in the data. In fact it is possible to enumerate a partially complete dataset while it is still being produced and in that situation the code will normally get to a point where it is blocked waiting for the next element to be produced. IEnumerable is therefore a PULL technology because it is always trying to pull the next piece of data to complete the query.

What Reactive Extensions does is to stand that concept on its head with the introduction of two similar interfaces, IObservable<T> and IObserver<T>. The authors of Rx realized that there is an amazing similarity between IEnumerable and IObservable: they are really just the same interface but viewed from the other side of the data source. Where IEnumerable views data as a PULL collection, IObservable views the data as a PUSH collection. That is, nothing happens until there is a piece of data. When a new element is produced, the data is pushed to any observers that have subscribed to it. In this way, there is never any blocking and code runs exactly when it is needed. Further, Rx defines a strictly observed grammar for observable sequences of data and makes certain guarantees about what order things will happen in. Sequencing and thread-safety fall naturally out of these guarantees. I have found that using Rx is not initially very intuitive, but once the light bulb switches on and you really start to ‘get it’ then you begin to wonder why you haven’t always taken this approach to handling data streams (the answer of course is because Rx wasn’t available!). It is impossible for me to go into much detail about Rx here; for more background information on Rx, please refer to the references at the end of this article.
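To make the pull/push distinction concrete, here is a small sketch that runs the same filter both ways. It is not taken from the Reactive ASCOM library; the class and method names are invented for illustration, and it assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Illustrative names only; nothing here is part of Reactive ASCOM.
public static class PullVersusPush
{
    // PULL: the consumer drives. ToList() asks the source for each element in turn.
    public static List<int> PullEvens(IEnumerable<int> source)
    {
        return source.Where(n => n % 2 == 0).ToList();
    }

    // PUSH: the producer drives. The query sits idle until the producer
    // calls OnNext, at which point matching values are pushed to the observer.
    public static List<int> PushEvens(Action<IObserver<int>> producer)
    {
        var received = new List<int>();
        var subject = new Subject<int>();
        using (subject.Where(n => n % 2 == 0).Subscribe(received.Add))
        {
            producer(subject); // a Subject<T> is both an observer and an observable
        }
        return received;
    }

    public static void Main()
    {
        var pulled = PullEvens(new[] { 1, 2, 3, 4 });
        var pushed = PushEvens(observer =>
        {
            foreach (var n in new[] { 1, 2, 3, 4 }) observer.OnNext(n);
            observer.OnCompleted();
        });
        Console.WriteLine(string.Join(",", pulled)); // 2,4
        Console.WriteLine(string.Join(",", pushed)); // 2,4
    }
}
```

The two queries look almost identical; the difference is only in who decides when the next element is processed.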

Note: you don’t need a deep understanding of Rx in order to use Reactive ASCOM, because most of the details are hidden within the library. You just need to understand how to compose a LINQ query and be able to code your classes derived from DeviceTransaction. There is a sample application in the library source code that shows how to do this.

The Serial Port As an Observable Sequence

It seemed to me that data received from a serial port can be viewed as a simple observable sequence of characters. This simple view leaves out everything that is complicated about serial ports: event handling, timing, end-of-line characters, “chunking” of the data, connecting and disconnecting, and so on. As luck would have it, Rx provides a way to import data from the standard .NET event pattern, which serial ports implement, and export it again as a sequence. We can capture the serial port events using this little extension method, which acts on a serial port and gives back a sequence of SerialDataReceivedEventArgs:

    public static IObservable<EventPattern<SerialDataReceivedEventArgs>> ObservableDataReceivedEvents(
        this ISerialPort port)
    {
        var portEvents = Observable.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
            handler =>
            {
                log.Debug("Event: SerialDataReceived");
                return handler.Invoke;
            },
            handler =>
            {
                // We must discard stale data when subscribing or it will pollute the first element of the sequence.
                port.DiscardInBuffer();
                port.DataReceived += handler;
                log.Debug("Listening to DataReceived event");
            },
            handler =>
            {
                port.DataReceived -= handler;
                log.Debug("Stopped listening to DataReceived event");
            });
        return portEvents;
    }

This code tells Rx three things: how to convert its own handler into the delegate type that the event expects, how to subscribe to the event, and how to unsubscribe from it (events are a lot like observables).

The next really neat thing that Rx does is to let you use LINQ queries over observable sequences and you can see an example of that in the following code, which uses a LINQ comprehension query to transform the sequence of SerialDataReceivedEventArgs into a sequence of the actual characters received from the serial port.

    public static IObservable<char> ReceivedCharacters(this ISerialPort port)
    {
        var observableEvents = port.ObservableDataReceivedEvents();
        var observableCharacterSequence = from args in observableEvents
                                          where args.EventArgs.EventType == SerialData.Chars
                                          from character in port.ReadExisting()
                                          select character;
        return observableCharacterSequence;
    }

As is the case for normal LINQ, the query is not executed immediately. It is only executed when the source sequence produces a value. One way of looking at it is that we are running a query in time, on data in motion that we haven’t received yet. This is a very powerful concept. One can go on adding layers of queries to any observable sequence until exactly and only the data required is obtained, causing exactly the right code to execute at exactly the right moment.

In the Reactive ASCOM library, we use the concept of a Communications Channel to abstract the actual device (in this case, a serial port) being used. The ICommunicationChannel interface sets out the features expected of a channel and we kept it to a minimum. One of the interface members is an IObservable<char> which exposes the received data as a character stream. We chose <char> rather than <string> or anything else because char seems to be the lowest common denominator and LINQ queries can always be used to rebuild string sequences later. This gives the most flexibility for supporting the widest range of devices and communication technologies.
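As a rough illustration of rebuilding strings from a character stream, the sketch below groups characters into strings that end with a terminator such as '#'. The operator name TerminatedStringsSketch is made up for this example and is not claimed to match anything in the library; the code assumes the System.Reactive NuGet package.

```csharp
using System;
using System.Reactive.Linq;
using System.Text;

// Hypothetical helper for illustration; not the library's own operator.
public static class CharacterStreamSketch
{
    // Collects characters until the terminator arrives, then pushes the
    // accumulated string (terminator included) to the downstream observer.
    public static IObservable<string> TerminatedStringsSketch(
        this IObservable<char> source, char terminator)
    {
        return Observable.Create<string>(observer =>
        {
            var buffer = new StringBuilder(); // one buffer per subscription
            return source.Subscribe(
                character =>
                {
                    buffer.Append(character);
                    if (character != terminator) return;
                    var text = buffer.ToString();
                    buffer.Clear();
                    observer.OnNext(text);
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }

    public static void Main()
    {
        // A string is an IEnumerable<char>, so it can stand in for received data.
        var replies = "10:59:06#1#".ToObservable()
            .TerminatedStringsSketch('#')
            .ToList()
            .Wait(); // blocks until the source sequence completes
        foreach (var reply in replies)
            Console.WriteLine(reply); // prints 10:59:06# then 1#
    }
}
```

Because the channel only promises characters, each protocol can layer its own framing rule on top in this way without the channel knowing anything about it.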

Transactions as Observables

So we have our received data as an observable sequence of characters, but what about data going the other way, from the ASCOM driver out to the device? I have modelled that as an IObservable<DeviceTransaction>. DeviceTransaction is an abstract base class from which special purpose transactions can be derived. I have supplied a few that help with Meade style protocols, such as BooleanTransaction, NoReplyTransaction and TerminatedStringTransaction, more to demonstrate the correct approach than anything else. In my own drivers I have specialized these classes even further into things like SexagesimalValueTransaction and TerminatedSingleCharacterTransaction. Other drivers and protocols will require specialized transactions of their own.

The ASCOM driver creates transactions as required and hands them off to an instance of ITransactionProcessor. We have supplied an implementation called ReactiveTransactionProcessor which raises a TransactionAvailable event each time a new transaction is supplied by the driver. You might be able to guess where this is going… we know that Rx provides a way to easily capture a .NET event and that’s exactly what we do, to provide an observable sequence of EventArgs<DeviceTransaction>. This sequence is subscribed to by a custom observer called TransactionObserver.
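The event-to-sequence step can be sketched in a few lines. Everything below is a simplified stand-in written for this illustration: SketchProcessor and CommandEventArgs are invented names, the "transaction" is reduced to a plain command string, and the real ReactiveTransactionProcessor may be organised quite differently. The System.Reactive NuGet package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Invented for this sketch: event data carrying a command string.
public sealed class CommandEventArgs : EventArgs
{
    public CommandEventArgs(string command) { Command = command; }
    public string Command { get; }
}

// Invented for this sketch: raises an event whenever something is committed.
public sealed class SketchProcessor
{
    public event EventHandler<CommandEventArgs> TransactionAvailable;

    public void CommitTransaction(string command)
    {
        TransactionAvailable?.Invoke(this, new CommandEventArgs(command));
    }

    // Wraps the event as an observable sequence. The event handler is attached
    // when an observer subscribes and detached when the subscription is disposed.
    public IObservable<string> ObservableCommands()
    {
        return Observable.FromEventPattern<CommandEventArgs>(
                handler => TransactionAvailable += handler,
                handler => TransactionAvailable -= handler)
            .Select(pattern => pattern.EventArgs.Command);
    }
}

public static class EventCaptureDemo
{
    public static void Main()
    {
        var processor = new SketchProcessor();
        var seen = new List<string>();
        using (processor.ObservableCommands().Subscribe(seen.Add))
        {
            processor.CommitTransaction(":GR#");
            processor.CommitTransaction(":GD#");
        }
        processor.CommitTransaction(":ignored#"); // nobody is listening any more
        Console.WriteLine(string.Join(" ", seen)); // :GR# :GD#
    }
}
```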

Transactions as Observers

Each transaction implements (or inherits from the base class) the three handlers necessary for it to be an IObserver. We stop short of making it implement the IObserver interface because we didn’t want to tie it to a particular type of observable, since the type depends on the LINQ query that it defines. Instead, transactions must implement a method called SubscribeObserver() in which they can create whatever type of observable sequence they like (based on querying the original IObservable<char> receive sequence) and subscribe to that. Thus, the observer is created on demand to match the sequence being subscribed to. The OnNext, OnError and OnCompleted handlers can be overridden as necessary or replaced with completely different handler methods. Transactions that return a value will typically override at least the OnCompleted handler.

Unit Testing

A quick aside on unit testing. It might not be clear why we’ve structured the code in exactly this way, but the ITransactionProcessor interface provides a deliberate ‘seam’ in the code which is very useful for unit testing. This interface can also be used to add logging and other processing around transactions. We’ve used another interface (ISerialPort) and a wrapper class called SerialPort that is a very thin wrapper around System.IO.Ports.SerialPort for similar reasons.

The Life of a Transaction

The life cycle of a transaction proceeds something like this.

First the driver creates a transaction based on the type of value it is looking for in response. The transaction is handed to the ITransactionProcessor implementation by calling ITransactionProcessor.CommitTransaction(). There is no going back from that point: you are committed and the transaction will either succeed and return a value, or fail and return an error.

If you are not interested in the response, you are done. You can just return and get on with the next bit of code. If you want a value back from the transaction, then you must call transaction.WaitForCompletionOrTimeout().

Committing a transaction raises the TransactionAvailable event, which produces an item that gets pushed to the TransactionObserver. The TransactionObserver subscribes the transaction to the observable sequence of received characters published by the ICommunicationChannel so that the transaction becomes ‘hot’ and can receive its response. The transaction is responsible for selecting the response it needs from the input stream (example later). To do this, the transaction will need to provide its own LINQ query to pick out the data it is interested in, and it must do so in such a way that the sequence terminates once it has received valid data (typically using .Take(1)). When the transaction’s receive sequence terminates, it is automatically unsubscribed from the receive stream and its OnCompleted handler is called, where it can carry out any further parsing or type conversion required.

The OnCompleted handler calls transaction.SignalCompletion() which allows the waiting WaitForCompletionOrTimeout() call to return and the caller can then retrieve its response from the Value property.

Rx guarantees correct sequencing, so that even if transactions are being created on multiple threads, or reentrantly, only one transaction can be ‘hot’ at any given moment. We included assertions within TransactionObserver to ensure this is the case and it will throw an exception if any overlap is detected.

Should an error occur, or the transaction not receive its response within the allotted time, then Response will be Maybe.Empty and transaction.Failed will be true. There may also be an error message in the ErrorMessage property. Failed transactions do not cause an exception, so whoever waits for the result must check that it worked.
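The wait, signal and fail-without-throwing behaviour described above can be mimicked with a reset event. This is only a sketch under stated assumptions: SketchTransaction is an invented class, it observes ready-made strings rather than characters, it uses a plain string where the library uses its own response type, and the member names simply echo the ones mentioned in this post. The System.Reactive NuGet package is assumed.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Invented stand-in for a transaction; not the library's DeviceTransaction.
public sealed class SketchTransaction
{
    private readonly ManualResetEventSlim completion = new ManualResetEventSlim(false);

    public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(2);
    public string Response { get; private set; }
    public bool Failed { get; private set; } = true; // assume failure until a response arrives

    // Take(1) ends the sequence after the first response, which triggers OnCompleted.
    public void ObserveResponse(IObservable<string> source)
    {
        source.Take(1).Subscribe(OnNext, OnError, OnCompleted);
    }

    private void OnNext(string value) { Response = value; }

    private void OnError(Exception error) { Failed = true; completion.Set(); }

    private void OnCompleted() { Failed = Response == null; completion.Set(); }

    // Returns true only if a response arrived in time. Never throws on failure,
    // so the caller must inspect the result (or the Failed property).
    public bool WaitForCompletionOrTimeout()
    {
        return completion.Wait(Timeout) && !Failed;
    }
}

public static class TransactionLifeDemo
{
    public static void Main()
    {
        var receiveStream = new Subject<string>();

        var answered = new SketchTransaction();
        answered.ObserveResponse(receiveStream);
        receiveStream.OnNext("1#"); // the device replies
        Console.WriteLine(answered.WaitForCompletionOrTimeout()); // True

        var unanswered = new SketchTransaction { Timeout = TimeSpan.FromMilliseconds(100) };
        unanswered.ObserveResponse(receiveStream);
        Console.WriteLine(unanswered.WaitForCompletionOrTimeout()); // False, nothing arrived
        Console.WriteLine(unanswered.Failed);                       // True
    }
}
```

Starting with Failed set to true means a timeout needs no special handling: if nothing ever signals completion, the transaction simply stays failed.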


Thus, the input and output streams are brought together so that the correct responses go to the correct transactions. Let’s see how this looks in code. First, a BooleanTransaction:

    public class BooleanTransaction : DeviceTransaction
    {
        public BooleanTransaction(string command) : base(command) { }

        public override void ObserveResponse(IObservable<char> source)
        {
            source.TerminatedBoolean('#').Take(1).Subscribe(OnNext, OnError, OnCompleted);
        }

        protected override void OnCompleted()
        {
            try
            {
                if (Response.Any())
                    Value = Response.Single().StartsWith("1");
            }
            catch (FormatException)
            {
                Value = false;
            }
            base.OnCompleted();
        }

        public bool Value { get; private set; }
    }

In the ObserveResponse method, the transaction uses a fluent LINQ query to limit its input to only “terminated booleans”. It then uses .Take(1) so that it receives only a single response, after which its OnCompleted method will be called. Then it subscribes to that filtered sequence and returns. At some point in the future, the expected character sequence will arrive which satisfies the LINQ query and the OnNext method will get called. We haven’t overridden OnNext so the default implementation is to copy the received text into the Response property. Next, because of the Take(1), the sequence completes and OnCompleted() gets called. Here, we have overridden OnCompleted to parse the response into a boolean, which we copy into the Value property. Finally, calling base.OnCompleted() marks the transaction as complete and signals the sender that the response is available in the Value property.

An example of how the driver would use such a transaction (the following code is taken from a production driver and is the implementation of the SlewToTargetAsync() method):

    public void SlewToTargetAsync()
    {
        if (AtPark)
            throw new ParkedException();
        awr.SafetyInterlock(false); //[AWR-34] Remove safety interlock prior to slew.
        SetTarget(TargetRightAscension, TargetDeclination);
        var gotoStartedSuccessfully = awr.TransactBool(Protocol.StartGoTo);
        if (gotoStartedSuccessfully)
            Log.Info("Slew started successfully");
        else
            Log.Error("Slew did not start");
        Thread.Sleep((int) Settings.Default.ASCOMSlewSettleTimeSeconds * 1000);
    }

That’s not quite the complete story because this particular driver is split into two layers. Here’s the implementation of TransactBool:

    public bool TransactBool(string txMsg, bool sendVerbatim=false)
    {
        Log.Debug("Begin boolean transaction");
        var transaction = TransactionFactory.Boolean(sendVerbatim ? txMsg : EnsureProtocolDelimiters(txMsg));
        transactionProcessor.CommitTransaction(transaction);
        transaction.WaitForCompletionOrTimeout();
        Log.Debug("Boolean result is {0}", transaction.Value);
        return transaction.Value;
    }

I’m leaving out some of the details here, but hopefully you can get the general idea.

A Practical Example

Working out how to use third party code is often the tough bit, so we’ve provided a working example in the form of a console app. This little app will connect to a drive system that uses a Meade-style protocol and query its current coordinates. We’ve made the sample deliberately awkward in the way it commits transactions, to demonstrate the sequencing and thread-safety guarantees – it could probably be shortened by half without all the fluff. Even so, we hope you’ll agree it’s pretty simple…

    internal class Program
    {
        static void Main(string[] args)
        {
            #region Setup for Reactive ASCOM
            var connectionString = Settings.Default.ConnectionString; // Edit in App.config, default is "COM1:"
            var endpoint = DeviceEndpoint.FromConnectionString(connectionString);
            ICommunicationChannel channel = new SerialCommunicationChannel(endpoint);
            var transactionObserver = new TransactionObserver(channel);
            var processor = new ReactiveTransactionProcessor();
            processor.SubscribeTransactionObserver(transactionObserver);
            channel.Open();
            #endregion Setup for Reactive ASCOM

            #region Submit some transactions
            // Ready to go. We are going to use tasks to submit the transactions, just to demonstrate thread safety.
            var raTransaction = new TerminatedStringTransaction(":GR#") {Timeout = TimeSpan.FromSeconds(2)};
            var decTransaction = new TerminatedStringTransaction(":GD#") {Timeout = TimeSpan.FromSeconds(2)};
            Task.Run(() => processor.CommitTransaction(raTransaction));
            Task.Run(() => processor.CommitTransaction(decTransaction));
            #endregion Submit some transactions

            #region Wait for the results
            // NOTE we are using the transactions in the reverse order that we committed them, just to prove a point.
            Console.WriteLine("Waiting for declination");
            decTransaction.WaitForCompletionOrTimeout();
            Console.WriteLine("Declination: {0}", decTransaction.Response);
            Console.WriteLine("Waiting for Right Ascensions");
            raTransaction.WaitForCompletionOrTimeout();
            Console.WriteLine("Right Ascension: {0}", raTransaction.Response);
            #endregion Wait for the results

            #region Cleanup
            // To clean up, we just need to dispose the TransactionObserver and the channel is closed automatically.
            // Not strictly necessary, but good practice.
            transactionObserver.OnCompleted(); // There will be no more transactions.
            transactionObserver = null; // not necessary, but good practice.
            #endregion Cleanup
        }
    }

The output will vary depending on your machine environment, but on our test system connected to a Meade-compatible drive system, we get this:

    2015-05-27 19:32:48.0078 INFO Transaction pipeline connected to channel with endpoint COM1:9600,None,8,One
    2015-05-27 19:32:48.0928 INFO Channel opening => COM1:9600,None,8,One
    2015-05-27 19:32:48.1168 DEBUG Event: SerialDataReceived
    2015-05-27 19:32:48.1168 DEBUG Listening to DataReceived event
    Waiting for declination
    2015-05-27 19:32:48.1739 INFO Committing transaction TID=1 [:GR#] [{no value}] 00:00:02
    2015-05-27 19:32:48.1949 DEBUG Sending [:GR#]
    2015-05-27 19:32:48.2379 DEBUG Rx=1
    2015-05-27 19:32:48.2379 DEBUG Rx=0
    2015-05-27 19:32:48.2379 DEBUG Rx=:
    2015-05-27 19:32:48.2379 DEBUG Rx=5
    2015-05-27 19:32:48.2379 DEBUG Rx=9
    2015-05-27 19:32:48.2379 DEBUG Rx=:
    2015-05-27 19:32:48.2379 DEBUG Rx=0
    2015-05-27 19:32:48.2379 DEBUG Rx=6
    2015-05-27 19:32:48.2379 DEBUG Rx=#
    2015-05-27 19:32:48.2519 INFO Transaction 1 completed
    2015-05-27 19:32:48.2519 INFO Completed transaction TID=1 [:GR#] [10:59:06#] 00:00:02
    2015-05-27 19:32:48.2519 INFO Committing transaction TID=2 [:GD#] [{no value}] 00:00:02
    2015-05-27 19:32:48.2519 DEBUG Sending [:GD#]
    2015-05-27 19:32:48.2709 DEBUG Rx=-
    2015-05-27 19:32:48.2709 DEBUG Rx=1
    2015-05-27 19:32:48.2709 DEBUG Rx=8
    2015-05-27 19:32:48.2709 DEBUG Rx=ß
    2015-05-27 19:32:48.2709 DEBUG Rx=3
    2015-05-27 19:32:48.2709 DEBUG Rx=9
    2015-05-27 19:32:48.2769 DEBUG Rx=:
    2015-05-27 19:32:48.2769 DEBUG Rx=0
    2015-05-27 19:32:48.2769 DEBUG Rx=0
    2015-05-27 19:32:48.2769 DEBUG Rx=#
    Declination: -18ß39:00#
    Waiting for Right Ascensions
    Right Ascension: 10:59:06#
    2015-05-27 19:32:48.2769 INFO Transaction 2 completed
    2015-05-27 19:32:48.2769 INFO Channel closing => COM1:9600,None,8,One
    2015-05-27 19:32:48.2769 INFO Completed transaction TID=2 [:GD#] [-18ß39:00#] 00:00:02
    2015-05-27 19:32:48.2769 DEBUG Stopped listening to DataReceived event

Get the Code

Reactive ASCOM is available as source code from BitBucket at: https://bitbucket.org/tigranetworks/ta.ascom.reactivecommunications

It is also available as a NuGet package; simply run Install-Package TA.Ascom.ReactiveCommunications

