Asymmetric Partition-wise JOIN

久々に PostgreSQL 本体機能へのパッチを投げたので、それの解説をしてみる。

PostgreSQL: Re: Asymmetric partition-wise JOIN

背景:Partition-wise JOIN

PostgreSQLパーティションを使ったときに、全く同一のパーティションの切り方をして、かつパーティションキーをJOINの結合条件に用いた場合に限り、パーティション子テーブル同士のJOINを先に行い、その結果を出力するという機能がある。

以下の例では、ptableおよびstableは、それぞれ列distの値を用いたHashパーティションを設定しており、3つずつの子テーブルを持つ。

postgres=# \d
                List of relations
 Schema |   Name    |       Type        | Owner
--------+-----------+-------------------+--------
 public | ptable    | partitioned table | kaigai
 public | ptable_p0 | table             | kaigai
 public | ptable_p1 | table             | kaigai
 public | ptable_p2 | table             | kaigai
 public | stable    | partitioned table | kaigai
 public | stable_p0 | table             | kaigai
 public | stable_p1 | table             | kaigai
 public | stable_p2 | table             | kaigai
(8 rows)

これをJOINする時の実行計画は以下の通り。

postgres=# explain select * from ptable p, stable s where p.dist = s.dist;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Hash Join  (cost=360.00..24617.00 rows=10000 width=49)
   Hash Cond: (p.dist = s.dist)
   ->  Append  (cost=0.00..20407.00 rows=1000000 width=12)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
   ->  Hash  (cost=235.00..235.00 rows=10000 width=37)
         ->  Append  (cost=0.00..235.00 rows=10000 width=37)
               ->  Seq Scan on stable_p0 s  (cost=0.00..60.77 rows=3277 width=37)
               ->  Seq Scan on stable_p1 s_1  (cost=0.00..62.69 rows=3369 width=37)
               ->  Seq Scan on stable_p2 s_2  (cost=0.00..61.54 rows=3354 width=37)
(11 rows)

おっとっと。
この場合、ptableパーティションの子テーブル3つと、stableパーティションの子テーブル3つをそれぞれ全て読み出し、メモリ上で結合した結果同士をHash Joinで結合する事がわかる。

Partition-wise JOINはデフォルトでは off になっているので、enable_partitionwise_joinパラメータをonにセットして明示的に有効化する必要がある。

postgres=# set enable_partitionwise_join = on;
SET
postgres=# explain select * from ptable p, stable s where p.dist = s.dist;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Append  (cost=101.73..19617.00 rows=10000 width=49)
   ->  Hash Join  (cost=101.73..6518.87 rows=3277 width=49)
         Hash Cond: (p.dist = s.dist)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Hash  (cost=60.77..60.77 rows=3277 width=37)
               ->  Seq Scan on stable_p0 s  (cost=0.00..60.77 rows=3277 width=37)
   ->  Hash Join  (cost=104.80..6527.08 rows=3369 width=49)
         Hash Cond: (p_1.dist = s_1.dist)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Hash  (cost=62.69..62.69 rows=3369 width=37)
               ->  Seq Scan on stable_p1 s_1  (cost=0.00..62.69 rows=3369 width=37)
   ->  Hash Join  (cost=103.47..6521.06 rows=3354 width=49)
         Hash Cond: (p_2.dist = s_2.dist)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
         ->  Hash  (cost=61.54..61.54 rows=3354 width=37)
               ->  Seq Scan on stable_p2 s_2  (cost=0.00..61.54 rows=3354 width=37)
(16 rows)

同じクエリに対して、set enable_partitionwise_join = onを設定した状態で実行計画を作らせると、対応するパーティションの子テーブル同士でJOINを行い、その次にAppend、つまり各パーティション子要素の処理結果を結合している事が分かる。

これはJOINの問題サイズを小さくし、多くの場合、Append処理に要するCPUサイクルを減らすことのできる優れた最適化テクニックではあるが、かなりの慎重さを以ってDB設計を行う必要がある。
なぜなら『全く同一のパーティションの切り方をして、かつパーティションキーをJOINの結合条件に用いる』事ができるよう、パーティション設計とSQLクエリの書き方を考える必要があるからである。

新機能:Asymmetric Partition-wise JOIN

同じようなノリで、非パーティションテーブルとパーティションテーブルの間のJOIN処理を、Appendの前に移動する事ができる。
例えば、非パーティションテーブルであるt1ptableとの間のJOINを (t1 \times ptable)と記述すると、これは (t1 \times ptable_p0) + (t1 \times ptable_p1) + (t1 \times ptable_p2)と展開する事ができる。

言い換えれば、t1を各パーティション子テーブルに分配する(クエリの書き換えに相当)事で、Appendよりも先にJOINを実行する事ができる。
もちろん、メリット・デメリットはあるので、先にパーティション子テーブルの内容を全て読み出した後でJOINを実行した方が良いというケースもある。

具体的に Asymmetric Partition-wise JOIN が活きてくるケースというのは

  • パーティション側のテーブルが、読み出しコストを無視できる程度に小さい。
    • 分配された分、複数回の読み出しが必要となってしまうため。
  • 子テーブルとのJOINにより、かなりの比率で行数が減ることが見込まれる。
    • 行数が減らなければ、Append処理は楽にならない。

というケースなので、あらゆる場合で効果があるかというと、そのような銀の弾丸はない。

では試してみることにする。
本来、以下のような実行計画となったいたクエリであるが・・・

postgres=# explain select * from ptable p, t1 where p.a = t1.aid;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Hash Join  (cost=2.12..24658.62 rows=49950 width=49)
   Hash Cond: (p.a = t1.aid)
   ->  Append  (cost=0.00..20407.00 rows=1000000 width=12)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
   ->  Hash  (cost=1.50..1.50 rows=50 width=37)
         ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
(8 rows)

同様に enable_partitionwise_join パラメータによって当該機能は有効化され、以下のような『賢い』実行計画を生成するようになる。

postgres=# explain select * from ptable p, t1 where p.a = t1.aid;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Append  (cost=2.12..19912.62 rows=49950 width=49)
   ->  Hash Join  (cost=2.12..6552.96 rows=16647 width=49)
         Hash Cond: (p.a = t1.aid)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=2.12..6557.29 rows=16658 width=49)
         Hash Cond: (p_1.a = t1.aid)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=2.12..6552.62 rows=16645 width=49)
         Hash Cond: (p_2.a = t1.aid)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
(16 rows)

t1テーブルが各パーティション子テーブル毎に読み出され、その結果を Append によって結合している事がお分かりだろうか。
しかも、最初のケースでは Append は 100万行を処理しなければならないところ、後者ではHash Joinによって大幅に行数が削られているため、たった49950行(推定値)しか処理しない事になっている。これに伴う推定コストの低下によって、Asymmetric Partition-wise JOINを使った実行計画が選択された。

ちなみに、結合すべきテーブルを増やしても、(コスト的に見合えば)各パーティション子テーブルへと分配してくれる。

postgres=# explain select * from ptable p, t1, t2 where p.a = t1.aid and p.b = t2.bid;
                                       QUERY PLAN
----------------------------------------------------------------------------------------
 Append  (cost=4.25..19893.99 rows=2495 width=86)
   ->  Hash Join  (cost=4.25..6625.83 rows=832 width=86)
         Hash Cond: (p.b = t2.bid)
         ->  Hash Join  (cost=2.12..6552.96 rows=16647 width=49)
               Hash Cond: (p.a = t1.aid)
               ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=4.25..6630.20 rows=832 width=86)
         Hash Cond: (p_1.b = t2.bid)
         ->  Hash Join  (cost=2.12..6557.29 rows=16658 width=49)
               Hash Cond: (p_1.a = t1.aid)
               ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=4.25..6625.48 rows=831 width=86)
         Hash Cond: (p_2.b = t2.bid)
         ->  Hash Join  (cost=2.12..6552.62 rows=16645 width=49)
               Hash Cond: (p_2.a = t1.aid)
               ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
(28 rows)

PG-Stromとの関連

この機能は、もちろんピュアなPostgreSQLで大量データを扱うときに効果を発揮するものである。

ただそれ以上に、PG-Stromのように『データが物理的にどこに存在するのか』を気にするソリューションだと更に大きな恩恵がある。

例えば、日々大量に溜まっていくログデータを、パーティション分割によって複数のディスクに物理的に分割格納し、集計・解析時にはこれを他のテーブルとJOIN/GROUP BYするという、比較的ありがちな処理を考えてみる。

もし非パーティションテーブルとパーティションテーブルのJOINが必ずAppendの後であれば、SSD-to-GPU Direct SQLを使ったとしても、CPUが処理すべき行数はほとんど減ることはない*1ため、その後のJOIN/GROUP BY処理でGPUを使えたとしても、データの流れは Disk → CPU/RAM → GPU → CPU/RAM となってしまうため、データのピンポンによって転送効率はかなり悪化してしまう。

Asymmetric Partition-wise JOINによって、非パーティションテーブルとパーティション子テーブルのJOINを先に実行できるようになれば、これはSSD-to-GPU Direct SQLがフル稼働できる状況になり、データの流れも Disk → GPU → CPU/RAM とかなりシンプルになる。

これはとりわけ、以下のような構成のマシンを用いてPCIeスイッチ経由でSSDGPU間でのダイレクトデータ転送を行う場合に顕著で、SQLワークロードの大半をPCIeスイッチより外側のNVME-SSDGPUのペアで実行する事ができるので、データが全くCPUを経由することなく処理されてしまう(= シングルノードでもPCIeバスの限界までスケールできる)という事になってしまう。

こういった諸々面白い事ができるようになるので、ぜひ皆さんにもCommitFestに登録されたパッチのレビューに参加していただければ幸いである。

*1:スキャン条件のみで大半の行を落とせるという幸運な場合を除く

技術負債を返した話(Pre-built GPU Binary対応)

最もプリミティブなPG-Stromの処理は、ユーザが入力したSQLを元にCUDA CのGPUプログラムを自動生成し、これを実行時コンパイル。ここで生成されたGPUバイナリを用いて、ストレージから読み出したデータをGPUで並列処理するという一連の流れである。
後にJOIN/GROUP BYの対応や、データ型の追加、SSD-to-GPU Direct SQLなど様々な機能を付加したものの、このコード生成と実行時ビルドの仕組みは2012年に最初のプロトタイプを公開した時から大きく変わってはいない。

コードの自動生成を担当するのはcodegen.cで、この人が吐いたCUDA Cプログラムをcuda_program.cがNVRTC (NVIDIA Run-Time Compiler) を使ってコンパイルし、GPUバイナリを生成する。

ただ、当初の『全件スキャンWHERE句・固定長データ型のみ』というシンプルなコード生成が、やがてJOIN/GROUP BYや、numeric型やtext型の対応など、より複雑な処理をGPUで実行するようになるにつれて、自動生成部分だけでなく、静的に書いておいた部分と組み合わせてGPUプログラムを生成するようになる。
これは当然で、例えばGPUでHash-Joinを実行する場合でも、クエリを処理するたびに変わる可能性があるのは、二つのテーブルを結合するJoin-Keyの結合条件を評価する部分だけで、Hash-Joinの基本的なアルゴリズムは同じだからである。

これらの静的なCUDA Cコードは、ヘッダファイル(*.h)として記述され、cuda_program.cでコードをコンパイルする時にインクルードする事で、動的生成したコードと組み合わせて使用していた。

ただし…。
PG-Stromの機能が増え、静的なCUDA Cコードの割合が増えるにつれ実行時コンパイルの時間は徐々に増えていく事になる。
intやtextといった基本的なデータ型の比較といった、シンプルなコードであれば気になる程ではないが、例えば、(コーナーケースではあるが)複合型(Composite Type)のデータをGPU側で生成する場合などは、対応する全てのデータ型が関わってくるため、ビルドに要する時間がSQLの実行時間よりも遥かに長くなるという本末転倒な状況が起こる。
その場合、一度ビルドしたGPUバイナリは共有メモリ上にキャッシュされるため、初回と2回目以降のSQL実行時間が大幅に異なるという事になってしまう。

そこで、ほぼ一週間を丸々要してGPUコードの実行時コンパイルに係る部分のリファクタリングを行った。
要は、静的なCUDA Cコードは予めコンパイルして、実行時にはバイナリをリンクすれば済む話なので、ある程度以上に複雑な構造を持つ関数は cuda_gpujoin.cucuda_numeric.cuという形で切り出し、PG-Strom自体のビルド時に、NVCCを用いてGPUバイナリファイルを生成するようにした。
これらは動的に生成されたコードと実行時リンクされ、最終的にはこれまでと同じ処理を行うようになる。
静的な部分は Fatbin 形式という形でインストールされる。この形式は、GPUの各世代(Pascal, Volta, Turing)向けに最適化されたバイナリを内部にアーカイブし、ターゲットGPUに最も適したバイナリをリンク時に使用してくれる。

なので、インストールされたPG-Strom関連ファイルを見てみると、目新しいファイルが並んでいる…という事になる。(*.gfatbin ファイルはデバッグオプションを有効にしてビルドしたバージョン。pg_strom.debug_jit_compile_options=onの時にはこちらが利用される)

[kaigai@magro ~]$ ls /usr/pgsql-11/share/pg_strom
arrow_defs.h          cuda_gpupreagg.fatbin   cuda_gpusort.h        cuda_numeric.gfatbin    cuda_textlib.gfatbin
cuda_basetype.h       cuda_gpupreagg.gfatbin  cuda_jsonlib.fatbin   cuda_numeric.h          cuda_textlib.h
cuda_common.fatbin    cuda_gpupreagg.h        cuda_jsonlib.gfatbin  cuda_plcuda.h           cuda_timelib.fatbin
cuda_common.gfatbin   cuda_gpuscan.fatbin     cuda_jsonlib.h        cuda_primitive.fatbin   cuda_timelib.gfatbin
cuda_common.h         cuda_gpuscan.gfatbin    cuda_misclib.fatbin   cuda_primitive.gfatbin  cuda_timelib.h
cuda_gpujoin.fatbin   cuda_gpuscan.h          cuda_misclib.gfatbin  cuda_primitive.h        cuda_utils.h
cuda_gpujoin.gfatbin  cuda_gpusort.fatbin     cuda_misclib.h        cuda_rangetype.h        pg_strom--2.2.sql
cuda_gpujoin.h        cuda_gpusort.gfatbin    cuda_numeric.fatbin   cuda_textlib.fatbin

ベンチマーク

GPUコードのビルドに要する時間を計測するため、何種類かのテスト用クエリを作成し、その実行時間を計測してみた。
とりあえずクエリは以下の7種類で計測してみた。

-- Q1 ... シンプルなGpuScan + Int型の演算
SELECT id, aid+bid FROM t0 WHERE aid < 100 AND bid < 100;

-- Q2 ... Numeric型を利用するGpuScan
SELECT id, a+b, a-c FROM t_numeric1 WHERE a < 50.0 AND b < 50.0;

-- Q3 ... Timestamp型とEXTRACT()文を使用するGpuScan
SELECT id, a, b FROM t_timestamp1 WHERE EXTRACT(day FROM a) = 19 and EXTRACT(month FROM b) = 6;

-- Q4 ... Arrow_Fdw外部テーブルからシンプルなデータ(Int, Timestamp)の読み出し
SELECT id,ymd FROM t_arrow WHERE id % 100 = 23;

-- Q5 ... Arrow_Fdw外部テーブルから複合型(Composite)の読み出し
SELECT id,v FROM t_arrow WHERE id % 100 = 23;

-- Q6 ... GpuJoinを含むクエリ
SELECT id, aid, atext FROM t0 NATURAL JOIN t1 WHERE bid < 50 AND cid < 50;

-- Q7 ... GpuPreAgg + GpuJoinを含むクエリ
SELECT cat, count(*), sum(ax) FROM t0 NATURAL JOIN t1 GROUP BY cat;

ディスクからの読み出しの影響を排除するため、関連する全てのテーブルをpg_prewarmでオンメモリ状態にした上で、上記の7つのクエリをそれぞれ2回ずつ実行した。
以下の表/グラフはその結果であるが、静的なCUDA Cコードを事前にビルドしておく事で、初回の実行時間が大幅に改善しているのが分かる。もちろん、最終的なGPUバイナリはほぼ同じ形になるので、ビルド済みGPUバイナリのキャッシュを参照できる2回目以降では処理時間の大きな差はない。(単位は全てms)
ちなみにQ5はめちゃめちゃ時間がかかっている。これは、Arrow形式(列データ)から複合型のデータをGPU上で再構成し、CPUへは行データとして返すという、非常にコード量の多い処理を行っているためである。

旧方式(1回目) 旧方式(2回目) 新方式(1回目) 新方式(2回目)
Q1 1,199.6 331.7 369.0 321.5
Q2 3,279.2 222.0 421.6 227.5
Q3 3,213.5 206.5 462.6 219.2
Q4 793.7 134.0 355.1 147.4
Q5 17,872.2 141.4 364.7 144.8
Q6 1,928.5 352.3 605.3 345.9
Q7 2,435.6 359.2 729.1 349.1

分かりやすいように、Pre-built GPU Binary機能の有無別に(2回目の実行時間)-(1回目の実行時間)を計算してみる。2回目は共有メモリ上のキャッシュを参照するだけなので、これが正味のビルド時間という事になるが、これまでは『一呼吸おいて』という感じだった部分がコンマ何秒程度にまで短くなった事が分かる。

背景

実はこの機能、別の要件を考えた時にどうしても急いで作らざるを得ない事情があった。
これまでユーザさんからの新機能の要求を優先して実装していたために、テストケースの作成やテスト自動化が後回しになってきた経緯があったのだが、エンタープライズ向けにはこれらソフトウェア品質改善/品質保証のためのプロセスは必要不可欠であり、今年の後半は少し腰を落ち着けてテストの充実を図ろうと考えている。
ただ、大量のテストを流すとなると、相応のバリエーションのSQLを実行する事となり、その度にGPUコードの自動生成と実行時コンパイルが走るのでは、テストケースの実行に非常に長い時間を要してしまう事になる。これではPG-Stromのテストをしているのか、コンパイラを動かしているのか分からない!

SSDtoGPU Direct SQL on Columnar-store (Apache Arrow)

I have recently worked on development of FDW for Apache Arrow files; including SSDtoGPU Direct SQL support of PG-Strom. Apache Arrow is a column-oriented data format designed for application independent data exchange, supported by not a small number of "big-data" software.
The latest revision of PG-Strom can use Apache Arrow files as data source of the query execution, through the Arrow_Fdw feature. This article introduces the brief overview of Arrow_Fdw, its benchmark results and our future plan.

What is Arrow_Fdw

PostgreSQL supports FDW (Foreign Data Wrapper) mechanism that enables to read variable external data source as if here is a normal table. Some kind of FDW also support writing.
Of course, external data source has different data structure from the internal representation of PostgreSQL rows / columns. For example, CSV file is a way to represent structured data based on comma separated text. PostgreSQL cannot understand this format without proper conversion, so file_fdw extension intermediate the world of PostgreSQL and the world of CSV file.

This mechanism can be applied variable kind of data sources. Here is postgres_fdw that uses a remote PostgreSQL server as data source. An unique one is twitter_fdw that fetches tweets from the Twitter timeline via WebAPI.
The Arrow_Fdw maps files in Apache Arrow file for references by SQL.
Below is an example to map an Arrow file (/opt/nvme/t.arrow) using Arrow_Fdw.

postgres=# IMPORT FOREIGN SCHEMA hoge
             FROM SERVER arrow_fdw
             INTO public OPTIONS (file '/opt/nvme/t.arrow');
IMPORT FOREIGN SCHEMA
postgres=# \d hoge
                   Foreign table "public.hoge"
 Column |  Type   | Collation | Nullable | Default | FDW options
--------+---------+-----------+----------+---------+-------------
 id     | integer |           |          |         |
 aid    | integer |           |          |         |
 bid    | integer |           |          |         |
 ymd    | date    |           |          |         |
 cat    | text    |           |          |         |
 md5    | text    |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/nvme/t.arrow')

Of course, you can use CREATE FOREIGN TABLE command as usual, however, Arrow file contains its schema definition, and columns-list must match exactly. Our recommendation is IMPORT FOREIGN SCHEMA because it automatically generates foreign table definition.

Overview of SSDtoGPU Direct SQL on Arrow_Fdw


PG-Strom has a special storage optimization mode called SSD-to-GPU Direct SQL. It utilizes P2P DMA over PCIe bus, on top of GPUDirect RDMA kernel APIs provided by NVIDIA.
Usually, PostgreSQL loads table's contents onto RAM from the storage, then processes the loaded data. In typical analytics workloads, not a small portion of the loaded data is filtered out. In other words, we consumes the narrow storage bandwidth to carry junks; that is waste of I/O resources.
When PG-Strom uses SSD-to-GPU Direct SQL, it loads table's contents to GPU's device memory directly, then GPU runs SQL workloads using its thousands cores in parallel. It likely reduce amount of data to be loaded onto the host system much much smaller than the original source.

Even if data source are Arrow files, overall mechanism is same. PG-Strom loads only referenced column data using P2P DMA, then GPU processes SQL workloads (WHERE-clause/JOIN/GROUP BY). Unlike row-data of PostgreSQL tables, it does not load unreferenced columns.

Benchmark

We run the Star Schema Benchmark (SSBM) with scale-factor=401 as usual. Its database size is 353GB in total. We exported its lineorder table into an Arrow file on NVME-SSD volume.

model Supermicro SYS-1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C) x1
RAM 192GB (32GB DDR4-2666 x6)
GPU NVIDIA Tesla V100 (5120C, 16GB) x1
SSD Intel SSD DC P4600 (HHHL; 2.0TB) x3
(striped by md-raid0)
HDD 2.0TB (SATA; 72krpm) x6
network 10Gb ethernet x2 ports
OS Ret Hat Enterprise Linux 7.6
CUDA 10.1 + NVIDIA Driver 418.40.04
DB PostgreSQL v11.2
PG-Strom v2.2devel

The benchmark results are below:
f:id:kaigai:20190430235505p:plain

Due to column-oriented data structure, performance is variable a lot depending on the number of referenced columns.
The query response time becomes much faster than the previous row-data based benchmark results; 7.2-7.9GB/s by PG-Strom and 2.2-2.3GB/s by PostgreSQL with parallel-scan. The query execution throughput is an inverse of query response time. So, 49GB/s means that summarizing of sequential-scan results on 300GB+ table is finished about 6sec. 25GB/s also means similar workloads are finished about 12sec. Once we can achieve this grade of batch processing performance in a single node, it changes some assumptions when we consider system configurations.

Validation of the benchmark results

For confirmation, we like to validate whether the benchmark results are reasonable. Below is definition of the flineorder foreign table that maps an Arrow file using Arrow_Fdw.
The above benchmark results say it runs Q2_1 in 28.7GB/s. The Q2_1 references lo_suppkey, lo_orderdate, lo_orderpriority and lo_supplycost.

 Foreign table "public.flineorder"
       Column       |     Type      | Size
--------------------+---------------+--------------------------------
 lo_orderkey        | numeric       | 35.86GB
 lo_linenumber      | integer       |  8.96GB
 lo_custkey         | numeric       | 35.86GB
 lo_partkey         | integer       |  8.96GB
 lo_suppkey         | numeric       | 35.86GB   <-- ★Referenced by Q2_1
 lo_orderdate       | integer       |  8.96GB   <-- ★Referenced by Q2_1
 lo_orderpriority   | character(15) | 33.61GB   <-- ★Referenced by Q2_1
 lo_shippriority    | character(1)  |  2.23GB
 lo_quantity        | integer       |  8.96GB
 lo_extendedprice   | bigint        | 17.93GB
 lo_ordertotalprice | bigint        | 17.93GB
 lo_discount        | integer       |  8.96GB
 lo_revenue         | bigint        | 17.93GB
 lo_supplycost      | bigint        | 17.93GB   <-- ★Referenced by Q2_1
 lo_tax             | integer       |  8.96GB
 lo_commit_date     | character(8)  | 17.93GB
 lo_shipmode        | character(10) | 22.41GB
FDW options: (file '/opt/nvme/lineorder_s401.arrow')  ... file size = 310GB

Total size of the referenced columns is 96.4GB of 310GB (31.1%). So, raw storage read throughput is 96.4GB / 11.06s = 8.7GB/s. This is reasonable performance for 3x Intel DC P4600 NVME-SSDs.

Future Plan: Towards 100GB/s

We can say the current status is "just workable", so we have to put further works to improvement software quality for production grade.

For more performance improvement, we like to have a benchmark activity using multi-GPU server like this.

This configuration uses Supermicro SYS-4029GP-TRT2 that is designed for HPC by RDMA optimization with PCIe switches. It allows to install up to 4x Tesla GPUs and 4x HBA card *1 to attach external NVME-SSD enclosure. An HBA card can connect 4x NVME-SSD, and can read the storage up to 12.8GB/s.
So, we can configure 4 of unit of 1xGPU + 4xSSD (12.8GB/s). It is 4xGPU + 16xSSD (51.2GB/s) in total.
If we can assume summarizing / analytic query read 1/3 columns in average, the upper limit of the query execution is 51.2GB/s / 0.33 = 150GB/s from the standpoint of raw storage performance.

Probably, 100GB/s is a feasible milestone for PostgreSQL, and we like to run the benchmark in 2019.

*1:One other option is 100Gb-NIC for NVME-over-Fabric. It is more scalable configuration.

Dive into Apache Arrow(その3)- SSD-to-GPU Direct SQL対応

ここ最近取り組んでいた Arrow_Fdw 機能がようやく動くようになったので、性能ベンチマークを行ってみた。
今回のエントリでは順を追って説明する事にしてみたい。

Arrow_Fdwとは

PostgreSQLにはFDW (Foreign Data Wrapper) という機能があり、PostgreSQL管理下にあるデータ(テーブルなど)だけでなく、外部のデータソースをあたかもテーブルであるかのように読み出す事ができる。一部のモジュールでは書込みにも対応している。
例えば、CSVファイルは構造化データを表現するための方法の一つである。もちろん、PostgreSQLの内部形式とデータ形式が違うので、そのままではコレをSQLの世界から取り扱う事ができないが、間にfile_fdwというモジュールが介在する事で、CSVPostgreSQLの内部形式へとデータ形式を変換している。

この枠組みは多種多様なデータソースに対して適用する事が可能で、リモートのPostgreSQLサーバをデータソースとするpostgres_fdwや、変わり種としては、Web APIを用いてTwitterのタイムラインからデータを取得するtwitter_fdw*1などがある。
これを利用してApache Arrow形式のファイルを外部テーブルにマップし、SQLから参照できるようにするのがArrow_Fdwモジュールである。

例えば、/opt/nvme/t.arrowというArrow形式ファイルをArrow_Fdwを用いてマップする場合は、以下の構文を用いる事ができる。

postgres=# IMPORT FOREIGN SCHEMA hoge
             FROM SERVER arrow_fdw
             INTO public OPTIONS (file '/opt/nvme/t.arrow');
IMPORT FOREIGN SCHEMA
postgres=# \d hoge
                   Foreign table "public.hoge"
 Column |  Type   | Collation | Nullable | Default | FDW options
--------+---------+-----------+----------+---------+-------------
 id     | integer |           |          |         |
 aid    | integer |           |          |         |
 bid    | integer |           |          |         |
 ymd    | date    |           |          |         |
 cat    | text    |           |          |         |
 md5    | text    |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/nvme/t.arrow')

もちろん、いつも通りにCREATE FOREIGN TABLE構文を用いてもよいが、Arrow形式ファイルにはそれ自体スキーマ定義が含まれており、列リストの定義がこれと厳密に一致していないとエラーとなるため、IMPORT FOREIGN SCHEMA構文を用いてArrow形式ファイルから自動的に外部テーブルの定義を作り出す方がお勧めである。

Apache Arrowファイル形式自体の説明は、こちらのエントリをご覧いただきたい。
kaigai.hatenablog.com

要は、列指向でデータが編成されているため、集計・解析系ワークロードに対して全てのデータをストレージから読み出す必要はなく、被参照列のデータのみを読み出せばよいためにI/Oを効率的に(高い密度で)行う事が可能となるという話である。

ベンチマーク

早速ベンチマークを行ってみる。使用したのは、いつもの1Uラックサーバ でスペックおよびシステム構成は以下の通り。

chassis Supermicro SYS-1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C) x1
RAM 192GB (32GB DDR4-2666 x6)
GPU NVIDIA Tesla V100 (5120C, 16GB) x1
SSD Intel SSD DC P4600 (HHHL; 2.0TB) x3
(md-raid0によるストライピング構成)
HDD 2.0TB (SATA; 72krpm) x6
network 10Gb ethernet x2 ports
OS Ret Hat Enterprise Linux 7.6
CUDA 10.1 + NVIDIA Driver 418.40.04
DB PostgreSQL v11.2
PG-Strom v2.2devel

ベンチマークに使用したワークロードは、いつもの Star Schema Benchmark で、これも今まで通り scaling factor=401 なので、DBサイズは353GB。これをExt4で初期化したNVME-SSD区画上にArrow形式ファイルとして書き出すと約310GBであった。

実行結果は以下の通り。じゃん。

列指向データなので、クエリが参照する列の数と幅によって大幅にパフォーマンスが変わってくるのは致し方ないが、2.2~2.3GB/s前後であったPostgreSQL v11や、7.2~7.9GB/s前後であったPG-Strom v2.2develの結果と比べて、大幅に性能が改善されている事が分かる。
単なる逆数ではあるが、49GB/sというのは300GB超のテーブルに対する全件スキャン+集計が6秒台で返ってくる速度、25GB/sというのは12秒台で返ってくる速度なので、これ位のデータ処理能力をシングルノードで実現できるのであれば、システム構成を検討する時の様々な前提条件が変わってくるだろう。

こちらはもう一つ別の指標で。このベンチマークを採取するにあたり、Star Schema Benchmark 全13本のクエリを3回ずつ実行し、クエリ毎の最良値を使ってスループットを計算しているが、13本 x 3回 = 39回のクエリを実行するのに要した時間を積算したもの。

PG-Strom + Arrow_Fdwのケースだと、ほんの僅か、子供のオムツ替え程度の時間であるが、元々のPostgreSQLのケースでは2時間弱と長めのお昼寝程度の時間を要している。
しかもこれは全て同一のハードウェア、特にデータを全て高速なNVME-SSDに載せているがために、元々のPostgreSQLでも2.2~2.3GB/sのパフォーマンスが出ているが、既存システムでAll-Flashという構成は多くはないと考えられる。旧来の磁気ディスクを用いたDBシステムとの比較であれば、どの程度のスコアになるだろう??

測定結果の妥当性

なお、測定ミスなどであると恥ずかしいので、性能の妥当性を考察してみた。

以下は Arrow_Fdw を用いてArrow形式ファイルをマップした flineorder テーブルの列定義であるが、28.7GB/sのクエリ実行スループットを記録した Q2_1 を例に取ると、lo_suppkeylo_orderdatelo_orderpriorityおよびlo_supplycost列が参照されている。

 Foreign table "public.flineorder"
       Column       |     Type      | Size
--------------------+---------------+--------------------------------
 lo_orderkey        | numeric       | 35.86GB
 lo_linenumber      | integer       |  8.96GB
 lo_custkey         | numeric       | 35.86GB
 lo_partkey         | integer       |  8.96GB
 lo_suppkey         | numeric       | 35.86GB   <-- ★Q2_1で参照
 lo_orderdate       | integer       |  8.96GB   <-- ★Q2_1で参照
 lo_orderpriority   | character(15) | 33.61GB   <-- ★Q2_1で参照
 lo_shippriority    | character(1)  |  2.23GB
 lo_quantity        | integer       |  8.96GB
 lo_extendedprice   | bigint        | 17.93GB
 lo_ordertotalprice | bigint        | 17.93GB
 lo_discount        | integer       |  8.96GB
 lo_revenue         | bigint        | 17.93GB
 lo_supplycost      | bigint        | 17.93GB   <-- ★Q2_1で参照
 lo_tax             | integer       |  8.96GB
 lo_commit_date     | character(8)  | 17.93GB
 lo_shipmode        | character(10) | 22.41GB
FDW options: (file '/opt/nvme/lineorder_s401.arrow')  ... file size = 310GB

これら被参照列の行データサイズの合計は、310GB中96.4GBである。つまり、このクエリではファイル全体の31.1%だけしかデータを読み出す必要がなく、読出しサイズ(96.4GB)をQ2_1の実行時間11.06sで割ると、96.4÷11.06 = 8.7GB/s なので、Intel DC P4600 x3 の読出し速度としては概ね妥当な値という事になる。(一安心である)

今後の展望

現状「とりあえず動きました」という段階なので、まだこの後、試験や修正などに相応の時間がかかる事はご容赦いただきたい。

更なる性能改善に向け、今後やってみたいと考えているのが、マルチGPUを搭載したサーバによるベンチマーク

上記の構成イメージは、Supermicro社のHPC向けサーバ SYS-4029GP-TRT2に、Tesla GPUを4台と、外付けNVME-SSDのエンクロージャを接続するHBAカード*2を4台搭載する。このマシンはPCIeスイッチを搭載しているので、SSDGPU間で直接データ転送を行う場合にはCPUに負荷を与えない。

ブロック図で記載すると以下のようになる。GPU 1台あたりNVME-SSD 4台が一個のユニットとなり、一台のサーバの中でそれぞれが並列に動作する。

今回のベンチマークは、PCIeスイッチが無いとか、SSDの台数が3台だという細かな違いを除けば、概ねこちらの構成で1ユニット分の性能であると言う事ができるだろう。それで15GB/s~49GB/sの性能値を出せているという事は、期待値としてその4倍、シングルノード100GB/sの処理性能というのはあながち実現不可能な絵空事という訳でもないだろう。


こちらは、今年の正月に品川神社に奉納した絵馬。
ちょっと目標値の設定が低かったかもしれぬ。

*1:今でもメンテナンスされてるんだろうか?

*2:NVMEoFを使う場合、100Gb-NICでも可。むしろ巨大データだとこちらの方がスケーラビリティは高くなる。

Dive into Apache Arrow(その2)- pg2arrow

前回のエントリApache Arrow のフォーマットについて調べていたが、これのゴールは、外部テーブル(Foreign Table)を介してApache Arrowファイルを読み出し、高速に集計・解析処理を実行する事にある。
特にPG-Stromの場合はSSD-to-GPU Direct SQLという飛び道具が使えるため、NVME-SSD上のApache Arrowファイルを直接GPUへ転送し、列指向データをGPUの大量のコアでぐわーーっと処理するという構成が取れるハズである。

で、FDWモジュールがApache Arrowファイルを読むためには、まずメタデータを解読してどの列がどういったデータ型を持っており、どこにどういう形式で配置されているのか特定できる必要がある。
そのために書いたコードを元に、先ずPostgreSQLのデータをApache Arrowファイルとしてダンプするためのツールを作ってみた。

github.com

pg2arrowの使い方

ある程度 psqlpg_dump のオプションを参考にしたので、PostgreSQL使いの人ならそれほど違和感なく使えるはず。
データベースやユーザ名などの指定は共通。-c COMMAND-f FILENAMEで指定したSQLを実行し、その結果を-o FILENAMEで指定したファイルへ書き出す。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      (default creates a temporary file)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default is 512MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.

使用例

例として、hogehogeテーブルを日付順にソートしてダンプしてみる。

hogehogeテーブルの定義は以下の通り。c列は複合型(composite type)で、内部的にサブフィールドを持つ。

postgres=# \d hogehoge
                  Table "public.hogehoge"
 Column |       Type       | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
 id     | integer          |           | not null |
 a      | bigint           |           | not null |
 b      | double precision |           | not null |
 c      | comp             |           |          |
 d      | text             |           |          |
 e      | double precision |           |          |
 ymd    | date             |           |          |
Indexes:
    "hogehoge_pkey" PRIMARY KEY, btree (id)

postgres=# \dS comp
                Composite type "public.comp"
 Column |       Type       | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
 x      | integer          |           |          |
 y      | double precision |           |          |
 z      | numeric          |           |          |
 memo   | text             |           |          |

さっそく実行し、特にエラーもなく終了する。

$ ./pg2arrow postgres -o /tmp/hogehoge -c "SELECT * FROM hogehoge ORDER BY ymd"

PyArrowで結果を確認する

以下のように、PyArrowを用いて Apache Arrow 形式のファイルを読み出す事ができる。

>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader("/tmp/hogehoge").read_all()
>>> X.schema
id: int32
a: int64
b: double
c: struct<x: int32, y: double, z: decimal(30, 11), memo: string>
  child 0, x: int32
  child 1, y: double
  child 2, z: decimal(30, 11)
  child 3, memo: string
d: string
e: double
ymd: date32[day]

DBテーブルの定義に準じて、Apache Arrowとしてのスキーマが作られている事が分かる。

ちなみに、Pandasのread_sqlを使ってテーブルを読み出すと、複合型であるcは文字列型にされてしまっている。これは流石にチョットツライ。

>>> A = pd.read_sql(sql="SELECT * FROM hogehoge LIMIT 1000", con="postgresql://localhost/postgres")
>>> B = pa.Table.from_pandas(A)
>>> B.schema
id: int64
a: int64
b: double
c: string
d: string
e: double
ymd: date32[day]
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"fi'
            b'eld_name": null, "name": null, "numpy_type": "object", "pandas_t'
            b'ype": "unicode", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"field_name": "id", "name": "id", "numpy_type": "int64", "pan'
            b'das_type": "int64", "metadata": null}, {"field_name": "a", "name'
            b'": "a", "numpy_type": "int64", "pandas_type": "int64", "metadata'
            b'": null}, {"field_name": "b", "name": "b", "numpy_type": "float6'
            b'4", "pandas_type": "float64", "metadata": null}, {"field_name": '
            b'"c", "name": "c", "numpy_type": "object", "pandas_type": "unicod'
            b'e", "metadata": null}, {"field_name": "d", "name": "d", "numpy_t'
            b'ype": "object", "pandas_type": "unicode", "metadata": null}, {"f'
            b'ield_name": "e", "name": "e", "numpy_type": "float64", "pandas_t'
            b'ype": "float64", "metadata": null}, {"field_name": "ymd", "name"'
            b': "ymd", "numpy_type": "object", "pandas_type": "date", "metadat'
            b'a": null}, {"field_name": "__index_level_0__", "name": null, "nu'
            b'mpy_type": "int64", "pandas_type": "int64", "metadata": null}], '
            b'"pandas_version": "0.22.0"}'}

中身の方を見てみると、このような感じで正しくクエリの結果を保存できている事がわかる。

>>> X.to_pandas()
       id     a          b                                                  c  \
0      24  3379  96.200935  {'memo': '1ff1de774005f8da13f42943881c655f', '...
1    2041  2208  71.122772  {'memo': '3416a75f4cea9109507cacd8e2f2aefc', '...
2    2042  7040  54.081142  {'memo': 'a1d0c6e83f027327d8461063f4ac58a6', '...
3    2043  1635  92.302224  {'memo': '17e62166fc8586dfa4d1bc0e1742c08b', '...
4    2044  3295  58.273429  {'memo': 'f7177163c833dff4b38fc8d2872f1ec6', '...
5    2045  9671  58.085893  {'memo': '6c8349cc7260ae62e3b1396831a8398f', '...
        :          :              :
                                    d          e        ymd
0    f4c1893e352a4e08d1a3b3b444c2d692  34.916724 2025-08-08
1    13f5d46a9f51e30dac02b33b74e9043e  11.098757 2018-09-26
2    b041a9cb97b220c1b073266af31cb45f  81.570772 2025-03-02
3    77e882ed95bf5838abd1b4336a7d2fdc  16.990162 2016-12-29
4    f78d9fde23891ae293f07c576982155b   7.017451 2020-10-08
5    4dfe9131c7ee3a44583a8d21d5ca26a2   3.979350 2023-03-09

今後のロードマップ

ひとまず、pg2arrowを使う事で PostgreSQLApache Arrow へデータを変換する流れができた。
(と、同時に手を動かしてデータ形式を一通り勉強したといえる。えへん。)

次は本来の目標である、FDWを使って Apache Arrow ⇒ PostgreSQL へのデータの流れを作る事。
これによって、IoT/M2Mの領域で扱われるような大量データを、毎度DBへインポートする事なく集計・解析系のクエリで処理する事ができるようになる。

他に、既に個別に頂いているpg2arrowの拡張としては、既に存在するArrowファイルに差分だけを追加する--appendモードや、複数のArrowファイルをマージして一個の巨大なArrowファイルに再編する--mergeモードなど。この辺も追って作っていきたい。

Dive into Apache Arrow(その1)

Arrow_Fdwを作るモチベーション

昨年、かなり頑張ってマルチGPUや拡張I/Oボックスを使用してシングルノードのクエリ処理性能10GB/sを達成できた。ただ一方で、PG-StromがPostgreSQLのデータ構造をそのまま使えるという事は、トランザクショナルに蓄積されたデータをそのまま使えるという手軽さの一方で、どうしても行指向データに伴う非効率なI/Oが処理速度全体を律速してしまうという事になる。

昨年の10月頃から直接お会いした人にはお話していたが、現在、PG-StromでApache Arrow形式のファイルを扱うようにするための機能強化に取り組んでいる。目標としては、3月末には動かせる状態にしたいと思っているが。

Apache Arrow形式とは、Sparkの人がよく使っているデータ形式で、大量の構造化データを列指向で保持する事ができる。特定の行を更新したり削除したりといったDBらしい処理には全く向いていないが、例えば、時々刻々集まってくるログデータを集積し、それを集計するためのデータ構造としては優れた特性を持っている*1


データを列指向で持つ事で、実際にストレージから読み出すのは被参照列の内容のみで済む。これはSSD-to-GPUダイレクトで相当頑張ってるとはいえ、I/Oの絶対量を減らす事ができるので、やはり従来のやり方では破れなかった処理性能の壁を越えるためには必要なステップだろう。


PostgreSQLにはFDW (Foreign Data Wrapper) という機能があり、PostgreSQLの管理下にあるDBファイル "以外の" 外部データソースに対して、これを仲介するドライバさえ書いてやれば、あたかも PostgreSQL のテーブルであるかのように振る舞わせる事ができる。
外部データソースは、リモートのRDBMSでも構わないし、あるいはローカルのCSVファイルを読み出したり(file_fdw)、GPUバイスメモリを読み書きする(Gstore_fdw)ものも存在する。
Apache Arrowファイル形式の対応も、このFDWのフレームワーク上に構築するのが最も合理的なアプローチと考えており、その上で、GPUへのデータ供給にSSD-to-GPU Direct SQLを使うというのが青写真である。

Apache Arrowフォーマットの概要

Apache Arrowプロジェクトのリポジトリを眺めてみると、データ形式に関するドキュメントがいくつも並んでいる事が分かる。
https://github.com/apache/arrow/tree/master/docs/source/format

例えば、Layout.rstを参照すると、Apache Arrow形式では固定長、可変長、null値などのデータがどのように配置されているのか書いてある。
この辺は、クリアコードの須藤さんが12月の勉強会で発表された時の資料が素晴らしいので一読をお勧め。
www.clear-code.com

一方、FDWドライバを実装する人にとっては、これらのデータ位置をどのように特定するのか、各カラムがどのようなデータ型を持っているのかというメタデータの読み書きに興味がある訳だが、ちょっとドキュメントが心許ない。

https://github.com/apache/arrow/blob/master/docs/source/format/Metadata.rst
https://github.com/apache/arrow/blob/master/docs/source/format/IPC.rst

例えば、File formatとして以下のような記述があり、次いでSTREAMING FORMATの定義は・・・とブレークダウンして行くわけだが、正直なところ、これで実装できる気がまるでしない。
フォーマットの記述というのは『先頭からxxバイト目にどういうデータが入っていて、長さはxxバイト』みたいなものであってほしい(笑

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

という事で、実際に Apache Arrow ファイルを作成し、C++C#のライブラリを眺めつつ、プロセッサの気持ちになってバイナリを読み解いて行く事にした。

Apache Arrowフォーマットを読み解く

Apache Arrow データを作成する。

以下のように、Pandas経由でSQLテーブルを読み出し、これをPyArrowを使って/tmp/mytest.arrowへ書き出す。

$ python3.5
import pyarrow as pa
import pandas as pd
X = pd.read_sql(sql="SELECT * FROM tbl LIMIT 3000", con="postgresql://localhost/postgres")
Y = pa.Table.from_pandas(X)
f = pa.RecordBatchFileWriter('/tmp/mytest.arrow', Y.schema)
f.write_table(Y, 1200)
f.close()

tblテーブルは以下のような内容で、id, aid, bidはint型、catはtext型で、xとyがfloat型である。*2

postgres=# select * from tbl limit 5;
 id | cat |  aid  |  bid  |        x         |        y
----+-----+-------+-------+------------------+------------------
  1 | aaa | 53132 | 29127 | 34.4543647952378 | 82.5439349282533
  2 | rrr | 52654 | 11461 | 81.7723278421909 | 11.2835586536676
  3 | nnn | 36869 | 61278 | 51.7565622925758 | 95.9137627854943
  4 | kkk | 82770 | 23100 | 98.3701516408473 |  39.805391151458
  5 | ttt | 14176 | 49035 | 73.1081502977759 |  50.415129121393
(5 rows)

このような感じでデータが生成される。

$ ls -l /tmp/mytest.arrow
-rw-r--r--. 1 kaigai users 176650 Jan 14 10:01 /tmp/mytest.arrow
$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000 41 52 52 4f 57 31 00 00 9c 05 00 00 10 00 00 00
000010 00 00 0a 00 0e 00 06 00 05 00 08 00 0a 00 00 00
000020 00 01 03 00 10 00 00 00 00 00 0a 00 0c 00 00 00
000030 04 00 08 00 0a 00 00 00 f4 03 00 00 04 00 00 00
000040 01 00 00 00 0c 00 00 00 08 00 0c 00 04 00 08 00
000050 08 00 00 00 08 00 00 00 10 00 00 00 06 00 00 00
000060 70 61 6e 64 61 73 00 00 bd 03 00 00 7b 22 63 6f
000070 6c 75 6d 6e 73 22 3a 20 5b 7b 22 66 69 65 6c 64
000080 5f 6e 61 6d 65 22 3a 20 22 69 64 22 2c 20 22 70
000090 61 6e 64 61 73 5f 74 79 70 65 22 3a 20 22 69 6e

Flat Bufferによるエンコード

IPC - File Formatには、ファイル形式は以下の通りであると記述されている。

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

さらに、Streaming Formatは以下の通り。

<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<DICTIONARY x DELTA>
...
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: int32>

そうすると、ファイル先頭"ARROW1\0\0"の直後にSCHEMAの定義が来ると思って読んでみる。

まず先頭 8 バイト(41 52 52 4f 57 31 00 00)は"ARROW1\0\0"なのでこれで良し。
また、次の4バイト(9c 05 00 00)はメタデータサイズなのでこれも良し。

$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000 41 52 52 4f 57 31 00 00 9c 05 00 00 10 00 00 00
000010 00 00 0a 00 0e 00 06 00 05 00 08 00 0a 00 00 00

しかし、それ以降がおかしい。format/Schema.fbsを参照すると、SCHEMAの先頭はバイトオーダを示すendiannessという、0か1のshort値*3であるはずなのに、10 00という値になってしまっている。

どういう事か?

実はApache Arrowのファイル形式は、Google FlatBuffersというSerialization/Deserializationのための仕組みを使っており、SCHEMAやRECORD BATCHといったメッセージチャンクのデータ定義が、そのままバイナリ上での並びになっている訳ではない。

https://github.com/dvidelabs/flatcc/blob/master/doc/binary-format.md#example
ココに簡単な説明が載っているが、要は簡単なインデックスを作って、1番目のデータは基準点からxxバイト後ろ、2番目のデータは基準点からxxバイト後ろ、・・・という事である。

実際のデータを使って順に読み解いていく。

$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000  41 52 52 4f 57 31 00 00  9c 05 00 00 10 00 00 00
000010  00 00 0a 00 0e 00 06 00  05 00 08 00 0a 00 00 00
000020  00 01 03 00 10 00 00 00  00 00 0a 00 0c 00 00 00
000030  04 00 08 00 0a 00 00 00  f4 03 00 00 04 00 00 00
000040  01 00 00 00 0c 00 00 00  08 00 0c 00 04 00 08 00
000050  08 00 00 00 08 00 00 00  10 00 00 00 06 00 00 00
000060  70 61 6e 64 61 73 00 00  bd 03 00 00 7b 22 63 6f
000070  6c 75 6d 6e 73 22 3a 20  5b 7b 22 66 69 65 6c 64
000080  5f 6e 61 6d 65 22 3a 20  22 69 64 22 2c 20 22 70
000090  61 6e 64 61 73 5f 74 79  70 65 22 3a 20 22 69 6e

FlatBufferメッセージの定義は以下の通り。headerはUnionなので、実際のオブジェクト型を示す1byteのメッセージタイプと実際のオブジェクトへのポインタから成る。つまり、Messageは4つのフィールドを持っている。

union MessageHeader {
  Schema, DictionaryBatch, RecordBatch, Tensor, SparseTensor
}

table Message {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  header: MessageHeader;
  bodyLength: long;
}

まず先頭8byteはシグニチャ"ARROW1\0\0"なのでスキップ。次の4byte9c 05 00 00メタデータのサイズ。
その次の4byte10 00 00 00がFlatBufferメッセージへのオフセットなので、0x000c + 0x0010 = 0x001c を参照する。

0x001cの値は0a 00 00 00なので、vtableの開始位置は0x001c - 0x000a = 0x0012となる。
書き下すと以下のようになり、バイナリ上でのデータの並びが構造体の定義と一致していない事が分かる。
また、4番目のフィールドbodyLengthは省略されているが、これはデフォルトの0である事を意味する。

table:
  0x001c   0a 00 00 00     ;  32bit negative offset to vtable
                           ;  0x001c - 0x000a = 0x0012
  0x0021   01              ; field-1 / Message Header = Schema
  0x0022   03 00           ; field-0 / Metadata Version = V4
  0x0024   10 00 00 00     ; field-2 / offset to Message Body (=0x0024 + 0x0010)

vtable:
  0x0012   0a 00           ; vtable length = 10bytes / 3 items
  0x0014   0e 00           ; table length = 14 bytes (including the negative offset)
  0x0016   06 00           ; field id 0:  (version; short)
  0x0018   05 00           ; field id 1:  (MessageHeader; byte)
  0x001a   08 00           ; field id 2:  (offset to Message Body; int)

これを読む事で、先頭のFlatBufferメッセージにはSCHEMAが格納されており、その実体は 0x0034 にある事が分かった。
次いで、0x0034から始まるバイナリを読んでいく。SCHEMAの定義は以下の通り。

table Schema {

  /// endianness of the buffer
  /// it is Little Endian by default
  /// if endianness doesn't match the underlying system then the vectors need to be converted
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];
}

0x0034の値は0a 00 00 00なので、同様にvtableの開始位置は0x0034 - 0x000a = 0x002aから。

table:
  0x0034   0a 00 00 00      ;  32bit negative offset to vtable
                            ;  0x0034 - 0x000a = 0x002a
  0x0038   f4 03 00 00      ; field-id 1: offset to [fields] vector 
  0x003c   04 00 00 00      ; field-id 2: offset to [custom_metadata] vector

vtable:
  0x002a   0a 00            ; vtable length = 10bytes / 3 items
  0x002c   0c 00            ; table length = 12bytes (including the negative offset)
  0x002e   00 00            ; field id 0: (endianness; short)
  0x0030   04 00            ; field id 1: (offset to [fields] vector)
  0x0032   08 00            ; field id 2: (offset to [custom_metadata] vector)

ここで、field-id 0のendiannessのインデックスが0になっている。これはデフォルト値である事を示し、値が 0 (= Little Endian) として扱って構わない事を意味する。
[fields]と[custom_metadata]はベクトル値、即ち配列で、ここから更に指定されたオフセットを参照する事になる。

先ず、SCHEMAに含まれる列のデータ型を示すFieldの配列は0x0038 + 0x03f4 = 0x42c に格納されているのでこれを参照する。
Fieldの定義は以下の通り。

table Field {
  // Name is not required, in i.e. a List
  name: string;
  nullable: bool;
  // This is the type of the decoded value if the field is dictionary encoded
  type: Type;

  // Present only if the field is dictionary encoded
  dictionary: DictionaryEncoding;

  // children apply only to Nested data types like Struct, List and Union
  children: [Field];

  // User-defined metadata
  custom_metadata: [ KeyValue ];
}

0x042cの値は07 00 00 00である。つまり、この後に続く7個のint32値は、Fieldを参照するオフセットである。

000420  62 6a 65 63 74 22 7d 5d  7d 00 00 00 07 00 00 00
000430  40 01 00 00 04 01 00 00  d4 00 00 00 a4 00 00 00
000440  70 00 00 00 44 00 00 00  04 00 00 00 ec fe ff ff
000450  00 00 01 02 1c 00 00 00  0c 00 00 00 04 00 00 00
000460  00 00 00 00 dc fe ff ff  00 00 00 01 40 00 00 00
000470  11 00 00 00 5f 5f 69 6e  64 65 78 5f 6c 65 76 65
000480  6c 5f 30 5f 5f 00 00 00  28 ff ff ff 00 00 01 03
000490  18 00 00 00 0c 00 00 00  04 00 00 00 00 00 00 00
    :
fields[0] = 0x0430 + 0x0140 = 0x0570
fields[1] = 0x0434 + 0x0104 = 0x0538
fields[2] = 0x0438 + 0x00d4 = 0x050c
fields[3] = 0x043c + 0x00a4 = 0x04e0
fields[4] = 0x0440 + 0x0070 = 0x04b0
fields[5] = 0x0444 + 0x0044 = 0x0488
fields[6] = 0x0448 + 0x0004 = 0x044c

例えばfields[6]の場合、tableのアドレスは0x044cでvtableへのオフセットは0xfffffeec なので、0x044c - 0xfffffeec = 0x0560である。
一方、fields[5]の場合、tableのアドレスは0x0488でvtableへのオフセットは0xffffff28なので、0x0488 - 0xffffff28 = 0x0560となる。

最初、これを見た時に ( ゚Д゚)ハァ? となったが、実はよく考えられていて、vtableが指し示すオフセットはあくまでtableからの相対値なので、同じデータ構造を持つ fields[*] 配列の場合は vtable が共有可能で、table以降のバイト列だけが違っているという構造になる。

(注)これに気が付いた時のTweet

したがって、fields[6] の table と vtable は以下のようになる。

table:
  0x044c   ec fe ff ff      ;  32bit negative offset to vtable
                            ;  0x044c - 0xfffffeec = 0x0560
  0x0452   01               ; field-id 1: nullable = true 
  0x0453   02               ; field-id 2: type-id = Int
  0x0454   1c 00 00 00      ; field-id 0: offset to 'name' string (at 0x0470) 
  0x0458   0c 00 00 00      ; field-id 3: offset to type definition (at 0x0464)
  0x045c   04 00 00 00      ; field-id 5: offset to [children] vector 

vtable:
  0x0560   10 00            ; vtable length = 16bytes / 6 items
  0x0562   14 00            ; table length = 20bytes (including the negative offset)
  0x0564   08 00            ; field id 0: (name: offset to string)
  0x0566   06 00            ; field id 1: (nullable: bool)
  0x0568   07 00            ; field id 2: (type id; byte)
  0x056a   0c 00            ; field id 3: (offset to Type definition)
  0x056c   00 00            ; field id 4: (offset to dictionary; = NULL)
  0x056e   10 00            ; field id 5: (offset to [children] vector)

fields[5] の vtable は共通で、tableは以下のようになる。fields[6]はInt型のフィールドを表現しているが、fields[5]はFloat型のフィールドを表現している事がわかる。

table:
  0x0488   28 ff ff ff      ;  32bit negative offset to vtable
                            ;  0x0488 - 0xffffff28 = 0x0560
  0x0452   01               ; field-id 1: nullable = true 
  0x0453   03               ; field-id 2: type-id = Float
  0x0454   18 00 00 00      ; field-id 0: offset to 'name' string 
  0x0458   0c 00 00 00      ; field-id 3: offset to type definition
  0x045c   04 00 00 00      ; field-id 5: offset to [children] vector 

続いて、fields[6] の名前と型定義を見てみる事にする。

000460  00 00 00 00 dc fe ff ff  00 00 00 01 40 00 00 00
000470  11 00 00 00 5f 5f 69 6e  64 65 78 5f 6c 65 76 65
000480 6c 5f 30 5f 5f 00 00 00 28 ff ff ff 00 00 01 03

まず、列名は 0x0470 から始まる。Stringの場合、int32の文字列長に続いて '\0' 終端の文字列が格納される事になるため、0x0470から始まるfields[6]の列名は 0x0011 バイトの長さがあり、文字列自体は 0x0474 から始まる。
5f 5f 69 6e 64 65 78 5f 6c 65 76 65 6c 5f 30 5f 5f 00 00 00をASCII文字列に直すと"__index_level_0__\0"となる。こんなカラムは元のSQL結果には存在しないので、おそらくPandasが挿入したものなのだろう。

列定義は0x0464のtable参照となる。Intの定義は以下の通りなので、これまで同様に vtable = 0x464 - 0xfffffedc = 0x588 を参照する。

table Int {
  bitWidth: int; // restricted to 8, 16, 32, and 64 in v1
  is_signed: bool;
}
table:
  0x0464   dc fe ff ff      ;  32bit negative offset to vtable
                            ;  0x0464 - 0xfffffedc = 0x0588
  0x046b   01               ; field-id 1: is_signed = true 
  0x046c   40 00 00 00      ; field-id 0: bitWidth = 64

vtable:
  0x0588   08 00            ; vtable length = 8bytes / 2 items
  0x0562   0c 00            ; table length = 12bytes (including the negative offset)
  0x0564   08 00            ; field id 0: (bitWidth: int)
  0x0566   07 00            ; field id 1: (is_signed: bool)

これによって、fields[6]のデータ型は符号付き、64bit長のIntである事が分かった。

Arrowファイルは尻から読む

そろそろバイナリを人力で追っていくのが辛くなってきたので、勉強も兼ねてパーサのプログラムを作ってみた。

github.com

名前からして本来は PostgreSQL のテーブルを Apache Arrow 形式でダンプするツールなのだが、書き出す部分はまだ実装できていない。

とりあえず、デバッグ用機能であるところの--dumpオプションを用いて先ほどのArrow形式ファイルをダンプしてやると次のようになる。
(見やすさのため、適宜改行・インデントを加えている)

$ ./pg2arrow --dump /tmp/mytest.arrow
[Footer]
{Footer: version=3,
  schema={Schema: endianness=little, fields=[
   {Field: name=id, nullable=true, type={Int64}, children=[], custom_metadata=[]},
   {Field: name=cat, nullable=true, type={Utf8}, children=[], custom_metadata=[]},
   {Field: name=aid, nullable=true, type={Int64}, children=[], custom_metadata=[]},
   {Field: name=bid, nullable=true, type={Int64}, children=[], custom_metadata=[]},
   {Field: name=x, nullable=true, type={Float64}, children=[], custom_metadata=[]},
   {Field: name=y, nullable=true, type={Float64}, children=[], custom_metadata=[]},
   {Field: name=__index_level_0__, nullable=true, type={Int64}, children=[], custom_metadata=[]}
  ],
  custom_metadata [
   {KeyValue: key=(pandas), value=({"columns": [{"field_name": "id", "pandas_type": "int64", "name": "id", "metadata": null, "numpy_type": "int64"}, {"field_name": "cat", "pandas_type": "unicode", "name": "cat", "metadata": null, "numpy_type": "object"}, {"field_name": "aid", "pandas_type": "int64", "name": "aid", "metadata": null, "numpy_type": "int64"}, {"field_name": "bid", "pandas_type": "int64", "name": "bid", "metadata": null, "numpy_type": "int64"}, {"field_name": "x", "pandas_type": "float64", "name": "x", "metadata": null, "numpy_type": "float64"}, {"field_name": "y", "pandas_type": "float64", "name": "y", "metadata": null, "numpy_type": "float64"}, {"field_name": "__index_level_0__", "pandas_type": "int64", "name": null, "metadata": null, "numpy_type": "int64"}], "pandas_version": "0.22.0", "index_columns": ["__index_level_0__"], "column_indexes": [{"field_name": null, "pandas_type": "unicode", "name": null, "metadata": {"encoding": "UTF-8"}, "numpy_type": "object"}]})}
  ]},
  dictionaries=[],
  recordBatches=[
   {Block: offset=1448, metaDataLength=448 bodyLength=73256},
   {Block: offset=75152, metaDataLength=448 bodyLength=66056},
   {Block: offset=141656, metaDataLength=448 bodyLength=33008}
  ]
}

[Record Batch 0]
{Message:
  version=3,
  body={RecordBatch: length=1200, nodes=[
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0}
        ], buffers=[
         {Buffer: offset=0, length=0},
         {Buffer: offset=0, length=9600},
         {Buffer: offset=9600, length=0},
         {Buffer: offset=9600, length=12008},
         {Buffer: offset=21608, length=3648},
         {Buffer: offset=25256, length=0},
         {Buffer: offset=25256, length=9600},
         {Buffer: offset=34856, length=0},
         {Buffer: offset=34856, length=9600},
         {Buffer: offset=44456, length=0},
         {Buffer: offset=44456, length=9600},
         {Buffer: offset=54056, length=0},
         {Buffer: offset=54056, length=9600},
         {Buffer: offset=63656, length=0},
         {Buffer: offset=63656, length=9600}
        ]},
  bodyLength=73256
}
        :
        :
[Record Batch 2]
{Message:
  version=3,
  body={RecordBatch: length=600, nodes=[
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0}
        ], buffers=[
         {Buffer: offset=0, length=0},
         {Buffer: offset=0, length=4800},
         {Buffer: offset=4800, length=0},
         {Buffer: offset=4800, length=2408},
         {Buffer: offset=7208, length=1800},
         {Buffer: offset=9008, length=0},
         {Buffer: offset=9008, length=4800},
         {Buffer: offset=13808, length=0},
         {Buffer: offset=13808, length=4800},
         {Buffer: offset=18608, length=0},
         {Buffer: offset=18608, length=4800},
         {Buffer: offset=23408, length=0},
         {Buffer: offset=23408, length=4800},
         {Buffer: offset=28208, length=0},
         {Buffer: offset=28208, length=4800}
        ]
       },
  bodyLength=33008
}

Arrow形式の場合、例えば一個のファイルに100万行のレコードが含まれていたとしても、個々のカラムが例えば100万要素の単純配列になる…訳ではなく、RecordBatchという単位で内部的には10万行×10個のセグメントに分けて保存する事ができる。

最初、ファイルのフォーマットを頭から見ていた時に『はて?ファイル内のRecordBatchの数や位置はどこを見れば書いてあるのだろう?』と不思議に思っていたが、File FormatのFOOTER部分を先に読むのだという事に気が付いて疑問が解消した。

おさらいになるが、ファイルにはこのようにデータが書かれている。

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

つまり、ファイル末尾 6bytes にはシグニチャ"ARROW1"が書き込まれており、その前4byteにはFOOTER部分のサイズが書かれている。

という事は、(File Size) - 6bytes - sizeof(int32) の場所を読めばFooterの位置を特定でき、これを元に、ファイル内の RecordBatch の場所や数を特定できることになる。

struct Block {
  offset: long;
  metaDataLength: int;
  bodyLength: long;
}

table Footer {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  schema: org.apache.arrow.flatbuf.Schema;
  dictionaries: [ Block ];
  recordBatches: [ Block ];
}

なるほど確かに、ヘッダではなくフッタ側にファイル内の各セグメントへのインデックスを持っておくようにすれば、データを追記(この場合 RecordBatch の追加)する時も、既に書き込まれた RecordBatch を変更する事なく、元々 FOOTER が書き込まれていた場所を上書きして新しい FOOTER を作成すれば、最小限の I/O コストでデータの追記ができるようになる。

ファイルのメタ情報を頭に持っておくというのは、磁気ディスクか、もしかすると磁気テープの時代の設計の名残なのかもとか思ったり思わなかったりする40歳。

追記

(15-Jan-2019)
注釈で『Endianを示すのにshort型というのはいかがなものか』と書いたところ、以下のような指摘を頂いた。

確かに。Table/Vtableを辿ってデータ構造をdeserializeする部分はFlatBufferの領分なので、所詮はペイロードの一部でしかない "endianness" の値によって挙動が変わるはずもなく、決め打ちで Little Endian という事であれば指摘のような問題は起こらない。

*1:さらに更新/削除がない分、通常のテーブルスキャンでは必須のMVCC可視性チェックが不要になる。これはSSD-to-GPU Direct SQLを実行する上では非常に助かる

*2:が、Pandas/PyArrowに取り込んだ時にint型がint64型になってしまった。他にも、複合型データがString扱いされてしまったり、ちょっと色々アレ。

*3:Endianを示すのにshort型というのはいかがなものか。Big Endianでshort型の1を書き込むと 00 01 になってしまい、Little Endian機では256と読めてしまう。byte値にしとけばよかったのに…。

PL/CUDAを使ってロジスティック回帰分析を実装してみた

PostgreSQL Advent Calendar 2018の6日目です。

PG-Stromはアナリティクス向けにPL/CUDAというユーザ定義SQL関数を実装する機能を持っており、SQL処理の中で計算ヘビーな部分をCUDA Cで記述したGPUプログラムで実行させるという事ができる。
SQL関数としてPL/CUDA関数を呼び出すと、その引数は自動的にGPUメモリへロードされ、またGPUプログラムの処理結果は自動的にSQLの処理に渡される事になる。したがって。プログラマはデータの管理をDBに任せ、DBからデータを出し入れする事なく、GPUによる並列計算を自然な形でクエリに埋め込む事が可能となる。

なお、ここで言う”計算ヘビー”というのは、JOINやGROUP BY、SORTなどがCPUインテンシブな処理である、等々とは少し計算量の桁が違う話である事には留意していただきたい。
例えば、二年前のこのエントリは創薬領域での類似化合物探索のワークロードで、DBに登録された化合物同士の類似度をGPUで計算しているが、mその計算すべき類似度の組合せは100億通りもあった。
kaigai.hatenablog.com

で、時々お問合せを頂いていたのが『PL/CUDAでSQL関数を定義するにあたって、何かサンプルプログラムはないか?』という質問。
確かに、パブリックになっているPL/CUDA関数のサンプルはなかったので、折角なのでPGconf.ASIA 2018のLTあたりのネタにすべく、比較的簡単なアルゴリズムをPL/CUDAで実装してみた。

ソースコードはこちら。
https://github.com/heterodb/toybox/tree/master/logistic_regression

ロジスティック回帰分析

今回のテーマに選んでみたのは、ロジスティック回帰分析。

このアルゴリズムは、説明変数 x_j (j=1,...,m)とパラメータw_j (j=0,...,m)を、累積確率関数であるロジスティック関数\sigma(\alpha)=\frac{1}{1-e^{-\alpha}}に通して二値分類を行うためのもの。
学習フェーズでは、トレーニングセットのデータを用いてパラメータを推定する事となり、教師データである従属変数t_i (i=1,...,n)が実現する確率を最大化するパラメータを推定する必要がある。

なお、\alpha = w_0 + \sum_{j=1}^{m} w_j x_jとし、従属変数t_i = 1 である場合の確率をP(X) = \sigma(\alpha)としている。

あまりこの辺を踏み込んで書けるほど数学に強いわけではないので、詳しくはこちらの本をご参考に(逃げた)

ITエンジニアのための機械学習理論入門

ITエンジニアのための機械学習理論入門

最小二乗法の場合とは異なり、ロジスティック回帰分析のパラメータは解析的に数式を解いて求める事はできない。
そのため、実際に説明変数と従属変数から成るトレーニングセットをモデルに当てはめて計算し、その誤差を徐々に小さくしていくよう繰り返し計算を行っていく以外にパラメータを推定する方法がなく、それゆえに計算量が大きくなってしまう。

途中の説明は書籍に任せるとして、最終的には、以下の行列式を繰り返し解いてパラメータを更新していく事になる。

w_{new} = w_{old} - (\Phi^{T}R\Phi)^{-1}\Phi^{T}(z - t)

各変数の意味は以下の通り
レーニングセット(従属変数):
 t=(t_1,...,t_n)
レーニングセット(説明変数):
 \Phi = \left(
\begin{array}{cccc}
1 & x_{11} & \ldots & x_{1m} \\
1 & x_{21} & \ldots & x_{2m}  \\
\vdots & \vdots & \ddots & \vdots  \\
1 & x_{n1} & \ldots & x_{nm}
 \end{array}  \right)
累積確率関数:
z = (z_1,...,z_n)(但し、z_i = \sigma(w^T \Phi_i))と、
それを対角成分に並べたもの:
 R = diag[z_1(1-z_1), ..., z_n(1-z_n)]

仮定としては、説明変数の数はそれほど多くない(数個~数百程度)に対し、トレーニングセットの数は数百万~になり得る。
そうすると、計算量として比較的大きくなるのはi = 1,...,n方向の行列積で、ここは数千コアを持つGPUの得意分野になってくる。

PL/CUDAでの実装を掘り下げる

前振りが長くなったが、ソースコードを見てみる事にする。
https://github.com/heterodb/toybox/tree/master/logistic_regression

エントリポイント

まず、PL/CUDA関数のエントリポイントは#plcuda_begin#plcuda_endで挟まれた区間である。

CREATE OR REPLACE FUNCTION
logregr_train(reggstore,      -- source table (only Gstore foreign table)
              smallint,       -- column number of dependent variable
              smallint[],     -- columns number of independent variables
              int = 20,       -- max number of iteration
              real = 0.0001)  -- threashold to stop iteration
RETURNS real[]
AS $$
   :
#plcuda_begin
   /*
    * ここにエントリポイントの処理を記述する
    */
#plcuda_end
$$ LANGUAGE 'plcuda' STRICT;

ロジスティック回帰の学習関数logregr_trainは、第1引数がトレーニングセットを含むGstore_Fdw外部テーブル*1への参照で、第2引数は従属変数の列番号、第3引数は説明変数の列番号(配列)で、残りはパラメータ推定を繰り返す回数の最大値と、繰り返しを打ち切る閾値*2の設定。

エントリポイント内では、smallintの引数はshort型で、realの引数はfloat型でというように、それぞれSQLの型に対応するCUDA Cのデータ型でarg1arg2...というように引数を参照する事ができる。

例えば、以下のコードはsmallint[]の配列であるarg3を参照し、指定された列がreal型であるかどうかをチェックしている部分である。
型が一致しない場合、EEXITマクロでスクリプトを終了させ、これはPostgreSQL側でエラーとしてユーザへ報告される。

    /* set of simple array of independent data */
    if (!VALIDATE_ARRAY_VECTOR_TYPE_STRICT(arg3, PG_INT2OID))
        EEXIT("independent variables must be 'smallint[]'");
    iattrs = (VectorTypeShort *)arg3;
    rc = cudaMallocManaged(&Xp, sizeof(float *) * iattrs->height);
    if (rc != cudaSuccess)
        CUEXIT(rc, "failed on cudaMallocManaged");
    for (i=0; i < iattrs->height; i++)
    {
        j = iattrs->values[i];
        if (j <= 0 || j > kds_h->ncols)
            EEXIT("independent variable is out of range");
        cmeta = &kds_h->colmeta[j-1];
        if (cmeta->atttypid != PG_FLOAT4OID)
            EEXIT("independent variables must be 'real'");
        Xp[i] = (float *)((char *)kds_d + __kds_unpack(cmeta->va_offset));
    }

既にGPUへロード済みのデータをreggstore型の引数で参照する場合だけは少々特殊で、以下のように定義された GstoreIpcMapping構造体へのポインタとなる。mapメンバはGPUバイスメモリのポインタなので、ホストコードで触ってはいけない。*3

typedef struct
{
    GstoreIpcHandle h;      /* unique identifier of Gstore_Fdw */
    void           *map;    /* mapped device address
                             * in the CUDA program context */
} GstoreIpcMapping;
宣言部

CUDA CでGPUカーネルを呼び出す場合、例えばmy_kernel<<<10,1024>>>(x, y, z)というように、起動すべきブロック数/スレッド数を指定してデバイス関数をコールする。
つまり、エントリポイント内にベタっと処理を記述するだけで不十分で、少なくともGPUカーネル関数をどこかに定義しなければPL/CUDA関数を利用する意味はあまりない。
それを行うのが、#plcuda_decl以降のブロックで、ここに記述したホスト関数/デバイス関数をエントリポイントから呼び出すというのが一般的なPL/CUDA関数の実装という事になる。

例えば、以下のlogregt_update_Zというデバイス関数は、上記の累積確率関数:z = (z_1,...,z_n) の値を格納するVectorTypeFloat型((PostgreSQLreal[]型とバイナリ互換な単純配列))のバッファを更新するためのもので、これを(ブロック数×ブロックサイズ)個のスレッドで並列に実行する。
例えばブロック数40、ブロックサイズ1024の場合、get_global_id()は0~40959の間の値を返し、全てのスレッドが処理を完了した時にはZ->values[]配列には\frac{1}{1-e^{-\alpha}}の計算結果が格納されている事になる。

KERNEL_FUNCTION_MAXTHREADS(void)
logregr_update_Z(VectorTypeFloat *Z,
                 cl_float  **Xp,
                 VectorTypeFloat *W)
{
    cl_uint        nitems = Z->height;
    cl_uint        width = W->height;
    cl_uint        i, j;

    for (i = get_global_id(); i < nitems; i += get_global_size())
    {
        cl_float    x, w, sum = 0.0;
        for (j=0; j < width; j++)
        {
            w = W->values[j];
            x = (j == 0 ? 1.0 : Xp[j-1][i]);
            sum += w * x;
        }
        Z->values[i] = 1.0 / (1.0 + __expf(-sum));
    }
}

このように定義されたデバイス関数を以下のように繰り返し呼び出す事で、徐々にロジスティック回帰のパラメータを更新し、目的とする値に近付くというステップを踏む。

    for (loop=0; loop < nloops; loop++)
    {
               :
        /* compute Z vector */
        logregr_update_Z<<<gridSz_Z, blockSz>>>(Z, Xp, W);
               :
        /* compute P matrix */
        logregr_update_P<<<gridSz_P, blockSz>>>(Preg, Xp, width, Z);
               :
        cudaStreamSynchronize(NULL);
               :
    }
CUDA用ライブラリの利用

ところで、この繰り返し計算の過程では、単純な行列・ベクトル積だけでなく、一ヵ所逆行列を計算する箇所があった事を思い出してほしい。
このように典型的な計算であれば、CUDA Toolkitでライブラリが提供されている場合があり、逆行列の計算であればBlasのCUDA版であるcuBlasを使う事ができる。

ライブラリ関数を使うには、まず普通に#include ...でヘッダファイルをインクルードすると共に、GPUバイナリをビルドする時にリンクするライブラリを指定する必要がある。

以下のように#plcuda_libraryでライブラリ名を指定すると、

#plcuda_library cublas
#plcuda_decl
#include <cublas_v2.h>
#include "cuda_matrix.h"
     :

これはGPUバイナリをビルドする時に

% nvcc .... -lcublas

と展開され、ライブラリ関数をリンクする。

若干、引数が多くてアレなのだが、今回の実装でも以下のcuBlas関数を呼び出して逆行列を計算している。

        /* compute P-inverse */
        status = cublasDgetrfBatched(handle,
                                     width,
                                     Preg,
                                     width,
                                     Pivot,
                                     Info,
                                     1);
        if (status != CUBLAS_STATUS_SUCCESS)
            EEXIT("failed on cublasSgetrfBatched: %d", (int)status);
        status = cublasDgetriBatched(handle,
                                     width,
                                     Preg,
                                     width,
                                     Pivot,
                                     Pinv,
                                     width,
                                     Info,
                                     1);
        if (status != CUBLAS_STATUS_SUCCESS)
            EEXIT("failed on cublasSgetriBatched: %d", (int)status);

CPU実装(MADLib)との比較

最後に、CPU実装と比べてどれくらい速くなっているのかを調べるため、MADLibのロジスティック回帰と性能値を比較してみる事にした。

とは言っても、これを使って何を分類するか、というのはあまり興味がないので、ひとまずランダムに4000万件程度のデータを生成し、人為的に1+2x_1-3x_2+x_3+0.5x_4 > 0である場合を真(true)、それ以外を偽(false)とするテストデータを作って、それを学習させてみる事にする。

postgres=# CREATE TABLE logreg (
                    t  bool,
                    x1 float,
                    x2 float,
                    x3 float,
                   x4 float );
CREATE TABLE

postgres=# INSERT INTO logreg (SELECT (1.0+2.0*x1-3.0*x2+x3+0.5*x4) > 0 t, x1, x2, x3, x4
                                 FROM (SELECT random() x1,
                                              random() x2,
                                              random() x3,
                                              random() x4
                                         FROM generate_series(1,40000000)) x);
INSERT 0 40000000

次に、このテストデータをGPUバイスメモリにロードする。
PL/CUDA関数はreal型を前提にしているので、説明変数のデータ型はrealへキャストする。

postgres=# CREATE FOREIGN TABLE ft (
               t   bool,
               x1  real,
               x2  real,
               x3  real,
               x4  real
           ) SERVER gstore_fdw
             OPTIONS (pinning '0');
CREATE FOREIGN TABLE

postgres=# INSERT INTO ft (SELECT * FROM logreg);
INSERT 0 40000000

bool(1byte) + float(4bytes)*4 = 17bytes
17bytes * 40M行 = 約680MB
バイス管理用に必要な約120MBと併せて、計800MBのデータがGPUにロードされているのが分かる。

[kaigai@saba src]$ nvidia-smi
Thu Dec  6 12:10:56 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.72       Driver Version: 410.72       CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:02:00.0 Off |                  N/A |
| N/A   42C    P0    52W / 250W |    817MiB / 22919MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     27650      C   ...bgworker: PG-Strom GPU memory keeper      807MiB |
+-----------------------------------------------------------------------------+

PL/CUDA関数を呼び出す。attnum_of()は指定された列番号を返すユーティリティ関数。

postgres=# SELECT logregr_train('ft',
                                attnum_of('ft','t'),
                                attnums_of('ft','{x1,x2,x3,x4}'));
              logregr_train
------------------------------------------
 {3376.4,6752.71,-10129.1,3376.3,1688.27}
(1 row)

Time: 3647.059 ms (00:03.647)

約3.6secで完了。パラメータである w ベクトルを推定しているので、5個の要素が含まれている。

次に、MADLibのロジスティック回帰のトレーニング。(関数名や引数に与えるパラメータはこれを参考にした)

postgres=# SELECT madlib.logregr_train('logreg','hoge', 't','ARRAY[1,x1,x2,x3,x4]',NULL, 20);
 logregr_train
---------------

(1 row)

Time: 1301307.361 ms (21:41.307)

21分41秒。元々の実装がそこまで最適化されているか微妙なところではあるものの、広く使われている実装と比べて350倍速く実行できたのはGPUの面目躍如と言ったところ。

postgres=# SELECT coef FROM hoge;
                                          coef
------------------------------------------------------
 {3041.82722783601,6083.57794939209,-9125.44857123801,3041.73992459095,1520.98287953044}
(1 row)

一応、学習結果であるパラメータを比べてみると

俺々実装 MADLib
w0 3376.4 3041.83
w1 6752.71 6083.58
w2 -10129.1 -9125.45
w3 3376.3 3041.74
w4 1688.27 1520.98

それほど遜色ない値となっている。

これを、説明変数と学習パラメータを引数に取るlogregr_predict関数に与えて(人為的に作ったデータなので)きちんと分類できている事を示そうと思ったが、残念ながらgit addを忘れてgit cleanしてしまい、推論系の実装は闇に葬られてしまった。

こちらはPGconf.ASIAのLTでご報告しようと思うので、少々お待ちを。

明日は @hmatsu47 の『Aurora PostgreSQL 互換版(9.6.9)でフェイルオーバー時にバッファ/キャッシュの生きのこり方を雑に確かめてみた』です。

*1:Gstore_Fdw: GPUバイスメモリの読書きを可能にするFDW

*2:パラメータの変動は\frac{\|w_{new} - w_{old}\|^2}{\|w_{old}\|^2}で計算します

*3:Managed MemoryというPage Faultハンドラを使ってCPU/GPU間でページを入れ替える機能があるが、CUDAがプロセス間通信に対応してくれていない。早く何とかならないものか。