Skip to content

Commit 6252118

Browse files
authored
feat(cpp-client): Honor Arrow TimeUnit when converting Timestamp and Time64 to ColumnSource (#6609)
This PR has adds two tests but leaves one disabled. The disabled one will fail on the main branch; that is why it is disabled. To run all the tests, execute this command. ``` ./gradlew :cpp-client:testCppClient ``` Then, if you rebase this branch onto your `nate/nightly/barrage_types` branch, you can re-enable the disabled test by editing the file `cpp-client/deephaven/tests/src/time_unit_test.cc` and removing the tag `[.hidden]`. Currently that tag is at line 75. You should then be able to run all the unit tests with the same gradlew command above and they should all pass.
1 parent 1feacf4 commit 6252118

File tree

9 files changed

+323
-23
lines changed

9 files changed

+323
-23
lines changed

cpp-client/deephaven/dhclient/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ set(ALL_FILES
7474
include/private/deephaven/client/impl/util.h
7575

7676
src/arrowutil/arrow_client_table.cc
77+
src/arrowutil/arrow_column_source.cc
7778
include/private/deephaven/client/arrowutil/arrow_client_table.h
7879
include/private/deephaven/client/arrowutil/arrow_column_source.h
7980
include/private/deephaven/client/arrowutil/arrow_value_converter.h
@@ -107,6 +108,7 @@ set(ALL_FILES
107108
src/utility/table_maker.cc
108109

109110
include/public/deephaven/client/utility/arrow_util.h
111+
include/public/deephaven/client/utility/internal_types.h
110112
include/public/deephaven/client/utility/misc_types.h
111113
include/public/deephaven/client/utility/table_maker.h
112114
)

cpp-client/deephaven/dhclient/include/private/deephaven/client/arrowutil/arrow_column_source.h

+40-9
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,22 @@ namespace internal {
2222
// null-ness by determining whether the optional has a value.
2323
// kTimestamp is its own special case, where nullness is determined by the underlying nanos
2424
// being equal to Deephaven's NULL_LONG.
25-
// kLocalDate and kLocalTime are like kTimestamp except they resolve to different data types.
25+
// kLocalDate and kLocalTime are similar to kTimestamp in nullness except they resolve to different
26+
// data types.
2627
enum class ArrowProcessingStyle { kNormal, kBooleanOrString, kTimestamp, kLocalDate, kLocalTime };
2728

29+
/**
30+
* When 'array' has dynamic type arrow::TimestampArray or arrow::Time64Array, look at the
31+
* underlying time resolution of the arrow type and calculate a conversion factor from that unit
32+
* to nanoseconds. For example if the underlying time unit is arrow::TimeUnit::MILLI, then the
33+
* conversion factor would be 1_000_000, meaning that one needs to multiply incoming millisecond
34+
* values by one million to convert them to nanoseconds. If 'array' is not one of those types,
35+
* return 1.
36+
* @param array The Arrow array
37+
* @return For supported time types, the conversion factor to nanoseconds. Otherwise, 1.
38+
*/
39+
size_t CalcTimeNanoScaleFactor(const arrow::Array &array);
40+
2841
template<ArrowProcessingStyle Style, typename TColumnSourceBase, typename TArrowArray, typename TChunk>
2942
class GenericArrowColumnSource final : public TColumnSourceBase {
3043
using BooleanChunk = deephaven::dhcore::chunk::BooleanChunk;
@@ -37,7 +50,8 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
3750
using UInt64Chunk = deephaven::dhcore::chunk::UInt64Chunk;
3851

3952
public:
40-
static std::shared_ptr<GenericArrowColumnSource> OfArrowArray(std::shared_ptr<TArrowArray> array) {
53+
static std::shared_ptr<GenericArrowColumnSource>
54+
OfArrowArray(std::shared_ptr<TArrowArray> array) {
4155
std::vector<std::shared_ptr<TArrowArray>> arrays{std::move(array)};
4256
return OfArrowArrayVec(std::move(arrays));
4357
}
@@ -48,7 +62,9 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
4862
}
4963

5064
explicit GenericArrowColumnSource(std::vector<std::shared_ptr<TArrowArray>> arrays) :
51-
arrays_(std::move(arrays)) {}
65+
arrays_(std::move(arrays)) {
66+
time_nano_scale_factor_ = arrays_.empty() ? 1 : CalcTimeNanoScaleFactor(*arrays_.front());
67+
}
5268

5369
~GenericArrowColumnSource() final = default;
5470

@@ -67,13 +83,14 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
6783

6884
// This algorithm is a little tricky because the source data and RowSequence are both
6985
// segmented, perhaps in different ways.
70-
auto *typed_dest = VerboseCast<TChunk*>(DEEPHAVEN_LOCATION_EXPR(dest_data));
86+
auto *typed_dest = VerboseCast<TChunk *>(DEEPHAVEN_LOCATION_EXPR(dest_data));
7187
auto *destp = typed_dest->data();
7288
auto outerp = arrays_.begin();
7389
size_t src_segment_begin = 0;
7490
size_t src_segment_end = (*outerp)->length();
7591

76-
auto *null_destp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;
92+
auto *null_destp =
93+
optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;
7794

7895
rows.ForEachInterval([&](uint64_t requested_segment_begin, uint64_t requested_segment_end) {
7996
while (true) {
@@ -147,11 +164,12 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
147164
const auto *src_endp = innerp->raw_values() + relative_end;
148165

149166
for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
150-
*destp = DateTime::FromNanos(*ip);
167+
auto is_null = *ip == DeephavenTraits<int64_t>::kNullValue;
168+
*destp = DateTime::FromNanos(is_null ? *ip : (*ip * time_nano_scale_factor_));
151169
++destp;
152170

153171
if (null_destp != nullptr) {
154-
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
172+
*null_destp = is_null;
155173
++null_destp;
156174
}
157175
}
@@ -175,11 +193,12 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
175193
const auto *src_endp = innerp->raw_values() + relative_end;
176194

177195
for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
178-
*destp = LocalTime::FromNanos(*ip);
196+
auto is_null = *ip == DeephavenTraits<int64_t>::kNullValue;
197+
*destp = LocalTime::FromNanos(is_null ? *ip : (*ip * time_nano_scale_factor_));
179198
++destp;
180199

181200
if (null_destp != nullptr) {
182-
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
201+
*null_destp = is_null;
183202
++null_destp;
184203
}
185204
}
@@ -200,6 +219,18 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
200219

201220
private:
202221
std::vector<std::shared_ptr<TArrowArray>> arrays_;
222+
/**
223+
* This value is valid for Style == ArrowProcessingStyle::kTimestamp and
224+
* ArrowProcessingStyle::kLocalTime, and ignored for other ArrowProcessingStyle enumeration
225+
* values.
226+
*
227+
* These ArrowProcessingStyles come into play when processing the arrow types
228+
* arrow::TimestampType and arrow::Time64Type respectively.
229+
*
230+
* The value stores a conversion factor from whatever the input scale is to nanoseconds.
231+
* For example, if the input timescale is milliseconds, this value will be 1_000_000.
232+
*/
233+
size_t time_nano_scale_factor_ = 1;
203234
};
204235
} // namespace internal
205236

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
3+
*/
4+
#pragma once
5+
6+
#include <arrow/flight/types.h>
7+
8+
namespace deephaven::client::utility {
9+
/**
10+
* For Deephaven use only
11+
*/
12+
namespace internal {
13+
/**
14+
* This class exists only for the benefit of the unit tests. Our normal DateTime class has a
15+
* native time resolution of nanoseconds. This class allows our unit tests to upload a
16+
* DateTime having a different time unit so we can confirm that the client and server both handle
17+
* it correctly.
18+
*/
19+
template<arrow::TimeUnit::type UNIT>
20+
struct InternalDateTime {
21+
explicit InternalDateTime(int64_t value) : value_(value) {}
22+
23+
int64_t value_ = 0;
24+
};
25+
26+
/**
27+
* This class exists only for the benefit of the unit tests. Our normal LocalTime class has a
28+
* native time resolution of nanoseconds. This class allows our unit tests to upload a
29+
* LocalTime having a different time unit so we can confirm that the client and server both handle
30+
* it correctly.
31+
*/
32+
template<arrow::TimeUnit::type UNIT>
33+
struct InternalLocalTime {
34+
// Arrow Time64 only supports micro and nano units
35+
static_assert(UNIT == arrow::TimeUnit::MICRO || UNIT == arrow::TimeUnit::NANO);
36+
37+
explicit InternalLocalTime(int64_t value) : value_(value) {}
38+
39+
int64_t value_ = 0;
40+
};
41+
} // namespace internal
42+
} // namespace deephaven::client::utility

cpp-client/deephaven/dhclient/include/public/deephaven/client/utility/table_maker.h

+35
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
#include "deephaven/client/client.h"
2121
#include "deephaven/client/utility/arrow_util.h"
22+
#include "deephaven/client/utility/internal_types.h"
23+
#include "deephaven/client/utility/misc_types.h"
24+
#include "deephaven/dhcore/types.h"
2225
#include "deephaven/dhcore/utility/utility.h"
2326
#include "deephaven/third_party/fmt/format.h"
2427

@@ -356,6 +359,38 @@ struct TypeConverterTraits<std::optional<T>> {
356359
}
357360
};
358361

362+
template<arrow::TimeUnit::type UNIT>
363+
struct TypeConverterTraits<deephaven::client::utility::internal::InternalDateTime<UNIT>> {
364+
static std::shared_ptr<arrow::DataType> GetDataType() {
365+
return arrow::timestamp(UNIT, "UTC");
366+
}
367+
static arrow::TimestampBuilder GetBuilder() {
368+
return arrow::TimestampBuilder(GetDataType(), arrow::default_memory_pool());
369+
}
370+
static int64_t Reinterpret(const deephaven::client::utility::internal::InternalDateTime<UNIT> &o) {
371+
return o.value_;
372+
}
373+
static std::string_view GetDeephavenTypeName() {
374+
return "java.time.ZonedDateTime";
375+
}
376+
};
377+
378+
template<arrow::TimeUnit::type UNIT>
379+
struct TypeConverterTraits<deephaven::client::utility::internal::InternalLocalTime<UNIT>> {
380+
static std::shared_ptr<arrow::DataType> GetDataType() {
381+
return arrow::time64(UNIT);
382+
}
383+
static arrow::Time64Builder GetBuilder() {
384+
return arrow::Time64Builder(GetDataType(), arrow::default_memory_pool());
385+
}
386+
static int64_t Reinterpret(const deephaven::client::utility::internal::InternalLocalTime<UNIT> &o) {
387+
return o.value_;
388+
}
389+
static std::string_view GetDeephavenTypeName() {
390+
return "java.time.LocalTime";
391+
}
392+
};
393+
359394
template<typename T>
360395
TypeConverter TypeConverter::CreateNew(const std::vector<T> &values) {
361396
using deephaven::client::utility::OkOrThrow;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
3+
*/
4+
#include "deephaven/client/arrowutil/arrow_column_source.h"
5+
#include "deephaven/client/utility/arrow_util.h"
6+
7+
using deephaven::client::utility::OkOrThrow;
8+
9+
namespace deephaven::client::arrowutil {
10+
namespace internal {
11+
12+
namespace {
13+
struct NanoScaleFactorVisitor final : public arrow::TypeVisitor {
14+
size_t result_ = 1;
15+
16+
arrow::Status Visit(const arrow::Int8Type &/*type*/) final {
17+
return arrow::Status::OK();
18+
}
19+
20+
arrow::Status Visit(const arrow::Int16Type &/*type*/) final {
21+
return arrow::Status::OK();
22+
}
23+
24+
arrow::Status Visit(const arrow::Int32Type &/*type*/) final {
25+
return arrow::Status::OK();
26+
}
27+
28+
arrow::Status Visit(const arrow::Int64Type &/*type*/) final {
29+
return arrow::Status::OK();
30+
}
31+
32+
arrow::Status Visit(const arrow::FloatType &/*type*/) final {
33+
return arrow::Status::OK();
34+
}
35+
36+
arrow::Status Visit(const arrow::DoubleType &/*type*/) final {
37+
return arrow::Status::OK();
38+
}
39+
40+
arrow::Status Visit(const arrow::BooleanType &/*type*/) final {
41+
return arrow::Status::OK();
42+
}
43+
44+
arrow::Status Visit(const arrow::UInt16Type &/*type*/) final {
45+
return arrow::Status::OK();
46+
}
47+
48+
arrow::Status Visit(const arrow::StringType &/*type*/) final {
49+
return arrow::Status::OK();
50+
}
51+
52+
arrow::Status Visit(const arrow::TimestampType &type) final {
53+
result_ = ScaleFromUnit(type.unit());
54+
return arrow::Status::OK();
55+
}
56+
57+
arrow::Status Visit(const arrow::Date64Type &/*type*/) final {
58+
return arrow::Status::OK();
59+
}
60+
61+
arrow::Status Visit(const arrow::Time64Type &type) final {
62+
result_ = ScaleFromUnit(type.unit());
63+
return arrow::Status::OK();
64+
}
65+
66+
static size_t ScaleFromUnit(arrow::TimeUnit::type unit) {
67+
switch (unit) {
68+
case arrow::TimeUnit::SECOND: return 1'000'000'000;
69+
case arrow::TimeUnit::MILLI: return 1'000'000;
70+
case arrow::TimeUnit::MICRO: return 1'000;
71+
case arrow::TimeUnit::NANO: return 1;
72+
default: {
73+
auto message = fmt::format("Unhandled arrow::TimeUnit {}", static_cast<size_t>(unit));
74+
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
75+
}
76+
}
77+
}
78+
};
79+
} // namespace
80+
81+
size_t CalcTimeNanoScaleFactor(const arrow::Array &array) {
82+
NanoScaleFactorVisitor visitor;
83+
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(array.type()->Accept(&visitor)));
84+
return visitor.result_;
85+
}
86+
} // namespace internal
87+
} // namespace deephaven::client::arrowutil

cpp-client/deephaven/dhclient/src/utility/arrow_util.cc

+2-12
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,7 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
8181
return arrow::Status::OK();
8282
}
8383

84-
arrow::Status Visit(const arrow::TimestampType &type) final {
85-
if (type.unit() != arrow::TimeUnit::NANO) {
86-
auto message = fmt::format("Expected TimestampType with nano units, got {}",
87-
type.ToString());
88-
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
89-
}
84+
arrow::Status Visit(const arrow::TimestampType &/*type*/) final {
9085
type_id_ = ElementTypeId::kTimestamp;
9186
return arrow::Status::OK();
9287
}
@@ -96,12 +91,7 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
9691
return arrow::Status::OK();
9792
}
9893

99-
arrow::Status Visit(const arrow::Time64Type &type) final {
100-
if (type.unit() != arrow::TimeUnit::NANO) {
101-
auto message = fmt::format("Expected Time64Type with nano units, got {}",
102-
type.ToString());
103-
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
104-
}
94+
arrow::Status Visit(const arrow::Time64Type &/*type*/) final {
10595
type_id_ = ElementTypeId::kLocalTime;
10696
return arrow::Status::OK();
10797
}

cpp-client/deephaven/dhcore/include/public/deephaven/dhcore/types.h

-2
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,6 @@ class LocalTime {
541541
/**
542542
* Converts nanoseconds-since-start-of-day to LocalTime. The Deephaven null value sentinel is
543543
* turned into LocalTime(0).
544-
* TODO(kosak): find out null convention
545544
* @param nanos Nanoseconds since the start of the day.
546545
* @return The corresponding LocalTime.
547546
*/
@@ -579,7 +578,6 @@ class LocalTime {
579578
return !(lhs == rhs);
580579
}
581580
};
582-
583581
} // namespace deephaven::dhcore
584582

585583
template<> struct fmt::formatter<deephaven::dhcore::DateTime> : ostream_formatter {};

cpp-client/deephaven/tests/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ add_executable(dhclient_tests
2727
src/table_test.cc
2828
src/test_util.cc
2929
src/ticking_test.cc
30+
src/time_unit_test.cc
3031
src/ungroup_test.cc
3132
src/update_by_test.cc
3233
src/utility_test.cc

0 commit comments

Comments
 (0)