Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 committed Mar 29, 2024
1 parent acc628d commit f3dc873
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

/**
* JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related
Expand All @@ -38,7 +40,15 @@ public JdbcSourceSchema(
String tableComment)
throws Exception {
super(databaseName, schemaName, tableName, tableComment);
fields = new LinkedHashMap<>();
fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName);
}

public LinkedHashMap<String, FieldSchema> getColumnInfo(
DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
throws SQLException {
LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
//
try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
Expand All @@ -57,14 +67,21 @@ public JdbcSourceSchema(
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
}
}
return fields;
}

primaryKeys = new ArrayList<>();
public List<String> getPrimaryKeys(
DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
throws SQLException {
List<String> primaryKeys = new ArrayList<>();
try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
primaryKeys.add(fieldName);
}
}

return primaryKeys;
}

public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public abstract class SourceSchema {
protected final String databaseName;
protected final String schemaName;
protected String tableName;
protected final String tableName;
protected final String tableComment;
protected LinkedHashMap<String, FieldSchema> fields;
public List<String> primaryKeys;
Expand Down Expand Up @@ -106,10 +106,6 @@ public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public Map<String, FieldSchema> getFields() {
return fields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,9 @@ public List<SourceSchema> getSchemaList() throws Exception {
if (!isSyncNeeded(tableName)) {
continue;
}
// Oracle permits table names to include special characters like /,
// etc., such as 'A/B'.
// If we attempt to retrieve column information for `A/B` using JDBC, it can
// result in an
// ORA-01424 error.
// To circumvent this issue, we substitute `/` with '_' to prevent encountering
// the problem.
String formattedTableName = tableName;
if (tableName.contains("/")) {
formattedTableName = tableName.replace("/", "_");
}
SourceSchema sourceSchema =
new OracleSchema(
metaData,
databaseName,
schemaName,
formattedTableName,
tableComment);
// To ensure consistency between the Oracle source table and downstream systems.
sourceSchema.setTableName(tableName);
metaData, databaseName, schemaName, tableName, tableComment);
sourceSchema.setModel(
!sourceSchema.primaryKeys.isEmpty()
? DataModel.UNIQUE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.doris.flink.tools.cdc.oracle;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;

public class OracleSchema extends JdbcSourceSchema {

Expand All @@ -42,4 +46,37 @@ public String convertToDorisType(String fieldType, Integer precision, Integer sc
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}

@Override
public LinkedHashMap<String, FieldSchema> getColumnInfo(
DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
throws SQLException {

fields = new LinkedHashMap<>();
// Oracle permits table names to include special characters such as /,
// etc., as in 'A/B'.
// When attempting to fetch column information for `A/B` via JDBC,
// it may throw an ORA-01424 error.
// Hence, we substitute `/` with '_' to address the issue.
try (ResultSet rs =
metaData.getColumns(databaseName, schemaName, tableName.replace("/", "_"), null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
String comment = rs.getString("REMARKS");
String fieldType = rs.getString("TYPE_NAME");
Integer precision = rs.getInt("COLUMN_SIZE");

if (rs.wasNull()) {
precision = null;
}
Integer scale = rs.getInt("DECIMAL_DIGITS");
if (rs.wasNull()) {
scale = null;
}
String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
}
}
return fields;
}
}

0 comments on commit f3dc873

Please sign in to comment.