System.Threading.Channels の記事を拝見し、Unity では UniTask に Channel があると教えてもらったので試してみました。 非同期キューイングな処理を使いたいシーンはよくあるので、知っておくと便利です。
System.Threading.Channels を導入する例は次の記事をどうぞ。
Unity - System.Threading.Channels で生産者/消費者パターンを利用する - yotiky Tech Blog
目次
基本的な使い方
UniTask のリポジトリ(Channelの項目)はこちらですね。
こちらの記事にサンプルコードが書かれており参考にさせて頂きました。
リンクにも書かれている通り、UniTask の Channel では複数の消費者はサポートされていません。複数の Consumer で購読した場合、オブザーバーパターンのように全員が等しく購読します。
実装例
生産者 1 : 消費者 1
1対1の実装例です。ほとんど参考にさせて頂いた記事のコードのままです。
private void SingleConsumer() { var channel = Channel.CreateSingleConsumerUnbounded<string>(); var reader = channel.Reader; WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget(); var writer = channel.Writer; writer.TryWrite("1"); writer.TryWrite("2"); writer.TryWrite("3"); writer.TryComplete(); } private async UniTaskVoid WaitForChannelAsync(ChannelReader<string> reader, CancellationToken cancellationToken) { try { await reader.ReadAllAsync() .ForEachAsync(x => Debug.Log(x), cancellationToken); } catch (Exception e) { Debug.LogException(e); } }
実行結果です。
1 2 3
生産者 n : 消費者 1
n対1の実装例です。
private async UniTaskVoid MultiToSingle() { var channel = Channel.CreateSingleConsumerUnbounded<string>(); var reader = channel.Reader; WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget(); var writer = channel.Writer; var producers = Enumerable.Range(1, 3) .Select(producerId => UniTask.Run(() => { writer.TryWrite("Producer" + producerId); })); await UniTask.WhenAll(producers); writer.Complete(); }
実行結果です。
Producer2 Producer1 Producer3
生産者 1 : 消費者 n
最後に 1対nの実装例です。先に説明したとおり、複数の消費者はキューを消費するわけではなく、等しく購読するので注意が必要です。
private void Multicast() { var channel = Channel.CreateSingleConsumerUnbounded<string>(); var connectable = channel.Reader.ReadAllAsync().Publish(); using (connectable.Connect()) { WaitForChannelAsync(1, connectable, this.GetCancellationTokenOnDestroy()).Forget(); WaitForChannelAsync(2, connectable, this.GetCancellationTokenOnDestroy()).Forget(); WaitForChannelAsync(3, connectable, this.GetCancellationTokenOnDestroy()).Forget(); var writer = channel.Writer; writer.TryWrite("A"); writer.TryWrite("B"); writer.TryWrite("C"); writer.TryComplete(); } } private async UniTaskVoid WaitForChannelAsync(int consumerId, IUniTaskAsyncEnumerable<string> enumerable, CancellationToken cancellationToken) { try { await enumerable.ForEachAsync(x => Debug.Log("Consumer" + consumerId + ":" + x), cancellationToken); } catch (Exception e) { Debug.LogException(e); } }
実行結果です。
Consumer1:A Consumer2:A Consumer3:A Consumer1:B Consumer2:B Consumer3:B Consumer1:C Consumer2:C Consumer3:C
まとめ
System.Threading.Channels と比較すると、複数の消費者で利用した場合の動きが異なるため注意が必要ですが、タスクを積んで非同期に裏で処理したいケースはそれなりに遭遇します。自作するとなると面倒ですが消費者を1に絞る、またはChannelを別けることで簡単に非同期キューイングが手に入るので知っておいて損はないと思います。