From 54994ff0d3e45d7f36925261e7944a497c33f485 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Thu, 5 Dec 2024 12:18:12 +0100 Subject: [PATCH] [core] Handle gRPC code DeadlineExceeded in DCS client --- core/integration/dcs/plugin.go | 213 +++++++++++++++++++++++++-------- 1 file changed, 166 insertions(+), 47 deletions(-) diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index 5a9898d3..ee951dca 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -53,7 +53,9 @@ import ( "github.com/spf13/viper" "golang.org/x/exp/maps" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/status" ) const ( @@ -782,22 +784,59 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { break } - if err != nil { // stream termination in case of general error - logMsg := "bad DCS PFR event received, any future DCS events are ignored" - log.WithError(err). - WithField("partition", envId). - Warn(logMsg) + if err != nil { // stream termination in case of unknown or gRPC error + got := status.Code(err) - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: PrepareForRun", - OperationStepStatus: pb.OpStatus_DONE_ERROR, - EnvironmentId: envId, - Payload: string(payloadJson[:]), - Error: logMsg, - }) + if got == codes.DeadlineExceeded { + log.WithError(err). + WithField("partition", envId). + WithField("timeout", timeout.String()). + Debug("DCS PFR timed out") + err = fmt.Errorf("DCS PFR timed out after %s: %w", timeout.String(), err) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: PrepareForRun", + OperationStepStatus: pb.OpStatus_DONE_TIMEOUT, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + } else if got == codes.Unknown { // unknown error, likely not a gRPC code + logMsg := "bad DCS PFR event received, any future DCS events are ignored" + log.WithError(err). + WithField("partition", envId). + Warn(logMsg) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: PrepareForRun", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: logMsg, + }) + } else { // some other gRPC error code + log.WithError(err). + WithField("partition", envId). + Error("DCS PFR call error") + err = fmt.Errorf("DCS PFR call error: %w", err) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: PrepareForRun", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + } break } @@ -1452,23 +1491,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { break } - if err != nil { // stream termination in case of general error - logMsg := "bad DCS SOR event received, any future DCS events are ignored" - log.WithError(err). - WithField("partition", envId). - WithField("run", runNumber64). - Warn(logMsg) + if err != nil { // stream termination in case of unknown or gRPC error + got := status.Code(err) - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: StartOfRun", - OperationStepStatus: pb.OpStatus_DONE_ERROR, - EnvironmentId: envId, - Payload: string(payloadJson[:]), - Error: logMsg, - }) + if got == codes.DeadlineExceeded { + log.WithError(err). + WithField("partition", envId). + WithField("run", runNumber64). + WithField("timeout", timeout.String()). + Debug("DCS SOR timed out") + err = fmt.Errorf("DCS SOR timed out after %s: %w", timeout.String(), err) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: StartOfRun", + OperationStepStatus: pb.OpStatus_DONE_TIMEOUT, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + + } else if got == codes.Unknown { // unknown error, likely not a gRPC code + logMsg := "bad DCS SOR event received, any future DCS events are ignored" + log.WithError(err). + WithField("partition", envId). + WithField("run", runNumber64). + Warn(logMsg) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: StartOfRun", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: logMsg, + }) + } else { // some other gRPC error code + log.WithError(err). + WithField("partition", envId). + WithField("run", runNumber64). + Debug("DCS SOR call error") + err = fmt.Errorf("DCS SOR call error: %w", err) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: StartOfRun", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + } break } @@ -2001,23 +2080,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { break } - if err != nil { // stream termination in case of general error - logMsg := "bad DCS EOR event received, any future DCS events are ignored" - log.WithError(err). - WithField("partition", envId). - WithField("run", runNumber64). - Warn(logMsg) + if err != nil { // stream termination in case of unknown or gRPC error + got := status.Code(err) - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: EndOfRun", - OperationStepStatus: pb.OpStatus_DONE_ERROR, - EnvironmentId: envId, - Payload: string(payloadJson[:]), - Error: logMsg, - }) + if got == codes.DeadlineExceeded { + log.WithError(err). + WithField("partition", envId). + WithField("run", runNumber64). + WithField("timeout", timeout.String()). + Debug("DCS EOR timed out") + err = fmt.Errorf("DCS EOR timed out after %s: %w", timeout.String(), err) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: EndOfRun", + OperationStepStatus: pb.OpStatus_DONE_TIMEOUT, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + + } else if got == codes.Unknown { // unknown error, likely not a gRPC code + logMsg := "bad DCS EOR event received, any future DCS events are ignored" + log.WithError(err). + WithField("partition", envId). + WithField("run", runNumber64). + Warn(logMsg) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: EndOfRun", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: logMsg, + }) + } else { // some other gRPC error code + log.WithError(err). + WithField("partition", envId). + WithField("run", runNumber64). + Debug("DCS EOR call error") + err = fmt.Errorf("DCS EOR call error: %w", err) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: EndOfRun", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + } break }