diff --git a/flink-formats/flink-protobuf/pom.xml b/flink-formats/flink-protobuf/pom.xml index 00e04bc385e4b..760d32f230a72 100644 --- a/flink-formats/flink-protobuf/pom.xml +++ b/flink-formats/flink-protobuf/pom.xml @@ -56,6 +56,16 @@ under the License. provided + + + + org.apache.flink + flink-connector-files + ${project.version} + provided + true + + com.google.protobuf protobuf-java diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java new file mode 100644 index 0000000000000..fc1cecf3c48bc --- /dev/null +++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFileFormatFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter.Factory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Throws a {@link ValidationException} when the Protobuf format is used with the file system.
+ *
+ * <p>In practice, there is no standard for storing bulk protobuf messages. This factory is
+ * present to prevent falling back to the {@link
+ * org.apache.flink.connector.file.table.DeserializationSchemaAdapter}, a line-based format
+ * which could silently succeed but write unrecoverable data to disk.
+ *
+ * <p>If your use case requires storing bulk protobuf messages on disk, the parquet file format
+ * might be the appropriate container and has an API for mapping records to protobuf messages.
+ */
+@Internal
+public class PbFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return PbFormatFactory.IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        throw new ValidationException(
+                "The 'protobuf' format is not supported for the 'filesystem' connector.");
+    }
+
+    @Override
+    public EncodingFormat<Factory<RowData>> createEncodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        throw new ValidationException(
+                "The 'protobuf' format is not supported for the 'filesystem' connector.");
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-protobuf/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 48f9681e27882..b613bb111babe 100644
--- a/flink-formats/flink-protobuf/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-formats/flink-protobuf/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,3 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
org.apache.flink.formats.protobuf.PbFormatFactory
+org.apache.flink.formats.protobuf.PbFileFormatFactory
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCaseTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCaseTest.java
index f4f279b2af2ac..1cc1a200d3a36 100644
--- a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCaseTest.java
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/ProtobufSQLITCaseTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.formats.protobuf.testproto.MapTest;
 import org.apache.flink.formats.protobuf.testproto.Pb3Test;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -31,6 +32,8 @@
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -328,4 +331,37 @@ public void testSinkWithNullLiteralWithEscape() throws Exception {
         MapTest.InnerMessageTest innerMessageTest = mapTest.getMap2Map().get("b");
         assertEquals(MapTest.InnerMessageTest.getDefaultInstance(), innerMessageTest);
     }
+
+    /**
+     * Inserting into a 'filesystem' table that uses the 'protobuf' format must fail with a
+     * {@link ValidationException} somewhere in the cause chain.
+     */
+    @Test
+    public void testUnsupportedBulkFilesystemSink() {
+        env().setParallelism(1);
+        String sql =
+                "create table bigdata_sink ( "
+                        + " a int, "
+                        + " map1 map<string,string>,"
+                        + " map2 map<string, row<a int, b bigint>>"
+                        + ") with ("
+                        + " 'connector' = 'filesystem', "
+                        + " 'path' = '/tmp/unused', "
+                        + " 'format' = 'protobuf', "
+                        + " 'protobuf.message-class-name' = 'org.apache.flink.formats.protobuf.testproto.MapTest'"
+                        + ")";
+        tEnv().executeSql(sql);
+
+        assertThatThrownBy(
+                        () -> {
+                            TableResult tableResult =
+                                    tEnv().executeSql(
+                                                    "insert into bigdata_sink select 1, map['a', 'b', 'c', 'd'], map['f', row(1,cast(2 as bigint))] ");
+                            tableResult.await();
+                        })
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "The 'protobuf' format is not supported for the 'filesystem' connector."));
+    }
 }