Partition-wise GpuJoin/GpuPreAgg

PostgreSQL v10以降ではテーブルパーティショニングの機能が入っており、値の範囲、または値のリストによってテーブルをいくつかのパーティションに分割する事が可能となっている。

遅まきながら、PG-Stromにパーティションを意識した実行計画を作成するよう機能拡張を行ってみた。

以下の実行計画を見てもらいたい。これは、従来のPG-Stromを使って、DATE型の列 ymd の値を元に pt_2010, pt_2011, ..., pt_2019 まで一年ごとにデータを別テーブルに分割する構成を取っている。

検索条件にWHERE ymd > '2017-01-01'::dateが含まれているため、明らかに検索条件に該当しない子テーブルはスキャンの対象から外されている。したがって、実際にスキャンが行われるのは pt_2017, pt_2018, pt_2019 の3テーブルのみである。

postgres=# EXPLAIN SELECT cat,count(*),avg(ax) FROM pt NATURAL JOIN t1 WHERE ymd > '2017-01-01'::date GROUP BY cat;
                                                           QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=341392.92..341399.42 rows=200 width=48)
   Group Key: pt.cat
   ->  Sort  (cost=341392.92..341393.92 rows=400 width=72)
         Sort Key: pt.cat
         ->  Gather  (cost=341333.63..341375.63 rows=400 width=72)
               Workers Planned: 2
               ->  Partial HashAggregate  (cost=340333.63..340335.63 rows=200 width=72)
                     Group Key: pt.cat
                     ->  Parallel Custom Scan (GpuJoin)  (cost=283591.92..283591.92 rows=7565562 width=40)
                           Depth 1: GpuHashJoin  (nrows 3152318...7565562)
                                    HashKeys: pt.aid
                                    JoinQuals: (pt.aid = t1.aid)
                                    KDS-Hash (size: 10.78MB)
                           ->  Append  (cost=28540.80..200673.34 rows=3152318 width=36)
                                 ->  Parallel Custom Scan (GpuScan) on pt_2017  (cost=28540.80..66891.11 rows=1050772 width=36)
                                       GPU Filter: (ymd > '2017-01-01'::date)
                                 ->  Parallel Custom Scan (GpuScan) on pt_2018  (cost=28540.81..66883.43 rows=1050649 width=36)
                                       GPU Filter: (ymd > '2017-01-01'::date)
                                 ->  Parallel Custom Scan (GpuScan) on pt_2019  (cost=28540.80..66898.79 rows=1050896 width=36)
                                       GPU Filter: (ymd > '2017-01-01'::date)
                           ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
(21 rows)

しかしこの実行計画には問題がある。
テーブルスキャンを実行するのに GpuScan が選択されており、この出力を Append が受け取った後で、再び GpuJoin がこれを実行する。
つまり、GPU -> ホストRAM -> GPU -> ホストRAM というデータのピンポンが発生しており、条件句の評価やJOIN処理と言ったCPU喰いの処理のオフロードよりもむしろ、PCIeバスを通じたデータの移動に時間を取られやすいという事が容易に想像できる。

次に、パーティション対応を加えた PG-Strom だとどうなるか。

postgres=# EXPLAIN SELECT cat,count(*),avg(ax) FROM pt NATURAL JOIN t1 WHERE ymd > '2017-01-01'::date group by cat;
                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=196450.44..196476.94 rows=200 width=48)
   Group Key: pt_2017.cat
   ->  Sort  (cost=196450.44..196453.44 rows=1200 width=72)
         Sort Key: pt_2017.cat
         ->  Gather  (cost=66085.69..196389.07 rows=1200 width=72)
               Workers Planned: 2
               ->  Parallel Append  (cost=65085.69..195269.07 rows=600 width=72)
                     ->  Parallel Custom Scan (GpuPreAgg)  (cost=65085.69..65089.69 rows=200 width=72)
                           Reduction: Local
                           Combined GpuJoin: enabled
                           ->  Parallel Custom Scan (GpuJoin) on pt_2017  (cost=32296.64..74474.20 rows=1050772 width=40)
                                 Outer Scan: pt_2017  (cost=28540.80..66891.11 rows=1050772 width=36)
                                 Outer Scan Filter: (ymd > '2017-01-01'::date)
                                 Depth 1: GpuHashJoin  (nrows 1050772...2521854)
                                          HashKeys: pt_2017.aid
                                          JoinQuals: (pt_2017.aid = t1.aid)
                                          KDS-Hash (size: 10.78MB)
                                 ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
                     ->  Parallel Custom Scan (GpuPreAgg)  (cost=65078.35..65082.35 rows=200 width=72)
                           Reduction: Local
                           Combined GpuJoin: enabled
                           ->  Parallel Custom Scan (GpuJoin) on pt_2018  (cost=32296.65..74465.75 rows=1050649 width=40)
                                 Outer Scan: pt_2018  (cost=28540.81..66883.43 rows=1050649 width=36)
                                 Outer Scan Filter: (ymd > '2017-01-01'::date)
                                 Depth 1: GpuHashJoin  (nrows 1050649...2521557)
                                          HashKeys: pt_2018.aid
                                          JoinQuals: (pt_2018.aid = t1.aid)
                                          KDS-Hash (size: 10.78MB)
                                 ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
                     ->  Parallel Custom Scan (GpuPreAgg)  (cost=65093.03..65097.03 rows=200 width=72)
                           Reduction: Local
                           Combined GpuJoin: enabled
                           ->  Parallel Custom Scan (GpuJoin) on pt_2019  (cost=32296.65..74482.64 rows=1050896 width=40)
                                 Outer Scan: pt_2019  (cost=28540.80..66898.79 rows=1050896 width=36)
                                 Outer Scan Filter: (ymd > '2017-01-01'::date)
                                 Depth 1: GpuHashJoin  (nrows 1050896...2522151)
                                          HashKeys: pt_2019.aid
                                          JoinQuals: (pt_2019.aid = t1.aid)
                                          KDS-Hash (size: 10.78MB)
                                 ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
(40 rows)

クエリは同一なので、pt_2017, pt_2018, pt_2019 の3テーブルだけを読み出す事は共通だが、生成された実行計画が大きく異なる事が分かる。
パーティションの各子テーブルから読み出した内容を結合する Append より前に、GpuJoinおよびGpuPreAggが押し込まれ、特に GROUP BY での行数削減効果が大きいため、実際に Append で処理されるのは高々数百行程度に過ぎない事がわかる。
しかも、GpuJoinからGpuPreAgg間はCombined GpuJoin: enabledとあるので、ホストシステムを介さずにGPUバイスメモリ上でJOIN結果をGROUP BY処理へ受け渡す事になっている。つまり、ホストシステムで動作する Append とその後の GroupAggregate にとっては、パーティションに分割するレベルの大きさのデータであっても、実際に処理しなければならないのは(大半はGPUによって消し込まれているため)数百行程度にしかならないという事である。

パーティション設定を物理的なデータ配置と重ね合わせると面白そうな事になるのが分かる。
ホストシステムからはI/O拡張ボックスを使ってGPUSSDを接続するものとし、I/O拡張ボックスに搭載したSSDごとにパーティションを切るものとする。
そうすると、SCAN -> JOIN -> GROUP BY までの処理は、SSD-to-GPUダイレクトSQL実行を用いると一貫してI/O拡張ボックスで処理する事が可能となり、ホストシステムの負荷は極めて小さなものとなる。

PostgreSQL v10ではAppend配下の子テーブルの処理を並列に実行する事ができないため、複数のI/O拡張ボックスを備えていたとしても同時にアクティブになるのは1個だけだが、これはPostgreSQL v11で改善される。そのため、v11がリリースされる頃には、I/O拡張ボックスを増設すれば増設するだけ処理性能と容量を同時に拡張する事ができるというシステムが現実味を帯びる事になる。
現状、I/O拡張ボックスを使った性能測定ではSSDx2枚を用いて少なくとも4.8GB/sのスループットまで出る事は確認できている。ワークロードを選ぶことは確かだが、うまくハマればI/O拡張ボックスの数に比例してスケールするため、イマドキのハイエンドDWHのスペックである数十GB/sのクエリ処理性能も夢ではない。

現状、ひとまず実行計画でパーティションを意識するように追加の実行パスを生成するように修正を加えただけである。
まだ欠けている機能としては以下の2つ。

  • GpuJoinのRight-Tree側を何度も読まずに済むようにする。
  • バックグラウンドワーカがGPUを初期化する時に、SSDに近傍のものを選択する。

これは、上記のEXPLAINの例ではテーブル t1 が相当するものだが、GpuJoinが3個のパーティション子テーブル側にPush-downされた結果、テーブル t1 の読出しを3回も行う必要が出てきたというもの。
これはもう少し実装を頑張れば、例えばGpuJoin用のハッシュテーブルを共有メモリ上に展開するなどして、一度だけ読み出せば全てのバックグラウンドワーカがハッシュテーブルへの参照を共有できる。

また、PG-Strom v2.0では(設計単純化を目的として)各プロセスが同時に利用するGPUは一個だけとし、複数GPUを用いる場合には必然的にCPUパラレルクエリを前提とするという設計になった。そのため、プロセスがGPUを初期化する際に、自分がこれからスキャンしようとする子テーブルに(物理的に)近傍のGPUを選択する事がよい戦略となる。

この辺の課題を解決しつつ、今週リリース予定のPostgreSQL v11に合わせてパーティション機能への対応を強化していきたい。

スキャン速度10GB/sへの挑戦~その③~

ちょっと前(2017年10月)に以下のような記事を書いた。
kaigai.hatenablog.com

この時点では、SeqRead 2.2GB/s の Intel SSD 750(400GB) を3枚束ねて、理論帯域6.6GB/sに対してクエリ処理のスループット6.2GB/s程度までは能力を引き出す事ができていた。
データを受け取るGPUの側は使用率30-40%程度で動いているので、SSDからのデータ供給を増加させてやれば、まだまだ処理速度を上げられるハズである。

というワケで新しいSSDを調達してみた。Intel DC P4600である。

このデバイスは PCIe x4接続のNVMe-SSDとしては最速に近い SeqRead 3.2GB/s の能力を持っており、単純に3枚束ねれば 9.6GB/s 程度の読出しスループットが期待できる。
が、そうは問屋が下ろさなかった。

手元の評価機は Xeon E5-2650v4 (Broadwell) を搭載しているのだが、どうやら P2P DMA の転送レートが 7.2GB/s 近辺で頭打ちになってしまうらしく、ちょっと頭を抱えていた。*1

HPC用の特別なH/Wを除けば、一般的なx86サーバではCPUがPCIeのRoot Complexも兼ねているので、PCIeバス上を流れるパケットのルーティングに関わる性能問題であれば、どうもCPUが被疑なのでは・・・という気になってしまう。

そういった折、ちょうどTesla V100を入手できる事になり、ならばよい機会とGPUを搭載するサーバを一新する事にした。つまり、今回の記事は新しいサーバの自慢である。

SuperServer 1019GP-TT

調達したモデルはSuperMicro社の 1019GP-TT で、その他の搭載機器は以下のとおりである。

Chasis SuperMicro 1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C)
RAM 192GB (DDR4-2666; 32GBx6)
GPU NVIDIA Tesla V100 (5120C, 16GB)
SSD Intel DC P4600 (2.0TB) x3
HDD 2.0TB(SATA; 7.2krpm)x6


こちらが届いたばかりの筐体。
このモデルは本来、左右中央にそれぞれPCIe x16スロットを持っており、フルハイト・フルレングスのGPUを最大2枚搭載する事ができる。中央のスロットはLow Profile用なので、おそらくInfinibandか100Gb EthernetのHBAでも搭載するという構成なのだろう。
これを、ライザカード(RSC-R1UG-2E8G-UP)を使う事で写真の向かって左側スロットをPCIe x16からPCIe x8+x8に分配している。このように組み替える事で、GPUに加えてSSDを3枚搭載するだけのスペースを確保する事ができる。
なお、向かって右側スロットをPCIe x8+x8構成にする事もできるのだが、この場合、必然的にGPUが左側に押し出され、電源ピン位置がデバイス冷却用のファンに近くなって配線がタイトになってしまう。ので、作業スペースを考えると左側の2slot化がベスト。


こちらが各デバイスを搭載した状態。

Star Schema Benchmarkによる計測と困惑

で、OS、CUDA、PostgreSQLとPG-Stromをインストールし、いつものStar Schema Benchmarkで性能測定を行ってみる。

実行中のI/O負荷を iostat で眺めてみると以下のような数字(約 8.5GB/s)が出ていた。
これは Raw-I/O での計測値とほぼ同じで、ベンチマークの中核であるlineorderテーブルのスキャンとJOIN/GROUP BYの実行に関しては、ほぼハードウェアの限界に近い所まで引き出せているのだろう。

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           1.34    0.00    5.40    0.00    0.00   93.25

Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
nvme0n1       22701.50      2831.81         0.00       5663          0
nvme1n1       22709.00      2833.05         0.00       5666          0
nvme2n1       22727.50      2835.75         0.00       5671          0
sda               0.00         0.00         0.00          0          0
sdb               0.00         0.00         0.00          0          0
sdc               0.00         0.00         0.00          0          0
sdd               0.00         0.00         0.00          0          0
sde               0.00         0.00         0.00          0          0
sdf               0.00         0.00         0.00          0          0
dm-0              0.00         0.00         0.00          0          0
md0           68146.50      8501.58         0.00      17003          0

で、DB-Size = 353GB ÷ クエリ実行時間で算出したクエリ処理スループットは以下の通り。どんっ!

クエリ実行計画の作成やCUDAコンテキストの初期化、あるいは(直接GPUで高速化とはいかない)GpuHashJoinのハッシュ表作成なども含む総SQL実行時間ベースで、(クエリ毎のばらつきがあるとはいえ)7.0GB/s~7.5GB/s程度の処理スループットを発揮できている。

ただし、素直にこの結果を歓迎できないのが、PG-Stromを無効化した状態、つまり素のPostgreSQLのスコアが以前にBroadwell(E5-2650v4)機で計測したものと比べてずいぶん遅くなっているのである。
単体の能力では劣るIntel SSD 750の3枚構成でも2.0~2.2GB/s程度は出ていたので、ちょっと奇妙な結果ではある。

もしかすると、件のCPU投機実行に関わるLinux kernelパッチ適用の有無で変わったりもするのか?*2と思って、CentOS 7.4リリース時点のカーネル(当然、対策パッチは入ってない)に戻して同じベンチマークを走らせてみたりもしたが、このスローダウンを説明できるほどのデグレードではなかった。

ちょっとこのままだと説明資料に載せるのを躊躇ってしまう数字ではあるので、追って調査する事にしたい。誰かPostgreSQLに詳しい人のコメント大歓迎。

*1:ただ、以前にHGST SN260をお借りして評価した時にはP2P DMAで9.5GB/sまで出た実績があるが、その後再現条件が不明になってしまっており、純粋にH/Wの制約とも考えにくいところがある。

*2:PostgreSQLと比べ、PG-Stromは一度のシステムコールで要求するSSD-to-GPUのデータ転送サイズが大きく、相対的にシステムコール呼び出し回数が少なくなる。

gstore_fdwの圧縮オプション

昨年11月に、GPUメモリをSQLで読み書きするための新機能『gstore_fdw』というものを実装した。*1

これは、PostgreSQLのFDW(Foreign Data Wrapper)の機能を利用して、SELECTが実行された時にはGPUメモリから読み出した内容をPostgreSQLの内部データ表現に変換して出力、逆にINSERTやDELETEの実行時にはGPUメモリを更新するというものであった。

ただ、PG-Strom v2.0のリリースに向けて、少しクセの強い制限事項があったため、もう一度全体的なデザインを見直す事としてみた。
制限事項というのは

  • UPDATEには非対応
  • DELETEは条件なしのみ対応

というもので、要は『GPUメモリへの書込みは可能だが、完全に空の gstore_fdw にINSERTでデータを流し込むだけ』というシロモノである。
GPU側のデータストアは、PL/CUDA関数が単純配列であるかのようにアクセスしたいので行単位のMVCC可視性チェックは不可。
なので、行単位で異なるxmin/xmaxが混在するような状況はよろしくない。。。。という設計意図であった。

新しい実装では、UPDATEや条件付きDELETEを可能にするために、トランザクションのコミットまではホストメモリ上のバッファでデータを保持しておき、コミット時のコールバック処理で、DELETEした行を消去/INSERTした行を追加したGPUメモリのイメージを作成し、それをGPU側に転送するという構造に変更した。*2
そのため、GPU側には一時的に旧バージョン/新バージョンの2つのバッファが混在してしまう事になるが、あるトランザクションIDを持つ人からはどちらか片方しか見えないので、整合性は保たれている。

ちなみに、未コミット状態で gstore_fdw をPL/CUDA関数の引数に積もうとすると、CPU側で最新のイメージを作ってこれをGPUに転送してからPL/CUDA関数のGPUカーネルを起動する事になるので、あまりお勧めはしない。データのロードが終わったら、一度 commit してからPL/CUDA関数を実行する事をお勧めする。

この辺の改造に合わせて、もう一つ、内部的なデータ構造の持ち方を変えてみた。
圧縮フォーマットのサポートである。

圧縮オプション

外部テーブル(Foreign Table)にはFDWに固有のオプションを指定する事が可能で、gstore_fdwの場合は以下のオプションをサポートしている。最後の 'compression' オプションが今回追加したもの。

オプション名 対象 説明
pinning テーブル データを配置するGPUを番号で指定する。必須。
format テーブル GPU上のデータ形式を指定する。現在は 'pgstrom' (デフォルト)のみ対応。
compression カラム 可変長データを保存する時に圧縮を行う。現在は圧縮方式として 'pglz' のみ対応。

FDWの仕組み上、PostgreSQLとデータのやり取りをするときだけはデータ表現をPostgreSQLの内部形式に直してやる必要があるが、内部的なデータ表現はドライバが任意に決めてよい。

現在のデフォルトである 'pgstrom' 形式の場合、データ領域の先頭から単純配列のデータが順番に並んでいるような構造になっている。固定長データの場合はこれで十分であるし、列指向のデータ配置によってGPUのメモリバスの利用効率を最大限に引き出す事ができる。
可変長データ(textや配列など)の場合、少し工夫が必要で、データ領域の先頭から順に 32bit のインデックスがN個並んでいる。このインデックスは可変長データのバッファ(extra buffer)を指しており、可変長データの実体はこちらのバッファに保持されている事になる。

勘のいい人ならお気付きの通り、この構造は元々辞書圧縮を包含している。そのため、内容の重複する要素が多数存在する場合は一個の可変長データを複数の行から共有できるため、例えば複数のテーブルをJOINして正規化の崩れた(= 冗長度の高い)データであっても効率的に格納する事ができる。

ただ、扱う問題の種類によっては、それ以外にも効率的なデータの持ち方があり得る。
例えば、以前にPL/CUDAの検証で利用した化合物の特徴データや、あるいは機械学習の領域で使われるような特徴ベクトルは、基本的には疎行列である事が多い。つまり、ほとんどが 0 でちょびっと非零のデータが存在する。

こういったデータであれば、PostgreSQLが内部的に使用している LZ 圧縮形式でも十分に高い圧縮効率を実現できるはずである。

早速、GPU上のデータストアのサイズが圧縮の有無でどの程度変わるのかを調べてみる事にした。

-- GPUデータストア(圧縮オプションなし)
CREATE FOREIGN TABLE ft_flat (
    id int,
    signature smallint[]
)
SERVER gstore_fdw OPTIONS(pinning '0');

-- GPUデータストア(圧縮オプションあり)
CREATE FOREIGN TABLE ft_comp (
    id int,
    signature smallint[] OPTIONS (compression 'pglz'))
)
SERVER gstore_fdw OPTIONS(pinning '0');

データの詳細はお知らせできないが、これは実際にR&Dの現場で使われているもので、2048次元の特徴ベクトルの中に平均で10~20個程度の非零の要素が存在する。*3

まず、圧縮オプションなしの場合。

test=# INSERT INTO ft_flat(SELECT id,signature FROM source_db);
LOG:  alloc: preserved memory 3110899992 bytes
INSERT 0 1000000

3110899992 bytes = 約3.0GB のデバイスメモリを消費している。
確認してみると、クエリ実行前はこうだったのが

$ nvidia-smi
Thu Jan 11 11:01:08 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 387.26                 Driver Version: 387.26                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:04:00.0 Off |                    0 |
| N/A   41C    P0    52W / 250W |    179MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     24010      C   ...bgworker: PG-Strom GPU memory keeper      161MiB |   <--- 初期状態
+-----------------------------------------------------------------------------+

こうなっている

$ nvidia-smi
Thu Jan 11 11:01:18 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 387.26                 Driver Version: 387.26                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:04:00.0 Off |                    0 |
| N/A   41C    P0    52W / 250W |   3147MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     24010      C   ...bgworker: PG-Strom GPU memory keeper     3129MiB |   <--- デバイスメモリ確保済
+-----------------------------------------------------------------------------+

次に、圧縮オプションありの場合。

test=# INSERT INTO ft_comp(SELECT id,signature FROM source_db);
LOG:  alloc: preserved memory 158944912 bytes
INSERT 0 1000000

特徴ベクトルである signature が非常に sparse な構造を持っているため、158944912 bytes = 151MB にまでデータを圧縮できている事が分かる。

$ nvidia-smi
Thu Jan 11 11:18:04 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 387.26                 Driver Version: 387.26                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:04:00.0 Off |                    0 |
| N/A   41C    P0    52W / 250W |    331MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     24199      C   ...bgworker: PG-Strom GPU memory keeper      313MiB |   <--- 少ないメモリ消費
+-----------------------------------------------------------------------------+

現状、最もデバイスメモリ搭載量の多い NVIDIA Tesla P40 でも 24GB(ECCに少し取られるので実際に使えるのは22GBちょい)しかデバイスメモリを持っていないので、この比率だと、非圧縮では100万件 → 1000万件にデータが増えた時点で破綻するが、圧縮の場合はまだまだ余裕がある事になる。
もちろんデータの特性によるので、これが一般化してどの程度データ圧縮が効くのかを断言はできないが、多くの場合に sparse な行列/ベクトルを扱うという事らしいので、一つの効果的な機能ではある事だろう。

PostgreSQLの可変長データ形式

PostgreSQLの可変長データ形式は元々圧縮をサポートしており、今回、gstore_fdw の圧縮オプションもこれを踏襲している。

可変長データを参照する時、まず先頭バイトの最下位ビットを参照する。
ここが1なら "short format" と呼ばれる形式で、データ長が1~126バイトの比較的短い可変長データの表現か、あるいは逆に、外部のtoastテーブルを参照するためのオブジェクトIDが格納される事になる。
最下位ビットが0の場合、ヘッダは32bitの "long format" と呼ばれる形式で、次のビットが0か1かで圧縮の有無を判定する。残りの30bitで可変長データの長さを表現するため、PostgreSQLにおける可変長データの最大長は1GBとなる。
圧縮ありの場合、ここの30bitに格納されているのは『圧縮後』のイメージの長さで、次の32bitで圧縮前のデータ長を、残りの領域が圧縮済みのイメージである。
このデータ形式だけを見ると、データ圧縮さえ行えば4GBまで表現できそうなものだが、PostgreSQLが内部的に使っているメモリアロケーション関数が 1GB でサイズチェックを行っている箇所が多々あり、実際にはうまく動かないだろう。

圧縮済みのデータも、実際にテーブルから読み出し、PostgreSQLの外部へ出力する際には透過的に展開され、ユーザからは圧縮/非圧縮の差があるようには見えない。
世の中には sparse な行列/ベクトルの表現に特化した専用のデータ構造もあるが、データ形式PostgreSQL と揃えておくことでの利便性を優先した。

*1:http://kaigai.hatenablog.com/entry/2017/11/12/092355

*2:同時に2つ以上のトランザクションが更新処理を走らせる場合、このデザインはうまく機能しないが、gstore_fdwの更新系処理は完全に排他的である。同時実行性能は低下するが、そもそも gstore_fdw の想定するワークロードではほとんど必要ではないので。

*3:作者のご好意で評価に使わせていただきました。ありがとうございます。

PostgreSQL v11新機能先取り:Hash-PartitioningとParallel-Append

今回のエントリーは PostgreSQL Advent Calendar 2017 - Qiita に参加しています。

PG-Stromの視点からも、PostgreSQL v11には首を長くして待っていた機能が2つ入っている。

その1:Hash-Partitioning
github.com

その2:Parallel-Append
github.com

Hash-Partitioningというのは、PostgreSQL v10で追加されたテーブルパーティショニング機能の拡張で、日付時刻などの幅(Range)でパーティション化を行うのではなく、レコードの値をハッシュ関数に通して得られた値を元に、振り分ける先の子テーブルを選択して書き込みを行うというもの。

特徴としては、データの母集団が特異なものでない限り*1、各子テーブルへの書込みは均等に平準化されることになる。これは後で説明する通り、子テーブルのスキャンを並列にできれば、ストレージ帯域を有効に活用する事ができる非常に有益な特性である。


例えば、キー値 'hoge' のハッシュ値が 123 であった場合、これを4で割ると余り3なのでテーブル 't3' に、キー値 'monu' のハッシュ値が 234 であった場合、同様にこれを4で割ると余り3なのでテーブル 't0' に格納されるという流れである。

それでは、試してみる事にする。

まず、テーブルスペースを作成する。
評価用サーバにNVMe-SSDを3枚装着していたので、とりあえず、これを使う事にする。

postgres=# create tablespace nvme1 location '/opt/nvme1';
CREATE TABLESPACE
postgres=# create tablespace nvme2 location '/opt/nvme2';
CREATE TABLESPACE
postgres=# create tablespace nvme3 location '/opt/nvme3';
CREATE TABLESPACE

続いて、パーティションの親テーブルを作成する。partition by hashというのが、Hash-Partitioningを指定するためのオプション。

postgres=# create table t (id int, x real, y real, z real, sum text) partition by hash (id);

パーティションの子テーブルを6つ作成。それぞれ、テーブルスペースあたり2個のテーブルをホストするように設定している。
modulesというのがハッシュ値の分母となる数。remainerというのが計算結果の値である。

postgres=# create table t_0 partition of t for values with (modulus 6, remainder 0) tablespace nvme1;
CREATE TABLE
postgres=# create table t_1 partition of t for values with (modulus 6, remainder 1) tablespace nvme1;
CREATE TABLE
postgres=# create table t_2 partition of t for values with (modulus 6, remainder 2) tablespace nvme2;
CREATE TABLE
postgres=# create table t_3 partition of t for values with (modulus 6, remainder 3) tablespace nvme2;
CREATE TABLE
postgres=# create table t_4 partition of t for values with (modulus 6, remainder 4) tablespace nvme3;
CREATE TABLE
postgres=# create table t_5 partition of t for values with (modulus 6, remainder 5) tablespace nvme3;
CREATE TABLE

さくっとデータを流し込んでみる。

postgres=# insert into t (select x,100*random(), 100*random(), 100*random(), md5(x::text)
                            from generate_series(1,600000000) x);
INSERT 0 600000000

postgres=# \d+
                   List of relations
 Schema | Name | Type  | Owner  |  Size   | Description
--------+------+-------+--------+---------+-------------
 public | t_0  | table | kaigai | 8057 MB |
 public | t_1  | table | kaigai | 8057 MB |
 public | t_2  | table | kaigai | 8056 MB |
 public | t_3  | table | kaigai | 8055 MB |
 public | t_4  | table | kaigai | 8056 MB |
 public | t_5  | table | kaigai | 8056 MB |
(6 rows)

各テーブルに均等に書き込まれているのが分かる。

Parallel Append による並列処理

では、スキャン時にこれがどのように処理されるのかを見てみる事にする。

まず、CPUパラレルがない場合。

postgres=# set max_parallel_workers_per_gather = 0;
SET
postgres=# explain select count(*),avg(x) from t where y < z;
                                 QUERY PLAN
----------------------------------------------------------------------------
 Aggregate  (cost=14685571.72..14685571.73 rows=1 width=16)
   ->  Append  (cost=0.00..13685571.40 rows=200000064 width=4)
         ->  Seq Scan on t_0  (cost=0.00..2281105.20 rows=33335925 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_1  (cost=0.00..2281087.50 rows=33335667 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_2  (cost=0.00..2280943.70 rows=33333565 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_3  (cost=0.00..2280662.70 rows=33329459 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_4  (cost=0.00..2280901.60 rows=33332949 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_5  (cost=0.00..2280870.70 rows=33332499 width=4)
               Filter: (y < z)
(14 rows)

この実行計画は Gather ノードを含んでいない。つまり、Appendは t_0、t_1、…、t_5 までの全件スキャンを順番に処理するという事を意味している。つまり、t_0とt_5が別のストレージに格納されていても、片方を読み出している時には他方は全く使われていない訳で、ストレージ分散の恩恵は全くない。

次に、CPUパラレルを有効にした場合。

postgres=# set max_parallel_workers_per_gather = 100;
SET
postgres=# explain select count(*),avg(x) from t where y < z;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=7400855.85..7400855.86 rows=1 width=16)
   ->  Gather  (cost=7400855.10..7400855.81 rows=7 width=40)
         Workers Planned: 7
         ->  Partial Aggregate  (cost=7399855.10..7399855.11 rows=1 width=40)
               ->  Parallel Append  (cost=0.00..7256997.91 rows=28571438 width=4)
                     ->  Parallel Seq Scan on t_0  (cost=0.00..1209593.31 rows=4762275 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_1  (cost=0.00..1209583.93 rows=4762238 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_2  (cost=0.00..1209507.67 rows=4761938 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_3  (cost=0.00..1209358.67 rows=4761351 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_4  (cost=0.00..1209485.37 rows=4761850 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_5  (cost=0.00..1209468.96 rows=4761786 width=4)
                           Filter: (y < z)
(17 rows)

Gatherノードの下に、PostgreSQL v11で新たにサポートされた Parallel Append が出現している。
Gatherノードはワーカープロセスを起動する役割を担っており、ここで起動されたワーカープロセスが、Parallel Appendノード以下のどこかのテーブルスキャンを担うようになる。
そうすると、t_0とt_5は互いに別々のストレージに格納されているので、読出し処理を互いに並列に実行する事ができるようになる。

実際に走らせてみると、以下の通り。158.4sec → 27.6sec へと高速化しているのが分かる。

postgres=# \timing on
Timing is on.

postgres=# set max_parallel_workers_per_gather = 0;
SET
postgres=# select count(*),avg(x) from t where y < z;
   count   |       avg
-----------+------------------
 300006886 | 50.0030065651906
(1 row)

Time: 158412.708 ms

postgres=# select count(*),avg(x) from t where y < z;
   count   |       avg
-----------+------------------
 300006886 | 50.0030065651742
(1 row)

Time: 27597.358 ms

PostgreSQLのshared_buffersは最小限に設定。OSキャッシュは実行前にクリアしています。

iostatで見てみると、こんな感じになっている。
■ CPU並列なしの場合

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           3.61    0.00    0.56    0.00    0.00   95.83

Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
nvme0n1           0.00         0.00         0.00          0          0
nvme1n1        5011.00       313.12         0.00        626          0
nvme2n1           0.00         0.00         0.00          0          0
sda               0.00         0.00         0.00          0          0

■ CPU並列ありの場合

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          23.66    0.00    3.16    4.71    0.00   68.47

Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
nvme0n1       10163.00       634.81         0.00       1269          0
nvme1n1       10354.00       647.02         0.07       1294          0
nvme2n1        8610.00       537.76         0.01       1075          0
sda               0.00         0.00         0.00          0          0

各バックグラウンドワーカーが均等にI/Oを発行している様子がわかる。

マルチGPU対応とSSD-to-GPUダイレクトSQL実行

この機能は、PG-Stromにとっても非常に重要な技術基盤になっている。

というのも、中核機能の一つであるSSD-to-GPUダイレクトSQL実行は、NVIDIA GPUDirect RDMAという基盤を用いて作られており、これには次のような制約があるからだ。

We can distinguish between three situations, depending on what is on the path between the GPU and the third-party device:

  • PCIe switches only
  • single CPU/IOH
  • CPU/IOH <-> QPI/HT <-> CPU/IOH

The first situation, where there are only PCIe switches on the path, is optimal and yields the best performance. The second one, where a single CPU/IOH is involved, works, but yields worse performance ( especially peer-to-peer read bandwidth has been shown to be severely limited on some processor architectures ). Finally, the third situation, where the path traverses a QPI/HT link, may be extremely performance-limited or even not work reliably.
GPUDirect RDMA :: CUDA Toolkit Documentation

つまり、マルチソケットシステムで異なるCPU配下に接続されているGPUSSDとの間では、GPUDirect RDMAのスループットが極端に低下するか動作しない、という事である*2

PostgreSQLのワーカープロセス上で動作するPG-Stromにとって、複数のGPUが搭載されていた場合でも、どのGPUを使用するか決定するというのはさほど難しい話ではない。基本的にはラウンドロビンで、例えば7プロセスでCPU並列処理を行う場合、4プロセスはGPU0を、3プロセスはGPU1を使用するといった形になる。

だが、SSDが絡むと少々厄介である。

データの格納されているテーブルを移動させるわけにはいかないので、SSD-to-GPUダイレクトSQL実行を使う場合には、スキャンしようとしているSSDと同じCPUソケットに接続されているGPUを使用する必要がある。
したがって、複数のGPUが搭載されていたとしても、Append処理がシーケンシャルである限り、スキャン対象のテーブルはシーケンシャルにしか選択されず、SSD-to-GPUダイレクトを実行中のGPUだけがアクティブで他のGPUは遊んでしまわざるを得ないという課題があった。

が、この制限は、Append処理がCPU並列に対応する事で解消される。

しかも、PostgreSQL v11では同時にHash-Partitioningがサポートされることに伴い、ストレージを跨ったデータの分散配置と、マルチGPU環境下におけるSSD-to-GPUダイレクトSQL実行を共存させる事ができるようになる。ブラボー!


PCIeバスの限界を越えたクエリ処理能力の実現に向けて

そもそも SSD-to-GPUダイレクトSQL実行というのはどういった機能であったかというと、SSD上のデータブロックをGPUに直接転送し、CPU/RAMへロードする前に不要なデータをそぎ落とす。これにより、CPUが処理すべきレコード数を減らし、あたかもI/Oが高速化されたかのように振る舞う、というものであった。

これは、典型的な集計系のSQLワークロードがWHERE句やGROUP BYによってデータ件数を大幅に減らす事ができるという特性に基づいている。

さて、世の中には PCIe バスを引き回して外部の拡張I/OボックスにGPUなどのカードを装着する事のできるデバイスが存在する。

ちょっとこれら各製品の対応状況を調べねばならないが、拡張I/Oボックス内にPCIeスイッチが搭載されており、拡張I/Oボックス内でSSD-to-GPUのデータ転送が完結できる製品であれば*3、データ転送の大部分をボックス内で消し込む事が可能であるはずである。

例えば、10億件のレコードをテーブルから読み出し、1000件の集計結果を出力するようなクエリであれば、拡張I/Oボックス⇒ホストシステム間の帯域はほとんど問題にはならない*4

そうすると、拡張I/Oボックス一個あたりGPUx1とSSDx2を詰め込んで、一台増設すれば8.0~10.0GB/s程度の性能向上と数TBの容量拡張を行えるソリューションという形にする事もそうそう突飛な話ではなくなる。

ラックサーバ単体だと、GPUSSDを搭載するスロットの物理的形状やPCIeレーン数の制約から1CPUあたり高々10GB/sが上限にならざるを得ないが、拡張I/Oボックスを使えば、これを軽々突破するクエリ処理能力というのが実現可能と考えられる。
しかも、アプリケーション視点から見ると、これは単純にたくさんのGPUSSDを搭載したPostgreSQLシステムという事になるので、分散トランザクションを意識する必要は全くない。

来年に向けて、面白くなってきたのではないだろうか。

*1:例えばハッシュ化すべき列が全て同じ値、とか

*2:で、これを回避するために、HPC向けなどのサーバではPCIeスイッチを介して全てのGPUが特定のルート配下で動作するように設計された製品がある。Supermicro SYS-4028GR-TRT2DELL PowerEdge C4130など。

*3:普通に考えたら、ボックス内のPCIeレーン数よりアップリンクのPCIeレーン数が少ないので、スイッチが入ってるはず。

*4:まぁ、書き込み遅いと困るのでPCIe x4くらいはほしいけど。

GTCJapan雑感と、ちょっとした思い付き。

12月12日(火)~13日(水)にかけて、お台場のヒルトンホテルでGPU Technology Conference Japanが開催された。(関係者の皆様、お疲れさまでした!)
www.gputechconf.jp

当方の出番は、初日夕方の INCEPTION AI Startup Summit というスタートアップ19社のプレゼン大会と、2枚が採択されたポスターセッション。

前者は各社10分の持ち時間で自社の紹介を行うという趣向で、時間が短い事もあり、あまり欲張らずに我々の中核技術の一つ:SSD-to-GPUダイレクトSQL実行(と、今後の方向性としてSSD上のRow->Column変換)を紹介するという体裁でプレゼンテーションした。

www.slideshare.net

その結果、最優秀賞を頂いてしまった。(オジサンマジビビル)

www.nikkei.com

これを日経の記者さんに取り上げて頂き、人生二度目の新聞沙汰に*1
まだ立ち上げて半年のスタートアップに注目を頂いたり、何かの折に表彰されたりというのは大変ありがたい事ではあるが、今の段階では、まだ我々はプロダクトのリリースにすらたどり着いていない段階のひよっ子である。舞い上がることなく、着実に実行すべきすることを前に進めていきたい。
多少ぶっちゃけて言えば、審査員の皆さんは基本的にVCの方と聞いている。我々のプロダクトに実際にお金を払っていただけるエンドユーザ様の評価ではないという事は冷静に受け止める必要があるだろう。

もう一つ。ポスターセッションの方では、5月のサンノゼでの発表を基本的には踏襲(ベンチマーク等取り直してるが)した『PL/CUDAによる類似化合物検索』『SSD-to-GPUダイレクトSQL実行』の2テーマを投稿。前者が Top-5 Finalist に選出された。

サンノゼのGTCの作法と同様に、来場者による投票でBest Posterが選ばれるのだが、5月のリベンジで今回は Best Poster Award をいただく事ができた。
応援して頂いた皆様、ポスターをご覧頂いた皆様に、この場を借りて御礼申し上げます。


自分の出番以外には、初日の成瀬さん Volta GPU と CUDA 9 のセッションに通しで出席。
割と新しい情報がちょくちょく混ざっているので油断できない。

備忘録代わりに残しておくと、

VoltaではL2のAtomic演算が速い
これはGROUP BYのパフォーマンスに影響がでる可能性が高い。GROUP BYを実装するGpuPreAggでは、合計値やMax/Minを求めるためにグルーピングキーの値ごとに異なる場所に確保した中間結果に対してガンガンatomicAdd()を行っていくというのが基本動作だが。
もちろん、グローバルメモリへの負荷を下げるため、最大限、共有メモリ上のatomic演算を行うようにしているが、それでも溢れる場合にはグローバルメモリへのatomicを行わざるを得ないので。

もう一つはMPSの改善PostgreSQLのようにパラレルクエリが並列プロセスによって実装されている場合、普通にCUDAを使うだけだと、ある時点でGPUを使用できるのは1プロセスだけ(時分割)になってしまうの。そのため、GPUの使用率を上げるため*2にはMPSを使って、マルチプロセスからのCUDA APIコールをプロキシする必要がある。
これは、以前のオレオレ実装GPU Serverと同じように、各クライアント間のメモリプロテクションが無かったが、Volta以降では他のプロセスが確保したメモリ領域にちょっかいが出せないようになっているらしい。
その他、Volta世代では一個のMPSインスタンスで扱えるクライアント数が16→48に増加したり、一部のSMしか使わないケース*3では複数のGPU kernelを投入できるようになったりと、かなりSQLワークロードに嬉しい改善が入っている。

思い付きネタ:意外にPCIe拡張Boxって使えるかも?

今回、展示品で見たかったモノは、ELSA Japanさんが展示されていたH3 Platform社のこの製品。

というのも、NVMeとSSDを外部の筐体に同居できて、さらにP2P DMAが筐体内で完結するような構成になっているのであれば、ホストシステムのPCIeレーン数の上限を越えたスループットでクエリ処理が可能になるのでは?という着想があったので。

PG-StromのSSD-to-GPUダイレクトSQL実行を使うと、ストレージ(NVMe-SSD)からGPUへのデータサイズは生データそのままの大きさを転送しなければならないが、典型的なバッチ・レポーティングの処理で使われるSCAN->JOIN->GROUP BYという流れでクエリが処理される場合には、GROUP BY後のレコード数しか書き戻されない。

これはどういう事かというと、例えばテーブルから10億行くらいを読みだしたとしても、最終的にGROUP BYで1000行程度に集約されるようなタイプのワークロードであれば、大半のデータはGPUの段階で消し込まれてしまうという事になる。

で、普通のx86_64サーバのPCIeスロットにNVMe-SSDGPUを両方搭載する場合、どうしても、電源容量、スロット数、スロット形状、スロット位置など物理的に考慮しなければならない要素が沢山ある上、CPUが直接PCIeバスを制御する場合にP2P DMAのPCIeパケットを転送する能力にも限界が見えてくる。
だが、PCIe拡張ボックス側でP2P DMAのルーティングを行い、かつ、大半のデータを消し込めるのであれば、PCIe拡張ボックス→ホストシステムへのUpLinkデータ転送能力はそれほど問題にならず、しかも、ユーザの持つデータ量に応じて段階的にハードウェアを増設できるというメリットも併せ持つことになる。

さらに、である。
データベース屋さんにとって、複数のデータベースノードを並べて互いに相矛盾しない状態を保証する分散トランザクションは、基本的に小難しい技術である。少なくとも、ノード数が1かそれ以上かというのは、データベースの運用管理上、あるいはアプリケーションの開発にとって非常に大きな違いであるが、このようにPCIe拡張ボックスを使用してソフトウェア側からはPCIeバスの延長にGPUやNVMe-SSDが見える状態にあり、かつ、SSD-to-GPUダイレクトSQL実行によってUpLinkへのデータ転送を抑制できるのであれば、分散トランザクションや傷害切り分けの小難しいあれやこれやを考える事なく大量データの処理を行う事が可能になるのではなかろうか。

もちろん、これはPCIeを直接引き延ばすソリューションだけでなく、いったんPCIeパケットをEthernetパケットに変換してリモートのI/O拡張ボックスへ飛ばすようなタイプのソリューションでも同様に適用できると思う。(なぜなら、ホスト⇔I/O拡張ボックス間の帯域はさほど問題ではないからだ)

ちなみにこの話のオチであるが、世の中のPostgreSQL関連サービス(保守サポートなど)を手がけておられる事業者の方は、多くの場合、ホストシステムのノード数やCPUコア数によってサポート費/サブスクリプション費が変わるという料金体系を取られている事が多い。
しかし、こういった形でシステムの処理能力を拡張されてしまうと、最低限の費用で従前のDWHやクラスタシステム並みの処理能力を実現できてしまう事となう。

・・・え?ヘテロDBの場合は?ウチは一応、GPU台数に比例したサブスクリプション費をチャージするという形で製品リリースを準備中でございます。(ブーブー

*1:初回は大学院生の頃(←研究しろ)、2001年参議院選挙の茨城選挙区候補者公開討論会を企画、実行した時に朝日新聞から取材を受けたことがある

*2:それ以外にも、GPUバイスの初期化のための時間が節約できるなどのご利益がある

*3:Voltaみたく80個もSMがあれば、20SM程度が最適、みたいなケースも多いと考えたのだろう。たぶん正解。

gstore_fdw: GPUメモリをSQLで読み書き、そして…。

昨年、PGconf.ASIAで発表したPL/CUDAによる創薬ワークロードの高速化実験のテーマであるが、
kaigai.hatenablog.com


実測したベンチマークを見ると、奇妙な傾向が見てとれる。
このワークロードにおける計算量は「Qの行数×Dの行数」であるので、Dの行数が同じ1000万行であるならば、Qの行数が1000のケース(22.6s)に比べ、Qの行数が10のケース(13.4s)の実行時間はもっと顕著に短時間でなければならない。
計算量が1/100なのに、実行時間は半分弱にしかなっていない。

実はこれは、化合物同志の類似度を計算するための時間だけでなく、PL/CUDA関数に与える引数をセットアップするための時間に12秒程度を要しており、アムダールの法則を引用するまでもなく、類似度の計算を高速化するだけでは処理速度はこれ以上伸びないのである。

PL/CUDA関数の引数として行列(float4など固定長データでnullを含まない2次元配列型で代用)を使用するとして、これがGB単位の大きさになってくると、全くI/Oを伴わなくてもバッファからデータを読み出し、これをCPUで整形していくというのはそれなりに面倒な作業である。
また、PostgreSQL可変長データ形式の制約により、1GBを越える大きさのデータは複数に分割しなければ受け渡しができないという問題がある。

これらの問題に対応するため、PL/CUDA関数へのデータの受け渡しに新しいアプローチを考えてみた。FDWを用いる方法である。

FDW(Foreign Data Wrapper)を利用する事で、PostgreSQL管理外のデータソースをあたかもテーブルであるかのように扱う事ができる。最も一般的なケースでは、リモートのPostgreSQLOracleなどのテーブルを、外部表(Foreign Table)としてローカルのテーブルと同様に読み書きするといった使われ方をしている。また、PostgreSQLのcontribパッケージに含まれる file_fdw モジュールは、CSVファイルを外部表として扱う事が可能で、適切なFDWドライバさえ介在すれば、データソースはRDBMSのテーブルである必要はない。

PG-Stromの新機能 gstore_fdw は、GPUメモリ上に獲得した領域をSELECTやINSERTを用いて読み書きするためのFDWドライバである。

例えば、以下の外部表 ft はreal型の列を10個持っており、形式 pgstrom*1で、GPU番号0のデバイス上にメモリ領域を獲得する。

CREATE FOREIGN TABLE ft (
    id int,
    x0 real,
    x1 real,
    x2 real,
    x3 real,
    x4 real,
    x5 real,
    x6 real,
    x7 real,
    x8 real,
    x9 real
) SERVER gstore_fdw OPTIONS (pinning '0', format 'pgstrom');

適当にデータを流し込んでみる。どうやら420MB程度割り当てたようだ。

postgres=# INSERT INTO ft (SELECT x, 100*random(), 100*random(), 100*random(),
                                     100*random(), 100*random(), 100*random(),
                                     100*random(), 100*random(), 100*random(),
                                     100*random() FROM generate_series(1,10000000) x);
LOG:  alloc: preserved memory 440000320 bytes
INSERT 0 10000000

データロードの前。CUDA Contextの使用するリソースで171MBだけ消費されている。

$ nvidia-smi
Sun Nov 12 00:03:30 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.81                 Driver Version: 384.81                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:02:00.0 Off |                    0 |
| N/A   36C    P0    52W / 250W |    171MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     12438      C   ...bgworker: PG-Strom GPU memory keeper      161MiB |
+-----------------------------------------------------------------------------+

データロード後、171MB + 420MB = 591MB 消費されている。
この領域はPL/CUDA関数で参照する事ができるが、既にGPUメモリ上に留め置かれているので、呼び出し時に都度可変長データをセットアップする必要もなければ、データをGPUへ転送する必要もない。
これらは、冒頭の創薬ワークロードにおいてボトルネックとなっていた12秒に相当するものである。

$ nvidia-smi
Sun Nov 12 00:06:01 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.81                 Driver Version: 384.81                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:02:00.0 Off |                    0 |
| N/A   36C    P0    51W / 250W |    591MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     12438      C   ...bgworker: PG-Strom GPU memory keeper      581MiB |
+-----------------------------------------------------------------------------+

もちろん、外部テーブルであるのでSELECT文を用いて参照する事ができる。

postgres=# SELECT id,x0,x2,x3,x4 FROM ft LIMIT 10;
 id |   x0    |   x2    |   x3    |   x4
----+---------+---------+---------+---------
  1 | 27.1045 |  69.411 |  76.857 | 31.9964
  2 | 41.0264 |  74.365 | 15.4363 | 48.7949
  3 | 33.3703 | 62.7309 | 25.8918 | 8.37674
  4 | 72.0098 | 9.89505 | 41.4208 | 82.9163
  5 | 18.0448 | 49.7461 | 92.4098 | 16.0444
  6 | 25.1164 | 37.0391 | 87.8474 | 62.0111
  7 | 34.9195 | 62.4359 | 44.8145 | 8.56609
  8 | 86.4162 | 46.8959 | 36.1623 | 96.1458
  9 | 99.3865 | 73.5715 | 36.4256 | 36.2447
 10 | 70.0432 | 44.1223 | 32.4791 | 42.7989
(10 rows)

更新系の対応は以下の通りである。

  • 外部表が空の場合に限り、INSERTを実行可能。
  • UPDATEには非対応。
  • DELETEは条件句を伴わない場合に限り実行可能。
  • PostgreSQLが再起動した場合、内容は全て消える。

つまり、マスターとなるデータはPostgreSQL管理下のテーブルに保持しつつ、統計解析や機械学習のワークロードを実行する際に、SQLを用いて母集団を選択したり前処理を行った上で、GPUメモリにロードする事に特化したFDWモジュールである。

間違っても、飛んだら困るデータを格納してはならない。

gstore_fdwにロードしたデータは、内部的にはカラム毎にまとめて保持される。これは、GPUの特性上、同じタイミングで参照するデータ(つまり同じ列のデータ)が隣接領域に存在する方がメモリバスの性能を引き出しやすいため、列指向データ形式を選択したという理由がある。
もちろん、gstore_fdwの外部テーブルオプションには 'format' 指定が存在しているため、将来的には他のデータ形式(例えば、NumPyの行列表現など)を選択できるようにもしようと考えている。

可変長データの場合、先頭から(sizeof(uint) * N)バイトは、後段のExtra Buffer領域を参照するためのオフセット値になっている。
同じ内容のテキストが重複する場合など、複数のレコードが同一のExtra Buffer領域を参照する事となるので、結果として辞書圧縮が効いているのと同じ状態となる。
この設計は、複数のテーブルをJOINして正規化が崩れた状態のデータが gstore_fdw に投入される事を意図している。可変長データに関してはこのような内部表現を持つため、基本的には、冗長なデータを持つ事を過分に恐れる必要はない。

ちなみに、地味な特徴としてトランザクションにちゃんと(??)対応していたりもする。

postgres=# SELECT id,x0,x2,x3,x4 FROM ft LIMIT 10;
 id |   x0    |   x2    |   x3    |   x4
----+---------+---------+---------+---------
  1 | 27.1045 |  69.411 |  76.857 | 31.9964
  2 | 41.0264 |  74.365 | 15.4363 | 48.7949
  3 | 33.3703 | 62.7309 | 25.8918 | 8.37674
  :      :         :         :         :
  9 | 99.3865 | 73.5715 | 36.4256 | 36.2447
 10 | 70.0432 | 44.1223 | 32.4791 | 42.7989
(10 rows)

postgres=# BEGIN;
BEGIN
postgres=# DELETE FROM ft;
DELETE 10000000
postgres=# SAVEPOINT sv1;
SAVEPOINT
postgres=# INSERT INTO ft (SELECT x + 200, 100*random(), 100*random(), 100*random(),
                                           100*random(), 100*random(), 100*random(),
                                           100*random(), 100*random(), 100*random(),
                                           100*random() FROM generate_series(1,20) x);
LOG:  alloc: preserved memory 1200 bytes
INSERT 0 20

postgres=# SELECT id,x0,x2,x3,x4 FROM ft;
 id  |   x0    |   x2    |   x3    |   x4
-----+---------+---------+---------+---------
 201 | 48.2404 | 31.7422 | 38.0771 | 53.8396
 202 | 2.20366 | 56.2501 | 67.7503 |  43.756
 203 | 92.6225 | 61.5537 | 6.36337 | 80.5079
  :       :         :         :         :
 218 | 59.8691 | 88.3846 | 73.4542 | 99.4559
 219 | 2.13907 | 58.8645 | 23.5547 | 21.6422
 220 |  25.592 | 29.1767 | 24.9466 | 40.3255
(20 rows)

postgres=# ROLLBACK TO SAVEPOINT sv1;
ROLLBACK
postgres=# SELECT id,x0,x2,x3,x4 FROM ft;
 id | x0 | x2 | x3 | x4
----+----+----+----+----
(0 rows)

postgres=# ABORT;
LOG:  free: preserved memory at 0x10228800000
ROLLBACK
postgres=# SELECT id,x0,x2,x3,x4 FROM ft LIMIT 10;
 id |   x0    |   x2    |   x3    |   x4
----+---------+---------+---------+---------
  1 | 27.1045 |  69.411 |  76.857 | 31.9964
  2 | 41.0264 |  74.365 | 15.4363 | 48.7949
  3 | 33.3703 | 62.7309 | 25.8918 | 8.37674
  :      :         :         :         :
  9 | 99.3865 | 73.5715 | 36.4256 | 36.2447
 10 | 70.0432 | 44.1223 | 32.4791 | 42.7989
(10 rows)

では実際に、単純なPL/CUDA関数を作成し、実行してみる事にする。

CREATE OR REPLACE FUNCTION gstore_test(reggstore)
RETURNS float
AS
$$
#plcuda_begin
#plcuda_num_threads     gstore_fdw_height
  kern_data_store *kds = arg1.kds;
  int      i, ncols = kds->ncols;
  __shared__ double temp[1024];
  double   psum = 0.0;
  double   total;

  if (get_global_id() < kds->nitems)
  {
    for (i=0; i < ncols; i++)
    {
      kern_colmeta cmeta = kds->colmeta[i];
      Datum       datum;
      cl_bool     isnull;

      if (cmeta.atttypid == PG_FLOAT4OID)
      {
        datum = KDS_COLUMN_GET_VALUE(kds, i, get_global_id(),
                                     &isnull);
        if (!isnull)
          psum += __int_as_float(datum & 0xffffffff);
      }
    }
  }
  temp[get_local_id()] = psum;
  total = pgstromTotalSum(temp, get_local_size());
  if (get_global_id() == 0)
    retval->isnull = false;
  if (get_local_id() == 0)
    atomicAdd(&retval->value, total);
#plcuda_end
$$ LANGUAGE 'plcuda';

この人は、引数として与えられた gstore_fdw 表の中からreal型のカラムの内容をとにかく全部足して、その総和を返すというだけのPL/CUDA関数。
実行してみると、以下のような結果を返す。
平均値50のデータが10列×1000万件あるので、全部足したら50億ちょいというのは妥当なところだろう。

postgres=# select gstore_test('ft');
   gstore_test
------------------
 5000140834.18597
(1 row)

Time: 548.382 ms

統計解析・機械学習向けデータ管理の基盤として

さて、である。
ここまでの機能とCUDA APIこの辺の機能を一緒に使えば、統計解析・機械学習をやっている方にとって大変便利な機能として利用する事はできないだろうか?という所に考えが至った。

CUDAには以下のようなAPIがあり、あるプロセスが獲得したGPUバイスメモリを、別のプロセスと共有して利用する事ができる。

-- GPUデバイスメモリ上の領域の識別子を取得する
CUresult cuIpcGetMemHandle(CUipcMemHandle* pHandle,
                           CUdeviceptr dptr);

-- 取得した識別子を用いて、他プロセスに当該領域をマップする
CUresult cuIpcOpenMemHandle(CUdeviceptr* pdptr,
                            CUipcMemHandle handle,
                            unsigned int flags);

-- マップした領域を解放する
CUresult cuIpcCloseMemHandle(CUdeviceptr dptr);

この CUipcMemHandle というのは64byteのキーで、簡単にエクスポートする事ができる。

postgres=# select gstore_export_ipchandle('ft');
                        gstore_export_ipchandle
------------------------------------------------------------------------
 \xd00e220100000000963000000000000040df391a000000000000401a000000000000.
.000000000000000200000000000042000000000000002500d0c1ac00005c
(1 row)

gstore_fdwにロードしたデータ、つまり、既にGPUメモリに載った状態のデータをPythonやRなどスクリプトから直接使用する事ができれば、統計解析・機械学習に用いるデータの選択や前処理・後処理にSQLを使用する事ができる一方で、中核ロジックに関しては、データサイエンティストが使い慣れたPythonやRスクリプトを利用することできる。ゼロ・コピーなので、データの移動に関わるコストはほとんど省略する事ができる。

さらにSQLである事から、厳密な型チェックや整合性の検査を含める事ができ、これらは前処理における頭痛の種を相当程度に軽減してくれる事だろう。

加えて、PostgreSQLのData Federation機能と連携する事で、更に面白い構想に繋がってくる。
FDWの本来の使い方、つまり、リモートのPostgreSQLOracleデータベースのテーブルを読み書きする機能を用いて、データサイエンティストが作業で利用するワークステーションと、大量のデータを集積する役割のデータベースとを接続する。
(集積するデータの規模によっては、データベースを多段構成にするかもしれないが)

そうすると、一元管理によってデータの散逸を防ぎつつ、データサイエンティストが利用するデータの種類や母集団を選択したり、サマリ作成やデータ正規化といった前処理はワークステーションSQLを使用して実行*2し、機械学習のコア部分は使い慣れたRやPythonを利用して実行する事ができる。

これまで、In-database Analytics対応機能として、ユーザ定義関数(PL/CUDA)を使用して全てをSQLの世界で完結する事を考えていたが、適切なデータ形式に成形した上で、これを外部のスクリプトから共有・参照できるようにすれば、

  • 統計解析・機械学習ワークロードを実行するために、わざわざデータベースからデータをエクスポートさせない。
  • 中核アルゴリズムの前後で、母集団の選択やデータの前処理・後処理のためにSQLを使用した柔軟な記述を可能にする。

といった、従来からのコンセプトだけでなく

  • PythonやRといった、ユーザが使い慣れたツールを用いて In-database Analytics を実現できる。

という、新たな軸を打ち出せるような気がした。
この辺、自分はデータ解析を生業にしている訳ではないので、どの程度の価値がある機能なのかいま一つ想定し難いが、統計解析・機械学習といった領域で仕事をされている方の意見を伺ってみたいところではある。

今のところ、GPU上のデータ形式はPG-Stromの独自形式のみだが、Python向けにはNumPyとバイナリ互換な形式、R向けには行列とバイナリ互換な形式といった対応を拡充する事は検討している。

*1:現時点で唯一実装しているデータ形式

*2:PG-Stromが入っている前提なので、場合によってはSQL自体も高速化されることも

カスタムロジックをWHERE句で使う

しばらくSSD-to-GPUダイレクトSQL実行の開発にどっぷり時間を突っ込んでいたので、久々にPL/CUDAネタ。

この辺のネタや、
kaigai.hatenablog.com

この辺のネタで
kaigai.hatenablog.com

紹介したように、PG-Stromはユーザ定義関数をCUDA Cで記述するための機能(PL/CUDA)を持っており、これを使えば、データベースから読み出したデータをGPUに流し込み、GPU上でカスタムのロジックを実行した後、結果をまたSQLの世界へ戻すという事ができる。

この仕組みはPostgreSQL手続き言語ハンドラの機能を用いて実装されており、ユーザ定義のPL/CUDA関数が呼び出される毎に、手続き言語ハンドラが以下の処理を行う。

  1. ユーザ記述のCUDA Cコードブロックをテンプレートに埋め込んで、ビルド可能なソースコードを作成。
  2. NVRTC(NVIDIA Run-Time Compiler)を用いてソースコードをビルド。既にビルド済みのバイナリがキャッシュされていればそれを使用する。
  3. 関数の引数をGPUに転送する。(場合によっては数百MBの配列データである事も)
  4. ユーザの指定したスレッド数、ブロックサイズ、共有メモリ割当てなどのパラメータに従ってGPU Kernelを起動する。
  5. 予め設定しておいた結果バッファの内容をホスト側に書き戻す。
  6. 手続き言語ハンドラの戻り値として、GPUから戻された結果の値を返す。

これは、一個のGPU Kernelが巨大なマトリックスを引数に取り、数千~数万スレッドが協調動作をしながら処理を進めるようなタイプのワークロードに望ましい。典型的な利用シーンにおいては、SQL構文の中で数回呼び出され、一回あたりの応答時間が少なくとも数秒はあるといったタイプの呼び出し方である。

実際、以前に創薬の領域で類似化合物検索のロジックをPL/CUDA関数で作成した時も、PL/CUDA関数であるところのknn_gpu_similarity()が呼び出されるのは、以下のクエリ実行中に一回だけであった*1

PREPARE knn_sim_rand_10m_gpu_v2(int)	-- arg1:@k-value
AS
SELECT float4_as_int4(key_id) key_id, similarity
  FROM matrix_unnest((SELECT rbind(knn_gpu_similarity($1,Q.matrix,D.matrix))
                        FROM (SELECT cbind(array_matrix(id),
                                           array_matrix(bitmap)) matrix
                                FROM finger_print_query
                               LIMIT 99999) Q,
                             (SELECT matrix
                              FROM finger_print_10m_matrix) D))
    AS sim(key_id real, similarity real)
ORDER BY similarity DESC
LIMIT 1000;

ただ、こういった前提を置いてしまうと、例えば数億件のレコードをスキャンし、GPUで条件句を処理するといったシチュエーションでカスタムのロジックを使うのが難しくなってしまう。

そこで、PG-Stromのコードジェネレータに手を加え、いくつかの制約を満たす場合に限り、カスタムのPL/CUDA関数をWHERE句などで利用できるようにしてみた。

  • PL/CUDA関数が可変長データ(varlena)を返さない
  • PL/CUDA関数は単一のコードブロックしか持たない(prep/postブロックを持たない)
  • ブロックサイズは1、スレッド数も1
  • GPU shared memoryを要求しない
  • 作業バッファ(workbuf)、結果バッファ(resultbuf)を要求しない

これだと全くマルチスレッドを活用できないように見えるが、PG-Stromの場合、テーブルから64MB毎にデータブロックを切り出し、各レコード毎にGPUスレッドを立ち上げて並列にWHERE句を処理するため、結果として数千~数万スレッドで並列処理する事になる。

簡単なPL/CUDA関数を定義してみる。int型の引数を4つ取り、その合計を返すだけというシロモノ。

create function test_1(int,int,int,int)
returns int
as
$$
#plcuda_begin
  if (!arg1.isnull && !arg2.isnull && !arg3.isnull && !arg4.isnull)
  {
    retval->isnull = false;
    retval->value = arg1.value + arg2.value + arg3.value + arg4.value;
  }
#plcuda_end
$$ language 'plcuda';

で、WHERE句の中でこの関数を使ってみる。

postgres=# explain verbose select * from s0 where test_1(aid,bid,cid,did) < 50000;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Custom Scan (GpuScan) on public.s0  (cost=24031.02..223694.11 rows=2666680 width=40)
   Output: id, cat, aid, bid, cid, did, eid, fid, gid, hid
   GPU Filter: (test_1(s0.aid, s0.bid, s0.cid, s0.did) < 50000)
   Kernel Source: /opt/pgdata/base/pgsql_tmp/pgsql_tmp_strom_2118.2.gpu
(4 rows)

自動生成されたGPUコード*2を眺めてみると、WHERE句のCUDA Cに落とし込んだgpuscan_quals_evalの中で、PL/CUDA関数のロジックを実装した pgfn_plcuda_86452() が呼び出されている。
この関数の実行結果は、そのまま大小比較の演算 pgfn_int4lt に渡され、KPARAM_0 (定数 50000) との比較を行っている。

STATIC_FUNCTION(pg_int4_t)
pgfn_plcuda_86452(kern_context *kcxt,
                  pg_int4_t arg1, pg_int4_t arg2,
                  pg_int4_t arg3, pg_int4_t arg4)
{
  pg_int4_t __retval = { true, 0 };
  pg_int4_t *retval = &__retval;
  /* #plcuda_begin */
  if (!arg1.isnull && !arg2.isnull && !arg3.isnull && !arg4.isnull)
  {
    retval->isnull = false;
    retval->value = arg1.value + arg2.value + arg3.value + arg4.value;
  }

  /* #plcuda_end */
  return __retval;
}

STATIC_FUNCTION(cl_bool)
gpuscan_quals_eval(kern_context *kcxt,
                   kern_data_store *kds,
                   ItemPointerData *t_self,
                   HeapTupleHeaderData *htup)
{
  pg_int4_t KPARAM_0 = pg_int4_param(kcxt,0);
  pg_int4_t KVAR_3;
  pg_int4_t KVAR_4;
  pg_int4_t KVAR_5;
  pg_int4_t KVAR_6;
  char *addr;

  assert(htup != NULL);
  EXTRACT_HEAP_TUPLE_BEGIN(addr, kds, htup);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_3 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_4 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_5 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_6 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_END();

  return EVAL(pgfn_int4lt(kcxt, pgfn_plcuda_86452(kcxt, KVAR_3, KVAR_4,
                                                        KVAR_5, KVAR_6), KPARAM_0));
}

今回の例は非常に単純な計算をPL/CUDAで実装したもので、この程度の条件句の評価であればあまり恩恵はないだろう。
ただ、同じ仕組みを使う事で、機械学習による"推論"をWHERE句の条件評価に混ぜて使う事ができるようにもなるだろう。

そうすると、種々多様なソースから上がってきたデータをPostgreSQLに蓄積し、これをSSD-to-GPUダイレクトSQL実行を使ってサマリ作成・前処理を行った上で、従来型のPL/CUDA関数で実装した機械学習ロジックによりパターンを学習する。さらに、今回の新しいPL/CUDA関数の使い方で、前ステップで生成した学習モデルを引数とする"推論系"の関数をWHERE句に加えてテーブルをスキャンする事ができるようになる。

応用としては、例えば平常パターンで学習させたモデルを元に、異常値くさいレコードを検出するアノマリー検知など。
(NoSQL系DBやらあるとはいえ、)金融系なら間違いなく決済データをRDBMSで保持しているだろう。そうすると、クレジットカードの不正利用や、オレオレ詐欺など「普段と違う」振る舞いの検出をSQLを用いてin-databaseで実行できるようになる、という事を意味する。

*1:その一回の呼び出しが処理時間の大半を占めるというワークロードであるワケだが

*2:見やすくするため、一部改行追加