Skip to content

Commit

Permalink
[core] Handle gRPC code DeadlineExceeded in DCS client
Browse files: browse the repository at this point in the history
  • Loading branch information
teo authored and knopers8 committed Dec 5, 2024
1 parent 9b967cc commit 54994ff
Showing 1 changed file with 166 additions and 47 deletions.
213 changes: 166 additions & 47 deletions core/integration/dcs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ import (
"github.com/spf13/viper"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -782,22 +784,59 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

break
}
if err != nil { // stream termination in case of general error
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
Warn(logMsg)
if err != nil { // stream termination in case of unknown or gRPC error
got := status.Code(err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
if got == codes.DeadlineExceeded {
log.WithError(err).
WithField("partition", envId).
WithField("timeout", timeout.String()).
Debug("DCS PFR timed out")
err = fmt.Errorf("DCS PFR timed out after %s: %w", timeout.String(), err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
Warn(logMsg)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
} else { // some other gRPC error code
log.WithError(err).
WithField("partition", envId).
Error("DCS PFR call error")
err = fmt.Errorf("DCS PFR call error: %w", err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
}

break
}
Expand Down Expand Up @@ -1452,23 +1491,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

break
}
if err != nil { // stream termination in case of general error
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)
if err != nil { // stream termination in case of unknown or gRPC error
got := status.Code(err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
if got == codes.DeadlineExceeded {
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
WithField("timeout", timeout.String()).
Debug("DCS SOR timed out")
err = fmt.Errorf("DCS SOR timed out after %s: %w", timeout.String(), err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})

} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
} else { // some other gRPC error code
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Debug("DCS SOR call error")
err = fmt.Errorf("DCS SOR call error: %w", err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
}

break
}
Expand Down Expand Up @@ -2001,23 +2080,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

break
}
if err != nil { // stream termination in case of general error
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)
if err != nil { // stream termination in case of unknown or gRPC error
got := status.Code(err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
if got == codes.DeadlineExceeded {
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
WithField("timeout", timeout.String()).
Debug("DCS EOR timed out")
err = fmt.Errorf("DCS EOR timed out after %s: %w", timeout.String(), err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})

} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
} else { // some other gRPC error code
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Debug("DCS EOR call error")
err = fmt.Errorf("DCS EOR call error: %w", err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
}

break
}
Expand Down

0 comments on commit 54994ff

Please sign in to comment.