gRPC には Interceptor という概念があるというのを先日のエントリで書きました。 ただ、じゃぁ Interceptor ってどういう風に作るんやっていうところまでは書けなかったので、今日はそのあたりです。
簡単に試せる話題として、送受信ログを出力する仕組みを作ってみましょう。 UnaryServerInterceptor の情報はネット上にそこそこあるのですが StreamServerInterceptor の情報はあまりないので、参考になれば良いなと思います。
UnaryServerInterceptor
UnaryServerInterceptor の型は以下のように定義されています。
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
ctx
はもちろん Context で、req
が Unary な gRPC においてクライアントが送信してきたリクエストそのものです。
info
は gRPC のサービスそのもの、および呼び出されたメソッドを保持している struct です。handler
は呼び出されたメソッドに対応するメソッドハンドラの実体ですね。
そして、interceptor の返却値はレスポンスとエラーになります。
送受信ログを出力するハンドラを、こののシグネチャに合わせて書くと、以下のようになるでしょう。 これまで API サーバを書いてきた人から見ると、非常に直感的な形になっていると思います。
func loggingUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { stdlog.Printf("before [%s] is called with req [%+v]", info.FullMethod, req) res, err := handler(ctx, req) if err != nil { stdlog.Printf("error occured in method [%s]: [%+v]", info.FullMethod, err) return nil, err } stdlog.Printf("after [%s] is called: response=[%+v]", info.FullMethod, res) return res, nil }
これをサーバに組込むには、grpc.NewServer
への ServerOption
として渡すことになります。
s := grpc.NewServer( grpc.UnaryInterceptor(loggingUnaryInterceptor), )
そうやって起動した gRPC サーバに対してリクエストを送信すると、きちんと以下のようにリクエストとレスポンスが出力されていることが分かります。簡単ですね。
2018/12/01 18:52:11 before [/helloworld.Greeter/SayHello] is called with req [name:"kiririmode" ] 2018/12/01 18:52:11 after [/helloworld.Greeter/SayHello] is called: response=[message:"Hello kiririmode" ]
StreamServerInterceptor
StreamServerInterceptor のシグニチャは以下のようになります。
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
性質が悪いのは、(Stream だから当然なのですが) リクエストではなく ss
という ServerStream
しか渡ってこない点です。
送受信ログ出力を行うために Interceptor で stream からリクエストを読み込んでしまうと、メソッドハンドラでそのリクエストを読み込めなくなるというジレンマがあります。
こういう場合は、wikipedia:Decorator パターンを使うのが定石かなと思います
まず、grpc.ServerStream
を wrap する wrappedServerStream
を定義します。ここでは、Stream から読み込む RecvMsg
と、Stream に書き込む SendMsg
に対してログ出力コードを追加しています。
type wrappedServerStream struct { grpc.ServerStream } func (ss *wrappedServerStream) RecvMsg(m interface{}) error { if err := ss.ServerStream.RecvMsg(m); err != nil { return err } stdlog.Printf("received: [%+v]", m) return nil } func (ss *wrappedServerStream) SendMsg(m interface{}) error { if err := ss.ServerStream.SendMsg(m); err != nil { return err } stdlog.Printf("sent: [%+v]", m) return nil }
あとは Interceptor で、ServerStream
をこの wrappedServerStream
で wrap してやれば良い。
それを行う StreamServerInterceptor は以下のようになります。
func loggingStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { wrapped := &wrappedServerStream{ss} return handler(srv, wrapped) }
あとは、先程と同様に gRPC サーバに差し込んでしまえば良い。
s := grpc.NewServer( grpc.UnaryInterceptor(loggingUnaryInterceptor), grpc.StreamInterceptor(loggingStreamInterceptor), )
これで、stream の送受信ログも出力可能になります。
2018/12/01 18:51:54 received: [name:"kiririmode" ] 2018/12/01 18:51:54 sent: [message:"Hello kiririmode" ] 2018/12/01 18:51:54 received: [name:"hyuichi" ] 2018/12/01 18:51:54 sent: [message:"Hello hyuichi" ] 2018/12/01 18:51:54 received: [name:"kiri" ] 2018/12/01 18:51:54 sent: [message:"Hello kiri" ]
まとめ
というわけで、gRPC の ServerInterceptor でした。 ある程度パターンがあるので、あとはどうやって自分の欲しい機能を書くかってところだけだと思います。