Skip to content

Commit 2897ab7

Browse files
authored
[FLINK-36705][table-common] Add initial ProcessTableFunction class and annotations
1 parent d7bfa77 commit 2897ab7

File tree

12 files changed

+531
-30
lines changed

12 files changed

+531
-30
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentHint.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
* <p>An {@code ArgumentHint} can be used to provide hints about the name, optionality, and data
3232
* type of argument.
3333
*
34-
* <p>{@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional = false)} is a
35-
* scalar argument with the data type STRING, named "in1", and cannot be omitted when calling.
34+
* <p>For example, {@code @ArgumentHint(name = "in1", type = @DataTypeHint("STRING"), isOptional =
35+
* false)} is a scalar argument with the data type STRING, named "in1", and cannot be omitted when
36+
* calling.
3637
*
3738
* @see FunctionHint
3839
*/
@@ -49,7 +50,7 @@
4950
ArgumentTrait[] value() default {ArgumentTrait.SCALAR};
5051

5152
/**
52-
* The name of the argument.
53+
* The name of the argument. It must be unique among other arguments.
5354
*
5455
* <p>This can be used to provide a descriptive name for the argument.
5556
*/

flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.annotation;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.functions.ProcessTableFunction;
2223
import org.apache.flink.table.types.inference.StaticArgumentTrait;
2324

2425
import java.util.Arrays;
@@ -43,31 +44,38 @@ public enum ArgumentTrait {
4344

4445
/**
4546
* An argument that accepts a table "as row" (i.e. with row semantics). This trait only applies
46-
* to {@code ProcessTableFunction} (PTF).
47+
* to {@link ProcessTableFunction} (PTF).
4748
*
48-
* <p>For scalability, input tables are distributed into virtual processors. Each virtual
49-
* processor executes a PTF instance and has access only to a share of the entire table. The
50-
* argument declaration decides about the size of the share and co-location of data.
49+
* <p>For scalability, input tables are distributed across so-called "virtual processors". A
50+
* virtual processor, as defined by the SQL standard, executes a PTF instance and has access
51+
* only to a portion of the entire table. The argument declaration decides about the size of the
52+
* portion and co-location of data. Conceptually, tables can be processed either "as row" (i.e.
53+
* with row semantics) or "as set" (i.e. with set semantics).
5154
*
5255
* <p>A table with row semantics assumes that there is no correlation between rows and each row
53-
* can be processed independently. The framework is free in how to distribute rows among virtual
54-
* processors and each virtual processor has access only to the currently processed row.
56+
* can be processed independently. The framework is free in how to distribute rows across
57+
* virtual processors and each virtual processor has access only to the currently processed row.
5558
*/
5659
TABLE_AS_ROW(StaticArgumentTrait.TABLE_AS_ROW),
5760

5861
/**
5962
* An argument that accepts a table "as set" (i.e. with set semantics). This trait only applies
60-
* to {@code ProcessTableFunction} (PTF).
63+
* to {@link ProcessTableFunction} (PTF).
6164
*
62-
* <p>For scalability, input tables are distributed into virtual processors. Each virtual
63-
* processor executes a PTF instance and has access only to a share of the entire table. The
64-
* argument declaration decides about the size of the share and co-location of data.
65+
* <p>For scalability, input tables are distributed across so-called "virtual processors". A
66+
* virtual processor, as defined by the SQL standard, executes a PTF instance and has access
67+
* only to a portion of the entire table. The argument declaration decides about the size of the
68+
* portion and co-location of data. Conceptually, tables can be processed either "as row" (i.e.
69+
* with row semantics) or "as set" (i.e. with set semantics).
6570
*
6671
* <p>A table with set semantics assumes that there is a correlation between rows. When calling
6772
* the function, the PARTITION BY clause defines the columns for correlation. The framework
6873
* ensures that all rows belonging to same set are co-located. A PTF instance is able to access
69-
* all rows belonging to the same set. In other words: The virtual processor is scoped under a
70-
* key context.
74+
* all rows belonging to the same set. In other words: The virtual processor is scoped by a key
75+
* context.
76+
*
77+
* <p>It is also possible not to provide a key ({@link #OPTIONAL_PARTITION_BY}), in which case
78+
* only one virtual processor handles the entire table, thereby losing scalability benefits.
7179
*/
7280
TABLE_AS_SET(StaticArgumentTrait.TABLE_AS_SET),
7381

flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/DataTypeHint.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@
8181

8282
// Note to implementers:
8383
// Because "null" is not supported as an annotation value. Every annotation parameter *must*
84-
// have
85-
// some representation for unknown values in order to merge multi-level annotations.
84+
// have some representation for unknown values in order to merge multi-level annotations.
8685

8786
// --------------------------------------------------------------------------------------------
8887
// Explicit data type specification

flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/FunctionHint.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.flink.table.annotation;
2020

2121
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.functions.AggregateFunction;
23+
import org.apache.flink.table.functions.ProcessTableFunction;
24+
import org.apache.flink.table.functions.TableAggregateFunction;
2225
import org.apache.flink.table.functions.UserDefinedFunction;
2326
import org.apache.flink.table.types.inference.TypeInference;
2427

@@ -175,13 +178,40 @@
175178
ArgumentHint[] arguments() default {};
176179

177180
/**
178-
* Explicitly defines the intermediate result type that a function uses as accumulator.
181+
* Explicitly defines the intermediate result type (i.e. state entry) that an aggregating
182+
* function uses as its accumulator. The entry is managed by the framework (usually via Flink's
183+
* managed state).
179184
*
180185
* <p>By default, an explicit accumulator type is undefined and the reflection-based extraction
181186
* is used.
187+
*
188+
* <p>This parameter is primarily intended for aggregating functions (i.e. {@link
189+
* AggregateFunction} and {@link TableAggregateFunction}). It is recommended to use {@link
190+
* #state()} for {@link ProcessTableFunction}.
182191
*/
183192
DataTypeHint accumulator() default @DataTypeHint();
184193

194+
/**
195+
* Explicitly lists the intermediate results (i.e. state entries) of a function that is managed
196+
* by the framework (i.e. Flink managed state). Including their names and data types.
197+
*
198+
* <p>State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports
199+
* multiple state entries at the beginning of an eval()/onTimer() method (after an optional
200+
* context parameter).
201+
*
202+
* <p>Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction})
203+
* support a single state entry at the beginning of an accumulate()/retract() method (i.e. the
204+
* accumulator).
205+
*
206+
* <p>By default, explicit state is undefined and the reflection-based extraction is used where
207+
* {@link StateHint} is present.
208+
*
209+
* <p>Using both {@link #accumulator()} and this parameter is not allowed. Specifying the list
210+
* of state entries manually disables the entire reflection-based extraction around {@link
211+
* StateHint} and accumulators for aggregating functions.
212+
*/
213+
StateHint[] state() default {};
214+
185215
/**
186216
* Explicitly defines the result type that a function uses as output.
187217
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.annotation;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.functions.AggregateFunction;
23+
import org.apache.flink.table.functions.ProcessTableFunction;
24+
import org.apache.flink.table.functions.TableAggregateFunction;
25+
26+
import java.lang.annotation.ElementType;
27+
import java.lang.annotation.Retention;
28+
import java.lang.annotation.RetentionPolicy;
29+
import java.lang.annotation.Target;
30+
31+
/**
32+
* A hint that declares an intermediate result (i.e. state entry) that is managed by the framework
33+
* (i.e. Flink managed state).
34+
*
35+
* <p>State hints are primarily intended for {@link ProcessTableFunction}. A PTF supports multiple
36+
* state entries at the beginning of an eval()/onTimer() method (after an optional context
37+
* parameter).
38+
*
39+
* <p>Aggregating functions (i.e. {@link AggregateFunction} and {@link TableAggregateFunction})
40+
* support a single state entry at the beginning of an accumulate()/retract() method (i.e. the
41+
* accumulator).
42+
*
43+
* <p>For example, {@code @StateHint(name = "count", type = @DataTypeHint("BIGINT"))} is a state
44+
* entry with the data type BIGINT named "count".
45+
*
46+
* <p>Note: Usually, a state entry is partitioned by a key and can not be accessed globally. The
47+
* partitioning (or whether it is only a single partition) is defined by the corresponding function
48+
* call.
49+
*
50+
* @see FunctionHint
51+
*/
52+
@PublicEvolving
53+
@Retention(RetentionPolicy.RUNTIME)
54+
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
55+
public @interface StateHint {
56+
57+
/**
58+
* The name of the state entry. It must be unique among other state entries.
59+
*
60+
* <p>This can be used to provide a descriptive name for the state entry. The name can be used
61+
* for referencing the entry during clean up.
62+
*/
63+
String name() default "";
64+
65+
/**
66+
* The data type hint for the state entry.
67+
*
68+
* <p>This can be used to provide additional information about the expected data type of the
69+
* argument. The {@link DataTypeHint} annotation can be used to specify the data type explicitly
70+
* or provide hints for the reflection-based extraction of the data type.
71+
*/
72+
DataTypeHint type() default @DataTypeHint();
73+
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionKind.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,7 @@ public enum FunctionKind {
3535

3636
TABLE_AGGREGATE,
3737

38+
PROCESS_TABLE,
39+
3840
OTHER
3941
}

0 commit comments

Comments
 (0)