2009-07-04

RPC サーバの遅延リターン

最近は過労気味でウェブにものを書くこともできない, という話で上司の同情を誘うべく 日本人の労働時間やストレスの実態をまとめた エンドレス・ワーカーズ を読んだら, 自分の労働時間は日本人労働者の上位 2 割から漏れていることを知り愕然とした. あんなに働いたってのに...残業エリートへの道は険しい. 道を進みたいわけじゃないけれど. (平均は越えてたぜ!)

いずれにせよ流行からはすっかり脱落しているので, 時流を無視して仕事の話でもしよう.

以前, 会社の blog で RPC の結果をノンブロッキングスタイルで受け取るプリミティブ "弱関数" を提案した. でも試行錯誤の結果, いまは使っていない. C++ での弱参照は意図しないリークを作りやすい. 使いわすれることも多く, 忘れた頃にクラッシュする. 要求は明示的にキャンセルした方がいいことがわかった. (世間はみんなそうやってますね...)

さて, 結果の受け取りは非同期/ノンブロッキング RPC の一面に過ぎない. 今日は呼び出された RPC を非同期に処理する作りについて書いてみたい.

復習: RPC のノンブロッキング呼び出し

まず復習として, RPC のノンブロッキング呼び出し側についてざっとバリエーションを眺めよう. 例題はありがちな名簿サービス:

interface AddressBook {
  // 住所の文字列から一致した人を返すつもり...
  List<Person> findByAddress(String address);
};

RPC のノンブロッキング呼び出しにはいくつかのアプローチがある. FutureIAsyncResult のようなポリング派と, Deferred のようなコールバック派が二大派閥だ.

使いやすさは一長一短だけれど, ポリング派はブロッキングで作られたアプリケーションを部分的にノンブロッキング化するとき, コールバック派は最初からノンブロッキング主体に書かれたコードベースで, 使われることが多いと思う. 足回りに使う通信フレームワークの流儀に合わせることもある. 古の ONC RPC はどちらもサポートしている. (教科書ではそれぞれ nonblock rpc, callback rpc と呼ばれていた.)

私の仕事はノンブロッキング主体のコードベースが相手だから, とりあえずコールバック派に限って話を進めたい.

コールバック派の中にもコールバック関数の登録方法でバリエーションがある. Deferred のように戻り値へコールバックを登録させるものもあれれば...

// コールバック派のノンブロッキング RPC 呼び出し疑似コード
public classs AddressBookProxy implements IAddressBookClient
{
  public Async< List<Person> > findByAddress(String address) { ... }
};
...
Deferred< List<Person> > found = addrbook.findByAddress("suzuran st.");
found.addCallback(...); // ここで登録

GWT RPC のように コールバック自身を追加の引数に渡す亜種もある.

// これも疑似コード
public classs AddressBookProxy implements IAddressBookClient
{
  public void findByAddress(String address, Async< List<Person> > callback) { ... }
};

本題: RPC 受け取り側のノンブロック化

本来なら RPC の受け取り(サーバ)側にも同様の議論があるはずだが, クライアントのノンブロッキング呼び出しをサポートする RPC 処理系であっても サーバ側の実装をノンブロッキングに作れるものは案外少ない. サーバにはマルチスレッドかつブロッキングを想定することが多い.

// ブロッキングスタイルの RPC サービス実装
public class AddressBookServiceImpl implements IAddressBookService {
  public List<Person> findByAddress(String address) {
     List<Person> found = .... データベースに問合せて結果をとりだす....;
     return found;
  }
};

けれどサーバの実装がシングルスレッドや再入を許さないマルチスレッドで作られているとき, ブロッキングスタイルは都合が悪い. ノンブロッキングにしたい = リターンを遅延したいことがある.

リターンを遅延するとは, RPC の実装メソッド (上の例だと AddressBookServiceImpl.findByAddress()) の中では 結果の値を返すかわりに遅延を通知し, 別のタイミングで実際の値を返したいということ. たとえばデータベースの処理をワーカスレッドに任せ, 自分自身は処理を終えたい. そして計算の終わったワーカから戻り値を返したい. あるいは別のサーバに(非同期で)問合せた結果を自身の戻り値に利用したい.

けれど素朴なブロッキングスタイルでサーバを実装すると, こうした期待は叶わない.

public class AddressBookServiceImpl implements IAddressBookService {
  public List<Person> findByAddress(String address) {
     m_workers.enqueue(new AddressFinder(address, ...));
     ... ここでどうすればいい? ...
  }
};

ではどのように遅延リターンを実現するのが望ましいだろう? まずは RPC の前提である "要求-応答" モデルを持つ通信層として, いくつかの HTTP サーバを調べてみよう.

Twisted.Web : 戻り値で遅延を通知+引数のメソッドでリターンを通知

ノンブロッキングの通信フレームワークである Twisted は, ノンブロッキングの HTTP サーバ Twisted.Web を持っている. Twisted.Web で応答を遅延するコードは次のように書ける. (コピペ元.)

class ExampleResource(Resource):
     def render_GET(self, request):
        def hello():
          request.write("hello world")
          request.finish() # これでリターン通知
        reactor.callLater(10, hello) # 10 秒後に "hello world" と応答する
        return server.NOT_DONE_YET # 遅延通知

コールバックである render_GET() からフレームワークに特別な戻り値 NOT_DONE_YET を返し, タイマーに登録したコールバックから遅延処理を実行している. ちなみに応答を遅延しない場合は戻り値に応答の中身 ("hello world" など) を返せばいい.

Jetty 6.0 Continuation : 例外で遅延を通知

Java の HTTP サーバである Jetty の バージョン 6.0 は, 遅延リターンの仕組み "Continuation" を実装していた. (コピペ元.)

private void doPoll(HttpServletRequest request, AjaxResponse response)
{
    HttpSession session = request.getSession(true);

    synchronized (mutex)
    {
        Member member = (Member)chatroom.get(session.getId());

        // Is there any chat events ready to send?
        if (!member.hasEvents())
        {
            // No - so prepare a continuation
            Continuation continuation = ContinuationSupport.getContinuation(request, mutex);
            member.setContinuation(continuation);

            // wait for an event or timeout
            continuation.suspend(timeoutMS);
        }
        member.setContinuation(null);

        // send any events
        member.sendEvents(response);
    }
}

一見するとよくわからないけれど, coninuation.suspend() の呼びだしで 特別な例外がおこり, 処理は doPoll() を飛び出して Jetty に戻る. そして別の誰かが coninuation.resume() を呼びだすと, 再び doPoll() が呼び出される. つまり遅延通知に例外を使っている. Twisted.Web よりだいぶわかりづらい.

Servlet 3.0: 引数のメソッドと注釈で遅延を通知

Servlet は 3.0 から非同期応答をサポートした. Jetty の continuation もこれで置き換えられるらしい. (コピペ元.)

@WebServlet("/foo" asyncSupported=true)
public class MyServlet extends HttpServlet {
    public void doGet(HttpServletRequest request, HttpServletResponse response) {
        ...
        AsyncContext aCtx = request.startAsync(request, response); // 遅延通知
        ScheduledThreadPoolExecutor executor = new ThreadPoolExecutor(10);
        executor.execute(new AsyncWebService(aCtx));
    }
}
...
public class AsyncWebService implements Runnable {
    AsyncContext ctx;
    public AsyncWebService(AsyncContext ctx) {
         this.ctx = ctx;
    }
    public void run() {
        // Invoke web service and save result in request attribute
        // Forward the request to render the result to a JSP.
        ctx.forward("/render.jsp");  // これがリターン
    }
}

まず doGet() メソッドに特別な注釈をつけ, 実行時には 引数の request オブジェクトに対し遅延を通知するメソッド startAsync() を呼びだしている. サンプルでは同時にワーカスレッドを起動する. そして処理が完了すると, スレッドは startAsync() の戻り値である AsyncContext のメソッドを呼び出して戻り値を返す. (=リターンする.) Jetty の例外脱出 + 同じメソッド再呼び出しモデルよりはわかりやすくなった.

...

HTTP の遅延リターンは三つの実装に三種類のバリエーションを見ることができた:

HTTP の非同期応答 API を一通り眺めたところで, 今度はその上に載せる RPC の API を調べてみよう.

Ice RPC : 引数のメソッドでリターン通知

RPC 実装のひとつ ZeroC の Ice は非同期応答をサポートしていた. (コピペ元はパッケージ付属の async サンプル)

// Hello.ice のインターフェイス定義
interface Hello
{
    ["ami", "amd"] idempotent void sayHello(int delay)
        throws RequestCanceledException;

    void shutdown();
};

// 実装
void
HelloI::sayHello_async(const Demo::AMD_Hello_sayHelloPtr& cb, int delay, const Ice::Current&)
{
    if(delay == 0)
    {
        cout << "Hello World!" << endl;
        cb->ice_response(); // これがリターン
    }
    else
    {
        _workQueue->add(cb, delay);
    }
}

インターフェイス定義で "amd" (Asynchrounous Method Dispatching) を指定すると, 実装の第一引数に特別なオブジェクトがやってくるようになる. 渡ってきたオブジェクト cb に対し, リターンを示す ice_response() を呼び出すまで応答が遅延されるのだろう. デフォルトの挙動が非同期なため, 遅延を明示的に通知する必要はないようだ.

protocol buffer : 引数のメソッドでリターンを通知

protocol buffer はどうか. (コピペ元.)

// in .proto
service SearchService {
  rpc Search (SearchRequest) returns (SearchResponse);
}
...
// in .cc
class ExampleSearchService : public SearchService {
public:
 void Search(protobuf::RpcController* controller,
             const SearchRequest* request,
             SearchResponse* response,
             protobuf::Closure* done) {
   if (request->query() == "google") {
     response->add_result()->set_url("http://www.google.com");
   } else if (request->query() == "protocol buffers") {
     response->add_result()->set_url("http://protobuf.googlecode.com");
   }
   done->Run(); // これがリターン
 }
};

Run() がリターンに相当する. これもデフォルトで非同期なため, 遅延を通知する必要はない.

少し面白いのは, 複数の引数があっても XxxRequest という単一のオブジェクトにまとめられるところ. RPC を "引数や戻り値の直列化とメソッドディスパッチ" と割り切り, 実装に使える関数シグネチャの自由度は諦めている. 引数の個数を揃えるとランタイムとの相性が良くなり, 生成するコードサイズを小さくできるのかもしれない. (単なる手抜きかもしれないけれど.)

WCF: 属性と戻り値による遅延+リターン通知

WCF も 遅延リターンをサポートしている. ここでも IAsyncResult を使っているのは一貫性があっていい. (コピペ元.) WCF のデフォルトはブロッキングスタイルなので, 属性の AsyncPattern パラメタで非同期化を支持している.

// interface
 [OperationContractAttribute(AsyncPattern=true)]
 IAsyncResult BeginServiceAsyncMethod(string msg, AsyncCallback callback, object asyncState);

 // Note: There is no OperationContractAttribute for the end method.
 string EndServiceAsyncMethod(IAsyncResult result);

// implementation
 ...
 public IAsyncResult BeginServiceAsyncMethod(string msg, AsyncCallback callback, object asyncState)
 {
      Console.WriteLine("BeginServiceAsyncMethod called with: \"{0}\"", msg);
      return new CompletedAsyncResult<string>(msg);
 }
 ...
 public string EndServiceAsyncMethod(IAsyncResult r)
 {
      CompletedAsyncResult<string> result = r as CompletedAsyncResult<string>;
      Console.WriteLine("EndServiceAsyncMethod called with: \"{0}\"", result.Data);
      return result.Data;
 }
 ...

クライアントにはこれを ServiceAsyncMethod() として公開することもできるらしい. サービスの EndServiceAsyncMethod() はどんなタイミングで呼ばれるのだろう. スレッドモデルが想像できない.

直接の真似はできないにせよ, WCF のハイテクぶりと protobuf のローテクは遅延リターンの作りを決める論点を絞ってくれた: RPC の約束する "関数呼び出し" の幻影を, ノンブロッキングという現実の中で誰にどれだけ信じさせるか. 聴衆の種類と努力の量が支配的なパラメタになりそうだ. クライアントに幻影を見せつつサービス実装側を泥臭くするのか. サービス実装者も夢を見たいのか. そのためのランタイムはどれだけリッチにするか. コード生成だけで逃げきるか. リフレクションの魔法に頼るか. 魔法を使ったときにデバッグはできるか... バリエーションは広い.

ベイパーウェア A: 戻り値を使った遅延通知

さて, HTTP サーバと RPC はそれぞれに遅延リターンの手法があり, 共通点もあれば差異もあった. HTTP 固有の方法に着目し, それを RPC に借用して幻影度を高められないだろうか.

たとえば Twisted.Web のように戻り値を遅延の通知に使えないか考えてみる.

けれど, これは少し難しい. RPC では, サービスの結果をあらわすのに戻り値を使いたい. 戻り値のデータ型はメソッド毎に IDL 上で定義されている. その型が遅延をあらわす NOT_DONE_YET 相当をうまく表現できるとは限らない. 動的型付けの言語なら戻り値の型に制限を受けないから, シングルトンの NOT_DONE_YET を 遅延通知に使えるかもしれない. 静的型付けの言語でも型システムのトリックでうまく "特別な値" を作れるかもしれない.

もう一つ問題がある 遅延したリターンは最終的にどこから返せばいいだろう.

class AddressBookServiceImpl:
  def findByAddress(address):
    def find(self, ...):
      ....
      return xxx # この戻り値をどうやってクライアントに返すか?
    reactor.callWorker(find) # ワーカスレッドに処理を任せる空想上のメソッド
    return NOT_DONE_YET

あまり良いアイデアがない.

ベイパーウェア B: Deferred で戻り値を使ったリターン通知

そういえば Twisted はクライアント(呼び出し)側で Deferred を特別な戻り値に使っていた. WCF の IAsyncResult のようにサーバ側でも同じスタイルをとれないか.

class AddressBookServiceImpl:
  def findByAddress(address):
    d = Deferred(...)
    def find(self, ...):
      ....
      d.callback(ret) # Deferred を介しリターンを通知
    reactor.callWorker(find)
    return d # 結果の値ではなく Deferred オブジェクトを返す

これはとても自然に見えるけれど, 面倒なところもある. サービスの実装を呼びだした RPC 処理系は, Deferred に対して結果を受け取るコールバックを登録したい. ところが戻り値として Deferred を返す前に, findByAddress() 内から Deferred の callback() を呼び出されてしまう(=処理を遅延しない)こともありうる.

class AddressBookServiceImpl:
  def findByAddress(address):
    d = Deferred(...)
    d.callback(ret) # NG!: findByAddress() がリターンし
                    #       addCallback() される前に戻り値をコールバックしてしまう
    return d
...
# テストケースなど
d = service.findByAddress(...) # ← ここで d.callback() されているので
d.addCallback(...)             # ← ここで登録したコールバックは取りこぼされる

このとき素朴な RPC 処理系はサービスの戻り値を受けとりそびれるかもしれない. 処理系は取りこぼしがないよう工夫する必要がある. ちょっと面倒そう.

対する WCF はポリング派だから取りこぼしが起きないんだね. 絶妙だなあ.

ベイパーウェア C: 例外を使った遅延通知

別の候補も考えてみよう. Jetty Continuation のように例外を使うのはどうだろう. 個人的には悪くないと思っている. Jetty Continuation がまずかったのは, 休眠からの復帰先に休眠開始時と同じメソッドを使うところだった, 高階関数を使って普通に継続を表現すれば, さほど無理なく例外を "特別な戻り値" に使えるかもしれない.

仮に ruby っぽく書くならこんなかんじ:

class AddressBookService
  ...
  def find_by_address(addr)
     raise defer do |res|
       worker.enque do
         res.respond(AddressBookDb.find_by_address(...))
       end
     end
  end
  ...
end

ワーカなりへ処理をキューイングすると同時に, メソッドから遅延通知の例外を投げる. ワーカに切り離されたクロージャと例外は遅延処理を追跡する情報が共有しており, RPC 処理系は例外を通じてそれを参照し, クロージャからのリターンを待つ.

例外を遅延通知に使うのは無作法だけれど利点もある. 特に便利なのは, メソッドのシグネチャがブロッキングスタイルと同じになるところ. だから "普段はブロッキングでいいけど時々ノンブロッキングにしたい" という時に都合がいい.

勤務先の場合

仕事で使う内製の RPC も遅延リターンができる.

内製 RPC にはいくつかの言語で実装がある. C++ 版は, Ice と同じく引数通知スタイルを使う. IDL の定義にはない特別なオブジェクトがメソッド引数に追加され, その引数を通じてリターンを通知する. コードベースのポリシーで C++ 例外は使えないし, 戻り値も C++ では使いにくい. (オブジェクトの確保と解放の対応づけがわかりにくくなってしまう.) 引数を増やすのはみっともないけれど, 手堅い実装をするには悪くない.

Python 版と ActionScript 版は例外通知スタイルを使う. この二つを使う場面ではあまり遅延リターンをすることもないだろうという期待から, 遅延のないケースがクリーンにかける例外通知スタイルを使っている. (ActionScript 版 RPC にサーバ機能があるのはデバッグの小細工用; サーバといっても実際に listen() するわけではなく, なんとなくサーバっぽく動くだけ.)

実際に使ってみたところ, ActionScript なら例外通知モデルもそれほど複雑にならないことがわかった. ActionScript はクロージャのおかげで遅延後のコードがコンパクトに書けるからかもしれない. Python 版の遅延リターンは使いこんでないのでよくわからず.

余談: Python 版にはブロッキングスタイルの RPC 実装もある. 適当なマルチスレッドの HTTP サーバに載せて使う. C++ でノンブロッキングのコードを書いたあとに Python 版でブロッキングのコードを書くと, そのあと C++ に帰るのが辛くなる. まだ幻影の効き目が弱い.

ノンブロッキングスタイルで実装されたサービスの単体テスト(は面倒)

更に脇道へ.

ノンブロッキングの RPC を使うコードベースの単体テストは少し面倒くさい. 個々のオブジェクトをテストする面倒は想像がついていたけれど, 他にも困る場面があった.

たとえばインターフェイス I を定義したリモートのサービスを使うクライアント C のテストを書くとき, C のテストから呼ばれる I プロキシのモック M を適当にでっちあげたい. モック支援の弱い C++ のような言語だと, モックのかわりに I のリアルな実装 S を使って済ますことが多い. S の実装がオンメモリで扱いやすいものならそれで我慢できる. 分散透過万歳!

ところがノンブロッキングスタイルで実装されたサービスはプロキシのモック代わりに使いにくい. ノンブロッキングスタイルの場合, 同じ IDL から生成されてもプロキシとサービスのシグネチャが違う.

// proxy は戻り値通知スタイルを採用
...
abstract public Future< List<Person> > findByAddress(String address);
...
// service は引数通知スタイルを採用
...
abstract public void findByAddress(Response< Future< List<Person> > resp, String address);
...

つまりテストしたいクライアント C は, 実際にはプロキシ用インターフェイス Ip を参照しており, 一方でサービスの実装 S はサーバ用インターフェイス Is を実装している. そして Ip と Is は互換性がない.

ブロッキングスタイルだと Is と Ip は同じになることが多いけれど, ノンブロッキングスタイルでこの二つを揃えるのは案外難しい. 仕方ないので M を Ip にあわせるややこしいアダプタを作るか, インプロセスに M をホストして通信するか, 再利用を諦めて Ip にあわせたモックを実装することになる. どれもいまいち冴えない. うまい C++ のトリックでなんとか Is と Ip を揃えられないものかと考えるもいまのところ妙案なし. なんとかしてください > 同僚のひと

別の解決策も考えられる. まずプロキシとサーバの両方で同じ通知スタイルを採用し, (例: 引数通知) 通知オブジェクトの型もプロキシとサーバで揃えれば (例: IAsyncResult) インターフェイスを統一できる. 通知オブジェクトは複雑になるけれど, なんとか頑張れないもんかなあ. そのへんを頑張り切った WCF はエラい.

まとめ

サーバ側でもノンブロッキング/遅延リターンのできる RPC がほしい.