PG-Strom v6.0特集:GPU-Sortと一部のWindow関数に対応(その2)
PG-Strom v6.0をリリースしました。
といった、いくつかの重要機能を含むリリースで、特にGPU-Sortによって新しいワークロードへの対応が広がったという事でバージョン6.x系列としました。
その主要機能について、何回かに分けて解説していきたいと思います。
前回の記事では、GPU-SortをGPU-Joinにアタッチし、LIMIT句のプッシュダウンによりGPUからCPUへ返却する行数を減らす処理について解説しました。
今回は、その続きとして集約関数を実行後にソートする処理、およびWindow関数のプッシュダウンについて説明します。
完全なAggregationをGPUで生成する
以下の実行計画を見てください。
4つのテーブル(lineorder、date1、part、supplier)をJOINし、d_year列、p_brand1列によるGROUP BYと集約関数AVG()を実行するシンプルな問い合わせです。
=# explain
select avg(lo_revenue), d_year, p_brand1
from lineorder, date1, part, supplier
where lo_orderdate = d_datekey
and lo_partkey = p_partkey
and lo_suppkey = s_suppkey
and p_category = 'MFGR#12'
and s_region = 'AMERICA'
group by d_year, p_brand1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
HashAggregate (cost=13806407.68..13806495.18 rows=7000 width=46) (actual time=43407.303..43410.313 rows=280 loops=1)
Group Key: date1.d_year, part.p_brand1
Batches: 1 Memory Usage: 273kB
-> Gather (cost=13805618.73..13806355.18 rows=7000 width=46) (actual time=43405.470..43409.956 rows=560 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Parallel Custom Scan (GpuPreAgg) on lineorder (cost=13804618.73..13804655.18 rows=7000 width=46) (actual time=43396.506..43396.532 rows=187 loops=3)
GPU Projection: pgstrom.pavg(lo_revenue), d_year, p_brand1
GPU Join Quals [1]: (p_partkey = lo_partkey) [plan: 2500011000 -> 98584180, exec: 7520 -> 250]
GPU Outer Hash [1]: lo_partkey
GPU Inner Hash [1]: p_partkey
GPU Join Quals [2]: (s_suppkey = lo_suppkey) [plan: 98584180 -> 19644550, exec: 250 -> 0]
GPU Outer Hash [2]: lo_suppkey
GPU Inner Hash [2]: s_suppkey
GPU Join Quals [3]: (d_datekey = lo_orderdate) [plan: 19644550 -> 19644550, exec: 0 -> 0]
GPU Outer Hash [3]: lo_orderdate
GPU Inner Hash [3]: d_datekey
GpuJoin buffer usage: 143.68MB
GPU Group Key: d_year, p_brand1
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=114826068, ntuples=7520
-> Parallel Custom Scan (GpuScan) on part (cost=100.00..12682.86 rows=32861 width=14) (actual time=20.596..45.648 rows=26528 loops=3)
GPU Projection: p_brand1, p_partkey
GPU Scan Quals: (p_category = 'MFGR#12'::bpchar) [plan: 2000000 -> 32861, exec: 2000000 -> 79584]
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=29258, ntuples=2000000
-> Parallel Custom Scan (GpuScan) on supplier (cost=100.00..78960.47 rows=830255 width=6) (actual time=10.240..66.126 rows=667164 loops=3)
GPU Projection: s_suppkey
GPU Scan Quals: (s_region = 'AMERICA'::bpchar) [plan: 9999718 -> 830255, exec: 10000000 -> 2001491]
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=168663, ntuples=10000000
-> Parallel Seq Scan on date1 (cost=0.00..62.04 rows=1504 width=8) (actual time=0.008..0.139 rows=852 loops=3)
Planning Time: 0.755 ms
Execution Time: 43410.797 ms
(31 rows)この実行計画では、CPUで集約関数を実行するHashAggregateの下に、並列ワーカープロセスを制御するGatherノード、そしてGPUで部分的な集計処理を行うGpuPreAggがぶら下がっています。
GpuPreAggが返却する結果セットはGPU Projection行にある通り、d_year列、p_brand1列、および集計値のpgstrom.pavg(lo_revenue)です。
集計値は非NULLなlo_revenue列の件数とその総和を含むバイナリデータ(bytea型)で、平均値の定義より、各ワーカープロセスがこれらの値を返せば HashAggregateは平均値を計算する事ができます。
では、なぜこのような2段階の方式を取っているのでしょうか?
これには歴史的な経緯と技術的な理由がそれぞれあります。
歴史的な経緯
PG-Strom v3.x系列まではPostgreSQLバックエンドプロセスが個別にCUDAコンテキストを管理していました。
これはGPUの世界における「プロセス」のようなもので、あるCUDAコンテキストから獲得したGPUメモリを別のCUDAコンテキストから参照するにはひと手間必要でした。

そうすると、PostgreSQLバックグラウンドワーカーで作成したCUDAコンテキスト同士で結果をマージするよりは、GPUで部分的な集計を行い、ワーカーの終了後にCPUで並列処理結果を集計した方がシンプルな構造だったわけです。(これはPostgreSQLの並列クエリと同じ処理の分割方法です)
技術的な理由
もう一つは、CPU-Fallbackに関連するものです。(また出た😅)
典型的にはGPUでの処理中にTOAST化された可変長データを参照した場合など、エラーではないもののGPUでの処理を継続できない場合に、PG-Stromはその行をCPU側に書き戻して、CPUで再実行するCPU-Fallbackという機能を持っています。この可能性がある事で、集計値を計算する際にCPU側にFallbackされた未集計のデータが存在する事を排除できず、そうすると結局はCPU側で最終的な集計処理を行わざるを得ないよねという事になってしまいます。

そういうわけで、Pinned Inner Buffer機能と同じく、明示的にCPU-Fallbackがoffの場合に限って、GPU上で全ての集計処理を行ってしまうという割り切りが必要になったわけです。
完全なAggregationをGPU側で作成する。
以下の実行計画を見てください。(説明のため verbose モードにしています)
ssbm=# set pg_strom.cpu_fallback = off;
SET
ssbm=# explain (verbose, analyze)
select avg(lo_revenue), d_year, p_brand1
from lineorder, date1, part, supplier
where lo_orderdate = d_datekey
and lo_partkey = p_partkey
and lo_suppkey = s_suppkey
and p_category = 'MFGR#12'
and s_region = 'AMERICA'
group by d_year, p_brand1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=13805618.73..13806460.18 rows=7000 width=46) (actual time=43234.075..43246.038 rows=280 loops=1)
Output: (pgstrom.favg_numeric((pgstrom.pavg(lineorder.lo_revenue)))), date1.d_year, part.p_brand1
Workers Planned: 2
Workers Launched: 2
-> Result (cost=13804618.73..13804760.18 rows=7000 width=46) (actual time=43224.472..43224.530 rows=93 loops=3)
Output: pgstrom.favg_numeric((pgstrom.pavg(lineorder.lo_revenue))), date1.d_year, part.p_brand1
Worker 0: actual time=43218.832..43218.837 rows=0 loops=1
Worker 1: actual time=43228.887..43229.052 rows=280 loops=1
-> Parallel Custom Scan (GpuPreAgg) on public.lineorder (cost=13804618.73..13804655.18 rows=7000 width=46) (actual time=43224.466..43224.482 rows=93 loops=3)
Output: (pgstrom.pavg(lineorder.lo_revenue)), date1.d_year, part.p_brand1
GPU Projection: pgstrom.pavg(lineorder.lo_revenue), date1.d_year, part.p_brand1
GPU Join Quals [1]: (part.p_partkey = lineorder.lo_partkey) [plan: 2500011000 -> 98584180, exec: 7520 -> 250]
GPU Outer Hash [1]: lineorder.lo_partkey
GPU Inner Hash [1]: part.p_partkey
GPU Join Quals [2]: (supplier.s_suppkey = lineorder.lo_suppkey) [plan: 98584180 -> 19644550, exec: 250 -> 0]
GPU Outer Hash [2]: lineorder.lo_suppkey
GPU Inner Hash [2]: supplier.s_suppkey
GPU Join Quals [3]: (date1.d_datekey = lineorder.lo_orderdate) [plan: 19644550 -> 19644550, exec: 0 -> 0]
GPU Outer Hash [3]: lineorder.lo_orderdate
GPU Inner Hash [3]: date1.d_datekey
GpuJoin buffer usage: 143.68MB
GPU Group Key: date1.d_year, part.p_brand1
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=114826068, ntuples=7520
Worker 0: actual time=43218.831..43218.836 rows=0 loops=1
Worker 1: actual time=43228.872..43228.910 rows=280 loops=1
-> Parallel Custom Scan (GpuScan) on public.part (cost=100.00..12682.86 rows=32861 width=14) (actual time=4.688..21.335 rows=26528 loops=3)
Output: part.p_brand1, part.p_partkey
GPU Projection: part.p_brand1, part.p_partkey
GPU Scan Quals: (part.p_category = 'MFGR#12'::bpchar) [plan: 2000000 -> 32861, exec: 2000000 -> 79584]
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=29258, ntuples=2000000
Worker 0: actual time=0.729..0.732 rows=0 loops=1
Worker 1: actual time=0.805..0.806 rows=0 loops=1
-> Parallel Custom Scan (GpuScan) on public.supplier (cost=100.00..78960.47 rows=830255 width=6) (actual time=35.323..92.328 rows=667164 loops=3)
Output: supplier.s_suppkey
GPU Projection: supplier.s_suppkey
GPU Scan Quals: (supplier.s_region = 'AMERICA'::bpchar) [plan: 9999718 -> 830255, exec: 10000000 -> 2001491]
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=168663, ntuples=10000000
Worker 0: actual time=92.302..181.533 rows=1067881 loops=1
Worker 1: actual time=13.335..95.117 rows=933610 loops=1
-> Parallel Seq Scan on public.date1 (cost=0.00..62.04 rows=1504 width=8) (actual time=0.007..0.111 rows=852 loops=3)
Output: date1.d_year, date1.d_datekey
Worker 0: actual time=0.004..0.004 rows=0 loops=1
Worker 1: actual time=0.004..0.004 rows=0 loops=1
Planning Time: 0.688 ms
Execution Time: 43246.432 ms
(45 rows)注意して見ないと分かりませんが、Gather -> Result -> Parallel Custom Scan (GpuPreAgg) という流れでクエリが実行されており、最初の例に存在した Hash Aggregate つまりCPUでの集約処理が存在していません。
依然としてGpuPreAggが返却するのはpgstrom.pavg(lineorder.lo_revenue)というlo_revenue列の件数と総和のペアをパックしたバイナリ値なのですが、それを平均値に直してより上位のプランに出力するのはResultです。これは入力行を1行ずつ単純変換するだけの軽量なプランですので、実行性能への影響はほとんどありません。
そのため、CPUで最終盤の集計処理を行う方法と比べ、特にグループ数の大きな場合では性能改善効果が大きくなります。
そして、GpuPreAggの出力の中で、以下の行に注目してください。
Worker 0: actual time=43218.831..43218.836 rows=0 loops=1
Worker 1: actual time=43228.872..43228.910 rows=280 loops=1このケースではGPUを2台使用し、そしてPostgreSQLバックエンドプロセスとワーカープロセスが2個の計3プロセスで実行していますが、今回はGpuPreAggのスキャンする lineorder 表の終端処理を行ったのが(たまたま)Worker-1で、Worker-1が最後にGPU0とGPU1の処理結果を統合し、それをCPU側に書き戻しているという事が分かります。
そして最も重要なのが、ここで集約演算を行った完全な結果セットを作り出す事ができたという事は、GPUメモリにデータが乗ったそのままの状態で、SortやWindow関数と言った「その後」の処理までをGPUで実行するための道が拓けたという事です。
Window関数のプッシュダウン
そして最後に、今回の目玉機能の一つ、Window関数のプッシュダウンについて説明します。
まず、従来の動作(CPU Fallback=on状態)で実行した以下のクエリと実行計画をご覧ください。
このクエリは、lineorderとcustomerテーブルをJOINし、c_region、c_nation、c_cityとlo_orderdate、sum(lo_revenue)を出力しますが、それぞれc_region、c_nation、c_cityごとにsum(lo_revenue)の合計値の多い日付を上位3件ずつ表示するというものです。分析系のクエリではそこそこ遭遇しそうなパターンの気がします。
実行計画は、GpuPreAggが部分集計結果を返し、GatherとHashAggregateがそれを集約。この時点で60,1500行が中間出力されています。
その後、Run Condition: (rank() OVER (?) < 4)を含むWindowAggによって、各パーティションの上位3件で計750件のデータが出力されています。
言い換えれば、それ以外の60万行は最終結果には寄与しないにも関わらず、GPUからCPUへ書き戻され、ご丁寧にそれをHashAggregateやSortで集計・整列の処理を行っているわけです。
ssbm=# explain analyze
select * from (
select c_region, c_nation, c_city, lo_orderdate, sum(lo_revenue) lo_rev,
rank() over(partition by c_region, c_nation, c_city
order by sum(lo_revenue)) cnt
from lineorder, customer
where lo_custkey = c_custkey
group by c_region, c_nation, c_city, lo_orderdate
) subqry
where cnt < 4;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
WindowAgg (cost=74232835.17..76112522.67 rows=75187500 width=84) (actual time=58176.445..58352.400 rows=750 loops=1)
Run Condition: (rank() OVER (?) < 4)
-> Sort (cost=74232835.17..74420803.92 rows=75187500 width=76) (actual time=58176.435..58256.419 rows=601500 loops=1)
Sort Key: customer.c_region, customer.c_nation, customer.c_city, (pgstrom.sum_numeric((pgstrom.psum(lineorder.lo_revenue))))
Sort Method: quicksort Memory: 76268kB
-> HashAggregate (cost=58207057.50..61055958.87 rows=75187500 width=76) (actual time=55033.033..55321.629 rows=601500 loops=1)
Group Key: customer.c_region, customer.c_nation, customer.c_city, lineorder.lo_orderdate
Planned Partitions: 8 Batches: 1 Memory Usage: 516113kB
-> Gather (cost=40216881.72..48127233.28 rows=75187500 width=76) (actual time=53907.157..54301.344 rows=1203000 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Parallel Custom Scan (GpuPreAgg) on lineorder (cost=40215881.72..40607483.28 rows=75187500 width=76) (actual time=53808.628..53903.530 rows=401000 loops=3)
GPU Projection: pgstrom.psum(lo_revenue), c_region, c_nation, c_city, lo_orderdate
GPU Join Quals [1]: (lo_custkey = c_custkey) [plan: 2500011000 -> 2500011000, exec: 3715510 -> 3413504]
GPU Outer Hash [1]: lo_custkey
GPU Inner Hash [1]: c_custkey
GpuJoin buffer usage: 3204.35MB
GPU Group Key: c_region, c_nation, c_city, lo_orderdate
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=114826068, ntuples=3715510
-> Parallel Seq Scan on customer (cost=0.00..644671.73 rows=12501573 width=46) (actual time=0.031..1472.440 rows=10000000 loops=3)
Planning Time: 0.449 ms
Execution Time: 58734.809 ms
(22 rows)では次に、CPU-Fallbackを無効化した状態で同じクエリを実行してみます。
ssbm=# set pg_strom.cpu_fallback = off;
SET
ssbm=# explain analyze
select * from (
select c_region, c_nation, c_city, lo_orderdate, sum(lo_revenue) lo_rev,
rank() over(partition by c_region, c_nation, c_city
order by sum(lo_revenue)) cnt
from lineorder, customer
where lo_custkey = c_custkey
group by c_region, c_nation, c_city, lo_orderdate
) subqry
where cnt < 4;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
WindowAgg (cost=40296711.63..40315461.63 rows=125000 width=84) (actual time=54055.549..54373.242 rows=750 loops=1)
Run Condition: (rank() OVER (?) < 4)
-> Gather (cost=40296711.63..40312649.13 rows=125000 width=76) (actual time=54055.534..54372.279 rows=750 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Result (cost=40295711.63..40299149.13 rows=125000 width=76) (actual time=53960.565..53960.756 rows=250 loops=3)
-> Parallel Custom Scan (GpuPreAgg) on lineorder (cost=40295711.63..40297274.13 rows=125000 width=76) (actual time=53960.559..53960.635 rows=250 loops=3)
GPU Projection: pgstrom.psum(lo_revenue), c_region, c_nation, c_city, lo_orderdate
GPU Join Quals [1]: (lo_custkey = c_custkey) [plan: 2500011000 -> 2500011000, exec: 3749497 -> 3444736]
GPU Outer Hash [1]: lo_custkey
GPU Inner Hash [1]: c_custkey
GpuJoin buffer usage: 3204.35MB
GPU Group Key: c_region, c_nation, c_city, lo_orderdate
Scan-Engine: GPU-Direct with 2 GPUs <0,1>; direct=114826068, ntuples=3749497
GPU-Sort keys: c_region, c_nation, c_city, pgstrom.fsum_numeric((pgstrom.psum(lo_revenue)))
Window-Rank Filter: rank() over(PARTITION BY c_region, c_nation, c_city ORDER BY pgstrom.fsum_numeric((pgstrom.psum(lo_revenue)))) < 4
-> Parallel Seq Scan on customer (cost=0.00..644671.73 rows=12501573 width=46) (actual time=0.032..1463.581 rows=10000000 loops=3)
Planning Time: 0.401 ms
Execution Time: 54707.476 ms
(19 rows)なかなかキモイ感じでシンプル化されているのがお分かりでしょうか?
Window関数を含むクエリの場合、下位プランに『~~の順にソートされていると嬉しいよ』とヒントが与えられます。
下位プランの作成・評価の際には、それを見た上で、ドンピシャのB-treeインデックスが存在すればそれを使いますし、Sortのプランを挟むといったアレンジも行われます。
このGpuPreAggでは、まずCPU-Fallback=offである事から完全な集約関数の結果を保持している事を前提として、rank()関数のPARTITION BY句でc_region, c_nation, c_cityが指定されている事から、GPU-Sortをアタッチした方がコストの低いプランを作れます。以下のようになっているわけですね。
GPU-Sort keys: c_region, c_nation, c_city, pgstrom.fsum_numeric((pgstrom.psum(lo_revenue)))
さらに、Window関数の使い方がrank() < CONSTである事から、これは「上位xx件」という形で行数を削除するタイプのクエリである事を判定します。
そうすると、rank()以外のよりバリエーションのあるWindow関数の処理はCPU側に任せるとしても、少なくともフィルタリングの対象となる行をわざわざCPUに戻して負荷を上げる事は避けられるというわけです。
Window-Rank Filter: rank() over(PARTITION BY c_region, c_nation, c_city ORDER BY pgstrom.fsum_numeric((pgstrom.psum(lo_revenue)))) < 4
現在のところ、以下のパターンのWindow関数のプッシュダウンに対応しています。
rank() < または <= CONSTdense_rank() < または <= CONSTrow_number() < または <= CONST
これらの機能により、分析系クエリではそこそこ使用頻度の高くチューニングの方法がほとんどないWindow関数に関しても、やりようによっては高速化のアプローチが一つ手に入ったという事になるのではないでしょうか。