タケハタのブログ

プログラマの生き方、働き方、技術について雑多に書いていくブログです。

grpc-kotlinはgRPCの実装をどこまでKotlin化できるのか?

先日、GoogleからKotlinのgRPCライブラリであるgrpc-kotlinのバージョン0.1.1がリリースされました。

cloud.google.com

これまでもKotlinでgRPCは使ってきましたが、ずっとJavaを使わざるを得なかったので嬉しい限りです。
そこで、このgrpc-kotlinを使うことで、gRPCの実装をどこまでKotlin化できるのかをまとめてみます。

まだ0.1.1という開発版の最初のリリースのため完成は恐らくまだまだ先ですし、こらからさらに良くなっていくと思いますが、一旦現時点での情報ということで。

gRPC自体の説明などは今回は省くので、分からない方は下記の過去記事を参考にご覧ください。

サーバーサイドKotlin×gRPCコトハジメ 〜①プロジェクト作成から起動確認まで〜
サーバーサイドKotlin×gRPCコトハジメ 〜②Protocol Buffersの色々なオプション〜
サーバーサイドKotlin×gRPCコトハジメ 〜③インターセプターとメタデータ(前編)〜
サーバーサイドKotlin×gRPCコトハジメ 〜④インターセプターとメタデータ(後編)〜
サーバーサイドKotlin×gRPCコトハジメ 〜⑤プロジェクト作成から基本的なAPI実装までまとめ〜

目次

  1. grpc-kotlinとgrpc-javaの実装比較
  2. Kotlin化できた箇所とJavaのままの箇所
  3. まとめ

grpc-kotlinとgrpc-javaの実装比較

grpc-kotlin、grpc-javaはそれぞれのリポジトリにexampleが用意されているので、まずはそのコードを使って比較したいと思います。

github.com github.com

ちなみにgrpc-javaにはexample-kotlinというKotlinで実装しているサンプルがあったのですが、grpc-kotlinリリースと同時に削除されてしまったようなので、現在のmasterブランチでは見ることができません。

https://github.com/grpc/grpc-java/commit/da855f4164e151bc0c679a856546e95f70092548

なのでたまたまローカルに残っていた古いコードを使って比較しました。

(一応コミット履歴から追えば見れます)
https://github.com/grpc/grpc-java/blob/3bd141bf184a82a2cc92c98c906141d5b32ee88e/examples/example-kotlin/src/main/kotlin/io/grpc/examples/helloworld/HelloWorldServer.kt
https://github.com/grpc/grpc-java/blob/3bd141bf184a82a2cc92c98c906141d5b32ee88e/examples/example-kotlin/src/main/kotlin/io/grpc/examples/helloworld/HelloWorldClient.kt

あとは、grpc-kotlinのExampleを使ったQuick Startのページも用意されているので、まず一旦動かしてみたい方は参考にしてください。

grpc.io

とても分かりやすかったので、手順通りやればすぐに動かせると思います。

build.gradle

比較の前にGradleの依存関係の内容などを説明します。 まずライブラリとして追加されているのがgrpc-kotlin-stub

implementation "io.grpc:grpc-kotlin-stub:$grpc_kotlin_version"

クライアントからgRPCサーバーを叩くStubのKotlinライブラリです。
ただ、このgrpc-kotlin-stubはもともとあるJavaのgrpc-stubのクラスを継承して実装されていたりする部分もあり、依存しているのでgrpc-stub(Javaのライブラリ)の依存関係も必要です。

implementation "io.grpc:grpc-stub:$grpc_version"

そしてProtocol Buffersのコード生成タスクに関する部分が下記です。

protobuf {
    protoc { artifact = "com.google.protobuf:protoc:$protobuf_version" }
    plugins {
        // Specify protoc to generate using kotlin protobuf plugin
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
        }
        // Specify protoc to generate using our grpc kotlin plugin
        grpckt {
            artifact = "io.grpc:protoc-gen-grpc-kotlin:$grpc_kotlin_version"
        }
    }
    generateProtoTasks {
        all().each { task ->
            task.plugins {
                // Generate Java gRPC classes
                grpc { }
                // Generate Kotlin gRPC using the custom plugin from library
                grpckt { }
            }
        }
    }
}

ポイントとしてはまず下記のprotoc-gen-grpc-kotlinというプラグインを指定しているところ。

grpckt {
    artifact = "io.grpc:protoc-gen-grpc-kotlin:$grpc_kotlin_version"
}

以前の記事でも紹介しているのですが、Protocol BuffersからgRPC関連のコードを生成するためのプラグインの指定で、Kotlin用のものを記述しています。
こちらもJavaの生成コードも必要になるので、grpc grpcktと両方のブロックが書かれています。

そして下のgenerateProtoTasks内のgrpckt { }で、タスクでKotlinコードも生成されるように指定しています。

task.plugins {
    // Generate Java gRPC classes
    grpc { }
    // Generate Kotlin gRPC using the custom plugin from library
    grpckt { }
}

サーバー

次に、サーバーのコードを比較します。
まず、grpc-javaとgrpc-kotlinそれぞれのコードは下記になります。

grpc-javaのサンプル

/**
 * Server that manages startup/shutdown of a `Greeter` server.
 *
 * Note: this file was automatically converted from Java
 */
class HelloWorldServer {

    private var server: Server? = null

    @Throws(IOException::class)
    private fun start() {
        /* The port on which the server should run */
        val port = 50051
        server = ServerBuilder.forPort(port)
                .addService(GreeterImpl())
                .build()
                .start()
        logger.log(Level.INFO, "Server started, listening on {0}", port)
        Runtime.getRuntime().addShutdownHook(object : Thread() {
            override fun run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down")
                this@HelloWorldServer.stop()
                System.err.println("*** server shut down")
            }
        })
    }

    private fun stop() {
        server?.shutdown()
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    @Throws(InterruptedException::class)
    private fun blockUntilShutdown() {
        server?.awaitTermination()
    }

    internal class GreeterImpl : GreeterGrpc.GreeterImplBase() {

        override fun sayHello(req: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
            val reply = HelloReply.newBuilder().setMessage("Hello ${req.name}").build()
            responseObserver.onNext(reply)
            responseObserver.onCompleted()
        }
    }

    companion object {
        private val logger = Logger.getLogger(HelloWorldServer::class.java.name)

        /**
         * Main launches the server from the command line.
         */
        @Throws(IOException::class, InterruptedException::class)
        @JvmStatic
        fun main(args: Array<String>) {
            val server = HelloWorldServer()
            server.start()
            server.blockUntilShutdown()
        }
    }
}

grpc-kotlinのサンプル

class HelloWorldServer constructor(
    private val port: Int
) {
    val server: Server = ServerBuilder
        .forPort(port)
        .addService(HelloWorldService())
        .build()

    fun start() {
        server.start()
        println("Server started, listening on $port")
        Runtime.getRuntime().addShutdownHook(
            Thread {
                println("*** shutting down gRPC server since JVM is shutting down")
                this@HelloWorldServer.stop()
                println("*** server shut down")
            }
        )
    }

    private fun stop() {
        server.shutdown()
    }

    fun blockUntilShutdown() {
        server.awaitTermination()
    }

    private class HelloWorldService : GreeterGrpcKt.GreeterCoroutineImplBase() {
        override suspend fun sayHello(request: HelloRequest) = HelloReply
            .newBuilder()
            .setMessage("Hello ${request.name}")
            .build()
    }
}

fun main() {
    val port = 50051
    val server = HelloWorldServer(port)
    server.start()
    server.blockUntilShutdown()
}

一部変数がコンストラクタで定義されていたり、クラス内で定義されていたり書き方に差分はありますが、基本的には同じ内容を実装しています。
いずれも50051ポートでServiceを起動し、sayHelloという関数でリクエストを受け付けるコードになります。

実装の違い

主な実装の違いとしては、Serviceの関数内の処理の実装です。
まず、grpc-javaは次のように実装しています。

internal class GreeterImpl : GreeterGrpc.GreeterImplBase() {
    override fun sayHello(req: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
        val reply = HelloReply.newBuilder().setMessage("Hello ${req.name}").build()
        responseObserver.onNext(reply)
        responseObserver.onCompleted()
    }
}

生成したGreeterImplBaseを継承し、Observerパターンで実装する設計になっています。
そのため関数の戻り値としてレスポンスを返すのではなく、引数で渡されているStreamObserverを使ってレスポンスの値の設定、処理の終了を制御しています。
gRPCがStream(双方向通信)の実装を可能にするため、この形で定義されています。

これに対しgrpc-kotlinでは次のように実装されます。

private class HelloWorldService : GreeterGrpcKt.GreeterCoroutineImplBase() {
    override suspend fun sayHello(request: HelloRequest) = HelloReply
        .newBuilder()
        .setMessage("Hello ${request.name}")
        .build()
}

違いとしては下記になります。

  • 継承元がKotlinのクラスに変わっている
  • responseObserverがなくなりレスポンスのオブジェクトを戻り値として返す形になっている
  • サスペンド関数になっている

継承するクラスがGreeterCoroutineImplBaseというKotlinで実装されたクラスになっています。
これは前述のGradleのところで説明したprotoc-gen-grpc-kotlin で生成されたコードです。

そしてここでオーバーライドしている関数では、引数からStreamObserverがなくなり、レスポンスのオブジェクトを戻り値として返すだけになっています。
中身を全部説明すると長くなるので省きますが、GreeterImplBase、GreeterCoroutineImplBaseの実装を追ってみると終了処理はgrpc-kotlinのライブラリでラッピングされていて、実装側でObserverでの終了通知などを意識しなくて良くなっています。

また、サスペンド関数で定義されるようになっています。
継承している GreeterCoroutineImplBase というクラス名からも分かるように、ライブラリの内部でもコルーチンが多く使われています。
後述するクライアントの処理も含め、最近のKotlinのトレンドと同じくやはりコルーチンを使った非同期処理が前提として作られていますね。

Streamを使ってみると

ちなみにObserverパターン前提の設計がStreamを想定してと書きましたが、ServiceにStreamのメソッドを定義するとどう生成されるのか見てみました。
hello_world.protoで次のようにリクエストとレスポンスの前にstreamを付けて定義します。

rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}

そしてこのprotoファイルからコードを生成すると、sayHelloStreamは次のような関数になります。

override fun sayHelloStream(requests: Flow<HelloRequest>): Flow<HelloReply> {
    return super.sayHelloStream(requests)
}

リクエスト、レスポンスがそれぞれFlowで定義されています。
ここはサンプルがまだないのと、そもそも使えるのかも分からないので一旦確認だけ。

クライアント

次はクライアントのコードの比較です。
grpc-javaとgrpc-kotlinそれぞれのコードは下記になります。

grpc-javaのサンプル

/**
 * A simple client that requests a greeting from the [HelloWorldServer].
 */
class HelloWorldClient
/** Construct client for accessing RouteGuide server using the existing channel.  */
internal constructor(private val channel: ManagedChannel) {
    private val blockingStub: GreeterGrpc.GreeterBlockingStub
            = GreeterGrpc.newBlockingStub(channel)

    /** Construct client connecting to HelloWorld server at `host:port`.  */
    constructor(host: String, port: Int) : this(ManagedChannelBuilder.forAddress(host, port)
            // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
            // needing certificates.
            .usePlaintext()
            .build()) {
    }


    @Throws(InterruptedException::class)
    fun shutdown() {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
    }

    /** Say hello to server.  */
    fun greet(name: String) {
        logger.log(Level.INFO, "Will try to greet {0}...", name)
        val request = HelloRequest.newBuilder().setName(name).build()
        val response: HelloReply =  try {
            blockingStub.sayHello(request)
        } catch (e: StatusRuntimeException) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.status)
            return
        }

        logger.info("Greeting: ${response.message}")
    }

    companion object {
        private val logger = Logger.getLogger(HelloWorldClient::class.java.name)

        /**
         * Greet server. If provided, the first element of `args` is the name to use in the
         * greeting.
         */
        @Throws(Exception::class)
        @JvmStatic
        fun main(args: Array<String>) {
            val client = HelloWorldClient("localhost", 50051)
            try {
                /* Access a service running on the local machine on port 50051 */
                val user = if (args.size > 0) "${args[0]}" else "world"
                client.greet(user)
            } finally {
                client.shutdown()
            }
        }
    }
}

grpc-kotlinのサンプル

class HelloWorldClient constructor(
) : Closeable {
    private val stub: GreeterCoroutineStub = GreeterCoroutineStub(channel)

    suspend fun greet(name: String) = coroutineScope {
        val request = HelloRequest.newBuilder().setName(name).build()
        val response = async { stub.sayHello(request) }
        println("Received: ${response.await().message}")
    }

    override fun close() {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
    }
}


/**
 * Greeter, uses first argument as name to greet if present;
 * greets "world" otherwise.
 */
fun main(args: Array<String>) = runBlocking {
    val port = 50051

    val client = HelloWorldClient(
        ManagedChannelBuilder.forAddress("localhost", port)
            .usePlaintext()
            .executor(Dispatchers.Default.asExecutor())
            .build()
    )

    val user = args.singleOrNull() ?: "world"
    client.greet(user)
}

こちらも書き方に差分はありますが、基本的には同じ内容を実装しています。
前述のGreeterのsayHelloを叩くクライアントプログラムです。

実装の違い

主な差分は下記のStub生成を生成してsayHelloを実行する部分です。

/** Construct client for accessing HelloWorld server using the existing channel. */
public HelloWorldClient(Channel channel) {
  // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
  // shut it down.

  // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
  blockingStub = GreeterGrpc.newBlockingStub(channel);
}

/** Say hello to server. */
public void greet(String name) {
  logger.info("Will try to greet " + name + " ...");
  HelloRequest request = HelloRequest.newBuilder().setName(name).build();
  HelloReply response;
  try {
    response = blockingStub.sayHello(request);
  } catch (StatusRuntimeException e) {
    logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
    return;
  }
  logger.info("Greeting: " + response.getMessage());
}
class HelloWorldClient constructor(
    private val channel: ManagedChannel
) : Closeable {
  private val stub: GreeterCoroutineStub = GreeterCoroutineStub(channel)

  suspend fun greet(name: String) = coroutineScope {
    val request = HelloRequest.newBuilder().setName(name).build()
    val response = async { stub.sayHello(request) }
    println("Received: ${response.await().message}")
  }

  override fun close() {
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
  }
}

差分は下記になります。

  • StubがBlockingStubからCoroutineStubに変わっている
  • Stub内の関数もサスペンド関数になっている

まず、Stubがgrpc-javaでは次のようにnewBlockingStubを呼び出してBlockingStubを生成していました。

blockingStub = GreeterGrpc.newBlockingStub(channel);

それがgrpc-kotlinでは、GreeterCoroutineStubというKotlinで実装されたStubを使用しています。

private val stub: GreeterCoroutineStub = GreeterCoroutineStub(channel)

これはサーバーのところで紹介したGreeterCoroutineImplBaseと同じく、GreeterGrpcKtに含まれています。
こちらも名前の通り、内部ではCoroutineが多く使われています。

そして呼び出しているsayHelloもサスペンド関数となっており、grpc-javaでは次のように呼び出していたgreet関数が、

fun greet(name: String) {
    logger.log(Level.INFO, "Will try to greet {0}...", name)
    val request = HelloRequest.newBuilder().setName(name).build()
    val response: HelloReply =  try {
        blockingStub.sayHello(request)
    } catch (e: StatusRuntimeException) {
        logger.log(Level.WARNING, "RPC failed: {0}", e.status)
        return
    }

    logger.info("Greeting: ${response.message}")
}

サスペンド関数として定義されています。

suspend fun greet(name: String) = coroutineScope {
    val request = HelloRequest.newBuilder().setName(name).build()
    val response = async { stub.sayHello(request) }
    println("Received: ${response.await().message}")
}

ここでは非同期通信にするためasyncを使用しています。
(asyncでの非同期通信自体はgrpc-javaでもできます)

Kotlin化できた箇所とJavaのままの箇所

Kotlin化できた箇所とできなかった箇所をまとめます。
もともとgrpc-javaを使ってKotlinで実装することはできたため、呼び出しているクラスや継承しているクラスがどこまでKotlin化できたかということになります。

Kotlin化できた箇所

Kotlin化できた箇所は以下の2つになります。

  • Stub
  • Service

Stub

xxxxGrpcKtに含まれているxxxxCoroutineStubを使うことで、呼び出すStubをKotlin化できます。
ただし、このxxxxCoroutineStubはAbstractCoroutineStubクラスを継承しているのですが、これはさらにgrpc-stub(Javaのライブラリ)のAbstractStubを継承しています。

abstract class AbstractCoroutineStub<S: AbstractCoroutineStub<S>>(
  channel: Channel,
  callOptions: CallOptions = CallOptions.DEFAULT
): AbstractStub<S>(channel, callOptions)

そのため、Kotlin化しているもののJavaには依存している形になります。

Servie

サーバー側のServiceも、xxxxGrpcKtに含まれるxxxxCoroutineImplBaseを使うことで、継承元のクラスをKotlin化できます。
ただし、xxxxCoroutineImplBaseもAbstractCoroutineServerImplというKotlinのクラスを継承しているのですが、こちらがさらにBindableServiceというJava側のライブラリに含まれるクラスを継承しています。

abstract class AbstractCoroutineServerImpl(
  /** The context in which to run server coroutines. */
  val context: CoroutineContext = EmptyCoroutineContext
) : BindableService {
  /*
   * Each RPC is executed in its own coroutine scope built from [context].  We could have a parent
   * scope, but it doesn't really add anything: we don't want users to be able to launch tasks
   * in that scope easily, since almost all coroutines should be scoped to the RPC and cancelled
   * if the RPC is cancelled.  Users who don't want that behavior should manage their own scope for
   * it.  Additionally, gRPC server objects don't have their own notion of shutdown: shutting down
   * a server means cancelling the RPCs, not calling a teardown on the server object.
   */
}

また、xxxxCoroutineImplBaseの中で他にもgrpc-javaのクラスやメソッドを呼び出している箇所もあり、こちらもやはりgrpc-javaには依存している形になります。

Javaのままの箇所

逆にJavaのままだった箇所は、以下になります。

  • message
  • Channel
  • Interceptor

message

リクエスト、レスポンスのオブジェクトなどを定義しているmessageのクラスはJavaのままです。
なのでこれまでと同様、Builderを使ってインスタンスを生成しています。

HelloReply
    .newBuilder()
    .setMessage("Hello ${request.name}")
    .build()

本当はここがデータクラスとかになってくれると実装方法が統一されるのと、プロパティの型もプラットフォーム型を扱わなくて良くなるので、結構Kotlin化したい部分ではあります。
まだ最初のリリースなので、今後のアップデートで対応されるのではないかと期待はしています。

Channel

クライアントからサーバーへの接続に使うChannel関連のコードも全てJavaのままです。
ManagedChannelやManagedChannelBuilderといったクラスはJavaを扱うことになります。

こちらもKotlin化されれば、下記のようなBuilderパターンでの実装が排除されるので良いですね。

ManagedChannelBuilder.forAddress("localhost", port)
    .usePlaintext()
    .executor(Dispatchers.Default.asExecutor())
    .build()

Interceptor

サンプルの方では触れていませんが、ClientInterceptor ServerInterceptorといったInterceptor関連のコードもJavaのままでした。
こちらはやるとしても少し後になるかな、という気はしています。

まとめ:grpc-javaをKotlinでラッピングし、非同期処理前提で実装しやすくしているイメージ

現在対応されている内容を見ると、まずはgRPC関連の処理をKotlinでコルーチンを使った非同期処理として実装できるように、という方向性に見えますね。

ここまでも書いてきましたが、基本的にgrpc-kotlinはgrpc-javaがあった上で動くものになっています。
JavaのAbstractクラスを継承し、コルーチンを使う形に変更したKotlinのAbstractクラスがあったり、ライブラリ内にあるServerCalls ClientCallsなどでJavaの実装を呼び出す形にしてラッピングしているものもあります。

今後Kotlin化されている範囲が増えて行くとしても、この構成は変わらないのかなという気がするので、grpc-javaへの依存は必要になるのかなとは思います。
ただ、ライブラリ側でラッピングしてくれれば実装側でJavaを意識しなくては良くなり嬉しいので、今後のアップデートも期待したいです。