Skip to content

Commit

Permalink
feat(cli): add --follow flag to logs command (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh committed Oct 1, 2024
1 parent 955bda8 commit e322c31
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 25 deletions.
192 changes: 176 additions & 16 deletions cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ package cmd
import (
"encoding/json"
"fmt"
"io"
"reanahub/reana-client-go/client"
"reanahub/reana-client-go/client/operations"
"reanahub/reana-client-go/pkg/config"
"reanahub/reana-client-go/pkg/displayer"
"reanahub/reana-client-go/pkg/filterer"
"strings"
"time"

"github.com/jedib0t/go-pretty/v6/text"
log "github.com/sirupsen/logrus"

"github.com/spf13/cobra"
"golang.org/x/exp/slices"
Expand All @@ -27,15 +30,18 @@ import (
const logsDesc = `
Get workflow logs.
The ` + "``logs``" + ` command allows to retrieve logs of running workflow. Note that
only finished steps of the workflow are returned, the logs of the currently
processed step is not returned until it is finished.
The ` + "``logs``" + ` command allows to retrieve logs of a running workflow.
Either retrive logs and print the result or follow the logs of a running workflow/job.
Examples:
$ reana-client logs -w myanalysis.42
$ reana-client logs -w myanalysis.42 -s 1st_ste
$ reana-client logs -w myanalysis.42 --json
$ reana-client logs -w myanalysis.42 --filter status=running
$ reana-client logs -w myanalysis.42 --filter step=1st_step --follow
`

const logsFilterFlagDesc = `Filter job logs to include only those steps that
Expand Down Expand Up @@ -65,13 +71,22 @@ type jobLogItem struct {
FinishedAt *string `json:"finished_at"`
}

// logsOptions struct that contains the options of the logs command.
type logsOptions struct {
token string
workflow string
jsonOutput bool
filters []string
page int64
size int64
follow bool
interval int64
}

// logsCommandRunner struct that executes logs command.
type logsCommandRunner struct {
api *client.API
options *logsOptions
}

// newLogsCmd creates a command to get workflow logs.
Expand All @@ -84,7 +99,12 @@ func newLogsCmd() *cobra.Command {
Long: logsDesc,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return o.run(cmd)
api, err := client.ApiClient()
if err != nil {
log.Error(err)

Check warning on line 104 in cmd/logs.go

View check run for this annotation

Codecov / codecov/patch

cmd/logs.go#L104

Added line #L104 was not covered by tests
}
runner := newLogsCommandRunner(api, o)
return runner.run(cmd)
},
}

Expand All @@ -101,12 +121,33 @@ func newLogsCmd() *cobra.Command {
f.StringSliceVar(&o.filters, "filter", []string{}, logsFilterFlagDesc)
f.Int64Var(&o.page, "page", 1, "Results page number (to be used with --size).")
f.Int64Var(&o.size, "size", 0, "Size of results per page (to be used with --page).")
f.BoolVar(
&o.follow,
"follow",
false,
"Follow the logs of the of running workflow or job (similar to `tail -f`).",
)
f.Int64VarP(
&o.interval,
"interval",
"i",
10,
"Sleep time in seconds between log polling if log following is enabled. [default=10]",
)

return cmd
}

func (o *logsOptions) run(cmd *cobra.Command) error {
filters, err := parseLogsFilters(o.filters)
// newLogsCommandRunner creates a new logs command runner.
func newLogsCommandRunner(api *client.API, options *logsOptions) *logsCommandRunner {
return &logsCommandRunner{api: api, options: options}
}

// run executes the logs command.
func (r *logsCommandRunner) run(cmd *cobra.Command) error {
r.validateOptions(cmd.OutOrStdout())

filters, err := parseLogsFilters(r.options.filters)
if err != nil {
return err
}
Expand All @@ -116,25 +157,136 @@ func (o *logsOptions) run(cmd *cobra.Command) error {
}

logsParams := operations.NewGetWorkflowLogsParams()
logsParams.SetAccessToken(&o.token)
logsParams.SetWorkflowIDOrName(o.workflow)
logsParams.SetPage(&o.page)
logsParams.SetAccessToken(&r.options.token)
logsParams.SetWorkflowIDOrName(r.options.workflow)
logsParams.SetPage(&r.options.page)
logsParams.SetSteps(steps)
if cmd.Flags().Changed("size") {
logsParams.SetSize(&o.size)
logsParams.SetSize(&r.options.size)
}

if r.options.follow {
return r.followLogs(logsParams, cmd, steps)
}

return r.retrieveLogs(filters, logsParams, cmd, steps)
}

// followLogs follows the logs of a running workflow or job.
func (r *logsCommandRunner) followLogs(
logsParams *operations.GetWorkflowLogsParams,
cmd *cobra.Command,
steps []string,
) error {
stepLength := len(steps)
var step, previousLogs string
stdout := cmd.OutOrStdout()

if stepLength > 0 {
step = steps[0]
}

if stepLength > 1 {
displayer.DisplayMessage(
"Only one step can be followed at a time, ignoring additional steps.",
displayer.Warning,
false,
stdout,
)
logsParams.SetSteps([]string{step})
}

msg := "Following logs for workflow: " + r.options.workflow
if step != "" {
msg += ", step: " + step
}
displayer.DisplayMessage(msg, displayer.Info, false, stdout)

workflowStatusParams := operations.NewGetWorkflowStatusParams()
workflowStatusParams.SetAccessToken(&r.options.token)
workflowStatusParams.SetWorkflowIDOrName(r.options.workflow)

for {
newLogs, status, err := r.getLogsWithStatus(step, logsParams, workflowStatusParams)
if err != nil {
return err
}

fmt.Fprint(stdout, strings.TrimPrefix(newLogs, previousLogs))

if slices.Contains(config.WorkflowCompletedStatuses, status) {
fmt.Fprintln(stdout)
displayer.DisplayMessage("Finished, status: "+status, displayer.Info, false, stdout)
return nil
}

time.Sleep(time.Duration(r.options.interval) * time.Second)
previousLogs = newLogs

Check warning on line 224 in cmd/logs.go

View check run for this annotation

Codecov / codecov/patch

cmd/logs.go#L223-L224

Added lines #L223 - L224 were not covered by tests
}
}

api, err := client.ApiClient()
// getData retrieves logs and status of a workflow or a job.
func (r *logsCommandRunner) getLogsWithStatus(
step string,
logsParams *operations.GetWorkflowLogsParams,
workflowStatusParams *operations.GetWorkflowStatusParams,
) (string, string, error) {
workflowLogs, err := r.getLogs(logsParams)
if err != nil {
return err
return "", "", err

Check warning on line 236 in cmd/logs.go

View check run for this annotation

Codecov / codecov/patch

cmd/logs.go#L236

Added line #L236 was not covered by tests
}
logsResp, err := api.Operations.GetWorkflowLogs(logsParams)

if step != "" {
job := getFirstJob(workflowLogs.JobLogs)
if job == nil {
return "", "", fmt.Errorf("step %s not found", step)
}
return job.Logs, job.Status, nil
}

statusResponse, err := r.api.Operations.GetWorkflowStatus(workflowStatusParams)
if err != nil {
return err
return "", "", err

Check warning on line 249 in cmd/logs.go

View check run for this annotation

Codecov / codecov/patch

cmd/logs.go#L249

Added line #L249 was not covered by tests
}

return *workflowLogs.WorkflowLogs, statusResponse.GetPayload().Status, nil
}

// getLogs retrieves logs of a workflow and unmarshals data into logs structure.
func (r *logsCommandRunner) getLogs(logsParams *operations.GetWorkflowLogsParams) (logs, error) {
var workflowLogs logs
logsResp, err := r.api.Operations.GetWorkflowLogs(logsParams)
if err != nil {
return workflowLogs, err
}

err = json.Unmarshal([]byte(logsResp.GetPayload().Logs), &workflowLogs)
if err != nil {
return workflowLogs, err

Check warning on line 265 in cmd/logs.go

View check run for this annotation

Codecov / codecov/patch

cmd/logs.go#L265

Added line #L265 was not covered by tests
}
return workflowLogs, nil
}

// validateOptions validates the options of the logs command.
func (r *logsCommandRunner) validateOptions(writer io.Writer) {
if r.options.jsonOutput && r.options.follow {
displayer.DisplayMessage(
"Ignoring --json as it cannot be used together with --follow.",
displayer.Warning,
false,
writer,
)
}
}

// retrieveLogs retrieves and prints logs of a workflow.
func (r *logsCommandRunner) retrieveLogs(
filters filterer.Filters,
logsParams *operations.GetWorkflowLogsParams,
cmd *cobra.Command,
steps []string,
) error {
workflowLogs, err := r.getLogs(logsParams)
if err != nil {
return err
}
Expand All @@ -144,15 +296,23 @@ func (o *logsOptions) run(cmd *cobra.Command) error {
return err
}

if o.jsonOutput {
if r.options.jsonOutput {
err := displayer.DisplayJsonOutput(workflowLogs, cmd.OutOrStdout())
if err != nil {
return err
}
} else {
displayHumanFriendlyLogs(cmd, workflowLogs, steps)
}
return nil
}

// getFirstJob returns the first job in the given map,
// or nil if the map is empty.
func getFirstJob(items map[string]jobLogItem) *jobLogItem {
for _, item := range items {
return &item
}
return nil
}

Expand Down
62 changes: 62 additions & 0 deletions cmd/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,68 @@ func TestLogs(t *testing.T) {
expected: []string{"Field 'page': Must be at least 1."},
wantError: true,
},
"follow workflow": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_complete.json",
},
fmt.Sprintf(statusPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "status_finished.json",
},
},
args: []string{"-w", workflowName, "--follow", "-i", "0"},
expected: []string{
"==> Following logs for workflow: my_workflow",
"workflow logs",
"==> Finished, status: finished",
},
unwanted: []string{
"job1",
"step",
},
},
"follow job multiple steps json": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_complete.json",
},
},
args: []string{
"-w",
workflowName,
"--follow",
"--json",
"--filter",
"step=job1",
"--filter",
"step=job2",
"--size",
"1",
},
expected: []string{
"Ignoring --json as it cannot be used together with --follow.",
"Only one step can be followed at a time, ignoring additional steps.",
"==> Following logs for workflow: my_workflow, step: job1",
"workflow 1 logs",
"==> Finished, status: finished",
},
},
"follow job no logs": {
serverResponses: map[string]ServerResponse{
fmt.Sprintf(logsPathTemplate, workflowName): {
statusCode: http.StatusOK,
responseFile: "logs_empty.json",
},
},
args: []string{"-w", workflowName, "--follow", "--filter", "step=job1"},
expected: []string{
"step job1 not found",
},
wantError: true,
},
}

for name, params := range tests {
Expand Down
15 changes: 6 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,15 @@ var ReanaComputeBackendKeys = []string{"kubernetes", "htcondor", "slurm"}
// LeadingMark prefix used when displaying headers or important messages.
var LeadingMark = "==>"

var WorkflowCompletedStatuses = []string{"finished", "failed", "stopped"}

var WorkflowProgressingStatuses = []string{"created", "running", "queued", "pending"}

// GetRunStatuses provides a list of currently supported run statuses.
// Includes the deleted status if includeDeleted is set to true.
func GetRunStatuses(includeDeleted bool) []string {
runStatuses := []string{
"created",
"running",
"finished",
"failed",
"stopped",
"queued",
"pending",
}
runStatuses := append(WorkflowCompletedStatuses, WorkflowProgressingStatuses...)

if includeDeleted {
runStatuses = append(runStatuses, "deleted")
}
Expand Down

0 comments on commit e322c31

Please sign in to comment.