.NET, async, LINQ, Programming

Make LINQ Aggregate asynchronous

I often use LINQ in my code. Or, to put it another way: I can’t live without LINQ in my daily work. One of my favorite methods is Aggregate. Applied wisely, it saves you from writing explicit loops, chains naturally into other LINQ methods, and keeps your code readable and well-structured. Aggregate is similar to the reduce and fold functions, which are the hammer and anvil of the functional programming toolbox.
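To make the comparison with fold concrete, here is a minimal sketch of the two most common Aggregate overloads next to the explicit loop they replace. The class and method names (AggregateBasics, Sum, JoinWithCommas) are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper class, used only to illustrate Aggregate.
public static class AggregateBasics
{
    // No seed: the first element becomes the initial accumulator.
    // Throws InvalidOperationException for an empty sequence.
    public static int Sum(IEnumerable<int> numbers) =>
        numbers.Aggregate((acc, n) => acc + n);

    // With a seed: the accumulator type (StringBuilder) may differ
    // from the element type (string).
    public static string JoinWithCommas(IEnumerable<string> words) =>
        words.Aggregate(
                new StringBuilder(),
                (acc, word) => acc.Length == 0
                    ? acc.Append(word)
                    : acc.Append(", ").Append(word))
            .ToString();

    // The same logic written as an explicit loop, for comparison.
    public static string JoinWithCommasLoop(IEnumerable<string> words)
    {
        var acc = new StringBuilder();
        foreach (var word in words)
        {
            if (acc.Length > 0)
            {
                acc.Append(", ");
            }
            acc.Append(word);
        }
        return acc.ToString();
    }
}
```

The Aggregate versions are single expressions, so they can sit in the middle of a longer LINQ chain, while the loop needs its own statement block.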

When you use Entity Framework, it provides async extension methods such as ToListAsync(), ToArrayAsync() and SingleAsync(). But what if you want asynchronous behavior from the LINQ Aggregate method? You will not find an async version in the framework (at the time of writing I’m using .NET Core 3.1 and C# 8.0). Let me give you a real-world example of a case where one would be really useful.

Let’s say you need to fetch all distinct values for multiple columns from a database in order to build a multi-selection filter.

Let’s also assume you use SQL Server, as it is one of the most common choices. To keep things simple, I will show the example using the Dapper micro-ORM.

The function could look like this:

public List<MultiSelectionModel> GetMultiSelectionFilterValues(string[] dataFields) {
  var results = new List<MultiSelectionModel>();

  var query = dataFields.Aggregate(new StringBuilder(), (acc, field) =>{
    return acc.AppendLine($"SELECT [{field}] FROM Table GROUP BY [{field}];");
  });

  using var connection = new SqlConnection(this.connectionString);
  connection.Open();

  using(var multi = connection.QueryMultiple(query.ToString())) {
    results.AddRange(dataFields.Aggregate(
     new List<MultiSelectionModel>(), (acc, field) =>{
      acc.Add(new MultiSelectionModel {
        DataField = field,
        Values = multi.Read(),
      });

      return acc;
    }));
  }

  return results;
}

The function receives an array of data fields (columns) for which we need distinct values for the multi-selection filter, and returns a list of multi-selection models. The model is just a simple data structure defined as:

public class MultiSelectionModel
{
    public string DataField { get; set; }
    public IEnumerable<dynamic> Values { get; set; }
}

On lines 4-6 you can see how the Aggregate method is applied to build the SELECT queries that fetch distinct values for the provided columns. I use GROUP BY in this example, but you can use DISTINCT to the same effect, although there is a difference in performance between DISTINCT and GROUP BY for more complex queries, which is excellently explained in this article. Lines 13-21 highlight the main logic of the function, where we read each result set with multi.Read() and assign the distinct values for each data field to the resulting model. In both cases the following Aggregate overload is used:

public static TAccumulate Aggregate<TSource, TAccumulate>(
	this IEnumerable<TSource> source,
	TAccumulate seed,
	Func<TAccumulate, TSource, TAccumulate> func
)

In the first case we provided a StringBuilder as the seed. The second argument is a function that receives the accumulator and an element from the source and returns the accumulator, which is the StringBuilder in our case. In the second case we used a List<MultiSelectionModel> as the seed, so the final list is accumulated in that collection.
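Coming back to the query-building step, the DISTINCT alternative mentioned above could be sketched roughly like this. The class name DistinctQueryBuilder is hypothetical, and the table name Table is the same placeholder used in the function above.

```csharp
using System;
using System.Linq;
using System.Text;

// Hypothetical helper: builds one SELECT DISTINCT statement per column.
public static class DistinctQueryBuilder
{
    public static string Build(string[] dataFields) =>
        dataFields.Aggregate(
                new StringBuilder(),
                // The field name is interpolated into the SQL text, so this
                // sketch assumes dataFields comes from a trusted list of
                // known column names, never from raw user input.
                (acc, field) => acc.AppendLine($"SELECT DISTINCT [{field}] FROM Table;"))
            .ToString();
}
```

For the input { "Country", "City" } this produces two statements, one per line, in the same order as the input array. That order matters later, because the result sets are read back in the order the statements were sent.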

So that works. You can stop reading now and go for a couple of 🍺 with your friends…

Oh, you’re still here 😏. You know, curiosity killed the cat. But we are different animals, so let’s move on. As you may have noticed, in the first example we used what Dapper calls multi-result: it executes multiple queries within the same command and maps the results. The good news is that it also has an async version. The bad news is that Aggregate does not. Should we go back to the good old foreach loop for mapping the query results, then? No way!

So how could we implement a version of GetMultiSelectionFilterValues that is async all the way down? Well, let’s rewrite it the way we would like to see it:

public async Task<List<MultiSelectionModel>> GetMultiSelectionFilterValuesAsync(string[] dataFields) {
  var results = new List<MultiSelectionModel>();

  var query = dataFields.Aggregate(new StringBuilder(), (acc, field) =>{
    return acc.AppendLine($"SELECT [{field}] FROM Table GROUP BY [{field}];");
  });

  using var connection = new SqlConnection(this.connectionString);
  await connection.OpenAsync();

  using(var multi = await connection.QueryMultipleAsync(query.ToString())) {
    results.AddRange(await dataFields.AggregateAsync(
     new List<MultiSelectionModel>(), async (acc, field) =>{
      acc.Add(new MultiSelectionModel {
        DataField = field,
        Values = await multi.ReadAsync(),
      });

      return acc;
    }));
  }

  return results;
}

Much better now, isn’t it? The changes are the async signature and the calls to QueryMultipleAsync, AggregateAsync and ReadAsync. The Aggregate call is now fully asynchronous. Of course, you want to know where I got this async extension 😀. Here are the extension methods I came up with to make it work:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncExtensions {
	// Async counterpart of Aggregate(func): the first element is the initial accumulator.
	public static Task<TSource> AggregateAsync<TSource>(
	this IEnumerable<TSource> source, Func<TSource, TSource, Task<TSource>> func) {
		if (source == null) {
			throw new ArgumentNullException(nameof(source));
		}

		if (func == null) {
			throw new ArgumentNullException(nameof(func));
		}

		// Arguments are validated eagerly; the async work happens in the internal method.
		return source.AggregateInternalAsync(func);
	}

	// Async counterpart of Aggregate(seed, func).
	public static Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(
	this IEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> func) {
		if (source == null) {
			throw new ArgumentNullException(nameof(source));
		}

		if (func == null) {
			throw new ArgumentNullException(nameof(func));
		}

		return source.AggregateInternalAsync(seed, func);
	}

	private static async Task<TSource> AggregateInternalAsync<TSource>(
	this IEnumerable<TSource> source, Func<TSource, TSource, Task<TSource>> func) {
		using var e = source.GetEnumerator();

		if (!e.MoveNext()) {
			throw new InvalidOperationException("Sequence contains no elements");
		}

		var result = e.Current;
		while (e.MoveNext()) {
			// Each step is awaited before the next element is processed.
			result = await func(result, e.Current).ConfigureAwait(false);
		}

		return result;
	}

	private static async Task<TAccumulate> AggregateInternalAsync<TSource, TAccumulate>(
	this IEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> func) {
		var result = seed;
		foreach (var element in source) {
			result = await func(result, element).ConfigureAwait(false);
		}

		return result;
	}
}
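If you want to try the seeded overload without a database, a small in-memory sketch like the one below may help. Everything here is an assumption made for illustration: the class AggregateAsyncDemo, the GetLengthAsync stand-in for an I/O call such as multi.ReadAsync(), and the compact copy of AggregateAsync, which is repeated only so that the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class. In a real project you would call the
// AsyncExtensions methods instead of this local copy.
public static class AggregateAsyncDemo
{
    // Compact copy of the seeded overload (argument checks omitted).
    public static async Task<TAccumulate> AggregateAsync<TSource, TAccumulate>(
        this IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, Task<TAccumulate>> func)
    {
        var result = seed;
        foreach (var element in source)
        {
            // Each step finishes before the next one starts.
            result = await func(result, element).ConfigureAwait(false);
        }
        return result;
    }

    // Stand-in for an asynchronous I/O call.
    private static async Task<int> GetLengthAsync(string field)
    {
        await Task.Delay(10).ConfigureAwait(false);
        return field.Length;
    }

    // Accumulates "name:length" entries, one awaited call per field.
    public static Task<List<string>> DescribeAsync(string[] fields) =>
        fields.AggregateAsync(new List<string>(), async (acc, field) =>
        {
            var length = await GetLengthAsync(field).ConfigureAwait(false);
            acc.Add($"{field}:{length}");
            return acc;
        });
}
```

Note that the steps run strictly one after another, not in parallel. For the multi-result case that is what we want, since the result sets have to be read in the same order as the queries were written.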

I did it for two of the three existing Aggregate overloads. The last one you can implement yourself if you need it. It will be a good exercise for understanding how Aggregate works behind the scenes.
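If you would like something to compare your own attempt against, here is one possible sketch of that third overload, the one that takes a result selector. It follows the same pattern as the two methods above; the class name AsyncAggregateWithSelector and the helper AggregateCoreAsync are hypothetical, and keeping the selector synchronous is just one design choice among several.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical class holding a sketch of the third overload.
public static class AsyncAggregateWithSelector
{
    public static Task<TResult> AggregateAsync<TSource, TAccumulate, TResult>(
        this IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, Task<TAccumulate>> func,
        Func<TAccumulate, TResult> resultSelector)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (func == null) throw new ArgumentNullException(nameof(func));
        if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector));

        return AggregateCoreAsync(source, seed, func, resultSelector);
    }

    private static async Task<TResult> AggregateCoreAsync<TSource, TAccumulate, TResult>(
        IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, Task<TAccumulate>> func,
        Func<TAccumulate, TResult> resultSelector)
    {
        var result = seed;
        foreach (var element in source)
        {
            result = await func(result, element).ConfigureAwait(false);
        }

        // The selector runs once, on the final accumulator value.
        return resultSelector(result);
    }
}
```

A call such as new[] { 1, 2, 3 }.AggregateAsync(0, (acc, n) => Task.FromResult(acc + n), sum => $"total={sum}") accumulates the sum and then maps it to a string in one expression.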

Stay tuned and have fun.
