Reactive Extensionsを使用して複数のサービス非同期コールを連続して呼べるか試してみた。その2

昨日、Reactive Extensionsでサービス非同期コールのメソッドチェーンに挑戦してみました。

しかし、多数の課題が残ってしまいました。そこで、いくつかの課題をつぶしてみました。

アジェンダ

  • チェーンの始動ポイントを最初に持ってくる。
  • 前に呼んだサービスの引数にアクセスする。
  • サービスコール時に発生したエラー処理を行う。
  • サービスコール時に発生したエラー処理を行う拡張メソッドを定義する。
  • 改良版非同期サービスコールのメソッドチェーン
  • 残課題
  • まとめ

チェーンの始動ポイントを最初に持ってくる。

前回では、最初のサービスコールをメソッドチェーンの外で、かつメソッドチェーンの後ろに記述していました。

getACompleted.SelectMany((IEvent<GetACompletedEventArgs> e) =>
{
   // 省略
})
.Subscribe((IEvent<GetACompletedEventArgs> e) =>
{
    string result = e.EventArgs.Result;
    System.Diagnostics.Debug.WriteLine("Client5. 非同期サービスコールチェーン終了 = " + result);
});

// サービスコールチェーンを開始する。
client.GetAAsync("さーびすこーるちぇーん開始");

これを、

Observable.Defer(() =>
{
    // TODO : 何をトリガーにDeferに渡したメソッドは動いているのか不明
    client.GetAAsync("てすとその2");
    return getACompleted;
})
.Subscribe((IEvent<GetACompletedEventArgs> e) =>
{
    string result = e.EventArgs.Result;
    System.Diagnostics.Debug.WriteLine("Client5. 非同期サービスコールチェーン終了 = " + result);
});

とできるようです。しかし、Deferに渡したメソッド渡したメソッドは何をトリガーに動いているのでしょうか…。謎です。

前に呼んだサービスの戻り値にアクセスする。

自分よりも前に読んだサービスの戻り値にアクセスる為に、メソッド変数を使用します。

// サービスの戻り値を保持するメソッド変数。
string keyB = null;

// サービスコールチェーンを定義
Observable.Defer(() =>
{
    // TODO : 何をトリガーにDeferに渡したメソッドは動いているのか不明
    client.GetAAsync("てすとその2");
    return getACompleted;
})
.SelectMany(e =>
{
    System.Diagnostics.Debug.WriteLine("Client1. call GetBAsync");

    // メソッド変数に値をセット。
    keyB = e.EventArgs.Result;

    client.GetBAsync(keyB);

    System.Diagnostics.Debug.WriteLine("Client2. return getBCompleted");
    return getBCompleted;
})
.Subscribe(e =>
{
    // メソッドチェーンにセットした値にアクセスする。
    string temp = keyB;

    string result = e.EventArgs.Result;
    System.Diagnostics.Debug.WriteLine("Client5. 非同期サービスコールチェーン終了 = " + result);
});

サービスコール時に発生したエラー処理を行う。

Subscribeのオーバロードにはチェーン中に発生した例外をキャッチするActionを渡せるものがあります

public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError);

というわけで、Subscribeを上記の物にさしかえれば、チェーン中に発生した例外をキャッチできます。なお、サービスコール時に発生した例外はErrorプロパティにセットされますが、このErrorプロパティの値をスローすることによって、SubscribeのonErrorに渡します。また、Subscribeの直前のサービスコールで例外が発生したときは、SubscribeのonNextでErrorプロパティの値を処理する必要があります。

Observable.Defer(() =>
{
    // TODO : 何をトリガーにDeferに渡したメソッドは動いているのか不明
    client.GetAAsync("てすとその2");
    return getACompleted;
})
.SelectMany((IEvent<GetACompletedEventArgs> e) =>
{
    if (e.EventArgs.Error == null)
    {
        System.Diagnostics.Debug.WriteLine("Client1. call GetBAsync");
        keyB = e.EventArgs.Result;
        client.GetBAsync(keyB);

        System.Diagnostics.Debug.WriteLine("Client2. return getBCompleted");
        return getBCompleted;
    }
    else
    {
        // エラーをそのままスローする。
        throw e.EventArgs.Error;
    }
})
.Subscribe((IEvent<GetCCompletedEventArgs> e) =>
{
    if (e.EventArgs.Error == null)
    {
        string result = e.EventArgs.Result;
        System.Diagnostics.Debug.WriteLine("Client5. 非同期サービスコールチェーン終了 = " + result);
    }
    else
    {
        // サービスコール時のエラー処理
        MessageBox.Show(e.EventArgs.Error.Message);
    }
},
(Exception ex) =>
{
    // スローされたエラーをキャッチし、エラー処理を行う。
    MessageBox.Show(ex.Message);
});

サービスコール時に発生したエラー処理を行う拡張メソッドを定義する。

チェーン内に毎回Errorプロパティの値のチェックなどを記述するのは品雑です。そこで、サービスコール時のエラー処理を行う拡張メソッドを用意します。

public static class ServiceClientExMethods
{
    public static IObservable<TOther> ServiceCompleted<T, TOther>(this IObservable<IEvent<T>> source, Func<IEvent<T>, IObservable<TOther>> selector)
        where T : AsyncCompletedEventArgs
    {
        return source.SelectMany((IEvent<T> e) =>
        {
            if (e.EventArgs.Error == null)
            {
                return selector(e);
            }
            else
            {
                throw e.EventArgs.Error;
            }
        });
    }

    public static IDisposable ServiceCompleted<T>(this IObservable<IEvent<T>> source, Action<IEvent<T>> onNext, Action<Exception> onError)
        where T : AsyncCompletedEventArgs
    {
        Action<IEvent<T>> onNextWrapper = e =>
            {
                if (e.EventArgs.Error == null)
                {
                    onNext(e);
                }
                else
                {
                    onError(e.EventArgs.Error);
                }
            };

        return source.Subscribe(onNextWrapper, onError);
    }
}

使い方は、SelectManyやSubscribeと変わりません。

改良版非同期サービスコールのメソッドチェーン

上記で紹介した考えをまとめたメソッドチェーンは下記の通りです。

// サービスクライアントの生成
Service1Client client = new Service1Client();

// サービスコンプリートイベント監視用Observableインスタンスを定義
IObservable<IEvent<GetACompletedEventArgs>> getACompleted
    = Observable.FromEvent<GetACompletedEventArgs>(client, "GetACompleted");

IObservable<IEvent<GetBCompletedEventArgs>> getBCompleted
    = Observable.FromEvent<GetBCompletedEventArgs>(client, "GetBCompleted");

IObservable<IEvent<GetCCompletedEventArgs>> getCCompleted
    = Observable.FromEvent<GetCCompletedEventArgs>(client, "GetCCompleted");

string keyB = null;
// サービスコールチェーンを定義
Observable.Defer(() =>
{
    client.GetAAsync("てすとその2");
    return getACompleted;
})
.ServiceCompleted((IEvent<GetACompletedEventArgs> e) =>
{
    System.Diagnostics.Debug.WriteLine("Client1. call GetBAsync");
    keyB = e.EventArgs.Result;
    client.GetBAsync(keyB);

    System.Diagnostics.Debug.WriteLine("Client2. return getBCompleted");
    return getBCompleted;
})
.ServiceCompleted((IEvent<GetBCompletedEventArgs> e) =>
{
    // getBCompletedで監視していたイベントの戻り値をSselectManyで取得できる。
    System.Diagnostics.Debug.WriteLine("Client3. call GetCAsync");
    string keyC = keyB + e.EventArgs.Result;
    client.GetCAsync(keyC);
    System.Diagnostics.Debug.WriteLine("Client4. return getCCompleted");
    return getCCompleted;
})
.ServiceCompleted((IEvent<GetCCompletedEventArgs> e) =>
{
    string result = e.EventArgs.Result;
    System.Diagnostics.Debug.WriteLine("Client5. 非同期サービスコールチェーン終了 = " + result);
},
(Exception ex) =>
{
    // サービスコール時のエラー処理
    MessageBox.Show(ex.Message);
});

残課題

  • Deferに渡したメソッド渡したメソッドがコールされるトリガーを調べる。
  • いちいち、getACompletedなどを定義するのが面倒くさい。
  • サービスからの戻り値によって次に呼ぶサービスを分岐させたい場合、どのように対処するか。

まとめ

前回で課題のいくつかの対処した非同期サービスコールのメソッドチェーンを作成しました。しかし、課題が残ってしまいした。というか新しい課題も発生しました。
Reactive Extensionsは一筋縄ではいきませんね・・・。