Show / Hide Table of Contents

Asynchronous Programming

Many of the entry points in the Open API interface have both a synchronous and asynchronous form. The advantage of the synchronous form is that tends to be easier to code against. On the other hand, the advantage of the asynchronous form is that it allows for more parallelism and potentially better performance.

We express asynchrony with the .Net Task framework. In general, when we have a void or value-returning synchronous method that look like this:

void DoOperation1(args...)
T DoOperation2(args...)

...the corresponding asynchronous methods will look like this:

Task DoOperation1Task(args...)
Task<T> DoOperation2(args...)

As a matter of fact, in cases where the asynchronous method exists, the synchronous method always simply delegates to it:

void DoOperation1(args...)
{
  DoOperation1Task(args...).Wait();
}

T DoOperation2(args...)
{
    return ExceptionUtil.ResultOrUnwrappedException(
        DoOperation2Task(args...));
}

The ResultOrUnwrappedException method is a minor implementation detail: it is a simple helper method that unwraps AggregateExceptions and throws their inner exception instead.

Interested readers should consult the .NET documentation for detailed information about the .NET Task Framework. Here we would only like to stress three important considerations:

Always Wait or observe Result on your Tasks; don't fire-and-forget

The Task Framework is an asynchronous programming framework, not a threading framework per se (even though Tasks often do run on separate threads). Callers should not simply create Tasks and assume they will run to completion on their own. Instead, callers they should be careful to always call .Wait or .Result on Tasks that they are responsible for. If they fail to do so, the Task code might not finish executing or a thrown exception might not properly propagate.

Prefer async/await

The async/await pattern makes it very easy and natural to write code that properly chains Tasks, propagate exceptions, and so forth. Once you get used to it, it tends to be a lot easier to write code as opposed to chaining callbacks e.g. with .ContinueWith.

Create asynchronous TaskCompletionSources

For the sake of efficiency, if one piece of async code is waiting on a TaskCompletionSource result, and another piece of code calls SetResult, the setting code may invoke the waiting callback immediately (on its own thread), rather than dispatching it on another thread. This can lead to some very deep stack traces and even deadlocks in Task-heavy code, because it can introduce dependencies between two pieces of code that otherwise appear independent. The simple rule of thumb is if you do use TaskCompletionSource, always make it asynchronous:

var tcs = new TaskCompletionSource<T>(
    TaskCreationOptions.RunContinuationsAsynchronously);

Example

The following (somewhat artificial) example spawns two Workers in parallel, performs a query on each Worker, and reports the number of rows returned by each query.

public static void TaskExample()
{
    using (var client = OpenApi.Connect(host))
    {
        client.Login(user, password, operateAs);
        var ticker0 = "AAPL";
        var ticker1 = "ZNGA";

        var task0 = MakeWorkerAndFetchTable(client, ticker0);
        var task1 = MakeWorkerAndFetchTable(client, ticker1);
        var sizes = Task.WhenAll(task0, task1).Result;

        Console.WriteLine(
	    $"Tasks are finished: {ticker0} had {sizes[0]} rows, and {ticker1} had {sizes[1]} rows.");
    }
}

// Note async keyword
private static async Task<long> MakeWorkerAndFetchTable(IOpenApiClient client, string tickerToFind)
{
    var workerOptions = new WorkerOptions("Default");
    DebugUtil.Print($"[{tickerToFind}]: Starting worker");
    // Note: we are using the await keyword
    using (var workerSession = await client.StartWorkerTask(workerOptions))
    {
        var scope = workerSession.QueryScope;

        DebugUtil.Print($"[{tickerToFind}]: Getting historical table");
        var table = scope.HistoricalTable("LearnDeephaven", "EODTrades");

        DebugUtil.Print($"[{tickerToFind}]: Getting columns");
        // note await keyword
        var (importDate, ticker) = await table.GetColumnsTask<StrCol, StrCol>("ImportDate", "Ticker");
        var filtered = table.Where(importDate == "2017-11-01" &&
	    ticker == tickerToFind);

        DebugUtil.Print($"[{tickerToFind}]: Getting table data");
        // note await keyword
        var tableData = await filtered.GetTableDataTask();
        return tableData.Rows;
    }
}

Note: DebugUtil.Print is our method for printing a message to the console along with its thread id. One run of the sample program looks like this:

[1]: [AAPL]: Starting worker
[1]: [ZNGA]: Starting worker
[6]: [AAPL]: Getting historical table
[6]: [AAPL]: Getting columns
[10]: [AAPL]: Getting table data
[7]: [ZNGA]: Getting historical table
[7]: [ZNGA]: Getting columns
[10]: [ZNGA]: Getting table data

Note that various pieces of the asynchronous code ended up running on various threads in the thread pool.

What's Next

In our next chapter, Table Operations, we describe the various table operations that are possible.

Back to top Generated by DocFX