yotiky Tech Blog

とあるエンジニアの備忘録

Unity - UniTask の Channel で生産者/消費者パターンを利用する

System.Threading.Channels の記事を拝見し、Unity では UniTask に Channel があると教えてもらったので試してみました。 非同期キューイングな処理を使いたいシーンはよくあるので、知っておくと便利です。

qiita.com

System.Threading.Channels を導入する例は次の記事をどうぞ。

Unity - System.Threading.Channels で生産者/消費者パターンを利用する - yotiky Tech Blog

目次

基本的な使い方

UniTask のリポジトリ(Channelの項目)はこちらですね。

github.com

こちらの記事にサンプルコードが書かれており参考にさせて頂きました。

qiita.com

リンクにも書かれている通り、UniTask の Channel では複数の消費者はサポートされていません。複数の Consumer で購読した場合、オブザーバーパターンのように全員が等しく購読します。

オブザーバーパターンで出てくるのはPub/Subで、通知、購読に関するデザインパターンであり、購読しても"消費"はされません。生産者/消費者(Producer/Consumer)は、マルチスレッドなどのデザインパターンとして登場し、消費者は積まれたデータを消費します。似たような処理のため混乱しがちです。

実装例

生産者 1 : 消費者 1

1対1の実装例です。ほとんど参考にさせて頂いた記事のコードのままです。

   private void SingleConsumer()
    {
        var channel = Channel.CreateSingleConsumerUnbounded<string>();

        var reader = channel.Reader;
        WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget();

        var writer = channel.Writer;
        writer.TryWrite("1");
        writer.TryWrite("2");
        writer.TryWrite("3");
        writer.TryComplete();
    }

    private async UniTaskVoid WaitForChannelAsync(ChannelReader<string> reader, CancellationToken cancellationToken)
    {
        try
        {
            await reader.ReadAllAsync()
                .ForEachAsync(x => Debug.Log(x), cancellationToken);
        }
        catch (Exception e)
        {
            Debug.LogException(e);
        }
    }

実行結果です。

1
2
3

生産者 n : 消費者 1

n対1の実装例です。

    private async UniTaskVoid MultiToSingle()
    {
        var channel = Channel.CreateSingleConsumerUnbounded<string>();

        var reader = channel.Reader;
        WaitForChannelAsync(reader, this.GetCancellationTokenOnDestroy()).Forget();

        var writer = channel.Writer;

        var producers = Enumerable.Range(1, 3)
            .Select(producerId =>
                UniTask.Run(() =>
                {
                    writer.TryWrite("Producer" + producerId);
                }));

        await UniTask.WhenAll(producers);
        writer.Complete();
    }

実行結果です。

Producer2
Producer1
Producer3

生産者 1 : 消費者 n

最後に 1対nの実装例です。先に説明したとおり、複数の消費者はキューを消費するわけではなく、等しく購読するので注意が必要です。

    private void Multicast()
    {
        var channel = Channel.CreateSingleConsumerUnbounded<string>();

        var connectable = channel.Reader.ReadAllAsync().Publish();
        using (connectable.Connect())
        {
            WaitForChannelAsync(1, connectable, this.GetCancellationTokenOnDestroy()).Forget();
            WaitForChannelAsync(2, connectable, this.GetCancellationTokenOnDestroy()).Forget();
            WaitForChannelAsync(3, connectable, this.GetCancellationTokenOnDestroy()).Forget();

            var writer = channel.Writer;

            writer.TryWrite("A");
            writer.TryWrite("B");
            writer.TryWrite("C");

            writer.TryComplete();
        }
    }

    private async UniTaskVoid WaitForChannelAsync(int consumerId, IUniTaskAsyncEnumerable<string> enumerable, CancellationToken cancellationToken)
    {
        try
        {
            await enumerable.ForEachAsync(x => Debug.Log("Consumer" + consumerId + ":"  + x), cancellationToken);
        }
        catch (Exception e)
        {
            Debug.LogException(e);
        }
    }

実行結果です。

Consumer1:A
Consumer2:A
Consumer3:A
Consumer1:B
Consumer2:B
Consumer3:B
Consumer1:C
Consumer2:C
Consumer3:C

まとめ

System.Threading.Channels と比較すると、複数の消費者で利用した場合の動きが異なるため注意が必要ですが、タスクを積んで非同期に裏で処理したいケースはそれなりに遭遇します。自作するとなると面倒ですが消費者を1に絞る、またはChannelを別けることで簡単に非同期キューイングが手に入るので知っておいて損はないと思います。