PostgreSQL v11新機能先取り:Hash-PartitioningとParallel-Append

今回のエントリーは PostgreSQL Advent Calendar 2017 - Qiita に参加しています。

PG-Stromの視点からも、PostgreSQL v11には首を長くして待っていた機能が2つ入っている。

その1:Hash-Partitioning
github.com

その2:Parallel-Append
github.com

Hash-Partitioningというのは、PostgreSQL v10で追加されたテーブルパーティショニング機能の拡張で、日付時刻などの幅(Range)でパーティション化を行うのではなく、レコードの値をハッシュ関数に通して得られた値を元に、振り分ける先の子テーブルを選択して書き込みを行うというもの。

特徴としては、データの母集団が特異なものでない限り*1、各子テーブルへの書込みは均等に平準化されることになる。これは後で説明する通り、子テーブルのスキャンを並列にできれば、ストレージ帯域を有効に活用する事ができる非常に有益な特性である。


例えば、キー値 'hoge' のハッシュ値が 123 であった場合、これを4で割ると余り3なのでテーブル 't3' に、キー値 'monu' のハッシュ値が 234 であった場合、同様にこれを4で割ると余り3なのでテーブル 't0' に格納されるという流れである。

それでは、試してみる事にする。

まず、テーブルスペースを作成する。
評価用サーバにNVMe-SSDを3枚装着していたので、とりあえず、これを使う事にする。

postgres=# create tablespace nvme1 location '/opt/nvme1';
CREATE TABLESPACE
postgres=# create tablespace nvme2 location '/opt/nvme2';
CREATE TABLESPACE
postgres=# create tablespace nvme3 location '/opt/nvme3';
CREATE TABLESPACE

続いて、パーティションの親テーブルを作成する。partition by hashというのが、Hash-Partitioningを指定するためのオプション。

postgres=# create table t (id int, x real, y real, z real, sum text) partition by hash (id);

パーティションの子テーブルを6つ作成。それぞれ、テーブルスペースあたり2個のテーブルをホストするように設定している。
modulesというのがハッシュ値の分母となる数。remainerというのが計算結果の値である。

postgres=# create table t_0 partition of t for values with (modulus 6, remainder 0) tablespace nvme1;
CREATE TABLE
postgres=# create table t_1 partition of t for values with (modulus 6, remainder 1) tablespace nvme1;
CREATE TABLE
postgres=# create table t_2 partition of t for values with (modulus 6, remainder 2) tablespace nvme2;
CREATE TABLE
postgres=# create table t_3 partition of t for values with (modulus 6, remainder 3) tablespace nvme2;
CREATE TABLE
postgres=# create table t_4 partition of t for values with (modulus 6, remainder 4) tablespace nvme3;
CREATE TABLE
postgres=# create table t_5 partition of t for values with (modulus 6, remainder 5) tablespace nvme3;
CREATE TABLE

さくっとデータを流し込んでみる。

postgres=# insert into t (select x,100*random(), 100*random(), 100*random(), md5(x::text)
                            from generate_series(1,600000000) x);
INSERT 0 600000000

postgres=# \d+
                   List of relations
 Schema | Name | Type  | Owner  |  Size   | Description
--------+------+-------+--------+---------+-------------
 public | t_0  | table | kaigai | 8057 MB |
 public | t_1  | table | kaigai | 8057 MB |
 public | t_2  | table | kaigai | 8056 MB |
 public | t_3  | table | kaigai | 8055 MB |
 public | t_4  | table | kaigai | 8056 MB |
 public | t_5  | table | kaigai | 8056 MB |
(6 rows)

各テーブルに均等に書き込まれているのが分かる。

Parallel Append による並列処理

では、スキャン時にこれがどのように処理されるのかを見てみる事にする。

まず、CPUパラレルがない場合。

postgres=# set max_parallel_workers_per_gather = 0;
SET
postgres=# explain select count(*),avg(x) from t where y < z;
                                 QUERY PLAN
----------------------------------------------------------------------------
 Aggregate  (cost=14685571.72..14685571.73 rows=1 width=16)
   ->  Append  (cost=0.00..13685571.40 rows=200000064 width=4)
         ->  Seq Scan on t_0  (cost=0.00..2281105.20 rows=33335925 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_1  (cost=0.00..2281087.50 rows=33335667 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_2  (cost=0.00..2280943.70 rows=33333565 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_3  (cost=0.00..2280662.70 rows=33329459 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_4  (cost=0.00..2280901.60 rows=33332949 width=4)
               Filter: (y < z)
         ->  Seq Scan on t_5  (cost=0.00..2280870.70 rows=33332499 width=4)
               Filter: (y < z)
(14 rows)

この実行計画は Gather ノードを含んでいない。つまり、Appendは t_0、t_1、…、t_5 までの全件スキャンを順番に処理するという事を意味している。つまり、t_0とt_5が別のストレージに格納されていても、片方を読み出している時には他方は全く使われていない訳で、ストレージ分散の恩恵は全くない。

次に、CPUパラレルを有効にした場合。

postgres=# set max_parallel_workers_per_gather = 100;
SET
postgres=# explain select count(*),avg(x) from t where y < z;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=7400855.85..7400855.86 rows=1 width=16)
   ->  Gather  (cost=7400855.10..7400855.81 rows=7 width=40)
         Workers Planned: 7
         ->  Partial Aggregate  (cost=7399855.10..7399855.11 rows=1 width=40)
               ->  Parallel Append  (cost=0.00..7256997.91 rows=28571438 width=4)
                     ->  Parallel Seq Scan on t_0  (cost=0.00..1209593.31 rows=4762275 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_1  (cost=0.00..1209583.93 rows=4762238 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_2  (cost=0.00..1209507.67 rows=4761938 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_3  (cost=0.00..1209358.67 rows=4761351 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_4  (cost=0.00..1209485.37 rows=4761850 width=4)
                           Filter: (y < z)
                     ->  Parallel Seq Scan on t_5  (cost=0.00..1209468.96 rows=4761786 width=4)
                           Filter: (y < z)
(17 rows)

Gatherノードの下に、PostgreSQL v11で新たにサポートされた Parallel Append が出現している。
Gatherノードはワーカープロセスを起動する役割を担っており、ここで起動されたワーカープロセスが、Parallel Appendノード以下のどこかのテーブルスキャンを担うようになる。
そうすると、t_0とt_5は互いに別々のストレージに格納されているので、読出し処理を互いに並列に実行する事ができるようになる。

実際に走らせてみると、以下の通り。158.4sec → 27.6sec へと高速化しているのが分かる。

postgres=# \timing on
Timing is on.

postgres=# set max_parallel_workers_per_gather = 0;
SET
postgres=# select count(*),avg(x) from t where y < z;
   count   |       avg
-----------+------------------
 300006886 | 50.0030065651906
(1 row)

Time: 158412.708 ms

postgres=# select count(*),avg(x) from t where y < z;
   count   |       avg
-----------+------------------
 300006886 | 50.0030065651742
(1 row)

Time: 27597.358 ms

PostgreSQLのshared_buffersは最小限に設定。OSキャッシュは実行前にクリアしています。

iostatで見てみると、こんな感じになっている。
■ CPU並列なしの場合

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           3.61    0.00    0.56    0.00    0.00   95.83

Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
nvme0n1           0.00         0.00         0.00          0          0
nvme1n1        5011.00       313.12         0.00        626          0
nvme2n1           0.00         0.00         0.00          0          0
sda               0.00         0.00         0.00          0          0

■ CPU並列ありの場合

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          23.66    0.00    3.16    4.71    0.00   68.47

Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
nvme0n1       10163.00       634.81         0.00       1269          0
nvme1n1       10354.00       647.02         0.07       1294          0
nvme2n1        8610.00       537.76         0.01       1075          0
sda               0.00         0.00         0.00          0          0

各バックグラウンドワーカーが均等にI/Oを発行している様子がわかる。

マルチGPU対応とSSD-to-GPUダイレクトSQL実行

この機能は、PG-Stromにとっても非常に重要な技術基盤になっている。

というのも、中核機能の一つであるSSD-to-GPUダイレクトSQL実行は、NVIDIA GPUDirect RDMAという基盤を用いて作られており、これには次のような制約があるからだ。

We can distinguish between three situations, depending on what is on the path between the GPU and the third-party device:

  • PCIe switches only
  • single CPU/IOH
  • CPU/IOH <-> QPI/HT <-> CPU/IOH

The first situation, where there are only PCIe switches on the path, is optimal and yields the best performance. The second one, where a single CPU/IOH is involved, works, but yields worse performance ( especially peer-to-peer read bandwidth has been shown to be severely limited on some processor architectures ). Finally, the third situation, where the path traverses a QPI/HT link, may be extremely performance-limited or even not work reliably.
GPUDirect RDMA :: CUDA Toolkit Documentation

つまり、マルチソケットシステムで異なるCPU配下に接続されているGPUSSDとの間では、GPUDirect RDMAのスループットが極端に低下するか動作しない、という事である*2

PostgreSQLのワーカープロセス上で動作するPG-Stromにとって、複数のGPUが搭載されていた場合でも、どのGPUを使用するか決定するというのはさほど難しい話ではない。基本的にはラウンドロビンで、例えば7プロセスでCPU並列処理を行う場合、4プロセスはGPU0を、3プロセスはGPU1を使用するといった形になる。

だが、SSDが絡むと少々厄介である。

データの格納されているテーブルを移動させるわけにはいかないので、SSD-to-GPUダイレクトSQL実行を使う場合には、スキャンしようとしているSSDと同じCPUソケットに接続されているGPUを使用する必要がある。
したがって、複数のGPUが搭載されていたとしても、Append処理がシーケンシャルである限り、スキャン対象のテーブルはシーケンシャルにしか選択されず、SSD-to-GPUダイレクトを実行中のGPUだけがアクティブで他のGPUは遊んでしまわざるを得ないという課題があった。

が、この制限は、Append処理がCPU並列に対応する事で解消される。

しかも、PostgreSQL v11では同時にHash-Partitioningがサポートされることに伴い、ストレージを跨ったデータの分散配置と、マルチGPU環境下におけるSSD-to-GPUダイレクトSQL実行を共存させる事ができるようになる。ブラボー!


PCIeバスの限界を越えたクエリ処理能力の実現に向けて

そもそも SSD-to-GPUダイレクトSQL実行というのはどういった機能であったかというと、SSD上のデータブロックをGPUに直接転送し、CPU/RAMへロードする前に不要なデータをそぎ落とす。これにより、CPUが処理すべきレコード数を減らし、あたかもI/Oが高速化されたかのように振る舞う、というものであった。

これは、典型的な集計系のSQLワークロードがWHERE句やGROUP BYによってデータ件数を大幅に減らす事ができるという特性に基づいている。

さて、世の中には PCIe バスを引き回して外部の拡張I/OボックスにGPUなどのカードを装着する事のできるデバイスが存在する。

ちょっとこれら各製品の対応状況を調べねばならないが、拡張I/Oボックス内にPCIeスイッチが搭載されており、拡張I/Oボックス内でSSD-to-GPUのデータ転送が完結できる製品であれば*3、データ転送の大部分をボックス内で消し込む事が可能であるはずである。

例えば、10億件のレコードをテーブルから読み出し、1000件の集計結果を出力するようなクエリであれば、拡張I/Oボックス⇒ホストシステム間の帯域はほとんど問題にはならない*4

そうすると、拡張I/Oボックス一個あたりGPUx1とSSDx2を詰め込んで、一台増設すれば8.0~10.0GB/s程度の性能向上と数TBの容量拡張を行えるソリューションという形にする事もそうそう突飛な話ではなくなる。

ラックサーバ単体だと、GPUSSDを搭載するスロットの物理的形状やPCIeレーン数の制約から1CPUあたり高々10GB/sが上限にならざるを得ないが、拡張I/Oボックスを使えば、これを軽々突破するクエリ処理能力というのが実現可能と考えられる。
しかも、アプリケーション視点から見ると、これは単純にたくさんのGPUSSDを搭載したPostgreSQLシステムという事になるので、分散トランザクションを意識する必要は全くない。

来年に向けて、面白くなってきたのではないだろうか。

*1:例えばハッシュ化すべき列が全て同じ値、とか

*2:で、これを回避するために、HPC向けなどのサーバではPCIeスイッチを介して全てのGPUが特定のルート配下で動作するように設計された製品がある。Supermicro SYS-4028GR-TRT2DELL PowerEdge C4130など。

*3:普通に考えたら、ボックス内のPCIeレーン数よりアップリンクのPCIeレーン数が少ないので、スイッチが入ってるはず。

*4:まぁ、書き込み遅いと困るのでPCIe x4くらいはほしいけど。

GTCJapan雑感と、ちょっとした思い付き。

12月12日(火)~13日(水)にかけて、お台場のヒルトンホテルでGPU Technology Conference Japanが開催された。(関係者の皆様、お疲れさまでした!)
www.gputechconf.jp

当方の出番は、初日夕方の INCEPTION AI Startup Summit というスタートアップ19社のプレゼン大会と、2枚が採択されたポスターセッション。

前者は各社10分の持ち時間で自社の紹介を行うという趣向で、時間が短い事もあり、あまり欲張らずに我々の中核技術の一つ:SSD-to-GPUダイレクトSQL実行(と、今後の方向性としてSSD上のRow->Column変換)を紹介するという体裁でプレゼンテーションした。

www.slideshare.net

その結果、最優秀賞を頂いてしまった。(オジサンマジビビル)

www.nikkei.com

これを日経の記者さんに取り上げて頂き、人生二度目の新聞沙汰に*1
まだ立ち上げて半年のスタートアップに注目を頂いたり、何かの折に表彰されたりというのは大変ありがたい事ではあるが、今の段階では、まだ我々はプロダクトのリリースにすらたどり着いていない段階のひよっ子である。舞い上がることなく、着実に実行すべきすることを前に進めていきたい。
多少ぶっちゃけて言えば、審査員の皆さんは基本的にVCの方と聞いている。我々のプロダクトに実際にお金を払っていただけるエンドユーザ様の評価ではないという事は冷静に受け止める必要があるだろう。

もう一つ。ポスターセッションの方では、5月のサンノゼでの発表を基本的には踏襲(ベンチマーク等取り直してるが)した『PL/CUDAによる類似化合物検索』『SSD-to-GPUダイレクトSQL実行』の2テーマを投稿。前者が Top-5 Finalist に選出された。

サンノゼのGTCの作法と同様に、来場者による投票でBest Posterが選ばれるのだが、5月のリベンジで今回は Best Poster Award をいただく事ができた。
応援して頂いた皆様、ポスターをご覧頂いた皆様に、この場を借りて御礼申し上げます。


自分の出番以外には、初日の成瀬さん Volta GPU と CUDA 9 のセッションに通しで出席。
割と新しい情報がちょくちょく混ざっているので油断できない。

備忘録代わりに残しておくと、

VoltaではL2のAtomic演算が速い
これはGROUP BYのパフォーマンスに影響がでる可能性が高い。GROUP BYを実装するGpuPreAggでは、合計値やMax/Minを求めるためにグルーピングキーの値ごとに異なる場所に確保した中間結果に対してガンガンatomicAdd()を行っていくというのが基本動作だが。
もちろん、グローバルメモリへの負荷を下げるため、最大限、共有メモリ上のatomic演算を行うようにしているが、それでも溢れる場合にはグローバルメモリへのatomicを行わざるを得ないので。

もう一つはMPSの改善PostgreSQLのようにパラレルクエリが並列プロセスによって実装されている場合、普通にCUDAを使うだけだと、ある時点でGPUを使用できるのは1プロセスだけ(時分割)になってしまうの。そのため、GPUの使用率を上げるため*2にはMPSを使って、マルチプロセスからのCUDA APIコールをプロキシする必要がある。
これは、以前のオレオレ実装GPU Serverと同じように、各クライアント間のメモリプロテクションが無かったが、Volta以降では他のプロセスが確保したメモリ領域にちょっかいが出せないようになっているらしい。
その他、Volta世代では一個のMPSインスタンスで扱えるクライアント数が16→48に増加したり、一部のSMしか使わないケース*3では複数のGPU kernelを投入できるようになったりと、かなりSQLワークロードに嬉しい改善が入っている。

思い付きネタ:意外にPCIe拡張Boxって使えるかも?

今回、展示品で見たかったモノは、ELSA Japanさんが展示されていたH3 Platform社のこの製品。

というのも、NVMeとSSDを外部の筐体に同居できて、さらにP2P DMAが筐体内で完結するような構成になっているのであれば、ホストシステムのPCIeレーン数の上限を越えたスループットでクエリ処理が可能になるのでは?という着想があったので。

PG-StromのSSD-to-GPUダイレクトSQL実行を使うと、ストレージ(NVMe-SSD)からGPUへのデータサイズは生データそのままの大きさを転送しなければならないが、典型的なバッチ・レポーティングの処理で使われるSCAN->JOIN->GROUP BYという流れでクエリが処理される場合には、GROUP BY後のレコード数しか書き戻されない。

これはどういう事かというと、例えばテーブルから10億行くらいを読みだしたとしても、最終的にGROUP BYで1000行程度に集約されるようなタイプのワークロードであれば、大半のデータはGPUの段階で消し込まれてしまうという事になる。

で、普通のx86_64サーバのPCIeスロットにNVMe-SSDGPUを両方搭載する場合、どうしても、電源容量、スロット数、スロット形状、スロット位置など物理的に考慮しなければならない要素が沢山ある上、CPUが直接PCIeバスを制御する場合にP2P DMAのPCIeパケットを転送する能力にも限界が見えてくる。
だが、PCIe拡張ボックス側でP2P DMAのルーティングを行い、かつ、大半のデータを消し込めるのであれば、PCIe拡張ボックス→ホストシステムへのUpLinkデータ転送能力はそれほど問題にならず、しかも、ユーザの持つデータ量に応じて段階的にハードウェアを増設できるというメリットも併せ持つことになる。

さらに、である。
データベース屋さんにとって、複数のデータベースノードを並べて互いに相矛盾しない状態を保証する分散トランザクションは、基本的に小難しい技術である。少なくとも、ノード数が1かそれ以上かというのは、データベースの運用管理上、あるいはアプリケーションの開発にとって非常に大きな違いであるが、このようにPCIe拡張ボックスを使用してソフトウェア側からはPCIeバスの延長にGPUやNVMe-SSDが見える状態にあり、かつ、SSD-to-GPUダイレクトSQL実行によってUpLinkへのデータ転送を抑制できるのであれば、分散トランザクションや傷害切り分けの小難しいあれやこれやを考える事なく大量データの処理を行う事が可能になるのではなかろうか。

もちろん、これはPCIeを直接引き延ばすソリューションだけでなく、いったんPCIeパケットをEthernetパケットに変換してリモートのI/O拡張ボックスへ飛ばすようなタイプのソリューションでも同様に適用できると思う。(なぜなら、ホスト⇔I/O拡張ボックス間の帯域はさほど問題ではないからだ)

ちなみにこの話のオチであるが、世の中のPostgreSQL関連サービス(保守サポートなど)を手がけておられる事業者の方は、多くの場合、ホストシステムのノード数やCPUコア数によってサポート費/サブスクリプション費が変わるという料金体系を取られている事が多い。
しかし、こういった形でシステムの処理能力を拡張されてしまうと、最低限の費用で従前のDWHやクラスタシステム並みの処理能力を実現できてしまう事となう。

・・・え?ヘテロDBの場合は?ウチは一応、GPU台数に比例したサブスクリプション費をチャージするという形で製品リリースを準備中でございます。(ブーブー

*1:初回は大学院生の頃(←研究しろ)、2001年参議院選挙の茨城選挙区候補者公開討論会を企画、実行した時に朝日新聞から取材を受けたことがある

*2:それ以外にも、GPUバイスの初期化のための時間が節約できるなどのご利益がある

*3:Voltaみたく80個もSMがあれば、20SM程度が最適、みたいなケースも多いと考えたのだろう。たぶん正解。

gstore_fdw: GPUメモリをSQLで読み書き、そして…。

昨年、PGconf.ASIAで発表したPL/CUDAによる創薬ワークロードの高速化実験のテーマであるが、
kaigai.hatenablog.com


実測したベンチマークを見ると、奇妙な傾向が見てとれる。
このワークロードにおける計算量は「Qの行数×Dの行数」であるので、Dの行数が同じ1000万行であるならば、Qの行数が1000のケース(22.6s)に比べ、Qの行数が10のケース(13.4s)の実行時間はもっと顕著に短時間でなければならない。
計算量が1/100なのに、実行時間は半分弱にしかなっていない。

実はこれは、化合物同志の類似度を計算するための時間だけでなく、PL/CUDA関数に与える引数をセットアップするための時間に12秒程度を要しており、アムダールの法則を引用するまでもなく、類似度の計算を高速化するだけでは処理速度はこれ以上伸びないのである。

PL/CUDA関数の引数として行列(float4など固定長データでnullを含まない2次元配列型で代用)を使用するとして、これがGB単位の大きさになってくると、全くI/Oを伴わなくてもバッファからデータを読み出し、これをCPUで整形していくというのはそれなりに面倒な作業である。
また、PostgreSQL可変長データ形式の制約により、1GBを越える大きさのデータは複数に分割しなければ受け渡しができないという問題がある。

これらの問題に対応するため、PL/CUDA関数へのデータの受け渡しに新しいアプローチを考えてみた。FDWを用いる方法である。

FDW(Foreign Data Wrapper)を利用する事で、PostgreSQL管理外のデータソースをあたかもテーブルであるかのように扱う事ができる。最も一般的なケースでは、リモートのPostgreSQLOracleなどのテーブルを、外部表(Foreign Table)としてローカルのテーブルと同様に読み書きするといった使われ方をしている。また、PostgreSQLのcontribパッケージに含まれる file_fdw モジュールは、CSVファイルを外部表として扱う事が可能で、適切なFDWドライバさえ介在すれば、データソースはRDBMSのテーブルである必要はない。

PG-Stromの新機能 gstore_fdw は、GPUメモリ上に獲得した領域をSELECTやINSERTを用いて読み書きするためのFDWドライバである。

例えば、以下の外部表 ft はreal型の列を10個持っており、形式 pgstrom*1で、GPU番号0のデバイス上にメモリ領域を獲得する。

CREATE FOREIGN TABLE ft (
    id int,
    x0 real,
    x1 real,
    x2 real,
    x3 real,
    x4 real,
    x5 real,
    x6 real,
    x7 real,
    x8 real,
    x9 real
) SERVER gstore_fdw OPTIONS (pinning '0', format 'pgstrom');

適当にデータを流し込んでみる。どうやら420MB程度割り当てたようだ。

postgres=# INSERT INTO ft (SELECT x, 100*random(), 100*random(), 100*random(),
                                     100*random(), 100*random(), 100*random(),
                                     100*random(), 100*random(), 100*random(),
                                     100*random() FROM generate_series(1,10000000) x);
LOG:  alloc: preserved memory 440000320 bytes
INSERT 0 10000000

データロードの前。CUDA Contextの使用するリソースで171MBだけ消費されている。

$ nvidia-smi
Sun Nov 12 00:03:30 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.81                 Driver Version: 384.81                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:02:00.0 Off |                    0 |
| N/A   36C    P0    52W / 250W |    171MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     12438      C   ...bgworker: PG-Strom GPU memory keeper      161MiB |
+-----------------------------------------------------------------------------+

データロード後、171MB + 420MB = 591MB 消費されている。
この領域はPL/CUDA関数で参照する事ができるが、既にGPUメモリ上に留め置かれているので、呼び出し時に都度可変長データをセットアップする必要もなければ、データをGPUへ転送する必要もない。
これらは、冒頭の創薬ワークロードにおいてボトルネックとなっていた12秒に相当するものである。

$ nvidia-smi
Sun Nov 12 00:06:01 2017
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.81                 Driver Version: 384.81                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:02:00.0 Off |                    0 |
| N/A   36C    P0    51W / 250W |    591MiB / 22912MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     12438      C   ...bgworker: PG-Strom GPU memory keeper      581MiB |
+-----------------------------------------------------------------------------+

もちろん、外部テーブルであるのでSELECT文を用いて参照する事ができる。

postgres=# SELECT id,x0,x2,x3,x4 FROM ft LIMIT 10;
 id |   x0    |   x2    |   x3    |   x4
----+---------+---------+---------+---------
  1 | 27.1045 |  69.411 |  76.857 | 31.9964
  2 | 41.0264 |  74.365 | 15.4363 | 48.7949
  3 | 33.3703 | 62.7309 | 25.8918 | 8.37674
  4 | 72.0098 | 9.89505 | 41.4208 | 82.9163
  5 | 18.0448 | 49.7461 | 92.4098 | 16.0444
  6 | 25.1164 | 37.0391 | 87.8474 | 62.0111
  7 | 34.9195 | 62.4359 | 44.8145 | 8.56609
  8 | 86.4162 | 46.8959 | 36.1623 | 96.1458
  9 | 99.3865 | 73.5715 | 36.4256 | 36.2447
 10 | 70.0432 | 44.1223 | 32.4791 | 42.7989
(10 rows)

更新系の対応は以下の通りである。

  • 外部表が空の場合に限り、INSERTを実行可能。
  • UPDATEには非対応。
  • DELETEは条件句を伴わない場合に限り実行可能。
  • PostgreSQLが再起動した場合、内容は全て消える。

つまり、マスターとなるデータはPostgreSQL管理下のテーブルに保持しつつ、統計解析や機械学習のワークロードを実行する際に、SQLを用いて母集団を選択したり前処理を行った上で、GPUメモリにロードする事に特化したFDWモジュールである。

間違っても、飛んだら困るデータを格納してはならない。

gstore_fdwにロードしたデータは、内部的にはカラム毎にまとめて保持される。これは、GPUの特性上、同じタイミングで参照するデータ(つまり同じ列のデータ)が隣接領域に存在する方がメモリバスの性能を引き出しやすいため、列指向データ形式を選択したという理由がある。
もちろん、gstore_fdwの外部テーブルオプションには 'format' 指定が存在しているため、将来的には他のデータ形式(例えば、NumPyの行列表現など)を選択できるようにもしようと考えている。

可変長データの場合、先頭から(sizeof(uint) * N)バイトは、後段のExtra Buffer領域を参照するためのオフセット値になっている。
同じ内容のテキストが重複する場合など、複数のレコードが同一のExtra Buffer領域を参照する事となるので、結果として辞書圧縮が効いているのと同じ状態となる。
この設計は、複数のテーブルをJOINして正規化が崩れた状態のデータが gstore_fdw に投入される事を意図している。可変長データに関してはこのような内部表現を持つため、基本的には、冗長なデータを持つ事を過分に恐れる必要はない。

ちなみに、地味な特徴としてトランザクションにちゃんと(??)対応していたりもする。

postgres=# SELECT id,x0,x2,x3,x4 FROM ft LIMIT 10;
 id |   x0    |   x2    |   x3    |   x4
----+---------+---------+---------+---------
  1 | 27.1045 |  69.411 |  76.857 | 31.9964
  2 | 41.0264 |  74.365 | 15.4363 | 48.7949
  3 | 33.3703 | 62.7309 | 25.8918 | 8.37674
  :      :         :         :         :
  9 | 99.3865 | 73.5715 | 36.4256 | 36.2447
 10 | 70.0432 | 44.1223 | 32.4791 | 42.7989
(10 rows)

postgres=# BEGIN;
BEGIN
postgres=# DELETE FROM ft;
DELETE 10000000
postgres=# SAVEPOINT sv1;
SAVEPOINT
postgres=# INSERT INTO ft (SELECT x + 200, 100*random(), 100*random(), 100*random(),
                                           100*random(), 100*random(), 100*random(),
                                           100*random(), 100*random(), 100*random(),
                                           100*random() FROM generate_series(1,20) x);
LOG:  alloc: preserved memory 1200 bytes
INSERT 0 20

postgres=# SELECT id,x0,x2,x3,x4 FROM ft;
 id  |   x0    |   x2    |   x3    |   x4
-----+---------+---------+---------+---------
 201 | 48.2404 | 31.7422 | 38.0771 | 53.8396
 202 | 2.20366 | 56.2501 | 67.7503 |  43.756
 203 | 92.6225 | 61.5537 | 6.36337 | 80.5079
  :       :         :         :         :
 218 | 59.8691 | 88.3846 | 73.4542 | 99.4559
 219 | 2.13907 | 58.8645 | 23.5547 | 21.6422
 220 |  25.592 | 29.1767 | 24.9466 | 40.3255
(20 rows)

postgres=# ROLLBACK TO SAVEPOINT sv1;
ROLLBACK
postgres=# SELECT id,x0,x2,x3,x4 FROM ft;
 id | x0 | x2 | x3 | x4
----+----+----+----+----
(0 rows)

postgres=# ABORT;
LOG:  free: preserved memory at 0x10228800000
ROLLBACK
postgres=# SELECT id,x0,x2,x3,x4 FROM ft LIMIT 10;
 id |   x0    |   x2    |   x3    |   x4
----+---------+---------+---------+---------
  1 | 27.1045 |  69.411 |  76.857 | 31.9964
  2 | 41.0264 |  74.365 | 15.4363 | 48.7949
  3 | 33.3703 | 62.7309 | 25.8918 | 8.37674
  :      :         :         :         :
  9 | 99.3865 | 73.5715 | 36.4256 | 36.2447
 10 | 70.0432 | 44.1223 | 32.4791 | 42.7989
(10 rows)

では実際に、単純なPL/CUDA関数を作成し、実行してみる事にする。

CREATE OR REPLACE FUNCTION gstore_test(reggstore)
RETURNS float
AS
$$
#plcuda_begin
#plcuda_num_threads     gstore_fdw_height
  kern_data_store *kds = arg1.kds;
  int      i, ncols = kds->ncols;
  __shared__ double temp[1024];
  double   psum = 0.0;
  double   total;

  if (get_global_id() < kds->nitems)
  {
    for (i=0; i < ncols; i++)
    {
      kern_colmeta cmeta = kds->colmeta[i];
      Datum       datum;
      cl_bool     isnull;

      if (cmeta.atttypid == PG_FLOAT4OID)
      {
        datum = KDS_COLUMN_GET_VALUE(kds, i, get_global_id(),
                                     &isnull);
        if (!isnull)
          psum += __int_as_float(datum & 0xffffffff);
      }
    }
  }
  temp[get_local_id()] = psum;
  total = pgstromTotalSum(temp, get_local_size());
  if (get_global_id() == 0)
    retval->isnull = false;
  if (get_local_id() == 0)
    atomicAdd(&retval->value, total);
#plcuda_end
$$ LANGUAGE 'plcuda';

この人は、引数として与えられた gstore_fdw 表の中からreal型のカラムの内容をとにかく全部足して、その総和を返すというだけのPL/CUDA関数。
実行してみると、以下のような結果を返す。
平均値50のデータが10列×1000万件あるので、全部足したら50億ちょいというのは妥当なところだろう。

postgres=# select gstore_test('ft');
   gstore_test
------------------
 5000140834.18597
(1 row)

Time: 548.382 ms

統計解析・機械学習向けデータ管理の基盤として

さて、である。
ここまでの機能とCUDA APIこの辺の機能を一緒に使えば、統計解析・機械学習をやっている方にとって大変便利な機能として利用する事はできないだろうか?という所に考えが至った。

CUDAには以下のようなAPIがあり、あるプロセスが獲得したGPUバイスメモリを、別のプロセスと共有して利用する事ができる。

-- GPUデバイスメモリ上の領域の識別子を取得する
CUresult cuIpcGetMemHandle(CUipcMemHandle* pHandle,
                           CUdeviceptr dptr);

-- 取得した識別子を用いて、他プロセスに当該領域をマップする
CUresult cuIpcOpenMemHandle(CUdeviceptr* pdptr,
                            CUipcMemHandle handle,
                            unsigned int flags);

-- マップした領域を解放する
CUresult cuIpcCloseMemHandle(CUdeviceptr dptr);

この CUipcMemHandle というのは64byteのキーで、簡単にエクスポートする事ができる。

postgres=# select gstore_export_ipchandle('ft');
                        gstore_export_ipchandle
------------------------------------------------------------------------
 \xd00e220100000000963000000000000040df391a000000000000401a000000000000.
.000000000000000200000000000042000000000000002500d0c1ac00005c
(1 row)

gstore_fdwにロードしたデータ、つまり、既にGPUメモリに載った状態のデータをPythonやRなどスクリプトから直接使用する事ができれば、統計解析・機械学習に用いるデータの選択や前処理・後処理にSQLを使用する事ができる一方で、中核ロジックに関しては、データサイエンティストが使い慣れたPythonやRスクリプトを利用することできる。ゼロ・コピーなので、データの移動に関わるコストはほとんど省略する事ができる。

さらにSQLである事から、厳密な型チェックや整合性の検査を含める事ができ、これらは前処理における頭痛の種を相当程度に軽減してくれる事だろう。

加えて、PostgreSQLのData Federation機能と連携する事で、更に面白い構想に繋がってくる。
FDWの本来の使い方、つまり、リモートのPostgreSQLOracleデータベースのテーブルを読み書きする機能を用いて、データサイエンティストが作業で利用するワークステーションと、大量のデータを集積する役割のデータベースとを接続する。
(集積するデータの規模によっては、データベースを多段構成にするかもしれないが)

そうすると、一元管理によってデータの散逸を防ぎつつ、データサイエンティストが利用するデータの種類や母集団を選択したり、サマリ作成やデータ正規化といった前処理はワークステーションSQLを使用して実行*2し、機械学習のコア部分は使い慣れたRやPythonを利用して実行する事ができる。

これまで、In-database Analytics対応機能として、ユーザ定義関数(PL/CUDA)を使用して全てをSQLの世界で完結する事を考えていたが、適切なデータ形式に成形した上で、これを外部のスクリプトから共有・参照できるようにすれば、

  • 統計解析・機械学習ワークロードを実行するために、わざわざデータベースからデータをエクスポートさせない。
  • 中核アルゴリズムの前後で、母集団の選択やデータの前処理・後処理のためにSQLを使用した柔軟な記述を可能にする。

といった、従来からのコンセプトだけでなく

  • PythonやRといった、ユーザが使い慣れたツールを用いて In-database Analytics を実現できる。

という、新たな軸を打ち出せるような気がした。
この辺、自分はデータ解析を生業にしている訳ではないので、どの程度の価値がある機能なのかいま一つ想定し難いが、統計解析・機械学習といった領域で仕事をされている方の意見を伺ってみたいところではある。

今のところ、GPU上のデータ形式はPG-Stromの独自形式のみだが、Python向けにはNumPyとバイナリ互換な形式、R向けには行列とバイナリ互換な形式といった対応を拡充する事は検討している。

*1:現時点で唯一実装しているデータ形式

*2:PG-Stromが入っている前提なので、場合によってはSQL自体も高速化されることも

カスタムロジックをWHERE句で使う

しばらくSSD-to-GPUダイレクトSQL実行の開発にどっぷり時間を突っ込んでいたので、久々にPL/CUDAネタ。

この辺のネタや、
kaigai.hatenablog.com

この辺のネタで
kaigai.hatenablog.com

紹介したように、PG-Stromはユーザ定義関数をCUDA Cで記述するための機能(PL/CUDA)を持っており、これを使えば、データベースから読み出したデータをGPUに流し込み、GPU上でカスタムのロジックを実行した後、結果をまたSQLの世界へ戻すという事ができる。

この仕組みはPostgreSQL手続き言語ハンドラの機能を用いて実装されており、ユーザ定義のPL/CUDA関数が呼び出される毎に、手続き言語ハンドラが以下の処理を行う。

  1. ユーザ記述のCUDA Cコードブロックをテンプレートに埋め込んで、ビルド可能なソースコードを作成。
  2. NVRTC(NVIDIA Run-Time Compiler)を用いてソースコードをビルド。既にビルド済みのバイナリがキャッシュされていればそれを使用する。
  3. 関数の引数をGPUに転送する。(場合によっては数百MBの配列データである事も)
  4. ユーザの指定したスレッド数、ブロックサイズ、共有メモリ割当てなどのパラメータに従ってGPU Kernelを起動する。
  5. 予め設定しておいた結果バッファの内容をホスト側に書き戻す。
  6. 手続き言語ハンドラの戻り値として、GPUから戻された結果の値を返す。

これは、一個のGPU Kernelが巨大なマトリックスを引数に取り、数千~数万スレッドが協調動作をしながら処理を進めるようなタイプのワークロードに望ましい。典型的な利用シーンにおいては、SQL構文の中で数回呼び出され、一回あたりの応答時間が少なくとも数秒はあるといったタイプの呼び出し方である。

実際、以前に創薬の領域で類似化合物検索のロジックをPL/CUDA関数で作成した時も、PL/CUDA関数であるところのknn_gpu_similarity()が呼び出されるのは、以下のクエリ実行中に一回だけであった*1

PREPARE knn_sim_rand_10m_gpu_v2(int)	-- arg1:@k-value
AS
SELECT float4_as_int4(key_id) key_id, similarity
  FROM matrix_unnest((SELECT rbind(knn_gpu_similarity($1,Q.matrix,D.matrix))
                        FROM (SELECT cbind(array_matrix(id),
                                           array_matrix(bitmap)) matrix
                                FROM finger_print_query
                               LIMIT 99999) Q,
                             (SELECT matrix
                              FROM finger_print_10m_matrix) D))
    AS sim(key_id real, similarity real)
ORDER BY similarity DESC
LIMIT 1000;

ただ、こういった前提を置いてしまうと、例えば数億件のレコードをスキャンし、GPUで条件句を処理するといったシチュエーションでカスタムのロジックを使うのが難しくなってしまう。

そこで、PG-Stromのコードジェネレータに手を加え、いくつかの制約を満たす場合に限り、カスタムのPL/CUDA関数をWHERE句などで利用できるようにしてみた。

  • PL/CUDA関数が可変長データ(varlena)を返さない
  • PL/CUDA関数は単一のコードブロックしか持たない(prep/postブロックを持たない)
  • ブロックサイズは1、スレッド数も1
  • GPU shared memoryを要求しない
  • 作業バッファ(workbuf)、結果バッファ(resultbuf)を要求しない

これだと全くマルチスレッドを活用できないように見えるが、PG-Stromの場合、テーブルから64MB毎にデータブロックを切り出し、各レコード毎にGPUスレッドを立ち上げて並列にWHERE句を処理するため、結果として数千~数万スレッドで並列処理する事になる。

簡単なPL/CUDA関数を定義してみる。int型の引数を4つ取り、その合計を返すだけというシロモノ。

create function test_1(int,int,int,int)
returns int
as
$$
#plcuda_begin
  if (!arg1.isnull && !arg2.isnull && !arg3.isnull && !arg4.isnull)
  {
    retval->isnull = false;
    retval->value = arg1.value + arg2.value + arg3.value + arg4.value;
  }
#plcuda_end
$$ language 'plcuda';

で、WHERE句の中でこの関数を使ってみる。

postgres=# explain verbose select * from s0 where test_1(aid,bid,cid,did) < 50000;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Custom Scan (GpuScan) on public.s0  (cost=24031.02..223694.11 rows=2666680 width=40)
   Output: id, cat, aid, bid, cid, did, eid, fid, gid, hid
   GPU Filter: (test_1(s0.aid, s0.bid, s0.cid, s0.did) < 50000)
   Kernel Source: /opt/pgdata/base/pgsql_tmp/pgsql_tmp_strom_2118.2.gpu
(4 rows)

自動生成されたGPUコード*2を眺めてみると、WHERE句のCUDA Cに落とし込んだgpuscan_quals_evalの中で、PL/CUDA関数のロジックを実装した pgfn_plcuda_86452() が呼び出されている。
この関数の実行結果は、そのまま大小比較の演算 pgfn_int4lt に渡され、KPARAM_0 (定数 50000) との比較を行っている。

STATIC_FUNCTION(pg_int4_t)
pgfn_plcuda_86452(kern_context *kcxt,
                  pg_int4_t arg1, pg_int4_t arg2,
                  pg_int4_t arg3, pg_int4_t arg4)
{
  pg_int4_t __retval = { true, 0 };
  pg_int4_t *retval = &__retval;
  /* #plcuda_begin */
  if (!arg1.isnull && !arg2.isnull && !arg3.isnull && !arg4.isnull)
  {
    retval->isnull = false;
    retval->value = arg1.value + arg2.value + arg3.value + arg4.value;
  }

  /* #plcuda_end */
  return __retval;
}

STATIC_FUNCTION(cl_bool)
gpuscan_quals_eval(kern_context *kcxt,
                   kern_data_store *kds,
                   ItemPointerData *t_self,
                   HeapTupleHeaderData *htup)
{
  pg_int4_t KPARAM_0 = pg_int4_param(kcxt,0);
  pg_int4_t KVAR_3;
  pg_int4_t KVAR_4;
  pg_int4_t KVAR_5;
  pg_int4_t KVAR_6;
  char *addr;

  assert(htup != NULL);
  EXTRACT_HEAP_TUPLE_BEGIN(addr, kds, htup);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_3 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_4 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_5 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_NEXT(addr);
  KVAR_6 = pg_int4_datum_ref(kcxt,addr);
  EXTRACT_HEAP_TUPLE_END();

  return EVAL(pgfn_int4lt(kcxt, pgfn_plcuda_86452(kcxt, KVAR_3, KVAR_4,
                                                        KVAR_5, KVAR_6), KPARAM_0));
}

今回の例は非常に単純な計算をPL/CUDAで実装したもので、この程度の条件句の評価であればあまり恩恵はないだろう。
ただ、同じ仕組みを使う事で、機械学習による"推論"をWHERE句の条件評価に混ぜて使う事ができるようにもなるだろう。

そうすると、種々多様なソースから上がってきたデータをPostgreSQLに蓄積し、これをSSD-to-GPUダイレクトSQL実行を使ってサマリ作成・前処理を行った上で、従来型のPL/CUDA関数で実装した機械学習ロジックによりパターンを学習する。さらに、今回の新しいPL/CUDA関数の使い方で、前ステップで生成した学習モデルを引数とする"推論系"の関数をWHERE句に加えてテーブルをスキャンする事ができるようになる。

応用としては、例えば平常パターンで学習させたモデルを元に、異常値くさいレコードを検出するアノマリー検知など。
(NoSQL系DBやらあるとはいえ、)金融系なら間違いなく決済データをRDBMSで保持しているだろう。そうすると、クレジットカードの不正利用や、オレオレ詐欺など「普段と違う」振る舞いの検出をSQLを用いてin-databaseで実行できるようになる、という事を意味する。

*1:その一回の呼び出しが処理時間の大半を占めるというワークロードであるワケだが

*2:見やすくするため、一部改行追加

スキャン速度10GB/sへの挑戦~その②~

一年半ほど前、次のようなエントリーを書いた。

kaigai.hatenablog.com

かいつまんで言うと、多段JOINのように、実際に実行してみないと結果行数が明らかではなく、かつ、ステップ(k+1)の最適な問題サイズがステップkの結果に依存する場合、Kepler以降のGPUでサポートされた Dynamic Parallelism を使えば素直に実装できるという話である。
実際、この時期以降のGpuJoinロジックはDynamic Parallelismを用いて実装されていた。

だが、プロファイラ等を用いて詳しく調べてみると、どうやら、ある程度以上のパフォーマンスでクエリを処理している状況においては、このような設計はGPUが持つ本来の能力を引き出す上で必ずしも適切ではないという事が明らかになった。

例えば、以下のグラフは半年ほど前にStar Schema Benchmark(SSBM)を用いてクエリ処理のスループットを計測したものであるが、理論帯域2.2GB/sのSSD*1を2枚用いているにも関わらず、3.8GB/s辺りで処理性能が頭打ちになってしまっている。13種類のクエリを実行しているが、ひどいものだと1.5GB/s近辺にまでパフォーマンスが落ち込んでいる。

この時に使用していたGPUはTesla K80という少し古いもの*2なので、その辺は少し割り引いて考える必要がある。ただ、この程度の領域でサチっていては10GB/sのクエリ処理スループットなど"夢のまた夢"になってしまうので、数ヵ月前、この辺をプロファイラを用いて詳しく調べてみた。

一見すると、非常に細かな単位でびっしりとGPU Kernelが実行されており、GPU使用率が100%に張り付いているのも納得できるように思う。
Dynamic Parallelismを使ってGpuJoinを書き直したあたりもそういう認識で、あまり深掘りしてはいなかった。

しかし、実は妙なのである。
GPU時間を100%使用している事になっているgpupreagg_mainという関数は、基本的には、Dynamic Parallelismを用いて別のGPU kernelを起動するだけの"軽い"関数で、それぞれ4.7%、3.9%のGPU時間を使っているサブカーネルの方が実際のGROUP BY/集約演算の処理を行っている。

プロファイラで採取したタイムラインを拡大してみると、謎が解ける。
gpupreagg_mainがサブカーネルの実行完了を待っている間、この人がGPUで同時実行可能なGPU kernelをガメてしまい、その分、JOINやGROUP BYといったSQLワークロードを実際に処理するためのカーネルがまるで多重処理されていないのである。

なぜこれほどサブカーネルの同期処理に時間がかかってしまうのか。
Dynamic Parallelismを利用して起動したサブカーネルの完了をGPU内で待ち合わせる唯一の方法が、CUDAランタイム関数 cudaDeviceSynchronize() なのだが、この人は『Blocks until the device has completed all preceding requested tasks.』という動きをする。
つまり、無関係な並行タスクも含め、現在進行中の全てのタスクがいったん完了するのを待つ、という事になる。
ガツンと巨大なタスクをぶん回すHPC的なワークロードではあまり困る事もないだろうが、せいぜい数msで完了する細切れのGPU kernelを非同期かつ大量に発行するPG-Stromのワークロードでは、明らかに同期ポイントまでの遅延時間が長くなり、結果として、無駄にGPUを占有する時間も長くなってしまうように思える。

という事があり、ここ2ヵ月ほど大規模なPG-Stromのリファクタリングに取り組んでいた。

Dynamic Parallelismベースで実装した過去のGpuJoinやGpuPreAggは一旦お破算とし、GPUカーネル内で同期ポイントを持つ事なくJOINやGROUP BYのワークロードを実行するような実装へと作り替えた。
(先日のエントリ『GpuJoinの結果バッファ問題を考える。 - KaiGaiの俺メモ』などはその要素技術。)

で、再度改めてSSBMでベンチマークを行ってみた結果が以下の通り。じゃん。

システム構成は以下の通り。同じく理論帯域2.2GB/sのSSDを3枚搭載しているため、合計で6.6GB/sまでは読出しの帯域を持っている事になる。

  • Server: Supermicro 5018GR-T
  • CPU: Xeon E5-2650v4 (2.2GB, 12C) x1
  • RAM: 128GB (16GB DDR4-2133, ECC; x8)
  • GPU: NVIDIA Tesla P40 (3840C, 24GB) x1
  • SSD: Intel SSD 750 400GB x3
  • OS: CentOS 7.3 (kernel: 3.10.0-693.2.2)
  • SW: CUDA 8.0 (driver: 384.66)
  • DB: PostgreSQL 9.6.5, PG-Strom 2.0devel

それぞれ合計で3回ずつクエリを実行し、その中央値を結果として採用した。
スループットは以下のように導出している『(lineorderテーブルサイズ[MB])/(クエリ実行時間[sec])』

SSD-to-GPUダイレクトSQL実行を使った場合、物理限界6.6GB/sに対して、おおむね6.0GB/s前後のクエリ処理スループットを発揮できている。また、半年前の計測ではサチっていたQ4_1やQ4_2といったクエリでも他と変わらないパフォーマンスを出せている。
一方で、ノーマルのPostgreSQLの場合は2.0GB/s前後の処理スループットに落ち着いている。

今回は、手持ち機材の関係で物理限界 6.6GB/s までの計測となったが、このCPU自体は9.5GB/s~9.7GB/s近辺まではP2P DMAでデータ転送できることが分かっているので、新しいSSDを入手する事ができれば、処理スループット上限の更新に取り組んでみたい。
このベンチマーク実行中のGPU使用率は概ね35%~40%程度。まだ余裕はある。

*1:Intel SSD 750 [400GB]

*2:特に、Kepler世代のGPUはGROUP BYや集約演算で多用するAtomic演算が遅い

GpuJoinの結果バッファ問題を考える。

GPUSQLのJOIN処理を実装する場合、一つ悩ましい問題は、JOINの結果生成されるレコード数は実際に実行してみなければ正確には分からないという点である。
JOINを処理した結果、行数が減る事もあれば増える事もある。減るパターンはまだ良いとして、時として結果が膨れ上がってしまう事も想定しなければならない*1

PG-Stromの持っているGpuJoinの実装をざっくり説明すると、GPUの持つ数百~数千というコア数を最大限に活かすべく、複数行を一度にピックアップして被結合側(INNER)テーブルとJOIN処理を行い、その結果生成された行を結果バッファに書き込むという構造になっている。

前提条件は、INNERテーブルのサイズが十分に小さくGPU RAM上に載る事と、OUTER側のテーブルサイズは相対的に大きい事。これは典型的なStar Schema構造を意識した設計である。
基本となるJOINアルゴリズムはHash-Joinで、(場合によっては複数の)INNERテーブルはハッシュ表にまとめられ、事前にGPU RAMへロードされる。次に、サイズの大きなOUTERテーブルを(64MB程度の)チャンクに区切ってGPU RAMへロードし、図の例ではt1、t2とのJOINを行った上で、その結果を結果バッファに書き込む。


この時、JOINによってレコード数が膨れ上がってしまうと、結果バッファのサイズが足りなくなってしまう。
例えば、GPU RAMにロードした64MBのチャンクのうち、先頭から40MB辺りまでのレコードを処理した時点で結果バッファが満杯になってしまったら、これまでは対処のしようがなかった。
一旦、GPU KernelからCPU側へエラーを返し、新しくより大きな結果バッファを割り当てて再実行するというのがバッファ不足時のリカバリで、これは間違いなく遅い。
そのため、統計情報などを元にできる限り再実行を起こさないバッファサイズの推定を行ってはいたが、必ず外れ値は存在するし、マージンを大きくすればGPU RAMを無駄に消費してしまう*2


本件とは別に、実は9月の頭からGPU使用率を高くする上で問題となっていたGPU kernel内の同期ポイント(cudaDeviceSynchronize)を削除するためのリファクタリングを行っており、その過程でGpuJoinの内部構造を状態マシンのような形に変えていた。
これはこれで、Dynamic Parallelismを使って起動したSub-kernelの実行待ちで無駄にGPUを占有する事が無くなってめでたしめでたしなのだが、もう一つ、状態マシンという事は、内部状態を保存し、後でリストアすれば同じ場所から再開できるという事に気が付いた。
全く別の目的で行っていたリファクタリングが、期せずしてGpuJoinのSuspend/Resume機能を実装するために役立ってしまったという事である。


したがって、結果バッファが満杯になり、これ以上書き込めないという状態になったら、GpuJoinのGPU Kernelを一度サスペンドしてやり、CPU側で新しいバッファを獲得した上で、実行途中のGpuJoinを再開してやればよいという話に変わったワケである。

で、GpuJoinの結果バッファというのは、実はGROUP BYや集約関数をGPUで実行するGpuPreAggの入力バッファにもなったりする。

何個かのテーブルをJOINし、その結果をGROUP BYによってごく少数の集約行にまとめる事で、多くの場合、データサイズは劇的に減少する。少なくともGPU RAMに対するプレッシャーは相当に軽減されることになる。

従来の設計では、あるOUTERテーブルのチャンクに対するGpuJoinの結果は一枚のバッファに書き込んだ後でないとGpuPreAggを起動する事ができなかった。
GpuJoinカーネルのSuspend/Resumeができるようになった事で、GpuJoinの結果バッファであり同時にGpuPreAggの入力バッファが満杯になったら、GpuJoinカーネルを一時停止させ、GROUP BY句による集計演算を行ってバッファを空にし、続きからGpuJoinとGpuPreAggを再開するという事が可能になる。

前回のエントリで、GpuScan+GpuJoin+GpuPreAggの3つのロジックを一発で処理する事でCPU~GPU間のデータ転送を劇的に減らすCombined Kernelのアイデアを紹介した。ここで地味に問題となってくるのが、GpuJoinが想定よりも多くのレコードを生成してバッファを使い尽くす事で、処理パイプラインのハザードが発生する事である。
だが、今回のGpuJoinリファクタリングによって、結果バッファの事前推定や使い過ぎに関連する問題は解決したことになる。

他にも非同期で動いているGPU Kernelがいる中でのバッファ再割り当てや再実行は、パフォーマンス上の問題があるだけでなく、なかなか見つけにくい/再現しにくいバグの温床であった訳で、この辺の原因を断つことができたのは、ソフトウェアの品質という観点からも非常に大きな進展である。

*1:もちろん、統計情報からある程度の傾向は掴めるが絶対ではない。また、データの分布が極端なケースでは統計情報はあまり役に立たない。

*2:もっとも、Pascal以降の世代でManaged Memoryを使った場合、物理ページはデマンドアロケーションなので、実際にページを参照しない限りGPU RAMは消費しない。

GpuJoin + GpuPreAgg combined kernel

以下のクエリは、t0とt1の2つのテーブルをJOINし、その結果をGROUP BYして出力するものである。
しかし、EXPLAIN ANALYZEの出力には奇妙な点がある。

postgres=# explain analyze
           select cat,count(*),avg(ax) from t0 natural join t1 group by cat;
                                        QUERY PLAN
--------------------------------------------------------------------------------
 GroupAggregate  (cost=955519.94..955545.74 rows=26 width=20)
                 (actual time=5964.955..5964.972 rows=26 loops=1)
   Group Key: t0.cat
   ->  Sort  (cost=955519.94..955523.12 rows=1274 width=44)
             (actual time=5964.943..5964.947 rows=26 loops=1)
         Sort Key: t0.cat
         Sort Method: quicksort  Memory: 28kB
         ->  Gather  (cost=955323.19..955454.23 rows=1274 width=44)
                     (actual time=5964.756..5964.914 rows=26 loops=1)
               Workers Planned: 7
               Workers Launched: 7
               ->  Parallel Custom Scan (GpuPreAgg)  (cost=954323.19..954326.83 rows=182 width=44)
                                                     (actual time=5596.730..5596.735 rows=3 loops=8)
                     Reduction: Local
                     GPU Projection: cat, pgstrom.nrows(), pgstrom.nrows((ax IS NOT NULL)), pgstrom.psum(ax)
                     Unified GpuJoin: enabled
                     ->  Parallel Custom Scan (GpuJoin) on t0  (cost=45544.82..840948.19 rows=100000000 width=12)
                                                               (never executed)
                           GPU Projection: t0.cat, t1.ax
                           Outer Scan: t0  (cost=0.00..976191.14 rows=14285714 width=8)
                                           (actual time=50.762..891.266 rows=100000000 loops=1)
                           Depth 1: GpuHashJoin  (plan nrows: 14285714...100000000, actual nrows: 100000000...0)
                                    HashKeys: t0.aid
                                    JoinQuals: (t0.aid = t1.aid)
                                    KDS-Hash (size plan: 10.78MB, exec: 16.00MB)
                           ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
                                               (actual time=0.010..26.491 rows=100000 loops=1)
 Planning time: 0.501 ms
 Execution time: 6008.393 ms
(22 rows)

GpuScanとGpuPreAggの間に挟まれたGpuJoinが(never executed)となっているのである。
これは、PostgreSQLのエグゼキュータから見た時に、GpuJoinが一度も呼び出されていないことを意味する。
ただ、その割には、OUTER側の t0 テーブルのスキャン、INNER側の t1 テーブルのスキャン(Seq Scan)はしっかり実行されている。

これは、GpuPreAggの直下にGpuJoinが存在している場合の特別な最適化で、GpuPreAggのGPUカーネルがJOINとGROUP BYの両方のタスクを一気に実行しており、GpuJoinを実行する必要がなかった事を意味している。
(但し、GpuPreAggへの入力として、t0およびt1テーブルを読み出す必要はある)


複数のテーブルを(場合によってはWHERE句付きで)スキャンし、それを何らかのキーで結合して、最終的にGROUP BYを使って集約するというのは非常によくある処理である。
PostgreSQL*1これを内部的にいくつかのステップに分解し、順を追って処理していく。

例えば、以下のような非常に単純なクエリの場合

SELECT cat, count(*), avg(x)
  FROM t0 JOIN t1 ON t0.id = t1.id
 WHERE y like ‘%abc%’
 GROUP BY cat;

先ず最初にSCANのロジックが動作する。幸い、WHERE句の条件は複数のテーブルに跨るものではないので、条件に合致しないレコードはこの時点で捨てられる。これによって、JOINすべき行数を減らすことができる。
次に、SCANの出力はJOINの入力となり、t0.id = t1.id という条件によって t0 テーブルと t1 テーブルを結合する。JOINの結果生成されたレコードは次のAggregation/GROUP BYの入力となり、ここでcat列の値ごとに集計され、行数とx値の平均が出力される。

WHERE句の評価、JOIN、GROUP BY/AggregationはそれぞれPG-StromがGPUで実行可能なワークロードであるが、PostgreSQLの実行計画におけるScan, Join, Aggを単純に置き換えるとどうなるか。

実はCPUとGPUの間でデータ転送のピンポンが発生してしまう。
CPUはまずストレージからデータを読み出し、これをDMAバッファにロードする*2
GpuScan kernel関数はWHERE句の評価を並列実行し、結果をDMAバッファに書き戻す。これは次いでGpuJoin kernel関数の入力になり、t0, t1テーブルの結合処理を並列実行し、同じく結果をDMAバッファに書き戻す。さらにこれは次のGpuPreAgg kernel関数の入力になり、GROUP BYと集計演算の結果をDMAバッファに書き戻す。最後に、CPUが最終ステージの集約を実行して、結果をユーザへ返却する。

いくら非同期処理とはいえ、これだけ何度もCPU⇔GPU間でデータ転送を行うと色々と辛い。

なので、PG-Stromには以前から OUTER SCAN Pull-up という仕組みを実装していた。

これは、比較的単純なWHERE句の評価を GpuJoin 側に取り込んで実行するもので、WHERE句の評価をくぐり抜けたレコードのみをJOIN処理の対象とする事で実現している。

スキャン処理(WHERE句評価)の仕組みは単純なので、例えば、GROUP BYの直下にスキャンがあるようなケース。
例えば以下のクエリのような場合、

SELECT cat, count(*), sum(x) FROM big_table;

同様にGpuPreAggもWHERE句の評価を取り込んで実行するという芸当が可能であった。


今回、新たに実装し、冒頭のEXPLAIN ANALYZE文で実行してみたのは、元々の(GpuScan + GpuJoin)をGpuPreAgg側に取り込み、SCAN→JOINを実行し、さらに集約演算を実行した結果を書き戻すという機能。

一般に、GROUP BYや集約演算の実行によって大規模なデータから非常に小さな集計結果を出力する事が期待されているため、元々のSCAN+JOINやSCAN+PreAggといったシンプルなワークロードの結合に比べると、転送すべきデータ量を削減する効果は非常に大きい事が期待できる。

実際、CUDAのプロファイラでタイムラインを採取してみると

MemCpy(HtoD)(CPU側からGPU側へのデータ転送)と、それを処理するGPU kernel関数の実行は非常に多く発生しているにも関わらず、MemCpy(DtoH)(GPU側からCPU側へのデータ転送)は、最後に一回だけ、しかも僅か 1.2kB が書き戻されているだけである。

このようにシンプルな集計処理であれば、SCAN→JOIN→GROUP BYを1回のGPU kernel呼び出しで済ませてしまう事で、PCIeバス上のデータ転送量を大幅に減らし、必要最小限のデータ転送で済ませる事ができるようになる。

なお、Visual Profilerで見た感じ、各タスクのスケジューリングにまだ相当改善できる余地が残っているので、この辺も追って改良を加えていきたい。

*1:別にPostgreSQLに限った事ではないと思うが

*2:SSD-to-GPUダイレクトSQL実行の事はここでは忘れてほしい