Skip to content

Commit e02f5d6

Browse files
authored
Fix windows compatibility. (#17872)
Missed #ifdef has been added. MSG_ZEROCOPY related code works only on linux.
1 parent fa086bc commit e02f5d6

File tree

4 files changed

+51
-13
lines changed

4 files changed

+51
-13
lines changed

ydb/library/actors/interconnect/interconnect_stream.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ namespace NInterconnect {
125125
return ret;
126126
}
127127

128+
#if defined(__linux__)
128129
ssize_t
129130
TStreamSocket::RecvErrQueue(struct msghdr* msg) const {
130131
const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE);
@@ -133,6 +134,7 @@ namespace NInterconnect {
133134

134135
return ret;
135136
}
137+
#endif
136138

137139
ssize_t
138140
TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const {

ydb/library/actors/interconnect/interconnect_stream.h

+2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ namespace NInterconnect {
6060
virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const;
6161

6262
ssize_t SendWithFlags(const void* msg, size_t len, int flags) const;
63+
#if defined(__linux__)
6364
ssize_t RecvErrQueue(struct msghdr* msg) const;
65+
#endif
6466

6567
int Connect(const TAddress& addr) const;
6668
int Connect(const NAddr::IRemoteAddr* addr) const;

ydb/library/actors/interconnect/interconnect_zc_processor.cpp

+43-13
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ size_t AdjustLen(std::span<const TConstIoVec> wbuf, std::span<const TOutgoingStr
150150
}
151151

152152
void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocket& socket) {
153+
#ifdef YDB_MSG_ZEROCOPY_SUPPORTED
153154
const TProcessErrQueueResult res = DoProcessErrQueue(socket);
154155

155156
std::visit(TOverloaded{
@@ -177,6 +178,9 @@ void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocke
177178
AddErr("Hidden copy during ZC operation");
178179
ZcState = ZC_DISABLED_HIDDEN_COPY;
179180
}
181+
#else
182+
Y_UNUSED(socket);
183+
#endif
180184
}
181185

182186
void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& socket)
@@ -191,6 +195,8 @@ void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& s
191195
ResetState();
192196
}
193197
}
198+
#else
199+
Y_UNUSED(socket);
194200
#endif
195201
}
196202

@@ -264,10 +270,6 @@ ssize_t TInterconnectZcProcessor::ProcessSend(std::span<TConstIoVec> wbuf, TStre
264270
return r;
265271
}
266272

267-
TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled)
268-
: ZcState(enabled ? ZC_OK : ZC_DISABLED)
269-
{}
270-
271273
TString TInterconnectZcProcessor::GetCurrentStateName() const {
272274
switch (ZcState) {
273275
case ZC_DISABLED:
@@ -293,8 +295,23 @@ TString TInterconnectZcProcessor::ExtractErrText() {
293295
}
294296
}
295297

298+
void TInterconnectZcProcessor::AddErr(const TString& err) {
299+
if (LastErr) {
300+
LastErr.reserve(err.size() + 2);
301+
LastErr += ", ";
302+
LastErr += err;
303+
} else {
304+
LastErr = err;
305+
}
306+
}
307+
296308
///////////////////////////////////////////////////////////////////////////////
297309

310+
#ifdef YDB_MSG_ZEROCOPY_SUPPORTED
311+
TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled)
312+
: ZcState(enabled ? ZC_OK : ZC_DISABLED)
313+
{}
314+
298315
// Guard part.
299316
// We must guarantee liveness of buffers used for zc
300317
// until enqueued zc operation completed by kernel
@@ -399,20 +416,33 @@ class TGuardRunner : public IZcGuard {
399416
std::list<TEventHolder> Delayed;
400417
};
401418

402-
void TInterconnectZcProcessor::AddErr(const TString& err) {
403-
if (LastErr) {
404-
LastErr.reserve(err.size() + 2);
405-
LastErr += ", ";
406-
LastErr += err;
407-
} else {
408-
LastErr = err;
409-
}
419+
std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard()
420+
{
421+
return std::make_unique<TGuardRunner>(SendAsZc, Confirmed);
410422
}
411423

424+
#else
425+
TInterconnectZcProcessor::TInterconnectZcProcessor(bool)
426+
: ZcState(ZC_DISABLED)
427+
{}
428+
429+
class TDummyGuardRunner : public IZcGuard {
430+
public:
431+
TDummyGuardRunner(ui64 uncompleted, ui64 confirmed)
432+
{
433+
Y_UNUSED(uncompleted);
434+
Y_UNUSED(confirmed);
435+
}
436+
437+
void ExtractToSafeTermination(std::list<TEventHolder>&) noexcept override {}
438+
void Terminate(std::unique_ptr<NActors::TEventHolderPool>&&, TIntrusivePtr<NInterconnect::TStreamSocket>, const NActors::TActorContext&) override {}
439+
};
412440

413441
std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard()
414442
{
415-
return std::make_unique<TGuardRunner>(SendAsZc, Confirmed);
443+
return std::make_unique<TDummyGuardRunner>(SendAsZc, Confirmed);
416444
}
417445

446+
#endif
447+
418448
}

ydb/library/actors/interconnect/ut_fat/main.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ Y_UNIT_TEST_SUITE(InterconnectZcLocalOp) {
212212

213213
NanoSleep(5ULL * 1000 * 1000 * 1000);
214214
// Zero copy send via loopback causes hidden copy inside linux kernel
215+
#if defined (__linux__)
215216
UNIT_ASSERT_VALUES_EQUAL("DisabledHiddenCopy", GetZcState(testCluster, 1, 2));
217+
#else
218+
UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2));
219+
#endif
216220
}
217221
}

0 commit comments

Comments
 (0)