Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address comment #487

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/c++/perf_analyzer/client_backend/openai/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,25 @@ HttpRequest::GetNextInput(uint8_t* buf, size_t size, size_t* input_bytes)
}
}

std::mutex HttpClient::curl_init_mtx_{};
HttpClient::HttpClient(
const std::string& server_url, bool verbose,
const HttpSslOptions& ssl_options)
: url_(server_url), verbose_(verbose), ssl_options_(ssl_options)
{
auto* ver = curl_version_info(CURLVERSION_NOW);
if (ver->features & CURL_VERSION_THREADSAFE == 0) {
throw std::runtime_error(
"HTTP client has dependency on CURL library to have thread-safe "
"support (CURL_VERSION_THREADSAFE set)");
}
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
throw std::runtime_error("CURL global initialization failed");
// [FIXME] Uncomment below and remove the class-wide mutex once it is
// confirmed that curl >= 7.84.0 will always be used
// auto* ver = curl_version_info(CURLVERSION_NOW);
// if (ver->features & CURL_VERSION_THREADSAFE == 0) {
// throw std::runtime_error(
// "HTTP client has dependency on CURL library to have thread-safe "
// "support (CURL_VERSION_THREADSAFE set)");
// }
{
std::lock_guard<std::mutex> lk(curl_init_mtx_);
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
throw std::runtime_error("CURL global initialization failed");
}
}

multi_handle_ = curl_multi_init();
Expand All @@ -114,7 +120,10 @@ HttpClient::~HttpClient()
}
curl_multi_cleanup(multi_handle_);

curl_global_cleanup();
{
std::lock_guard<std::mutex> lk(curl_init_mtx_);
curl_global_cleanup();
}
}

const std::string&
Expand Down
1 change: 1 addition & 0 deletions src/c++/perf_analyzer/client_backend/openai/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,6 @@ class HttpClient {
private:
const std::string& ParseSslKeyType(HttpSslOptions::KEYTYPE key_type);
const std::string& ParseSslCertType(HttpSslOptions::CERTTYPE cert_type);
static std::mutex curl_init_mtx_;
};
}}}} // namespace triton::perfanalyzer::clientbackend::openai
19 changes: 16 additions & 3 deletions src/c++/perf_analyzer/client_backend/openai/openai_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,22 @@ size_t
ChatCompletionClient::ResponseHandler(
void* contents, size_t size, size_t nmemb, void* userp)
{
// [WIP] verify if the SSE responses received are complete, or the response
// need to be stitched first
// [TODO] verify whether the SSE responses received are complete, or whether
// the response needs to be stitched together first.
// To verify, print out the received responses from SendResponse() to make
// sure the OpenAI server doesn't chunk the HTTP responses in the way that
// misaligns with the SSE responses.
// The reason for not stitching responses now is that making the write
// callback bulletproof is complicated: one must assume the response can be
// chunked at an arbitrary position, and then bake in checks for the SSE
// format (data:.*\n\n) by iterating over the received buffer character by
// character.
size_t result_bytes = size * nmemb;
// return early if the response is empty as the response handling is
// triggered by the content of the response.
if (result_bytes == 0) {
return result_bytes;
}

auto request = reinterpret_cast<ChatCompletionRequest*>(userp);
if (request->timer_.Timestamp(
triton::client::RequestTimers::Kind::RECV_START) == 0) {
Expand All @@ -124,7 +138,6 @@ ChatCompletionClient::ResponseHandler(
}

char* buf = reinterpret_cast<char*>(contents);
size_t result_bytes = size * nmemb;
request->response_buffer_.append(buf, result_bytes);
// Send response now if streaming, otherwise wait until request has been
// completed
Expand Down
Loading