IPC - File Formatには、ファイル形式は以下の通りであると記述されている。
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">
さらに、Streaming Formatは以下の通り。
<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<DICTIONARY x DELTA>
...
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: int32>
そうすると、ファイル先頭"ARROW1\0\0"
の直後にSCHEMAの定義が来ると思って読んでみる。
まず先頭 8 バイト(41 52 52 4f 57 31 00 00
)は"ARROW1\0\0"
なのでこれで良し。
また、次の4バイト(9c 05 00 00
)はメタデータサイズなのでこれも良し。
$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000 41 52 52 4f 57 31 00 00 9c 05 00 00 10 00 00 00
000010 00 00 0a 00 0e 00 06 00 05 00 08 00 0a 00 00 00
しかし、それ以降がおかしい。format/Schema.fbsを参照すると、SCHEMAの先頭はバイトオーダを示すendianness
という、0か1のshort値*3であるはずなのに、10 00
という値になってしまっている。
どういう事か?
実はApache Arrowのファイル形式は、Google FlatBuffersというSerialization/Deserializationのための仕組みを使っており、SCHEMAやRECORD BATCHといったメッセージチャンクのデータ定義が、そのままバイナリ上での並びになっている訳ではない。
https://github.com/dvidelabs/flatcc/blob/master/doc/binary-format.md#example
ココに簡単な説明が載っているが、要は簡単なインデックスを作って、1番目のデータは基準点からxxバイト後ろ、2番目のデータは基準点からxxバイト後ろ、・・・という事である。
実際のデータを使って順に読み解いていく。
$ cat /tmp/mytest.arrow | od -t x1 -Ax | head
000000 41 52 52 4f 57 31 00 00 9c 05 00 00 10 00 00 00
000010 00 00 0a 00 0e 00 06 00 05 00 08 00 0a 00 00 00
000020 00 01 03 00 10 00 00 00 00 00 0a 00 0c 00 00 00
000030 04 00 08 00 0a 00 00 00 f4 03 00 00 04 00 00 00
000040 01 00 00 00 0c 00 00 00 08 00 0c 00 04 00 08 00
000050 08 00 00 00 08 00 00 00 10 00 00 00 06 00 00 00
000060 70 61 6e 64 61 73 00 00 bd 03 00 00 7b 22 63 6f
000070 6c 75 6d 6e 73 22 3a 20 5b 7b 22 66 69 65 6c 64
000080 5f 6e 61 6d 65 22 3a 20 22 69 64 22 2c 20 22 70
000090 61 6e 64 61 73 5f 74 79 70 65 22 3a 20 22 69 6e
FlatBufferメッセージの定義は以下の通り。header
はUnionなので、実際のオブジェクト型を示す1byteのメッセージタイプと実際のオブジェクトへのポインタから成る。つまり、Messageは4つのフィールドを持っている。
union MessageHeader {
Schema, DictionaryBatch, RecordBatch, Tensor, SparseTensor
}
table Message {
version: org.apache.arrow.flatbuf.MetadataVersion;
header: MessageHeader;
bodyLength: long;
}
まず先頭8byteはシグニチャ"ARROW1\0\0"
なのでスキップ。次の4byte9c 05 00 00
はメタデータのサイズ。
その次の4byte10 00 00 00
がFlatBufferメッセージへのオフセットなので、0x000c + 0x0010 = 0x001c
を参照する。
0x001c
の値は0a 00 00 00
なので、vtableの開始位置は0x001c - 0x000a = 0x0012
となる。
書き下すと以下のようになり、バイナリ上でのデータの並びが構造体の定義と一致していない事が分かる。
また、4番目のフィールドbodyLength
は省略されているが、これはデフォルトの0である事を意味する。
table:
0x001c 0a 00 00 00 ; 32bit negative offset to vtable
; 0x001c - 0x000a = 0x0012
0x0021 01 ; field-1 / Message Header = Schema
0x0022 03 00 ; field-0 / Metadata Version = V4
0x0024 10 00 00 00 ; field-2 / offset to Message Body (=0x0024 + 0x0010)
vtable:
0x0012 0a 00 ; vtable length = 10bytes / 3 items
0x0014 0e 00 ; table length = 14 bytes (including the negative offset)
0x0016 06 00 ; field id 0: (version; short)
0x0018 05 00 ; field id 1: (MessageHeader; byte)
0x001a 08 00 ; field id 2: (offset to Message Body; int)
これを読む事で、先頭のFlatBufferメッセージにはSCHEMAが格納されており、その実体は 0x0034 にある事が分かった。
次いで、0x0034から始まるバイナリを読んでいく。SCHEMAの定義は以下の通り。
table Schema {
/// endianness of the buffer
/// it is Little Endian by default
/// if endianness doesn't match the underlying system then the vectors need to be converted
endianness: Endianness=Little;
fields: [Field];
// User-defined metadata
custom_metadata: [ KeyValue ];
}
0x0034の値は0a 00 00 00
なので、同様にvtableの開始位置は0x0034 - 0x000a = 0x002aから。
table:
0x0034 0a 00 00 00 ; 32bit negative offset to vtable
; 0x0034 - 0x000a = 0x002a
0x0038 f4 03 00 00 ; field-id 1: offset to [fields] vector
0x003c 04 00 00 00 ; field-id 2: offset to [custom_metadata] vector
vtable:
0x002a 0a 00 ; vtable length = 10bytes / 3 items
0x002c 0c 00 ; table length = 12bytes (including the negative offset)
0x002e 00 00 ; field id 0: (endianness; short)
0x0030 04 00 ; field id 1: (offset to [fields] vector)
0x0032 08 00 ; field id 2: (offset to [custom_metadata] vector)
ここで、field-id 0のendiannessのインデックスが0になっている。これはデフォルト値である事を示し、値が 0 (= Little Endian) として扱って構わない事を意味する。
[fields]と[custom_metadata]はベクトル値、即ち配列で、ここから更に指定されたオフセットを参照する事になる。
先ず、SCHEMAに含まれる列のデータ型を示すFieldの配列は0x0038 + 0x03f4 = 0x42c に格納されているのでこれを参照する。
Fieldの定義は以下の通り。
table Field {
// Name is not required, in i.e. a List
name: string;
nullable: bool;
// This is the type of the decoded value if the field is dictionary encoded
type: Type;
// Present only if the field is dictionary encoded
dictionary: DictionaryEncoding;
// children apply only to Nested data types like Struct, List and Union
children: [Field];
// User-defined metadata
custom_metadata: [ KeyValue ];
}
0x042cの値は07 00 00 00
である。つまり、この後に続く7個のint32値は、Fieldを参照するオフセットである。
000420 62 6a 65 63 74 22 7d 5d 7d 00 00 00 07 00 00 00
000430 40 01 00 00 04 01 00 00 d4 00 00 00 a4 00 00 00
000440 70 00 00 00 44 00 00 00 04 00 00 00 ec fe ff ff
000450 00 00 01 02 1c 00 00 00 0c 00 00 00 04 00 00 00
000460 00 00 00 00 dc fe ff ff 00 00 00 01 40 00 00 00
000470 11 00 00 00 5f 5f 69 6e 64 65 78 5f 6c 65 76 65
000480 6c 5f 30 5f 5f 00 00 00 28 ff ff ff 00 00 01 03
000490 18 00 00 00 0c 00 00 00 04 00 00 00 00 00 00 00
:
fields[0] = 0x0430 + 0x0140 = 0x0570
fields[1] = 0x0434 + 0x0104 = 0x0538
fields[2] = 0x0438 + 0x00d4 = 0x050c
fields[3] = 0x043c + 0x00a4 = 0x04e0
fields[4] = 0x0440 + 0x0070 = 0x04b0
fields[5] = 0x0444 + 0x0044 = 0x0488
fields[6] = 0x0448 + 0x0004 = 0x044c
例えばfields[6]の場合、tableのアドレスは0x044cでvtableへのオフセットは0xfffffeec なので、0x044c - 0xfffffeec = 0x0560である。
一方、fields[5]の場合、tableのアドレスは0x0488でvtableへのオフセットは0xffffff28なので、0x0488 - 0xffffff28 = 0x0560となる。
最初、これを見た時に ( ゚Д゚)ハァ? となったが、実はよく考えられていて、vtableが指し示すオフセットはあくまでtableからの相対値なので、同じデータ構造を持つ fields[*] 配列の場合は vtable が共有可能で、table以降のバイト列だけが違っているという構造になる。
(注)これに気が付いた時のTweet:
したがって、fields[6] の table と vtable は以下のようになる。
table:
0x044c ec fe ff ff ; 32bit negative offset to vtable
; 0x044c - 0xfffffeec = 0x0560
0x0452 01 ; field-id 1: nullable = true
0x0453 02 ; field-id 2: type-id = Int
0x0454 1c 00 00 00 ; field-id 0: offset to 'name' string (at 0x0470)
0x0458 0c 00 00 00 ; field-id 3: offset to type definition (at 0x0464)
0x045c 04 00 00 00 ; field-id 5: offset to [children] vector
vtable:
0x0560 10 00 ; vtable length = 16bytes / 6 items
0x0562 14 00 ; table length = 20bytes (including the negative offset)
0x0564 08 00 ; field id 0: (name: offset to string)
0x0566 06 00 ; field id 1: (nullable: bool)
0x0568 07 00 ; field id 2: (type id; byte)
0x056a 0c 00 ; field id 3: (offset to Type definition)
0x056c 00 00 ; field id 4: (offset to dictionary; = NULL)
0x056e 10 00 ; field id 5: (offset to [children] vector)
fields[5] の vtable は共通で、tableは以下のようになる。fields[6]はInt型のフィールドを表現しているが、fields[5]はFloat型のフィールドを表現している事がわかる。
table:
0x0488 28 ff ff ff ; 32bit negative offset to vtable
; 0x0488 - 0xffffff28 = 0x0560
0x0452 01 ; field-id 1: nullable = true
0x0453 03 ; field-id 2: type-id = Float
0x0454 18 00 00 00 ; field-id 0: offset to 'name' string
0x0458 0c 00 00 00 ; field-id 3: offset to type definition
0x045c 04 00 00 00 ; field-id 5: offset to [children] vector
続いて、fields[6] の名前と型定義を見てみる事にする。
000460 00 00 00 00 dc fe ff ff 00 00 00 01 40 00 00 00
000470 11 00 00 00 5f 5f 69 6e 64 65 78 5f 6c 65 76 65
000480 6c 5f 30 5f 5f 00 00 00 28 ff ff ff 00 00 01 03
まず、列名は 0x0470 から始まる。Stringの場合、int32の文字列長に続いて '\0' 終端の文字列が格納される事になるため、0x0470から始まるfields[6]の列名は 0x0011 バイトの長さがあり、文字列自体は 0x0474 から始まる。
5f 5f 69 6e 64 65 78 5f 6c 65 76 65 6c 5f 30 5f 5f 00 00 00
をASCII文字列に直すと"__index_level_0__\0"
となる。こんなカラムは元のSQL結果には存在しないので、おそらくPandasが挿入したものなのだろう。
列定義は0x0464のtable参照となる。Intの定義は以下の通りなので、これまで同様に vtable = 0x464 - 0xfffffedc = 0x588 を参照する。
table Int {
bitWidth: int; // restricted to 8, 16, 32, and 64 in v1
is_signed: bool;
}
table:
0x0464 dc fe ff ff ; 32bit negative offset to vtable
; 0x0464 - 0xfffffedc = 0x0588
0x046b 01 ; field-id 1: is_signed = true
0x046c 40 00 00 00 ; field-id 0: bitWidth = 64
vtable:
0x0588 08 00 ; vtable length = 8bytes / 2 items
0x0562 0c 00 ; table length = 12bytes (including the negative offset)
0x0564 08 00 ; field id 0: (bitWidth: int)
0x0566 07 00 ; field id 1: (is_signed: bool)
これによって、fields[6]のデータ型は符号付き、64bit長のIntである事が分かった。
Arrowファイルは尻から読む
そろそろバイナリを人力で追っていくのが辛くなってきたので、勉強も兼ねてパーサのプログラムを作ってみた。
github.com
名前からして本来は PostgreSQL のテーブルを Apache Arrow 形式でダンプするツールなのだが、書き出す部分はまだ実装できていない。
とりあえず、デバッグ用機能であるところの--dump
オプションを用いて先ほどのArrow形式ファイルをダンプしてやると次のようになる。
(見やすさのため、適宜改行・インデントを加えている)
$ ./pg2arrow --dump /tmp/mytest.arrow
[Footer]
{Footer: version=3,
schema={Schema: endianness=little, fields=[
{Field: name=id, nullable=true, type={Int64}, children=[], custom_metadata=[]},
{Field: name=cat, nullable=true, type={Utf8}, children=[], custom_metadata=[]},
{Field: name=aid, nullable=true, type={Int64}, children=[], custom_metadata=[]},
{Field: name=bid, nullable=true, type={Int64}, children=[], custom_metadata=[]},
{Field: name=x, nullable=true, type={Float64}, children=[], custom_metadata=[]},
{Field: name=y, nullable=true, type={Float64}, children=[], custom_metadata=[]},
{Field: name=__index_level_0__, nullable=true, type={Int64}, children=[], custom_metadata=[]}
],
custom_metadata [
{KeyValue: key=(pandas), value=({"columns": [{"field_name": "id", "pandas_type": "int64", "name": "id", "metadata": null, "numpy_type": "int64"}, {"field_name": "cat", "pandas_type": "unicode", "name": "cat", "metadata": null, "numpy_type": "object"}, {"field_name": "aid", "pandas_type": "int64", "name": "aid", "metadata": null, "numpy_type": "int64"}, {"field_name": "bid", "pandas_type": "int64", "name": "bid", "metadata": null, "numpy_type": "int64"}, {"field_name": "x", "pandas_type": "float64", "name": "x", "metadata": null, "numpy_type": "float64"}, {"field_name": "y", "pandas_type": "float64", "name": "y", "metadata": null, "numpy_type": "float64"}, {"field_name": "__index_level_0__", "pandas_type": "int64", "name": null, "metadata": null, "numpy_type": "int64"}], "pandas_version": "0.22.0", "index_columns": ["__index_level_0__"], "column_indexes": [{"field_name": null, "pandas_type": "unicode", "name": null, "metadata": {"encoding": "UTF-8"}, "numpy_type": "object"}]})}
]},
dictionaries=[],
recordBatches=[
{Block: offset=1448, metaDataLength=448 bodyLength=73256},
{Block: offset=75152, metaDataLength=448 bodyLength=66056},
{Block: offset=141656, metaDataLength=448 bodyLength=33008}
]
}
[Record Batch 0]
{Message:
version=3,
body={RecordBatch: length=1200, nodes=[
{FieldNode: length=1200, null_count=0},
{FieldNode: length=1200, null_count=0},
{FieldNode: length=1200, null_count=0},
{FieldNode: length=1200, null_count=0},
{FieldNode: length=1200, null_count=0},
{FieldNode: length=1200, null_count=0},
{FieldNode: length=1200, null_count=0}
], buffers=[
{Buffer: offset=0, length=0},
{Buffer: offset=0, length=9600},
{Buffer: offset=9600, length=0},
{Buffer: offset=9600, length=12008},
{Buffer: offset=21608, length=3648},
{Buffer: offset=25256, length=0},
{Buffer: offset=25256, length=9600},
{Buffer: offset=34856, length=0},
{Buffer: offset=34856, length=9600},
{Buffer: offset=44456, length=0},
{Buffer: offset=44456, length=9600},
{Buffer: offset=54056, length=0},
{Buffer: offset=54056, length=9600},
{Buffer: offset=63656, length=0},
{Buffer: offset=63656, length=9600}
]},
bodyLength=73256
}
:
:
[Record Batch 2]
{Message:
version=3,
body={RecordBatch: length=600, nodes=[
{FieldNode: length=600, null_count=0},
{FieldNode: length=600, null_count=0},
{FieldNode: length=600, null_count=0},
{FieldNode: length=600, null_count=0},
{FieldNode: length=600, null_count=0},
{FieldNode: length=600, null_count=0},
{FieldNode: length=600, null_count=0}
], buffers=[
{Buffer: offset=0, length=0},
{Buffer: offset=0, length=4800},
{Buffer: offset=4800, length=0},
{Buffer: offset=4800, length=2408},
{Buffer: offset=7208, length=1800},
{Buffer: offset=9008, length=0},
{Buffer: offset=9008, length=4800},
{Buffer: offset=13808, length=0},
{Buffer: offset=13808, length=4800},
{Buffer: offset=18608, length=0},
{Buffer: offset=18608, length=4800},
{Buffer: offset=23408, length=0},
{Buffer: offset=23408, length=4800},
{Buffer: offset=28208, length=0},
{Buffer: offset=28208, length=4800}
]
},
bodyLength=33008
}
Arrow形式の場合、例えば一個のファイルに100万行のレコードが含まれていたとしても、個々のカラムが例えば100万要素の単純配列になる…訳ではなく、RecordBatchという単位で内部的には10万行×10個のセグメントに分けて保存する事ができる。
最初、ファイルのフォーマットを頭から見ていた時に『はて?ファイル内のRecordBatchの数や位置はどこを見れば書いてあるのだろう?』と不思議に思っていたが、File FormatのFOOTER部分を先に読むのだという事に気が付いて疑問が解消した。
おさらいになるが、ファイルにはこのようにデータが書かれている。
<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">
つまり、ファイル末尾 6bytes にはシグニチャ"ARROW1"
が書き込まれており、その前4byteにはFOOTER部分のサイズが書かれている。
という事は、(File Size) - 6bytes - sizeof(int32) の場所を読めばFooterの位置を特定でき、これを元に、ファイル内の RecordBatch の場所や数を特定できることになる。
struct Block {
offset: long;
metaDataLength: int;
bodyLength: long;
}
table Footer {
version: org.apache.arrow.flatbuf.MetadataVersion;
schema: org.apache.arrow.flatbuf.Schema;
dictionaries: [ Block ];
recordBatches: [ Block ];
}
なるほど確かに、ヘッダではなくフッタ側にファイル内の各セグメントへのインデックスを持っておくようにすれば、データを追記(この場合 RecordBatch の追加)する時も、既に書き込まれた RecordBatch を変更する事なく、元々 FOOTER が書き込まれていた場所を上書きして新しい FOOTER を作成すれば、最小限の I/O コストでデータの追記ができるようになる。
ファイルのメタ情報を頭に持っておくというのは、磁気ディスクか、もしかすると磁気テープの時代の設計の名残なのかもとか思ったり思わなかったりする40歳。