From 95edfb04a0e3b8b29a6d33586894b4dd3dce66f8 Mon Sep 17 00:00:00 2001 From: ztefanie Date: Wed, 12 Feb 2025 08:12:45 +0100 Subject: [PATCH] feat(sql): Add JSON support for SQL databases --- connectors/jdbc/pom.xml | 11 ++- .../jdbc/model/client/JdbiJdbcClient.java | 18 +++- .../jdbc/model/client/JdbiJsonHelper.java | 54 +++++++++++ .../jdbc/integration/IntegrationBaseTest.java | 89 +++++++++++++++++-- .../integration/IntegrationTestConfig.java | 15 ++-- .../JdbiJdbcClientIntegrationTest.java | 32 ++++++- 6 files changed, 199 insertions(+), 20 deletions(-) create mode 100644 connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJsonHelper.java diff --git a/connectors/jdbc/pom.xml b/connectors/jdbc/pom.xml index 114c6bfcc6..ba70049025 100644 --- a/connectors/jdbc/pom.xml +++ b/connectors/jdbc/pom.xml @@ -13,6 +13,10 @@ connector-jdbc jar + + 3.47.0 + + Camunda Self-Managed Free Edition license @@ -36,7 +40,12 @@ org.jdbi jdbi3-core - 3.47.0 + ${version.jdbi} + + + org.jdbi + jdbi3-jackson2 + ${version.jdbi} diff --git a/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJdbcClient.java b/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJdbcClient.java index 5070dd5f82..9b1a9e5add 100644 --- a/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJdbcClient.java +++ b/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJdbcClient.java @@ -18,7 +18,9 @@ import java.util.Map; import java.util.Objects; import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.statement.Query; import org.jdbi.v3.core.statement.SqlStatement; +import org.jdbi.v3.jackson2.Jackson2Plugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +42,24 @@ JdbcResponse internalExecuteRequest(JdbcRequestData data, Connection connection) throws SQLException, IllegalAccessException { JdbcResponse response; Jdbi jdbi = Jdbi.create(connection); + jdbi.installPlugin(new Jackson2Plugin()); + String databaseProductName = + jdbi.withHandle( + handle -> { + try { + return handle.getConnection().getMetaData().getDatabaseProductName(); + } catch (SQLException e) { + LOG.error("Failed to retrieve database dialect", e); + return "Unknown"; + } + }); + if (data.returnResults()) { // SELECT query, or RETURNING clause LOG.debug("Executing query: {}", data.query()); + Query q = jdbi.withHandle(handle -> bindVariables(handle.createQuery(data.query()), data)); List> result = - jdbi.withHandle( - handle -> bindVariables(handle.createQuery(data.query()), data).mapToMap().list()); + JdbiJsonHelper.mapToParsedMap(databaseProductName, q).list(); response = JdbcResponse.of(result); LOG.debug("JdbcResponse: {}", response); } else { diff --git a/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJsonHelper.java b/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJsonHelper.java new file mode 100644 index 0000000000..f7c1a73dc1 --- /dev/null +++ b/connectors/jdbc/src/main/java/io/camunda/connector/jdbc/model/client/JdbiJsonHelper.java @@ -0,0 +1,54 @@ +package io.camunda.connector.jdbc.model.client; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.jdbi.v3.core.qualifier.QualifiedType; +import org.jdbi.v3.core.result.ResultIterable; +import org.jdbi.v3.core.result.UnableToProduceResultException; +import org.jdbi.v3.core.statement.Query; +import org.jdbi.v3.json.Json; + +public class JdbiJsonHelper { + static Map> dbJsonTypeMapping = + Map.of( + "Microsoft SQL Server", + Set.of(), + "MySQL", + Set.of("JSON"), + "PostgreSQL", + Set.of("json", "jsonb"), + "MariaDB", + Set.of("JSON")); + + public static ResultIterable> mapToParsedMap( + String databaseProductName, Query query) { + return query.map( + (rs, ctx) -> { + Map row = new HashMap<>(); + // Java SQL ResultSet and ResultSetMetadata columns start with index 1: + // https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSetMetaData.html#getColumnTypeName(int) + for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) { + String columnName = rs.getMetaData().getColumnLabel(i); + Object value = rs.getObject(i); + if (isJsonColumn(databaseProductName, rs.getMetaData().getColumnTypeName(i))) { + try { + value = + ctx.findColumnMapperFor(QualifiedType.of(JsonNode.class).with(Json.class)) + .orElseThrow() + .map(rs, i, ctx); + } catch (UnableToProduceResultException ignored) { + row.put(columnName, value); + } + } + row.put(columnName, value); + } + return row; + }); + } + + private static boolean isJsonColumn(String databaseProductName, String columnTypeName) { + return dbJsonTypeMapping.getOrDefault(databaseProductName, Set.of()).contains(columnTypeName); + } +} diff --git a/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationBaseTest.java b/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationBaseTest.java index 451d7d6948..b0c3c254ea 100644 --- a/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationBaseTest.java +++ b/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationBaseTest.java @@ -12,9 +12,14 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.camunda.connector.jdbc.model.client.JdbcClient; import io.camunda.connector.jdbc.model.client.JdbiJdbcClient; +import io.camunda.connector.jdbc.model.client.JdbiJsonHelper; import io.camunda.connector.jdbc.model.request.JdbcRequest; import io.camunda.connector.jdbc.model.request.JdbcRequestData; import io.camunda.connector.jdbc.model.request.SupportedDatabase; @@ -29,9 +34,14 @@ import java.util.stream.Collectors; import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.core.result.NoResultsException; +import org.jdbi.v3.core.statement.Query; import org.jdbi.v3.core.statement.UnableToCreateStatementException; +import org.jdbi.v3.jackson2.Jackson2Plugin; public abstract class IntegrationBaseTest { + + static final String DEFAULT_ADDRESS_JSON = + "{\"street\":\"123 Main St\",\"city\":\"New York\",\"zip\":\"10001\"}"; static final Employee NEW_EMPLOYEE = new Employee(7, "Eve", 55, "HR"); static final List DEFAULT_EMPLOYEES = @@ -56,19 +66,55 @@ static void createEmployeeTable(IntegrationTestConfig config) throws SQLExceptio } } + void addJsonColumn(IntegrationTestConfig config, String jsonDatabaseType) throws SQLException { + try (Connection conn = + DriverManager.getConnection(config.url(), config.username(), config.password()); + Statement stmt = conn.createStatement()) { + + if (config.databaseName() != null) { + stmt.executeUpdate("USE " + config.databaseName()); + } + + String addColumnSQL = "ALTER TABLE Employee ADD json " + jsonDatabaseType; + stmt.executeUpdate(addColumnSQL); + String dummyJson; + switch (config.database()) { + case MYSQL, MARIADB -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'"; + case POSTGRESQL -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'::json"; + case MSSQL -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'"; + default -> + throw new UnsupportedOperationException("Unsupported database: " + config.database()); + } + String updateSQL = "UPDATE Employee SET json = " + dummyJson; + stmt.executeUpdate(updateSQL); + } + } + + void dropJsonColumn(IntegrationTestConfig config) throws SQLException { + try (Connection conn = + DriverManager.getConnection(config.url(), config.username(), config.password()); + Statement stmt = conn.createStatement()) { + + if (config.databaseName() != null) { + stmt.executeUpdate("USE " + config.databaseName()); + } + stmt.executeUpdate("ALTER TABLE Employee DROP COLUMN json"); + } + } + List> selectAll(IntegrationTestConfig config, String tableName) throws SQLException { try (Connection conn = DriverManager.getConnection(config.url(), config.username(), config.password())) { // using jdbi - try (var handle = Jdbi.create(conn).open()) { + var jdbi = Jdbi.create(conn); + jdbi.installPlugin(new Jackson2Plugin()); + try (var handle = jdbi.open()) { if (config.databaseName() != null) { handle.execute("USE " + config.databaseName()); } - return handle - .createQuery("SELECT * FROM " + tableName + " ORDER BY id ASC") - .mapToMap() - .list(); + Query q = handle.createQuery("SELECT * FROM " + tableName + " ORDER BY id ASC"); + return JdbiJsonHelper.mapToParsedMap(conn.getMetaData().getDatabaseProductName(), q).list(); } } } @@ -272,12 +318,12 @@ void selectDataWithPositionalParametersWhereInAndAssertSuccess(IntegrationTestCo assertNull(response.modifiedRows()); assertNotNull(response.resultSet()); assertEquals(2, response.resultSet().size()); - assertEquals( + assertTrue( DEFAULT_EMPLOYEES.stream() .filter(e -> e.name().equals("John Doe") || e.name().equals("Jane Doe")) .map(Employee::toMap) - .collect(Collectors.toList()), - response.resultSet()); + .toList() + .containsAll(response.resultSet())); } void selectDataWithBindingParametersWhereInAndAssertSuccess(IntegrationTestConfig config) { @@ -307,6 +353,27 @@ void selectDataWithBindingParametersWhereInAndAssertSuccess(IntegrationTestConfi response.resultSet()); } + void selectJsonDataAndAssertSuccess(IntegrationTestConfig config) throws JsonProcessingException { + JdbcRequest request = + new JdbcRequest( + config.database(), + new DetailedConnection( + config.host(), + config.port(), + config.username(), + config.password(), + config.databaseName(), + config.properties()), + new JdbcRequestData(true, "SELECT * FROM Employee ORDER BY Id ASC")); + var response = jdbiJdbcClient.executeRequest(request); + + var row = response.resultSet().get(0); + ObjectMapper objectMapper = new ObjectMapper(); + var expected = objectMapper.readTree(DEFAULT_ADDRESS_JSON); + assertEquals( + expected.get("street").asText(), ((ObjectNode) row.get("json")).get("street").asText()); + } + void updateDataAndAssertSuccess(IntegrationTestConfig config) { String name = DEFAULT_EMPLOYEES.get(0).name() + " UPDATED"; JdbcRequest request = @@ -546,7 +613,7 @@ void insertDataWithNamedParametersAndAssertSuccess(IntegrationTestConfig config) new JdbcRequestData( false, "INSERT INTO Employee (id, name, age, department) VALUES (:id, :name, :age, :department)", - NEW_EMPLOYEE.toMap())); + NEW_EMPLOYEE.toUnparsedMap())); var response = jdbiJdbcClient.executeRequest(request); assertEquals(1, response.modifiedRows()); assertNull(response.resultSet()); @@ -707,5 +774,9 @@ String toInsertQueryFormat() { Map toMap() { return Map.of("id", id, "name", name, "age", age, "department", department); } + + Map toUnparsedMap() { + return Map.of("id", id, "name", name, "age", age, "department", department); + } } } diff --git a/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationTestConfig.java b/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationTestConfig.java index e8c8d83736..ebbadf023b 100644 --- a/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationTestConfig.java +++ b/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/IntegrationTestConfig.java @@ -23,7 +23,8 @@ record IntegrationTestConfig( String username, String password, String databaseName, - Map properties) { + Map properties, + List jsonType) { public static List from( MySQLContainer mySqlServer, @@ -40,7 +41,8 @@ public static List from( msSqlServer.getUsername(), msSqlServer.getPassword(), null, - Map.of("encrypt", "false")), + Map.of("encrypt", "false"), + List.of()), new IntegrationTestConfig( SupportedDatabase.MYSQL, mySqlServer.getJdbcUrl(), @@ -50,7 +52,8 @@ public static List from( mySqlServer.getUsername(), mySqlServer.getPassword(), mySqlServer.getDatabaseName(), - null), + null, + List.of("JSON")), new IntegrationTestConfig( SupportedDatabase.POSTGRESQL, postgreServer.getJdbcUrl(), @@ -60,7 +63,8 @@ public static List from( postgreServer.getUsername(), postgreServer.getPassword(), null, - null), + null, + List.of("JSON", "JSONB")), new IntegrationTestConfig( SupportedDatabase.MARIADB, mariaDbContainer.getJdbcUrl(), @@ -70,6 +74,7 @@ public static List from( mariaDbContainer.getUsername(), mariaDbContainer.getPassword(), mariaDbContainer.getDatabaseName(), - null)); + null, + List.of("JSON"))); } } diff --git a/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/JdbiJdbcClientIntegrationTest.java b/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/JdbiJdbcClientIntegrationTest.java index 018da003e2..c6ca9ec96a 100644 --- a/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/JdbiJdbcClientIntegrationTest.java +++ b/connectors/jdbc/src/test/java/io/camunda/connector/jdbc/integration/JdbiJdbcClientIntegrationTest.java @@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.fasterxml.jackson.core.JsonProcessingException; import io.camunda.connector.api.error.ConnectorException; import java.sql.SQLException; import java.util.List; @@ -142,7 +143,8 @@ public void shouldCreateDatabase_whenCreateTableQuery(IntegrationTestConfig conf Optional.ofNullable(config.rootUser()).orElse(config.username()), config.password(), "mydb", - config.properties()), + config.properties(), + config.jsonType()), "TestTable", "id INT PRIMARY KEY, name VARCHAR(255)"); cleanUpDatabase(config, "mydb"); @@ -216,7 +218,8 @@ public void shouldThrowConnectorException_whenWrongUriConnection(IntegrationTest config.username(), config.password() + "wrong", config.databaseName(), - config.properties()))); + config.properties(), + config.jsonType()))); assertThat(exception.getMessage()).contains("Cannot create the Database connection"); } @@ -238,7 +241,8 @@ public void shouldThrowConnectorException_whenWrongDetailedConnection( config.username(), config.password() + "wrong", config.databaseName(), - config.properties()))); + config.properties(), + config.jsonType()))); assertThat(exception.getMessage()).contains("Cannot create the Database connection"); } } @@ -394,4 +398,26 @@ public void shouldReturnResultList_whenSelectQueryWhereInWithBindingParameters( } } } + + @Nested + class JsonTests { + + @ParameterizedTest + @MethodSource(PROVIDE_SQL_SERVERS_CONFIG) + public void shouldParseJson_whenJsonColumnTypeSupported(IntegrationTestConfig config) + throws SQLException { + config + .jsonType() + .forEach( + jsonType -> { + try { + addJsonColumn(config, jsonType); + selectJsonDataAndAssertSuccess(config); + dropJsonColumn(config); + } catch (SQLException | JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + } + } }