Skip to content

Commit

Permalink
feat(idp-extraction-connector): add integration and support for gemini
Browse files Browse the repository at this point in the history
  • Loading branch information
reiballa committed Feb 26, 2025
1 parent 916730a commit 41fe024
Show file tree
Hide file tree
Showing 19 changed files with 427 additions and 48 deletions.
17 changes: 17 additions & 0 deletions connectors/idp-extraction/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-google-base</artifactId>
<version>8.8.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bedrockruntime</artifactId>
Expand All @@ -51,6 +58,15 @@
<artifactId>pdfbox</artifactId>
<version>${version.pdfbox}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-vertexai</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>2.48.1</version>
</dependency>
</dependencies>

<build>
Expand All @@ -74,6 +90,7 @@
</connectors>
<includeDependencies>
<includeDependency>io.camunda.connector:connector-aws-base</includeDependency>
<includeDependency>io.camunda.connector:connector-google-base</includeDependency>
</includeDependencies>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.aws.model.impl.AwsBaseRequest;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import io.camunda.connector.idp.extraction.caller.BedrockCaller;
import io.camunda.connector.idp.extraction.caller.GeminiCaller;
import io.camunda.connector.idp.extraction.caller.PollingTextractCaller;
import io.camunda.connector.idp.extraction.model.ExtractionRequest;
import io.camunda.connector.idp.extraction.model.ExtractionResult;
import io.camunda.connector.idp.extraction.model.*;
import io.camunda.connector.idp.extraction.supplier.BedrockRuntimeClientSupplier;
import io.camunda.connector.idp.extraction.supplier.S3ClientSupplier;
import io.camunda.connector.idp.extraction.supplier.TextractClientSupplier;
Expand All @@ -26,7 +27,7 @@

@OutboundConnector(
name = "IDP extraction outbound Connector",
inputVariables = {"baseRequest", "input"},
inputVariables = {"baseRequest", "providerConfiguration", "input"},
type = "io.camunda:idp-extraction-connector-template:1")
@ElementTemplate(
id = "io.camunda.connector.IdpExtractionOutBoundTemplate.v1",
Expand All @@ -51,58 +52,106 @@ public class ExtractionConnectorFunction implements OutboundConnectorFunction {

private final BedrockCaller bedrockCaller;

private final GeminiCaller geminiCaller;

public ExtractionConnectorFunction() {
this.textractClientSupplier = new TextractClientSupplier();
this.s3ClientSupplier = new S3ClientSupplier();
this.bedrockRuntimeClientSupplier = new BedrockRuntimeClientSupplier();
this.pollingTextractCaller = new PollingTextractCaller();
this.bedrockCaller = new BedrockCaller();
this.geminiCaller = new GeminiCaller();
}

public ExtractionConnectorFunction(
PollingTextractCaller pollingTextractCaller, BedrockCaller bedrockCaller) {
PollingTextractCaller pollingTextractCaller,
BedrockCaller bedrockCaller,
GeminiCaller geminiCaller) {
this.textractClientSupplier = new TextractClientSupplier();
this.s3ClientSupplier = new S3ClientSupplier();
this.bedrockRuntimeClientSupplier = new BedrockRuntimeClientSupplier();
this.pollingTextractCaller = pollingTextractCaller;
this.bedrockCaller = bedrockCaller;
this.geminiCaller = geminiCaller;
}

@Override
public Object execute(OutboundConnectorContext context) {
final var extractionRequest = context.bindVariables(ExtractionRequest.class);

return switch (extractionRequest.input().extractionEngineType()) {
case GCP_GEMINI -> extractUsingGcp(extractionRequest);
case APACHE_PDFBOX, AWS_TEXTRACT -> extractUsingAws(extractionRequest);
};
}

private ExtractionResult extractUsingGcp(ExtractionRequest extractionRequest) {
try {
long startTime = System.currentTimeMillis();
Object result = geminiCaller.generateContent(extractionRequest);
long endTime = System.currentTimeMillis();
LOGGER.info("Gemini content extraction took {} ms", (endTime - startTime));
return new ExtractionResult(result);
} catch (Exception e) {
LOGGER.error(
"Document extraction via {} failed: {}",
extractionRequest.input().extractionEngineType(),
e.getMessage());
throw new ConnectorException(e);
}
}

private ExtractionResult extractUsingAws(ExtractionRequest extractionRequest) {
AwsBaseRequest baseRequest = getAwsBaseRequest(extractionRequest);
try {
long startTime = System.currentTimeMillis();
String extractedText =
switch (extractionRequest.input().extractionEngineType()) {
case AWS_TEXTRACT -> extractTextUsingAwsTextract(extractionRequest);
case AWS_TEXTRACT ->
extractTextUsingAwsTextract(extractionRequest.input(), baseRequest);
case APACHE_PDFBOX -> extractTextUsingApachePdf(extractionRequest);
default ->
throw new ConnectorException("Unsupported extraction engine for AWS provider");
};

String bedrockResponse =
bedrockCaller.call(
extractionRequest,
extractedText,
bedrockRuntimeClientSupplier.getBedrockRuntimeClient(extractionRequest));

bedrockRuntimeClientSupplier.getBedrockRuntimeClient(baseRequest));
long endTime = System.currentTimeMillis();
LOGGER.info("Aws content extraction took {} ms", (endTime - startTime));
return new ExtractionResult(bedrockResponse);
} catch (Exception e) {
LOGGER.error("Document extraction failed: {}", e.getMessage());
throw new ConnectorException(e);
}
}

private String extractTextUsingAwsTextract(ExtractionRequest extractionRequest) throws Exception {
private String extractTextUsingAwsTextract(
ExtractionRequestData input, AwsBaseRequest baseRequest) throws Exception {
return pollingTextractCaller.call(
extractionRequest.input().document(),
extractionRequest.input().s3BucketName(),
textractClientSupplier.getTextractClient(extractionRequest),
s3ClientSupplier.getAsyncS3Client(extractionRequest));
input.document(),
input.s3BucketName(),
textractClientSupplier.getTextractClient(baseRequest),
s3ClientSupplier.getAsyncS3Client(baseRequest));
}

private String extractTextUsingApachePdf(ExtractionRequest extractionRequest) throws Exception {
PDDocument document = Loader.loadPDF(extractionRequest.input().document().asByteArray());
PDFTextStripper pdfStripper = new PDFTextStripper();
return pdfStripper.getText(document);
}

// for compatibility with older versions of web-modeler
private AwsBaseRequest getAwsBaseRequest(ExtractionRequest extractionRequest) {
if (extractionRequest.baseRequest() != null) {
return extractionRequest.baseRequest();
} else if (extractionRequest.providerConfiguration() != null
&& extractionRequest.providerConfiguration().awsRequest() != null) {
return extractionRequest.providerConfiguration().awsRequest();
} else {
throw new ConnectorException("Aws request is not provided");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.idp.extraction.caller;

import static com.google.cloud.vertexai.api.HarmCategory.*;

import com.google.cloud.vertexai.VertexAI;
import com.google.cloud.vertexai.api.*;
import com.google.cloud.vertexai.generativeai.ContentMaker;
import com.google.cloud.vertexai.generativeai.GenerativeModel;
import com.google.cloud.vertexai.generativeai.PartMaker;
import com.google.cloud.vertexai.generativeai.ResponseHandler;
import io.camunda.connector.idp.extraction.model.ExtractionRequest;
import io.camunda.connector.idp.extraction.model.ExtractionRequestData;
import io.camunda.connector.idp.extraction.model.LlmModel;
import io.camunda.connector.idp.extraction.supplier.VertexAISupplier;
import io.camunda.connector.idp.extraction.utils.GcsUtil;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GeminiCaller {

private static final Logger LOGGER = LoggerFactory.getLogger(GeminiCaller.class);

public String generateContent(ExtractionRequest extractionRequest) throws Exception {
LOGGER.debug("Starting gemini generate content with request data: {}", extractionRequest);

Check failure

Code scanning / CodeQL

Insertion of sensitive information into log files High

This
potentially sensitive information
is written to a log file.
This
potentially sensitive information
is written to a log file.
LlmModel llmModel = LlmModel.fromId(extractionRequest.input().converseData().modelId());
String fileUri;
try {
fileUri =
GcsUtil.uploadNewFileFromDocument(
extractionRequest.input().document(),
extractionRequest
.providerConfiguration()
.geminiRequest()
.getConfiguration()
.bucketName(),
extractionRequest
.providerConfiguration()
.geminiRequest()
.getConfiguration()
.projectId(),
extractionRequest.providerConfiguration().geminiRequest().getAuthentication());
LOGGER.debug("File uploaded to GCS with URI: {}", fileUri);
} catch (IOException e) {
LOGGER.error("Error while uploading file to GCS", e);
throw new RuntimeException(e);
}

try (VertexAI vertexAi =
VertexAISupplier.getVertexAI(extractionRequest.providerConfiguration().geminiRequest())) {
GenerativeModel model =
getGenerativeModel(extractionRequest, vertexAi)
.withSystemInstruction(ContentMaker.fromString(llmModel.getSystemPrompt()));
var content =
ContentMaker.fromMultiModalData(
llmModel.getMessage(extractionRequest.input().taxonomyItems()),
PartMaker.fromMimeTypeAndData(
extractionRequest.input().document().metadata().getContentType(), fileUri));
GenerateContentResponse response = model.generateContent(content);
String output = ResponseHandler.getText(response);
LOGGER.debug("Gemini generate content response: {}", output);
return output;
} finally {
CompletableFuture.runAsync(
() -> {
try {
GcsUtil.deleteObjectFromBucket(
extractionRequest
.providerConfiguration()
.geminiRequest()
.getConfiguration()
.bucketName(),
extractionRequest.input().document().metadata().getFileName(),
extractionRequest
.providerConfiguration()
.geminiRequest()
.getConfiguration()
.projectId(),
extractionRequest.providerConfiguration().geminiRequest().getAuthentication());
LOGGER.debug("File deleted from GCS");
} catch (Exception e) {
LOGGER.error("Error while deleting file from GCS", e);
}
});
}
}

private GenerativeModel getGenerativeModel(ExtractionRequest requestData, VertexAI vertexAi) {
GenerativeModel.Builder modelBuilder =
new GenerativeModel.Builder()
.setModelName(requestData.input().converseData().modelId())
.setVertexAi(vertexAi)
.setGenerationConfig(buildGenerationConfig(requestData.input()))
.setSafetySettings(prepareSafetySettings());

return modelBuilder.build();
}

private GenerationConfig buildGenerationConfig(ExtractionRequestData input) {
GenerationConfig.Builder builder =
GenerationConfig.newBuilder()
.setMaxOutputTokens(input.converseData().maxTokens())
.setTemperature(input.converseData().temperature())
.setTopP(input.converseData().topP());
return builder.build();
}

private List<SafetySetting> prepareSafetySettings() {
return List.of(
createSafetySetting(
HARM_CATEGORY_HATE_SPEECH, SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH),
createSafetySetting(HARM_CATEGORY_DANGEROUS_CONTENT, SafetySetting.HarmBlockThreshold.OFF),
createSafetySetting(
HARM_CATEGORY_SEXUALLY_EXPLICIT, SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH),
createSafetySetting(
HARM_CATEGORY_HARASSMENT, SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH));
}

private SafetySetting createSafetySetting(
HarmCategory category, SafetySetting.HarmBlockThreshold threshold) {
return SafetySetting.newBuilder().setCategory(category).setThreshold(threshold).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@

import io.camunda.connector.aws.model.impl.AwsBaseRequest;

@Deprecated
public class BaseRequest extends AwsBaseRequest {}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@
import jakarta.validation.constraints.NotNull;

public record ExtractionRequest(
@Valid @NotNull ExtractionRequestData input, BaseRequest baseRequest) {}
@Valid @NotNull ExtractionRequestData input,
BaseRequest baseRequest, // need to keep this here for backwards compatibility
ProviderConfiguration providerConfiguration) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.idp.extraction.model;

import io.camunda.google.model.GoogleBaseRequest;

public class GeminiBaseRequest extends GoogleBaseRequest {

private GeminiRequestConfiguration configuration;

public GeminiRequestConfiguration getConfiguration() {
return configuration;
}

public void setConfiguration(GeminiRequestConfiguration configuration) {
this.configuration = configuration;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.idp.extraction.model;

public record GeminiRequestConfiguration(
String region, String projectId, String bucketName, String grounding, String safetySettings) {}
Loading

0 comments on commit 41fe024

Please sign in to comment.