mysql2arrowでMySQLからデータを抜く

以前からPG-Stromのパッケージにpg2arrowというユーティリティを同梱しており、これを使うと、PostgreSQLに投げたクエリからApache Arrow形式のファイルを作成する事ができる。

kaigai.hatenablog.com
qiita.com

昨年、当初のバージョンを作った時から、内部的には色々ゴチャゴチャ変わっていて*1、Arrow_Fdwとコードを共有するための改良や、RDBMSへの接続に固有の部分だけを別ファイルに切り出すという事をやっていた。
これは、PostgreSQLだけをデータソースにするのではなく、Webアプリやゲームの業界でよく使われる MySQL や、将来的にはNoSQLなどへも簡易に対応できるようにという意味での基礎工事のようなものである。今回はまず、これを MySQL に対応させてみた。

MySQLからWebアプリやゲームのログ情報を Apache Arrow 形式で抜き出し、これを単純なファイルとして NVME-SSD 上のボリュームに保存する。
これらのファイルを Arrow_Fdw 外部テーブルを用いて PostgreSQLマッピングすれば、解析系DBにわざわざデータを再度インポートしなくても、Webアプリやゲームのログを集計処理や異常検知に回す事ができるようになる。
加えて、PG-StromであればArrow_Fdw外部テーブルに対してSSD-to-GPU Direct SQLを実行する事ができるので、きちんとシステムを設計してやれば、秒速で10億レコード超を処理する事だって不可能ではない。

使い方自体はそれほど複雑なものではない。
大半のオプションが pg2arrow と共通*2で、今回の機能強化に合わせて-tオプションを追加した程度である。

$ mysql2arrow --help
Usage:
  mysql2arrow [OPTION] [database] [username]

General options:
  -d, --dbname=DBNAME   Database name to connect to
  -c, --command=COMMAND SQL command to run
  -t, --table=TABLENAME Table name to be dumped
      (-c and -t are exclusive, either of them must be given)
  -o, --output=FILENAME result file in Apache Arrow format
      --append=FILENAME result Apache Arrow file to be appended
      (--output and --append are exclusive. If neither of them
       are given, it creates a temporary file.)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each

Connection options:
  -h, --host=HOSTNAME  database server host
  -p, --port=PORT      database server port
  -u, --user=USERNAME  database user name
  -P, --password=PASS  Password to use when connecting to server

Other options:
      --dump=FILENAME  dump information of arrow file
      --progress       shows progress of the job
      --set=NAME:VALUE config option to set before SQL execution
      --help           shows this message

Report bugs to <pgstrom@heterodb.com>.

簡単な例でデータを抽出してみる。
なお-tオプションは、SELECT * FROM tablenameの省略形。

$ mysql2arrow -d mysql -u root -t t1 -o /dev/shm/hoge.arrow

生成された Apache Arrow ファイルのスキーマ定義、データの配置はこんな感じ

$ mysql2arrow --dump /dev/shm/hoge.arrow
[Footer]
{Footer: version=V4, schema={Schema: endianness=little, fields=[{Field: name="id", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="a", nullable=true, type={Int32}, children=[], custom_metadata=[]}, {Field: name="b", nullable=true, type={Float64}, children=[], custom_metadata=[]}, {Field: name="c", nullable=true, type={Utf8}, children=[], custom_metadata=[]}, {Field: name="d", nullable=true, type={Timestamp: unit=sec}, children=[], custom_metadata=[]}], custom_metadata=[{KeyValue: key="sql_command" value="SELECT * FROM t1"}]}, dictionaries=[], recordBatches=[{Block: offset=472, metaDataLength=360 bodyLength=60480}]}
[Record Batch 0]
{Block: offset=472, metaDataLength=360 bodyLength=60480}
{Message: version=V4, body={RecordBatch: length=1000, nodes=[{FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=26}, {FieldNode: length=1000, null_count=18}, {FieldNode: length=1000, null_count=0}, {FieldNode: length=1000, null_count=17}], buffers=[{Buffer: offset=0, length=0}, {Buffer: offset=0, length=4032}, {Buffer: offset=4032, length=128}, {Buffer: offset=4160, length=4032}, {Buffer: offset=8192, length=128}, {Buffer: offset=8320, length=8000}, {Buffer: offset=16320, length=0}, {Buffer: offset=16320, length=4032}, {Buffer: offset=20352, length=32000}, {Buffer: offset=52352, length=128}, {Buffer: offset=52480, length=8000}]}, bodyLength=60480}

Python (PyArrow) で読み込んでみるとこんな感じですね。

$ python
Python 3.6.8 (default, Oct  7 2019, 17:58:22)
[GCC 8.2.1 20180905 (Red Hat 8.2.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/hoge.arrow')
>>> f.get_record_batch(0).to_pandas()
       id      a           b                                 c                   d
0       1  750.0  884.851090  c4ca4238a0b923820dcc509a6f75849b 2020-11-25 17:14:56
1       2  962.0  373.533847  c81e728d9d4c2f636f067f89cc14862c 2019-03-26 01:19:29
2       3  287.0  384.895995  eccbc87e4b5ce2fe28308fd9f2a7baf3 2018-11-04 21:55:32
3       4  573.0  890.063600  a87ff679a2f3e71d9181a67b7542122c 2023-05-14 15:24:37
4       5  948.0  778.885925  e4da3b7fbbce2345d7772b0674a318d5 2023-01-18 16:41:12
..    ...    ...         ...                               ...                 ...
995   996 -295.0  424.007169  0b8aff0438617c055eb55f0ba5d226fa 2017-08-15 01:40:31
996   997 -849.0  648.545034  ec5aa0b7846082a2415f0902f0da88f2 2023-05-23 08:58:55
997   998  530.0  865.244230  9ab0d88431732957a618d4a469a0d4c3 2024-07-20 14:13:06
998   999  244.0   96.534528  b706835de79a2b4e80506f582af3676a 2018-08-10 01:42:04
999  1000  997.0  157.958900  a9b7ba70783b617e9998dc4dd82eb3c5 2016-12-27 08:06:44

[1000 rows x 5 columns]

ビルドは PG-Strom のモジュールと一緒にやればよいのですが、mysql-develパッケージをインストールしていない人もいるという想定で((なおpostgresql-develパッケージは全人類がインストールするという想定で))、makeの実行時にWITH_MYSQL2ARROW=1を付加します。

$ make WITH_MYSQL2ARROW=1
gcc -D__MYSQL2ARROW__=1 -D_GNU_SOURCE -g -Wall -I ../src -I ../utils -I /usr/local/pgsql-11/include/server -I/usr/include/mysql -m64  -L/usr/lib64/mysql  -Wl,-rpath,-L/usr/lib64/mysql ../utils/sql2arrow.c ../utils/mysql_client.c ../src/arrow_nodes.c ../src/arrow_write.c -o ../utils/mysql2arrow -lmysqlclient

追記モードで異常終了した時のファイルの回復

もう一点。MySQL対応にするついでに、以前からあった設計上の問題の修正を行っている。

pg2arrowmysql2arrow--appendを指定し、追記モードでSQLの処理結果をApache Arrowファイルに追加する場合、以前のエントリで紹介したように、ファイル末尾のフッター領域を上書きして新しいデータを追加し、最後にフッター領域を再構築する。

kaigai.hatenablog.com

この時、SQLの異常終了やコマンド自体のバグによってプロセスが異常終了してしまったら、元々のApache Arrowファイルが破損したまま残ってしまう事になっていた。
これを修正するため、最新版では元々のApache Arrowファイルのフッタ領域の内容(このサイズ自体は大した量ではないので)を別の領域に退避し、シグナルハンドラとon_exit()ハンドラを用いて、終了コード 0 以外でプロセスが exit したり、SIGSEGVやSIGBUSを受け取った場合にはこれを元の位置に書き戻すという処理を行っている。

例えば、6GB程度の大きさがあるテーブル t0 から100行だけ取り出す。これは生成された Apache Arrow ファイルも5kB程度のもの。

$ pg2arrow -d postgres -c "SELECT * FROM t0 LIMIT 100" -o /dev/shm/monu.arrow
$ ls -l /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 4934 Mar 25 12:41 /dev/shm/monu.arrow

ここに、今度はテーブル全体を追記中にコマンドを ctrl-c で中断してみる事にする。

$ pg2arrow -d postgres -c "SELECT * FROM t0" --append /dev/shm/monu.arrow
^C

別ターミナルでファイルの大きさを観察してみると、確かに途中までデータが書き込まれ、順調にApache Arrowファイルが肥大化している事が分かるが、pg2arrowの異常終了後、最終的には元の大きさに戻っている。

$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 769M Mar 25 12:46 /dev/shm/monu.arrow
$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 1.1G Mar 25 12:47 /dev/shm/monu.arrow
$ ls -lh /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 1.3G Mar 25 12:47 /dev/shm/monu.arrow
$ ls -l /dev/shm/monu.arrow
-rw-r--r--. 1 kaigai users 4934 Mar 25 12:47 /dev/shm/monu.arrow

PyArrowで当該ファイルをオープンしてみても、元通り100行のデータを含む Apache Arrow ファイルである。

$ python
>>> import pyarrow as pa
>>> f = pa.ipc.open_file('/dev/shm/monu.arrow')
>>> f.get_record_batch(0).num_rows
100
>>> f.num_record_batches
1
>>> f.get_record_batch(0).num_rows
100

SIGKILLで強制終了した場合など救えないケースもあるが、一応、こういった運用面での安定性に寄与する機能も強化されているという事で。

*1:リファクタリングと呼ぼう!

*2:PostgreSQL系コマンドは-Wでパスワードのプロンプトを出すが、MySQL系はコマンドラインでパスワードを与えるお作法のよう