@@ -846,100 +846,6 @@ std::string awsCanonicalURI(const std::string& resource, std::vector<std::string
846
846
return canonicalURI;
847
847
}
848
848
849
- // ref: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
850
- std::string parseErrorCodeFromS3 (std::string xmlResponse) {
851
- // Copy XML string to a modifiable buffer
852
- try {
853
- std::vector<char > xmlBuffer (xmlResponse.begin (), xmlResponse.end ());
854
- xmlBuffer.push_back (' \0 ' ); // Ensure null-terminated string
855
- // Parse the XML
856
- xml_document<> doc;
857
- doc.parse <0 >(&xmlBuffer[0 ]);
858
- // Find the root node
859
- xml_node<>* root = doc.first_node (" Error" );
860
- if (!root) {
861
- TraceEvent (SevWarn, " ParseS3XMLResponseNoError" ).detail (" Response" , xmlResponse).log ();
862
- return " " ;
863
- }
864
- // Find the <Code> node
865
- xml_node<>* codeNode = root->first_node (" Code" );
866
- if (!codeNode) {
867
- TraceEvent (SevWarn, " ParseS3XMLResponseNoErrorCode" ).detail (" Response" , xmlResponse).log ();
868
- return " " ;
869
- }
870
- return std::string (codeNode->value ());
871
- } catch (Error e) {
872
- TraceEvent (" BackupParseS3ErrorCodeFailure" ).errorUnsuppressed (e);
873
- throw backup_parse_s3_response_failure ();
874
- } catch (...) {
875
- throw backup_parse_s3_response_failure ();
876
- }
877
- }
878
-
879
// Returns true when s3Error is one of the S3 error codes this file treats as a security-token problem
// ("InvalidToken" or "ExpiredToken"). The comparison is exact and case-sensitive.
bool isS3TokenError(const std::string& s3Error) {
	return s3Error == "InvalidToken" || s3Error == "ExpiredToken";
}
882
-
883
- void setHeaders (Reference<S3BlobStoreEndpoint> bstore, Reference<HTTP::OutgoingRequest> req) {
884
- // Finish/update the request headers (which includes Date header)
885
- // This must be done AFTER the connection is ready because if credentials are coming from disk they are
886
- // refreshed when a new connection is established and setAuthHeaders() would need the updated secret.
887
- if (bstore->credentials .present () && !bstore->credentials .get ().securityToken .empty ())
888
- req->data .headers [" x-amz-security-token" ] = bstore->credentials .get ().securityToken ;
889
- if (CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER ) {
890
- bstore->setV4AuthHeaders (req->verb , req->resource , req->data .headers );
891
- } else {
892
- bstore->setAuthHeaders (req->verb , req->resource , req->data .headers );
893
- }
894
- }
895
-
896
- std::string getCanonicalURI (Reference<S3BlobStoreEndpoint> bstore, Reference<HTTP::OutgoingRequest> req) {
897
- std::vector<std::string> queryParameters;
898
- std::string canonicalURI =
899
- awsCanonicalURI (req->resource , queryParameters, CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER );
900
- if (!queryParameters.empty ()) {
901
- canonicalURI += " ?" ;
902
- canonicalURI += boost::algorithm::join (queryParameters, " &" );
903
- }
904
-
905
- if (bstore->useProxy && bstore->knobs .secure_connection == 0 ) {
906
- // Has to be in absolute-form.
907
- canonicalURI = " http://" + bstore->host + " :" + bstore->service + canonicalURI;
908
- }
909
- return canonicalURI;
910
- }
911
-
912
- void populateDryrunRequest (Reference<HTTP::OutgoingRequest> dryrunRequest,
913
- Reference<S3BlobStoreEndpoint> bstore,
914
- std::string bucket) {
915
- // dryrun with a check bucket exist request, to avoid sending duplicate data
916
- HTTP::Headers headers;
917
- dryrunRequest->verb = " GET" ;
918
- dryrunRequest->data .contentLen = 0 ;
919
- dryrunRequest->data .headers = headers;
920
- dryrunRequest->data .headers [" Host" ] = bstore->host ;
921
- dryrunRequest->data .headers [" Accept" ] = " application/xml" ;
922
-
923
- dryrunRequest->resource = constructResourcePath (bstore, bucket, " " );
924
- }
925
-
926
// Returns true when verb is exactly "POST" or "PUT" (case-sensitive); every other verb is not a write.
bool isWriteRequest(std::string verb) {
	return verb == "POST" || verb == "PUT";
}
929
-
930
// Extracts the bucket name, i.e. the first path segment, from a resource URI of the form
// "/bucket", "/bucket/key..." or "/bucket?query".
//
// uri: resource path; it must start with '/' to contain a bucket.
// Returns the text between the leading '/' and the next '/' or '?' (or the end of the string), or an
// empty string when uri is empty, is just "/", or does not start with '/'.
std::string parseBucketFromURI(std::string uri) {
	if (uri.size() <= 1 || uri[0] != '/') {
		// there is no bucket in the uri
		return "";
	}

	// The bucket ends at the next path separator or at the start of the query string, whichever comes
	// first; searching from index 1 skips the leading '/'.
	const size_t bucketEnd = uri.find_first_of("/?", 1);
	if (bucketEnd == std::string::npos) {
		return uri.substr(1);
	}
	return uri.substr(1, bucketEnd - 1);
}
942
-
943
849
// Do a request, get a Response.
944
850
// Request content is provided as UnsentPacketQueue *pContent which will be depleted as bytes are sent but the queue
945
851
// itself must live for the life of this actor and be destroyed by the caller
@@ -951,9 +857,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
951
857
int contentLen,
952
858
std::set<unsigned int > successCodes) {
953
859
state UnsentPacketQueue contentCopy;
954
- state UnsentPacketQueue dryrunContentCopy; // NonCopyable state var so must be declared at top of actor
955
860
state Reference<HTTP::OutgoingRequest> req = makeReference<HTTP::OutgoingRequest>();
956
- state Reference<HTTP::OutgoingRequest> dryrunRequest = makeReference<HTTP::OutgoingRequest>();
957
861
req->verb = verb;
958
862
req->data .content = &contentCopy;
959
863
req->data .contentLen = contentLen;
@@ -967,8 +871,6 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
967
871
resource = " /" ;
968
872
}
969
873
970
- req->resource = resource;
971
-
972
874
// Merge extraHeaders into headers
973
875
for (const auto & [k, v] : bstore->extraHeaders ) {
974
876
std::string& fieldValue = req->data .headers [k];
@@ -989,23 +891,19 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
989
891
990
892
state int maxTries = std::min (bstore->knobs .request_tries , bstore->knobs .connect_tries );
991
893
state int thisTry = 1 ;
992
- state int badRequestCode = 400 ;
993
- state bool s3TokenError = false ;
994
894
state double nextRetryDelay = 2.0 ;
995
895
996
896
loop {
997
897
state Optional<Error> err;
998
898
state Optional<NetworkAddress> remoteAddress;
999
899
state bool connectionEstablished = false ;
1000
900
state Reference<HTTP::IncomingResponse> r;
1001
- state Reference<HTTP::IncomingResponse> dryrunR;
1002
901
state std::string canonicalURI = resource;
1003
902
state UID connID = UID ();
1004
903
state double reqStartTimer;
1005
904
state double connectStartTimer = g_network->timer ();
1006
905
state bool reusingConn = false ;
1007
906
state bool fastRetry = false ;
1008
- state bool simulateS3TokenError = false ;
1009
907
1010
908
try {
1011
909
// Start connecting
@@ -1033,86 +931,44 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
1033
931
connID = rconn.conn ->getDebugID ();
1034
932
reqStartTimer = g_network->timer ();
1035
933
1036
- try {
1037
- if (s3TokenError && isWriteRequest (req->verb ) && CLIENT_KNOBS->BACKUP_ALLOW_DRYRUN ) {
1038
- // if it is a write request with s3TokenError, retry with a GET dryrun request
1039
- // to avoid sending duplicate data indefinitely to save network bandwidth
1040
- // because the failure might be due to an expired or invalid S3 token from the disk
1041
- state std::string bucket = parseBucketFromURI (resource);
1042
- if (bucket.empty ()) {
1043
- TraceEvent (SevError, " EmptyBucketRequest" )
1044
- .detail (" S3TokenError" , s3TokenError)
1045
- .detail (" Verb" , req->verb )
1046
- .detail (" Resource" , resource)
1047
- .log ();
1048
- throw bucket_not_in_url ();
1049
- }
1050
- dryrunRequest->data .content = &dryrunContentCopy;
1051
- dryrunRequest->data .content ->discardAll (); // this should always be empty
1052
- populateDryrunRequest (dryrunRequest, bstore, bucket);
1053
- setHeaders (bstore, dryrunRequest);
1054
- dryrunRequest->resource = getCanonicalURI (bstore, dryrunRequest);
1055
- TraceEvent (" RetryS3RequestDueToTokenIssue" )
1056
- .detail (" S3TokenError" , s3TokenError)
1057
- .detail (" OriginalResource" , resource)
1058
- .detail (" DryrunResource" , dryrunRequest->resource )
1059
- .detail (" Bucket" , bucket)
1060
- .detail (" V4" , CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER )
1061
- .log ();
1062
- wait (bstore->requestRate ->getAllowance (1 ));
1063
- Future<Reference<HTTP::IncomingResponse>> dryrunResponse = HTTP::doRequest (
1064
- rconn.conn , dryrunRequest, bstore->sendRate , &bstore->s_stats .bytes_sent , bstore->recvRate );
1065
- Reference<HTTP::IncomingResponse> _dryrunR = wait (timeoutError (dryrunResponse, requestTimeout));
1066
- dryrunR = _dryrunR;
1067
- std::string s3Error = parseErrorCodeFromS3 (dryrunR->data .content );
1068
- if (dryrunR->code == badRequestCode && isS3TokenError (s3Error)) {
1069
- // authentication fails and s3 token error persists, retry in the hope token is corrected
1070
- wait (delay (bstore->knobs .max_delay_retryable_error ));
1071
- } else if (dryrunR->code == 200 || dryrunR->code == 404 ) {
1072
- // authentication has passed, and bucket existence has been verified(200 or 404)
1073
- // it might work now(or it might be another error) thus reset s3TokenError.
1074
- TraceEvent (" S3TokenIssueResolved" )
1075
- .detail (" HttpCode" , dryrunR->code )
1076
- .detail (" HttpResponseContent" , dryrunR->data .content )
1077
- .detail (" S3Error" , s3Error)
1078
- .detail (" URI" , dryrunRequest->resource )
1079
- .log ();
1080
- s3TokenError = false ;
1081
- } else {
1082
- TraceEvent (SevError, " S3UnexpectedError" )
1083
- .detail (" HttpCode" , dryrunR->code )
1084
- .detail (" HttpResponseContent" , dryrunR->data .content )
1085
- .detail (" S3Error" , s3Error)
1086
- .detail (" URI" , dryrunRequest->resource )
1087
- .log ();
1088
- throw http_bad_response ();
1089
- }
1090
- continue ;
1091
- }
1092
- } catch (Error& e) {
1093
- // retry with GET failed, but continue to do original request anyway
1094
- TraceEvent (SevError, " ErrorDuringRetryS3TokenIssue" ).errorUnsuppressed (e);
934
+ // Finish/update the request headers (which includes Date header)
935
+ // This must be done AFTER the connection is ready because if credentials are coming from disk they are
936
+ // refreshed when a new connection is established and setAuthHeaders() would need the updated secret.
937
+ if (bstore->credentials .present () && !bstore->credentials .get ().securityToken .empty ())
938
+ req->data .headers [" x-amz-security-token" ] = bstore->credentials .get ().securityToken ;
939
+ if (CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER ) {
940
+ bstore->setV4AuthHeaders (verb, resource, req->data .headers );
941
+ } else {
942
+ bstore->setAuthHeaders (verb, resource, req->data .headers );
1095
943
}
1096
- setHeaders (bstore, req);
1097
- req->resource = getCanonicalURI (bstore, req);
944
+
945
+ std::vector<std::string> queryParameters;
946
+ canonicalURI = awsCanonicalURI (resource, queryParameters, CLIENT_KNOBS->HTTP_REQUEST_AWS_V4_HEADER );
947
+ if (!queryParameters.empty ()) {
948
+ canonicalURI += " ?" ;
949
+ canonicalURI += boost::algorithm::join (queryParameters, " &" );
950
+ }
951
+
952
+ if (bstore->useProxy && bstore->knobs .secure_connection == 0 ) {
953
+ // Has to be in absolute-form.
954
+ canonicalURI = " http://" + bstore->host + " :" + bstore->service + canonicalURI;
955
+ }
956
+
957
+ req->resource = canonicalURI;
1098
958
1099
959
remoteAddress = rconn.conn ->getPeerAddress ();
1100
960
wait (bstore->requestRate ->getAllowance (1 ));
1101
961
1102
962
Future<Reference<HTTP::IncomingResponse>> reqF =
1103
963
HTTP::doRequest (rconn.conn , req, bstore->sendRate , &bstore->s_stats .bytes_sent , bstore->recvRate );
964
+
1104
965
// if we reused a connection from the pool, and immediately got an error, retry immediately discarding the
1105
966
// connection
1106
967
if (reqF.isReady () && reusingConn) {
1107
968
fastRetry = true ;
1108
969
}
1109
970
1110
971
Reference<HTTP::IncomingResponse> _r = wait (timeoutError (reqF, requestTimeout));
1111
- if (g_network->isSimulated () && deterministicRandom ()->random01 () < 0.1 ) {
1112
- // simulate an error from s3
1113
- _r->code = badRequestCode;
1114
- simulateS3TokenError = true ;
1115
- }
1116
972
r = _r;
1117
973
1118
974
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we
@@ -1151,6 +1007,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
1151
1007
1152
1008
// All errors in err are potentially retryable as well as certain HTTP response codes...
1153
1009
bool retryable = err.present () || r->code == 500 || r->code == 502 || r->code == 503 || r->code == 429 ;
1010
+
1154
1011
// But only if our previous attempt was not the last allowable try.
1155
1012
retryable = retryable && (thisTry < maxTries);
1156
1013
@@ -1172,21 +1029,8 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
1172
1029
}
1173
1030
}
1174
1031
event.suppressFor (60 );
1175
-
1176
1032
if (!err.present ()) {
1177
1033
event.detail (" ResponseCode" , r->code );
1178
- std::string s3Error = parseErrorCodeFromS3 (r->data .content );
1179
- event.detail (" S3ErrorCode" , s3Error);
1180
- if (r->code == badRequestCode) {
1181
- if (isS3TokenError (s3Error) || simulateS3TokenError) {
1182
- s3TokenError = true ;
1183
- }
1184
- TraceEvent (SevWarnAlways, " S3BlobStoreBadRequest" )
1185
- .detail (" HttpCode" , r->code )
1186
- .detail (" HttpResponseContent" , r->data .content )
1187
- .detail (" S3Error" , s3Error)
1188
- .log ();
1189
- }
1190
1034
}
1191
1035
1192
1036
event.detail (" ConnectionEstablished" , connectionEstablished);
@@ -1216,7 +1060,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
1216
1060
if (fastRetry) {
1217
1061
++bstore->blobStats ->fastRetries ;
1218
1062
wait (delay (0 ));
1219
- } else if (retryable || s3TokenError ) {
1063
+ } else if (retryable) {
1220
1064
// We will wait delay seconds before the next retry, start with nextRetryDelay.
1221
1065
double delay = nextRetryDelay;
1222
1066
// connectionFailed is treated specially as we know proxy to AWS can only serve 1 request per connection
@@ -1239,6 +1083,7 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
1239
1083
delay = std::max (delay, retryAfter);
1240
1084
}
1241
1085
}
1086
+
1242
1087
// Log the delay then wait.
1243
1088
1244
1089
event.detail (" RetryDelay" , delay);
0 commit comments