From 605217633fcb4736e62876eb46ccff688a28370d Mon Sep 17 00:00:00 2001 From: GuanLuo <41310872+GuanLuo@users.noreply.github.com> Date: Wed, 6 Mar 2024 13:33:39 -0800 Subject: [PATCH] Address comment (#487) * Address comment * Update src/c++/perf_analyzer/client_backend/openai/openai_client.cc * Update src/c++/perf_analyzer/client_backend/openai/http_client.cc * formatting --------- Co-authored-by: Timothy Gerdes <50968584+tgerdesnv@users.noreply.github.com> Co-authored-by: tgerdes --- .../client_backend/openai/http_client.cc | 27 ++++++++++++------- .../client_backend/openai/http_client.h | 1 + .../client_backend/openai/openai_client.cc | 18 ++++++++++--- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/c++/perf_analyzer/client_backend/openai/http_client.cc b/src/c++/perf_analyzer/client_backend/openai/http_client.cc index ff636388b..08e4b4b3c 100644 --- a/src/c++/perf_analyzer/client_backend/openai/http_client.cc +++ b/src/c++/perf_analyzer/client_backend/openai/http_client.cc @@ -76,19 +76,25 @@ HttpRequest::GetNextInput(uint8_t* buf, size_t size, size_t* input_bytes) } } +std::mutex HttpClient::curl_init_mtx_{}; HttpClient::HttpClient( const std::string& server_url, bool verbose, const HttpSslOptions& ssl_options) : url_(server_url), verbose_(verbose), ssl_options_(ssl_options) { - auto* ver = curl_version_info(CURLVERSION_NOW); - if (ver->features & CURL_VERSION_THREADSAFE == 0) { - throw std::runtime_error( - "HTTP client has dependency on CURL library to have thread-safe " - "support (CURL_VERSION_THREADSAFE set)"); - } - if (curl_global_init(CURL_GLOBAL_ALL) != 0) { - throw std::runtime_error("CURL global initialization failed"); + // [TODO TMA-1670] uncomment below and remove class-wise mutex once confirm + // curl >= 7.84.0 will always be used + // auto* ver = curl_version_info(CURLVERSION_NOW); + // if (ver->features & CURL_VERSION_THREADSAFE == 0) { + // throw std::runtime_error( + // "HTTP client has dependency on CURL library to have thread-safe " + // "support (CURL_VERSION_THREADSAFE set)"); + // } + { + std::lock_guard lk(curl_init_mtx_); + if (curl_global_init(CURL_GLOBAL_ALL) != 0) { + throw std::runtime_error("CURL global initialization failed"); + } } multi_handle_ = curl_multi_init(); @@ -114,7 +120,10 @@ HttpClient::~HttpClient() } curl_multi_cleanup(multi_handle_); - curl_global_cleanup(); + { + std::lock_guard lk(curl_init_mtx_); + curl_global_cleanup(); + } } const std::string& diff --git a/src/c++/perf_analyzer/client_backend/openai/http_client.h b/src/c++/perf_analyzer/client_backend/openai/http_client.h index 3c311569e..6b78d836e 100644 --- a/src/c++/perf_analyzer/client_backend/openai/http_client.h +++ b/src/c++/perf_analyzer/client_backend/openai/http_client.h @@ -168,5 +168,6 @@ class HttpClient { private: const std::string& ParseSslKeyType(HttpSslOptions::KEYTYPE key_type); const std::string& ParseSslCertType(HttpSslOptions::CERTTYPE cert_type); + static std::mutex curl_init_mtx_; }; }}}} // namespace triton::perfanalyzer::clientbackend::openai diff --git a/src/c++/perf_analyzer/client_backend/openai/openai_client.cc b/src/c++/perf_analyzer/client_backend/openai/openai_client.cc index 362278436..28e55f3c0 100644 --- a/src/c++/perf_analyzer/client_backend/openai/openai_client.cc +++ b/src/c++/perf_analyzer/client_backend/openai/openai_client.cc @@ -114,8 +114,21 @@ size_t ChatCompletionClient::ResponseHandler( void* contents, size_t size, size_t nmemb, void* userp) { - // [WIP] verify if the SSE responses received are complete, or the response - // need to be stitched first + // [TODO TMA-1666] verify if the SSE responses received are complete, or the + // response need to be stitched first. To verify, print out the received + // responses from SendResponse() to make sure the OpenAI server doesn't chunk + // the HTTP responses in the way that misaligns with the SSE responses. Reason + // of not stitching responses now is that it is a bit complicated that to make + // the write callback bulletproof is to assume the response can be chunked at + // arbitrary position, then bake in checking for SSE style (data:.*\n\n) by + // iterating all received buffer character by character. + size_t result_bytes = size * nmemb; + // return early if the response is empty as the response handling is + // triggered by the content of the response. + if (result_bytes == 0) { + return result_bytes; + } + auto request = reinterpret_cast(userp); if (request->timer_.Timestamp( triton::client::RequestTimers::Kind::RECV_START) == 0) { @@ -124,7 +137,6 @@ ChatCompletionClient::ResponseHandler( } char* buf = reinterpret_cast(contents); - size_t result_bytes = size * nmemb; request->response_buffer_.append(buf, result_bytes); // Send response now if streaming, otherwise wait until request has been // completed