Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dns check #381

Merged
merged 2 commits into from
Feb 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 47 additions & 17 deletions util/fibers/dns_resolve.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct AresSocketState {

struct AresChannelState {
ProactorBase* proactor;
detail::FiberInterface* fiber_ctx = nullptr;
fb2::CondVarAny cond;

absl::flat_hash_map<ares_socket_t, AresSocketState> sockets_state;
};
Expand Down Expand Up @@ -77,6 +77,7 @@ void UpdateSocketsCallback(void* arg, ares_socket_t socket_fd, int readable, int
it->second.removed = true;
} else {
auto [it, inserted] = state->sockets_state.try_emplace(socket_fd);

if (inserted || it->second.removed) {
AresSocketState& socket_state = it->second;
socket_state.mask = mask;
Expand All @@ -85,33 +86,32 @@ void UpdateSocketsCallback(void* arg, ares_socket_t socket_fd, int readable, int
if (state->proactor->GetKind() == ProactorBase::EPOLL) {
EpollProactor* epoll = (EpollProactor*)state->proactor;
auto cb = [state](uint32_t event_mask, int err, EpollProactor* me) {
if (state->fiber_ctx) {
ActivateSameThread(detail::FiberActive(), state->fiber_ctx);
}
state->cond.notify_one();
};
socket_state.arm_index = epoll->Arm(socket_fd, std::move(cb), mask);
} else {
CHECK_EQ(state->proactor->GetKind(), ProactorBase::IOURING);
#ifdef __linux__
UringProactor* uring = (UringProactor*)state->proactor;
auto cb = [state](uint32_t event_mask) {
VLOG(2) << "ArmCb: " << event_mask << " " << state->fiber_ctx << " "
<< state->sockets_state.size();
if (state->fiber_ctx) {
ActivateSameThread(detail::FiberActive(), state->fiber_ctx);
}
VLOG(2) << "ArmCb: " << event_mask << " " << state->sockets_state.size();
state->cond.notify_one();
};
socket_state.arm_index = uring->EpollAdd(socket_fd, std::move(cb), mask);
DVLOG(1) << "EpollAdd " << socket_fd << ", mask: " << mask
<< " index: " << socket_state.arm_index;
#endif
}
} else {
VLOG(1) << "Skipped updating the state for " << socket_fd;
}
}
}

void DnsResolveCallback(void* ares_arg, int status, int timeouts, struct ares_addrinfo* res) {
auto* cb_args = static_cast<DnsResolveCallbackArgs*>(ares_arg);
cb_args->done = true;
VLOG(1) << "DnsResolveCallback: " << status << " " << timeouts << " " << res->nodes;
VLOG(1) << "DnsResolveCallback: " << status << " " << timeouts << " " << (res ? res->name : "");

if (status != ARES_SUCCESS || res->nodes == nullptr) {
cb_args->ec = make_error_code(errc::address_not_available);
Expand Down Expand Up @@ -141,16 +141,39 @@ void DnsResolveCallback(void* ares_arg, int status, int timeouts, struct ares_ad
}
}

// TODO: to redesign the whole logic to make it a process-wide singleton.
// ProcessChannel should become a handler running by the proactor.
// Instead of blocking on cv, we can use proactor to set up a timer.
// ProcessChannel should not be call specific, and should serve multiple calls and should run
// indefinitely together with the proactor.
// The way I imagine it should work, ProactorBase should enable DNS functionality.
// ProactorPool enables DNS functionality by choosing a single proactor from the pool.
// DnsResolve knows which proactor handles dns requests and makes hops to pass Dns requests
// to the proactor.
void ProcessChannel(ares_channel channel, AresChannelState* state, DnsResolveCallbackArgs* args) {
auto* myself = detail::FiberActive();

while (!args->done) {
// It's important to set and reset fiber_ctx close to Suspend, to avoid the case
// where EPOLL callbacks wake up the fiber in the wrong place.
// ares_process_fd calls helio code that in turn can suspend a fiber as well.
state->fiber_ctx = myself;
myself->Suspend();
state->fiber_ctx = nullptr;
timeval timeout;
timeval* timeout_result = ares_timeout(channel, nullptr, &timeout);

fb2::NoOpLock lk;
if (timeout_result) {
uint64_t ms = timeout_result->tv_sec * 1000 + timeout_result->tv_usec / 1000;
VLOG(2) << "blocking on timeout " << ms << "ms";
cv_status st = state->cond.wait_for(lk, chrono::milliseconds(ms));
if (st == cv_status::timeout) {
VLOG_IF(1, !state->sockets_state.empty())
<< "Timed out on waiting for fd: " << state->sockets_state.begin()->first;
// according to https://c-ares.org/docs/ares_process_fd.html
// in case of timeouts we pass ARES_SOCKET_BAD to ares_process_fd.
ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
continue;
}
} else {
state->cond.wait(lk);
}

for (const auto& [socket_fd, socket_state] : state->sockets_state) {
int read_fd = HasReads(socket_state.mask) ? socket_fd : ARES_SOCKET_BAD;
Expand All @@ -166,17 +189,24 @@ void ProcessChannel(ares_channel channel, AresChannelState* state, DnsResolveCal
error_code DnsResolve(const string& host, uint32_t wait_ms, char dest_ip[],
ProactorBase* proactor) {
DCHECK(ProactorBase::me() == proactor) << "must call from the proactor thread";
VLOG(1) << "DnsResolveStart";
VLOG(1) << "DnsResolveStart " << host;

AresChannelState state;
state.proactor = proactor;

ares_options options = {};
options.sock_state_cb = &UpdateSocketsCallback;
options.sock_state_cb_data = &state;
char lookups[] = "fb";
options.lookups = lookups; // hosts file first, then DNS

// set timeout
options.timeout = wait_ms;
// TODO: use options.qcache_max_ttl once we be able to reuse cares channel.

ares_channel channel;
CHECK_EQ(ares_init_options(&channel, &options, ARES_OPT_SOCK_STATE_CB), ARES_SUCCESS);
constexpr int kOptions = ARES_OPT_SOCK_STATE_CB | ARES_OPT_LOOKUPS | ARES_OPT_TIMEOUTMS;
CHECK_EQ(ares_init_options(&channel, &options, kOptions), ARES_SUCCESS);

DnsResolveCallbackArgs cb_args;
cb_args.dest_ip = dest_ip;
Expand Down
Loading