Dive into Apache Arrow(その1)

Arrow_Fdwを作るモチベーション

昨年、かなり頑張ってマルチGPUや拡張I/Oボックスを使用してシングルノードのクエリ処理性能10GB/sを達成できた。ただ一方で、PG-StromがPostgreSQLのデータ構造をそのまま使えるという事は、トランザクショナルに蓄積されたデータをそのまま使えるという手軽さの一方で、どうしても行指向データに伴う非効率なI/Oが処理速度全体を律速してしまうという事になる。

昨年の10月頃から直接お会いした人にはお話していたが、現在、PG-StromでApache Arrow形式のファイルを扱うようにするための機能強化に取り組んでいる。目標としては、3月末には動かせる状態にしたいと思っているが。

Apache Arrow形式とは、Sparkの人がよく使っているデータ形式で、大量の構造化データを列指向で保持する事ができる。特定の行を更新したり削除したりといったDBらしい処理には全く向いていないが、例えば、時々刻々集まってくるログデータを集積し、それを集計するためのデータ構造としては優れた特性を持っている*1


データを列指向で持つ事で、実際にストレージから読み出すのは被参照列の内容のみで済む。これはSSD-to-GPUダイレクトで相当頑張ってるとはいえ、I/Oの絶対量を減らす事ができるので、やはり従来のやり方では破れなかった処理性能の壁を越えるためには必要なステップだろう。


PostgreSQLにはFDW (Foreign Data Wrapper) という機能があり、PostgreSQLの管理下にあるDBファイル "以外の" 外部データソースに対して、これを仲介するドライバさえ書いてやれば、あたかも PostgreSQL のテーブルであるかのように振る舞わせる事ができる。
外部データソースは、リモートのRDBMSでも構わないし、あるいはローカルのCSVファイルを読み出したり(file_fdw)、GPUバイスメモリを読み書きする(Gstore_fdw)ものも存在する。
Apache Arrowファイル形式の対応も、このFDWのフレームワーク上に構築するのが最も合理的なアプローチと考えており、その上で、GPUへのデータ供給にSSD-to-GPU Direct SQLを使うというのが青写真である。

Apache Arrowフォーマットの概要

Apache Arrowプロジェクトのリポジトリを眺めてみると、データ形式に関するドキュメントがいくつも並んでいる事が分かる。
https://github.com/apache/arrow/tree/master/docs/source/format

例えば、Layout.rstを参照すると、Apache Arrow形式では固定長、可変長、null値などのデータがどのように配置されているのか書いてある。
この辺は、クリアコードの須藤さんが12月の勉強会で発表された時の資料が素晴らしいので一読をお勧め。
www.clear-code.com

一方、FDWドライバを実装する人にとっては、これらのデータ位置をどのように特定するのか、各カラムがどのようなデータ型を持っているのかというメタデータの読み書きに興味がある訳だが、ちょっとドキュメントが心許ない。

https://github.com/apache/arrow/blob/master/docs/source/format/Metadata.rst
https://github.com/apache/arrow/blob/master/docs/source/format/IPC.rst

例えば、File formatとして以下のような記述があり、次いでSTREAMING FORMATの定義は・・・とブレークダウンして行くわけだが、正直なところ、これで実装できる気がまるでしない。
フォーマットの記述というのは『先頭からxxバイト目にどういうデータが入っていて、長さはxxバイト』みたいなものであってほしい(笑

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

という事で、実際に Apache Arrow ファイルを作成し、C++C#のライブラリを眺めつつ、プロセッサの気持ちになってバイナリを読み解いて行く事にした。

Apache Arrowフォーマットを読み解く

Apache Arrow データを作成する。

以下のように、Pandas経由でSQLテーブルを読み出し、これをPyArrowを使って/tmp/mytest.arrowへ書き出す。

$ python3.5
import pyarrow as pa
import pandas as pd
X = pd.read_sql(sql="SELECT * FROM tbl LIMIT 3000", con="postgresql://localhost/postgres")
Y = pa.Table.from_pandas(X)
f = pa.RecordBatchFileWriter('/tmp/mytest.arrow', Y.schema)
f.write_table(Y, 1200)
f.close()

tblテーブルは以下のような内容で、id, aid, bidはint型、catはtext型で、xとyがfloat型である。*2

postgres=# select * from tbl limit 5;
 id | cat |  aid  |  bid  |        x         |        y
----+-----+-------+-------+------------------+------------------
  1 | aaa | 53132 | 29127 | 34.4543647952378 | 82.5439349282533
  2 | rrr | 52654 | 11461 | 81.7723278421909 | 11.2835586536676
  3 | nnn | 36869 | 61278 | 51.7565622925758 | 95.9137627854943
  4 | kkk | 82770 | 23100 | 98.3701516408473 |  39.805391151458
  5 | ttt | 14176 | 49035 | 73.1081502977759 |  50.415129121393
(5 rows)

このような感じでデータが生成される。

$ ls -l /tmp/mytest.arrow
-rw-r--r--. 1 kaigai users 176650 Jan 14 10:01 /tmp/mytest.arrow
$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000 41 52 52 4f 57 31 00 00 9c 05 00 00 10 00 00 00
000010 00 00 0a 00 0e 00 06 00 05 00 08 00 0a 00 00 00
000020 00 01 03 00 10 00 00 00 00 00 0a 00 0c 00 00 00
000030 04 00 08 00 0a 00 00 00 f4 03 00 00 04 00 00 00
000040 01 00 00 00 0c 00 00 00 08 00 0c 00 04 00 08 00
000050 08 00 00 00 08 00 00 00 10 00 00 00 06 00 00 00
000060 70 61 6e 64 61 73 00 00 bd 03 00 00 7b 22 63 6f
000070 6c 75 6d 6e 73 22 3a 20 5b 7b 22 66 69 65 6c 64
000080 5f 6e 61 6d 65 22 3a 20 22 69 64 22 2c 20 22 70
000090 61 6e 64 61 73 5f 74 79 70 65 22 3a 20 22 69 6e

Flat Bufferによるエンコード

IPC - File Formatには、ファイル形式は以下の通りであると記述されている。

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

さらに、Streaming Formatは以下の通り。

<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<DICTIONARY x DELTA>
...
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: int32>

そうすると、ファイル先頭"ARROW1\0\0"の直後にSCHEMAの定義が来ると思って読んでみる。

まず先頭 8 バイト(41 52 52 4f 57 31 00 00)は"ARROW1\0\0"なのでこれで良し。
また、次の4バイト(9c 05 00 00)はメタデータサイズなのでこれも良し。

$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000 41 52 52 4f 57 31 00 00 9c 05 00 00 10 00 00 00
000010 00 00 0a 00 0e 00 06 00 05 00 08 00 0a 00 00 00

しかし、それ以降がおかしい。format/Schema.fbsを参照すると、SCHEMAの先頭はバイトオーダを示すendiannessという、0か1のshort値*3であるはずなのに、10 00という値になってしまっている。

どういう事か?

実はApache Arrowのファイル形式は、Google FlatBuffersというSerialization/Deserializationのための仕組みを使っており、SCHEMAやRECORD BATCHといったメッセージチャンクのデータ定義が、そのままバイナリ上での並びになっている訳ではない。

https://github.com/dvidelabs/flatcc/blob/master/doc/binary-format.md#example
ココに簡単な説明が載っているが、要は簡単なインデックスを作って、1番目のデータは基準点からxxバイト後ろ、2番目のデータは基準点からxxバイト後ろ、・・・という事である。

実際のデータを使って順に読み解いていく。

$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000  41 52 52 4f 57 31 00 00  9c 05 00 00 10 00 00 00
000010  00 00 0a 00 0e 00 06 00  05 00 08 00 0a 00 00 00
000020  00 01 03 00 10 00 00 00  00 00 0a 00 0c 00 00 00
000030  04 00 08 00 0a 00 00 00  f4 03 00 00 04 00 00 00
000040  01 00 00 00 0c 00 00 00  08 00 0c 00 04 00 08 00
000050  08 00 00 00 08 00 00 00  10 00 00 00 06 00 00 00
000060  70 61 6e 64 61 73 00 00  bd 03 00 00 7b 22 63 6f
000070  6c 75 6d 6e 73 22 3a 20  5b 7b 22 66 69 65 6c 64
000080  5f 6e 61 6d 65 22 3a 20  22 69 64 22 2c 20 22 70
000090  61 6e 64 61 73 5f 74 79  70 65 22 3a 20 22 69 6e

FlatBufferメッセージの定義は以下の通り。headerはUnionなので、実際のオブジェクト型を示す1byteのメッセージタイプと実際のオブジェクトへのポインタから成る。つまり、Messageは4つのフィールドを持っている。

union MessageHeader {
  Schema, DictionaryBatch, RecordBatch, Tensor, SparseTensor
}

table Message {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  header: MessageHeader;
  bodyLength: long;
}

まず先頭8byteはシグニチャ"ARROW1\0\0"なのでスキップ。次の4byte9c 05 00 00メタデータのサイズ。
その次の4byte10 00 00 00がFlatBufferメッセージへのオフセットなので、0x000c + 0x0010 = 0x001c を参照する。

0x001cの値は0a 00 00 00なので、vtableの開始位置は0x001c - 0x000a = 0x0012となる。
書き下すと以下のようになり、バイナリ上でのデータの並びが構造体の定義と一致していない事が分かる。
また、4番目のフィールドbodyLengthは省略されているが、これはデフォルトの0である事を意味する。

table:
  0x001c   0a 00 00 00     ;  32bit negative offset to vtable
                           ;  0x001c - 0x000a = 0x0012
  0x0021   01              ; field-1 / Message Header = Schema
  0x0022   03 00           ; field-0 / Metadata Version = V4
  0x0024   10 00 00 00     ; field-2 / offset to Message Body (=0x0024 + 0x0010)

vtable:
  0x0012   0a 00           ; vtable length = 10bytes / 3 items
  0x0014   0e 00           ; table length = 14 bytes (including the negative offset)
  0x0016   06 00           ; field id 0:  (version; short)
  0x0018   05 00           ; field id 1:  (MessageHeader; byte)
  0x001a   08 00           ; field id 2:  (offset to Message Body; int)

これを読む事で、先頭のFlatBufferメッセージにはSCHEMAが格納されており、その実体は 0x0034 にある事が分かった。
次いで、0x0034から始まるバイナリを読んでいく。SCHEMAの定義は以下の通り。

table Schema {

  /// endianness of the buffer
  /// it is Little Endian by default
  /// if endianness doesn't match the underlying system then the vectors need to be converted
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];
}

0x0034の値は0a 00 00 00なので、同様にvtableの開始位置は0x0034 - 0x000a = 0x002aから。

table:
  0x0034   0a 00 00 00      ;  32bit negative offset to vtable
                            ;  0x0034 - 0x000a = 0x002a
  0x0038   f4 03 00 00      ; field-id 1: offset to [fields] vector 
  0x003c   04 00 00 00      ; field-id 2: offset to [custom_metadata] vector

vtable:
  0x002a   0a 00            ; vtable length = 10bytes / 3 items
  0x002c   0c 00            ; table length = 12bytes (including the negative offset)
  0x002e   00 00            ; field id 0: (endianness; short)
  0x0030   04 00            ; field id 1: (offset to [fields] vector)
  0x0032   08 00            ; field id 2: (offset to [custom_metadata] vector)

ここで、field-id 0のendiannessのインデックスが0になっている。これはデフォルト値である事を示し、値が 0 (= Little Endian) として扱って構わない事を意味する。
[fields]と[custom_metadata]はベクトル値、即ち配列で、ここから更に指定されたオフセットを参照する事になる。

先ず、SCHEMAに含まれる列のデータ型を示すFieldの配列は0x0038 + 0x03f4 = 0x42c に格納されているのでこれを参照する。
Fieldの定義は以下の通り。

table Field {
  // Name is not required, in i.e. a List
  name: string;
  nullable: bool;
  // This is the type of the decoded value if the field is dictionary encoded
  type: Type;

  // Present only if the field is dictionary encoded
  dictionary: DictionaryEncoding;

  // children apply only to Nested data types like Struct, List and Union
  children: [Field];

  // User-defined metadata
  custom_metadata: [ KeyValue ];
}

0x042cの値は07 00 00 00である。つまり、この後に続く7個のint32値は、Fieldを参照するオフセットである。

000420  62 6a 65 63 74 22 7d 5d  7d 00 00 00 07 00 00 00
000430  40 01 00 00 04 01 00 00  d4 00 00 00 a4 00 00 00
000440  70 00 00 00 44 00 00 00  04 00 00 00 ec fe ff ff
000450  00 00 01 02 1c 00 00 00  0c 00 00 00 04 00 00 00
000460  00 00 00 00 dc fe ff ff  00 00 00 01 40 00 00 00
000470  11 00 00 00 5f 5f 69 6e  64 65 78 5f 6c 65 76 65
000480  6c 5f 30 5f 5f 00 00 00  28 ff ff ff 00 00 01 03
000490  18 00 00 00 0c 00 00 00  04 00 00 00 00 00 00 00
    :
fields[0] = 0x0430 + 0x0140 = 0x0570
fields[1] = 0x0434 + 0x0104 = 0x0538
fields[2] = 0x0438 + 0x00d4 = 0x050c
fields[3] = 0x043c + 0x00a4 = 0x04e0
fields[4] = 0x0440 + 0x0070 = 0x04b0
fields[5] = 0x0444 + 0x0044 = 0x0488
fields[6] = 0x0448 + 0x0004 = 0x044c

例えばfields[6]の場合、tableのアドレスは0x044cでvtableへのオフセットは0xfffffeec なので、0x044c - 0xfffffeec = 0x0560である。
一方、fields[5]の場合、tableのアドレスは0x0488でvtableへのオフセットは0xffffff28なので、0x0488 - 0xffffff28 = 0x0560となる。

最初、これを見た時に ( ゚Д゚)ハァ? となったが、実はよく考えられていて、vtableが指し示すオフセットはあくまでtableからの相対値なので、同じデータ構造を持つ fields[*] 配列の場合は vtable が共有可能で、table以降のバイト列だけが違っているという構造になる。

(注)これに気が付いた時のTweet

したがって、fields[6] の table と vtable は以下のようになる。

table:
  0x044c   ec fe ff ff      ;  32bit negative offset to vtable
                            ;  0x044c - 0xfffffeec = 0x0560
  0x0452   01               ; field-id 1: nullable = true 
  0x0453   02               ; field-id 2: type-id = Int
  0x0454   1c 00 00 00      ; field-id 0: offset to 'name' string (at 0x0470) 
  0x0458   0c 00 00 00      ; field-id 3: offset to type definition (at 0x0464)
  0x045c   04 00 00 00      ; field-id 5: offset to [children] vector 

vtable:
  0x0560   10 00            ; vtable length = 16bytes / 6 items
  0x0562   14 00            ; table length = 20bytes (including the negative offset)
  0x0564   08 00            ; field id 0: (name: offset to string)
  0x0566   06 00            ; field id 1: (nullable: bool)
  0x0568   07 00            ; field id 2: (type id; byte)
  0x056a   0c 00            ; field id 3: (offset to Type definition)
  0x056c   00 00            ; field id 4: (offset to dictionary; = NULL)
  0x056e   10 00            ; field id 5: (offset to [children] vector)

fields[5] の vtable は共通で、tableは以下のようになる。fields[6]はInt型のフィールドを表現しているが、fields[5]はFloat型のフィールドを表現している事がわかる。

table:
  0x0488   28 ff ff ff      ;  32bit negative offset to vtable
                            ;  0x0488 - 0xffffff28 = 0x0560
  0x0452   01               ; field-id 1: nullable = true 
  0x0453   03               ; field-id 2: type-id = Float
  0x0454   18 00 00 00      ; field-id 0: offset to 'name' string 
  0x0458   0c 00 00 00      ; field-id 3: offset to type definition
  0x045c   04 00 00 00      ; field-id 5: offset to [children] vector 

続いて、fields[6] の名前と型定義を見てみる事にする。

000460  00 00 00 00 dc fe ff ff  00 00 00 01 40 00 00 00
000470  11 00 00 00 5f 5f 69 6e  64 65 78 5f 6c 65 76 65
000480 6c 5f 30 5f 5f 00 00 00 28 ff ff ff 00 00 01 03

まず、列名は 0x0470 から始まる。Stringの場合、int32の文字列長に続いて '\0' 終端の文字列が格納される事になるため、0x0470から始まるfields[6]の列名は 0x0011 バイトの長さがあり、文字列自体は 0x0474 から始まる。
5f 5f 69 6e 64 65 78 5f 6c 65 76 65 6c 5f 30 5f 5f 00 00 00をASCII文字列に直すと"__index_level_0__\0"となる。こんなカラムは元のSQL結果には存在しないので、おそらくPandasが挿入したものなのだろう。

列定義は0x0464のtable参照となる。Intの定義は以下の通りなので、これまで同様に vtable = 0x464 - 0xfffffedc = 0x588 を参照する。

table Int {
  bitWidth: int; // restricted to 8, 16, 32, and 64 in v1
  is_signed: bool;
}
table:
  0x0464   dc fe ff ff      ;  32bit negative offset to vtable
                            ;  0x0464 - 0xfffffedc = 0x0588
  0x046b   01               ; field-id 1: is_signed = true 
  0x046c   40 00 00 00      ; field-id 0: bitWidth = 64

vtable:
  0x0588   08 00            ; vtable length = 8bytes / 2 items
  0x0562   0c 00            ; table length = 12bytes (including the negative offset)
  0x0564   08 00            ; field id 0: (bitWidth: int)
  0x0566   07 00            ; field id 1: (is_signed: bool)

これによって、fields[6]のデータ型は符号付き、64bit長のIntである事が分かった。

Arrowファイルは尻から読む

そろそろバイナリを人力で追っていくのが辛くなってきたので、勉強も兼ねてパーサのプログラムを作ってみた。

github.com

名前からして本来は PostgreSQL のテーブルを Apache Arrow 形式でダンプするツールなのだが、書き出す部分はまだ実装できていない。

とりあえず、デバッグ用機能であるところの--dumpオプションを用いて先ほどのArrow形式ファイルをダンプしてやると次のようになる。
(見やすさのため、適宜改行・インデントを加えている)

$ ./pg2arrow --dump /tmp/mytest.arrow
[Footer]
{Footer: version=3,
  schema={Schema: endianness=little, fields=[
   {Field: name=id, nullable=true, type={Int64}, children=[], custom_metadata=[]},
   {Field: name=cat, nullable=true, type={Utf8}, children=[], custom_metadata=[]},
   {Field: name=aid, nullable=true, type={Int64}, children=[], custom_metadata=[]},
   {Field: name=bid, nullable=true, type={Int64}, children=[], custom_metadata=[]},
   {Field: name=x, nullable=true, type={Float64}, children=[], custom_metadata=[]},
   {Field: name=y, nullable=true, type={Float64}, children=[], custom_metadata=[]},
   {Field: name=__index_level_0__, nullable=true, type={Int64}, children=[], custom_metadata=[]}
  ],
  custom_metadata [
   {KeyValue: key=(pandas), value=({"columns": [{"field_name": "id", "pandas_type": "int64", "name": "id", "metadata": null, "numpy_type": "int64"}, {"field_name": "cat", "pandas_type": "unicode", "name": "cat", "metadata": null, "numpy_type": "object"}, {"field_name": "aid", "pandas_type": "int64", "name": "aid", "metadata": null, "numpy_type": "int64"}, {"field_name": "bid", "pandas_type": "int64", "name": "bid", "metadata": null, "numpy_type": "int64"}, {"field_name": "x", "pandas_type": "float64", "name": "x", "metadata": null, "numpy_type": "float64"}, {"field_name": "y", "pandas_type": "float64", "name": "y", "metadata": null, "numpy_type": "float64"}, {"field_name": "__index_level_0__", "pandas_type": "int64", "name": null, "metadata": null, "numpy_type": "int64"}], "pandas_version": "0.22.0", "index_columns": ["__index_level_0__"], "column_indexes": [{"field_name": null, "pandas_type": "unicode", "name": null, "metadata": {"encoding": "UTF-8"}, "numpy_type": "object"}]})}
  ]},
  dictionaries=[],
  recordBatches=[
   {Block: offset=1448, metaDataLength=448 bodyLength=73256},
   {Block: offset=75152, metaDataLength=448 bodyLength=66056},
   {Block: offset=141656, metaDataLength=448 bodyLength=33008}
  ]
}

[Record Batch 0]
{Message:
  version=3,
  body={RecordBatch: length=1200, nodes=[
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0},
         {FieldNode: length=1200, null_count=0}
        ], buffers=[
         {Buffer: offset=0, length=0},
         {Buffer: offset=0, length=9600},
         {Buffer: offset=9600, length=0},
         {Buffer: offset=9600, length=12008},
         {Buffer: offset=21608, length=3648},
         {Buffer: offset=25256, length=0},
         {Buffer: offset=25256, length=9600},
         {Buffer: offset=34856, length=0},
         {Buffer: offset=34856, length=9600},
         {Buffer: offset=44456, length=0},
         {Buffer: offset=44456, length=9600},
         {Buffer: offset=54056, length=0},
         {Buffer: offset=54056, length=9600},
         {Buffer: offset=63656, length=0},
         {Buffer: offset=63656, length=9600}
        ]},
  bodyLength=73256
}
        :
        :
[Record Batch 2]
{Message:
  version=3,
  body={RecordBatch: length=600, nodes=[
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0},
         {FieldNode: length=600, null_count=0}
        ], buffers=[
         {Buffer: offset=0, length=0},
         {Buffer: offset=0, length=4800},
         {Buffer: offset=4800, length=0},
         {Buffer: offset=4800, length=2408},
         {Buffer: offset=7208, length=1800},
         {Buffer: offset=9008, length=0},
         {Buffer: offset=9008, length=4800},
         {Buffer: offset=13808, length=0},
         {Buffer: offset=13808, length=4800},
         {Buffer: offset=18608, length=0},
         {Buffer: offset=18608, length=4800},
         {Buffer: offset=23408, length=0},
         {Buffer: offset=23408, length=4800},
         {Buffer: offset=28208, length=0},
         {Buffer: offset=28208, length=4800}
        ]
       },
  bodyLength=33008
}

Arrow形式の場合、例えば一個のファイルに100万行のレコードが含まれていたとしても、個々のカラムが例えば100万要素の単純配列になる…訳ではなく、RecordBatchという単位で内部的には10万行×10個のセグメントに分けて保存する事ができる。

最初、ファイルのフォーマットを頭から見ていた時に『はて?ファイル内のRecordBatchの数や位置はどこを見れば書いてあるのだろう?』と不思議に思っていたが、File FormatのFOOTER部分を先に読むのだという事に気が付いて疑問が解消した。

おさらいになるが、ファイルにはこのようにデータが書かれている。

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

つまり、ファイル末尾 6bytes にはシグニチャ"ARROW1"が書き込まれており、その前4byteにはFOOTER部分のサイズが書かれている。

という事は、(File Size) - 6bytes - sizeof(int32) の場所を読めばFooterの位置を特定でき、これを元に、ファイル内の RecordBatch の場所や数を特定できることになる。

struct Block {
  offset: long;
  metaDataLength: int;
  bodyLength: long;
}

table Footer {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  schema: org.apache.arrow.flatbuf.Schema;
  dictionaries: [ Block ];
  recordBatches: [ Block ];
}

なるほど確かに、ヘッダではなくフッタ側にファイル内の各セグメントへのインデックスを持っておくようにすれば、データを追記(この場合 RecordBatch の追加)する時も、既に書き込まれた RecordBatch を変更する事なく、元々 FOOTER が書き込まれていた場所を上書きして新しい FOOTER を作成すれば、最小限の I/O コストでデータの追記ができるようになる。

ファイルのメタ情報を頭に持っておくというのは、磁気ディスクか、もしかすると磁気テープの時代の設計の名残なのかもとか思ったり思わなかったりする40歳。

追記

(15-Jan-2019)
注釈で『Endianを示すのにshort型というのはいかがなものか』と書いたところ、以下のような指摘を頂いた。

確かに。Table/Vtableを辿ってデータ構造をdeserializeする部分はFlatBufferの領分なので、所詮はペイロードの一部でしかない "endianness" の値によって挙動が変わるはずもなく、決め打ちで Little Endian という事であれば指摘のような問題は起こらない。

*1:さらに更新/削除がない分、通常のテーブルスキャンでは必須のMVCC可視性チェックが不要になる。これはSSD-to-GPU Direct SQLを実行する上では非常に助かる

*2:が、Pandas/PyArrowに取り込んだ時にint型がint64型になってしまった。他にも、複合型データがString扱いされてしまったり、ちょっと色々アレ。

*3:Endianを示すのにshort型というのはいかがなものか。Big Endianでshort型の1を書き込むと 00 01 になってしまい、Little Endian機では256と読めてしまう。byte値にしとけばよかったのに…。

PL/CUDAを使ってロジスティック回帰分析を実装してみた

PostgreSQL Advent Calendar 2018の6日目です。

PG-Stromはアナリティクス向けにPL/CUDAというユーザ定義SQL関数を実装する機能を持っており、SQL処理の中で計算ヘビーな部分をCUDA Cで記述したGPUプログラムで実行させるという事ができる。
SQL関数としてPL/CUDA関数を呼び出すと、その引数は自動的にGPUメモリへロードされ、またGPUプログラムの処理結果は自動的にSQLの処理に渡される事になる。したがって。プログラマはデータの管理をDBに任せ、DBからデータを出し入れする事なく、GPUによる並列計算を自然な形でクエリに埋め込む事が可能となる。

なお、ここで言う”計算ヘビー”というのは、JOINやGROUP BY、SORTなどがCPUインテンシブな処理である、等々とは少し計算量の桁が違う話である事には留意していただきたい。
例えば、二年前のこのエントリは創薬領域での類似化合物探索のワークロードで、DBに登録された化合物同士の類似度をGPUで計算しているが、mその計算すべき類似度の組合せは100億通りもあった。
kaigai.hatenablog.com

で、時々お問合せを頂いていたのが『PL/CUDAでSQL関数を定義するにあたって、何かサンプルプログラムはないか?』という質問。
確かに、パブリックになっているPL/CUDA関数のサンプルはなかったので、折角なのでPGconf.ASIA 2018のLTあたりのネタにすべく、比較的簡単なアルゴリズムをPL/CUDAで実装してみた。

ソースコードはこちら。
https://github.com/heterodb/toybox/tree/master/logistic_regression

ロジスティック回帰分析

今回のテーマに選んでみたのは、ロジスティック回帰分析。

このアルゴリズムは、説明変数 x_j (j=1,...,m)とパラメータw_j (j=0,...,m)を、累積確率関数であるロジスティック関数\sigma(\alpha)=\frac{1}{1-e^{-\alpha}}に通して二値分類を行うためのもの。
学習フェーズでは、トレーニングセットのデータを用いてパラメータを推定する事となり、教師データである従属変数t_i (i=1,...,n)が実現する確率を最大化するパラメータを推定する必要がある。

なお、\alpha = w_0 + \sum_{j=1}^{m} w_j x_jとし、従属変数t_i = 1 である場合の確率をP(X) = \sigma(\alpha)としている。

あまりこの辺を踏み込んで書けるほど数学に強いわけではないので、詳しくはこちらの本をご参考に(逃げた)

ITエンジニアのための機械学習理論入門

ITエンジニアのための機械学習理論入門

最小二乗法の場合とは異なり、ロジスティック回帰分析のパラメータは解析的に数式を解いて求める事はできない。
そのため、実際に説明変数と従属変数から成るトレーニングセットをモデルに当てはめて計算し、その誤差を徐々に小さくしていくよう繰り返し計算を行っていく以外にパラメータを推定する方法がなく、それゆえに計算量が大きくなってしまう。

途中の説明は書籍に任せるとして、最終的には、以下の行列式を繰り返し解いてパラメータを更新していく事になる。

w_{new} = w_{old} - (\Phi^{T}R\Phi)^{-1}\Phi^{T}(z - t)

各変数の意味は以下の通り
レーニングセット(従属変数):
 t=(t_1,...,t_n)
レーニングセット(説明変数):
 \Phi = \left(
\begin{array}{cccc}
1 & x_{11} & \ldots & x_{1m} \\
1 & x_{21} & \ldots & x_{2m}  \\
\vdots & \vdots & \ddots & \vdots  \\
1 & x_{n1} & \ldots & x_{nm}
 \end{array}  \right)
累積確率関数:
z = (z_1,...,z_n)(但し、z_i = \sigma(w^T \Phi_i))と、
それを対角成分に並べたもの:
 R = diag[z_1(1-z_1), ..., z_n(1-z_n)]

仮定としては、説明変数の数はそれほど多くない(数個~数百程度)に対し、トレーニングセットの数は数百万~になり得る。
そうすると、計算量として比較的大きくなるのはi = 1,...,n方向の行列積で、ここは数千コアを持つGPUの得意分野になってくる。

PL/CUDAでの実装を掘り下げる

前振りが長くなったが、ソースコードを見てみる事にする。
https://github.com/heterodb/toybox/tree/master/logistic_regression

エントリポイント

まず、PL/CUDA関数のエントリポイントは#plcuda_begin#plcuda_endで挟まれた区間である。

CREATE OR REPLACE FUNCTION
logregr_train(reggstore,      -- source table (only Gstore foreign table)
              smallint,       -- column number of dependent variable
              smallint[],     -- columns number of independent variables
              int = 20,       -- max number of iteration
              real = 0.0001)  -- threashold to stop iteration
RETURNS real[]
AS $$
   :
#plcuda_begin
   /*
    * ここにエントリポイントの処理を記述する
    */
#plcuda_end
$$ LANGUAGE 'plcuda' STRICT;

ロジスティック回帰の学習関数logregr_trainは、第1引数がトレーニングセットを含むGstore_Fdw外部テーブル*1への参照で、第2引数は従属変数の列番号、第3引数は説明変数の列番号(配列)で、残りはパラメータ推定を繰り返す回数の最大値と、繰り返しを打ち切る閾値*2の設定。

エントリポイント内では、smallintの引数はshort型で、realの引数はfloat型でというように、それぞれSQLの型に対応するCUDA Cのデータ型でarg1arg2...というように引数を参照する事ができる。

例えば、以下のコードはsmallint[]の配列であるarg3を参照し、指定された列がreal型であるかどうかをチェックしている部分である。
型が一致しない場合、EEXITマクロでスクリプトを終了させ、これはPostgreSQL側でエラーとしてユーザへ報告される。

    /* set of simple array of independent data */
    if (!VALIDATE_ARRAY_VECTOR_TYPE_STRICT(arg3, PG_INT2OID))
        EEXIT("independent variables must be 'smallint[]'");
    iattrs = (VectorTypeShort *)arg3;
    rc = cudaMallocManaged(&Xp, sizeof(float *) * iattrs->height);
    if (rc != cudaSuccess)
        CUEXIT(rc, "failed on cudaMallocManaged");
    for (i=0; i < iattrs->height; i++)
    {
        j = iattrs->values[i];
        if (j <= 0 || j > kds_h->ncols)
            EEXIT("independent variable is out of range");
        cmeta = &kds_h->colmeta[j-1];
        if (cmeta->atttypid != PG_FLOAT4OID)
            EEXIT("independent variables must be 'real'");
        Xp[i] = (float *)((char *)kds_d + __kds_unpack(cmeta->va_offset));
    }

既にGPUへロード済みのデータをreggstore型の引数で参照する場合だけは少々特殊で、以下のように定義された GstoreIpcMapping構造体へのポインタとなる。mapメンバはGPUバイスメモリのポインタなので、ホストコードで触ってはいけない。*3

typedef struct
{
    GstoreIpcHandle h;      /* unique identifier of Gstore_Fdw */
    void           *map;    /* mapped device address
                             * in the CUDA program context */
} GstoreIpcMapping;
宣言部

CUDA CでGPUカーネルを呼び出す場合、例えばmy_kernel<<<10,1024>>>(x, y, z)というように、起動すべきブロック数/スレッド数を指定してデバイス関数をコールする。
つまり、エントリポイント内にベタっと処理を記述するだけで不十分で、少なくともGPUカーネル関数をどこかに定義しなければPL/CUDA関数を利用する意味はあまりない。
それを行うのが、#plcuda_decl以降のブロックで、ここに記述したホスト関数/デバイス関数をエントリポイントから呼び出すというのが一般的なPL/CUDA関数の実装という事になる。

例えば、以下のlogregt_update_Zというデバイス関数は、上記の累積確率関数:z = (z_1,...,z_n) の値を格納するVectorTypeFloat型((PostgreSQLreal[]型とバイナリ互換な単純配列))のバッファを更新するためのもので、これを(ブロック数×ブロックサイズ)個のスレッドで並列に実行する。
例えばブロック数40、ブロックサイズ1024の場合、get_global_id()は0~40959の間の値を返し、全てのスレッドが処理を完了した時にはZ->values[]配列には\frac{1}{1-e^{-\alpha}}の計算結果が格納されている事になる。

KERNEL_FUNCTION_MAXTHREADS(void)
logregr_update_Z(VectorTypeFloat *Z,
                 cl_float  **Xp,
                 VectorTypeFloat *W)
{
    cl_uint        nitems = Z->height;
    cl_uint        width = W->height;
    cl_uint        i, j;

    for (i = get_global_id(); i < nitems; i += get_global_size())
    {
        cl_float    x, w, sum = 0.0;
        for (j=0; j < width; j++)
        {
            w = W->values[j];
            x = (j == 0 ? 1.0 : Xp[j-1][i]);
            sum += w * x;
        }
        Z->values[i] = 1.0 / (1.0 + __expf(-sum));
    }
}

このように定義されたデバイス関数を以下のように繰り返し呼び出す事で、徐々にロジスティック回帰のパラメータを更新し、目的とする値に近付くというステップを踏む。

    for (loop=0; loop < nloops; loop++)
    {
               :
        /* compute Z vector */
        logregr_update_Z<<<gridSz_Z, blockSz>>>(Z, Xp, W);
               :
        /* compute P matrix */
        logregr_update_P<<<gridSz_P, blockSz>>>(Preg, Xp, width, Z);
               :
        cudaStreamSynchronize(NULL);
               :
    }
CUDA用ライブラリの利用

ところで、この繰り返し計算の過程では、単純な行列・ベクトル積だけでなく、一ヵ所逆行列を計算する箇所があった事を思い出してほしい。
このように典型的な計算であれば、CUDA Toolkitでライブラリが提供されている場合があり、逆行列の計算であればBlasのCUDA版であるcuBlasを使う事ができる。

ライブラリ関数を使うには、まず普通に#include ...でヘッダファイルをインクルードすると共に、GPUバイナリをビルドする時にリンクするライブラリを指定する必要がある。

以下のように#plcuda_libraryでライブラリ名を指定すると、

#plcuda_library cublas
#plcuda_decl
#include <cublas_v2.h>
#include "cuda_matrix.h"
     :

これはGPUバイナリをビルドする時に

% nvcc .... -lcublas

と展開され、ライブラリ関数をリンクする。

若干、引数が多くてアレなのだが、今回の実装でも以下のcuBlas関数を呼び出して逆行列を計算している。

        /* compute P-inverse */
        status = cublasDgetrfBatched(handle,
                                     width,
                                     Preg,
                                     width,
                                     Pivot,
                                     Info,
                                     1);
        if (status != CUBLAS_STATUS_SUCCESS)
            EEXIT("failed on cublasSgetrfBatched: %d", (int)status);
        status = cublasDgetriBatched(handle,
                                     width,
                                     Preg,
                                     width,
                                     Pivot,
                                     Pinv,
                                     width,
                                     Info,
                                     1);
        if (status != CUBLAS_STATUS_SUCCESS)
            EEXIT("failed on cublasSgetriBatched: %d", (int)status);

CPU実装(MADLib)との比較

最後に、CPU実装と比べてどれくらい速くなっているのかを調べるため、MADLibのロジスティック回帰と性能値を比較してみる事にした。

とは言っても、これを使って何を分類するか、というのはあまり興味がないので、ひとまずランダムに4000万件程度のデータを生成し、人為的に1+2x_1-3x_2+x_3+0.5x_4 > 0である場合を真(true)、それ以外を偽(false)とするテストデータを作って、それを学習させてみる事にする。

postgres=# CREATE TABLE logreg (
                    t  bool,
                    x1 float,
                    x2 float,
                    x3 float,
                   x4 float );
CREATE TABLE

postgres=# INSERT INTO logreg (SELECT (1.0+2.0*x1-3.0*x2+x3+0.5*x4) > 0 t, x1, x2, x3, x4
                                 FROM (SELECT random() x1,
                                              random() x2,
                                              random() x3,
                                              random() x4
                                         FROM generate_series(1,40000000)) x);
INSERT 0 40000000

次に、このテストデータをGPUバイスメモリにロードする。
PL/CUDA関数はreal型を前提にしているので、説明変数のデータ型はrealへキャストする。

postgres=# CREATE FOREIGN TABLE ft (
               t   bool,
               x1  real,
               x2  real,
               x3  real,
               x4  real
           ) SERVER gstore_fdw
             OPTIONS (pinning '0');
CREATE FOREIGN TABLE

postgres=# INSERT INTO ft (SELECT * FROM logreg);
INSERT 0 40000000

bool(1byte) + float(4bytes)*4 = 17bytes
17bytes * 40M行 = 約680MB
バイス管理用に必要な約120MBと併せて、計800MBのデータがGPUにロードされているのが分かる。

[kaigai@saba src]$ nvidia-smi
Thu Dec  6 12:10:56 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 410.72       Driver Version: 410.72       CUDA Version: 10.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  Tesla P40           Off  | 00000000:02:00.0 Off |                  N/A |
| N/A   42C    P0    52W / 250W |    817MiB / 22919MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     27650      C   ...bgworker: PG-Strom GPU memory keeper      807MiB |
+-----------------------------------------------------------------------------+

PL/CUDA関数を呼び出す。attnum_of()は指定された列番号を返すユーティリティ関数。

postgres=# SELECT logregr_train('ft',
                                attnum_of('ft','t'),
                                attnums_of('ft','{x1,x2,x3,x4}'));
              logregr_train
------------------------------------------
 {3376.4,6752.71,-10129.1,3376.3,1688.27}
(1 row)

Time: 3647.059 ms (00:03.647)

約3.6secで完了。パラメータである w ベクトルを推定しているので、5個の要素が含まれている。

次に、MADLibのロジスティック回帰のトレーニング。(関数名や引数に与えるパラメータはこれを参考にした)

postgres=# SELECT madlib.logregr_train('logreg','hoge', 't','ARRAY[1,x1,x2,x3,x4]',NULL, 20);
 logregr_train
---------------

(1 row)

Time: 1301307.361 ms (21:41.307)

21分41秒。元々の実装がそこまで最適化されているか微妙なところではあるものの、広く使われている実装と比べて350倍速く実行できたのはGPUの面目躍如と言ったところ。

postgres=# SELECT coef FROM hoge;
                                          coef
------------------------------------------------------
 {3041.82722783601,6083.57794939209,-9125.44857123801,3041.73992459095,1520.98287953044}
(1 row)

一応、学習結果であるパラメータを比べてみると

俺々実装 MADLib
w0 3376.4 3041.83
w1 6752.71 6083.58
w2 -10129.1 -9125.45
w3 3376.3 3041.74
w4 1688.27 1520.98

それほど遜色ない値となっている。

これを、説明変数と学習パラメータを引数に取るlogregr_predict関数に与えて(人為的に作ったデータなので)きちんと分類できている事を示そうと思ったが、残念ながらgit addを忘れてgit cleanしてしまい、推論系の実装は闇に葬られてしまった。

こちらはPGconf.ASIAのLTでご報告しようと思うので、少々お待ちを。

明日は @hmatsu47 の『Aurora PostgreSQL 互換版(9.6.9)でフェイルオーバー時にバッファ/キャッシュの生きのこり方を雑に確かめてみた』です。

*1:Gstore_Fdw: GPUバイスメモリの読書きを可能にするFDW

*2:パラメータの変動は\frac{\|w_{new} - w_{old}\|^2}{\|w_{old}\|^2}で計算します

*3:Managed MemoryというPage Faultハンドラを使ってCPU/GPU間でページを入れ替える機能があるが、CUDAがプロセス間通信に対応してくれていない。早く何とかならないものか。

PGconf.EU 2018参加レポート

10/23~26にかけて、ポルトガルリスボンで開催されたPGconf.EU2018へ参加してきました。
リスボンへはドイツ駐在中の2013年に一度旅行で訪れた事があり、個人的にも懐かしい、5年ぶりの訪問となります。

PGconf.EUとは、PostgreSQLの欧州ユーザコミュニティが主催するカンファレンスで、今回はちょうど10回目。450名分の参加チケットが売り切れるなどPostgreSQLの技術カンファレンスとしては最大のものです。海外は過去に2012(チェコプラハ)、2015(オーストリア・ウィーン)の2回に参加、発表しており、今回で3回目の参加となります。

リスボンまでは直行便がないため、成田~ヘルシンキリスボンと途中で乗り換えを挟み、片道18時間のフライト(ヘルシンキでの接続4時間を含む)でした。成田~リスボンの中間地点に位置しているので、意外にルート的な無駄は少ない様子。*1

セッションピックアップ

参加したセッションの中から、いくつか感想を書いておくことにします。
発表資料に関しては、順次リンク先に追加されるはずですので確認してみてください。


Location - the universal foreign key. Past, present and future of spatial data in PostgreSQL (Paul Ramsey)

PostGISのオリジナル開発者によるセッション。
自分もGIS系ワークロードのGPU高速化について質問&リクエストされる事が多々あり、興味深く聞いてみた。
いくつか分析の実例という事で、全世界のスターバックスの店舗位置を使った距離計算などのケースが紹介されていたが、実際、この手の地理データ分析というのはどの程度の件数を扱う事を前提にすればいいのだろうか?
おそらく、モバイルデバイスから出てくる座標(緯度・経度)を集計するようなものだと、データ件数は非常に大きなものになるので、SSD2GPUなどストレージ系の高速化技術と組み合わせる事が必要になってくる。
ただ、店舗情報などどれだけ大きくても"億"の単位に行かないようなデータサイズであれば、オーダーはGB以下になり、実際にはI/Oが問題となるような事はなくなる。逆に計算量が支配的になるため、Gstore_fdwのようにGPUメモリを使ったストレージに対して単純にGPUカーネルをキックしてやるという事になるだろう。
いずれにせよPL/CUDA以外の普通のSQLからきちんとGstore_fdwを使うための機能強化は検討中なので、その後GIS関数の対応を考えた方がよさげ。

PostgreSQL Serverside Programming in C (Heikki Linnakangas)

PostgreSQLのサーバサイドで動かすCの拡張モジュールを作るための基本的なインフラについて説明する教育的なセッション。
メモリコンテキストやらエラー・例外の使い方などが資料に網羅されているので、初心者が軽く目を通しておくには良い資料だと思う。

What's new in PostgreSQL 11 (Magnus Hagander)

PostgreSQL v11の新機能を紹介するセッション。
パーティション周りの機能強化やパラレルスキャンの強化など、リリースノートで周知されているものではあるが、PG-Strom的に少し注意が必要かなと思ったのは以下の二点。

  • ALTER TABLE ... ADD COLUMN defaultをテーブル書き換えなしで実行する
  • 実行ステージのPartition Pruning

前者は、古いテーブル定義に基づいてINSERTされたレコード(当然ADD COLUMNされた列は存在しない)を展開する時に、列番号 > nattrs の時にはnullとして扱うというのが既定の動作であったが、仮にADD COLUMN defaultされた値があれば、あたかもそのdefault値がそこにあるかのように振舞わねばならない。これはレコードを展開するコード(SQLから自動生成する)部分の機能強化が必要。
後者は、実行時に不要なパーティションを落とす事が有効になってはいるものの、PG-Stromが独自にJOINやGROUP BYをプッシュダウンして作成した実行計画の場合に正しく動くんだっけな??という事が気になったので要確認。

zheap: An answer to PostgreSQL bloat woes (Amit Kapila)

Pluggable Storageと併せて開発が進められているzHeapについてのセッション。これはPostgreSQLにUNDOログに基づくストレージ形式を持ち込もうという話で、従来のHeapと比べ、トランザクション負荷が高い時のテーブルサイズ増加(主に被更新行の占有するディスク領域によるもの)が緩やかであるという特徴を持つ。
トランザクション系の方はあまり興味はないのだけども、zHeapのメカニズムが入る事によって最終的にvacuum不要となり、それによってvisibility mapが無くなってしまうと困ったことになる。
visibility mapによって、データブロックを読む前に、当該ブロックがMVCC可視性チェックを必要とするかどうかを判断できるので、GPUではコミットログを参照できない以上、all-visibleなブロックだけをSSD-to-GPUダイレクトSQLの対象とする事ができる。
ただ本質的には、コミットログを参照できないだけで、HintBitによりレコードの可視性を判断できればall-visibleなページに限定する必要はない。なので、こちらは代用品としてのvisibility-mapを使うのではなく『all-visibleまたは全てのレコードにHintBitがセットされている』状態を担保できるよう、ブロックの状態が変化した時にExtensionが捕捉できるようフックが必要という事ではありそうだ。

CREATE STATISTICS - what is it for? (Tomas Vondra)

WHERE cond AND condGROUP BY col1, col2のように複数の条件が絡む場合、それぞれの条件が全く独立事象である場合は単純にP(cond1) * P(cond2)で行数が推定できるが、実際には互いに相関関係があるため、過剰・過少に行推定をしてしまう。これにより、例えばNested Loopのような実行計画は行数が増加すると極端に実行効率が下がるため、性能インパクトが大きい。(数行を迅速に取り出すという用途ならいいんですが)
こういった問題を避けるため、CREATE STATISTICS構文によってDBAが明示的に変数間の相関関係を教えてやることができるようになる。

た・だ・し、いくら行推定を頑張ったとして、JOINの段数が増えていったり、条件句に使う関数のselectivityが現実を反映していなかったりすると、推定行数はどんどん実態と乖離していく。例えば条件なしSeqScanの推定行はテーブルサイズからほぼ正確に導出する事ができるが、特に何も考えなしにselectivity=0.5の条件でスキャンしたものを別のテーブルとJOINした結果などはほぼ根拠レスと言ってよい。
こういった値であっても、PostgreSQLは同様に「行数の推定値」として扱ってしまうため、時には事故が起こる事もある。

これは自分の持論であるが、行推定は行推定で頑張るとして、それがどれくらい信用できるのかという reliability は別のファクターとして実行計画作成に組み込むべきではなかろうか。
例えば nrows = 10, reliability = 0.99 ならIndexつきNested Loopは(たぶん)問題ない選択肢だろうが、nrows=10, reliability = 0.10 ならもしかすると、過剰に上振れした場合には安全側に倒して Hash Join を選択しておく方がベターかもしれない。


着ぐるみの中の人でもあります。なんかワシがスナイパーに狙撃されたような写真になってますが。

Pluggable Storage in PostgreSQL (Andres Freund)

PostgreSQL v12あたりで?入ってくるのだろうか、Pluggable Storageの最新の開発状況の紹介。
現状zHeapがターゲットではあるが、可能性としてはColumnarストレージを実装する事も可能。
CREATE TABLE xxx (...) USING heap;のような使い方をする。

WALの方は変わらないので、あくまでもストレージフォーマットの話で、基本はrow-by-row。
なのでAndreasの意見に100%同意なんだけど、scan_getnextslot()みたいに「全ての列を」「一行ずつ」取り出すためのインターフェースでは列指向ストレージを要求するような高いスループットSQL処理には全く追いつかないので、列指向ストレージ用のCustomScanを定義して、場合によってはJOINやGROUP BYも含めたベクトル化は必要になるだろう。

あと、拡張モジュール的視点では、TupleTableSlotがコールバックを持つようになり、これが個々のストレージ形式とHeapTuple形式の間を仲介するようになる。今後、基本的にレコードの受け渡しにはHeapTupleではなくTupleTableSlotを使うようになるので、例えばCで書かれたトリガ関数などは影響を受ける可能性がある。

発表(その1)ライトニングトーク

自分の発表その1は2日目夕方のライトニングトーク
2年前のPL/CUDAの発表ネタから、関数の引数が巨大(数百MB~)になるときの呼出しオーバーヘッド問題の解決のためにGPUメモリストアであるGstore_fdwの開発と、そこからの応用としてCUDA APIのプロセス間通信(IPC)機能を用いてPythonスクリプトとデータ連携を行うところまでを駆け足で紹介した。さすがに5分で30ページはチャレンジングであったが、だいたい収まった。

LT発表資料(5分なのに30pもある・・・。)

www.slideshare.net

発表(その2)本番

自分の発表その2は、最終日のセッション発表。
概ね70%くらいの席の埋まり具合で40名弱というところ。テーマ自体は面白いと思ってもらえる自信があるが、聞いてもらえなければ仕方ない。ちょいと残念。

日本の皆さんには既にお馴染みの内容だとは思うが、PCIeバス上のP2P DMA(SSD-to-GPUダイレクトSQL)を用いてストレージ上のPostgreSQLデータブロックを直接GPUに転送し、データを減らしてからホストシステムに書き戻す。
加えて、現行のXeonプロセッサだと7.1~8.5GB/sといった辺りで性能がサチってしまう問題を紹介し、それを回避するために、PCIeスイッチを搭載したハードウェア(HPCサーバやI/O拡張ボックス)を用いてストレージに近いところでSQLを処理、CPUの制御するPCIeバスの負荷を軽減するという手法を説明した。
これらの改良を加える事で、I/O中心ワークロードのスループットを改善し、3台のI/Oボックスで13.5GB/s*2、スペック上可能な8台構成では30GB/s程度まで狙えるのでは?という所で話を締めくくった。

面白かったのが、スライド p.19 の絵を見せてSSD-to-GPUダイレクトSQLの説明をした瞬間、発表している側からはっきり分かるレベルで参加者の顔色が『マジかこいつ!?』みたいに変わったところ。
そりゃそうだろう。PostgreSQLコミッターレベルの人と話をしても、基本的には『GPU = 計算のアクセラレータ』という先入観でみんな話をしている所に、普通のNVME-SSDストレージを、ExadataやNetezzaがやっているようなインテリジェントなストレージに変身させるための素材としてGPUを使いましょうという提案なんだから。

発表資料:

www.slideshare.net

質疑応答に関しては覚えているものだけでこんな感じ

  • ファイルシステムは経由するのか?そこがボトルネックになるが。
  • Nvmeにはパフォーマンスの問題がゴニョゴニョゴニョ
    • 申し訳ないが、今のところNVMEドライバでパフォーマンス問題にぶち当たった事はない。
    • (後で思ったが)PCIe NVMEでリクエストの多重度が上がった時にdma_alloc_pool()がクッソ遅くなる事を言ってたのだろうか?そこは自前で持つようにして性能問題は解決している。
  • レイテンシはどうなのか?
    • いや、このワークロードはレイテンシではなくスループットが問題。

PivotalのTシャツを着た人がやたら怖い顔をして質問を突っ込んできた。善きかな善きかな。

あと、学生さんみたいだけどこんなコメントも。素直に嬉しい。


所感

昨年のPGconf.EUでは(GPU1個の)SSD-to-GPUダイレクトSQLをテーマにプロポーザルを出したものの Rejected になってしまったので、ようやくグローバルのコミュニティ向けに SSD-to-GPUダイレクトSQL について発表する機会を得られたという事で、宿願叶ったりという事になる。

ただ、450人が登録しているカンファレンスで3セッションが並列で進行。その中で自分のセッションへの参加者が40名弱というのはちょっと問題。
PeterやAmitなど一部の主要開発者にはレセプション等の機会を使って、現在のPG-StromはI/Oの高速化を追求するアーキテクチャになっている事などを説明したが、まだ『GPU = 計算アクセラレータでしょ』という認識をひっくり返せたとは言い難い気はする。

この辺、英語の問題でセッションを避けられてるのか、"GPU"のキーワードが自分ごとではないように感じさせてるのか、コミュニティへの露出が最近下がってるので「あんた誰?」になってるのか、ちょっと分析は必要かもしれない。やはり欧州/米国でのカンファレンスとなると、費用と時間はそれなりにかかる訳だから、発表するだけで満足することなく、自分たちの技術的成果やその価値を広く認識してもらう作戦も必要だろう。

最後に、ほぼ一週間家を空けていた間、娘と息子の世話を一人でやってくれた嫁さんに感謝である。

観光

航空券の価格の都合により、一日滞在を伸ばしたとしても(土曜日ではなく)日曜日にリスボンを発つフライトの方が安上がりだったので、出張経費削減のため止むなくリスボン周辺を1日観光する事に。
ユネスコ世界遺産の「シントラの文化的景観」、ユーラシア大陸最西端「ロカ岬」を回ってきました。こちら、長くなったので写真をFacebookに上げておきました。
www.facebook.com

*1:ただ、次は直行便で行ける都市がいいなぁ…。

*2:1.05TBのテーブル全件スキャンを含むクエリが80s弱で完了する

スキャン速度10GB/sへの挑戦~その④ 完結編~

今回のエントリは、ここ1年ほど取り組んでいた PG-Strom による大量データのスキャン・集計処理性能改善の取り組みが、当面の目標であったシングルノード10GB/sを達成したという完結編です。(長かった)

要素技術:SSD-to-GPUダイレクトSQL

先ず、PG-Stromのストレージ関連機能について軽くおさらい。

RDBMSに限らず一般論として、GPUなど並列プロセッサの処理性能を稼ぐには、プロセッサコアの数や動作クロック以上に、処理すべきデータをできるだけ大量に供給するかという点が重要。
これは、ハードウェアレベルではキャッシュ階層や容量の設計、あるいはメモリデバイスのデータ転送レートという話になり、最近のGPUだとメモリ読出しの帯域は数百GB/sにも達する。もう少し大局的に見ると、これは、ストレージと計算機をどのように接続し、アプリケーションはこれをどのように制御するのかという話になってくる。

PG-Stromは元々、PostgreSQLのストレージ層をそのまま利用していた。これは業務系⇒情報系の間でデータ形式を変更するのがユーザにとって手間であり、使い勝手を損なうと考えたため。ただこれは、処理がI/O待ちになった瞬間、GPUによる高速化の効果を全て打ち消してしまったため、当時は『データを前もってメモリにロードしておいて下さい』と言っていた。
しかし、同じストレージ上のデータを使いつつも、読み出す方法を変える事でI/Oを高速化できるのであれば、使い勝手とGPUによる高速化は矛盾しない。
現在のPG-Stromはいくつかのストレージ機能強化を持っているが、その最も特徴的な機能である「SSD-to-GPUダイレクトSQL」は、ストレージに格納せざるを得ないサイズのデータをGPUのプロセッサへ高速に供給するための機能である。

テーブルの全件スキャンが必要な集計系クエリを実行する場合、通常、ストレージ上のPostgreSQLデータブロックをホストRAMにロードし、その後で、このレコードは条件句に合う、合わないといった処理を行う。
そのため、最終的には破棄する事になる「ゴミデータ」も含めて帯域の限られたPCIeバスを転送しなければならない。また、単にPCIeバスの帯域だけでなく、ホストシステムのDMAバッファ→Page Cache→PostgreSQLのShared Bufferというメモリコピーの負荷も馬鹿にならない。

一方、SSD-to-GPUダイレクトSQLの場合、P2P DMAを用いてストレージ(NVME-SSD)上のデータブロックをP2P DMAを用いてGPUへ転送する。このデータ転送にホストRAMは介在しない。次にGPU上でWHERE句/JOIN/GROUP BYといった集計系クエリの中核となる処理を実行する。ワークロードとデータの分布次第ではあるが、基本的にこれらの処理は不要レコードを落とし、予め集計を済ませておく事でデータ量を削減する事が可能である。
したがって、前処理済みのデータがホストRAMへロードされた時点でデータ量が1/1000以下になっているという事も珍しくない。

パフォーマンスは良好。少し古い計測データだが、1枚あたり2.2GB/sのSeqRead性能を持つIntel SSD 750 (400GBl HHHL) を3枚束ねた構成で、ハードウェア的な限界性能6.6GB/sに対し、Star Schema Benchmark (SSBM) の実行性能で6.3GB/sまで出ている事が分かる。

PCIeバスの最適化を考える

GPUの処理能力にはまだ余裕があったため、次に考えたのは、より高速なSSDを使用する事で処理速度を引き上げられないだろうかという目論見。これは部分的には成功するが、課題も残す。
kaigai.hatenablog.com

NVIDIAGPUDirect RDMAのドキュメントを参照すると、以下のような記述がある。

We can distinguish between three situations, depending on what is on the path between the GPU and the third-party device:

  • PCIe switches only
  • single CPU/IOH
  • CPU/IOH <-> QPI/HT <-> CPU/IOH

The first situation, where there are only PCIe switches on the path, is optimal and yields the best performance. The second one, where a single CPU/IOH is involved, works, but yields worse performance ( especially peer-to-peer read bandwidth has been shown to be severely limited on some processor architectures ). Finally, the third situation, where the path traverses a QPI/HT link, may be extremely performance-limited or even not work reliably.

要するに、GPUとNVME-SSDを接続するパスの中で、マルチプロセッサシステムでQPIを経由するものはアウト(実際に試したことはあるが、強烈に遅い)。CPUがPCIeのRoot Complexとして振る舞うものも、動作はするがベストではなく、性能上の問題が生じる可能性がある。

実際、1枚あたり3.2GBのSeqRead性能を持つIntel DC P4600 (2.0TB; HHHL)を3枚束ねて(クエリ処理速度の影響を受けない)Raw-I/Oの性能を計測してみると、Broadwell-EP世代では7.1GB/s程度、Skylake-SP世代のプロセッサでは8.5GB/s程度で転送性能が頭打ちになってしまった。困ったものである。

PCIeバスの最適化を考える

ここまでのTry&Errorで明らかになったのは、NVME-SSDのように十分な転送速度を持つストレージとGPUを直結する場合、もはやストレージの側は律速要因ではなく、バスを制御するCPUの側がボトルネックとなり得るという点である。
したがって、ホストRAMに転送されたレコードを処理するというソフトウェア実行の負荷だけでなく、PCIeバス上を流れるデータ転送の負荷という観点においても、CPUの負荷を下げるという必要が生じてくる。

ここで目を付けたのがI/O拡張ボックス。
これは元々、窮屈なラックサーバに搭載可能な分よりも多数のデバイスを搭載するための機器で、ホストシステムへはコンパクトなLow ProfileのPCIeカードで接続すれば、その先に搭載したNVME-SSDGPUをあたかもローカル機器であるかのように使用する事ができる。
また製品によっては、I/O拡張ボックス側にPCIeスイッチを組込んでおり、同一ボックス内に搭載したデバイス同士であれば、P2P DMAは単純にボックス内に閉じたデータ転送となり、ホストシステムには全く影響を与えない。

その条件に該当するのがNEC ExpEtherで、この製品はI/O拡張ボックス側に4個のPCIeスロット(PCIe 3.0 x8)を備えており、PCIeスイッチを挟んで、ホストシステムへは40G Ehternetで接続する。このPCIeスイッチを活用する事で、ホストCPUに負荷を与えることなく、NVME-SSDからGPUへとP2P DMAによるデータ転送が行えるハズである。

PCIeバスの接続トポロジをブロック図で示すと以下のようになる。
データ転送負荷の最も大きくなるNVME-SSDGPU間の通信は、I/O拡張ボックスに内蔵のPCIeスイッチがこれを仲介するため、ホストシステムの負荷とはならない。I/O拡張ボックスからホストシステムへのデータ転送は、前処理が終わった後の小さなデータ断片のみとなる。

PostgreSQL上では、各I/O拡張ボックスに搭載されたNVME-SSDごとにパーティションを作成し、v11の新機能であるHash-Partitioningによりデータ分散を行う。このように構成する事で、各I/O拡張ボックスごとにデータ量が概ね平滑化され、各ボックスの処理時間が極端に違うという事がなくなる。


ベンチマーク

今回、NEC様のご協力により、ExpEther 40G (4slot) を3台お借りしてベンチマークを行う事ができた。
主なハードウェア環境は以下の通りで、3台のExpEtherにはそれぞれGPUを1枚とNVME-SSDを2枚ずつ搭載している。
データサイズは全体として1055GBで、各I/O拡張ボックスにはlineorderテーブルの一部が351GBずつ格納されている。

これまで同様にStar Schema Benchmarkで性能を計測した結果が以下の通り。
シングルノードのPostgreSQLにも関わらず、最大で13.5GB/sものクエリ処理速度を記録している。
面白い事に、I/O拡張ボックス上のSSDからホストシステムへデータを読み出した時のRaw-I/O性能(つまりSQL処理を挟まないタダのデータ転送速度)である8.8GB/sをも上回っている。

しかも注目すべきは、これはI/O拡張ボックス3台という最小構成に近い構成での結果であり、1台あたり約4.5GB/sというクエリ処理速度と、8.0TB*1というストレージ容量を増設可能である、という点である。
これは、台数を並べればDWH専用機にも比肩し得る処理性能を、GPUやNVME-SSDという最新ハードウェアの性能をフルに引き出すようソフトウェアを工夫する事で、シンプルなシングルノード構成のPostgreSQLで構築できてしまうという事を意味する。

まとめ&告知

ここ一年ほど取り組み続けた『GPUによるI/O性能の高速化』については、なんとかめでたくも、当初の目標性能であるシングルノード10GB/sを達成する事ができた。
この水準の処理能力・データベース容量の想定としては、中堅企業~大企業の部門/拠点規模でのログデータの集積・集計を行うようなワークロード。おそらくは従来であればRDBMSではなくHadoopベースのソリューションを検討していたであろう領域。総データサイズとしては、ボリュームゾーンとなるであろう~数十TBまでの範囲であれば、特に複雑な構成を作り込むまでもなく、シンプルなシングルノード構成のPostgreSQLで十分に捌く事も可能だろう。

今回の内容は、9/13(木)~9/14(金)に開催のGPU Technology Conference Japan 2018(於・品川高輪プリンスホテル)の『GTC JAPAN 2018 INCEPTION AWARD』セッションの中で発表する他、ポスターセッションでも発表場所で待機しているので、ぜひお越しいただきたい。
www.nvidia.com

また、より詳しい内容は、9/19(水)~9/21(金)に開催のDB Tech Showcase(於・秋葉原UDX)でも発表を行う。こちらは45分のノーマルセッションなので、よりじっくりと、詳しい内容に興味があるという方は、ぜひこちらへのご参加もご検討いただきたい。
www.db-tech-showcase.com

*1:Intel DC P4600 4.0TB版を使った場合

PostgreSQLとcupyを繋ぐ~機械学習基盤としてのPG-Stromその①~

世間の機械学習屋さんは、機械学習・統計解析のライブラリにデータを食わせる時に、どうやってデータを入力しているのだろうか?
話を聞くに、データを一度CSV形式に落とし込んで、それをPythonスクリプトで読み込むというパターンが多いようではある。

ただ、ある程度大量のデータセットCSVファイルで扱うようになると、いくつか問題点が露わになってくる。

  • 解析すべきデータセットを切り替えるたびに異なるCSVファイルを用意する事になり、ファイルの取り回しが煩雑である。
  • 前処理をかけた後のCSVファイルがまたできてしまい、ファイルの取り回しが更に煩雑になってくる。
  • 最終的にCSVファイルの所在が誰にも分からなくなってしまい、機械学習・統計解析の元になったファイルが散逸してしまう。
  • そもそも、GB単位のフラットファイルをシェル上でコピーしたり読み込ませたりするのはそれなりに時間を要する処理である。

データベース屋から言わせてもらえれば、餅は餅屋、データ管理はデータベース管理システムという事で、得意な人にやってもらった方が良いのになと思う事はある。

Gstore_fdwとInter-Process Data Collaboration

PG-StromにはGstore_fdwという機能がある。これはGPU上のデバイスメモリをPostgreSQLにとっての外部ストレージに見立てて、INSERT構文を使ってGPUバイスメモリへデータを書き出すための機構である。

kaigai.hatenablog.com

これは元々、PostgreSQLの可変長データの制限が1GBであるため、PL/CUDAユーザ定義関数が処理すべきデータが1GBを越える場合にはSQL関数の引数として与える事ができず、また、CPU側で(通常、null値を含まないベクトルor行列として表現する)メモリイメージをセットアップする時間が処理全体の支配項になってしまうために作成した機能であった。

創薬分野における類似化合物探索ワークロードにおける化合物データベースのように)繰り返し何度も利用されるデータであればGPU上に”置きっぱなし”である方が合理的であるし、これはまた同時に、PostgreSQL可変長データの制限である1GBを回避する事にもなる。

一方、PG-Stromの実装基盤であるCUDAのAPIリファレンスを眺めてみると、興味深いAPIが提供されている事が分かる。

  • CUresult cuIpcGetMemHandle ( CUipcMemHandle* pHandle, CUdeviceptr dptr )
    • Gets an interprocess memory handle for an existing device memory allocation.
  • CUresult cuIpcOpenMemHandle ( CUdeviceptr* pdptr, CUipcMemHandle handle, unsigned int Flags )
    • Opens an interprocess memory handle exported from another process and returns a device pointer usable in the local process.

要は、GPUバイスメモリ上に確保した領域の一個ずつにユニークな識別子が付与されており、cuIpcGetMemHandleを利用する事でその識別子を取得する事ができる。これは64バイトの長さがあり、重複しない事が保証されている。
そして他のプロセスでこの識別子をcuIpcOpenMemHandleに渡してやると、GPUバイスメモリ上の同じ領域を読み書きできるようになるという、GPU版のプロセス間通信の仕組みである。

そうすると、GPUへのデータロードはGstore_fdwを用いてSQLで行った上で、その後のデータ操作、データ解析はPythonスクリプトで実行するというワークフローが出来上がる。

何が良いのか?

  1. 解析すべきデータを抽出する際に、WHERE句の条件を変更するだけでDB側が母集団を絞り込んでくれる。
  2. JOIN、GROUP BY、WINDOW関数など、前処理に適した機能が豊富に用意されている。
  3. インデックスやGPU並列処理、列キャッシュなど大規模なデータを効率的に処理する仕組みを備えている。
  4. データ連携は全てバイナリ形式のまま行われるため、CSVを介してデータ交換を行う時のように、バイナリ→テキスト/テキスト→バイナリ変換を行う非効率性を排除できる。

では、具体的にどのようにPythonスクリプトとデータ交換を行うのか?

pystrom - cupyとPG-Stromを繋ぐためのモジュール

PG-Strom側でGstore_fdwテーブルの背後に存在するGPUバイスメモリの識別子を取得するには、以下の関数を用いる。

postgres=# select gstore_export_ipchandle('ft');
                                                      gstore_export_ipchandle
------------------------------------------------------------------------------------------------------------------------------------
 \x006b730200000000601100000000000000750200000000000000200000000000000000000000000000020000000000005b000000000000002000d0c1ff00005c
(1 row)

gstore_export_ipchandleは引数で指定されたGstore_fdwテーブルの識別子を bytea 型のデータとして出力する。表示はHex形式だが、64バイトある事が分かる。

次に、Python側でこの識別子を受け取り、データ操作が可能なPythonのオブジェクトとして扱えるようにするためのモジュールが必要。
そのために作成したのがpystromモジュールで、ソースコードは以下の通り*1

pg-strom/pystrom at master · heterodb/pg-strom · GitHub

pystromクラスは唯一のメソッドipc_importを持つ。

  • cupy.core.ndarray pystrom.ipc_import(bytes ipc_handle [, list attrNames])

このメソッドはipc_handleで与えられた識別子でGstore_fdw外部テーブルをオープンし、その中からattrNamesで指定された列だけから成る2次元のcupy.core.ndarrayを生成する。
attrNamesで指定する列は全て同一のデータ型を持っている必要があり、現状、boolsmallintintbigintrealおよびfloatだけが対応している。
また、デバイスメモリ上のレイアウトは行優先の2次元配列となっている(DB的に言えば列指向データ構造)。

では、実際に使ってみる事にする。

Gstore_fdwテーブルの作成とテストデータの挿入

postgres=# CREATE FOREIGN TABLE ft (id int, x real, y real, z real)
                        SERVER gstore_fdw OPTIONS (pinning '0');
CREATE FOREIGN TABLE
postgres=# INSERT INTO ft (SELECT x, random(), random(), random() FROM generate_series(1,100000) x);
LOG:  alloc: preserved memory 1601024 bytes
INSERT 0 100000

CREATE FOREIGN TABLE構文を使ってGstore_fdwテーブルを作成する。
SERVER gstore_fdwを指定している事と、デバイスメモリを確保すべきGPUのデバイス番号をpinning '0'オプションで指定している。

次に、INSERTを使ってランダムなデータを10万件ほど挿入してみた(データ自体に意味はない)。ログに1.6MBほどGPUバイスメモリを確保した旨が表示されている。

pystromモジュールによるPythonスクリプトへのインポート

続いて、Python側のスクリプトに移る。上記のデータをインポートするには、大きく以下の3つの手順を踏む事になる。

  1. PythonスクリプトからPostgreSQLに接続する(psycopg2モジュール)
  2. gstore_export_ipchandle関数を用いて、Gstore_fdwの識別子を取得する
  3. この識別子を用いてGstore_fdwをオープンし、cupy.core.ndarrayオブジェクトを作成する(pystromモジュール)

順を追って説明する。

先ずは、PostgreSQLへの接続とSQL関数の実行であるが、これはアシスト様の以下のブログを参考にさせていただいた。
www.ashisuto.co.jp

ローカル接続でtrust認証なら何も難しい事は考えず、以下の通りである。

import psycopg2

conn = psycopg2.connect("host=localhost dbname=postgres")

そして、この conn オブジェクトを用いてクエリを発行し、その結果を取得する。

curr = conn.cursor()
curr.execute("select gstore_export_ipchandle('ft')::bytea")
row = curr.fetchone()
conn.close()

rowは実行結果の先頭行であり、この場合、結果を一個だけ含むタプルであるため、row[0]がGstore_fdwの識別子となる。

次に、pystromを用いて上記の識別子をオープンする。

import pystrom

X = pystrom.ipc_import(row[0], ['x','y','z'])

以上で完了である。

cupy.core.ndarrayクラスのオブジェクトであるXは、real型(32bit浮動小数点型)であるx列、y列、z列の内容を持つ3列x10000行の行列である。

>>> X
array([[0.05267062, 0.15842682, 0.95535886],
       [0.8110889 , 0.75173104, 0.09625155],
       [0.0950045 , 0.71161145, 0.6916123 ],
       ...,
       [0.32576588, 0.8340051 , 0.82255083],
       [0.12769088, 0.23999453, 0.28765103],
       [0.07242639, 0.14565416, 0.7454422 ]], dtype=float32)

SQLで上記のftテーブルを出力してみると以下のようになる。先頭3行分のデータを見比べていただきたい。

postgres=# SELECT * FROM ft LIMIT 5;
 id |     x     |    y     |     z
----+-----------+----------+-----------
  1 | 0.0526706 | 0.158427 |  0.955359
  2 |  0.811089 | 0.751731 | 0.0962516
  3 | 0.0950045 | 0.711611 |  0.691612
  4 |  0.051835 | 0.405314 | 0.0207166
  5 |  0.598073 |   0.4739 |  0.492226
(5 rows)

もちろん、一度Python側でcupyのデータオブジェクトとしてインポートした行列はPG-Stromの機能とは関係無しに操作する事ができるため、cupyの提供する多種多様な演算を利用する事ができる。

例えばドット積を計算してみる。

>>> cupy.dot(X[:,0],X[:,1])
array(24943.564, dtype=float32)

今回の例は、単純なランダムデータをGPUに投入してデータ交換を行っただけであるが、データ管理をデータベース管理システム(PostgreSQL)に任せてしまう事で、統計解析・機械学習の前処理にSQLというパワフルなツールを使う事ができる。また、データのエクスポートとその管理という煩雑な作業からデータサイエンティストを解放する事ができるほか、フラットファイルの場合と異なりデータファイルの散逸という問題が起こる事もない。


さらに、PostgreSQLにはリモートDBと連携して動作するための機能も備わっている。
~100GB程度のデータは個々のエンジニアが自分のワークステーションへ取り込んで作業する一方、データウェアハウス/データレイクという巨大なデータセットから適宜必要なデータを取り出し、それを他のエンジニアと共有し、容易に再現可能な形にする事こそが当面の目標である。

とりあえず、今回の記事は『とりあえず動きました』以上のものではないが、以下のような点はこれからの宿題である。

  • マルチGPUの対応
    • 64バイトの識別子のうちGPUバイス番号がどのフィールドに記録されているかは非公開なので、PG-Stromとpystrom側でデバイス番号を付与してやる必要がある。
    • 今回のケースは、たまたまGPUが1台だけの環境だからうまくいった。
  • クラウド環境への対応
    • セットアップ済みのマシンイメージを提供し、クラウド環境のGPUインスタンスで容易にデプロイできるよう準備。おそらく、オンプレ環境と同じように動かせるはず。
  • pip installできるように
    • ソースからインストールはさすがにナシでしょう。
  • 機械学習フレームワークとの連携
    • 今回は簡単なところでcupyと連携させてみたが、機械学習フレームワークとのデータ連携を使えるようにするというのが、利用シーンを考えた時の本丸。
    • この辺は自分は詳しくないので、得意な人と一緒に何がしかの検証を行ってみたい。

*1:将来的には pip install で簡単に入れられるようにしたいが、これはもう少し宿題事項

時系列データ/BRINインデックス対応

PG-StromにBRINインデックス対応機能を実装してみた。

まずは、以下のEXPLAIN ANALYZEの実行結果をご覧いただきたい。
条件句で参照しているymd列は日付型(date)で、テーブルにデータを挿入する際には意図的に日付順にINSERTを行っている。

postgres=# EXPLAIN (analyze, buffers)
                    SELECT * FROM dt
                    WHERE ymd BETWEEN '2018-01-01' AND '2018-12-31' AND cat LIKE '%bbb%';
                                                           QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (GpuScan) on dt  (cost=94815.94..176284.51 rows=180436 width=44)
                                                   (actual time=475.668..585.988 rows=174590 loops=1)
   GPU Filter: ((ymd >= '2018-01-01'::date) AND (ymd <= '2018-12-31'::date) AND (cat ~~ '%bbb%'::text))
   Rows Removed by GPU Filter: 4386178
   BRIN cond: ((ymd >= '2018-01-01'::date) AND (ymd <= '2018-12-31'::date))
   BRIN skipped: 424704
   Buffers: shared hit=214 read=42432
 Planning time: 0.465 ms
 Execution time: 1005.738 ms
(8 rows)

BRIN condBRIN skippedという新しい項目が追加されている。
これは、ymd列に設定しているBRINインデックスを用いる事で、明らかに検索条件にマッチしないデータブロックをGpuScanが読み飛ばしている事を意味する。

テーブルdtは5000万行のレコードを含んでおり、テーブルサイズは3652MBある。
PostgreSQLのブロックサイズ 8KB で換算すると 467456 ブロック存在する事になる。
つまり、本来は全件スキャンで467456ブロックをスキャンすべきところ、うち424704ブロック(約90.8%)を『明らかにマッチするレコードが存在しない』として読み飛ばしている。

postgres=# \d+
                        List of relations
 Schema |    Name     | Type  | Owner  |    Size    | Description
--------+-------------+-------+--------+------------+-------------
 public | dt          | table | kaigai | 3652 MB    |
     :          :                :          :            :
 public | t1          | table | kaigai | 7512 kB    |
 public | t2          | table | kaigai | 7512 kB    |
 public | t3          | table | kaigai | 7512 kB    |

当然、読み飛ばすブロック数は検索条件によって変わり、例えば、日付範囲を2倍にした下記の例では(データをランダムに生成した事もあり)読み飛ばしたブロック数は382208個になっている。(それに伴い、処理時間も多少増えている)

postgres=# EXPLAIN (analyze, buffers)
                    SELECT * FROM dt
                    WHERE ymd BETWEEN '2018-01-01' AND '2019-12-31' AND cat LIKE '%bbb%';
                                                           QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (GpuScan) on dt  (cost=74703.58..348389.96 rows=360298 width=44) (actual time=551.541..993.654 rows=349081 loops=1)
   GPU Filter: ((ymd >= '2018-01-01'::date) AND (ymd <= '2019-12-31'::date) AND (cat ~~ '%bbb%'::text))
   Rows Removed by GPU Filter: 8758759
   BRIN cond: ((ymd >= '2018-01-01'::date) AND (ymd <= '2019-12-31'::date))
   BRIN skipped: 382208
   Buffers: shared hit=284 read=84864
 Planning time: 0.496 ms
 Execution time: 1449.248 ms
(8 rows)

BRINインデックスとは

BRINとは Block Range Index の略で*1、その名の通り、ある一定範囲のブロックを単位とするインデックスである。

RDBでお馴染みのB-treeインデックスは、インデックス対象列の値(キー)とレコード位置(ポインタ)を各レコード毎に持っており、例えば「ID=1234」みたいな条件句から特定のレコードを抽出するといった処理には滅法強い。
ただし、これはインデックスサイズが増大しがちで、例えば、センサやモバイル機器が生成したデータを日々DBに蓄積していくといった使い方を考えると、大規模データの脇に大規模インデックスが控えているという事になり、あまり現実的ではなくなる。

以下はインデックスサイズの比較だが、B-treeインデックスを設定しているid列(主キー)のインデックスdt_id_idxは1071MBで、本体のテーブルの1/3近いサイズになっている。この比率で行けば1.0TBのテーブルに対するインデックスは300GB程度になる(!)

一方で、BRINインデックスのdt_ymd_idxのサイズは僅か128kBに留まっている。これは両者のインデックスの持ち方に起因する。

postgres=# \di+
                                List of relations
 Schema |       Name       | Type  | Owner  |    Table    |  Size   | Description
--------+------------------+-------+--------+-------------+---------+-------------
 public | dt_id_idx        | index | kaigai | dt          | 1071 MB |
 public | dt_ymd_idx       | index | kaigai | dt          | 128 kB  |

BRINインデックスは128ブロック*2毎に、インデックス対象列の最小値と最大値を記録する。
そうすると、例えばymd BETWEEN '2018-01-01' AND '2019-12-31'という条件が与えられた時に、最大値が'2016-07-01'であるブロックは明らかにマッチする行が存在しないので読み飛ばして構わない。

PG-Stromの動作で言えば、読み飛ばすべきブロックはそもそもGPUへ転送されない。
GPUはワンチップに数千コアを搭載し、強烈な並列処理能力を持っているとはいえ、「何もしない」に比べれば圧倒的に遅い。そのため、同じ集計処理を行うにしても、予めBRINインデックスを用いてある程度の範囲の絞り込みができた方が有利である事は間違いない。

なぜ時系列データに有効なのか?

PG-StromでBRINインデックスへ対応するというモチベーションは、基本的にはIoT系ワークロードで使われるデータに対する最適化である。

これらのデータには以下のような特徴がある。

  • レコードにはタイムスタンプが付与される
  • 一度DBに挿入されたデータは(滅多に)更新されない

単純なテーブルへのINSERTを続けていくとやがてブロック(= 8KB)にレコードが収まり切らなくなり、PostgreSQLは新しいブロックを割当て、さらにレコードを追加していくという動作になる。

タイムスタンプの付与とDBへの挿入は多少のタイムラグがあるとはいえ、そうそう大きなズレが生じるわけではない。そうすると、あるブロックに記録されているタイムスタンプの値はかなり近しい値のものでまとまって物理的に保存されているという事になる。

すると、ある一定範囲の最大値/最小値だけをインデックスに保存しておくBRINインデックスであっても、相当範囲の絞り込みが可能であるという事になる。

実際、ymd列の順にINSERTしたdtテーブルのスキャン時に使われるBRINインデックスのビットマップを表示させてみると、以下のようになる。
ブロック番号で言うと 995~1328 番までが2018年のデータを含んでおり、他のブロックは読み飛ばして構わないという事が分かる。

postgres=# EXPLAIN (analyze, buffers)
                      SELECT * FROM dt WHERE ymd BETWEEN '2018-01-01' AND '2018-12-31';
INFO:  BRIN-index (dt_ymd_idx) range_sz = 128
INFO:       0: ffffffff ffffffff ffffffff ffffffff
INFO:     128: ffffffff ffffffff ffffffff ffffffff
INFO:     256: ffffffff ffffffff ffffffff ffffffff
INFO:     384: ffffffff ffffffff ffffffff ffffffff
INFO:     512: ffffffff ffffffff ffffffff ffffffff
INFO:     640: ffffffff ffffffff ffffffff ffffffff
INFO:     768: ffffffff ffffffff ffffffff ffffffff
INFO:     896: 00000007 ffffffff ffffffff ffffffff
INFO:    1024: 00000000 00000000 00000000 00000000
INFO:    1152: 00000000 00000000 00000000 00000000
INFO:    1280: ffffffff ffffffff ffff0000 00000000
INFO:    1408: ffffffff ffffffff ffffffff ffffffff
INFO:    1536: ffffffff ffffffff ffffffff ffffffff
INFO:    1664: ffffffff ffffffff ffffffff ffffffff
INFO:    1792: ffffffff ffffffff ffffffff ffffffff
INFO:    1920: ffffffff ffffffff ffffffff ffffffff
INFO:    2048: ffffffff ffffffff ffffffff ffffffff
INFO:    2176: ffffffff ffffffff ffffffff ffffffff
INFO:    2304: ffffffff ffffffff ffffffff ffffffff
INFO:    2432: ffffffff ffffffff ffffffff ffffffff
INFO:    2560: ffffffff ffffffff ffffffff ffffffff
INFO:    2688: ffffffff ffffffff ffffffff ffffffff
INFO:    2816: ffffffff ffffffff ffffffff ffffffff
INFO:    2944: ffffffff ffffffff ffffffff ffffffff
INFO:    3072: ffffffff ffffffff ffffffff ffffffff
INFO:    3200: ffffffff ffffffff ffffffff ffffffff
INFO:    3328: ffffffff ffffffff ffffffff ffffffff
INFO:    3456: ffffffff ffffffff ffffffff ffffffff
INFO:    3584: 00000000 00000007 ffffffff ffffffff

JOINおよびGROUP BYでの対応

PG-StromにはSCAN→JOINやSCAN→GROUP BY間のデータ移動を省略するため、これらの処理を実行するGpuJoinやGpuPreAgg自身がテーブルスキャンも実行するというモードがある。
BRINインデックスによって範囲を絞り込める場合、これらのケースでも同様に機能しI/O量を削減する。

SCAN+JOINの合体ケース

postgres=# EXPLAIN (analyze, buffers) SELECT * FROM dt NATURAL JOIN t1 NATURAL JOIN t2 WHERE ymd BETWEEN '2018-01-01' AND '2018-12-31';
                                                             QUERY PLAN
--------------------------------------------------------------------------
 Custom Scan (GpuJoin) on dt  (cost=56759.17..56759.17 rows=4541187 width=126)
                                                  (actual time=486.777..1235.942 rows=4545204 loops=1)
   Outer Scan: dt  (cost=96534.45..179544.65 rows=4541187 width=44)
                            (actual time=78.232..379.588 rows=6477696 loops=1)
   Outer Scan Filter: ((ymd >= '2018-01-01'::date) AND (ymd <= '2018-12-31'::date))
   Rows Removed by Outer Scan Filter: 15564
   BRIN cond: ((ymd >= '2018-01-01'::date) AND (ymd <= '2018-12-31'::date))
   BRIN skipped: 424704
   Depth 1: GpuHashJoin  (plan nrows: 4541187...4541187, actual nrows: 6462132...6462132)
            HashKeys: dt.aid
            JoinQuals: (dt.aid = t1.aid)
            KDS-Hash (size plan: 10.78MB, exec: 10.78MB)
   Depth 2: GpuHashJoin  (plan nrows: 4541187...4541187, actual nrows: 6462132...6462132)
            HashKeys: dt.bid
            JoinQuals: (dt.bid = t2.bid)
            KDS-Hash (size plan: 10.78MB, exec: 10.78MB)
   Buffers: shared hit=1956 read=42560
   ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=45)
                                   (actual time=0.018..37.770 rows=100000 loops=1)
         Buffers: shared hit=935
   ->  Seq Scan on t2  (cost=0.00..1935.00 rows=100000 width=45)
                                   (actual time=0.012..37.352 rows=100000 loops=1)
         Buffers: shared hit=935
 Planning time: 1.594 ms
 Execution time: 2053.291 ms
(21 rows)

SCAN+GROUP BYの合体ケース

postgres=# EXPLAIN (analyze, buffers)
                      SELECT cat,count(*) FROM dt
                     WHERE ymd BETWEEN '2018-01-01' AND '2019-12-31'
                GROUP BY cat;
                                                       QUERY PLAN
--------------------------------------------------------------------------
 GroupAggregate  (cost=8271.68..8273.76 rows=26 width=12)
                              (actual time=727.366..727.385 rows=26 loops=1)
   Group Key: cat
   Buffers: shared hit=92 read=85056
   ->  Sort  (cost=8271.68..8272.14 rows=182 width=12)
                  (actual time=727.358..727.360 rows=26 loops=1)
         Sort Key: cat
         Sort Method: quicksort  Memory: 26kB
         Buffers: shared hit=92 read=85056
         ->  Custom Scan (GpuPreAgg) on dt  (cost=8262.58..8264.85 rows=182 width=12)
                                                                     (actual time=727.294..727.301 rows=26 loops=1)
               Reduction: Local
               Outer Scan: dt  (cost=4000.00..4011.99 rows=9067906 width=4)
                                        (actual time=62.124..718.351 rows=9107840 loops=1)
               Outer Scan Filter: ((ymd >= '2018-01-01'::date) AND (ymd <= '2019-12-31'::date))
               Rows Removed by Outer Scan Filter: 17367
               BRIN cond: ((ymd >= '2018-01-01'::date) AND (ymd <= '2019-12-31'::date))
               BRIN skipped: 382208
               Buffers: shared hit=92 read=85056
 Planning time: 0.773 ms
 Execution time: 1118.730 ms
(17 rows)

*1:「BRINインデックス」だと、若干、馬から落馬した、頭痛が痛い感があるものの、収まりが悪いので"BRINインデックス"と記載します。

*2:デフォルト値。変更可

Partition-wise GpuJoin/GpuPreAgg

PostgreSQL v10以降ではテーブルパーティショニングの機能が入っており、値の範囲、または値のリストによってテーブルをいくつかのパーティションに分割する事が可能となっている。

遅まきながら、PG-Stromにパーティションを意識した実行計画を作成するよう機能拡張を行ってみた。

以下の実行計画を見てもらいたい。これは、従来のPG-Stromを使って、DATE型の列 ymd の値を元に pt_2010, pt_2011, ..., pt_2019 まで一年ごとにデータを別テーブルに分割する構成を取っている。

検索条件にWHERE ymd > '2017-01-01'::dateが含まれているため、明らかに検索条件に該当しない子テーブルはスキャンの対象から外されている。したがって、実際にスキャンが行われるのは pt_2017, pt_2018, pt_2019 の3テーブルのみである。

postgres=# EXPLAIN SELECT cat,count(*),avg(ax) FROM pt NATURAL JOIN t1 WHERE ymd > '2017-01-01'::date GROUP BY cat;
                                                           QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=341392.92..341399.42 rows=200 width=48)
   Group Key: pt.cat
   ->  Sort  (cost=341392.92..341393.92 rows=400 width=72)
         Sort Key: pt.cat
         ->  Gather  (cost=341333.63..341375.63 rows=400 width=72)
               Workers Planned: 2
               ->  Partial HashAggregate  (cost=340333.63..340335.63 rows=200 width=72)
                     Group Key: pt.cat
                     ->  Parallel Custom Scan (GpuJoin)  (cost=283591.92..283591.92 rows=7565562 width=40)
                           Depth 1: GpuHashJoin  (nrows 3152318...7565562)
                                    HashKeys: pt.aid
                                    JoinQuals: (pt.aid = t1.aid)
                                    KDS-Hash (size: 10.78MB)
                           ->  Append  (cost=28540.80..200673.34 rows=3152318 width=36)
                                 ->  Parallel Custom Scan (GpuScan) on pt_2017  (cost=28540.80..66891.11 rows=1050772 width=36)
                                       GPU Filter: (ymd > '2017-01-01'::date)
                                 ->  Parallel Custom Scan (GpuScan) on pt_2018  (cost=28540.81..66883.43 rows=1050649 width=36)
                                       GPU Filter: (ymd > '2017-01-01'::date)
                                 ->  Parallel Custom Scan (GpuScan) on pt_2019  (cost=28540.80..66898.79 rows=1050896 width=36)
                                       GPU Filter: (ymd > '2017-01-01'::date)
                           ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
(21 rows)

しかしこの実行計画には問題がある。
テーブルスキャンを実行するのに GpuScan が選択されており、この出力を Append が受け取った後で、再び GpuJoin がこれを実行する。
つまり、GPU -> ホストRAM -> GPU -> ホストRAM というデータのピンポンが発生しており、条件句の評価やJOIN処理と言ったCPU喰いの処理のオフロードよりもむしろ、PCIeバスを通じたデータの移動に時間を取られやすいという事が容易に想像できる。

次に、パーティション対応を加えた PG-Strom だとどうなるか。

postgres=# EXPLAIN SELECT cat,count(*),avg(ax) FROM pt NATURAL JOIN t1 WHERE ymd > '2017-01-01'::date group by cat;
                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=196450.44..196476.94 rows=200 width=48)
   Group Key: pt_2017.cat
   ->  Sort  (cost=196450.44..196453.44 rows=1200 width=72)
         Sort Key: pt_2017.cat
         ->  Gather  (cost=66085.69..196389.07 rows=1200 width=72)
               Workers Planned: 2
               ->  Parallel Append  (cost=65085.69..195269.07 rows=600 width=72)
                     ->  Parallel Custom Scan (GpuPreAgg)  (cost=65085.69..65089.69 rows=200 width=72)
                           Reduction: Local
                           Combined GpuJoin: enabled
                           ->  Parallel Custom Scan (GpuJoin) on pt_2017  (cost=32296.64..74474.20 rows=1050772 width=40)
                                 Outer Scan: pt_2017  (cost=28540.80..66891.11 rows=1050772 width=36)
                                 Outer Scan Filter: (ymd > '2017-01-01'::date)
                                 Depth 1: GpuHashJoin  (nrows 1050772...2521854)
                                          HashKeys: pt_2017.aid
                                          JoinQuals: (pt_2017.aid = t1.aid)
                                          KDS-Hash (size: 10.78MB)
                                 ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
                     ->  Parallel Custom Scan (GpuPreAgg)  (cost=65078.35..65082.35 rows=200 width=72)
                           Reduction: Local
                           Combined GpuJoin: enabled
                           ->  Parallel Custom Scan (GpuJoin) on pt_2018  (cost=32296.65..74465.75 rows=1050649 width=40)
                                 Outer Scan: pt_2018  (cost=28540.81..66883.43 rows=1050649 width=36)
                                 Outer Scan Filter: (ymd > '2017-01-01'::date)
                                 Depth 1: GpuHashJoin  (nrows 1050649...2521557)
                                          HashKeys: pt_2018.aid
                                          JoinQuals: (pt_2018.aid = t1.aid)
                                          KDS-Hash (size: 10.78MB)
                                 ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
                     ->  Parallel Custom Scan (GpuPreAgg)  (cost=65093.03..65097.03 rows=200 width=72)
                           Reduction: Local
                           Combined GpuJoin: enabled
                           ->  Parallel Custom Scan (GpuJoin) on pt_2019  (cost=32296.65..74482.64 rows=1050896 width=40)
                                 Outer Scan: pt_2019  (cost=28540.80..66898.79 rows=1050896 width=36)
                                 Outer Scan Filter: (ymd > '2017-01-01'::date)
                                 Depth 1: GpuHashJoin  (nrows 1050896...2522151)
                                          HashKeys: pt_2019.aid
                                          JoinQuals: (pt_2019.aid = t1.aid)
                                          KDS-Hash (size: 10.78MB)
                                 ->  Seq Scan on t1  (cost=0.00..1935.00 rows=100000 width=12)
(40 rows)

クエリは同一なので、pt_2017, pt_2018, pt_2019 の3テーブルだけを読み出す事は共通だが、生成された実行計画が大きく異なる事が分かる。
パーティションの各子テーブルから読み出した内容を結合する Append より前に、GpuJoinおよびGpuPreAggが押し込まれ、特に GROUP BY での行数削減効果が大きいため、実際に Append で処理されるのは高々数百行程度に過ぎない事がわかる。
しかも、GpuJoinからGpuPreAgg間はCombined GpuJoin: enabledとあるので、ホストシステムを介さずにGPUバイスメモリ上でJOIN結果をGROUP BY処理へ受け渡す事になっている。つまり、ホストシステムで動作する Append とその後の GroupAggregate にとっては、パーティションに分割するレベルの大きさのデータであっても、実際に処理しなければならないのは(大半はGPUによって消し込まれているため)数百行程度にしかならないという事である。

パーティション設定を物理的なデータ配置と重ね合わせると面白そうな事になるのが分かる。
ホストシステムからはI/O拡張ボックスを使ってGPUSSDを接続するものとし、I/O拡張ボックスに搭載したSSDごとにパーティションを切るものとする。
そうすると、SCAN -> JOIN -> GROUP BY までの処理は、SSD-to-GPUダイレクトSQL実行を用いると一貫してI/O拡張ボックスで処理する事が可能となり、ホストシステムの負荷は極めて小さなものとなる。

PostgreSQL v10ではAppend配下の子テーブルの処理を並列に実行する事ができないため、複数のI/O拡張ボックスを備えていたとしても同時にアクティブになるのは1個だけだが、これはPostgreSQL v11で改善される。そのため、v11がリリースされる頃には、I/O拡張ボックスを増設すれば増設するだけ処理性能と容量を同時に拡張する事ができるというシステムが現実味を帯びる事になる。
現状、I/O拡張ボックスを使った性能測定ではSSDx2枚を用いて少なくとも4.8GB/sのスループットまで出る事は確認できている。ワークロードを選ぶことは確かだが、うまくハマればI/O拡張ボックスの数に比例してスケールするため、イマドキのハイエンドDWHのスペックである数十GB/sのクエリ処理性能も夢ではない。

現状、ひとまず実行計画でパーティションを意識するように追加の実行パスを生成するように修正を加えただけである。
まだ欠けている機能としては以下の2つ。

  • GpuJoinのRight-Tree側を何度も読まずに済むようにする。
  • バックグラウンドワーカがGPUを初期化する時に、SSDに近傍のものを選択する。

これは、上記のEXPLAINの例ではテーブル t1 が相当するものだが、GpuJoinが3個のパーティション子テーブル側にPush-downされた結果、テーブル t1 の読出しを3回も行う必要が出てきたというもの。
これはもう少し実装を頑張れば、例えばGpuJoin用のハッシュテーブルを共有メモリ上に展開するなどして、一度だけ読み出せば全てのバックグラウンドワーカがハッシュテーブルへの参照を共有できる。

また、PG-Strom v2.0では(設計単純化を目的として)各プロセスが同時に利用するGPUは一個だけとし、複数GPUを用いる場合には必然的にCPUパラレルクエリを前提とするという設計になった。そのため、プロセスがGPUを初期化する際に、自分がこれからスキャンしようとする子テーブルに(物理的に)近傍のGPUを選択する事がよい戦略となる。

この辺の課題を解決しつつ、今週リリース予定のPostgreSQL v11に合わせてパーティション機能への対応を強化していきたい。