Skip to content

Commit

Permalink
Rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdebreceni committed Jan 23, 2025
1 parent 27ac6d1 commit 7949b98
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 96 deletions.
6 changes: 3 additions & 3 deletions extensions/civetweb/processors/ListenHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {

namespace {

class MgConnectionInputStream : public io::InputStream {
class MgConnectionInputStream : public io::InputStreamImpl {
public:
MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t> size): conn_(conn), netstream_size_limit_(size) {}

Expand All @@ -275,7 +275,7 @@ class MgConnectionInputStream : public io::InputStream {
std::optional<size_t> netstream_size_limit_; // how much can we read from conn_
};

class MgConnectionOutputStream : public io::OutputStream {
class MgConnectionOutputStream : public io::StreamImpl, public virtual io::OutputStreamImpl {
public:
explicit MgConnectionOutputStream(struct mg_connection* conn): conn_(conn) {}

Expand Down Expand Up @@ -548,7 +548,7 @@ void ListenHTTP::notifyStop() {
}

std::set<core::Connectable*> ListenHTTP::getOutGoingConnections(const std::string &relationship) {
auto result = core::Processor::getOutGoingConnections(relationship);
auto result = core::ProcessorImpl::getOutGoingConnections(relationship);
if (relationship == Self.name) {
result.insert(this);
}
Expand Down
3 changes: 2 additions & 1 deletion extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class GetCouchbaseKeyTestController : public TestController {
LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
LogTestController::getInstance().setDebug<processors::GetCouchbaseKey>();
auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService");
mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
mock_couchbase_cluster_service_ = std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
gsl_Assert(mock_couchbase_cluster_service_);
proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService");
}

Expand Down
3 changes: 2 additions & 1 deletion extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class PutCouchbaseKeyTestController : public TestController {
LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
LogTestController::getInstance().setDebug<processors::PutCouchbaseKey>();
auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService");
mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
mock_couchbase_cluster_service_ = std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
gsl_Assert(mock_couchbase_cluster_service_);
proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService");
}

Expand Down
1 change: 1 addition & 0 deletions extensions/lua/LuaScriptProcessContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "core/ProcessSession.h"
#include "LuaScriptStateManager.h"
#include "minifi-cpp/core/ProcessContext.h"

namespace org::apache::nifi::minifi::extensions::lua {

Expand Down
90 changes: 0 additions & 90 deletions libminifi/src/utils/TimeUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

#include "utils/TimeUtil.h"

#include "fmt/format.h"
#include "fmt/chrono.h"
#include "range/v3/algorithm/contains.hpp"

#ifdef WIN32
#include "date/tz.h"
#endif
Expand All @@ -41,92 +37,6 @@ void setClock(std::shared_ptr<SteadyClock> clock) {
global_clock = std::move(clock);
}

namespace {
template<class... Durations>
std::tuple<Durations...> breakDownDurations(std::chrono::system_clock::duration input_duration) {
std::tuple<Durations...> result;

([&]<typename T>(T& duration) {
duration = std::chrono::duration_cast<std::decay_t<T>>(input_duration);
input_duration -= duration;
} (std::get<Durations>(result)), ...);

return result;
}

std::string formatAsDaysHoursMinutesSeconds(std::chrono::system_clock::duration input_duration) {
const auto durs = breakDownDurations<std::chrono::days, std::chrono::hours, std::chrono::minutes, std::chrono::seconds>(input_duration);
const auto& days = std::get<std::chrono::days>(durs);
std::string day_string;
if (days.count() > 0) {
day_string = fmt::format("{} {}", days.count(), days.count() == 1 ? "day, " : "days, ");
}
return fmt::format("{}{:02}:{:02}:{:02}",
day_string, std::get<std::chrono::hours>(durs).count(),
std::get<std::chrono::minutes>(durs).count(),
std::get<std::chrono::seconds>(durs).count());
}

template<class... Durations>
std::string formatAsRoundedLargestUnit(std::chrono::system_clock::duration input_duration) {
std::optional<std::string> rounded_value_str;
using std::chrono::duration_cast;
using std::chrono::duration;

((rounded_value_str = input_duration >= Durations(1)
? std::optional<std::string>{fmt::format("{:.2}", duration_cast<duration<double, typename Durations::period>>(input_duration))}
: std::nullopt) || ...);


if (!rounded_value_str) {
return fmt::format("{}", input_duration);
}

return *rounded_value_str;
}

} // namespace

std::string humanReadableDuration(std::chrono::system_clock::duration input_duration) {
if (input_duration > 5s) {
return formatAsDaysHoursMinutesSeconds(input_duration);
}

return formatAsRoundedLargestUnit<std::chrono::seconds, std::chrono::milliseconds, std::chrono::microseconds>(input_duration);
}

std::optional<std::chrono::system_clock::time_point> parseRfc3339(const std::string& str) {
std::istringstream stream(str);
date::year_month_day date_part{};
date::from_stream(stream, "%F", date_part);

if (stream.fail())
return std::nullopt;

constexpr std::string_view accepted_delimiters = "tT_ ";
char delimiter_char = 0;
stream.get(delimiter_char);

if (stream.fail() || !ranges::contains(accepted_delimiters, delimiter_char))
return std::nullopt;

std::chrono::system_clock::duration time_part;
std::chrono::minutes offset = 0min;
if (str.ends_with('Z') || str.ends_with('z')) {
date::from_stream(stream, "%T", time_part);
if (stream.fail())
return std::nullopt;
stream.get();
} else {
date::from_stream(stream, "%T%Ez", time_part, {}, &offset);
}

if (stream.fail() || (stream.peek() && !stream.eof()))
return std::nullopt;

return date::sys_days(date_part) + time_part - offset;
}

#ifdef WIN32
void dateSetGlobalInstall(const std::string& install) {
date::set_install(install);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
#include "integration/IntegrationBase.h"
#include "core/logging/Logger.h"
#include "core/Scheduling.h"
#include "minifi-cpp/core/Scheduling.h"
#include "core/state/ProcessorController.h"
#include "unit/TestBase.h"
#include "../../../extensions/test-processors/KamikazeProcessor.h"
Expand Down
56 changes: 56 additions & 0 deletions utils/src/utils/TimeUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "utils/TimeUtil.h"
#include "range/v3/algorithm/contains.hpp"
#include "fmt/format.h"
#include "fmt/chrono.h"

#ifdef WIN32
#include "date/tz.h"
Expand All @@ -26,6 +28,60 @@ namespace org::apache::nifi::minifi::utils::timeutils {

using namespace std::literals::chrono_literals;

namespace {
template<class... Durations>
std::tuple<Durations...> breakDownDurations(std::chrono::system_clock::duration input_duration) {
std::tuple<Durations...> result;

([&]<typename T>(T& duration) {
duration = std::chrono::duration_cast<std::decay_t<T>>(input_duration);
input_duration -= duration;
} (std::get<Durations>(result)), ...);

return result;
}

std::string formatAsDaysHoursMinutesSeconds(std::chrono::system_clock::duration input_duration) {
const auto durs = breakDownDurations<std::chrono::days, std::chrono::hours, std::chrono::minutes, std::chrono::seconds>(input_duration);
const auto& days = std::get<std::chrono::days>(durs);
std::string day_string;
if (days.count() > 0) {
day_string = fmt::format("{} {}", days.count(), days.count() == 1 ? "day, " : "days, ");
}
return fmt::format("{}{:02}:{:02}:{:02}",
day_string, std::get<std::chrono::hours>(durs).count(),
std::get<std::chrono::minutes>(durs).count(),
std::get<std::chrono::seconds>(durs).count());
}

template<class... Durations>
std::string formatAsRoundedLargestUnit(std::chrono::system_clock::duration input_duration) {
std::optional<std::string> rounded_value_str;
using std::chrono::duration_cast;
using std::chrono::duration;

((rounded_value_str = input_duration >= Durations(1)
? std::optional<std::string>{fmt::format("{:.2}", duration_cast<duration<double, typename Durations::period>>(input_duration))}
: std::nullopt) || ...);


if (!rounded_value_str) {
return fmt::format("{}", input_duration);
}

return *rounded_value_str;
}

} // namespace

std::string humanReadableDuration(std::chrono::system_clock::duration input_duration) {
if (input_duration > 5s) {
return formatAsDaysHoursMinutesSeconds(input_duration);
}

return formatAsRoundedLargestUnit<std::chrono::seconds, std::chrono::milliseconds, std::chrono::microseconds>(input_duration);
}

std::optional<std::chrono::system_clock::time_point> parseRfc3339(const std::string& str) {
std::istringstream stream(str);
date::year_month_day date_part{};
Expand Down

0 comments on commit 7949b98

Please sign in to comment.