Skip to content

Commit

Permalink
Address comment (#487)
Browse files Browse the repository at this point in the history
* Address comment

* Update src/c++/perf_analyzer/client_backend/openai/openai_client.cc

* Update src/c++/perf_analyzer/client_backend/openai/http_client.cc

* formatting

---------

Co-authored-by: Timothy Gerdes <50968584+tgerdesnv@users.noreply.github.com>
Co-authored-by: tgerdes <tgerdes@nvidia.com>
  • Loading branch information
3 people committed Mar 6, 2024
1 parent 7df09ef commit 6052176
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
27 changes: 18 additions & 9 deletions src/c++/perf_analyzer/client_backend/openai/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,25 @@ HttpRequest::GetNextInput(uint8_t* buf, size_t size, size_t* input_bytes)
}
}

std::mutex HttpClient::curl_init_mtx_{};
HttpClient::HttpClient(
const std::string& server_url, bool verbose,
const HttpSslOptions& ssl_options)
: url_(server_url), verbose_(verbose), ssl_options_(ssl_options)
{
auto* ver = curl_version_info(CURLVERSION_NOW);
if (ver->features & CURL_VERSION_THREADSAFE == 0) {
throw std::runtime_error(
"HTTP client has dependency on CURL library to have thread-safe "
"support (CURL_VERSION_THREADSAFE set)");
}
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
throw std::runtime_error("CURL global initialization failed");
// [TODO TMA-1670] uncomment below and remove class-wide mutex once confirmed
// that curl >= 7.84.0 will always be used
// NOTE: parentheses are required around the mask — '==' binds tighter than
// '&', so 'ver->features & CURL_VERSION_THREADSAFE == 0' would parse as
// 'ver->features & (CURL_VERSION_THREADSAFE == 0)' and never trigger.
// auto* ver = curl_version_info(CURLVERSION_NOW);
// if ((ver->features & CURL_VERSION_THREADSAFE) == 0) {
// throw std::runtime_error(
// "HTTP client depends on the CURL library being built with thread-safe "
// "support (CURL_VERSION_THREADSAFE set)");
// }
{
std::lock_guard<std::mutex> lk(curl_init_mtx_);
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
throw std::runtime_error("CURL global initialization failed");
}
}

multi_handle_ = curl_multi_init();
Expand All @@ -114,7 +120,10 @@ HttpClient::~HttpClient()
}
curl_multi_cleanup(multi_handle_);

curl_global_cleanup();
{
std::lock_guard<std::mutex> lk(curl_init_mtx_);
curl_global_cleanup();
}
}

const std::string&
Expand Down
1 change: 1 addition & 0 deletions src/c++/perf_analyzer/client_backend/openai/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,6 @@ class HttpClient {
private:
const std::string& ParseSslKeyType(HttpSslOptions::KEYTYPE key_type);
const std::string& ParseSslCertType(HttpSslOptions::CERTTYPE cert_type);
static std::mutex curl_init_mtx_;
};
}}}} // namespace triton::perfanalyzer::clientbackend::openai
18 changes: 15 additions & 3 deletions src/c++/perf_analyzer/client_backend/openai/openai_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,21 @@ size_t
ChatCompletionClient::ResponseHandler(
void* contents, size_t size, size_t nmemb, void* userp)
{
// [WIP] verify if the SSE responses received are complete, or the response
// need to be stitched first
// [TODO TMA-1666] verify whether the SSE responses received are complete, or
// whether the responses need to be stitched together first. To verify, print
// out the responses received from SendResponse() and confirm that the OpenAI
// server does not chunk the HTTP responses in a way that misaligns with the
// SSE responses. Responses are not stitched together for now because making
// the write callback bulletproof is complicated: it would have to assume the
// response can be chunked at an arbitrary position and then check for the SSE
// framing (data:.*\n\n) by iterating over the received buffer character by
// character.
size_t result_bytes = size * nmemb;
// Return early if the response is empty, since response handling is
// triggered by the content of the response.
if (result_bytes == 0) {
return result_bytes;
}

auto request = reinterpret_cast<ChatCompletionRequest*>(userp);
if (request->timer_.Timestamp(
triton::client::RequestTimers::Kind::RECV_START) == 0) {
Expand All @@ -124,7 +137,6 @@ ChatCompletionClient::ResponseHandler(
}

char* buf = reinterpret_cast<char*>(contents);
size_t result_bytes = size * nmemb;
request->response_buffer_.append(buf, result_bytes);
// Send response now if streaming, otherwise wait until request has been
// completed
Expand Down

0 comments on commit 6052176

Please sign in to comment.