Skip to content

Commit

Permalink
add connector db2 (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
haneeshmv authored Apr 9, 2024
1 parent a4f8eeb commit 6b57f22
Show file tree
Hide file tree
Showing 11 changed files with 714 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public class DatasourceLoadConfig {
"org.apache.seatunnel.datasource.plugin.starrocks.jdbc.StarRocksJdbcDataSourceFactory");
classLoaderFactoryName.put(
"MONGODB", "com.apache.seatunnel.datasource.plugin.mongodb.MongoDataSourceFactory");
classLoaderFactoryName.put(
"JDBC-DB2",
"org.apache.seatunnel.datasource.plugin.db2.jdbc.Db2JdbcDataSourceFactory");

classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-");
classLoaderJarName.put("JDBC-CLICKHOUSE", "datasource-jdbc-clickhouse-");
Expand All @@ -114,6 +117,7 @@ public class DatasourceLoadConfig {
classLoaderJarName.put("S3-REDSHIFT", "datasource-s3redshift-");
classLoaderJarName.put("JDBC-STARROCKS", "datasource-jdbc-starrocks-");
classLoaderJarName.put("MONGODB", "datasource-mongodb-");
classLoaderJarName.put("JDBC-DB2", "datasource-jdbc-db2-");
}

public static final Set<String> pluginSet =
Expand All @@ -131,7 +135,8 @@ public class DatasourceLoadConfig {
"S3",
"SqlServer-CDC",
"StarRocks",
"MongoDB");
"MongoDB",
"JDBC-Db2");

public static Map<String, DatasourceClassLoader> datasourceClassLoaders = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@
<artifactId>datasource-sqlserver-cdc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-jdbc-db2</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>${revision}</version>
</parent>

<artifactId>datasource-jdbc-db2</artifactId>

<properties>
<mysql-connector.version>8.0.28</mysql-connector.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>

<!-- driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc -->
<dependency>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
<version>db2jcc4</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<skip>${e2e.dependency.skip}</skip>
<appendOutput>true</appendOutput>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>

</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.datasource.plugin.db2.jdbc;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;

import com.google.common.collect.Sets;

import java.util.Set;

public class Db2DataSourceConfig {

public static final String PLUGIN_NAME = "JDBC-Db2";

public static final DataSourcePluginInfo DB2_DATASOURCE_PLUGIN_INFO =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.icon(PLUGIN_NAME)
.version("1.0.0")
.type(DatasourcePluginTypeEnum.DATABASE.getCode())
.build();

public static final Set<String> DB2_SYSTEM_DATABASES =
Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");

public static final OptionRule OPTION_RULE =
OptionRule.builder()
.required(Db2OptionRule.URL, Db2OptionRule.DRIVER)
.optional(Db2OptionRule.USER, Db2OptionRule.PASSWORD)
.build();

public static final OptionRule METADATA_RULE =
OptionRule.builder().required(Db2OptionRule.DATABASE, Db2OptionRule.TABLE).build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package org.apache.seatunnel.datasource.plugin.db2.jdbc;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;

import org.apache.commons.lang3.StringUtils;

import lombok.NonNull;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;

public class Db2JdbcDataSourceChannel implements DataSourceChannel {

@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return Db2DataSourceConfig.OPTION_RULE;
}

@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return Db2DataSourceConfig.METADATA_RULE;
}

@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
List<String> tableNames = new ArrayList<>();
String filterName = options.get("filterName");
String size = options.get("size");
boolean isSize = StringUtils.isNotEmpty(size);
if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
filterName = "%" + filterName + "%";
} else if (StringUtils.equals(filterName, "")) {
filterName = null;
}
try (Connection connection = getConnection(requestParams);
ResultSet resultSet =
connection
.getMetaData()
.getTables(null, null, "%", new String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
tableNames.add(tableName);
if (isSize && tableNames.size() >= Integer.parseInt(size)) {
break;
}
}
}
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
}
}

@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
// Hardcoded list of example database names
List<String> dbNames = Arrays.asList("default");
return dbNames;
}

@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
throw new DataSourcePluginException("check jdbc connectivity failed", e);
}
}

@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams)) {
DatabaseMetaData metaData = connection.getMetaData();

// Retrieve primary key information
String primaryKey = getPrimaryKey(metaData, database, table);

// Retrieve column information
try (ResultSet resultSet = metaData.getColumns(null, null, table, null)) {

while (resultSet.next()) {
TableField tableField = new TableField();
String columnName = resultSet.getString("COLUMN_NAME");

// Set primary key flag
tableField.setPrimaryKey(primaryKey != null && primaryKey.equals(columnName));

// Set other field attributes
tableField.setName(columnName);
tableField.setType(resultSet.getString("TYPE_NAME"));
tableField.setComment(resultSet.getString("REMARKS"));

// Set nullable flag
String isNullable = resultSet.getString("IS_NULLABLE");
tableField.setNullable("YES".equalsIgnoreCase(isNullable));

tableFields.add(tableField);
}
}
} catch (SQLException e) {
// Log the exception and rethrow as DataSourcePluginException
System.out.println("Error while retrieving table fields: " + e);
throw new DataSourcePluginException("Failed to get table fields", e);
} catch (ClassNotFoundException e) {
// Log the exception and rethrow as DataSourcePluginException
System.out.println("JDBC driver class not found" + e);
throw new DataSourcePluginException("JDBC driver class not found", e);
}
return tableFields;
}

@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
return tables.parallelStream()
.collect(
Collectors.toMap(
Function.identity(),
table ->
getTableFields(
pluginName, requestParams, database, table)));
}

private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, dbName, tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
}
return null;
}

private Connection getConnection(Map<String, String> requestParams)
throws SQLException, ClassNotFoundException {
// Ensure the DB2 JDBC driver is loaded
Class.forName("com.ibm.db2.jcc.DB2Driver");
checkNotNull(requestParams.get(Db2OptionRule.URL.key()), "Jdbc url cannot be null");
String url = requestParams.get(Db2OptionRule.URL.key());
if (requestParams.containsKey(Db2OptionRule.USER.key())) {
String username = requestParams.get(Db2OptionRule.USER.key());
String password = requestParams.get(Db2OptionRule.PASSWORD.key());
return DriverManager.getConnection(url, username, password);
}
return DriverManager.getConnection(url);
}
}
Loading

0 comments on commit 6b57f22

Please sign in to comment.