PG-Strom v6.0特集:GPU-Sortと一部のWindow関数に対応(その1)

PG-Strom v6.0をリリースしました。

  • GPU-Sortと一部のWindow関数対応
  • マルチGPUのPinned Inner Buffer対応
  • Arrow_Fdwの仮想列機能
  • GPUでの完全な集計値の生成

といった、いくつかの重要機能を含むリリースで、特にGPU-Sortによって新しいワークロードへの対応が広がったという事でバージョン6.x系列としました。
その主要機能について、何回かに分けて解説していきたいと思います。

GPU-Sortの歴史的経緯

長らく、PG-StromがオフロードするワークロードはSCAN、JOIN、GROUP-BYの三種類でした。
これらにGPUバイスで対応しているSQL関数や演算子の組み合わせをGPUで並列実行し、またGPU-Direct SQLによる高速なI/Oサブシステムによって、大量のデータをスキャンし、集計した結果を出力するというのがPG-Stromの使いどころでした。

これはそれほど根拠レスな話ではありません。以前に分析系SQLの性能評価に使われるTPC-DSのクエリ103本の実行時間を調査して、SQLのワークロードのうちにどのような処理が実際に時間を費やしているのか調べた事があるのですが、ざっくり34%がSCAN、37%がJOIN、23%が集計処理という事で、この3つで総処理時間の95%を占めています。

しかし、行数が多い場合には特異的にSort処理の時間がかかったり、またSortを前処理として要求するWindow関数の処理に時間を要したりしているクエリもあり、より網羅的なワークロードへの対応という事では選択肢の一つでした。

実はかつて、PG-StromでもGPU-Sortの開発を試みた事がありました。
PG-Stromのリポジトリには、『もはや使わなくなったけれども、もしかしたら同じロジックを将来使うかもしれないし、その時の参考になるかもしれない』コードをモスポール保存しておくためのdeadcodeディレクトリというものがあるのですが、そこにopencl_gpusort.cというコードが存在するほどです。
github.com

CUDAではなくOpenCLを実装のベースとしていた頃なので、おそらく2015年前後。今からちょうど10年前だと思います。
この頃、開発に使用していたGPUはGTX980(メモリ4GB)やTESLA K20(メモリ5GB)といったモデルで、GPUに搭載可能なデータ量というのは今とは比較にならないほど小さなものでした。物理的な限界もさておき、当時はManaged Memoryを使用した時でもGPU Memory Oversubscriptionができなかったため、バッファサイズの推定(予想)はかなり保守的に行わざるを得ず、3GB程度のバッファを用意してスキャンを開始しても、実際にテーブルから読み出して条件句に合致する行だけをバッファに乗せたら200MB程度しか使っていなかった、という事もザラでした。

そうすると、GPUでソート処理を行ったとしても大した規模のソート処理を行うことはできず、最終的にCPUでマージソートを実行せねばならないため、それなら最初からCPUのメモリに乗せてクイックソートでもした方が速いのでは・・・みたいな事になる事もザラでした。実際、当時のGPU-Sortは良くてCPUと同じくらい。大抵は遅くなるというシロモノでした。

しかし、最近のGPUは急速に搭載メモリを増やしてきています。2017年にTESLA V100の搭載メモリが16GBだったのが、2020年のNVIDIA A100では40GBに、2023年のNVIDIA H100では80GBを搭載し、先日のGTC2025で発表されたRTX Pro 6000 Blackwellでは96GBのメモリが搭載されるようになっています。
これだけのデータをGPU上で一度に処理できるようになってくると話が変わってきて、GPU-JoinやGPU-PreAggの後処理として『ソート済み』の結果をCPUに返し、CPUでのソート処理を省略してやる事で、実用的な高速化の効果を実感できるようになってきたのです。

GPUメモリ上での処理フロー

Pinned Inner Bufferの機能とも関係するところですが、PG-StromがSCANやJOINのワークロードを処理する場合と、GROUP-BYのワークロードを処理する場合では、少しデータの動き方が異なります。

全てのワークロードに共通して、テーブルをスキャンする時には約64MB単位のブロックに区切って(Apache Arrowの場合はRecord-Batchを単位として)これをGPUメモリに読み出します。GPU上では命令コードに従ってWHERE句やJOIN結合条件の評価、あるいは集約関数の集計処理といった処理を行い、最後に処理結果を結果バッファに書き込みます。

基本的に、SCAN/JOINは読み出した64MBのブロックごとに結果を書き戻し(one-by-one execution)て、使用したバッファは次のブロックのために解放し再利用します。そのため、たとえテーブルの大きさが1TBあろうとも、少ないGPUメモリでこれを処理できます。
一方で、GROUP BYの場合は集計値をGPUメモリに置いておき、次の入力ブロック、次の入力ブロック、、、、と次々に集計値を保持する結果バッファを更新して、最後に結果を書き戻す事になります。

ソート処理を実行するには、対象となるデータが全てGPUメモリ上に乗っていることが必要です。
そのため、SCANやJOINにGPU-Sortが付加する場合は、これまでのGROUP-BYのインフラを利用して、処理結果をGPUに留置するように手が加えられています。つまり、SCANやJOINであっても Dam-execution 方式となるため、結果セットがGPUメモリの範囲内に収まるかどうかは注意が必要です。

もう一点、これはPinned Inner Bufferとも共通の制限事項ですが、CPU-Fallback機構と同時に利用する事はできません。
例えば非常に長い可変長データがTOAST化されていた場合など、GPUでその文字列やジオメトリデータを処理したくとも、GPUにロードされたデータの中にそれが含まれていなければどうしようもありません。そういった場合には、CPUで再実行するために対象行をCPU側へ書き戻し、CPUで再実行する仕組みがPG-Stromには備わっています。これをCPU-Fallback機構と呼んでいるのですが、この仕組みによって行がCPUに書き戻された場合、GPUメモリ上にはソート処理に必要な「完全な結果セット」が存在しない事になります。

そのため、GPU-Sortの有効化には、ワークロードが結果セットの並び替えを要求している事と同時に、CPU-Fallbackが無効化されている事が必要です。

例えば、以下のクエリ(ORDER BYを含む)は GpuJoin の処理結果に部分Sort(これはGather配下のワーカープロセスでも実行されるため)を実行し、各ワーカープロセスが生成した部分Sort済みの結果セットを Gather Merge で統合しているのが分かります。そして、最後にLimitを実行し上位15件のみを取り出します。
全てのソート処理はCPUで実行されているため、GPUでの処理結果(GpuJoin)は特に並び替えられていません。

ssbm=# explain analyze
        select d_year, p_brand1
          from lineorder, date1, part, supplier
         where lo_orderdate = d_datekey
           and lo_partkey = p_partkey
           and lo_suppkey = s_suppkey
           and p_brand1 in ('MFGR#2221','MFGR#2228')
           and s_region in ('ASIA','AMERICA')
         order by d_year, lo_discount
         limit 15;
                                                                                QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=12569927.20..12569928.96 rows=15 width=18) (actual time=44083.849..44103.448 rows=15 loops=1)
   ->  Gather Merge  (cost=12569927.20..13031121.74 rows=3952820 width=18) (actual time=44083.848..44103.444 rows=15 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Sort  (cost=12568927.18..12573868.21 rows=1976410 width=18) (actual time=44075.562..44075.567 rows=15 loops=3)
               Sort Key: date1.d_year, lineorder.lo_discount
               Sort Method: top-N heapsort  Memory: 26kB
               Worker 0:  Sort Method: top-N heapsort  Memory: 27kB
               Worker 1:  Sort Method: top-N heapsort  Memory: 26kB
               ->  Parallel Custom Scan (GpuJoin) on lineorder  (cost=143664.84..12520437.04 rows=1976410 width=18) (actual time=1070.804..43861.693 rows=1453231 loops=3)
                     GPU Projection: d_year, p_brand1, lo_discount
                     GPU Join Quals [1]: (p_partkey = lo_partkey) [plan: 2500011000 -> 4976272, exec: 5999989709 -> 12086435]
                     GPU Outer Hash [1]: lo_partkey
                     GPU Inner Hash [1]: p_partkey
                     GPU Join Quals [2]: (s_suppkey = lo_suppkey) [plan: 4976272 -> 1976410, exec: 12086435 -> 4359693]
                     GPU Outer Hash [2]: lo_suppkey
                     GPU Inner Hash [2]: s_suppkey
                     GPU Join Quals [3]: (d_datekey = lo_orderdate) [plan: 1976410 -> 1976410, exec: 4359693 -> 4359693]
                     GPU Outer Hash [3]: lo_orderdate
                     GPU Inner Hash [3]: d_datekey
                     GpuJoin buffer usage: 275.22MB
                     Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=114826068, ntuples=5999989709
                     ->  Parallel Custom Scan (GpuScan) on part  (cost=100.00..12370.84 rows=1659 width=14) (actual time=22.416..50.698 rows=1337 loops=3)
                           GPU Projection: p_brand1, p_partkey
                           GPU Scan Quals: (p_brand1 = ANY ('{MFGR#2221,MFGR#2228}'::bpchar[])) [plan: 2000000 -> 1659, exec: 2000000 -> 4010]
                           Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=29258, ntuples=2000000
                     ->  Parallel Custom Scan (GpuScan) on supplier  (cost=100.00..87206.07 rows=1654815 width=6) (actual time=8.979..120.408 rows=1333704 loops=3)
                           GPU Projection: s_suppkey
                           GPU Scan Quals: (s_region = ANY ('{ASIA,AMERICA}'::bpchar[])) [plan: 9999718 -> 1654815, exec: 10000000 -> 4001111]
                           Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=168663, ntuples=10000000
                     ->  Parallel Seq Scan on date1  (cost=0.00..62.04 rows=1504 width=8) (actual time=0.008..0.112 rows=852 loops=3)
 Planning Time: 0.970 ms
 Execution Time: 44103.741 ms
(33 rows)

ここで、CPU-Fallbackを無効化します。
実行計画がシンプルになったのがお分かりでしょうか?

ssbm=# set pg_strom.cpu_fallback = off;
SET
ssbm=# explain analyze
        select d_year, p_brand1
          from lineorder, date1, part, supplier
         where lo_orderdate = d_datekey
           and lo_partkey = p_partkey
           and lo_suppkey = s_suppkey
           and p_brand1 in ('MFGR#2221','MFGR#2228')
           and s_region in ('ASIA','AMERICA')
         order by d_year, lo_discount
         limit 15;
                                                                       QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=12514590.37..12514591.95 rows=15 width=18) (actual time=44147.437..44177.058 rows=15 loops=1)
   Workers Planned: 2
   Workers Launched: 2
   ->  Parallel Custom Scan (GpuJoin) on lineorder  (cost=12513590.37..12513590.45 rows=15 width=18) (actual time=43961.197..43961.204 rows=5 loops=3)
         GPU Projection: lo_discount, d_year, p_brand1
         GPU Join Quals [1]: (p_partkey = lo_partkey) [plan: 2500011000 -> 4976272, exec: 6000184056 -> 12086748]
         GPU Outer Hash [1]: lo_partkey
         GPU Inner Hash [1]: p_partkey
         GPU Join Quals [2]: (s_suppkey = lo_suppkey) [plan: 4976272 -> 1976410, exec: 12086748 -> 4359793]
         GPU Outer Hash [2]: lo_suppkey
         GPU Inner Hash [2]: s_suppkey
         GPU Join Quals [3]: (d_datekey = lo_orderdate) [plan: 1976410 -> 1976410, exec: 4359793 -> 4359793]
         GPU Outer Hash [3]: lo_orderdate
         GPU Inner Hash [3]: d_datekey
         GpuJoin buffer usage: 275.22MB
         Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=114826068, ntuples=6000184056
         GPU-Sort keys: d_year, lo_discount
         GPU-Sort Limit: 15
         ->  Parallel Custom Scan (GpuScan) on part  (cost=100.00..12370.84 rows=1659 width=14) (actual time=26.481..35.863 rows=1337 loops=3)
               GPU Projection: p_brand1, p_partkey
               GPU Scan Quals: (p_brand1 = ANY ('{MFGR#2221,MFGR#2228}'::bpchar[])) [plan: 2000000 -> 1659, exec: 2000000 -> 4010]
               Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=29258, ntuples=2000000
         ->  Parallel Custom Scan (GpuScan) on supplier  (cost=100.00..87206.07 rows=1654815 width=6) (actual time=12.544..122.628 rows=1333704 loops=3)
               GPU Projection: s_suppkey
               GPU Scan Quals: (s_region = ANY ('{ASIA,AMERICA}'::bpchar[])) [plan: 9999718 -> 1654815, exec: 10000000 -> 4001111]
               Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=168663, ntuples=10000000
         ->  Parallel Seq Scan on date1  (cost=0.00..62.04 rows=1504 width=8) (actual time=0.008..0.117 rows=852 loops=3)
 Planning Time: 0.812 ms
 Execution Time: 44187.259 ms
(29 rows)

GpuJoinのオプション列を見てみるとGPU-Sort keys: d_year, lo_discountと、GPU-Sort Limit: 15という表示が出ている事が分かるでしょうか?
この表示は、GpuJoinの処理結果をキー値d_yearとlo_discountに基づいてソートする事を示し、また、次の行はソートした結果のうち上位15件だけをGPUからCPUへ書き戻す事を示しています。
このクエリは元々430万件程度しか返しませんので、実行結果のGPU->CPU転送負荷というのはそれほど大したものではないのですが、件数が多くなってくるとプロセス間通信でそれをコピーするというのはそこそこ時間を要する処理となりますので、早い段階で不要な行を削るというのは合理的です。

この後、GPU-PreAggの改良とWindow関数のプッシュダウンについて説明したいと思いますが、長くなりましたのでエントリを分けようと思います。