タケハタのブログ

プログラマの生き方、働き方、技術について雑多に書いていくブログです。

サーバーサイドKotlin×gRPCコトハジメ 〜④インターセプターとメタデータ(後編)〜

前回の記事でgRPCのインターセプターについて書きましたが、今回はその後編です。

blog.takehata-engineer.com

後処理とトレーラーについてです。
また、今回はその前に「クライアントサイドからのgRPCの実行」についても書きます。

これまでgrpccというクライアントツールを使ってきましたが、メタデータの扱いなどそろそろ説明がしづらくなってきたので、ここで1回書いておきます。

Stubを使ったクライアントのgRPC実行

クライアントと言ってもAndroidアプリとかを作るわけではなく、通常のREST APIを作ります。
なのでHTTP通信でREST APIアクセスし、そのREST APIをクライアントとしてgRPCのServiceを実行という形になります。
(マイクロサービスアーキテクチャでよくある形です)

grpc-spring-boot-starterを使っていれば、8080ポートをREST、6565ポートをgRPCとして起動してくれるので、同一アプリケーション内にREST APIとgRPCのServiceを両方作って試すことができます。

クライアントプログラムからgRPCのServiceを実行する

まず、クライアントとなるREST APIを実装します。
JSONでデータの送信をするため、まずbuild.gradleのdependenciesにJacksonの依存関係を追加してください。

implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

そして、下記のRestControllerとRequestクラスを実装してください。

data class SayHelloRequest(val name: String)
@RestController
class ClientController {
    @RequestMapping("/sayhello")
    fun sayHello(@RequestBody request: SayHelloRequest): String {
        val helloRequest = HelloRequest.newBuilder().setName(request.name).build() // ①GreeterServiceのリクエストの生成
        val channel = ManagedChannelBuilder.forAddress("localhost", 6565) // ②Channel(接続のための定義)の設定
                .usePlaintext().build()
        val stub = GreeterGrpc.newBlockingStub(channel) // ③GreeterServiceを実行するStubの作成
        val response = stub.sayHello(helloRequest) // ④GreeterServiceのメソッド実行

        return response.message
    }
}

これはHTTP通信で「name」の値をリクエストし、gRPCのGreeterServiceにパラメータとして渡して実行する処理になっています。 ポイントを簡単に説明します。

①GreeterServiceのリクエストの生成

GreeterService#sayHelloのリクエストパラメータを生成しています。
ここではHTTP通信のリクエストで受け取った「name」の値をそのまま入れるだけになっています。
レスポンスクラスと同じように、Builderパターンで生成します。

②Channel(接続のための定義)の設定

Channelは、gRPCで通信をするために必要な定義で、Connectionのようなものだと思ってください。
Kotlin(Java)では ManagedChannelBuilder というクラスを使い、こちらもBuilderパターンで各パラメータを設定してChannelを生成することができます。

ここでは forAddress("localhost", 6565) で接続先のIP、ポートをローカル環境に設定しています。
また、 usePlaintext() を指定していますが、こちらはSSL通信を無効にするためのメソッドになります(ローカル環境でSSLに対応していないため)。 実際のプロダクト開発でSSLが有効な環境に接続する時は、このオプションを消さないとアクセスできないので、注意してください。

③GreeterServiceを実行するStubの作成

初回の記事でProtocol Buffersで色々なファイルが生成されることを説明しましたが、その中に「Stub」が作られます。
このStubを使うことで、生成したServiceのメソッドにアクセスすることができるようになっています。

今回は GreeterGrpc.newBlockingStub(channel) で、GreeterServiceのStubを作っています。
このように、生成したChannelを引数に渡し、各xxxGrpcクラスの newBlockingStub メソッドを実行することで、Stubを生成できます。

④GreeterServiceのメソッド実行

あとはStubから対象のメソッドに最初に生成したリクエストクラスを渡して実行することで、gRPCのServiceに通信して実行します。

実行

起動して、下記のcurlコマンドでアクセスしてください。

$curl -H 'Content-Type:application/json' -d '{"name":"souda"}' http://localhost:8080/sayhello
Hello souda

「GreeterService」で生成されたメッセージが返却されたと思います。
HTTPでの通信を中継しているだけになりますが、これで「REST APIへアクセス→gRPCのServiceを実行」という処理が実現できます。

クライアントからヘッダを送信する

前回の記事で紹介したヘッダの送信を、クライアントプログラムで実行します。

クライアントでメタデータを扱うには、 ClientInterceptor というインターフェースを実装します。

class MetadataClientInterceptor : ClientInterceptor {
    override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>?,
                                             callOptions: CallOptions?, next: Channel): ClientCall<ReqT, RespT>? {
        return object : SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            override fun start(responseListener: Listener<RespT>?, headers: Metadata) {
                headers.put(Metadata.Key.of("deviceName", Metadata.ASCII_STRING_MARSHALLER), "Galaxy S9+")
                super.start(responseListener, headers)
            }
        }
    }
}

interceptCall で返却する値で SimpleForwardingClientCall というクラスを使います。 その中でオーバーライドしている start という関数で、ヘッダを設定できます。

headers.put のキーには、サーバー側での取得時と同じように Metadata.Key クラスで設定します。
ここでは「deviceName」というキーで、文字列を設定しています。
クライアントアプリとの通信では、ここで端末名を取得してヘッダで送信する、というような使い方も想定されます。

そして、実装した「MetadataClientInterceptor」をクライアント側で有効にするため、「ClientController」を下記のように修正します。

@RestController
class ClientController {
    @RequestMapping("/sayhello")
    fun hello(@RequestBody request: SayHelloRequest): String {
        val helloRequest = HelloRequest.newBuilder().setName(request.name).build()
        val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .intercept(MetadataClientInterceptor())
                .usePlaintext().build()
        val stub = GreeterGrpc.newBlockingStub(channel)
        val response = stub.sayHello(helloRequest)

        return response.message
    }
}

channelの作成時に intercept(MetadataClientInterceptor()) を加えることで、このchannelを使ってgRPCのServiceを実行する時にインターセプターを通るようになります。

インターセプター(後処理)

やっと今回の本題、インターセプターでの後処理です。

メイン処理でもログ出力するように変更

インターセプター実装の前に、実行時の処理順をログで終えるようにするため、GreeterServiceも下記に変更します。

@GRpcService
class GreeterService: GreeterGrpc.GreeterImplBase() {
    companion object{
        private val log = LoggerFactory.getLogger(GreeterService::class.java)
    }

    override fun sayHello(request: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
        val replyBuilder = HelloReply.newBuilder()
                .setMessage("Hello " + request.name)
                .setType(MessageType.NORMAL)

        log.info("execute sayHello")

        responseObserver.onNext(replyBuilder.build())
        responseObserver.onCompleted()
    }
}

これでsayHello実行時に「execute sayHello」のログが出力されるようになります。

インターセプターで後処理の実装

前回の記事で作った「MetadataInterceptor」を、下記のように変更します。

@GRpcGlobalInterceptor
@Order(30)
class MetadataInterceptor : ServerInterceptor {
    companion object {
        private val log = LoggerFactory.getLogger(MetadataInterceptor::class.java)
    }

    override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>, headers: Metadata, next: ServerCallHandler<ReqT, RespT>): ServerCall.Listener<ReqT> {
        val deviceName = headers.get(Metadata.Key.of("deviceName", Metadata.ASCII_STRING_MARSHALLER))

        log.info("deviceName=$deviceName")

        return next.startCall(object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            override fun close(status: Status?, trailers: Metadata) {
                log.info("execute close")
                super.close(status, trailers)
            }
        }, headers)
    }
}

next.startCall の引数で、 ForwardingServerCall.SimpleForwardingServerCallclose メソッドをオーバーライドすることで、メイン処理(ここではGreeterService#sayhello)の後に実行する処理を定義できます。
今回は「execute close」というメッセージをログに出力するだけの処理です。

そして再度実行すると

$ curl -H 'Content-Type:application/json' -d '{"name":"piyosu"}' http://localhost:8080/sayhello
Hello piyosu

アプリケーションのログに下記が出力されます。

INFO 68641 --- [ault-executor-0] c.e.g.k.k.i.LoggingInterceptor           : method name:Greeter/SayHello
INFO 68641 --- [ault-executor-0] c.e.g.k.k.i.MetadataInterceptor          : deviceName=Galaxy S9+
INFO 68641 --- [ault-executor-0] c.e.g.k.k.grpcservice.GreeterService     : execute sayHello
INFO 68641 --- [ault-executor-0] c.e.g.k.k.i.MetadataInterceptor          : execute close

「LoggingInterceptor」「MetadataInterceptor」の処理が設定した順番通り実行され、メイン処理の「GreeterService」が実行され、最後に「MetadataInterceptor」で close に定義した処理が実行されています。
ちなみに下記の close の処理は他のインターセプターにも定義することができます。

return next.startCall(object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
    override fun close(status: Status?, trailers: Metadata) {
        log.info("execute close")
        super.close(status, trailers)
    }
}, headers)

もし複数のインターセプターに定義した場合は、順番が後ろのインターセプターのcloseの処理から実行されます。
例えば、2つのインターセプターを定義して、その両方にcloseの処理を実装している場合、

①インターセプター1の処理
②インターセプター2の処理
③メイン処理
④インターセプター2のclose処理
⑤インターセプター1のclose処理

という順番で実行されます。

トレーラー

最後にトレーラーです。
サーバーからのレスポンスには、メタデータとしてトレーラーも設定できます。

サーバーからのレスポンスにトレーラーを設定

「MetadataInterceptor」の処理を下記のように書き換えます。

@GRpcGlobalInterceptor
@Order(30)
class MetadataInterceptor: ServerInterceptor {
    companion object{
        private val log = LoggerFactory.getLogger(MetadataInterceptor::class.java)
    }

    override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>, headers: Metadata, next: ServerCallHandler<ReqT, RespT>): ServerCall.Listener<ReqT> {
        val deviceName = headers.get(Metadata.Key.of("deviceName", Metadata.ASCII_STRING_MARSHALLER))

        log.info("deviceName=$deviceName")

        return next.startCall(object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
            override fun close(status: Status?, trailers: Metadata) {
                log.info("execute close")
                trailers.put(Metadata.Key.of("responseDataTime", Metadata.ASCII_STRING_MARSHALLER), Date().toString())
                super.close(status, trailers)
            }
        }, headers)
    }
}

close メソッドの引数 trailers で、トレーラーを扱うことができます。
値の設定の仕方はヘッダと同様で、 Metadata.Key をキーにします。
ここでは「responseDataTime」という名前のキーで、現在日時を返すようにしています。

クライアント側でトレーラーを受信

「MetadataClientInterceptor」を下記のように変更します。

class MetadataClientInterceptor : ClientInterceptor {
    companion object{
        private val log = LoggerFactory.getLogger(MetadataClientInterceptor::class.java)
    }

    override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>?,
                                             callOptions: CallOptions?, next: Channel): ClientCall<ReqT, RespT>? {
        return object : SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            override fun start(responseListener: Listener<RespT>?, headers: Metadata) {
                headers.put(Metadata.Key.of("deviceName", Metadata.ASCII_STRING_MARSHALLER), "Galaxy S9+")
                super.start(MetadataClientCallListener(responseListener), headers)
            }
        }
    }

    internal class MetadataClientCallListener<RespT>(responseListener: ClientCall.Listener<RespT>?) : SimpleForwardingClientCallListener<RespT>(responseListener) {
        override fun onClose(status: Status?, trailers: Metadata?) {
            try {
                val responseDateTime = trailers?.get(Metadata.Key.of("responseDataTime", Metadata.ASCII_STRING_MARSHALLER))
                log.info("responseDateTime=${responseDateTime}")
            } finally {
                super.onClose(status, trailers)
            }
        }
    }
}

interceptCall の戻り値の中でオーバーライドしている start メソッドで、最後に実行している super.start に引数の responseListener をそのまま渡していたところを、自前で作った responseListener に変更します。

下にinternal classで定義していますが、 SimpleForwardingClientCallListener を継承して onClose をオーバーライドすることで、クライアント側のインターセプターでの後処理を定義できます。
サーバー側と同じようにここで引数に trailers としてトレーラーを受け取っているので、キーを指定してgetすることで値を取得できます。

もう一度実行すると

$ curl -H 'Content-Type:application/json' -d '{"name":"sarina"}' http://localhost:8080/sayhello
Hello sarina

アプリケーションのログに下記が出力されます。

INFO 51921 --- [ault-executor-0] c.e.g.k.k.i.LoggingInterceptor           : method name:Greeter/SayHello
INFO 51921 --- [ault-executor-0] c.e.g.k.k.i.MetadataInterceptor          : deviceName=Galaxy S9+
INFO 51921 --- [ault-executor-0] c.e.g.k.k.grpcservice.GreeterService     : execute sayHello
INFO 51921 --- [ault-executor-0] c.e.g.k.k.i.MetadataInterceptor          : execute close
INFO 51921 --- [nio-8080-exec-1] c.e.g.k.k.c.i.MetadataClientInterceptor  : responseDateTime=Wed Nov 27 23:58:49 JST 2019

一番最後にクライアントがトレーラーで受け取った値を出力した行が増えていますね。

次回はここまでの内容を通しでまとめて公開の予定

ここまで4回の記事で、ある程度一連の流れが作れるようになりました。
間が空いてしまったのもあり、書いている間にバラバラしてきた部分もあるので、次回は振り返りつつ通しで説明します。 併せてサンプルも整備してすぐ試せるような形で作って、Githubに公開する予定です。

Kotlin Advent Calendar 2019の2日目の記事として書く予定なので、ぜひ読んでいただければと思います。