理系学生日記

おまえはいつまで学生気分なのか

忍者TOOLS

gRPC の Server Streaming RPC で HTTP/2 の挙動を確認してみる

先日は gRPC の Unary RPC で HTTP/2 の挙動を確認してみました。

今日は、gRPC の Server Streaming RPC を用いたときの HTTP/2 の流れを確認してみます。

Server Streaming RPC

Server Streaming RPC というのは、Client が Server に対してリクエストを送信すると、そのレスポンスとして stream を取得できるというタイプの RPC です。 これにより、Server は Client に対して、間欠泉のようにデータを送信し続けることができます。

今回は、ここでは、1 秒に 1 回、挨拶とタイムスタンプを返却するという RPC を SayHellos という名前で作成してみます。 protocol buffer の定義は以下のようにしました。

syntax = 'proto3';
import "google/protobuf/timestamp.proto";

service Greeter {
  // Unary RPC: Sends a greeting
  rpc SayHello(HelloRequest) returns (HelloReply) {}
  // Server Streaming RPC: Sends a greeing with timestamp
  rpc SayHellos(HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest { string name = 1; }

// The response message containing the greetings
message HelloReply {
  google.protobuf.Timestamp timestamp = 1;
  string message = 2;
}

サーバ実装

サーバ実装はシンプルで、以下のようになります。

1 秒ごとに値を受け取れる channel を time.NewTicker で作成し、それを select で待ち受けて処理をしているだけですね。

func (s *server) SayHellos(req *greeter.HelloRequest, stream greeter.Greeter_SayHellosServer) error {
    done := make(chan interface{})
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    time.AfterFunc(10*time.Second, func() { close(done) })

    for {
        select {
        case <-done:
            return nil
        case <-ticker.C:
            if err := stream.Send(&greeter.HelloReply{
                Timestamp: ptypes.TimestampNow(),
                Message:   "Hello " + req.Name,
            }); err != nil {
                log.Fatalf("could not greet: %v", err)
                return err
            }
        }
    }
}

クライアント実装

クライアントからの Server Streaming RPC 呼び出し部分は以下の形。 SayHellos RPC の呼び出しで stream が取得できるので、その stream が io.EOF を返却するまで無限ループという構成です。

   ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    stream, err := c.SayHellos(ctx, &greeter.HelloRequest{Name: name})
    if err != nil {
        log.Fatalf("could not greet: %v", err)
    }

    for {
        r, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("SayHellos: %v", err)
        }
        log.Printf("Greeting: %s at %s", r.Message, ptypes.TimestampString(r.Timestamp))
    }

HTTP/2 の流れ

上記のサーバとクライアントを動かし、昨日と同様に、Wireshark でパケットキャプチャをしてみました。 結果として、大まかな流れは以下のようになりました。

  1. Client -> Server へ Connection Preface を送信し、TCP コネクションを確立
  2. Client <-> Server で SETTINGS フレームを交換
  3. Client -> Server へ RPC リクエストを stream ID 1 で送信。
    • HEADERS フレーム (End Headers フラグ有)
    • DATA フレーム (End Stream フラグ有)
  4. 1 秒待ち
  5. Server -> Client へ stream ID 1 で RPC レスポンスを送信。
    • HEADERS フレーム (End Headers フラグ有)
    • DATA フレーム (ここでは、End Stream フラグは立ちません)
  6. 4, 5 を 9 回繰り返す
  7. Server -> Client へ stream ID 1 で最後の RPC レスポンスを送信
    • DATA フレーム (ここでは、End Stream フラグは立ちません)
    • HEADERS フレーム (End StreamEnd Headers 両フラグ有)
  8. TCP コネクション切断

Unary RPC のときとほとんど違いはありません。 End Stream フラグが立つ、つまり Stream の切断を要求するのは以下のタイミングになります。

  • Client は、最初の RPC リクエストを送信したとき。これはリクエスト送信後、Client は Stream に何も送る必要がないため。
  • Server は、最後の RPC レスポンスを送信したとき。

まとめ

Unary とあまり違いがないですね。

なお、実験に使ったソースはこちらです。