タケハタのブログ

プログラマの生き方、働き方、技術について雑多に書いていくブログです。

Spring WebFluxとKotlin CoroutinesでJDBCを実行した時の挙動

よくSpring WebFluxでのリアクティブプログラミングや、Kotlin Coroutinesの話で、「JDBCの処理を呼ぶとブロッキングなので意味が・・・」という話がでてきます。
が、実際に動かしてどういう挙動になるのか、なにが問題なのかを説明しているものがあまりないのでまとめてみました。

Spring WebFluxとKotlin Coroutinesを使い、ノンブロッキングとブロッキングの処理を呼んだ時の違いや、それをJDBCの処理に置き換えた時の挙動などをスレッドの動きを見ながら解説します。

※ Spring WebFluxやORMなど各技術スタックの使い方の説明は割愛しています

ブロッキングとノンブロッキングでの挙動の違い

まずはブロッキングの処理とノンブロッキングの処理での挙動の違いを見てみます。
コルーチンの中でブロッキングなThread.sleep、ノンブロッキングなdelayを呼んだ場合のそれぞれの挙動を確認します。

実行するコード

以下のようにブロッキングの処理とノンブロッキングの処理を、それぞれcoroutineScope内で非同期実行します。

@Service
class GreeterService {
    suspend fun blocking() {
        coroutineScope {
            launch {
                println("${LocalDateTime.now()} start launch A thread:${Thread.currentThread()}")
                for (i in 1..5) {
                    Thread.sleep(1000)
                    println("${LocalDateTime.now()} Hello A $i thread:${Thread.currentThread()}")
                }
            }
            launch {
                println("${LocalDateTime.now()} start launch B thread:${Thread.currentThread()}")
                for (i in 1..5) {
                    Thread.sleep(1000)
                    println("${LocalDateTime.now()} Hello B $i thread:${Thread.currentThread()}")
                }
            }
            launch {
                println("${LocalDateTime.now()} start launch C thread:${Thread.currentThread()}")
                for (i in 1..5) {
                    Thread.sleep(1000)
                    println("${LocalDateTime.now()} Hello C $i thread:${Thread.currentThread()}")
                }
            }
        }
    }

    suspend fun nonBlocking() {
        coroutineScope {
            launch {
                println("${LocalDateTime.now()} start launch A thread:${Thread.currentThread()}")
                for (i in 1..5) {
                    delay(1000)
                    println("${LocalDateTime.now()} Hello A $i thread:${Thread.currentThread()}")
                }
            }
            launch {
                println("${LocalDateTime.now()} start launch B thread:${Thread.currentThread()}")
                for (i in 1..5) {
                    delay(1000)
                    println("${LocalDateTime.now()} Hello B $i thread:${Thread.currentThread()}")
                }
            }
            launch {
                println("${LocalDateTime.now()} start launch C thread:${Thread.currentThread()}")
                for (i in 1..5) {
                    delay(1000)
                    println("${LocalDateTime.now()} Hello C $i thread:${Thread.currentThread()}")
                }
            }
        }
    }
}

launchの中でblocking関数ではブロッキング処理のThread.sleepnonblocking関数ではノンブロッキング処理のdelayで1秒ずつの待機処理を入れています。
そして各launchのブロックの先頭と、実行した後で時間とループのカウンタ、実行されているスレッドを出力しています。
どのブロックでの実行か判別するため、A、B、Cという文字列も出力しています。

そしてこれを以下のRouterから呼び出します。

@Component
class GreeterRouter(private val greeterService: GreeterService) {
    @Bean
    fun greeterRoutes() = coRouter {
        GET("/blocking") {
            greeterService.blocking()
            ServerResponse.ok().bodyValueAndAwait("Blocking")
        }

        GET("/nonblocking") {
            greeterService.nonBlocking()
            ServerResponse.ok().bodyValueAndAwait("NonBlocking")
        }
    }
}

ブロッキングの処理

まず、ブロッキングの処理での挙動です。
以下のようにblocking関数を実行します。

$ curl http://localhost:8080/blocking
Blocking

結果としては次のようになります。

2021-09-10T09:44:52.704136 start launch A thread:Thread[reactor-http-nio-2 @coroutine#2,5,main]
2021-09-10T09:44:53.709400 Hello A 1 thread:Thread[reactor-http-nio-2 @coroutine#2,5,main]
2021-09-10T09:44:54.717926 Hello A 2 thread:Thread[reactor-http-nio-2 @coroutine#2,5,main]
2021-09-10T09:44:55.721963 Hello A 3 thread:Thread[reactor-http-nio-2 @coroutine#2,5,main]
2021-09-10T09:44:56.726440 Hello A 4 thread:Thread[reactor-http-nio-2 @coroutine#2,5,main]
2021-09-10T09:44:57.729325 Hello A 5 thread:Thread[reactor-http-nio-2 @coroutine#2,5,main]
2021-09-10T09:44:57.729949 start launch B thread:Thread[reactor-http-nio-2 @coroutine#3,5,main]
2021-09-10T09:44:58.734963 Hello B 1 thread:Thread[reactor-http-nio-2 @coroutine#3,5,main]
2021-09-10T09:44:59.739694 Hello B 2 thread:Thread[reactor-http-nio-2 @coroutine#3,5,main]
2021-09-10T09:45:00.743815 Hello B 3 thread:Thread[reactor-http-nio-2 @coroutine#3,5,main]
2021-09-10T09:45:01.748340 Hello B 4 thread:Thread[reactor-http-nio-2 @coroutine#3,5,main]
2021-09-10T09:45:02.749532 Hello B 5 thread:Thread[reactor-http-nio-2 @coroutine#3,5,main]
2021-09-10T09:45:02.749876 start launch C thread:Thread[reactor-http-nio-2 @coroutine#4,5,main]
2021-09-10T09:45:03.755196 Hello C 1 thread:Thread[reactor-http-nio-2 @coroutine#4,5,main]
2021-09-10T09:45:04.760001 Hello C 2 thread:Thread[reactor-http-nio-2 @coroutine#4,5,main]
2021-09-10T09:45:05.763684 Hello C 3 thread:Thread[reactor-http-nio-2 @coroutine#4,5,main]
2021-09-10T09:45:06.768072 Hello C 4 thread:Thread[reactor-http-nio-2 @coroutine#4,5,main]
2021-09-10T09:45:07.770133 Hello C 5 thread:Thread[reactor-http-nio-2 @coroutine#4,5,main]

Aのループから順に1秒ごとにログが出力され、直列で実行されているのがわかります。
そして全てreactor-http-nio-2という同一のスレッドで実行されています。

また、VisualVMを使用して実行時のスレッドを見ると以下のようになっています。

API実行前 f:id:take7010:20210918111133p:plain

API実行後 f:id:take7010:20210918111145p:plain

reactor-http-nio-2というスレッドが立ち上がり、Sleep状態になっています。
このreactor-http-nio-x(xは数字)が、Spring WebfluxでAPIアクセスがあった際に立ち上がるイベントループのスレッドです。

ノンブロッキングの処理

今度はノンブロッキングの処理を実行します。

$ curl http://localhost:8080/nonblocking
NonBlocking

結果は以下のようになります。

2021-09-10T09:45:47.679164 start launch A thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-10T09:45:47.682830 start launch B thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-10T09:45:47.683173 start launch C thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]
2021-09-10T09:45:48.685793 Hello A 1 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#6,5,main]
2021-09-10T09:45:48.686279 Hello B 1 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#7,5,main]
2021-09-10T09:45:48.686602 Hello C 1 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#8,5,main]
2021-09-10T09:45:49.689577 Hello A 2 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#6,5,main]
2021-09-10T09:45:49.689769 Hello B 2 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#7,5,main]
2021-09-10T09:45:49.689886 Hello C 2 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#8,5,main]
2021-09-10T09:45:50.692842 Hello A 3 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#6,5,main]
2021-09-10T09:45:50.693031 Hello B 3 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#7,5,main]
2021-09-10T09:45:50.693163 Hello C 3 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#8,5,main]
2021-09-10T09:45:51.697846 Hello A 4 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#6,5,main]
2021-09-10T09:45:51.698084 Hello B 4 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#7,5,main]
2021-09-10T09:45:51.698203 Hello C 4 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#8,5,main]
2021-09-10T09:45:52.702641 Hello A 5 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#6,5,main]
2021-09-10T09:45:52.702950 Hello B 5 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#7,5,main]
2021-09-10T09:45:52.703095 Hello C 5 thread:Thread[kotlinx.coroutines.DefaultExecutor @coroutine#8,5,main]

実行された時間とログの出力されている順番から、A〜Cの各ブロックが並列で実行されていることがわかります。

reactor-http-nio-3というイベントループのスレッドが立ち上がって実行されています。
そして、suspend関数であるdelayが実行され中断した後は、kotlinx.coroutines.DefaultExecutorで処理されています。

JDBCを使用

今度はJDBCを使って挙動を確認してみます。
ORMのMyBatisを使用し、JDBCでの処理を実行します。

すぐ終わってしまうクエリだとわかりづらいため、以下の記事を参考に時間のかかるデータとクエリを作成しました。

qiita.com

使用するテーブルとクエリ

任意のデータベースを作成し、以下のintのvalueのカラムを一つだけ持ったテーブルを作成します。

CREATE TABLE counter(value int);

いくつかのレコードを登録し、さらにサブクエリで大量レコードを作成します。

insert into counter values(1);
insert into counter values(2);
insert into counter values(3);
insert into counter values(4);
insert into counter values(5);
insert into counter values(6);
insert into counter values(7);
insert into counter values(8);
insert into counter values(9);
insert into counter values(10);
insert into counter values(11);
insert into counter values(12);
insert into counter values(13);

insert into counter(value) select a.value value from counter a, counter b, counter c, counter d, counter e, counter f; 

そしてプログラム側からは、以下のMyBatisのMapperから呼び出します。

@Mapper
interface CounterCustomMapper {
    @Select("select count(*) from counter a, counter b, counter c, counter d, counter e, counter f, counter g, counter h, counter i, counter j, counter k, counter l, counter m, counter n, counter o, counter p, counter q, counter r, counter s, counter t;")
    fun selectCount(): Long
}

レコード数やサブクエリで指定するテーブルの数は、動作確認の実行時間としてちょうど良くなるように調整しています。
(私のローカル環境で大体クエリ1回6〜7秒の実行時間でした)

MapperでのJDBCの処理をコルーチンで呼び出す

まずは以下のService、Routerで前述のThread.sleepdelayと同じように、launchでMapperの処理を呼び出します。

@Suppress("SpringJavaInjectionPointsAutowiringInspection")
@Service
class CounterService(private val counterCustomMapper: CounterCustomMapper) {
    suspend fun callCount() {
        coroutineScope {
            launch {
                for (i in 1..5) {
                    counterCustomMapper.selectCount()
                    println("${LocalDateTime.now()} Hello A $i thread:${Thread.currentThread()}")
                }
            }
            launch {
                for (i in 1..5) {
                    counterCustomMapper.selectCount()
                    println("${LocalDateTime.now()} Hello B $i thread:${Thread.currentThread()}")
                }
            }
            launch {
                for (i in 1..5) {
                    counterCustomMapper.selectCount()
                    println("${LocalDateTime.now()} Hello C $i thread:${Thread.currentThread()}")
                }
            }
        }
    }
}
@Component
class CounterRouter(private val counterService: CounterService) {
    @Bean
    fun counterRoutes() = coRouter {
        GET("/callcount") {
            counterService.callCount()
            ServerResponse.ok().bodyValueAndAwait("CallCount")
        }
    }
}

そして以下のcurlコマンドで実行します。

$ curl http://localhost:8080/callcount
CallCount
2021-09-13T11:02:54.942424 start launch A thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-13T11:03:00.911863 Hello A 1 thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-13T11:03:07.026940 Hello A 2 thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-13T11:03:13.266257 Hello A 3 thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-13T11:03:19.456653 Hello A 4 thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-13T11:03:25.794058 Hello A 5 thread:Thread[reactor-http-nio-3 @coroutine#6,5,main]
2021-09-13T11:03:25.794373 start launch B thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-13T11:03:32.263653 Hello B 1 thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-13T11:03:38.543198 Hello B 2 thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-13T11:03:44.691789 Hello B 3 thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-13T11:03:50.738404 Hello B 4 thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-13T11:03:57.001334 Hello B 5 thread:Thread[reactor-http-nio-3 @coroutine#7,5,main]
2021-09-13T11:03:57.001555 start launch C thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]
2021-09-13T11:04:03.194951 Hello C 1 thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]
2021-09-13T11:04:09.515585 Hello C 2 thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]
2021-09-13T11:04:15.736343 Hello C 3 thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]
2021-09-13T11:04:21.914033 Hello C 4 thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]
2021-09-13T11:04:28.072603 Hello C 5 thread:Thread[reactor-http-nio-3 @coroutine#8,5,main]

全てreactor-http-nio-3というスレッドで動作し、A〜Cのlaunchの処理が直列で実行されているのがわかります。
これではコルーチンを使っている意味がなにもありません。

コンテキストを切り替えて実行する

JDBCを呼び出す処理を並列に実行するには、以下のようにコンテキストを切り替える必要があります。

suspend fun callCountWithDispatchers() {
    withContext(Dispatchers.IO) {
        launch {
            for (i in 1..5) {
                counterCustomMapper.selectCount()
                println("${LocalDateTime.now()} Hello A $i thread:${Thread.currentThread()}")
            }
        }
        launch {
            for (i in 1..5) {
                counterCustomMapper.selectCount()
                println("${LocalDateTime.now()} Hello B $i thread:${Thread.currentThread()}")
            }
        }
        launch {
            for (i in 1..5) {
                counterCustomMapper.selectCount()
                println("${LocalDateTime.now()} Hello C $i thread:${Thread.currentThread()}")
            }
        }
    }
}

これを以下のrouterで呼び出し、実行します。

GET("/callcountwithdispatchers") {
    counterService.callCountWithDispatchers()
    ServerResponse.ok().bodyValueAndAwait("CallCountWithDispatchers")
}
$ curl http://localhost:8080/callcountwithdispatchers
CallCountWithDispatchers

すると次のようなログが出力されます。

2021-09-13T11:05:48.433813 start launch A thread:Thread[DefaultDispatcher-worker-3 @coroutine#10,5,main]
2021-09-13T11:05:48.434869 start launch B thread:Thread[DefaultDispatcher-worker-2 @coroutine#11,5,main]
2021-09-13T11:05:48.435298 start launch C thread:Thread[DefaultDispatcher-worker-1 @coroutine#12,5,main]
2021-09-13T11:05:54.401958 Hello C 1 thread:Thread[DefaultDispatcher-worker-1 @coroutine#12,5,main]
2021-09-13T11:05:54.403181 Hello B 1 thread:Thread[DefaultDispatcher-worker-2 @coroutine#11,5,main]
2021-09-13T11:05:54.404092 Hello A 1 thread:Thread[DefaultDispatcher-worker-3 @coroutine#10,5,main]
2021-09-13T11:06:00.327832 Hello C 2 thread:Thread[DefaultDispatcher-worker-1 @coroutine#12,5,main]
2021-09-13T11:06:00.327836 Hello A 2 thread:Thread[DefaultDispatcher-worker-3 @coroutine#10,5,main]
2021-09-13T11:06:00.328467 Hello B 2 thread:Thread[DefaultDispatcher-worker-2 @coroutine#11,5,main]
2021-09-13T11:06:06.195797 Hello A 3 thread:Thread[DefaultDispatcher-worker-3 @coroutine#10,5,main]
2021-09-13T11:06:06.197521 Hello B 3 thread:Thread[DefaultDispatcher-worker-2 @coroutine#11,5,main]
2021-09-13T11:06:06.203643 Hello C 3 thread:Thread[DefaultDispatcher-worker-1 @coroutine#12,5,main]
2021-09-13T11:06:11.974274 Hello A 4 thread:Thread[DefaultDispatcher-worker-3 @coroutine#10,5,main]
2021-09-13T11:06:11.978617 Hello C 4 thread:Thread[DefaultDispatcher-worker-1 @coroutine#12,5,main]
2021-09-13T11:06:11.978617 Hello B 4 thread:Thread[DefaultDispatcher-worker-2 @coroutine#11,5,main]
2021-09-13T11:06:18.185883 Hello B 5 thread:Thread[DefaultDispatcher-worker-2 @coroutine#11,5,main]
2021-09-13T11:06:18.188509 Hello C 5 thread:Thread[DefaultDispatcher-worker-1 @coroutine#12,5,main]
2021-09-13T11:06:18.189157 Hello A 5 thread:Thread[DefaultDispatcher-worker-3 @coroutine#10,5,main]

それぞれのlaunchのブロックがDefaultDispatcher-worker-1〜3と別のスレッドで起動し、並列で実行されているのがわかります。
前述のサンプルとの差分が分かりやすいようにlaunchで書きましたが、例えば以下のようにasyncで書いても同様です。

suspend fun callCountWithDispatchers() {
    withContext(Dispatchers.IO) {
        val count1 = async { counterCustomMapper.selectCount() }
        val count2 = async { counterCustomMapper.selectCount() }
        val count3 = async { counterCustomMapper.selectCount() }
        println(count1.await() + count2.await() + count3.await())
    }
}

実際にはこういった並列でselectを実行するような形が、多く使われるパターンになるかなと思います。

問題点

ここまで書いてきたことの問題点としては、以下のようになります。

  • ただlaunchなどのコルーチンビルダー上から呼び出すだけでは、処理がブロックされて直列になってしまう
  • コンテキストを切り替えることで並列処理にはできるが、並列の処理の数だけスレッドが増えるのでSpring WebFluxの利点をフルに活かせない

並列処理にすることで処理速度の意味でのパフォーマンスは上げられますが、スレッドを効率よく使ってくれるSpring WebFluxを使用する意義が薄れてしまいます。

Spring WebFluxを使う意味

ではDBアクセスが多いアプリケーションの場合、Spring WebFluxを使う意味がないかというと、そういうわけでもないです。

ブロッキングされる処理がないAPIでは有効

ほとんどのシステムではDBへのアクセス処理が多く存在すると思いますが、例えばブロッキングされる処理がない

  • 計算処理だけを行うようなAPI
  • 外部サービスへ接続するAPI

といったようなAPIでは意味があります。
特にマイクロサービスアーキテクチャで作られているシステムでは、他のマイクロサービスだけに接続してデータを取得、更新するようなAPIもあるかと思います。

また、Router Functionsを使用することで得られるメリットを享受したい場合も意味があります。

R2DBCを使用すればDBアクセスも非同期実行にできる

ORM含めJDBCを使用した処理を実行するとブロッキングになってしまいますが、非同期でのDBアクセスを実現するためのR2DBCというライブラリがあります。
このSpring WebFluxの処理がDBアクセスでブロッキングされてしまう問題に対しての対応としてよく上がるものですが、導入に踏み切れているプロジェクトはあまり多くないです。

まだ成熟していない技術なこともあり、導入のコストとメリット考えた時に悩ましいところですが、検討はしてみたいところです。
Springの公式ドキュメントでも、以下の中で使用されています。

spring.io

これはまた別の記事で書こうかと思います。

まとめ

  • Spring WebFlux、Kotlin Coutinesの中でJDBCの処理を呼んでも同一スレッドでの直列処理になる
  • コンテキストを切り替えることで並列実行できるが、Spring WebFluxを使う意義が薄れる
  • ブロッキング処理のないAPIでは使う意味がある
  • 最大限に活かすならばR2DBCの使用も検討に

この辺の技術を上手く使って、パフォーマンス向上につなげていきたいですね。

【宣伝】サーバーサイドKotlinの書籍を発売しています!

私の執筆した書籍「Kotlin サーバーサイドプログラミング実践開発」が発売中です!

gihyo.jp

タイトル通り実践での開発にも持っていける内容になっているので、サーバーサイドKotlinを始めてみたい方はぜひお手にとっていただければと思います。