Skip to content

Commit

Permalink
feat(sql): Add JSON support for SQL databases
Browse files Browse the repository at this point in the history
  • Loading branch information
ztefanie committed Feb 12, 2025
1 parent ec9e0f3 commit 95edfb0
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 20 deletions.
11 changes: 10 additions & 1 deletion connectors/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
<artifactId>connector-jdbc</artifactId>
<packaging>jar</packaging>

<properties>
<version.jdbi>3.47.0</version.jdbi>
</properties>

<licenses>
<license>
<name>Camunda Self-Managed Free Edition license</name>
Expand All @@ -36,7 +40,12 @@
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
<version>3.47.0</version>
<version>${version.jdbi}</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-jackson2</artifactId>
<version>${version.jdbi}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Map;
import java.util.Objects;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.SqlStatement;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,12 +42,24 @@ JdbcResponse internalExecuteRequest(JdbcRequestData data, Connection connection)
throws SQLException, IllegalAccessException {
JdbcResponse response;
Jdbi jdbi = Jdbi.create(connection);
jdbi.installPlugin(new Jackson2Plugin());
String databaseProductName =
jdbi.withHandle(
handle -> {
try {
return handle.getConnection().getMetaData().getDatabaseProductName();
} catch (SQLException e) {
LOG.error("Failed to retrieve database dialect", e);
return "Unknown";
}
});

if (data.returnResults()) {
// SELECT query, or RETURNING clause
LOG.debug("Executing query: {}", data.query());
Query q = jdbi.withHandle(handle -> bindVariables(handle.createQuery(data.query()), data));
List<Map<String, Object>> result =
jdbi.withHandle(
handle -> bindVariables(handle.createQuery(data.query()), data).mapToMap().list());
JdbiJsonHelper.mapToParsedMap(databaseProductName, q).list();
response = JdbcResponse.of(result);
LOG.debug("JdbcResponse: {}", response);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.camunda.connector.jdbc.model.client;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.jdbi.v3.core.qualifier.QualifiedType;
import org.jdbi.v3.core.result.ResultIterable;
import org.jdbi.v3.core.result.UnableToProduceResultException;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.json.Json;

public class JdbiJsonHelper {
static Map<String, Set<String>> dbJsonTypeMapping =
Map.of(
"Microsoft SQL Server",
Set.of(),
"MySQL",
Set.of("JSON"),
"PostgreSQL",
Set.of("json", "jsonb"),
"MariaDB",
Set.of("JSON"));

public static ResultIterable<Map<String, Object>> mapToParsedMap(
String databaseProductName, Query query) {
return query.map(
(rs, ctx) -> {
Map<String, Object> row = new HashMap<>();
// Java SQL ResultSet and ResultSetMetadata columns start with index 1:
// https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/ResultSetMetaData.html#getColumnTypeName(int)
for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
String columnName = rs.getMetaData().getColumnLabel(i);
Object value = rs.getObject(i);
if (isJsonColumn(databaseProductName, rs.getMetaData().getColumnTypeName(i))) {
try {
value =
ctx.findColumnMapperFor(QualifiedType.of(JsonNode.class).with(Json.class))
.orElseThrow()
.map(rs, i, ctx);
} catch (UnableToProduceResultException ignored) {
row.put(columnName, value);
}
}
row.put(columnName, value);
}
return row;
});
}

private static boolean isJsonColumn(String databaseProductName, String columnTypeName) {
return dbJsonTypeMapping.getOrDefault(databaseProductName, Set.of()).contains(columnTypeName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.camunda.connector.jdbc.model.client.JdbcClient;
import io.camunda.connector.jdbc.model.client.JdbiJdbcClient;
import io.camunda.connector.jdbc.model.client.JdbiJsonHelper;
import io.camunda.connector.jdbc.model.request.JdbcRequest;
import io.camunda.connector.jdbc.model.request.JdbcRequestData;
import io.camunda.connector.jdbc.model.request.SupportedDatabase;
Expand All @@ -29,9 +34,14 @@
import java.util.stream.Collectors;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.result.NoResultsException;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.UnableToCreateStatementException;
import org.jdbi.v3.jackson2.Jackson2Plugin;

public abstract class IntegrationBaseTest {

static final String DEFAULT_ADDRESS_JSON =
"{\"street\":\"123 Main St\",\"city\":\"New York\",\"zip\":\"10001\"}";
static final Employee NEW_EMPLOYEE = new Employee(7, "Eve", 55, "HR");

static final List<Employee> DEFAULT_EMPLOYEES =
Expand All @@ -56,19 +66,55 @@ static void createEmployeeTable(IntegrationTestConfig config) throws SQLExceptio
}
}

void addJsonColumn(IntegrationTestConfig config, String jsonDatabaseType) throws SQLException {
try (Connection conn =
DriverManager.getConnection(config.url(), config.username(), config.password());
Statement stmt = conn.createStatement()) {

if (config.databaseName() != null) {
stmt.executeUpdate("USE " + config.databaseName());
}

String addColumnSQL = "ALTER TABLE Employee ADD json " + jsonDatabaseType;
stmt.executeUpdate(addColumnSQL);
String dummyJson;
switch (config.database()) {
case MYSQL, MARIADB -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'";
case POSTGRESQL -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'::json";
case MSSQL -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'";
default ->
throw new UnsupportedOperationException("Unsupported database: " + config.database());
}
String updateSQL = "UPDATE Employee SET json = " + dummyJson;
stmt.executeUpdate(updateSQL);
}
}

void dropJsonColumn(IntegrationTestConfig config) throws SQLException {
try (Connection conn =
DriverManager.getConnection(config.url(), config.username(), config.password());
Statement stmt = conn.createStatement()) {

if (config.databaseName() != null) {
stmt.executeUpdate("USE " + config.databaseName());
}
stmt.executeUpdate("ALTER TABLE Employee DROP COLUMN json");
}
}

List<Map<String, Object>> selectAll(IntegrationTestConfig config, String tableName)
throws SQLException {
try (Connection conn =
DriverManager.getConnection(config.url(), config.username(), config.password())) {
// using jdbi
try (var handle = Jdbi.create(conn).open()) {
var jdbi = Jdbi.create(conn);
jdbi.installPlugin(new Jackson2Plugin());
try (var handle = jdbi.open()) {
if (config.databaseName() != null) {
handle.execute("USE " + config.databaseName());
}
return handle
.createQuery("SELECT * FROM " + tableName + " ORDER BY id ASC")
.mapToMap()
.list();
Query q = handle.createQuery("SELECT * FROM " + tableName + " ORDER BY id ASC");
return JdbiJsonHelper.mapToParsedMap(conn.getMetaData().getDatabaseProductName(), q).list();
}
}
}
Expand Down Expand Up @@ -272,12 +318,12 @@ void selectDataWithPositionalParametersWhereInAndAssertSuccess(IntegrationTestCo
assertNull(response.modifiedRows());
assertNotNull(response.resultSet());
assertEquals(2, response.resultSet().size());
assertEquals(
assertTrue(
DEFAULT_EMPLOYEES.stream()
.filter(e -> e.name().equals("John Doe") || e.name().equals("Jane Doe"))
.map(Employee::toMap)
.collect(Collectors.toList()),
response.resultSet());
.toList()
.containsAll(response.resultSet()));
}

void selectDataWithBindingParametersWhereInAndAssertSuccess(IntegrationTestConfig config) {
Expand Down Expand Up @@ -307,6 +353,27 @@ void selectDataWithBindingParametersWhereInAndAssertSuccess(IntegrationTestConfi
response.resultSet());
}

void selectJsonDataAndAssertSuccess(IntegrationTestConfig config) throws JsonProcessingException {
JdbcRequest request =
new JdbcRequest(
config.database(),
new DetailedConnection(
config.host(),
config.port(),
config.username(),
config.password(),
config.databaseName(),
config.properties()),
new JdbcRequestData(true, "SELECT * FROM Employee ORDER BY Id ASC"));
var response = jdbiJdbcClient.executeRequest(request);

var row = response.resultSet().get(0);
ObjectMapper objectMapper = new ObjectMapper();
var expected = objectMapper.readTree(DEFAULT_ADDRESS_JSON);
assertEquals(
expected.get("street").asText(), ((ObjectNode) row.get("json")).get("street").asText());
}

void updateDataAndAssertSuccess(IntegrationTestConfig config) {
String name = DEFAULT_EMPLOYEES.get(0).name() + " UPDATED";
JdbcRequest request =
Expand Down Expand Up @@ -546,7 +613,7 @@ void insertDataWithNamedParametersAndAssertSuccess(IntegrationTestConfig config)
new JdbcRequestData(
false,
"INSERT INTO Employee (id, name, age, department) VALUES (:id, :name, :age, :department)",
NEW_EMPLOYEE.toMap()));
NEW_EMPLOYEE.toUnparsedMap()));
var response = jdbiJdbcClient.executeRequest(request);
assertEquals(1, response.modifiedRows());
assertNull(response.resultSet());
Expand Down Expand Up @@ -707,5 +774,9 @@ String toInsertQueryFormat() {
Map<String, Object> toMap() {
return Map.of("id", id, "name", name, "age", age, "department", department);
}

Map<String, Object> toUnparsedMap() {
return Map.of("id", id, "name", name, "age", age, "department", department);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ record IntegrationTestConfig(
String username,
String password,
String databaseName,
Map<String, String> properties) {
Map<String, String> properties,
List<String> jsonType) {

public static List<IntegrationTestConfig> from(
MySQLContainer mySqlServer,
Expand All @@ -40,7 +41,8 @@ public static List<IntegrationTestConfig> from(
msSqlServer.getUsername(),
msSqlServer.getPassword(),
null,
Map.of("encrypt", "false")),
Map.of("encrypt", "false"),
List.of()),
new IntegrationTestConfig(
SupportedDatabase.MYSQL,
mySqlServer.getJdbcUrl(),
Expand All @@ -50,7 +52,8 @@ public static List<IntegrationTestConfig> from(
mySqlServer.getUsername(),
mySqlServer.getPassword(),
mySqlServer.getDatabaseName(),
null),
null,
List.of("JSON")),
new IntegrationTestConfig(
SupportedDatabase.POSTGRESQL,
postgreServer.getJdbcUrl(),
Expand All @@ -60,7 +63,8 @@ public static List<IntegrationTestConfig> from(
postgreServer.getUsername(),
postgreServer.getPassword(),
null,
null),
null,
List.of("JSON", "JSONB")),
new IntegrationTestConfig(
SupportedDatabase.MARIADB,
mariaDbContainer.getJdbcUrl(),
Expand All @@ -70,6 +74,7 @@ public static List<IntegrationTestConfig> from(
mariaDbContainer.getUsername(),
mariaDbContainer.getPassword(),
mariaDbContainer.getDatabaseName(),
null));
null,
List.of("JSON")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.camunda.connector.api.error.ConnectorException;
import java.sql.SQLException;
import java.util.List;
Expand Down Expand Up @@ -142,7 +143,8 @@ public void shouldCreateDatabase_whenCreateTableQuery(IntegrationTestConfig conf
Optional.ofNullable(config.rootUser()).orElse(config.username()),
config.password(),
"mydb",
config.properties()),
config.properties(),
config.jsonType()),
"TestTable",
"id INT PRIMARY KEY, name VARCHAR(255)");
cleanUpDatabase(config, "mydb");
Expand Down Expand Up @@ -216,7 +218,8 @@ public void shouldThrowConnectorException_whenWrongUriConnection(IntegrationTest
config.username(),
config.password() + "wrong",
config.databaseName(),
config.properties())));
config.properties(),
config.jsonType())));
assertThat(exception.getMessage()).contains("Cannot create the Database connection");
}

Expand All @@ -238,7 +241,8 @@ public void shouldThrowConnectorException_whenWrongDetailedConnection(
config.username(),
config.password() + "wrong",
config.databaseName(),
config.properties())));
config.properties(),
config.jsonType())));
assertThat(exception.getMessage()).contains("Cannot create the Database connection");
}
}
Expand Down Expand Up @@ -394,4 +398,26 @@ public void shouldReturnResultList_whenSelectQueryWhereInWithBindingParameters(
}
}
}

@Nested
class JsonTests {

@ParameterizedTest
@MethodSource(PROVIDE_SQL_SERVERS_CONFIG)
public void shouldParseJson_whenJsonColumnTypeSupported(IntegrationTestConfig config)
throws SQLException {
config
.jsonType()
.forEach(
jsonType -> {
try {
addJsonColumn(config, jsonType);
selectJsonDataAndAssertSuccess(config);
dropJsonColumn(config);
} catch (SQLException | JsonProcessingException e) {
throw new RuntimeException(e);
}
});
}
}
}

0 comments on commit 95edfb0

Please sign in to comment.