並列Aggregateに向けて

PostgreSQL Advent Calendar 2014に参加しています。


数日前、SimonがPgSQL-Hackersに面白いパッチを投げてきた。

曰く、

KaiGai, David Rowley and myself have all made mention of various ways we could optimize aggregates.
Following WIP patch adds an extra function called a "combining function", that is intended to allow the user to specify a semantically correct way of breaking down an aggregate into multiple steps.

Gents, is this what you were thinking? If not...

もしかすると、PostgreSQL v9.5に入るかもしれないこの機能の背景を少し解説してみたい。

PostgreSQLにおいて集約関数(Aggregate Function)は、二種類の関数呼び出しによって実装されている。

  1. Transiton Function
    • 一レコード分のデータを読み込み、それに基づいて内部状態を更新する。
  2. Final Function
    • その時点の内部状態から、結果となるスカラー値を生成する

f:id:kaigai:20141219214544p:plain

具体的に、ビルトインのAVG(float)のケースを見てみる事にする。

集約関数 AVG(float) は以下のようにfloat8[3]型の内部状態を持ち、transition functionにはfloat8_accumが、final functionにはfloat8_avgが指定されている。

  • float8 transvalues[3]
    • transvalues[0] ... N (入力データの個数)
    • transvalues[1] ... 入力データの和
    • transvalues[2] ... 入力データの自乗の和(標準偏差・分散の算出で使用)

つまり、float8_accumが呼び出される度にtransvalues[0]をインクリメント、transvalues[1]に入力値を加算して内部状態を更新し、最後にfloat8_avgが呼び出された時に transvalues[1] / transvalues[0] を計算して返せば、めでたしめでたし float 型平均値を得る事ができる。

話を元に戻そう。上のパッチで、Simonは新たにcombined functionと呼ばれる関数を集約関数の定義に加えようとしている。
これは何をするものか?

KaiGai Koheiがcombined functionの役割を説明して曰く、

Its concept is good to me. I think, the new combined function should be responsible to take a state data type as argument and update state object of the aggregate function. In other words, combined function performs like transition function but can update state object according to the summary of multiple rows. Right?

つまり、集約関数の内部状態を更新するのに、一行一行を読むのではなく、別の集約関数の内部状態を引数として受け取り、それに基づいて自らの集約関数の内部状態を更新するという役割を果たす事が期待されている。

そうすると、何が起こるか?

上の平均値を求めるロジックを考えてもらいたい。AVG(float)関数の内部状態とは、いわば『N=136、合計値=12345』といった情報だ。しかし、必ずしもこれを一行毎に内部状態を更新せねばならない理由は、ない。
それぞれ別個に計算された『N=136、合計値=12345』という内部状態と、『N=123、合計値=23456』という内部状態を足し合わせ、『N=259、合計値=35801』という内部状態を作っても一向に問題はないはずだ。このように、複数行の結果をサマリした"部分集約"とでも呼ぶものを後で合成する事を可能にするのが combined function である。

では、これで何が嬉しいのか?

以下のようなクエリを考えてみよう

SELECT AVG(s.sales_price), p.prod_id
  FROM production p JOIN sales s ON p.prod_id = s.prod_id
  GROUP BY p.prod_id;

2つのテーブルをJOINし、その結果に対して集約関数AVG()を適用する。
通常、この処理はJOIN処理を行った後、JOIN結果に対して一行ずつ集約関数(のtransition function)を適用する事で行われる。

が、このクエリを以下のように書き換えたとしても同一の結果を得られる。
ここで、主クエリのAVG()はcombined functionにより中間結果を足し合わせる挙動を取る、サブクエリのAVG()はfinal functionを呼び出さず中間結果を返すものとする。

SELECT AVG(s.sales_price), p.prod_id
  FROM
    production p
  JOIN
    (SELECT AVG(sales_price), prod_id FROM sales GROUP BY prod_id) s
  ON p.prod_id = s.prod_id
  GROUP BY p.prod_id;

例えば、productionテーブルに百万行、salesテーブルに十億行のレコードが含まれていた場合、INNER JOIN処理は十億行を結合し、次いでGROUP BY prod_idはグループ毎に平均で1000行を集約する事になる。
一方、先にサブクエリで部分集約を作成し、これを後でJOINする場合、結合しなければならない行数は百万行にすぎない。

この方式には他の利点もある。テーブルをJOINした後に集約関数を適用するのに比べ、フラットなテーブルに集約関数を適用するというのは、領域分割による並列処理を実装しやすいというメリットがある。
そうすると、テーブルを並列スキャン+部分集約し、それを遥かに少ない行数のレコードをJOINするというシナリオが現実味を帯びてくる。

今回のPgSQL-Hackersでの議論は、こういった機能拡張の下地になる事を見込んだインフラ機能というワケである。

AWSでPG-Strom

PG-Stromを動かそうという時に、GPU自体は安価で手に入れやすい部品なのだけども、普段使いのLinuxサーバにそうそう都合よくGPUが挿さっている人はマジョリティではないかもしれない。

という事で、簡単にPG-Stromをお試しするために、AWSでのAMIイメージを作ってみました。

AMI-Idは ami-51231b50 で、GPUインスタンス(g2.x2large)を使えば簡単にトライアルできます。

以下に手順をご紹介しますが、デプロイ完了まで10分以下。こりゃ便利だわ・・・。

① Launch Instance を選択する
f:id:kaigai:20141112213815p:plain

② キーワード「strom」で検索するか、上記のami-51231b50でAMIイメージを検索する。
AMIイメージはちょくちょく更新されるので、キーワード検索を使った方が間違いがないかも。
f:id:kaigai:20141112214605p:plain

③ 続いてインスタンスタイプを選択。もちろんGPUインスタンス(g2.x2large)の一択です。
f:id:kaigai:20141112214700p:plain

④ 確認画面。本当はEBSストレージなどアタッチするのだろうけども、とりあえず一発起動するだけなので、そのまま「Launch」を選択。
f:id:kaigai:20141112214822p:plain

仮想マシンに接続するためのSSH鍵を選択します。
f:id:kaigai:20141112214851p:plain

⑥ これでデプロイ完了。あとは1~2分ほど起動を待つ。
f:id:kaigai:20141112214934p:plain

⑦ 初期化中。。。。
f:id:kaigai:20141112215039p:plain

⑧ さっきの秘密鍵を使ってSSHログイン
f:id:kaigai:20141112215325p:plain

⑨ PG-Strom有効なPostgreSQLインスタンスが起動している
f:id:kaigai:20141112215351p:plain

AWSのg2.x2largeタイプなので、GPUNVIDIAのGRID K520。
こんな感じでデバイスのプロパティを参照することができる。

postgres=# SELECT * FROM pgstrom_opencl_device_info();
 dnum | pnum |            property             |         value
------+------+---------------------------------+----------------------------
    0 |    0 | platform index                  | 1
    0 |    1 | platform profile                | FULL_PROFILE
    0 |    2 | platform version                | OpenCL 1.1 CUDA 6.5.18
    0 |    3 | platform name                   | NVIDIA CUDA
    0 |    4 | platform vendor                 | NVIDIA Corporation
    : |    : |    :                            |   :

テスト用のSQL~/pg_strom/testdb.sqlに置いてあるので、これを使って2000万行のテーブルx1個、4万行のテーブルx4個を作ると、昨日のエントリで紹介したテストテーブルを作成できる。

[ec2-user@ip-10-126-51-20 ~]$ psql -U postgres -f testdb.sql postgres

昨日のエントリで使ったGPUMaxwell世代のGTX980で、Kepler世代のGRID K520とは少し特性は異なるものの、まぁ、早くなってるから良いだろう。

PG-Stromありの場合

postgres=# EXPLAIN (ANALYZE, COSTS OFF) SELECT * FROM t0 NATURAL JOIN t1 NATURAL JOIN t2;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Custom (GpuHashJoin) (actual time=97.992..5512.238 rows=20000000 loops=1)
   hash clause 1: (t0.bid = t2.bid)
   hash clause 2: (t0.aid = t1.aid)
   Bulkload: On
   ->  Custom (GpuScan) on t0 (actual time=9.260..1220.530 rows=20000000 loops=1)
   ->  Custom (MultiHash) (actual time=43.361..43.362 rows=80000 loops=1)
         hash keys: bid
         Buckets: 46000  Batches: 1  Memory Usage: 99.99%
         ->  Seq Scan on t2 (actual time=0.009..9.551 rows=40000 loops=1)
         ->  Custom (MultiHash) (actual time=21.681..21.681 rows=40000 loops=1)
               hash keys: aid
               Buckets: 46000  Batches: 1  Memory Usage: 49.99%
               ->  Seq Scan on t1 (actual time=0.011..9.632 rows=40000 loops=1)
 Execution time: 9144.220 ms
(14 rows)

PG-Stromなしの場合

postgres=# SET pg_strom.enabled = off;
SET
postgres=# EXPLAIN (ANALYZE, COSTS OFF) SELECT * FROM t0 NATURAL JOIN t1 NATURAL JOIN t2;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Hash Join (actual time=46.932..29132.386 rows=20000000 loops=1)
   Hash Cond: (t0.aid = t1.aid)
   ->  Hash Join (actual time=23.504..17693.551 rows=20000000 loops=1)
         Hash Cond: (t0.bid = t2.bid)
         ->  Seq Scan on t0 (actual time=0.004..4851.203 rows=20000000 loops=1)
         ->  Hash (actual time=23.273..23.273 rows=40000 loops=1)
               Buckets: 65536  Batches: 1  Memory Usage: 2813kB
               ->  Seq Scan on t2 (actual time=0.006..10.589 rows=40000 loops=1)
   ->  Hash (actual time=23.256..23.256 rows=40000 loops=1)
         Buckets: 65536  Batches: 1  Memory Usage: 2813kB
         ->  Seq Scan on t1 (actual time=0.007..10.555 rows=40000 loops=1)
 Execution time: 32513.584 ms
(12 rows)

PG-Stromなう

最近、方々で『GPUイイよ!GPU!』と言って回っている訳ですが、今現在、PG-Stromの開発がどんなもんじゃいというのをまとめておこうと思います。

振り返ってみると、2012年1月、最初にPG-Stromのプロトタイプを作ってみた時は、まさにPG-Strom管理下の外部テーブル(Foreign Scan)に対して非常に複雑な条件句を与えた場合、という非常に限られた条件下でGPUオフロードが効果を発揮するものでした。しかも、プログラミングに使ったのはCUDAなので、NVIDIA専用だったし。
f:id:kaigai:20141111222738p:plain

その後、紆余曲折を経て

  • FDW(Foreign Data Wrapper)を使うのはやめ、新たに Custom-Plan APIを設計した。
    • 外部テーブルでない、普通のテーブルに対してもGPUアクセラレーションを適用可能に
    • 全件探索以外のワークロードへも対応が可能に
  • CUDAを使うのはやめ、OpenCLライブラリを使用するようにした。
    • NVIDIA以外のGPUへの対応や、(効率はベストでないものの)CPU並列も可能に
  • 列指向データ構造を捨て、PostgreSQLに合った行指向データを扱えるように
  • 全件スキャンだけでなく、表結合(Join)と集約演算(Aggregate)に対応した

アーキテクチャ的にはこんな感じ。
f:id:kaigai:20141111222746p:plain
Custom-Plan APIを介して、全件スキャン(GpuScan)、表結合(GpuHashJoin)、集約演算(GpuPreAgg)を実装するCustom-Nodeがプランナ・エグゼキュータから呼び出される。その裏でOpenCLデバイスを管理するバックグランドワーカが立ち上がり、これらのCustom-Nodeから自動生成されたGPUコードと処理すべきデータの組がメッセージキューを介して送信されるので、OpenCL Serverはこれを適宜ディスパッチしてGPUで処理する事ができる。

ま、以前の実装だとかなり適用可能なワークロードが限られていて、こんな感じの手厳しいコメントを頂くこともあったが、特にJoinをGPUで高速化できるってのは結構インパクトがあると思う。

以下の図はCPUによるHash-Joinのロジックと、PG-StromのGpuHashJoinを比較したもの。
CPUであれば、①Hash表を作り ②Outer側から一行取り出してHash表を探索 ③Projectionを行って左右の表の内容をマージした行を作り、次の処理へ回す。この②と③のプロセスをひたすらループする事になるワケだ。
一方、GpuHashJoinの場合、①Hash表を作る 部分は同じだけども、②と③の部分をGPUのコアに並列実行させる。つまり、数百~数千コア分の並列度が期待できる訳である。
f:id:kaigai:20141111222816p:plain

以下のグラフは、2億件 × 10万件 × 10万件 × .... と、結合すべき表の数を増やしていった時の処理時間を、標準PostgreSQL(9.5devel)と、PG-Strom有効化の場合で比較したもの。
一番得意なワークロードで比較しているので、多少、恣意的な部分はあるにせよ、3個のテーブルを結合する時の処理時間で11倍、9個のテーブルを結合する時の処理時間で28倍もの違いが出ている。
ま、この辺は実際に世の中で使われているワークロードと、GPUにオフロードできる部分をどういう風に近づけるかで、コストパフォーマンスが決まってくるんだろうけども。
f:id:kaigai:20141111222822p:plain

こんな感じでプロファイルを取ってみると、どの処理にどれくらい時間を要しているかが判る。
(この例では 2000万件 × 4万件 × 4万件 を結合してみた)

postgres=# SET pg_strom.perfmon = on;
SET
postgres=# EXPLAIN (ANALYZE, COSTS OFF)
           SELECT * FROM t0 NATURAL JOIN t1 NATURAL JOIN t2;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Custom (GpuHashJoin) (actual time=105.020..4153.333 rows=20000000 loops=1)
   hash clause 1: (t0.aid = t1.aid)
   hash clause 2: (t0.bid = t2.bid)
   Bulkload: On
   number of requests: 145
   total time for inner load: 29.73ms
   total time for outer load: 825.77ms
   total time to materialize: 1359.44ms
   average time in send-mq: 62us
   average time in recv-mq: 1372us
   max time to build kernel: 13us
   DMA send: 5759.42MB/sec, len: 4459.39MB, time: 774.28ms, count: 722
   DMA recv: 5661.55MB/sec, len: 2182.02MB, time: 385.41ms, count: 290
   proj kernel exec: total: 197.28ms, avg: 1360us, count: 145
   main kernel exec: total: 250.03ms, avg: 1724us, count: 145
   ->  Custom (GpuScan) on t0 (actual time=6.226..825.598 rows=20000000 loops=1)
         number of requests: 145
         total time to load: 787.28ms
   ->  Custom (MultiHash) (actual time=29.717..29.718 rows=80000 loops=1)
         hash keys: aid
         Buckets: 46000  Batches: 1  Memory Usage: 99.99%
         ->  Seq Scan on t1 (actual time=0.007..5.204 rows=40000 loops=1)
         ->  Custom (MultiHash) (actual time=16.106..16.106 rows=40000 loops=1)
               hash keys: bid
               Buckets: 46000  Batches: 1  Memory Usage: 49.99%
               ->  Seq Scan on t2 (actual time=0.018..6.000 rows=40000 loops=1)
 Execution time: 5017.388 ms
(27 rows)

GPUでの処理時間は、Joinそれ自身(main kernel exec)と結果の生成(proj kernel exec)で計450msくらい。
つまり計算能力のポテンシャルはこれ位はあるので、あとは、いかに効率的にCPU/GPUでデータを受渡しするかという所がこれからのテーマになってくるのかな。

次のエントリで、PG-Stromのデプロイについて紹介します。

さらばドイツ

2011年2月にドイツへ赴任し、以降2年10ヵ月間、SAP社とのアライアンス業務に携わっていたわけですが、11月末で任地での業務を終了し日本へと戻る事になりました。

振り返れば、初めてフランクフルトの空港に到着した後、一歩外へ出たら氷点下の世界(注:ドイツの2月です)で『こりゃトンデモない場所に来たもんだ』と思ったものですが、まぁ、住めば都で何とかなるもんですな。

オフィスはドイツ南西のWalldorf市(SAP本社がある)。住むには少し規模が小さな街なので、隣のHeidelberg市の旧市街のマンションを借り、1月遅れで嫁さんも日本からやってきました。
f:id:kaigai:20120331145434j:plain

お仕事的には、2011年前後というのはちょうどSAP社がSUSE LinuxベースのインメモリDB製品 "SAP HANA" をぶち上げ始めた頃で、主に技術面でSAPやSUSEとの折衝や、HANA認定取得の実作業というのが駐在中のメインのお仕事でした。

NEC SAP HANA向けアプライアンスサーバを販売開始
http://www.nec.co.jp/press/ja/1203/0901.html

その他にも、NECのHA製品CLUSTERPROで、SAP NW~HA製品間で連携するためのインターフェース認証を取得したりとか。

NEC、「SAP HA Interface Certification」認定を取得した「CLUSTERPRO X 3.1 for SAP NetWeaver」を発売
http://jpn.nec.com/press/201210/20121002_01.html

この手の "○○認証" という仕事が多かったのですが、SAPの認定プログラムはちょくちょくおかしくて、例えば SAP Linux 認証プログラムは80コア/160HTのNECサーバの認定を通すために、SAPの認定プログラムの方を手直しして認定を通したりとか。

一方、OSS活動は工数の20%~30%を充当して良いという事を条件に赴任したので、まだ中途であるSE-PostgreSQLの開発も継続する事ができた。これは会社に感謝。ただ、OSS活動に専従という訳ではないので、MLでの反応が遅れたりと、コミュニティに対して少し申し訳ない。
ドイツに居る間に新たに開発してみたのが PG-Strom というモジュールで、クリスマス休暇の間にCUDAを使ってプロトタイプの実装を行ってみた。これがもう一つ、自分にとって軸となるソフトウェアになるかもしれない。
http://kaigai.hatenablog.com/entry/20120106/1325852100

f:id:kaigai:20131201153401j:plain
これをプラハでのPGconf.EUで発表した際に、非常に大きな反響があった事は"コレは行けるんじゃないか?"という確信に繋がった。これはNECに限った事なのか分からないけども、自分の組織の外に対して何かを発表し、フィードバックを得るという機会が極端に少ない、あるいはナイーブになっているような気がする。
そうすると、自分の提案したい事が第三者の視点から見て価値ある事なのか、そうでないのか確信できないので、上司にちょっとダメ出しされただけで萎縮してしまうという事に繋がらないか。
先日、晴れて四捨五入したらオッサン年代に突入したので偉そうな事を書くと、若い人は公式非公式を問わず、自分のアイデアを社外で発表する機会を持った方がよい。そうすると、上司のダメ出しが的を得ているのか、的外れなのか、自分の判断基準を持てるようになるから。

この辺SAPが偉いなと思うのが、SCN(SAP Community Network)という仕組みを用意し、Wikiやブログを通して開発者やパートナー企業とエンドユーザが直接意見交換できる仕組みが機能している事。例えば製品の変な使い方やプロトタイプ的に作ってみた機能(当然公式にはサポート外)をオープンにして、それに対するフィードバックを受けられるようにしている。
もちろん、これはSAPのように非常に強力なパートナー網、顧客網というエコシステムを持っているからこそできる事。同じ事ができる企業は非常に限られている。
ただ、エコシステムの主催者でなくともコントリビュータとして社外の意見フィードバックを活用するというのは可能。まさにOSSコミュニティが日常的に行っている事で、だからこそイノベーションがそこにある。

仕事環境で言うとドイツ人が偉いのは、きっかり時間通りに仕事を切り上げる事。
2011~2013シーズンはBaden-Hillsカーリングクラブでプレーしていたのだけども、毎週火曜日19:30~のリーグ戦に皆きっちり集まるのがすごい。日本のカルチャーでは難しい所もあるけど、ぜひ見習いたい。目前の業務にばかり時間を費やして、別の領域にアンテナを張る時間や余暇の時間がなければ新しい発想なんて生まれないし、それが中長期的には組織を徐々に蝕んでいく事になるんじゃないかと思った。
f:id:kaigai:20111011183217j:plain

その他、生活面では嫁さんが考えて日本食を作ってくれたので、病気をする事もなくきっちり職務を遂行する事ができた。感謝。結婚してからの3年間、風邪をひく事すら無くなった。
ヨーロッパに住んでいる事で、なかなか日本からは行きにくい場所を旅行できた事は役得だけど、書き出すとキリがないのでないのでこれはまた別の機会に。

f:id:kaigai:20120331154344j:plain

日本に帰った後は、SAPから離れて再びOSSの仕事に戻ります。
自分で手を動かしてコードを書くと同時に、それをお金に換えていく方法をデザインする事が求められているので、タフな仕事だけども、一つのロールモデルとして後進に示せれば良いかなと。

それでは明日、ドイツを発ちます。

Custom Executor 試作版

現状、PostgreSQLのエグゼキュータを拡張するにはいくつか方法が考えられる。

  1. Executor(Start|Run|End)_hookを使う
    • エグゼキュータ全体を乗っ取る。逆に言えば、一部の処理(例えば集約演算)だけを俺様実装にしたい時には、本体側のコードをコピーするなりしないとダメ。
  2. Foreign Data Wrapperを使う
    • 特定のテーブルスキャンに限定すればアリ。しかし、集約演算やJOIN、ソートといった処理を俺様実装にする用途には使用できない。
  3. ブランチして俺様パッチを当てる
    • まぁ、これならどうにでもできますがね…。

しかし、あまり使い勝手がよろしくない。
例えば、GPUを使って集約演算を1000倍早くしたいと考えても、それを実装するために、モジュール側でエグゼキュータ全体のお守りをしなければならないというのは、全く嬉しくない。
という訳で、プラン木の一部を差し替えて、特定のエグゼキュータ・ノードだけをモジュール側で実装したコードで実行できるよう機能拡張にトライしてみた。名称は Custom Executor Node で、名付け主はRobert Haasなり。

イメージとしては以下の通り。
f:id:kaigai:20130831231525p:plain

planner_hookを利用して、PostgreSQLのプラナーがPlannedStmtを作成した"後"でモジュールがプラン木の一部を書き換え、CustomExecノードを追加する、あるいはCustomExecノードで既存のエグゼキュータ・ノードを差し替える。

で、クエリ実行の開始、一行取り出し、クエリ実行の終了時にそれぞれモジュール側で実装されたコードが呼び出されるというワケだ。

APIは以下の通り。機能的に似ているのでFDWのAPIに似ておる。

void BeginCustomExec(CustomExecState *cestate, int eflags);
TupleTableSlot *GetNextCustomExec(CustomExecState *node);
void ReScanCustomExec(CustomExecState *node);
void EndCustomExec(CustomExecState *node);
void ExplainCustomExec(CustomExecState *node, ExplainState *es);

PoCとして、クエリ実行に要した時間を出力する xtime という拡張を自作してみた。
Custom Executorとして本体からコールバックを受けるには、予め自分自身を登録しておく必要がある。これがRegisterCustomExec()関数で、ここでは一連のコールバックの組に対して"xtime"という名前を付けている。

void
_PG_init(void)
{
    CustomExecRoutine   routine;
        :
    /*
     * Registration of custom executor provider
     */
    strcpy(routine.CustomExecName, "xtime");
    routine.BeginCustomExec   = xtime_begin;
    routine.GetNextCustomExec = xtime_getnext;
    routine.ReScanCustomExec  = xtime_rescan;
    routine.EndCustomExec     = xtime_end;
    routine.ExplainCustomExec = xtime_explain;
    RegisterCustomExec(&routine);
        :
}

そして、プラン木に"xtime"のCustomExecノードを挿入するのがplanner_hookでの処理。標準のプラナーが生成したPlannedStmtに手を加えてCustomExecノードを追加する。(ここでは再帰的にプラン木を捜査して、各ノードの頭にCustomExecノードを追加している)

static PlannedStmt *
xtime_planner(Query *parse,
              int cursorOptions,
              ParamListInfo boundParams)
{
    PlannedStmt *result;

    if (original_planner_hook)
        result = original_planner_hook(parse, cursorOptions,
                                       boundParams);
    else
        result = standard_planner(parse, cursorOptions,
                                  boundParams);

    /* walk on underlying plan tree to inject custom-exec node */
    result->planTree = xtime_subplan_walker(result->planTree, 0);

    return result;
}

これによって、クエリ実行計画はどのように修正されるのか。
例えば以下のようなクエリを実行するとする。

postgres=# EXPLAIN (costs off)
           SELECT * FROM t1 JOIN t2 ON t1.a = t2.x
                    WHERE a < 5000 LIMIT 100;
                    QUERY PLAN
--------------------------------------------------
 Limit
   ->  Hash Join
         Hash Cond: (t2.x = t1.a)
         ->  Seq Scan on t2
         ->  Hash
               ->  Index Scan using t1_pkey on t1
                     Index Cond: (a < 5000)
(7 rows)

これが、xtimeモジュールにより以下のように書き換えられる。

postgres=# LOAD '$libdir/xtime';
LOAD
postgres=# EXPLAIN (costs off)
           SELECT * FROM t1 JOIN t2 ON t1.a = t2.x
                    WHERE a < 5000 LIMIT 100;
                             QUERY PLAN
--------------------------------------------------------------------
 CustomExec:xtime
   ->  Limit
         ->  CustomExec:xtime
               ->  Hash Join
                     Hash Cond: (x = t1.a)
                     ->  CustomExec:xtime on t2
                     ->  Hash
                           ->  CustomExec:xtime
                                 ->  Index Scan using t1_pkey on t1
                                       Index Cond: (a < 5000)
(10 rows)

数ヶ所にCustomExecノードが挿入されている。t2へのSeqScanが消えているのは、SeqScanの場合にはxtimeモジュールが自力でテーブルをスキャンするため(コードサンプルとして使う事を想定しているので)。

で、このクエリを実行すると以下のように表示される。CustomExecノードをプラン木の間に挟み込み、下位ノードの実行に要した時間を記録しているのである。

postgres=# \timing
Timing is on.
postgres=# SELECT * FROM t1 JOIN t2 ON t1.a = t2.x WHERE a < 5000 LIMIT 100;
INFO:  execution time of Limit:  9.517 ms
INFO:   execution time of Hash Join:  9.487 ms
INFO:    execution time of CustomExec:xtime on t2:  0.047 ms
INFO:     execution time of Index Scan on t1:  4.947 ms
        :
     (省略)
        :
Time: 12.935 ms

ひとまず、CommitFest:Sepまでにちゃんと提案できるブツを作れてよかった。
あとはドキュメントとソースコードコメントをちゃんと書いて投稿である。

単体型GPUへのDMA転送コストを考える

PostgreSQLの検索処理(特にスキャン周り)をGPUにオフロードするにあたって、できるだけCPU処理の負荷を軽減したい。というのも、PostgreSQLは基本シングルスレッドなので、GPUがバリバリ並列計算しようとしてもデバイスへのデータ供給が追い付かなければ計算機を遊ばせてしまう事になる。

大雑把に言うと、PG-Stromは以下のような構造を持ち、PostgreSQLバックエンドがデータベースファイルから読み込んだデータを、共有メモリを介してOpenCL Serverが見える所に置いてあげる。OpenCL ServerはデータをDMA転送してGPU Kernel関数を起動、計算結果をまたPostgreSQLバックエンドに戻してやる。
f:id:kaigai:20130825174718p:plain

もう少し細かく仕事を分割すると、以下の通り。
f:id:kaigai:20130825174734p:plain

  1. ストレージからデータをロード(共有バッファに載ってなければ)
  2. 共有バッファからタプルを取り出す。この時、Snapshotとの比較でタプルの可視不可視チェックも。
  3. タプルを展開し、長大な配列としてデータを並べ替える。元々カラム指向なデータ構造で持っているので、配列への展開は"すごく重い"という訳ではない。
  4. DMA転送でこの配列をGPUデバイスへコピー。
  5. 転送されたデータを使って、GPU側で計算を実行。

現状の実装だと、(1)~(3)までがPostgreSQLバックエンドの仕事、(4)がOpenCL Serverで、(5)がGPUデバイスとなる。

ただ、これだとCPU側での処理がかさんでしまうので、理想としては以下のようにPostgreSQLの共有バッファから直接GPUデバイスにDMA転送、そこでタプルからのデータ取り出しを行えれば、CPU負荷は下がる事になる。
f:id:kaigai:20130825181145p:plain

しかし、問題が2つ。

  1. タプルの可視性判定はIFブロックの鬼なのでGPUで実行したくない。その上、不可視なタプルをGPUに転送するコストは完全に無駄
  2. PostgreSQLの共有バッファのブロック長は8KB(設定により最大32KB)なので、細切れのDMAを大量に発行する必要が出てくる。

後者に関しては、簡単に性能を計測できるのでやってみた。

128MBのブロックを1回のDMAで転送した場合。理論上、これが最も性能の出るパターン。

[kaigai@iwashi gpuinfo]$ ./gpudma -m async
DMA send/recv test result
device:         GeForce GT 640
size:           128MB
chunks:         128MB x 1
ntrials:        100
total_size:     12800MB
time:           4.18s
speed:          3063.97MB/s
mode:           async

一方、128MBのブロックを8KBx16384個のチャンクに分けて転送した場合だと、転送性能は1/3程度に。うーん、これはちょっとヨロシクナイ…。

[kaigai@iwashi gpuinfo]$ ./gpudma -m async -c 8
DMA send/recv test result
device:         GeForce GT 640
size:           128MB
chunks:         8KB x 16384
ntrials:        100
total_size:     12800MB
time:           12.36s
speed:          1035.97MB/s
mode:           async

当然、ブロック長が長くなればなるほどDMA発行の手間が減るので性能的には有利。PostgreSQLのコンフィグで変更可能な32KB相当のDMA単位にすると、本来の性能の85%程度は出てくれる。それでも結構痛いが。

[kaigai@iwashi gpuinfo]$ ./gpudma -m async -c 32
DMA send/recv test result
device:         GeForce GT 640
size:           128MB
chunks:         32KB x 4096
ntrials:        100
total_size:     12800MB
time:           5.06s
speed:          2531.70MB/s
mode:           async

なかなか悩ましいところではあるが、8KBのチャンクを直接GPUにDMA転送するコストの高さを考えれば、CPU側である程度まとまった大きさに集約してからガツンとDMAを発行した方が効率はよさそう。その一方で、シングルスレッドのPostgreSQLバックエンドを酷使するのも考え物なので、ベストミックスとしては、マルチスレッドで動作可能なOpenCL Serverにタプルの取り出しとDMAバッファへの展開を行わせても良さそう。
あと、現状ではDMAバッファへの展開時に圧縮データの展開を行っているけども、CPUの処理負荷を下げるだけでなく、PCI-Eの帯域を節約するという観点からも、GPU側で圧縮データの伸長を行った方が良いのかもしれない。

OpenCLでCPU/GPUを使い分ける?

最近、PG-Stromに興味があるという方からちょくちょく、個別に質問メールを頂く事がある。

その中で頂いたコメントに興味深い洞察が。

GPUによるアクセラレーションは確かに興味深い機能だけれども、PG-Stromの本質は突き詰めていえばパイプライン処理のお化けだよね?だから、計算処理をCPUでやるようにしても良いんじゃない?

確かに。GPUによる並列処理はハマると物凄い費用対効果をもらたすけれども、例えば正規表現マッチみたく、GPU向きじゃない処理もある。

PG-Stromの場合、SQLのWHERE句に与えられた条件から行を評価する関数を自動的に生成、それをJITコンパイルして実行する。たぶん、プランナの時点でCPU実行用、GPU実行用の2種類の関数を自動的に生成して、計算サーバに渡すという処理は実現可能だろう。

NVidiaGPUを前提とするCUDAと異なり、OpenCLはAMDGPUIntelXeon Phiもサポートする。それどころか、OpenCL Cで書かれたコードをCPU用にコンパイルする事も可能。

その辺の事情もあり、今、PG-StromをOpenCLで再実装し直している。(works in progress)

だが、CPUとGPU用にそれぞれ計算サーバ書くのか、実装が複雑になってやだなぁ~と思っていたところ、実は勘違いである事が判明。

以下のgpuinfoコマンドの出力は、NVidiaのCUDA 4.2と、IntelのOpenCL SDKインストールした環境でのもの。

なんと、Platform-1でGPUが、Platform-2でCPUが認識されている。

NVidiaのOpenCLライブラリでも、IntelのOpenCLライブラリでも同様。

という事はですよ、要求された計算の特性に応じて、1個の計算サーバでCPU/GPUを使い分けるという芸当もできるという事になるじゃありませんか。

いやー、面白い。実装意欲を掻き立てられる。

なお、以下のコマンド gpuinfo のURLは↓です。

https://github.com/kaigai/gpuinfo

[kaigai@iwashi gpuinfo]$ ./gpuinfo
platform-index:      1
platform-vendor:     NVIDIA Corporation
platform-name:       NVIDIA CUDA
platform-version:    OpenCL 1.1 CUDA 4.2.1
platform-profile:    FULL_PROFILE
platform-extensions: cl_khr_byte_addressable_store cl_khr_icd cl_khr_gl_sharing
 cl_nv_compiler_options cl_nv_device_attribute_query cl_nv_pragma_unroll
  Device-01
  Device type:                     GPU
  Vendor:                          NVIDIA Corporation (id: 000010de)
  Name:                            GeForce GT 640
  Version:                         OpenCL 1.1 CUDA
  Driver version:                  310.32
  OpenCL C version:                OpenCL C 1.1
  Profile:                         FULL_PROFILE
  Device available:                yes
  Address bits:                    32
  Compiler available:              yes
  Double FP config:                Denorm, INF/NaN, R/nearest, R/zero, R/INF, FMA
  Endian:                          little
  Error correction support:        no
  Execution capability:            kernel, native kernel
  Extensions:                      cl_khr_byte_addressable_store cl_khr_icd \
   cl_khr_gl_sharing cl_nv_compiler_options cl_nv_device_attribute_query \
   cl_nv_pragma_unroll  cl_khr_global_int32_base_atomics \
   cl_khr_global_int32_extended_atomics cl_khr_local_int32_base_atomics \
   cl_khr_local_int32_extended_atomics cl_khr_fp64
  Global memory cache size:        32 KB
  Global memory cache type:        read-write
  Global memory cacheline size:    128
  Global memory size:              2047 MB
  Host unified memory:             no
  Image support:                   yes
  Image 2D max size:               32768 x 32768
  Image 3D max size:               4096 x 4096 x 4096
  Local memory size:               49152
  Local memory type:               SRAM
  Max clock frequency:             901
  Max compute units:               2
  Max constant args:               9
  Max constant buffer size:        65536
  Max memory allocation size:      511 MB
  Max parameter size:              4352
  Max read image args:             256
  Max samplers:                    32
  Max work-group size:             1024
  Max work-item sizes:             {1024,1024,64}
  Max write image args:            16
  Memory base address align:       4096
  Min data type align size:        128
  Native vector width - char:      1
  Native vector width - short:     1
  Native vector width - int:       1
  Native vector width - long:      1
  Native vector width - float:     1
  Native vector width - double:    1
  Preferred vector width - char:   1
  Preferred vector width - short:  1
  Preferred vector width - int:    1
  Preferred vector width - long:   1
  Preferred vector width - float:  1
  Preferred vector width - double: 1
  Profiling timer resolution:      1000
  Queue properties:                out-of-order execution, profiling
  Sindle FP config:                Denorm, INF/NaN, R/nearest, R/zero, R/INF, FMA

platform-index:      2
platform-vendor:     Intel(R) Corporation
platform-name:       Intel(R) OpenCL
platform-version:    OpenCL 1.2 LINUX
platform-profile:    FULL_PROFILE
platform-extensions: cl_khr_fp64 cl_khr_icd cl_khr_global_int32_base_atomics \
  cl_khr_global_int32_extended_atomics cl_khr_local_int32_base_atomics \
  cl_khr_local_int32_extended_atomics cl_khr_byte_addressable_store \
  cl_intel_printf cl_ext_device_fission cl_intel_exec_by_local_thread
  Device-01
  Device type:                     CPU
  Vendor:                          Intel(R) Corporation (id: 00008086)
  Name:                                   Intel(R) Xeon(R) CPU E5-2670 0 @ 2.60GHz
  Version:                         OpenCL 1.2 (Build 56860)
  Driver version:                  1.2
  OpenCL C version:                OpenCL C 1.2
  Profile:                         FULL_PROFILE
  Device available:                yes
  Address bits:                    64
  Compiler available:              yes
  Double FP config:                Denorm, INF/NaN, R/nearest, R/zero, R/INF, FMA
  Endian:                          little
  Error correction support:        no
  Execution capability:            kernel, native kernel
  Extensions:                      cl_khr_fp64 cl_khr_icd \
    cl_khr_global_int32_base_atomics cl_khr_global_int32_extended_atomics \
    cl_khr_local_int32_base_atomics cl_khr_local_int32_extended_atomics \
    cl_khr_byte_addressable_store cl_intel_printf cl_ext_device_fission \
    cl_intel_exec_by_local_thread
  Global memory cache size:        256 KB
  Global memory cache type:        read-write
  Global memory cacheline size:    64
  Global memory size:              386942 MB
  Host unified memory:             yes
  Image support:                   yes
  Image 2D max size:               16384 x 16384
  Image 3D max size:               2048 x 2048 x 2048
  Local memory size:               32768
  Local memory type:               DRAM
  Max clock frequency:             2600
  Max compute units:               32
  Max constant args:               480
  Max constant buffer size:        131072
  Max memory allocation size:      96735 MB
  Max parameter size:              3840
  Max read image args:             480
  Max samplers:                    480
  Max work-group size:             1024
  Max work-item sizes:             {1024,1024,1024}
  Max write image args:            480
  Memory base address align:       1024
  Min data type align size:        128
  Native vector width - char:      16
  Native vector width - short:     8
  Native vector width - int:       4
  Native vector width - long:      2
  Native vector width - float:     4
  Native vector width - double:    2
  Preferred vector width - char:   16
  Preferred vector width - short:  8
  Preferred vector width - int:    4
  Preferred vector width - long:   2
  Preferred vector width - float:  4
  Preferred vector width - double: 2
  Profiling timer resolution:      1
  Queue properties:                out-of-order execution, profiling
  Sindle FP config:                Denorm, INF/NaN, R/nearest