Skip to content

Commit

Permalink
load test automation (GoogleCloudPlatform#1891)
Browse files Browse the repository at this point in the history
* load test automation

* added max connections

* doc fixes

* tps increase
  • Loading branch information
aksharauke authored Sep 25, 2024
1 parent ce2240d commit 8f14b1b
Show file tree
Hide file tree
Showing 6 changed files with 736 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils.getFullGcsPath;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.teleport.metadata.TemplateLoadTest;
import com.google.common.io.Resources;
import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.datagenerator.DataGenerator;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.MySQLResourceManager;
import org.apache.beam.it.jdbc.conditions.JDBCRowsCheck;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(TemplateLoadTest.class)
@TemplateLoadTest(SpannerToSourceDb.class)
@RunWith(JUnit4.class)
public class SpannerToMySqlSourceLT extends SpannerToSourceDbLTBase {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToMySqlSourceLT.class);

private String generatorSchemaPath;
private final String artifactBucket = TestProperties.artifactBucket();
private final String spannerDdlResource = "SpannerToMySqlSourceLT/spanner-schema.sql";
private final String sessionFileResource = "SpannerToMySqlSourceLT/session.json";
private final String dataGeneratorSchemaResource =
"SpannerToMySqlSourceLT/datagenerator-schema.json";
private final String table = "Person";
private final int maxWorkers = 50;
private final int numWorkers = 20;
private PipelineLauncher.LaunchInfo jobInfo;
private PipelineLauncher.LaunchInfo readerJobInfo;

@Before
public void setup() throws IOException {
setupResourceManagers(spannerDdlResource, sessionFileResource, artifactBucket);
setupMySQLResourceManager(1);
generatorSchemaPath =
getFullGcsPath(
artifactBucket,
gcsResourceManager
.uploadArtifact(
"input/schema.json",
Resources.getResource(dataGeneratorSchemaResource).getPath())
.name());

createMySQLSchema(jdbcResourceManagers);
jobInfo = launchDataflowJob(artifactBucket, numWorkers, maxWorkers);
}

@After
public void tearDown() {
cleanupResourceManagers();
}

@Test
public void reverseReplication1KTpsLoadTest()
throws IOException, ParseException, InterruptedException {
// Start data generator
DataGenerator dataGenerator =
DataGenerator.builderWithSchemaLocation(testName, generatorSchemaPath)
.setQPS("1000")
.setMessagesLimit(String.valueOf(300000))
.setSpannerInstanceName(spannerResourceManager.getInstanceId())
.setSpannerDatabaseName(spannerResourceManager.getDatabaseId())
.setSpannerTableName(table)
.setNumWorkers("50")
.setMaxNumWorkers("100")
.setSinkType("SPANNER")
.setProjectId(project)
.setBatchSizeBytes("0")
.build();

dataGenerator.execute(Duration.ofMinutes(90));
assertThatPipeline(jobInfo).isRunning();

JDBCRowsCheck check =
JDBCRowsCheck.builder(jdbcResourceManagers.get(0), table)
.setMinRows(300000)
.setMaxRows(300000)
.build();

PipelineOperator.Result result =
pipelineOperator.waitForCondition(
createConfig(jobInfo, Duration.ofMinutes(10), Duration.ofSeconds(30)), check);

// Assert Conditions
assertThatResult(result).meetsConditions();

PipelineOperator.Result result1 =
pipelineOperator.cancelJobAndFinish(createConfig(jobInfo, Duration.ofMinutes(20)));

assertThatResult(result1).isLaunchFinished();

// export results
exportMetricsToBigQuery(jobInfo, getMetrics(jobInfo));
}

private void createMySQLSchema(List<JDBCResourceManager> jdbcResourceManagers) {
if (!(jdbcResourceManagers.get(0) instanceof MySQLResourceManager)) {
throw new IllegalArgumentException(jdbcResourceManagers.get(0).getClass().getSimpleName());
}
MySQLResourceManager jdbcResourceManager = (MySQLResourceManager) jdbcResourceManagers.get(0);
HashMap<String, String> columns = new HashMap<>();
columns.put("first_name1", "varchar(500)");
columns.put("last_name1", "varchar(500)");
columns.put("first_name2", "varchar(500)");
columns.put("last_name2", "varchar(500)");
columns.put("first_name3", "varchar(500)");
columns.put("last_name3", "varchar(500)");
columns.put("ID", "varchar(100) NOT NULL");

JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, "ID");

jdbcResourceManager.createTable(table, schema);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.common.base.MoreObjects;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateLoadTestBase;
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.apache.beam.it.jdbc.MySQLResourceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for Spanner to sourcedb Load tests. It provides helper functions related to
* environment setup and assertConditions.
*/
public class SpannerToSourceDbLTBase extends TemplateLoadTestBase {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDbLTBase.class);

private static final String TEMPLATE_SPEC_PATH =
MoreObjects.firstNonNull(
TestProperties.specPath(), "gs://dataflow-templates/latest/flex/Spanner_to_SourceDb");
public SpannerResourceManager spannerResourceManager;
public SpannerResourceManager spannerMetadataResourceManager;
public List<JDBCResourceManager> jdbcResourceManagers;
public GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;
private SubscriptionName subscriptionName;

public void setupResourceManagers(
String spannerDdlResource, String sessionFileResource, String artifactBucket)
throws IOException {
spannerResourceManager = createSpannerDatabase(spannerDdlResource);
spannerMetadataResourceManager = createSpannerMetadataDatabase();

gcsResourceManager =
GcsResourceManager.builder(artifactBucket, getClass().getSimpleName(), CREDENTIALS).build();

gcsResourceManager.uploadArtifact(
"input/session.json", Resources.getResource(sessionFileResource).getPath());

pubsubResourceManager = setUpPubSubResourceManager();
subscriptionName =
createPubsubResources(
getClass().getSimpleName(),
pubsubResourceManager,
getGcsPath(artifactBucket, "dlq", gcsResourceManager));
}

public void setupMySQLResourceManager(int numShards) throws IOException {
jdbcResourceManagers = new ArrayList<>();
for (int i = 0; i < numShards; ++i) {
jdbcResourceManagers.add(MySQLResourceManager.builder(testName).build());
}

createAndUploadShardConfigToGcs(gcsResourceManager, jdbcResourceManagers);
}

public void cleanupResourceManagers() {
ResourceManagerUtils.cleanResources(
spannerResourceManager,
spannerMetadataResourceManager,
gcsResourceManager,
pubsubResourceManager);
for (JDBCResourceManager jdbcResourceManager : jdbcResourceManagers) {
ResourceManagerUtils.cleanResources(jdbcResourceManager);
}
}

public PubsubResourceManager setUpPubSubResourceManager() throws IOException {
return PubsubResourceManager.builder(testName, project, CREDENTIALS_PROVIDER).build();
}

public SubscriptionName createPubsubResources(
String identifierSuffix, PubsubResourceManager pubsubResourceManager, String gcsPrefix) {
String topicNameSuffix = "rr-load" + identifierSuffix;
String subscriptionNameSuffix = "rr-load-sub" + identifierSuffix;
TopicName topic = pubsubResourceManager.createTopic(topicNameSuffix);
SubscriptionName subscription =
pubsubResourceManager.createSubscription(topic, subscriptionNameSuffix);
String prefix = gcsPrefix;
if (prefix.startsWith("/")) {
prefix = prefix.substring(1);
}
prefix += "/retry";
gcsResourceManager.createNotification(topic.toString(), prefix);
return subscription;
}

public SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFile)
throws IOException {
SpannerResourceManager spannerResourceManager =
SpannerResourceManager.builder("rr-loadtest-" + testName, project, region)
.maybeUseStaticInstance()
.build();
String ddl =
String.join(
" ",
Resources.readLines(
Resources.getResource(spannerDdlResourceFile), StandardCharsets.UTF_8));
ddl = ddl.trim();
String[] ddls = ddl.split(";");
for (String d : ddls) {
if (!d.isBlank()) {
spannerResourceManager.executeDdlStatement(d);
}
}
return spannerResourceManager;
}

public SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
SpannerResourceManager spannerMetadataResourceManager =
SpannerResourceManager.builder("rr-meta-" + testName, project, region)
.maybeUseStaticInstance()
.build();
String dummy = "create table t1(id INT64 ) primary key(id)";
spannerMetadataResourceManager.executeDdlStatement(dummy);
return spannerMetadataResourceManager;
}

public void createAndUploadShardConfigToGcs(
GcsResourceManager gcsResourceManager, List<JDBCResourceManager> jdbcResourceManagers)
throws IOException {
JsonArray ja = new JsonArray();
for (int i = 0; i < 1; ++i) {
if (jdbcResourceManagers.get(i) instanceof MySQLResourceManager) {
MySQLResourceManager resourceManager = (MySQLResourceManager) jdbcResourceManagers.get(i);
Shard shard = new Shard();
shard.setLogicalShardId("Shard" + (i + 1));
shard.setUser(jdbcResourceManagers.get(i).getUsername());
shard.setHost(resourceManager.getHost());
shard.setPassword(jdbcResourceManagers.get(i).getPassword());
shard.setPort(String.valueOf(resourceManager.getPort()));
shard.setDbName(jdbcResourceManagers.get(i).getDatabaseName());
JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
jsObj.remove("secretManagerUri"); // remove field secretManagerUri
ja.add(jsObj);
} else {
throw new UnsupportedOperationException(
jdbcResourceManagers.get(i).getClass().getSimpleName() + " is not supported");
}
}
String shardFileContents = ja.toString();
LOG.info("Shard file contents: {}", shardFileContents);
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
}

public PipelineLauncher.LaunchInfo launchDataflowJob(
String artifactBucket, int numWorkers, int maxWorkers) throws IOException {
// default parameters

Map<String, String> params =
new HashMap<>() {
{
put(
"sessionFilePath",
getGcsPath(artifactBucket, "input/session.json", gcsResourceManager));
put("instanceId", spannerResourceManager.getInstanceId());
put("databaseId", spannerResourceManager.getDatabaseId());
put("spannerProjectId", project);
put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId());
put("metadataInstance", spannerMetadataResourceManager.getInstanceId());
put(
"sourceShardsFilePath",
getGcsPath(artifactBucket, "input/shard.json", gcsResourceManager));
put("changeStreamName", "allstream");
put("dlqGcsPubSubSubscription", subscriptionName.toString());
put("deadLetterQueueDirectory", getGcsPath(artifactBucket, "dlq", gcsResourceManager));
put("maxShardConnections", "100");
}
};

LaunchConfig.Builder options =
LaunchConfig.builder(getClass().getSimpleName(), TEMPLATE_SPEC_PATH);
options
.addEnvironment("maxWorkers", maxWorkers)
.addEnvironment("numWorkers", numWorkers)
.addEnvironment("additionalExperiments", Collections.singletonList("use_runner_v2"));

options.setParameters(params);
PipelineLauncher.LaunchInfo jobInfo = pipelineLauncher.launch(project, region, options.build());
return jobInfo;
}

public String getGcsPath(
String bucket, String artifactId, GcsResourceManager gcsResourceManager) {
return ArtifactUtils.getFullGcsPath(
bucket, getClass().getSimpleName(), gcsResourceManager.runId(), artifactId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"ID": "{{uuid()}}",
"first_name1": "{{alphaNumeric(460,500)}}",
"last_name1": "{{alphaNumeric(460,500)}}",
"first_name2": "{{alphaNumeric(460,500)}}",
"last_name2": "{{alphaNumeric(460,500)}}",
"first_name3": "{{alphaNumeric(460,500)}}",
"last_name3": "{{alphaNumeric(460,500)}}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE `person1` (
`first_name1` varchar(500) DEFAULT NULL,
`last_name1` varchar(500) DEFAULT NULL,
`first_name2` varchar(500) DEFAULT NULL,
`last_name2` varchar(500) DEFAULT NULL,
`first_name3` varchar(500) DEFAULT NULL,
`last_name3` varchar(500) DEFAULT NULL,
`ID` varchar(100) NOT NULL,
PRIMARY KEY (`ID`)
)
Loading

0 comments on commit 8f14b1b

Please sign in to comment.