2025-02-08

聊聊Parquet

最近backport一个iceberg的读feature回一个比较老的presto版本,发现一些比较legacy的bug.
尝试重新实现绕过的时候,发现了parquet的另外一些比较有趣的事情.

设计上来说,parquet的column chunk里是可以记录min/max和num of null values的统计信息的.
这个也是parquet被广泛利用的一个特性,可以用来跳过不必要的io.

看的时候发觉是支持某种程度的udf的.

也就是说像like之类的模糊查询理论上也能够做一定程度的filter.
毕竟原生的operator是不太支持这种查询的.

不过测试的时候发现一些其他问题.

比如读取的时候其实是支持重新指定schema,也就是只读取/project部分column的场景的.
但是因为project的时候的一些逻辑会导致如果project的列里不包含filter条件的列的话,这个filter是会被丢弃的.

一个例子就是比如逻辑上有
select a from table where b > 0 
这种查询.

如果执行引擎只project了a,那么在parquet的io层面是不会触发b>0的过滤的.
连带的执行引擎对io返回结果的假设就不成立了.

这个行为要修倒也不太难.
只是一定程度上来说是个breaking change.

因为从另一个角度来说,返回的结果集存在一个invisible column产生了影响.
从而使得从查询引擎的角度来说,一些假设可能会被推翻.

当然,这个取决于各个上层查询引擎的执行计划构造和pushdown的完备性/防御性了.

不过这里倒是引申出一个问题.
当前查询引擎的pushdown设计是否应该更改为pull-up模式.

毕竟现在流行的一个概念是io设备/存储带有一定的处理能力.

pushdown的本意也是历史原因查询引擎的设计没有考虑到io/存储设备具备这些能力,所以才从优化角度产生了特例进行pushdown的.

如果说一个全新的引擎从设计之初就是考虑了io设备层面具有filter能力的话.
那么相应的就是对不支持的设备的pull-up/fallback机制了.

也就是从
project <- filter <- scan
直接变成
project <- filter-scan
形态.

不过从optimizer的角度来说,似乎是没有区别的.
毕竟在一个嵌套的比如sql查询当中,filter条件可以存在在任何一个层级.

如果按照这种storage with filter的设计的话.

一种是需要把每个subquery也抽象成某种storage,因为这种才能嵌入filter.
但是这样就会造成某些可以在io层面做的filter没有办法push down到io/存储设备.

另外一种是构造独立的intermediate/query区别于最底层的storage io.
但这样同样存在相同的无法push down问题.

而如果把pushdown交给optimizer做的话,就不是pull-up/fallback的本意了.
因为实际上还是存在一个pushdown的优化逻辑.

这样从设计上来说,pull-up/fallabck就成为一种鸡肋了.
无端引入了一种不必要的优化规则.

另外一个比较chaotic的特性是几乎随处可侵入的meta/key-value特性.

虽然目前看着现成的reader框架里是没有接口可以侵入利用这些matadata做进一步复杂的filter动作的.
但是读取的parquet信息里,这部分数据属于read once的东西,而且是required的必定会读的部分,要从代码层面侵入的话,倒也不是很难.

只是从规则上来说,如果真的对这种filter有需求的话,还是能够合并进统计信息里成为正式标准会比较好.

毕竟前面的一些magic特性已经足够让执行引擎对io结果的不确定性打上一定的问号了.

不过可能parquet的人也意识到这个问题.
在metadata里有一个特定的created_by字段.

这个初看可能觉得可有可无,但实际上算是一种变相扩展版本提示.

毕竟根据created_by就知道文件生产的vendor是谁,具有哪些特性是可靠可依赖的.

倒算是一种可扩展性/可玩性很高的设计了.

再一个可能就是生态性方面的问题了.

在写一些测试demo的时候发觉如果真多low level地取读取parquet文件的话,column nested/repeation之类的处理还是挺繁琐的.
而使用稍微一些high level的api的时候,又回发现avro的身影.

原因也简单,毕竟column io最终呈现回给调用方的high level api的结果一般就是通常的row oriented的record.
所需需要选择一种input/output的序列化格式.

问题在于不同的application需要的record类型有差异.

parquet自己选择的是avro.
iceberg这里类查询存储引擎因为面向设计的不同存储类型和文件结构,所以需要有自己的StructLike类型.
spark也有自己专属的Row类型.
presto则是自身对应更高阶递归的数据类型,而且还是此之上又加了一层自己的rowgrow/page/chunk概念.

这就是导致了诸如presto<->iceberg<->parquet的链路会存在很多反复同质又异构的类型拆装.

这个时候人类又才会重新认识到Arrow这类项目的光辉之处.

虽然Arrow的API也是挺low level的就是了.

不过至少提供了一种理论上可能更有的传递路径.

毕竟从结果/实际上来说,现在的大部分查询引擎都是面向column chunk的.
即使实现不同,但概念相近,data pipeline的基本unit也差不多是以这个为基础的.
所以如果能趋向同一个统一格式的话,多少还是会有些生态价值的.

不过从性能的角度来说,有多少好处,有没有好处倒确实挺难说的.

毕竟除了io之外,各个系统之间的memory allocator能不能也一统也可以是个影响因素.
如果不能互通的话,copy transfer还是避免不的.

这种情况下可能统一格式带来的收益就微乎其微了.




没有评论:

发表评论

瑕不掩瑜

新加坡哪吒2终于上映了. 也终于有机会去看了. 客观地说, 剧本应该是还算可以的.但是叙事成熟度还是不太够. 虽然哪吒二阶重生的片段确实很打动人,但切割开来看的话,缺少一个比较明显的叙事主线. 或者说在剧情长短安排上还是有些不太平衡. 像第一关的土拨鼠. 作为一个单元片段放出来算...