Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get RocksDB Cloud Tests Passing on Windows #120

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cloud/aws/aws_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ AwsAccessType AwsCloudAccessCredentials::GetAccessType() const {
return AwsAccessType::kConfig;
} else if (!access_key_id.empty() || !secret_key.empty()) {
return AwsAccessType::kSimple;
} else if (getenv("AWS_ACCESS_KEY_ID") != nullptr &&
getenv("AWS_SECRET_ACCESS_KEY") != nullptr) {
return AwsAccessType::kEnvironment;
}
return AwsAccessType::kUndefined;
}
Expand Down Expand Up @@ -161,6 +164,7 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
}

#ifdef USE_AWS
static Aws::SDKOptions sdkOptions;

//
// The AWS credentials are specified to the constructor via
Expand All @@ -169,7 +173,8 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
const std::shared_ptr<Logger>& info_log)
: CloudEnvImpl(_cloud_env_options, underlying_env, info_log) {
Aws::InitAPI(Aws::SDKOptions());
Aws::InitAPI(sdkOptions); //**TODO: Move this into PrepareOptions and do it
// conditionally (first time)
if (cloud_env_options.src_bucket.GetRegion().empty() ||
cloud_env_options.dest_bucket.GetRegion().empty()) {
std::string region;
Expand All @@ -187,7 +192,11 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
base_env_ = underlying_env;
}

void AwsEnv::Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
AwsEnv::~AwsEnv() {
//**TODO: Conditionally call shutdown (or make shutdown conditional on last...
}

void AwsEnv::Shutdown() { Aws::ShutdownAPI(sdkOptions); }


// The factory method for creating an S3 Env
Expand Down
2 changes: 1 addition & 1 deletion cloud/aws/aws_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AwsEnv : public CloudEnvImpl {
const std::shared_ptr<Logger>& info_log,
CloudEnv** cenv);

virtual ~AwsEnv() {}
virtual ~AwsEnv();

const char* Name() const override { return "aws"; }

Expand Down
2 changes: 1 addition & 1 deletion cloud/aws/aws_retry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Status AwsCloudOptions::GetClientConfiguration(
// Setup how retries need to be done
config->retryStrategy = std::make_shared<AwsRetryStrategy>(env);
if (cloud_env_options.request_timeout_ms != 0) {
config->requestTimeoutMs = cloud_env_options.request_timeout_ms;
config->requestTimeoutMs = static_cast<long>(cloud_env_options.request_timeout_ms);
}

config->region = ToAwsString(region);
Expand Down
85 changes: 71 additions & 14 deletions cloud/cloud_env.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2017 Rockset.
#ifndef ROCKSDB_LITE

#ifndef _WIN32_WINNT
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
Expand All @@ -12,6 +12,7 @@
#include "cloud/cloud_env_wrapper.h"
#include "cloud/db_cloud_impl.h"
#include "cloud/filename.h"
#include "file/filename.h"
#include "port/likely.h"
#include "rocksdb/cloud/cloud_log_controller.h"
#include "rocksdb/db.h"
Expand Down Expand Up @@ -59,6 +60,50 @@ void BucketOptions::SetBucketName(const std::string& bucket,
}
}

static Status IsNormalizedPath(const std::string& path) {
auto pos = path.find_first_of("\\{}[]<>%~#|^\'\"");
if (pos != std::string::npos) {
return Status::InvalidArgument("Illegal character in object path:", path);
}
pos = path.find_first_of("&$@=;:+,?");
if (pos != std::string::npos) {
return Status::InvalidArgument("Special character in object path: ", path);
}
return Status::OK();
}

Status BucketOptions::NormalizeObjectPath(const std::string& path,
std::string* result) {
// Remove the drive if there is one...
auto colon = path.find(':');
std::string normalized;
if (colon != std::string::npos) {
normalized = path.substr(colon + 1);
} else {
normalized = path;
}
// Replace any "\" with "/"
for (auto pos = normalized.find('\\'); pos != std::string::npos;
pos = normalized.find('\\', pos)) {
normalized[pos] = '/';
}
// Remove any duplicate markers
normalized = NormalizePath(normalized);
Status s = IsNormalizedPath(normalized);
if (s.ok()) {
*result = normalized;
}
return s;
}

Status BucketOptions::SetObjectPath(const std::string& object) {
Status s = IsNormalizedPath(object);
if (s.ok()) {
object_ = object;
}
return s;
}

// Initializes the bucket properties

void BucketOptions::TEST_Initialize(const std::string& bucket,
Expand All @@ -70,15 +115,13 @@ void BucketOptions::TEST_Initialize(const std::string& bucket,
if (!CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_BUCKET_NAME",
"ROCKSDB_CLOUD_BUCKET_NAME",
&bucket_)) {
#ifdef _WIN32_WINNT
#ifdef _WIN32
char user_name[257]; // UNLEN + 1
DWORD dwsize = sizeof(user_name);
if (!::GetUserName(user_name, &dwsize)) {
bucket_ = bucket_ + "unknown";
} else {
bucket_ =
bucket_ +
std::string(user_name, static_cast<std::string::size_type>(dwsize));
bucket_ = bucket_ + user_name;
}
#else
bucket_ = bucket + std::to_string(geteuid());
Expand All @@ -90,11 +133,16 @@ void BucketOptions::TEST_Initialize(const std::string& bucket,
prefix_ = prefix;
}
name_ = prefix_ + bucket_;
if (!CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_OBECT_PATH",
"ROCKSDB_CLOUD_OBJECT_PATH",
&object_)) {
object_ = object;
std::string value;
if (CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_OBECT_PATH",
"ROCKSDB_CLOUD_OBJECT_PATH",
&value)) {
NormalizeObjectPath(value, &value);
} else {
NormalizeObjectPath(object, &value);
}
SetObjectPath(value);

if (!CloudEnvOptions::GetNameFromEnvironment(
"ROCKSDB_CLOUD_TEST_REGION", "ROCKSDB_CLOUD_REGION", &region_)) {
region_ = region;
Expand All @@ -117,15 +165,24 @@ Status CloudEnv::NewAwsEnv(
const std::string& dest_cloud_region, const CloudEnvOptions& cloud_options,
const std::shared_ptr<Logger>& logger, CloudEnv** cenv) {
CloudEnvOptions options = cloud_options;
Status s;
if (!src_cloud_bucket.empty())
options.src_bucket.SetBucketName(src_cloud_bucket);
if (!src_cloud_object.empty())
options.src_bucket.SetObjectPath(src_cloud_object);
if (!src_cloud_object.empty()) {
s = options.src_bucket.SetObjectPath(src_cloud_object);
if (!s.ok()) {
return s;
}
}
if (!src_cloud_region.empty()) options.src_bucket.SetRegion(src_cloud_region);
if (!dest_cloud_bucket.empty())
if (!dest_cloud_bucket.empty())
options.dest_bucket.SetBucketName(dest_cloud_bucket);
if (!dest_cloud_object.empty())
options.dest_bucket.SetObjectPath(dest_cloud_object);
if (!dest_cloud_object.empty()) {
s = options.dest_bucket.SetObjectPath(dest_cloud_object);
if (!s.ok()) {
return s;
}
}
if (!dest_cloud_region.empty())
options.dest_bucket.SetRegion(dest_cloud_region);
return NewAwsEnv(base_env, options, logger, cenv);
Expand Down
2 changes: 1 addition & 1 deletion cloud/cloud_env_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ std::string CloudEnvImpl::RemapFilename(const std::string& logical_path) const {
return logical_path;
}
auto file_name = basename(logical_path);
uint64_t fileNumber;
uint64_t fileNumber = 0;
FileType type;
WalFileType walType;
if (file_name == "MANIFEST") {
Expand Down
11 changes: 10 additions & 1 deletion cloud/cloud_env_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
#include "rocksdb/env.h"
#include "rocksdb/status.h"

#ifdef _WIN32
// Windows API macro interference
#undef DeleteFile
#undef GetCurrentTime
#undef GetFreeSpace
#endif

namespace ROCKSDB_NAMESPACE {
class CloudScheduler;
class CloudStorageReadableFile;
Expand Down Expand Up @@ -336,7 +343,9 @@ class CloudEnvImpl : public CloudEnv {
bool test_disable_cloud_manifest_{false};

// scratch space in local dir
static constexpr const char* SCRATCH_LOCAL_DIR = "/tmp";
std::string GetScratchDirectory() const;
std::string GetScratchFile() const;

std::mutex files_to_delete_mutex_;
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
std::unordered_map<std::string, int> files_to_delete_;
Expand Down
18 changes: 12 additions & 6 deletions cloud/cloud_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@
#include <thread>
#include <unordered_set>

#include "rocksdb/env.h"
#include "test_util/testharness.h"

namespace ROCKSDB_NAMESPACE {

class CloudSchedulerTest : public testing::Test {
public:
CloudSchedulerTest() { scheduler_ = CloudScheduler::Get(); }
CloudSchedulerTest() {
scheduler_ = CloudScheduler::Get();
env_ = Env::Default();
}
~CloudSchedulerTest() {}

std::shared_ptr<CloudScheduler> scheduler_;
Env *env_;

void WaitForJobs(const std::vector<long> &jobs, uint32_t delay) {
bool running = true;
while (running) {
Expand All @@ -34,7 +40,7 @@ class CloudSchedulerTest : public testing::Test {
}
}
if (running) {
usleep(delay);
env_->SleepForMicroseconds(delay);
}
}
}
Expand Down Expand Up @@ -89,14 +95,14 @@ TEST_F(CloudSchedulerTest, TestRecurring) {
std::chrono::microseconds(100), doJob2,
nullptr);
while (job2 <= 4) {
usleep(100);
env_->SleepForMicroseconds(100);
}
ASSERT_GE(job2.load(), 4);
ASSERT_GT(job1.load(), job2);
ASSERT_TRUE(scheduler_->CancelJob(handle1));
auto old1 = job1.load();
auto old2 = job2.load();
usleep(200);
env_->SleepForMicroseconds(200);
ASSERT_EQ(job1.load(), old1);
ASSERT_GT(job2.load(), old2);
}
Expand All @@ -117,7 +123,7 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
ASSERT_FALSE(scheduler2->CancelJob(handle1));
ASSERT_TRUE(scheduler2->CancelJob(handle2));
ASSERT_FALSE(scheduler2->CancelJob(handle2));
usleep(200);
env_->SleepForMicroseconds(200);
ASSERT_EQ(job1, 2);
ASSERT_EQ(job2, 1);

Expand All @@ -130,7 +136,7 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
scheduler2.reset();
auto old1 = job1.load();
auto old2 = job2.load();
usleep(200);
env_->SleepForMicroseconds(200);
ASSERT_EQ(job2, old2);
ASSERT_GT(job1, old1);
}
Expand Down
Loading