yotiky Tech Blog

とあるエンジニアの備忘録

Unity - Monobit MRS のクライアント実装

Monobit MRS のクライアント実装をした際の備忘録。
サーバーサイドはノータッチなので省略。

公式ページ

検証環境

  • Unity 2019.4.24f1
  • MRS 2.0.0

主なAPI

※説明はAPIリファレンスより抜粋

基本

  • mrs_initialize
    • MRSのライブラリに必要なメモリを割り当てて初期化を行います
  • mrs_connect
    • 新しい接続(MrsConnection)をひとつ割り当てて初期化し、接続を開始します。 mrs_connect関数は接続を開始するだけで、実際のトランスポート層パケットはこの関数を呼び出しても送信されません。 実際のパケットは、後でmrs_update関数を呼び出すことによって送信されます。
  • mrs_set_read_record_callback
    • 接続において接続相手が送信したレコードを1個受信するごとに1回呼ばれるコールバック関数を設定します
  • mrs_set_read_callback
    • 接続において接続相手が送信したバイナリデータを受信するごとに1回呼ばれるコールバック関数を設定します
  • mrs_update
    • 内部でOSに対してポーリングを行い、MRSが監視しているソケットにデータが到着している場合は必要なコールバックを呼び出し、また送信が可能な場合はソケットに対してデータ(レコード)を送信します。 OSに対する入出力はすべてここで行われます。
  • mrs_write_record
    • 接続に対して、レコードを1個送信します この関数を呼び出した時点では実際に送信されず、次の mrs_update 関数の呼び出し時に可能な限り送信しようとします。 ただし、OSやソケットの状態によっては、次の呼び出しで確実に送信されるとは限りません。 送信の準備が成功したらtrue,メモリが足りないなどによって失敗したらfalseが返されます。 MRSはレコードを送信するときに内部的に通し番号を付与します。
    • payload_type : 送信したいデータの種別をアプリケーションが自由に指定します。
  • mrs_write
    • 接続に対して、バイナリデータを送信します この関数を呼び出した時点では実際に送信されず、次の mrs_update 関数の呼び出し時に可能な限り送信しようとします。 ただし、OSやソケットの状態によっては、次の呼び出しで確実に送信されるとは限りません。 MRSはレコードを送信するときに内部的に通し番号を付与します。
  • mrs_finalize
    • MRSライブラリが確保しているメモリをすべて解放します

その他

  • mrs_connection_is_connected
    • MrsConnectionがサーバへの接続を完了しているかどうかを調べます
    • (mrs_connectを呼ぶ前や、引数が IntPtr.Zero でも false を返す)
  • mrs_set_connect_callback
    • サーバへの接続が完了したことを検出するためのコールバック関数を設定します mrs_connect関数で作られた接続(MrsConnection)に対して、その接続が実際に接続が成功したことを検出するためのコールバック関数を設定します。コールバック関数は接続ごとに設定する必要があります。 接続ごとに異なるコールバック関数を設定することも可能です。
  • mrs_set_disconnect_callback
    • 接続が切れたときに呼び出されるコールバック関数を設定します サーバーが起動していない場合やファイアウォールなど、何らかの原因によって接続に失敗した場合は、このコールバック関数は呼ばれず、 mrs_set_error_callback 関数で設定したコールバック関数が呼び出されます。
  • mrs_set_error_callback
    • 接続において何らかのエラーが発生したことを検出するためのコールバック関数を設定します

処理イメージ

  1. mrs_initialize
  2. mrs_connect
  3. mrs_set_read_record_callback でコールバック設定
  4. mrs_write_record で送信データをスタック
    1. payload_type は各自定義なのでサーバーとどういった処理するかの取り決めが必要
    2. Getはサーバーからデータを送信してもらう、Setはサーバーへデータを送信する、など
  5. mrs_update で実際にデータを送受信、受信があればコールバックを呼び出す
  6. mrs_finalize

サンプルコード

※抜粋して書き出したので動作確認はしていません。

using MrsConnection = System.IntPtr;

public class SampleClient : Mrs
{
    static SampleClient()
    {
        mrs_initialize();
    }

    private string address = "127.0.0.1";
    private ushort port = 22222;
    private uint timeout = 5000;

    private MrsConnection mrsConnection;

    public void Connect()
    {
        mrsConnection = mrs_connect(MrsConnectionType.TCP, address, port, timeout);
        mrs_set_connect_callback(mrsConnection, on_connect);
        mrs_set_disconnect_callback(mrsConnection, on_disconnect);
        mrs_set_error_callback(mrsConnection, on_error);
        mrs_set_read_record_callback(mrsConnection, on_read_record);
    }

    [AOT.MonoPInvokeCallback(typeof(MrsConnectCallback))]
    private void on_connect(MrsConnection connection, MrsConnection connection_data)
    {
    }
    [AOT.MonoPInvokeCallback(typeof(MrsDisconnectCallback))]
    private void on_disconnect(MrsConnection connection, MrsConnection connection_data)
    {
    }
    [AOT.MonoPInvokeCallback(typeof(MrsErrorCallback))]
    private void on_error(MrsConnection connection, MrsConnection connection_data, MrsConnectionError status)
    {
    }
    [AOT.MonoPInvokeCallback(typeof(MrsReadRecordCallback))]
    private void on_read_record(MrsConnection connection, MrsConnection connection_data, 
        uint seqnum, ushort options, ushort payload_type, MrsConnection _payload, uint payload_len)
    {
        var payload = ToBytes(_payload, payload_len);

        switch ((PayloadType)payload_type)
        {
            case PayloadType.GetData:
                var buffer = new mrs.Buffer(payload, payload_len);
                var data_len = buffer.ReadInt32();
                var data = new byte[data_len];
                buffer.Read(data, (uint)data_len);
                var str_data = Encoding.UTF8.GetString(data);
                break;
        }
    }

    public void GetDataRequest()
    {
        var buffer = new mrs.Buffer();

        var str_key = "NanikaNoKey";
        var key_data = Encoding.UTF8.GetBytes(str_key);
        var key_data_len = key_data.Length;

        buffer.WriteInt32(key_data_len);
        buffer.Write(key_data);

        mrs_write_record(mrsConnection, (ushort)MrsRecordOption.NONE, (ushort)PayloadType.GetData, buffer.GetData(), buffer.GetDataLen());
    }

    public void SetData()
    {
        var buffer = new mrs.Buffer();

        var str_key = "NanikaNoKey";
        var key_data = Encoding.UTF8.GetBytes(str_key);
        var key_data_len = key_data.Length;
        buffer.WriteInt32(key_data_len);
        buffer.Write(key_data);

        var str_data = "hogehoge";
        var data = Encoding.UTF8.GetBytes(str_data);
        var data_len = data.Length;
        buffer.WriteInt32(data_len);
        buffer.Write(data);

        mrs_write_record(mrsConnection, (ushort)MrsRecordOption.NONE, (ushort)PayloadType.SetData, buffer.GetData(), buffer.GetDataLen());
    }

    void Update()
    {
        mrs_update();
    }

    void OnDestroy()
    {
        mrs_finalize();
    }

    enum PayloadType
    {
        GetData = 1,
        SetData = 2,
    }
}

補足

  • mrs_connect
    • 接続が確立するまで多少のタイムラグがあるため、短い間隔でmrs_connection_is_connectedを使うとfalseになる可能生がある mrs_connection_is_connectedを見てmrs_connectを呼ぶようにしていると、連続して呼び出した時に複数回コネクションが接続することがある
  • Keep Alive
    • MonobitServerSettings にある Time Settings の Keep Alive

      この時間を経過してもMUNサーバからKeepAliveの信号を受信することができない場合、接続状態を維持できていないと判断し、自動的に切断(接続失敗)の処理を行ないます。

再接続処理

サンプルコード

コールバックはstaticなので注意が必要。

using MrsConnection = System.IntPtr;

public class MrsSampleClient : Mrs
{
    private const string LOGPREFIX = "[MRS Callback] ";
    private string address = "127.0.0.1";
    private ushort port = 22222;
    private uint timeout = 5000;

    private static MrsConnection mrsConnection;
    private static bool isClosing = false;
    private static bool isRetry = false;

    public void Connect()
    {
        if (mrs_connection_is_connected(mrsConnection))
        {
            isClosing = true;
            mrs_close(mrsConnection);
            mrsConnection = IntPtr.Zero;
        }
        mrs_initialize();
        mrsConnection = mrs_connect(MrsConnectionType.TCP, address, port, timeout);
        mrs_set_connect_callback(mrsConnection, on_connect);
        mrs_set_disconnect_callback(mrsConnection, on_disconnect);
        mrs_set_error_callback(mrsConnection, on_error);
        mrs_set_read_record_callback(mrsConnection, on_read_record);
    }

    public void Disconnect()
    {
        if (mrs_connection_is_connected(mrsConnection))
        {
            isClosing = true;
            mrs_close(mrsConnection);
            mrsConnection = IntPtr.Zero;
        }
    }

    void OnDestroy()
    {
        isClosing = true;
        mrs_finalize();
    }
    private void OnApplicationQuit()
    {
        isClosing = true;
        mrs_finalize();
    }

    private float waitTime = 3f;
    private float elapsedTime = 0f;
    private void Update()
    {
        if (isRetry)
        {
            if (mrs_connection_is_connected(mrsConnection) || isClosing)
            {
                isRetry = false;
            }
            else
            {
                elapsedTime += Time.deltaTime;
                if (waitTime < elapsedTime)
                {
                    Debug.Log("Retry connect...");
                    isRetry = false;
                    elapsedTime = 0f;
                    Connect();
                }
            }
        }
    }

    [AOT.MonoPInvokeCallback(typeof(MrsConnectCallback))]
    private static void on_connect(MrsConnection connection, IntPtr connection_data)
    {
        Debug.Log($"{LOGPREFIX}on_connect");
        mrsConnection = connection;
    }
    [AOT.MonoPInvokeCallback(typeof(MrsDisconnectCallback))]
    private static void on_disconnect(MrsConnection connection, IntPtr connection_data)
    {
        Debug.Log($"{LOGPREFIX}on_disconnect");

        if (!isClosing)
        {
            isRetry = true;
        }
    }
    [AOT.MonoPInvokeCallback(typeof(MrsErrorCallback))]
    private static void on_error(MrsConnection connection, IntPtr connection_data, MrsConnectionError status)
    {
        Debug.Log($"{LOGPREFIX}on_error : {status}");

        if (status == MrsConnectionError.CONNECT_TIMEOUT || status == MrsConnectionError.CONNECT_ERROR)
        {
            if (!isClosing)
            {
                isRetry = true;
            }
        }
    }
    [AOT.MonoPInvokeCallback(typeof(MrsReadRecordCallback))]
    private void on_read_record(MrsConnection connection, MrsConnection connection_data,
        uint seqnum, ushort options, ushort payload_type, MrsConnection _payload, uint payload_len)
    {
        Debug.Log($"{LOGPREFIX}on_read_record");
    }
}

実行ログ

Connect、Disconnectした時のログ。

f:id:yotiky:20210715031649p:plain

Connectして、Unityを停止(アプリを終了)した時のログ。

f:id:yotiky:20210715031708p:plain

Connectして、ネットワークを切断、復旧した時のログ。(再接続処理)

f:id:yotiky:20210715031746p:plain

ネットワークが切断した場合は、on_disconnectが呼ばれる。 その後オフライン状態が続く場合は、タイムアウト後にon_errorが呼ばれる。

ネットワークの切断は原因によってCONNECT_ERRORKEEPALIVE_TIMEOUTCONNECT_TIMEOUTなどが発生する。 また、これらはOSに依存するため原因が同じでも発生する内容が異なる場合がある。

KeepAliveのタイムアウトMonobitServerSettings.assetの Time Settings で設定している値(ms)となる。実際はエラーが発生するまでこの時間の倍±αくらいの時間がかかるようだ。

f:id:yotiky:20210803085010p:plain