Skip to content

Commit 0267450

Browse files
committed
add partition filter
1 parent aee976a commit 0267450

File tree

6 files changed

+230
-15
lines changed

6 files changed

+230
-15
lines changed

datafusion/common/src/scalar/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ use arrow::{
4545
compute::kernels::cast::{cast_with_options, CastOptions},
4646
datatypes::{
4747
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
48-
Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
49-
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
48+
Date32Type, Date64Type, Field, Float32Type, Int16Type, Int32Type, Int64Type,
49+
Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
5050
IntervalYearMonthType, TimeUnit, TimestampMicrosecondType,
5151
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
5252
UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION,
@@ -3188,8 +3188,12 @@ impl fmt::Display for ScalarValue {
31883188
ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
31893189
ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
31903190
ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
3191-
ScalarValue::Date32(e) => format_option!(f, e)?,
3192-
ScalarValue::Date64(e) => format_option!(f, e)?,
3191+
ScalarValue::Date32(e) => {
3192+
format_option!(f, e.map(|v| Date32Type::to_naive_date(v).to_string()))?
3193+
}
3194+
ScalarValue::Date64(e) => {
3195+
format_option!(f, e.map(|v| Date64Type::to_naive_date(v).to_string()))?
3196+
}
31933197
ScalarValue::Time32Second(e) => format_option!(f, e)?,
31943198
ScalarValue::Time32Millisecond(e) => format_option!(f, e)?,
31953199
ScalarValue::Time64Microsecond(e) => format_option!(f, e)?,

datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 214 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
//! Helper functions for the table implementation
1919
20+
use std::collections::HashMap;
2021
use std::sync::Arc;
2122

2223
use super::PartitionedFile;
2324
use crate::datasource::listing::ListingTableUrl;
2425
use crate::execution::context::SessionState;
26+
use crate::logical_expr::{BinaryExpr, Operator};
2527
use crate::{error::Result, scalar::ScalarValue};
2628

2729
use arrow::{
@@ -185,9 +187,17 @@ async fn list_partitions(
185187
store: &dyn ObjectStore,
186188
table_path: &ListingTableUrl,
187189
max_depth: usize,
190+
partition_prefix: Option<Path>,
188191
) -> Result<Vec<Partition>> {
189192
let partition = Partition {
190-
path: table_path.prefix().clone(),
193+
path: match partition_prefix {
194+
Some(prefix) => Path::from_iter(
195+
Path::from(table_path.prefix().as_ref())
196+
.parts()
197+
.chain(Path::from(prefix.as_ref()).parts()),
198+
),
199+
None => table_path.prefix().clone(),
200+
},
191201
depth: 0,
192202
files: None,
193203
};
@@ -321,6 +331,81 @@ async fn prune_partitions(
321331
Ok(filtered)
322332
}
323333

334+
#[derive(Debug)]
335+
enum PartitionValue {
336+
Single(String),
337+
Multi,
338+
}
339+
340+
fn populate_partition_values<'a>(
341+
partition_values: &mut HashMap<&'a str, PartitionValue>,
342+
filter: &'a Expr,
343+
) {
344+
if let Expr::BinaryExpr(BinaryExpr {
345+
ref left,
346+
op,
347+
ref right,
348+
}) = filter
349+
{
350+
match op {
351+
Operator::Eq => match (left.as_ref(), right.as_ref()) {
352+
(Expr::Column(Column { ref name, .. }), Expr::Literal(val))
353+
| (Expr::Literal(val), Expr::Column(Column { ref name, .. })) => {
354+
if partition_values
355+
.insert(name, PartitionValue::Single(val.to_string()))
356+
.is_some()
357+
{
358+
partition_values.insert(name, PartitionValue::Multi);
359+
}
360+
}
361+
_ => {}
362+
},
363+
Operator::And => {
364+
populate_partition_values(partition_values, left);
365+
populate_partition_values(partition_values, right);
366+
}
367+
_ => {}
368+
}
369+
}
370+
}
371+
372+
fn evaluate_partition_prefix<'a>(
373+
partition_cols: &'a [(String, DataType)],
374+
filters: &'a [Expr],
375+
) -> Option<Path> {
376+
let mut partition_values = HashMap::new();
377+
378+
if filters.len() > 1 {
379+
return None;
380+
}
381+
382+
for filter in filters {
383+
populate_partition_values(&mut partition_values, filter);
384+
}
385+
386+
if partition_values.is_empty() {
387+
return None;
388+
}
389+
390+
let mut parts = vec![];
391+
for (p, _) in partition_cols {
392+
match partition_values.get(p.as_str()) {
393+
Some(PartitionValue::Single(val)) => {
394+
parts.push(format!("{p}={val}"));
395+
}
396+
_ => {
397+
break;
398+
}
399+
}
400+
}
401+
402+
if parts.is_empty() {
403+
None
404+
} else {
405+
Some(Path::from_iter(parts))
406+
}
407+
}
408+
324409
/// Discover the partitions on the given path and prune out files
325410
/// that belong to irrelevant partitions using `filters` expressions.
326411
/// `filters` might contain expressions that can be resolved only at the
@@ -343,7 +428,10 @@ pub async fn pruned_partition_list<'a>(
343428
));
344429
}
345430

346-
let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
431+
let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
432+
let partitions =
433+
list_partitions(store, table_path, partition_cols.len(), partition_prefix)
434+
.await?;
347435
debug!("Listed {} partitions", partitions.len());
348436

349437
let pruned =
@@ -433,7 +521,7 @@ mod tests {
433521

434522
use futures::StreamExt;
435523

436-
use crate::logical_expr::{case, col, lit};
524+
use crate::logical_expr::{case, col, lit, Expr, Operator};
437525
use crate::test::object_store::make_test_store_and_state;
438526

439527
use super::*;
@@ -692,4 +780,127 @@ mod tests {
692780
// this helper function
693781
assert!(expr_applicable_for_cols(&[], &lit(true)));
694782
}
783+
784+
#[test]
785+
fn test_evaluate_partition_prefix() {
786+
let partitions = &[
787+
("a".to_string(), DataType::Utf8),
788+
("b".to_string(), DataType::Int16),
789+
("c".to_string(), DataType::Boolean),
790+
];
791+
792+
assert_eq!(
793+
evaluate_partition_prefix(partitions, &[Expr::eq(col("a"), lit("foo"))],),
794+
Some(Path::from("a=foo")),
795+
);
796+
797+
assert_eq!(
798+
evaluate_partition_prefix(
799+
partitions,
800+
&[Expr::and(
801+
Expr::eq(col("a"), lit("foo")),
802+
Expr::eq(col("b"), lit("bar")),
803+
)],
804+
),
805+
Some(Path::from("a=foo/b=bar")),
806+
);
807+
808+
assert_eq!(
809+
evaluate_partition_prefix(
810+
partitions,
811+
&[Expr::and(
812+
Expr::eq(col("a"), lit("foo")),
813+
Expr::and(
814+
Expr::eq(col("b"), lit("1")),
815+
Expr::eq(col("c"), lit("true")),
816+
),
817+
)],
818+
),
819+
Some(Path::from("a=foo/b=1/c=true")),
820+
);
821+
822+
// no prefix when filter is empty
823+
assert_eq!(evaluate_partition_prefix(partitions, &[]), None);
824+
825+
// b=foo results in no prefix because a is not restricted
826+
assert_eq!(
827+
evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))],),
828+
None,
829+
);
830+
831+
// a=foo and c=baz only results in preifx a=foo because b is not restricted
832+
assert_eq!(
833+
evaluate_partition_prefix(
834+
partitions,
835+
&[Expr::and(
836+
Expr::eq(col("a"), lit("foo")),
837+
Expr::eq(col("c"), lit("baz")),
838+
)],
839+
),
840+
Some(Path::from("a=foo")),
841+
);
842+
843+
// a=foo or b=bar results in no prefix
844+
assert_eq!(
845+
evaluate_partition_prefix(
846+
partitions,
847+
&[
848+
Expr::eq(col("a"), lit("foo")),
849+
Expr::eq(col("b"), lit("bar")),
850+
],
851+
),
852+
None,
853+
);
854+
855+
// partition with multiple values results in no prefix
856+
assert_eq!(
857+
evaluate_partition_prefix(
858+
partitions,
859+
&[Expr::and(
860+
Expr::eq(col("a"), lit("foo")),
861+
Expr::eq(col("a"), lit("bar")),
862+
)],
863+
),
864+
None,
865+
);
866+
867+
// no prefix because partition a is not restricted to a single literal
868+
assert_eq!(
869+
evaluate_partition_prefix(
870+
partitions,
871+
&[Expr::or(
872+
Expr::eq(col("a"), lit("foo")),
873+
Expr::eq(col("a"), lit("bar")),
874+
)],
875+
),
876+
None,
877+
);
878+
}
879+
880+
#[test]
881+
fn test_evaluate_date_partition_prefix() {
882+
let partitions = &[("a".to_string(), DataType::Date32)];
883+
assert_eq!(
884+
evaluate_partition_prefix(
885+
partitions,
886+
&[Expr::eq(
887+
col("a"),
888+
Expr::Literal(ScalarValue::Date32(Some(3)))
889+
)],
890+
),
891+
Some(Path::from("a=1970-01-04")),
892+
);
893+
894+
let partitions = &[("a".to_string(), DataType::Date64)];
895+
assert_eq!(
896+
evaluate_partition_prefix(
897+
partitions,
898+
&[Expr::eq(
899+
col("a"),
900+
Expr::Literal(ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)))
901+
)],
902+
),
903+
Some(Path::from("a=1970-01-05")),
904+
);
905+
}
695906
}

datafusion/core/tests/simplification.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ fn select_date_plus_interval() -> Result<()> {
286286

287287
// Note that constant folder runs and folds the entire
288288
// expression down to a single constant (true)
289-
let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
289+
let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
290290
TableScan: test"#;
291291
let actual = get_optimized_plan_formatted(&plan, &time);
292292

datafusion/optimizer/tests/optimizer_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ fn between_date32_plus_interval() -> Result<()> {
186186
let expected =
187187
"Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\
188188
\n Projection: \
189-
\n Filter: test.col_date32 >= Date32(\"10303\") AND test.col_date32 <= Date32(\"10393\")\
189+
\n Filter: test.col_date32 >= Date32(\"1998-03-18\") AND test.col_date32 <= Date32(\"1998-06-16\")\
190190
\n TableScan: test projection=[col_date32]";
191191
assert_eq!(expected, format!("{plan:?}"));
192192
Ok(())
@@ -200,7 +200,7 @@ fn between_date64_plus_interval() -> Result<()> {
200200
let expected =
201201
"Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\
202202
\n Projection: \
203-
\n Filter: test.col_date64 >= Date64(\"890179200000\") AND test.col_date64 <= Date64(\"897955200000\")\
203+
\n Filter: test.col_date64 >= Date64(\"1998-03-18\") AND test.col_date64 <= Date64(\"1998-06-16\")\
204204
\n TableScan: test projection=[col_date64]";
205205
assert_eq!(expected, format!("{plan:?}"));
206206
Ok(())

datafusion/sqllogictest/test_files/tpch/q1.slt.part

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
4444
--Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
4545
----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1)) AS COUNT(*)]]
4646
------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
47-
--------Filter: lineitem.l_shipdate <= Date32("10471")
48-
----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("10471")]
47+
--------Filter: lineitem.l_shipdate <= Date32("1998-09-02")
48+
----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")]
4949
physical_plan
5050
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
5151
--SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
@@ -56,7 +56,7 @@ SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS
5656
------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
5757
--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)Decimal128(Some(1),20,0) - lineitem.l_discountlineitem.l_discountDecimal128(Some(1),20,0)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
5858
----------------CoalesceBatchesExec: target_batch_size=8192
59-
------------------FilterExec: l_shipdate@6 <= 10471
59+
------------------FilterExec: l_shipdate@6 <= 1998-09-02
6060
--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false
6161

6262
query TTRRRRRRRI

datafusion/sqllogictest/test_files/tpch/q10.slt.part

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ Limit: skip=0, fetch=10
6363
------------------Inner Join: customer.c_custkey = orders.o_custkey
6464
--------------------TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment]
6565
--------------------Projection: orders.o_orderkey, orders.o_custkey
66-
----------------------Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766")
66+
----------------------Filter: orders.o_orderdate >= Date32("1993-10-01") AND orders.o_orderdate < Date32("1994-01-01")
6767
------------------------TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate], partial_filters=[orders.o_orderdate >= Date32("8674"), orders.o_orderdate < Date32("8766")]
6868
----------------Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount
6969
------------------Filter: lineitem.l_returnflag = Utf8("R")
@@ -96,7 +96,7 @@ GlobalLimitExec: skip=0, fetch=10
9696
--------------------------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), input_partitions=4
9797
----------------------------------------ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey]
9898
------------------------------------------CoalesceBatchesExec: target_batch_size=8192
99-
--------------------------------------------FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766
99+
--------------------------------------------FilterExec: o_orderdate@2 >= 1993-10-01 AND o_orderdate@2 < 1994-01-01
100100
----------------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_orderkey, o_custkey, o_orderdate], has_header=false
101101
----------------------------CoalesceBatchesExec: target_batch_size=8192
102102
------------------------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), input_partitions=4

0 commit comments

Comments
 (0)