Billion rows processed per second at a single-node PostgreSQL

I have worked on benchmarking of PG-Strom at a large hardware configuration for a couple of months.
Due to the server models we had, our benchmark results had been usually measured at a small 1U rack server with 1CPU, 1GPU and 3-4 NVME-SSDs, even though PG-Strom supports multi-GPUs and striping of NVME-SSDs.
So, we determined to arrange more powerful hardware environment for a validation of maximum performance of PG-Strom a few months before. After that, we could confirm billion-rows processed per second performance at a single-node PostgreSQL system using Star Schema Benchmark (SSBM), as we have expected.
This article introduces the technology briefs, benchmark results and expected applications.

Hardware configuration for the benchmark

The diagram below shows the hardware configuration we prepared for the benchmark project.

The 4U-rack server (Supermicro SYS-4029GP-TRT) is a certified model for NVIDIA Tesla V100 GPUs *1, and optimized for NVIDIA GPUDirect RDMA by PCIe-switch.
PCIe-switch has two purpose here. It enables to install more device than host system’s PCIe lanes *2, and also enables to bypass CPU on peer-to-peer data transfer between PCIe devices under the PCIe-switch.
It is an ideal configuration for us, because PG-Strom fully utilize the P2P data transfer on SSD-to-GPU Direct SQL mode, and it is the key of performance.
U.2 NVME-SSD drives (Intel DC P4510) are installed on the external JBOF enclosures, and connected to the host system using PCIe host cards and direct attach cables *3. This host card is designed to install on PCIe x16 slot, and capable to connect four NVME-SSD drives that support PCIe x4 bandwidth. So, data transfer bandwidth over the PCIe-bus shall balance on 1xGPU and 4xSSDs.
Below is the block diagram of our benchmark environment.


Element technology① - SSD-to-GPU Direct SQL

It is a characteristic feature of PG-Strom. By co-operation with a special Linux kernel driver (nvme_strom) that intermediates peer-to-peer DMA from NVME-SSD to GPU over PCIe-bus, PG-Strom can directly load PostgreSQL data blocks on NVME-SSD drives onto GPU’s device memory, but CPU/RAM is bypassed.

Usually, software cannot determine which items are necessary and which ones are junks, prior to data blocks are loaded to system RAM. On the other word, we consume I/O bandwidth and CPU cycles to copy junk data.
SSD-to-GPU Direct SQL changes the data flow. GPU pre-processes SQL workloads on the middle of I/O path to eliminate unnecessary rows and runs JOIN/GROUP BY steps. So, CPU/RAM eventually receives just a small portion of the result set from the GPU.

Element technology② - Arrow_Fdw

Apache Arrow is a column-oriented data format for structured data-set, and many applications (especially, big-data and data analytics area) support it for data exchange. PG-Strom supports to read Apache Arrow files as its columnar store, in addition to PostgreSQL’s row data blocks.

Due to the nature of data format, columnar-data enables to read only referenced columns, unlike row-data. It usually reduce amount of I/O to be read from the storage.
Arrow_Fdw is designed for direct read from Apache Arrow files without data importing to database system. It means we can eliminate one time-consuming steps, if conprehensive software generates imput data in Apache Arrow format. It is a suitable characteristic for IoT/M2M log processing system that usually generates tons of data to be loaded to data processing phase.

Star Schema Benchmark and PostgreSQL partition

As our usual, we used Star Schema Benchmark (SSBM) for performance measurement.
Its scaling factor of lineorder is SF=4000, then it shall be distributed to four partition-leafs using hash-partition of PostgreSQL.
So, individual partition-leafs has about 879GB (6 billion rows) row-data, and total size of lineorder is about 3.5TB.

We also set up equivalent Apache Arrow files for each partition leaf, and set up one another partition table that is consists of foreign-tables on Arrow_Fdw.

SSBM defines 13 queries that containes a scan on lineorder that is the largest portion and JOIN / GROUP BY. For example, the query below is Q2_3.

select sum(lo_revenue), d_year, p_brand1
  from lineorder, date1, part, supplier
  where lo_orderdate = d_datekey
    and lo_partkey = p_partkey
    and lo_suppkey = s_suppkey
     and p_brand1 = 'MFGR#2221'
     and s_region = 'EUROPE'
  group by d_year, p_brand1
  order by d_year, p_brand1;

Benchmark Results

Below is the benchmark result. The vertical axis means number of rows processed per second.
Its definition is (number of lineorder; 24billion rows) / (SQL response time).
For example, PG-Strom + Arrow_Fdw responds the result of Q1_2 within 14.0s, so it means 1.71 billion rows were processed per second.

The blue bar is PostgreSQL v11.5 with a manual tuning to launch 24 parallel workers *4. Its performance was about 50-60 million rows per second.

The orange bar is PG-Strom on row-data of PostgreSQL by SSD-to-GPU Direct SQL. Its performance was about 250 million rows per second.

These two series has little variation over the 13 queries, because i/o dominates the overall workloads rather than JOIN / GROUP BY, thus equivalent amount of storage i/o led almost equivalent performance.

The green bar is PG-Strom + Arrow_Fdw with SSD-to-GPU Direct SQL, is the topmost performance.
Due to the nature of columnar data, query performance was variable according to the number of referenced columns.
Regarding of the group of Q1_* and Q2_*, we could validate the performance more than billion rows processed per second.

The graph above is time-seriesed result of iostat.
It shows PG-Strom could load the data from 16x NVME-SSD drives in 40GB/s during the query execution.
Here are 13 mountains because SSBM defines 13 queries.

Conclusion

This benchmark results shows that a proper configuration of hardware and software can process reporting and analytics queries on terabytes grade data-set more than billion rows per second performance.
This grade of performance will change our assumption towards system architecture to process "big-data".
People have usually applied cluster system without any choice, however, accelerated PostgreSQL with GPU + NVME can be an alternative choice for them.

We think our primary target is log-data processing at IoT/M2M area where many devices generate data day-by-day. The raw data is usually huge for visualization or machine-learning, so needs to be summarized first prior to other valuable tasks.
A single node configuration makes system administration much simpler than cluster-based solution, and PostgreSQL is a long-standing software thus many engineers are already familiar with.

Appendix - hardware components.

This parts list is just for your reference

Parts Qty
model Supermicro SYS-4029GP-TRT 1
CPU Intel Xeon Gold 6226 (12C, 2.7GHz) 2
RAM 16GB DIMM (DDR4-2933; ECC) 12
GPU NVIDIA Tesla V100 (PCI-E; 16GB) 2
GPU NVIDIA Tesla V100 (PCI-E; 32GB) 2
HDD Seagate 2.5inch 1.0TB/2.0TB 4
JBOF SerialCables PCI-ENC8G-08A 2
SSD Intel DC P4510 (U.2; 1.0TB) 16
HBA SerialCables PCI-AD-x16HE-M 4

*1:This model is capable to install up to 8 GPUs

*2:Xeon Gold 6226 [Cascade Lake] has 48lanes

*3:Supermicro does not support to replace the internal storage backplane to the product that support NVME.

*4:10 workers were launched in the default, so it means 2-3 workers per partition were assigned. It is too small and cannot pull out i/o capability because CPU usage ratio was 100% during the execution.

秒速で10億レコードを処理する話

これまでのPG-Stromの性能測定といえば、自社保有機材の関係もあり、基本的には1Uラックサーバに1CPU、1GPU、3~4台のNVME-SSDを載せた構成のハードウェアが中心だった。*1
ただソフトウェア的にはマルチGPUやNVME-SSDのストライピングに対応しており、能力的にどこまで伸ばせるのかというのは気になるところである。
そこで、方々に手を尽くして、次のようなベンチマーク環境を整備してみた。
(機材をお貸し頂いたパートナー様には感謝感激雨あられである)

4UサーバのSYS-4029GP-TRTというモデルは、GPUをたくさん乗っけるためにPCIeスイッチを用いてPCIeスロットを分岐している。ちょうど、PCIeスイッチ1個あたり2個のPCIe x16スロットが用意されており、同じPCIeスイッチ配下のデバイス同士であれば、完全にCPUをバイパスしてPeer-to-Peerのデータ転送ができる。Supermicro自身も、このサーバを"GPUDirect RDMAに最適化"したモデルとして売っている。

こういう構造のサーバであるので、P2P DMAを用いてSSDからGPUへデータを転送する場合、PCIeスイッチの配下にある2本のPCIeスロットにそれぞれSSDGPUをペアにして装着すると、データ転送の時に効率が良い。

U.2のNVME-SSDを装着するには外部のエンクロージャが必要で、今回は(色々あって)SerialCables社のPCI-ENC8G-08Aという製品を使用した。これはエンクロージャあたり8本のU.2 NVME-SSDを装着する事ができ、ダイレクトアタッチケーブルを使用して各2枚のPCIeホストカード(PCI-AD-x16HE-M)と接続する。

そうすると、GPU 1台あたりU.2 NVME-SSDを4台のペアを構成する事ができ、それぞれPCIe Gen3.0 x16レーン幅の帯域でCPUをバイパスして直結できるようになる。
ブロックダイアグラムにして書き直すと以下の通り。


Star Schema Benchmarkとデータセット

性能評価に使ったのはいつもの Star Schema Benchmark(SSBM) で、SF=4000で作ったデータセットPostgreSQLのHashパーティショニング機能を使って4つのユニットにデータを分散させる。つまり、U.2 NVME-SSDを4台あたりSF=1000相当の規模のデータ(60億件、879GB)を持つことになる。

さらにデータの持ち方も、PostgreSQLの行形式に加えて、PG-Stromの列ストアであるApache Arrow形式で全く同じ内容のファイルを用意して、各パーティションへ配置した。

Apache Arrow形式というのは構造化データを列形式で保存・交換するためのフォーマットで、PG-StromにおいてはArrow_Fdw機能を用いての直接読み出し(SSD-to-GPU Direct SQLを含む)に対応している。詳しくはこちらのエントリーなど。

kaigai.hatenablog.com

SSBMには全部で13種類のクエリが含まれており、例えば、Q2_3のクエリは以下の通り。
データ量の多いlineorderのスキャンと同時に、他のテーブルとのJOINやGROUP BYを含むワークロードである。

select sum(lo_revenue), d_year, p_brand1
  from lineorder, date1, part, supplier
  where lo_orderdate = d_datekey
    and lo_partkey = p_partkey
    and lo_suppkey = s_suppkey
     and p_brand1 = 'MFGR#2221'
     and s_region = 'EUROPE'
  group by d_year, p_brand1
  order by d_year, p_brand1;

ベンチマーク結果

という訳で、早速ベンチマーク結果を以下に。
縦軸は単位時間あたりの処理行数で、(lineorderの行数;240億行)÷(SQL応答時間)で計算している。例えば PG-Strom 2.2 + Arrow_Fdw における Q1_2 の応答時間は 14.0s なので、毎秒17.1億行を処理しているということになる。

青軸は PostgreSQL v11.5 で、並列クエリ数を24に引き上げるというチューニング*2を行っている。性能値としては、毎秒50~60百万行の水準。

オレンジの軸は、PostgreSQLの行データに対して PG-Strom v2.2 のSSD-to-GPU Direct SQLを使用してクエリを実行したもの。性能値としては、毎秒250百万行前後の水準。

この二つに関しては、13個のクエリの間で性能値に大きな傾向の差がない。これは、JOIN/GROUP BYの処理負荷よりも、まずI/Oの負荷が支配項になっており、行データである限りは参照されていない列も含めてストレージから読み出さねばならないためだと考えられる。

緑の軸が真打で、PG-Strom v2.2のSSD-to-GPU Direct SQLをArrow_Fdw管理下の列ストアに対して実行したもの。
これは列データなので、参照するカラムの数によって大きく性能値の傾向が違っている様子が見えるが、Q1_*のグループ、Q2_*のグループに関しては、目標としていた毎秒10億行の処理能力を実証できたことになる。

一応、4xGPU + 16xNVME-SSD できちんとI/Oの性能が出ているという事を確認するために、クエリ実行中の iostat の結果を積み上げグラフにしてみた。山が13個あるのはクエリを13回実行したという事で、物理的には概ね40GB/sのSeqRead性能が出ている事がわかる。(つまり、クエリ応答性能の違いは参照している列の数による、という事でもある)

参考までに、今回の構成は以下の通り。

型番 数量
model Supermicro SYS-4029GP-TRT 1
CPU Intel Xeon Gold 6226 (12C, 2.7GHz) 2
RAM 16GB (DDR4-2933; ECC) 12
GPU NVIDIA Tesla V100 (PCI-E; 16GB) 2
GPU NVIDIA Tesla V100 (PCI-E; 32GB) 2
HDD Seagate 2.5inch 1.0TB/2.0TB 4
JBOF SerialCables PCI-ENC8G-08A 2
SSD Intel DC P4510 (U.2; 1.0TB) 16
HBA SerialCables PCI-AD-x16HE-M 4

ご自分でも環境を作ってみたいという方はご参考に。

PostgreSQL Conference Japan 2019

今回の一連の検証結果に関しては、来る11月15日(金)に品川駅前(AP品川)で開催予定の PostgreSQL Conference Japan 2019 にて『PostgreSQLだってビッグデータ処理したい!! ~GPUとNVMEを駆使して毎秒10億レコードを処理する技術~』と題して発表を行います。
有償でのカンファレンスではありますが、私の発表の他にも、経験豊富なPostgreSQLエンジニアによる14のセッション/チュートリアルが予定されており、ぜひご参加いただければと思います。

www.postgresql.jp

*1:例外としては、PGconf.ASIA 2018などに向けて、NEC様の協力でExpEtherを3台、3xGPU + 6xSSDの構成を作って13.5GB/sを記録したもの。

*2:デフォルトだとnworkers=10程度、すなわちパーティションあたり2~3のワーカーとなり、CPU100%で貼り付いてしまうため

Asymmetric Partition-wise JOIN

久々に PostgreSQL 本体機能へのパッチを投げたので、それの解説をしてみる。

PostgreSQL: Re: Asymmetric partition-wise JOIN

背景:Partition-wise JOIN

PostgreSQLパーティションを使ったときに、全く同一のパーティションの切り方をして、かつパーティションキーをJOINの結合条件に用いた場合に限り、パーティション子テーブル同士のJOINを先に行い、その結果を出力するという機能がある。

以下の例では、ptableおよびstableは、それぞれ列distの値を用いたHashパーティションを設定しており、3つずつの子テーブルを持つ。

postgres=# \d
                List of relations
 Schema |   Name    |       Type        | Owner
--------+-----------+-------------------+--------
 public | ptable    | partitioned table | kaigai
 public | ptable_p0 | table             | kaigai
 public | ptable_p1 | table             | kaigai
 public | ptable_p2 | table             | kaigai
 public | stable    | partitioned table | kaigai
 public | stable_p0 | table             | kaigai
 public | stable_p1 | table             | kaigai
 public | stable_p2 | table             | kaigai
(8 rows)

これをJOINする時の実行計画は以下の通り。

postgres=# explain select * from ptable p, stable s where p.dist = s.dist;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Hash Join  (cost=360.00..24617.00 rows=10000 width=49)
   Hash Cond: (p.dist = s.dist)
   ->  Append  (cost=0.00..20407.00 rows=1000000 width=12)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
   ->  Hash  (cost=235.00..235.00 rows=10000 width=37)
         ->  Append  (cost=0.00..235.00 rows=10000 width=37)
               ->  Seq Scan on stable_p0 s  (cost=0.00..60.77 rows=3277 width=37)
               ->  Seq Scan on stable_p1 s_1  (cost=0.00..62.69 rows=3369 width=37)
               ->  Seq Scan on stable_p2 s_2  (cost=0.00..61.54 rows=3354 width=37)
(11 rows)

おっとっと。
この場合、ptableパーティションの子テーブル3つと、stableパーティションの子テーブル3つをそれぞれ全て読み出し、メモリ上で結合した結果同士をHash Joinで結合する事がわかる。

Partition-wise JOINはデフォルトでは off になっているので、enable_partitionwise_joinパラメータをonにセットして明示的に有効化する必要がある。

postgres=# set enable_partitionwise_join = on;
SET
postgres=# explain select * from ptable p, stable s where p.dist = s.dist;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Append  (cost=101.73..19617.00 rows=10000 width=49)
   ->  Hash Join  (cost=101.73..6518.87 rows=3277 width=49)
         Hash Cond: (p.dist = s.dist)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Hash  (cost=60.77..60.77 rows=3277 width=37)
               ->  Seq Scan on stable_p0 s  (cost=0.00..60.77 rows=3277 width=37)
   ->  Hash Join  (cost=104.80..6527.08 rows=3369 width=49)
         Hash Cond: (p_1.dist = s_1.dist)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Hash  (cost=62.69..62.69 rows=3369 width=37)
               ->  Seq Scan on stable_p1 s_1  (cost=0.00..62.69 rows=3369 width=37)
   ->  Hash Join  (cost=103.47..6521.06 rows=3354 width=49)
         Hash Cond: (p_2.dist = s_2.dist)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
         ->  Hash  (cost=61.54..61.54 rows=3354 width=37)
               ->  Seq Scan on stable_p2 s_2  (cost=0.00..61.54 rows=3354 width=37)
(16 rows)

同じクエリに対して、set enable_partitionwise_join = onを設定した状態で実行計画を作らせると、対応するパーティションの子テーブル同士でJOINを行い、その次にAppend、つまり各パーティション子要素の処理結果を結合している事が分かる。

これはJOINの問題サイズを小さくし、多くの場合、Append処理に要するCPUサイクルを減らすことのできる優れた最適化テクニックではあるが、かなりの慎重さを以ってDB設計を行う必要がある。
なぜなら『全く同一のパーティションの切り方をして、かつパーティションキーをJOINの結合条件に用いる』事ができるよう、パーティション設計とSQLクエリの書き方を考える必要があるからである。

新機能:Asymmetric Partition-wise JOIN

同じようなノリで、非パーティションテーブルとパーティションテーブルの間のJOIN処理を、Appendの前に移動する事ができる。
例えば、非パーティションテーブルであるt1ptableとの間のJOINを (t1 \times ptable)と記述すると、これは (t1 \times ptable_p0) + (t1 \times ptable_p1) + (t1 \times ptable_p2)と展開する事ができる。

言い換えれば、t1を各パーティション子テーブルに分配する(クエリの書き換えに相当)事で、Appendよりも先にJOINを実行する事ができる。
もちろん、メリット・デメリットはあるので、先にパーティション子テーブルの内容を全て読み出した後でJOINを実行した方が良いというケースもある。

具体的に Asymmetric Partition-wise JOIN が活きてくるケースというのは

  • パーティション側のテーブルが、読み出しコストを無視できる程度に小さい。
    • 分配された分、複数回の読み出しが必要となってしまうため。
  • 子テーブルとのJOINにより、かなりの比率で行数が減ることが見込まれる。
    • 行数が減らなければ、Append処理は楽にならない。

というケースなので、あらゆる場合で効果があるかというと、そのような銀の弾丸はない。

では試してみることにする。
本来、以下のような実行計画となったいたクエリであるが・・・

postgres=# explain select * from ptable p, t1 where p.a = t1.aid;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Hash Join  (cost=2.12..24658.62 rows=49950 width=49)
   Hash Cond: (p.a = t1.aid)
   ->  Append  (cost=0.00..20407.00 rows=1000000 width=12)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
   ->  Hash  (cost=1.50..1.50 rows=50 width=37)
         ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
(8 rows)

同様に enable_partitionwise_join パラメータによって当該機能は有効化され、以下のような『賢い』実行計画を生成するようになる。

postgres=# explain select * from ptable p, t1 where p.a = t1.aid;
                                    QUERY PLAN
----------------------------------------------------------------------------------
 Append  (cost=2.12..19912.62 rows=49950 width=49)
   ->  Hash Join  (cost=2.12..6552.96 rows=16647 width=49)
         Hash Cond: (p.a = t1.aid)
         ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=2.12..6557.29 rows=16658 width=49)
         Hash Cond: (p_1.a = t1.aid)
         ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=2.12..6552.62 rows=16645 width=49)
         Hash Cond: (p_2.a = t1.aid)
         ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
(16 rows)

t1テーブルが各パーティション子テーブル毎に読み出され、その結果を Append によって結合している事がお分かりだろうか。
しかも、最初のケースでは Append は 100万行を処理しなければならないところ、後者ではHash Joinによって大幅に行数が削られているため、たった49950行(推定値)しか処理しない事になっている。これに伴う推定コストの低下によって、Asymmetric Partition-wise JOINを使った実行計画が選択された。

ちなみに、結合すべきテーブルを増やしても、(コスト的に見合えば)各パーティション子テーブルへと分配してくれる。

postgres=# explain select * from ptable p, t1, t2 where p.a = t1.aid and p.b = t2.bid;
                                       QUERY PLAN
----------------------------------------------------------------------------------------
 Append  (cost=4.25..19893.99 rows=2495 width=86)
   ->  Hash Join  (cost=4.25..6625.83 rows=832 width=86)
         Hash Cond: (p.b = t2.bid)
         ->  Hash Join  (cost=2.12..6552.96 rows=16647 width=49)
               Hash Cond: (p.a = t1.aid)
               ->  Seq Scan on ptable_p0 p  (cost=0.00..5134.63 rows=333263 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=4.25..6630.20 rows=832 width=86)
         Hash Cond: (p_1.b = t2.bid)
         ->  Hash Join  (cost=2.12..6557.29 rows=16658 width=49)
               Hash Cond: (p_1.a = t1.aid)
               ->  Seq Scan on ptable_p1 p_1  (cost=0.00..5137.97 rows=333497 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
   ->  Hash Join  (cost=4.25..6625.48 rows=831 width=86)
         Hash Cond: (p_2.b = t2.bid)
         ->  Hash Join  (cost=2.12..6552.62 rows=16645 width=49)
               Hash Cond: (p_2.a = t1.aid)
               ->  Seq Scan on ptable_p2 p_2  (cost=0.00..5134.40 rows=333240 width=12)
               ->  Hash  (cost=1.50..1.50 rows=50 width=37)
                     ->  Seq Scan on t1  (cost=0.00..1.50 rows=50 width=37)
         ->  Hash  (cost=1.50..1.50 rows=50 width=37)
               ->  Seq Scan on t2  (cost=0.00..1.50 rows=50 width=37)
(28 rows)

PG-Stromとの関連

この機能は、もちろんピュアなPostgreSQLで大量データを扱うときに効果を発揮するものである。

ただそれ以上に、PG-Stromのように『データが物理的にどこに存在するのか』を気にするソリューションだと更に大きな恩恵がある。

例えば、日々大量に溜まっていくログデータを、パーティション分割によって複数のディスクに物理的に分割格納し、集計・解析時にはこれを他のテーブルとJOIN/GROUP BYするという、比較的ありがちな処理を考えてみる。

もし非パーティションテーブルとパーティションテーブルのJOINが必ずAppendの後であれば、SSD-to-GPU Direct SQLを使ったとしても、CPUが処理すべき行数はほとんど減ることはない*1ため、その後のJOIN/GROUP BY処理でGPUを使えたとしても、データの流れは Disk → CPU/RAM → GPU → CPU/RAM となってしまうため、データのピンポンによって転送効率はかなり悪化してしまう。

Asymmetric Partition-wise JOINによって、非パーティションテーブルとパーティション子テーブルのJOINを先に実行できるようになれば、これはSSD-to-GPU Direct SQLがフル稼働できる状況になり、データの流れも Disk → GPU → CPU/RAM とかなりシンプルになる。

これはとりわけ、以下のような構成のマシンを用いてPCIeスイッチ経由でSSDGPU間でのダイレクトデータ転送を行う場合に顕著で、SQLワークロードの大半をPCIeスイッチより外側のNVME-SSDGPUのペアで実行する事ができるので、データが全くCPUを経由することなく処理されてしまう(= シングルノードでもPCIeバスの限界までスケールできる)という事になってしまう。

こういった諸々面白い事ができるようになるので、ぜひ皆さんにもCommitFestに登録されたパッチのレビューに参加していただければ幸いである。

*1:スキャン条件のみで大半の行を落とせるという幸運な場合を除く

技術負債を返した話(Pre-built GPU Binary対応)

最もプリミティブなPG-Stromの処理は、ユーザが入力したSQLを元にCUDA CのGPUプログラムを自動生成し、これを実行時コンパイル。ここで生成されたGPUバイナリを用いて、ストレージから読み出したデータをGPUで並列処理するという一連の流れである。
後にJOIN/GROUP BYの対応や、データ型の追加、SSD-to-GPU Direct SQLなど様々な機能を付加したものの、このコード生成と実行時ビルドの仕組みは2012年に最初のプロトタイプを公開した時から大きく変わってはいない。

コードの自動生成を担当するのはcodegen.cで、この人が吐いたCUDA Cプログラムをcuda_program.cがNVRTC (NVIDIA Run-Time Compiler) を使ってコンパイルし、GPUバイナリを生成する。

ただ、当初の『全件スキャンWHERE句・固定長データ型のみ』というシンプルなコード生成が、やがてJOIN/GROUP BYや、numeric型やtext型の対応など、より複雑な処理をGPUで実行するようになるにつれて、自動生成部分だけでなく、静的に書いておいた部分と組み合わせてGPUプログラムを生成するようになる。
これは当然で、例えばGPUでHash-Joinを実行する場合でも、クエリを処理するたびに変わる可能性があるのは、二つのテーブルを結合するJoin-Keyの結合条件を評価する部分だけで、Hash-Joinの基本的なアルゴリズムは同じだからである。

これらの静的なCUDA Cコードは、ヘッダファイル(*.h)として記述され、cuda_program.cでコードをコンパイルする時にインクルードする事で、動的生成したコードと組み合わせて使用していた。

ただし…。
PG-Stromの機能が増え、静的なCUDA Cコードの割合が増えるにつれ実行時コンパイルの時間は徐々に増えていく事になる。
intやtextといった基本的なデータ型の比較といった、シンプルなコードであれば気になる程ではないが、例えば、(コーナーケースではあるが)複合型(Composite Type)のデータをGPU側で生成する場合などは、対応する全てのデータ型が関わってくるため、ビルドに要する時間がSQLの実行時間よりも遥かに長くなるという本末転倒な状況が起こる。
その場合、一度ビルドしたGPUバイナリは共有メモリ上にキャッシュされるため、初回と2回目以降のSQL実行時間が大幅に異なるという事になってしまう。

そこで、ほぼ一週間を丸々要してGPUコードの実行時コンパイルに係る部分のリファクタリングを行った。
要は、静的なCUDA Cコードは予めコンパイルして、実行時にはバイナリをリンクすれば済む話なので、ある程度以上に複雑な構造を持つ関数は cuda_gpujoin.cucuda_numeric.cuという形で切り出し、PG-Strom自体のビルド時に、NVCCを用いてGPUバイナリファイルを生成するようにした。
これらは動的に生成されたコードと実行時リンクされ、最終的にはこれまでと同じ処理を行うようになる。
静的な部分は Fatbin 形式という形でインストールされる。この形式は、GPUの各世代(Pascal, Volta, Turing)向けに最適化されたバイナリを内部にアーカイブし、ターゲットGPUに最も適したバイナリをリンク時に使用してくれる。

なので、インストールされたPG-Strom関連ファイルを見てみると、目新しいファイルが並んでいる…という事になる。(*.gfatbin ファイルはデバッグオプションを有効にしてビルドしたバージョン。pg_strom.debug_jit_compile_options=onの時にはこちらが利用される)

[kaigai@magro ~]$ ls /usr/pgsql-11/share/pg_strom
arrow_defs.h          cuda_gpupreagg.fatbin   cuda_gpusort.h        cuda_numeric.gfatbin    cuda_textlib.gfatbin
cuda_basetype.h       cuda_gpupreagg.gfatbin  cuda_jsonlib.fatbin   cuda_numeric.h          cuda_textlib.h
cuda_common.fatbin    cuda_gpupreagg.h        cuda_jsonlib.gfatbin  cuda_plcuda.h           cuda_timelib.fatbin
cuda_common.gfatbin   cuda_gpuscan.fatbin     cuda_jsonlib.h        cuda_primitive.fatbin   cuda_timelib.gfatbin
cuda_common.h         cuda_gpuscan.gfatbin    cuda_misclib.fatbin   cuda_primitive.gfatbin  cuda_timelib.h
cuda_gpujoin.fatbin   cuda_gpuscan.h          cuda_misclib.gfatbin  cuda_primitive.h        cuda_utils.h
cuda_gpujoin.gfatbin  cuda_gpusort.fatbin     cuda_misclib.h        cuda_rangetype.h        pg_strom--2.2.sql
cuda_gpujoin.h        cuda_gpusort.gfatbin    cuda_numeric.fatbin   cuda_textlib.fatbin

ベンチマーク

GPUコードのビルドに要する時間を計測するため、何種類かのテスト用クエリを作成し、その実行時間を計測してみた。
とりあえずクエリは以下の7種類で計測してみた。

-- Q1 ... シンプルなGpuScan + Int型の演算
SELECT id, aid+bid FROM t0 WHERE aid < 100 AND bid < 100;

-- Q2 ... Numeric型を利用するGpuScan
SELECT id, a+b, a-c FROM t_numeric1 WHERE a < 50.0 AND b < 50.0;

-- Q3 ... Timestamp型とEXTRACT()文を使用するGpuScan
SELECT id, a, b FROM t_timestamp1 WHERE EXTRACT(day FROM a) = 19 and EXTRACT(month FROM b) = 6;

-- Q4 ... Arrow_Fdw外部テーブルからシンプルなデータ(Int, Timestamp)の読み出し
SELECT id,ymd FROM t_arrow WHERE id % 100 = 23;

-- Q5 ... Arrow_Fdw外部テーブルから複合型(Composite)の読み出し
SELECT id,v FROM t_arrow WHERE id % 100 = 23;

-- Q6 ... GpuJoinを含むクエリ
SELECT id, aid, atext FROM t0 NATURAL JOIN t1 WHERE bid < 50 AND cid < 50;

-- Q7 ... GpuPreAgg + GpuJoinを含むクエリ
SELECT cat, count(*), sum(ax) FROM t0 NATURAL JOIN t1 GROUP BY cat;

ディスクからの読み出しの影響を排除するため、関連する全てのテーブルをpg_prewarmでオンメモリ状態にした上で、上記の7つのクエリをそれぞれ2回ずつ実行した。
以下の表/グラフはその結果であるが、静的なCUDA Cコードを事前にビルドしておく事で、初回の実行時間が大幅に改善しているのが分かる。もちろん、最終的なGPUバイナリはほぼ同じ形になるので、ビルド済みGPUバイナリのキャッシュを参照できる2回目以降では処理時間の大きな差はない。(単位は全てms)
ちなみにQ5はめちゃめちゃ時間がかかっている。これは、Arrow形式(列データ)から複合型のデータをGPU上で再構成し、CPUへは行データとして返すという、非常にコード量の多い処理を行っているためである。

旧方式(1回目) 旧方式(2回目) 新方式(1回目) 新方式(2回目)
Q1 1,199.6 331.7 369.0 321.5
Q2 3,279.2 222.0 421.6 227.5
Q3 3,213.5 206.5 462.6 219.2
Q4 793.7 134.0 355.1 147.4
Q5 17,872.2 141.4 364.7 144.8
Q6 1,928.5 352.3 605.3 345.9
Q7 2,435.6 359.2 729.1 349.1

分かりやすいように、Pre-built GPU Binary機能の有無別に(2回目の実行時間)-(1回目の実行時間)を計算してみる。2回目は共有メモリ上のキャッシュを参照するだけなので、これが正味のビルド時間という事になるが、これまでは『一呼吸おいて』という感じだった部分がコンマ何秒程度にまで短くなった事が分かる。

背景

実はこの機能、別の要件を考えた時にどうしても急いで作らざるを得ない事情があった。
これまでユーザさんからの新機能の要求を優先して実装していたために、テストケースの作成やテスト自動化が後回しになってきた経緯があったのだが、エンタープライズ向けにはこれらソフトウェア品質改善/品質保証のためのプロセスは必要不可欠であり、今年の後半は少し腰を落ち着けてテストの充実を図ろうと考えている。
ただ、大量のテストを流すとなると、相応のバリエーションのSQLを実行する事となり、その度にGPUコードの自動生成と実行時コンパイルが走るのでは、テストケースの実行に非常に長い時間を要してしまう事になる。これではPG-Stromのテストをしているのか、コンパイラを動かしているのか分からない!

SSDtoGPU Direct SQL on Columnar-store (Apache Arrow)

I have recently worked on development of FDW for Apache Arrow files; including SSDtoGPU Direct SQL support of PG-Strom. Apache Arrow is a column-oriented data format designed for application independent data exchange, supported by not a small number of "big-data" software.
The latest revision of PG-Strom can use Apache Arrow files as data source of the query execution, through the Arrow_Fdw feature. This article introduces the brief overview of Arrow_Fdw, its benchmark results and our future plan.

What is Arrow_Fdw

PostgreSQL supports FDW (Foreign Data Wrapper) mechanism that enables to read variable external data source as if here is a normal table. Some kind of FDW also support writing.
Of course, external data source has different data structure from the internal representation of PostgreSQL rows / columns. For example, CSV file is a way to represent structured data based on comma separated text. PostgreSQL cannot understand this format without proper conversion, so file_fdw extension intermediate the world of PostgreSQL and the world of CSV file.

This mechanism can be applied variable kind of data sources. Here is postgres_fdw that uses a remote PostgreSQL server as data source. An unique one is twitter_fdw that fetches tweets from the Twitter timeline via WebAPI.
The Arrow_Fdw maps files in Apache Arrow file for references by SQL.
Below is an example to map an Arrow file (/opt/nvme/t.arrow) using Arrow_Fdw.

postgres=# IMPORT FOREIGN SCHEMA hoge
             FROM SERVER arrow_fdw
             INTO public OPTIONS (file '/opt/nvme/t.arrow');
IMPORT FOREIGN SCHEMA
postgres=# \d hoge
                   Foreign table "public.hoge"
 Column |  Type   | Collation | Nullable | Default | FDW options
--------+---------+-----------+----------+---------+-------------
 id     | integer |           |          |         |
 aid    | integer |           |          |         |
 bid    | integer |           |          |         |
 ymd    | date    |           |          |         |
 cat    | text    |           |          |         |
 md5    | text    |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/nvme/t.arrow')

Of course, you can use CREATE FOREIGN TABLE command as usual, however, Arrow file contains its schema definition, and columns-list must match exactly. Our recommendation is IMPORT FOREIGN SCHEMA because it automatically generates foreign table definition.

Overview of SSDtoGPU Direct SQL on Arrow_Fdw


PG-Strom has a special storage optimization mode called SSD-to-GPU Direct SQL. It utilizes P2P DMA over PCIe bus, on top of GPUDirect RDMA kernel APIs provided by NVIDIA.
Usually, PostgreSQL loads table's contents onto RAM from the storage, then processes the loaded data. In typical analytics workloads, not a small portion of the loaded data is filtered out. In other words, we consumes the narrow storage bandwidth to carry junks; that is waste of I/O resources.
When PG-Strom uses SSD-to-GPU Direct SQL, it loads table's contents to GPU's device memory directly, then GPU runs SQL workloads using its thousands cores in parallel. It likely reduce amount of data to be loaded onto the host system much much smaller than the original source.

Even if data source are Arrow files, overall mechanism is same. PG-Strom loads only referenced column data using P2P DMA, then GPU processes SQL workloads (WHERE-clause/JOIN/GROUP BY). Unlike row-data of PostgreSQL tables, it does not load unreferenced columns.

Benchmark

We run the Star Schema Benchmark (SSBM) with scale-factor=401 as usual. Its database size is 353GB in total. We exported its lineorder table into an Arrow file on NVME-SSD volume.

model Supermicro SYS-1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C) x1
RAM 192GB (32GB DDR4-2666 x6)
GPU NVIDIA Tesla V100 (5120C, 16GB) x1
SSD Intel SSD DC P4600 (HHHL; 2.0TB) x3
(striped by md-raid0)
HDD 2.0TB (SATA; 72krpm) x6
network 10Gb ethernet x2 ports
OS Ret Hat Enterprise Linux 7.6
CUDA 10.1 + NVIDIA Driver 418.40.04
DB PostgreSQL v11.2
PG-Strom v2.2devel

The benchmark results are below:
f:id:kaigai:20190430235505p:plain

Due to column-oriented data structure, performance is variable a lot depending on the number of referenced columns.
The query response time becomes much faster than the previous row-data based benchmark results; 7.2-7.9GB/s by PG-Strom and 2.2-2.3GB/s by PostgreSQL with parallel-scan. The query execution throughput is an inverse of query response time. So, 49GB/s means that summarizing of sequential-scan results on 300GB+ table is finished about 6sec. 25GB/s also means similar workloads are finished about 12sec. Once we can achieve this grade of batch processing performance in a single node, it changes some assumptions when we consider system configurations.

Validation of the benchmark results

For confirmation, we like to validate whether the benchmark results are reasonable. Below is definition of the flineorder foreign table that maps an Arrow file using Arrow_Fdw.
The above benchmark results say it runs Q2_1 in 28.7GB/s. The Q2_1 references lo_suppkey, lo_orderdate, lo_orderpriority and lo_supplycost.

 Foreign table "public.flineorder"
       Column       |     Type      | Size
--------------------+---------------+--------------------------------
 lo_orderkey        | numeric       | 35.86GB
 lo_linenumber      | integer       |  8.96GB
 lo_custkey         | numeric       | 35.86GB
 lo_partkey         | integer       |  8.96GB
 lo_suppkey         | numeric       | 35.86GB   <-- ★Referenced by Q2_1
 lo_orderdate       | integer       |  8.96GB   <-- ★Referenced by Q2_1
 lo_orderpriority   | character(15) | 33.61GB   <-- ★Referenced by Q2_1
 lo_shippriority    | character(1)  |  2.23GB
 lo_quantity        | integer       |  8.96GB
 lo_extendedprice   | bigint        | 17.93GB
 lo_ordertotalprice | bigint        | 17.93GB
 lo_discount        | integer       |  8.96GB
 lo_revenue         | bigint        | 17.93GB
 lo_supplycost      | bigint        | 17.93GB   <-- ★Referenced by Q2_1
 lo_tax             | integer       |  8.96GB
 lo_commit_date     | character(8)  | 17.93GB
 lo_shipmode        | character(10) | 22.41GB
FDW options: (file '/opt/nvme/lineorder_s401.arrow')  ... file size = 310GB

Total size of the referenced columns is 96.4GB of 310GB (31.1%). So, raw storage read throughput is 96.4GB / 11.06s = 8.7GB/s. This is reasonable performance for 3x Intel DC P4600 NVME-SSDs.

Future Plan: Towards 100GB/s

We can say the current status is "just workable", so we have to put further works to improvement software quality for production grade.

For more performance improvement, we like to have a benchmark activity using multi-GPU server like this.

This configuration uses Supermicro SYS-4029GP-TRT2 that is designed for HPC by RDMA optimization with PCIe switches. It allows to install up to 4x Tesla GPUs and 4x HBA card *1 to attach external NVME-SSD enclosure. An HBA card can connect 4x NVME-SSD, and can read the storage up to 12.8GB/s.
So, we can configure 4 of unit of 1xGPU + 4xSSD (12.8GB/s). It is 4xGPU + 16xSSD (51.2GB/s) in total.
If we can assume summarizing / analytic query read 1/3 columns in average, the upper limit of the query execution is 51.2GB/s / 0.33 = 150GB/s from the standpoint of raw storage performance.

Probably, 100GB/s is a feasible milestone for PostgreSQL, and we like to run the benchmark in 2019.

*1:One other option is 100Gb-NIC for NVME-over-Fabric. It is more scalable configuration.

Dive into Apache Arrow(その3)- SSD-to-GPU Direct SQL対応

ここ最近取り組んでいた Arrow_Fdw 機能がようやく動くようになったので、性能ベンチマークを行ってみた。
今回のエントリでは順を追って説明する事にしてみたい。

Arrow_Fdwとは

PostgreSQLにはFDW (Foreign Data Wrapper) という機能があり、PostgreSQL管理下にあるデータ(テーブルなど)だけでなく、外部のデータソースをあたかもテーブルであるかのように読み出す事ができる。一部のモジュールでは書込みにも対応している。
例えば、CSVファイルは構造化データを表現するための方法の一つである。もちろん、PostgreSQLの内部形式とデータ形式が違うので、そのままではコレをSQLの世界から取り扱う事ができないが、間にfile_fdwというモジュールが介在する事で、CSVPostgreSQLの内部形式へとデータ形式を変換している。

この枠組みは多種多様なデータソースに対して適用する事が可能で、リモートのPostgreSQLサーバをデータソースとするpostgres_fdwや、変わり種としては、Web APIを用いてTwitterのタイムラインからデータを取得するtwitter_fdw*1などがある。
これを利用してApache Arrow形式のファイルを外部テーブルにマップし、SQLから参照できるようにするのがArrow_Fdwモジュールである。

例えば、/opt/nvme/t.arrowというArrow形式ファイルをArrow_Fdwを用いてマップする場合は、以下の構文を用いる事ができる。

postgres=# IMPORT FOREIGN SCHEMA hoge
             FROM SERVER arrow_fdw
             INTO public OPTIONS (file '/opt/nvme/t.arrow');
IMPORT FOREIGN SCHEMA
postgres=# \d hoge
                   Foreign table "public.hoge"
 Column |  Type   | Collation | Nullable | Default | FDW options
--------+---------+-----------+----------+---------+-------------
 id     | integer |           |          |         |
 aid    | integer |           |          |         |
 bid    | integer |           |          |         |
 ymd    | date    |           |          |         |
 cat    | text    |           |          |         |
 md5    | text    |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/nvme/t.arrow')

もちろん、いつも通りにCREATE FOREIGN TABLE構文を用いてもよいが、Arrow形式ファイルにはそれ自体スキーマ定義が含まれており、列リストの定義がこれと厳密に一致していないとエラーとなるため、IMPORT FOREIGN SCHEMA構文を用いてArrow形式ファイルから自動的に外部テーブルの定義を作り出す方がお勧めである。

Apache Arrowファイル形式自体の説明は、こちらのエントリをご覧いただきたい。
kaigai.hatenablog.com

要は、列指向でデータが編成されているため、集計・解析系ワークロードに対して全てのデータをストレージから読み出す必要はなく、被参照列のデータのみを読み出せばよいためにI/Oを効率的に(高い密度で)行う事が可能となるという話である。

ベンチマーク

早速ベンチマークを行ってみる。使用したのは、いつもの1Uラックサーバ でスペックおよびシステム構成は以下の通り。

chassis Supermicro SYS-1019GP-TT
CPU Intel Xeon Gold 6126T (2.6GHz, 12C) x1
RAM 192GB (32GB DDR4-2666 x6)
GPU NVIDIA Tesla V100 (5120C, 16GB) x1
SSD Intel SSD DC P4600 (HHHL; 2.0TB) x3
(md-raid0によるストライピング構成)
HDD 2.0TB (SATA; 72krpm) x6
network 10Gb ethernet x2 ports
OS Ret Hat Enterprise Linux 7.6
CUDA 10.1 + NVIDIA Driver 418.40.04
DB PostgreSQL v11.2
PG-Strom v2.2devel

ベンチマークに使用したワークロードは、いつもの Star Schema Benchmark で、これも今まで通り scaling factor=401 なので、DBサイズは353GB。これをExt4で初期化したNVME-SSD区画上にArrow形式ファイルとして書き出すと約310GBであった。

実行結果は以下の通り。じゃん。

列指向データなので、クエリが参照する列の数と幅によって大幅にパフォーマンスが変わってくるのは致し方ないが、2.2~2.3GB/s前後であったPostgreSQL v11や、7.2~7.9GB/s前後であったPG-Strom v2.2develの結果と比べて、大幅に性能が改善されている事が分かる。
単なる逆数ではあるが、49GB/sというのは300GB超のテーブルに対する全件スキャン+集計が6秒台で返ってくる速度、25GB/sというのは12秒台で返ってくる速度なので、これ位のデータ処理能力をシングルノードで実現できるのであれば、システム構成を検討する時の様々な前提条件が変わってくるだろう。

こちらはもう一つ別の指標で。このベンチマークを採取するにあたり、Star Schema Benchmark 全13本のクエリを3回ずつ実行し、クエリ毎の最良値を使ってスループットを計算しているが、13本 x 3回 = 39回のクエリを実行するのに要した時間を積算したもの。

PG-Strom + Arrow_Fdwのケースだと、ほんの僅か、子供のオムツ替え程度の時間であるが、元々のPostgreSQLのケースでは2時間弱と長めのお昼寝程度の時間を要している。
しかもこれは全て同一のハードウェア、特にデータを全て高速なNVME-SSDに載せているがために、元々のPostgreSQLでも2.2~2.3GB/sのパフォーマンスが出ているが、既存システムでAll-Flashという構成は多くはないと考えられる。旧来の磁気ディスクを用いたDBシステムとの比較であれば、どの程度のスコアになるだろう??

測定結果の妥当性

なお、測定ミスなどであると恥ずかしいので、性能の妥当性を考察してみた。

以下は Arrow_Fdw を用いてArrow形式ファイルをマップした flineorder テーブルの列定義であるが、28.7GB/sのクエリ実行スループットを記録した Q2_1 を例に取ると、lo_suppkeylo_orderdatelo_orderpriorityおよびlo_supplycost列が参照されている。

 Foreign table "public.flineorder"
       Column       |     Type      | Size
--------------------+---------------+--------------------------------
 lo_orderkey        | numeric       | 35.86GB
 lo_linenumber      | integer       |  8.96GB
 lo_custkey         | numeric       | 35.86GB
 lo_partkey         | integer       |  8.96GB
 lo_suppkey         | numeric       | 35.86GB   <-- ★Q2_1で参照
 lo_orderdate       | integer       |  8.96GB   <-- ★Q2_1で参照
 lo_orderpriority   | character(15) | 33.61GB   <-- ★Q2_1で参照
 lo_shippriority    | character(1)  |  2.23GB
 lo_quantity        | integer       |  8.96GB
 lo_extendedprice   | bigint        | 17.93GB
 lo_ordertotalprice | bigint        | 17.93GB
 lo_discount        | integer       |  8.96GB
 lo_revenue         | bigint        | 17.93GB
 lo_supplycost      | bigint        | 17.93GB   <-- ★Q2_1で参照
 lo_tax             | integer       |  8.96GB
 lo_commit_date     | character(8)  | 17.93GB
 lo_shipmode        | character(10) | 22.41GB
FDW options: (file '/opt/nvme/lineorder_s401.arrow')  ... file size = 310GB

これら被参照列の行データサイズの合計は、310GB中96.4GBである。つまり、このクエリではファイル全体の31.1%だけしかデータを読み出す必要がなく、読出しサイズ(96.4GB)をQ2_1の実行時間11.06sで割ると、96.4÷11.06 = 8.7GB/s なので、Intel DC P4600 x3 の読出し速度としては概ね妥当な値という事になる。(一安心である)

今後の展望

現状「とりあえず動きました」という段階なので、まだこの後、試験や修正などに相応の時間がかかる事はご容赦いただきたい。

更なる性能改善に向け、今後やってみたいと考えているのが、マルチGPUを搭載したサーバによるベンチマーク

上記の構成イメージは、Supermicro社のHPC向けサーバ SYS-4029GP-TRT2に、Tesla GPUを4台と、外付けNVME-SSDのエンクロージャを接続するHBAカード*2を4台搭載する。このマシンはPCIeスイッチを搭載しているので、SSDGPU間で直接データ転送を行う場合にはCPUに負荷を与えない。

ブロック図で記載すると以下のようになる。GPU 1台あたりNVME-SSD 4台が一個のユニットとなり、一台のサーバの中でそれぞれが並列に動作する。

今回のベンチマークは、PCIeスイッチが無いとか、SSDの台数が3台だという細かな違いを除けば、概ねこちらの構成で1ユニット分の性能であると言う事ができるだろう。それで15GB/s~49GB/sの性能値を出せているという事は、期待値としてその4倍、シングルノード100GB/sの処理性能というのはあながち実現不可能な絵空事という訳でもないだろう。


こちらは、今年の正月に品川神社に奉納した絵馬。
ちょっと目標値の設定が低かったかもしれぬ。

*1:今でもメンテナンスされてるんだろうか?

*2:NVMEoFを使う場合、100Gb-NICでも可。むしろ巨大データだとこちらの方がスケーラビリティは高くなる。

Dive into Apache Arrow(その2)- pg2arrow

前回のエントリApache Arrow のフォーマットについて調べていたが、これのゴールは、外部テーブル(Foreign Table)を介してApache Arrowファイルを読み出し、高速に集計・解析処理を実行する事にある。
特にPG-Stromの場合はSSD-to-GPU Direct SQLという飛び道具が使えるため、NVME-SSD上のApache Arrowファイルを直接GPUへ転送し、列指向データをGPUの大量のコアでぐわーーっと処理するという構成が取れるハズである。

で、FDWモジュールがApache Arrowファイルを読むためには、まずメタデータを解読してどの列がどういったデータ型を持っており、どこにどういう形式で配置されているのか特定できる必要がある。
そのために書いたコードを元に、先ずPostgreSQLのデータをApache Arrowファイルとしてダンプするためのツールを作ってみた。

github.com

pg2arrowの使い方

ある程度 psqlpg_dump のオプションを参考にしたので、PostgreSQL使いの人ならそれほど違和感なく使えるはず。
データベースやユーザ名などの指定は共通。-c COMMAND-f FILENAMEで指定したSQLを実行し、その結果を-o FILENAMEで指定したファイルへ書き出す。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      (default creates a temporary file)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default is 512MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.

使用例

例として、hogehogeテーブルを日付順にソートしてダンプしてみる。

hogehogeテーブルの定義は以下の通り。c列は複合型(composite type)で、内部的にサブフィールドを持つ。

postgres=# \d hogehoge
                  Table "public.hogehoge"
 Column |       Type       | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
 id     | integer          |           | not null |
 a      | bigint           |           | not null |
 b      | double precision |           | not null |
 c      | comp             |           |          |
 d      | text             |           |          |
 e      | double precision |           |          |
 ymd    | date             |           |          |
Indexes:
    "hogehoge_pkey" PRIMARY KEY, btree (id)

postgres=# \dS comp
                Composite type "public.comp"
 Column |       Type       | Collation | Nullable | Default
--------+------------------+-----------+----------+---------
 x      | integer          |           |          |
 y      | double precision |           |          |
 z      | numeric          |           |          |
 memo   | text             |           |          |

さっそく実行し、特にエラーもなく終了する。

$ ./pg2arrow postgres -o /tmp/hogehoge -c "SELECT * FROM hogehoge ORDER BY ymd"

PyArrowで結果を確認する

以下のように、PyArrowを用いて Apache Arrow 形式のファイルを読み出す事ができる。

>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader("/tmp/hogehoge").read_all()
>>> X.schema
id: int32
a: int64
b: double
c: struct<x: int32, y: double, z: decimal(30, 11), memo: string>
  child 0, x: int32
  child 1, y: double
  child 2, z: decimal(30, 11)
  child 3, memo: string
d: string
e: double
ymd: date32[day]

DBテーブルの定義に準じて、Apache Arrowとしてのスキーマが作られている事が分かる。

ちなみに、Pandasのread_sqlを使ってテーブルを読み出すと、複合型であるcは文字列型にされてしまっている。これは流石にチョットツライ。

>>> A = pd.read_sql(sql="SELECT * FROM hogehoge LIMIT 1000", con="postgresql://localhost/postgres")
>>> B = pa.Table.from_pandas(A)
>>> B.schema
id: int64
a: int64
b: double
c: string
d: string
e: double
ymd: date32[day]
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"fi'
            b'eld_name": null, "name": null, "numpy_type": "object", "pandas_t'
            b'ype": "unicode", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"field_name": "id", "name": "id", "numpy_type": "int64", "pan'
            b'das_type": "int64", "metadata": null}, {"field_name": "a", "name'
            b'": "a", "numpy_type": "int64", "pandas_type": "int64", "metadata'
            b'": null}, {"field_name": "b", "name": "b", "numpy_type": "float6'
            b'4", "pandas_type": "float64", "metadata": null}, {"field_name": '
            b'"c", "name": "c", "numpy_type": "object", "pandas_type": "unicod'
            b'e", "metadata": null}, {"field_name": "d", "name": "d", "numpy_t'
            b'ype": "object", "pandas_type": "unicode", "metadata": null}, {"f'
            b'ield_name": "e", "name": "e", "numpy_type": "float64", "pandas_t'
            b'ype": "float64", "metadata": null}, {"field_name": "ymd", "name"'
            b': "ymd", "numpy_type": "object", "pandas_type": "date", "metadat'
            b'a": null}, {"field_name": "__index_level_0__", "name": null, "nu'
            b'mpy_type": "int64", "pandas_type": "int64", "metadata": null}], '
            b'"pandas_version": "0.22.0"}'}

中身の方を見てみると、このような感じで正しくクエリの結果を保存できている事がわかる。

>>> X.to_pandas()
       id     a          b                                                  c  \
0      24  3379  96.200935  {'memo': '1ff1de774005f8da13f42943881c655f', '...
1    2041  2208  71.122772  {'memo': '3416a75f4cea9109507cacd8e2f2aefc', '...
2    2042  7040  54.081142  {'memo': 'a1d0c6e83f027327d8461063f4ac58a6', '...
3    2043  1635  92.302224  {'memo': '17e62166fc8586dfa4d1bc0e1742c08b', '...
4    2044  3295  58.273429  {'memo': 'f7177163c833dff4b38fc8d2872f1ec6', '...
5    2045  9671  58.085893  {'memo': '6c8349cc7260ae62e3b1396831a8398f', '...
        :          :              :
                                    d          e        ymd
0    f4c1893e352a4e08d1a3b3b444c2d692  34.916724 2025-08-08
1    13f5d46a9f51e30dac02b33b74e9043e  11.098757 2018-09-26
2    b041a9cb97b220c1b073266af31cb45f  81.570772 2025-03-02
3    77e882ed95bf5838abd1b4336a7d2fdc  16.990162 2016-12-29
4    f78d9fde23891ae293f07c576982155b   7.017451 2020-10-08
5    4dfe9131c7ee3a44583a8d21d5ca26a2   3.979350 2023-03-09

今後のロードマップ

ひとまず、pg2arrowを使う事で PostgreSQLApache Arrow へデータを変換する流れができた。
(と、同時に手を動かしてデータ形式を一通り勉強したといえる。えへん。)

次は本来の目標である、FDWを使って Apache Arrow ⇒ PostgreSQL へのデータの流れを作る事。
これによって、IoT/M2Mの領域で扱われるような大量データを、毎度DBへインポートする事なく集計・解析系のクエリで処理する事ができるようになる。

他に、既に個別に頂いているpg2arrowの拡張としては、既に存在するArrowファイルに差分だけを追加する--appendモードや、複数のArrowファイルをマージして一個の巨大なArrowファイルに再編する--mergeモードなど。この辺も追って作っていきたい。