From 5771cc3bad0c24235018d9dbaf48082b31b3da8a Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 30 May 2025 11:10:43 +0100 Subject: [PATCH 01/14] :sparkles: `logs` Add support for reading line by line in FIFO reader --- changes/20250530105438.feature | 1 + utils/logs/fifo_logger.go | 68 ++++++++++++++++++++++++++++++++++ utils/logs/fifo_logger_test.go | 60 +++++++++++++++++++++++++++++- 3 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 changes/20250530105438.feature diff --git a/changes/20250530105438.feature b/changes/20250530105438.feature new file mode 100644 index 0000000000..299cd9d998 --- /dev/null +++ b/changes/20250530105438.feature @@ -0,0 +1 @@ +:sparkles: `logs` Add support for reading line by line in FIFO reader diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index ba8bbf16a9..cfc52333c0 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -2,10 +2,13 @@ package logs import ( "bytes" + "context" "fmt" "io" "log" "sync" + + "github.com/ARM-software/golang-utils/utils/parallelisation" ) type FIFOWriter struct { @@ -39,6 +42,67 @@ func (w *FIFOWriter) Read() string { return string(bytes) } +func (w *FIFOWriter) ReadLines(ctx context.Context, delim byte) func(yield func(string) bool) { + return func(yield func(string) bool) { + var partial []byte + for { + if err := parallelisation.DetermineContextError(ctx); err != nil { + return + } + + buf := func() []byte { + w.mu.Lock() + defer w.mu.Unlock() + defer w.Logs.Reset() + tmp := w.Logs.Bytes() + buf := make([]byte, len(tmp)) + copy(buf, tmp) + return buf + }() + + if len(buf) == 0 { + if err := parallelisation.DetermineContextError(ctx); err != nil { + if len(partial) > 0 { + yield(string(partial)) + } + return + } + + continue + } + + if len(partial) > 0 { + buf = append(partial, buf...) 
+ partial = nil + } + + for { + idx := bytes.IndexByte(buf, delim) + if idx < 0 { + break + } + line := buf[:idx] + + if len(line) > 0 && line[len(line)-1] == '\r' { + line = line[:len(line)-1] + } + buf = buf[idx+1:] + if len(line) == 0 { + continue + } + + if !yield(string(line)) { + return + } + } + + if len(buf) > 0 { + partial = buf + } + } + } +} + type FIFOLoggers struct { GenericLoggers LogWriter FIFOWriter @@ -52,6 +116,10 @@ func (l *FIFOLoggers) Read() string { return l.LogWriter.Read() } +func (l *FIFOLoggers) ReadLines(ctx context.Context, delim byte) func(yield func(string) bool) { + return l.LogWriter.ReadLines(ctx, delim) +} + // Close closes the logger func (l *FIFOLoggers) Close() (err error) { err = l.LogWriter.Close() diff --git a/utils/logs/fifo_logger_test.go b/utils/logs/fifo_logger_test.go index ce876e41ca..41f5a8abe9 100644 --- a/utils/logs/fifo_logger_test.go +++ b/utils/logs/fifo_logger_test.go @@ -1,13 +1,17 @@ package logs import ( + "context" + "regexp" "strings" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestFIFOLogger(t *testing.T) { +func TestFIFOLoggerRead(t *testing.T) { loggers, err := NewFIFOLogger("Test") require.NoError(t, err) testLog(t, loggers) @@ -27,7 +31,7 @@ func TestFIFOLogger(t *testing.T) { require.Empty(t, contents) } -func TestPlainFIFOLogger(t *testing.T) { +func TestPlainFIFOLoggerRead(t *testing.T) { loggers, err := NewPlainFIFOLogger() require.NoError(t, err) testLog(t, loggers) @@ -46,3 +50,55 @@ func TestPlainFIFOLogger(t *testing.T) { contents = loggers.Read() require.Empty(t, contents) } + +func TestFIFOLoggerReadlines(t *testing.T) { + loggers, err := NewFIFOLogger("Test") + require.NoError(t, err) + testLog(t, loggers) + loggers.LogError("Test err\n") + loggers.Log("Test1") + count := 0 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var b strings.Builder + for line := range loggers.ReadLines(ctx, '\n') { + _, err := b.WriteString(line + "\n") + require.NoError(t, err) + count++ + } + + assert.Regexp(t, regexp.MustCompile(`\[Test\] Error: .* .* Test err\n\[Test\] Output: .* .* Test1\n`), b.String()) + assert.Equal(t, 2, count) +} + +func TestPlainFIFOLoggerReadlines(t *testing.T) { + loggers, err := NewPlainFIFOLogger() + require.NoError(t, err) + testLog(t, loggers) + + go func() { + time.Sleep(500 * time.Millisecond) + loggers.LogError("Test err") + loggers.Log("") + time.Sleep(100 * time.Millisecond) + loggers.Log("Test1") + loggers.Log("\n\n\n") + time.Sleep(200 * time.Millisecond) + loggers.Log("Test2") + }() + + count := 0 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var b strings.Builder + for line := range loggers.ReadLines(ctx, '\n') { + _, err := b.WriteString(line + "\n") + require.NoError(t, err) + count++ + } + + assert.Equal(t, "Test err\nTest1\nTest2\n", b.String()) + assert.Equal(t, 3, count) +} From 8567d021a51251e8b75b04b65cbefdf751f38ab4 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 30 May 2025 11:14:01 +0100 Subject: [PATCH 02/14] remove delimiter option since it is for reading lines --- utils/logs/fifo_logger.go | 8 ++++---- utils/logs/fifo_logger_test.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index cfc52333c0..fdfccd0ff8 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -42,7 +42,7 @@ func (w *FIFOWriter) Read() string { return 
string(bytes) } -func (w *FIFOWriter) ReadLines(ctx context.Context, delim byte) func(yield func(string) bool) { +func (w *FIFOWriter) ReadLines(ctx context.Context) func(yield func(string) bool) { return func(yield func(string) bool) { var partial []byte for { @@ -77,7 +77,7 @@ func (w *FIFOWriter) ReadLines(ctx context.Context, delim byte) func(yield func( } for { - idx := bytes.IndexByte(buf, delim) + idx := bytes.IndexByte(buf, '\n') if idx < 0 { break } @@ -116,8 +116,8 @@ func (l *FIFOLoggers) Read() string { return l.LogWriter.Read() } -func (l *FIFOLoggers) ReadLines(ctx context.Context, delim byte) func(yield func(string) bool) { - return l.LogWriter.ReadLines(ctx, delim) +func (l *FIFOLoggers) ReadLines(ctx context.Context) func(yield func(string) bool) { + return l.LogWriter.ReadLines(ctx) } // Close closes the logger diff --git a/utils/logs/fifo_logger_test.go b/utils/logs/fifo_logger_test.go index 41f5a8abe9..8aa757f98c 100644 --- a/utils/logs/fifo_logger_test.go +++ b/utils/logs/fifo_logger_test.go @@ -62,7 +62,7 @@ func TestFIFOLoggerReadlines(t *testing.T) { defer cancel() var b strings.Builder - for line := range loggers.ReadLines(ctx, '\n') { + for line := range loggers.ReadLines(ctx) { _, err := b.WriteString(line + "\n") require.NoError(t, err) count++ @@ -93,7 +93,7 @@ func TestPlainFIFOLoggerReadlines(t *testing.T) { defer cancel() var b strings.Builder - for line := range loggers.ReadLines(ctx, '\n') { + for line := range loggers.ReadLines(ctx) { _, err := b.WriteString(line + "\n") require.NoError(t, err) count++ From 32848dfda5b2255a59070747a3ca08a02044fa5a Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 30 May 2025 11:18:56 +0100 Subject: [PATCH 03/14] use iter.Seq[string] for iterator return type --- utils/logs/fifo_logger.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index fdfccd0ff8..ab033ab7bc 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "iter" "log" "sync" @@ -42,7 +43,7 @@ func (w *FIFOWriter) Read() string { return string(bytes) } -func (w *FIFOWriter) ReadLines(ctx context.Context) func(yield func(string) bool) { +func (w *FIFOWriter) ReadLines(ctx context.Context) iter.Seq[string] { return func(yield func(string) bool) { var partial []byte for { @@ -116,7 +117,7 @@ func (l *FIFOLoggers) Read() string { return l.LogWriter.Read() } -func (l *FIFOLoggers) ReadLines(ctx context.Context) func(yield func(string) bool) { +func (l *FIFOLoggers) ReadLines(ctx context.Context) iter.Seq[string] { return l.LogWriter.ReadLines(ctx) } From a0889233e2354cc535cfc849a3932f35ecf322db Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 30 May 2025 11:26:13 +0100 Subject: [PATCH 04/14] add sleep before continue --- utils/logs/fifo_logger.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index ab033ab7bc..90ff186ae2 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -8,6 +8,7 @@ import ( "iter" "log" "sync" + "time" "github.com/ARM-software/golang-utils/utils/parallelisation" ) @@ -69,6 +70,7 @@ func (w *FIFOWriter) ReadLines(ctx context.Context) iter.Seq[string] { return } + parallelisation.SleepWithContext(ctx, 50*time.Millisecond) continue } From b979715200ef93c1f0f86cd6f18e7a87b3644b1f Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 30 May 2025 17:11:30 +0100 Subject: [PATCH 05/14] :sparkles: `subprocess` Add 
support for terminating processes instead of killing them --- changes/20250530171020.feature | 1 + utils/subprocess/command_wrapper.go | 31 +++++++++-- utils/subprocess/command_wrapper_test.go | 4 ++ utils/subprocess/executor.go | 29 ++++++++++ utils/subprocess/executor_test.go | 68 ++++++++++++++++++++++++ 5 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 changes/20250530171020.feature diff --git a/changes/20250530171020.feature b/changes/20250530171020.feature new file mode 100644 index 0000000000..96daee4141 --- /dev/null +++ b/changes/20250530171020.feature @@ -0,0 +1 @@ +:sparkles: `subprocess` Add support for terminating processes instead of killing them diff --git a/utils/subprocess/command_wrapper.go b/utils/subprocess/command_wrapper.go index 9961e5c590..a5afd4d9ff 100644 --- a/utils/subprocess/command_wrapper.go +++ b/utils/subprocess/command_wrapper.go @@ -59,15 +59,23 @@ func (c *cmdWrapper) Run() error { return ConvertCommandError(c.cmd.Run()) } -func (c *cmdWrapper) Stop() error { +type interruptType int + +const ( + kill interruptType = 9 + term = 15 +) + +func (c *cmdWrapper) interrupt(interrupt interruptType) error { c.mu.RLock() defer c.mu.RUnlock() if c.cmd == nil { - return fmt.Errorf("%w:undefined command", commonerrors.ErrUndefined) + return commonerrors.New(commonerrors.ErrUndefined, "undefined command") } subprocess := c.cmd.Process ctx, cancel := context.WithCancel(context.Background()) defer cancel() + var stopErr error if subprocess != nil { pid := subprocess.Pid parallelisation.ScheduleAfter(ctx, 10*time.Millisecond, func(time.Time) { @@ -75,11 +83,26 @@ func (c *cmdWrapper) Stop() error { if process == nil || err != nil { return } - _ = process.KillWithChildren(ctx) + switch interrupt { + case kill: + _ = process.KillWithChildren(ctx) + case term: + _ = process.Terminate(ctx) + default: + stopErr = commonerrors.New(commonerrors.ErrInvalid, "unknown interrupt type for process") + } }) } _ = c.cmd.Wait() - return nil + return stopErr +} + +func (c *cmdWrapper) Stop() error { + return c.interrupt(kill) +} + +func (c *cmdWrapper) Interrupt() error { + return c.interrupt(term) } func (c *cmdWrapper) Pid() (pid int, err error) { diff --git a/utils/subprocess/command_wrapper_test.go b/utils/subprocess/command_wrapper_test.go index 19ec07a7c1..fb2f65451d 100644 --- a/utils/subprocess/command_wrapper_test.go +++ b/utils/subprocess/command_wrapper_test.go @@ -162,6 +162,10 @@ func TestCmdStartStop(t *testing.T) { require.Error(t, err) err = wrapper.Stop() require.NoError(t, err) + err = wrapper.Start() + require.Error(t, err) + err = wrapper.Interrupt() + require.NoError(t, err) }) } } diff --git a/utils/subprocess/executor.go b/utils/subprocess/executor.go index b089644671..863c521814 100644 --- a/utils/subprocess/executor.go +++ b/utils/subprocess/executor.go @@ -268,6 +268,13 @@ func (s *Subprocess) Stop() (err error) { return s.stop(true) } +// Interrupt stops the process via the TERM signal. +// This method should be used in combination with `Start`. +// This method is idempotent +func (s *Subprocess) Interrupt() (err error) { + return s.interrupt() +} + // Restart restarts a process. It will stop the process if currently running. 
func (s *Subprocess) Restart() (err error) { err = s.stop(false) @@ -314,3 +321,25 @@ func (s *Subprocess) stop(cancel bool) (err error) { s.messaging.LogEnd(nil) return } + +func (s *Subprocess) interrupt() (err error) { + if !s.IsOn() { + return + } + err = s.Check() + if err != nil { + return + } + s.mu.Lock() + defer s.mu.Unlock() + defer s.Cancel() + if !s.IsOn() { + return + } + s.messaging.LogStopping() + err = s.getCmd().Interrupt() + s.command.Reset() + s.isRunning.Store(false) + s.messaging.LogEnd(nil) + return +} diff --git a/utils/subprocess/executor_test.go b/utils/subprocess/executor_test.go index 3d994c488f..2eb6628604 100644 --- a/utils/subprocess/executor_test.go +++ b/utils/subprocess/executor_test.go @@ -178,6 +178,74 @@ func TestStartStop(t *testing.T) { } } +func TestStartInterrupt(t *testing.T) { + currentDir, err := os.Getwd() + require.NoError(t, err) + tests := []struct { + name string + cmdWindows string + argWindows []string + cmdOther string + argOther []string + }{ + { + name: "ShortProcess", + cmdWindows: "cmd", + argWindows: []string{"dir", currentDir}, + cmdOther: "ls", + argOther: []string{"-l", currentDir}, + }, + { + name: "LongProcess", + cmdWindows: "cmd", + argWindows: []string{"SLEEP 1"}, + cmdOther: "sleep", + argOther: []string{"1"}, + }, + } + + for i := range tests { + test := tests[i] + t.Run(test.name, func(t *testing.T) { + defer goleak.VerifyNone(t) + loggers, err := logs.NewLogrLogger(logstest.NewTestLogger(t), "test") + require.NoError(t, err) + + var p *Subprocess + if platform.IsWindows() { + p, err = New(context.Background(), loggers, "", "", "", test.cmdWindows, test.argWindows...) + } else { + p, err = New(context.Background(), loggers, "", "", "", test.cmdOther, test.argOther...) + } + require.NoError(t, err) + require.NotNil(t, p) + assert.False(t, p.IsOn()) + err = p.Start() + require.NoError(t, err) + assert.True(t, p.IsOn()) + + // Checking idempotence + err = p.Start() + require.NoError(t, err) + err = p.Check() + require.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + err = p.Restart() + require.NoError(t, err) + assert.True(t, p.IsOn()) + err = p.Interrupt() + require.NoError(t, err) + assert.False(t, p.IsOn()) + // Checking idempotence + err = p.Interrupt() + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + err = p.Execute() + require.NoError(t, err) + }) + } +} func TestExecute(t *testing.T) { currentDir, err := os.Getwd() require.NoError(t, err) From 3e8e599fbcf52a5de8023623295a6ee281d91d35 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 30 May 2025 17:12:10 +0100 Subject: [PATCH 06/14] lint --- utils/subprocess/command_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/subprocess/command_wrapper.go b/utils/subprocess/command_wrapper.go index a5afd4d9ff..a0949edb35 100644 --- a/utils/subprocess/command_wrapper.go +++ b/utils/subprocess/command_wrapper.go @@ -63,7 +63,7 @@ type interruptType int const ( kill interruptType = 9 - term = 15 + term interruptType = 15 ) func (c *cmdWrapper) interrupt(interrupt interruptType) error { From 29ebb7d473cc160a7778f1c8653a0569955c6e96 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 2 Jun 2025 11:39:05 +0100 Subject: [PATCH 07/14] add stopping of command --- changes/20250602113846.feature | 1 + utils/subprocess/supervisor/interface.go | 1 + utils/subprocess/supervisor/supervisor.go | 29 ++++++++++++++++++++--- 3 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 changes/20250602113846.feature diff 
--git a/changes/20250602113846.feature b/changes/20250602113846.feature new file mode 100644 index 0000000000..b0183da485 --- /dev/null +++ b/changes/20250602113846.feature @@ -0,0 +1 @@ +:sparkles: `supervisor` Add support for stopping supervised command after starting diff --git a/utils/subprocess/supervisor/interface.go b/utils/subprocess/supervisor/interface.go index 48a4d24fb6..2c7264bc70 100644 --- a/utils/subprocess/supervisor/interface.go +++ b/utils/subprocess/supervisor/interface.go @@ -9,4 +9,5 @@ import "context" type ISupervisor interface { // Run will run the supervisor and execute any of the command hooks. If it receives a halting error or the context is cancelled then it will exit Run(ctx context.Context) error + Stop() error } diff --git a/utils/subprocess/supervisor/supervisor.go b/utils/subprocess/supervisor/supervisor.go index 5570c71363..8b8e01df32 100644 --- a/utils/subprocess/supervisor/supervisor.go +++ b/utils/subprocess/supervisor/supervisor.go @@ -24,6 +24,7 @@ type Supervisor struct { haltingErrors []error restartDelay time.Duration count uint + cmd *subprocess.Subprocess } type SupervisorOption func(*Supervisor) @@ -117,17 +118,32 @@ func (s *Supervisor) Run(ctx context.Context) (err error) { } g, _ := errgroup.WithContext(ctx) - cmd, err := s.newCommand(ctx) + s.cmd, err = s.newCommand(ctx) if err != nil { if commonerrors.Any(err, commonerrors.ErrCancelled, commonerrors.ErrTimeout) { return err } return fmt.Errorf("%w: error occurred when creating new command: %v", commonerrors.ErrUnexpected, err.Error()) } - if cmd == nil { + if s.cmd == nil { return fmt.Errorf("%w: command was undefined", commonerrors.ErrUndefined) } - g.Go(cmd.Execute) + + g.Go(s.cmd.Start) + for !s.cmd.IsOn() { // wait for job to start + if err := parallelisation.DetermineContextError(ctx); err != nil { + break + } + parallelisation.SleepWithContext(ctx, 200*time.Millisecond) + } + for s.cmd.IsOn() { // wait for job to end + if err := parallelisation.DetermineContextError(ctx); err != nil { + break + } + parallelisation.SleepWithContext(ctx, 200*time.Millisecond) + } + + fmt.Println(1234456) if s.postStart != nil { err = s.postStart(ctx) @@ -167,3 +183,10 @@ func (s *Supervisor) Run(ctx context.Context) (err error) { return } + +func (s *Supervisor) Stop() error { + if s.cmd == nil { + return nil + } + return s.cmd.Interrupt() +} From 3f7c12bacd4c012b40b9ef1113c0990bb9b73d7c Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 2 Jun 2025 15:20:19 +0100 Subject: [PATCH 08/14] undo changes --- changes/20250602113846.feature | 1 - utils/subprocess/supervisor/interface.go | 1 - utils/subprocess/supervisor/supervisor.go | 23 +---------------------- 3 files changed, 1 insertion(+), 24 deletions(-) delete mode 100644 changes/20250602113846.feature diff --git a/changes/20250602113846.feature b/changes/20250602113846.feature deleted file mode 100644 index b0183da485..0000000000 --- a/changes/20250602113846.feature +++ /dev/null @@ -1 +0,0 @@ -:sparkles: `supervisor` Add support for stopping supervised command after starting diff --git a/utils/subprocess/supervisor/interface.go b/utils/subprocess/supervisor/interface.go index 2c7264bc70..48a4d24fb6 100644 --- a/utils/subprocess/supervisor/interface.go +++ b/utils/subprocess/supervisor/interface.go @@ -9,5 +9,4 @@ import "context" type ISupervisor interface { // Run will run the supervisor and execute any of the command hooks. 
If it receives a halting error or the context is cancelled then it will exit Run(ctx context.Context) error - Stop() error } diff --git a/utils/subprocess/supervisor/supervisor.go b/utils/subprocess/supervisor/supervisor.go index 8b8e01df32..da67a7c1a4 100644 --- a/utils/subprocess/supervisor/supervisor.go +++ b/utils/subprocess/supervisor/supervisor.go @@ -129,21 +129,7 @@ func (s *Supervisor) Run(ctx context.Context) (err error) { return fmt.Errorf("%w: command was undefined", commonerrors.ErrUndefined) } - g.Go(s.cmd.Start) - for !s.cmd.IsOn() { // wait for job to start - if err := parallelisation.DetermineContextError(ctx); err != nil { - break - } - parallelisation.SleepWithContext(ctx, 200*time.Millisecond) - } - for s.cmd.IsOn() { // wait for job to end - if err := parallelisation.DetermineContextError(ctx); err != nil { - break - } - parallelisation.SleepWithContext(ctx, 200*time.Millisecond) - } - - fmt.Println(1234456) + g.Go(s.cmd.Execute) if s.postStart != nil { err = s.postStart(ctx) @@ -183,10 +169,3 @@ func (s *Supervisor) Run(ctx context.Context) (err error) { return } - -func (s *Supervisor) Stop() error { - if s.cmd == nil { - return nil - } - return s.cmd.Interrupt() -} From 803c11406285d368e438ccda9aa1cd1aed32fd5e Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 2 Jun 2025 15:21:33 +0100 Subject: [PATCH 09/14] remvoe unused newsfiles --- changes/20250529101702.bugfix | 1 - changes/20250529175329.feature | 1 - changes/20250530105438.feature | 1 - 3 files changed, 3 deletions(-) delete mode 100644 changes/20250529101702.bugfix delete mode 100644 changes/20250529175329.feature delete mode 100644 changes/20250530105438.feature diff --git a/changes/20250529101702.bugfix b/changes/20250529101702.bugfix deleted file mode 100644 index 4e0f91220c..0000000000 --- a/changes/20250529101702.bugfix +++ /dev/null @@ -1 +0,0 @@ -Dependency upgrade: logr-1.4.3 diff --git a/changes/20250529175329.feature b/changes/20250529175329.feature deleted file mode 100644 index 245f484645..0000000000 --- a/changes/20250529175329.feature +++ /dev/null @@ -1 +0,0 @@ -:sparkles: `logs` Add support for a FIFO logger that discards logs ater reading them diff --git a/changes/20250530105438.feature b/changes/20250530105438.feature deleted file mode 100644 index 299cd9d998..0000000000 --- a/changes/20250530105438.feature +++ /dev/null @@ -1 +0,0 @@ -:sparkles: `logs` Add support for reading line by line in FIFO reader From 43a7b5eeb177e2fc918091f494de38c5c1aa6b8d Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 2 Jun 2025 16:10:10 +0100 Subject: [PATCH 10/14] FIFO logger should use print not println --- utils/logs/fifo_logger.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index 90ff186ae2..aa35fb01d6 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -111,6 +111,14 @@ type FIFOLoggers struct { LogWriter FIFOWriter } +func (l *FIFOLoggers) Log(output ...interface{}) { + l.Output.Print(output...) +} + +func (l *FIFOLoggers) LogError(err ...interface{}) { + l.Error.Print(err...) 
+} + func (l *FIFOLoggers) Check() error { return l.GenericLoggers.Check() } From 1e9d02b5e2e3a306ead3804db42cd57a8dcb84e6 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 2 Jun 2025 17:48:02 +0100 Subject: [PATCH 11/14] diode thing --- utils/logs/fifo_logger.go | 91 ++++++++++++++++++++++++---------- utils/logs/fifo_logger_test.go | 10 ++-- 2 files changed, 70 insertions(+), 31 deletions(-) diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index f42699858b..22202b1b47 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -6,19 +6,25 @@ import ( "fmt" "io" "iter" - "log" "sync" "time" + "github.com/ARM-software/golang-utils/utils/commonerrors" "github.com/ARM-software/golang-utils/utils/parallelisation" ) -var _ Loggers = &FIFOLoggers{} - type FIFOWriter struct { io.WriteCloser - mu sync.RWMutex - Logs bytes.Buffer + source string + mu sync.RWMutex + Logs bytes.Buffer +} + +func (w *FIFOWriter) SetSource(source string) (err error) { + w.mu.RLock() + defer w.mu.RUnlock() + w.source = source + return } func (w *FIFOWriter) Write(p []byte) (n int, err error) { @@ -109,20 +115,47 @@ func (w *FIFOWriter) ReadLines(ctx context.Context) iter.Seq[string] { } type FIFOLoggers struct { - GenericLoggers - LogWriter FIFOWriter + Output WriterWithSource + Error WriterWithSource + LogWriter *FIFOWriter + newline bool +} + +func (l *FIFOLoggers) SetLogSource(source string) error { + err := l.Check() + if err != nil { + return err + } + return l.Output.SetSource(source) +} + +func (l *FIFOLoggers) SetLoggerSource(source string) error { + err := l.Check() + if err != nil { + return err + } + return l.Output.SetSource(source) } func (l *FIFOLoggers) Log(output ...interface{}) { - l.Output.Print(output...) + _, _ = l.Output.Write([]byte(fmt.Sprint(output...))) + if l.newline { + _, _ = l.Output.Write([]byte("\n")) + } } func (l *FIFOLoggers) LogError(err ...interface{}) { - l.Error.Print(err...) + _, _ = l.Error.Write([]byte(fmt.Sprint(err...))) + if l.newline { + _, _ = l.Output.Write([]byte("\n")) + } } func (l *FIFOLoggers) Check() error { - return l.GenericLoggers.Check() + if l.Error == nil || l.Output == nil { + return commonerrors.ErrNoLogger + } + return nil } func (l *FIFOLoggers) Read() string { @@ -135,24 +168,24 @@ func (l *FIFOLoggers) ReadLines(ctx context.Context) iter.Seq[string] { // Close closes the logger func (l *FIFOLoggers) Close() (err error) { - err = l.LogWriter.Close() - if err != nil { - return - } - err = l.GenericLoggers.Close() - return + return l.LogWriter.Close() } // NewFIFOLogger creates a logger to a bytes buffer. // All messages (whether they are output or error) are merged together. 
// Once messages have been accessed they are gone -func NewFIFOLogger(loggerSource string) (loggers *FIFOLoggers, err error) { - loggers = &FIFOLoggers{ - LogWriter: FIFOWriter{}, +func NewFIFOLogger() (loggers *FIFOLoggers, err error) { + l, err := NewNoopLogger("Noop Logger") + if err != nil { + return } - loggers.GenericLoggers = GenericLoggers{ - Output: log.New(&loggers.LogWriter, fmt.Sprintf("[%v] Output: ", loggerSource), log.LstdFlags), - Error: log.New(&loggers.LogWriter, fmt.Sprintf("[%v] Error: ", loggerSource), log.LstdFlags), + logWriter := &FIFOWriter{} + + loggers = &FIFOLoggers{ + LogWriter: logWriter, + newline: true, + Output: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), + Error: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), } return } @@ -161,12 +194,16 @@ func NewFIFOLogger(loggerSource string) (loggers *FIFOLoggers, err error) { // All messages (whether they are output or error) are merged together. // Once messages have been accessed they are gone func NewPlainFIFOLogger() (loggers *FIFOLoggers, err error) { - loggers = &FIFOLoggers{ - LogWriter: FIFOWriter{}, + l, err := NewNoopLogger("Noop Logger") + if err != nil { + return } - loggers.GenericLoggers = GenericLoggers{ - Output: log.New(&loggers.LogWriter, "", 0), - Error: log.New(&loggers.LogWriter, "", 0), + + logWriter := &FIFOWriter{} + loggers = &FIFOLoggers{ + LogWriter: logWriter, + Output: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), + Error: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), } return } diff --git a/utils/logs/fifo_logger_test.go b/utils/logs/fifo_logger_test.go index 8aa757f98c..cde2cddc7c 100644 --- a/utils/logs/fifo_logger_test.go +++ b/utils/logs/fifo_logger_test.go @@ -12,13 +12,14 @@ import ( ) func TestFIFOLoggerRead(t *testing.T) { - loggers, err := NewFIFOLogger("Test") + loggers, err := NewFIFOLogger() require.NoError(t, err) testLog(t, loggers) loggers.LogError("Test err") loggers.Log("Test1") contents := loggers.Read() require.NotEmpty(t, contents) + time.Sleep(200 * time.Millisecond) // account for slow polling require.True(t, strings.Contains(contents, "Test err")) require.True(t, strings.Contains(contents, "Test1")) loggers.Log("Test2") @@ -37,6 +38,7 @@ func TestPlainFIFOLoggerRead(t *testing.T) { testLog(t, loggers) loggers.LogError("Test err") loggers.Log("Test1") + time.Sleep(200 * time.Millisecond) // account for slow polling contents := loggers.Read() require.NotEmpty(t, contents) require.True(t, strings.Contains(contents, "Test err")) @@ -52,7 +54,7 @@ func TestPlainFIFOLoggerRead(t *testing.T) { } func TestFIFOLoggerReadlines(t *testing.T) { - loggers, err := NewFIFOLogger("Test") + loggers, err := NewFIFOLogger() require.NoError(t, err) testLog(t, loggers) loggers.LogError("Test err\n") @@ -85,7 +87,7 @@ func TestPlainFIFOLoggerReadlines(t *testing.T) { loggers.Log("Test1") loggers.Log("\n\n\n") time.Sleep(200 * time.Millisecond) - loggers.Log("Test2") + loggers.Log("Test2\n") }() count := 0 @@ -99,6 +101,6 @@ func TestPlainFIFOLoggerReadlines(t *testing.T) { count++ } - assert.Equal(t, "Test err\nTest1\nTest2\n", b.String()) + assert.Equal(t, "Test errTest1\nTest2\n", b.String()) assert.Equal(t, 3, count) } From bb8f2c68f48bf38a75eeda4307164d7f41263f5f Mon Sep 17 00:00:00 2001 From: Adrien CABARBAYE Date: Wed, 4 Jun 2025 15:09:38 +0100 Subject: [PATCH 12/14] Update utils/subprocess/executor.go --- utils/subprocess/executor.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/utils/subprocess/executor.go b/utils/subprocess/executor.go index 863c521814..35770b65ea 100644 --- a/utils/subprocess/executor.go +++ b/utils/subprocess/executor.go @@ -268,7 +268,7 @@ func (s *Subprocess) Stop() (err error) { return s.stop(true) } -// Interrupt stops the process via the TERM signal. +// Interrupt terminates the process // This method should be used in combination with `Start`. // This method is idempotent func (s *Subprocess) Interrupt() (err error) { From 03ca8a3a436ccc36400c663b6918049813f97e5d Mon Sep 17 00:00:00 2001 From: Adrien CABARBAYE Date: Wed, 4 Jun 2025 20:30:44 +0100 Subject: [PATCH 13/14] Fixes for the FIFO logger --- .secrets.baseline | 2 +- changes/20250604202816.feature | 1 + utils/diodes/README | 2 + utils/diodes/many_to_one.go | 130 ++++++++++++++++ utils/diodes/one_to_one.go | 129 ++++++++++++++++ utils/diodes/poller.go | 80 ++++++++++ utils/diodes/waiter.go | 71 +++++++++ utils/logs/fifo_logger.go | 270 +++++++++++++++++---------------- utils/logs/fifo_logger_test.go | 206 +++++++++++++++---------- 9 files changed, 676 insertions(+), 215 deletions(-) create mode 100644 changes/20250604202816.feature create mode 100644 utils/diodes/README create mode 100644 utils/diodes/many_to_one.go create mode 100644 utils/diodes/one_to_one.go create mode 100644 utils/diodes/poller.go create mode 100644 utils/diodes/waiter.go diff --git a/.secrets.baseline b/.secrets.baseline index 466543e52b..5b4ecb110a 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -265,5 +265,5 @@ } ] }, - "generated_at": "2025-05-30T10:38:21Z" + "generated_at": "2025-06-04T19:30:24Z" } diff --git a/changes/20250604202816.feature b/changes/20250604202816.feature new file mode 100644 index 0000000000..b1d90080a2 --- /dev/null +++ b/changes/20250604202816.feature @@ -0,0 +1 @@ +:sparkles: Introducing [diodes] module which is a copy of [cloud foundary's](https://github.com/cloudfoundry/go-diodes) library. diff --git a/utils/diodes/README b/utils/diodes/README new file mode 100644 index 0000000000..ee897e9dcc --- /dev/null +++ b/utils/diodes/README @@ -0,0 +1,2 @@ +Vendoring [cloud foundary](https://github.com/cloudfoundry/go-diodes)) diode libraries to avoid importing test dependencies. + diff --git a/utils/diodes/many_to_one.go b/utils/diodes/many_to_one.go new file mode 100644 index 0000000000..3acc9f30f2 --- /dev/null +++ b/utils/diodes/many_to_one.go @@ -0,0 +1,130 @@ +package diodes + +import ( + "log" + "sync/atomic" + "unsafe" +) + +// ManyToOne diode is optimal for many writers (go-routines B-n) and a single +// reader (go-routine A). It is not thread safe for multiple readers. +type ManyToOne struct { + writeIndex uint64 + buffer []unsafe.Pointer + readIndex uint64 + alerter Alerter +} + +// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode +// is optimzed for many writers (on go-routines B-n) and a single reader +// (on go-routine A). The alerter is invoked on the read's go-routine. It is +// called when it notices that the writer go-routine has passed it and wrote +// over data. A nil can be used to ignore alerts. 
+func NewManyToOne(size int, alerter Alerter) *ManyToOne { + if alerter == nil { + alerter = AlertFunc(func(int) {}) + } + + d := &ManyToOne{ + buffer: make([]unsafe.Pointer, size), + alerter: alerter, + } + + // Start write index at the value before 0 + // to allow the first write to use AddUint64 + // and still have a beginning index of 0 + d.writeIndex = ^d.writeIndex + return d +} + +// Set sets the data in the next slot of the ring buffer. +func (d *ManyToOne) Set(data GenericDataType) { + for { + writeIndex := atomic.AddUint64(&d.writeIndex, 1) + idx := writeIndex % uint64(len(d.buffer)) + old := atomic.LoadPointer(&d.buffer[idx]) + + if old != nil && + (*bucket)(old) != nil && + (*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) { + log.Println("Diode set collision: consider using a larger diode") + continue + } + + newBucket := &bucket{ + data: data, + seq: writeIndex, + } + + if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) { + log.Println("Diode set collision: consider using a larger diode") + continue + } + + return + } +} + +// TryNext will attempt to read from the next slot of the ring buffer. +// If there is not data available, it will return (nil, false). +func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) { + // Read a value from the ring buffer based on the readIndex. + idx := d.readIndex % uint64(len(d.buffer)) + result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) + + // When the result is nil that means the writer has not had the + // opportunity to write a value into the diode. This value must be ignored + // and the read head must not increment. + if result == nil { + return nil, false + } + + // When the seq value is less than the current read index that means a + // value was read from idx that was previously written but has since has + // been dropped. This value must be ignored and the read head must not + // increment. + // + // The simulation for this scenario assumes the fast forward occurred as + // detailed below. + // + // 5. The reader reads again getting seq 5. It then reads again expecting + // seq 6 but gets seq 2. This is a read of a stale value that was + // effectively "dropped" so the read fails and the read head stays put. + // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 + // + if result.seq < d.readIndex { + return nil, false + } + + // When the seq value is greater than the current read index that means a + // value was read from idx that overwrote the value that was expected to + // be at this idx. This happens when the writer has lapped the reader. The + // reader needs to catch up to the writer so it moves its write head to + // the new seq, effectively dropping the messages that were not read in + // between the two values. + // + // Here is a simulation of this scenario: + // + // 1. Both the read and write heads start at 0. + // `| nil | nil | nil | nil |` r: 0, w: 0 + // 2. The writer fills the buffer. + // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 + // 3. The writer laps the read head. + // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 + // 4. The reader reads the first value, expecting a seq of 0 but reads 4, + // this forces the reader to fast forward to 5. 
+ // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 + // + if result.seq > d.readIndex { + dropped := result.seq - d.readIndex + d.readIndex = result.seq + d.alerter.Alert(int(dropped)) // nolint:gosec + } + + // Only increment read index if a regular read occurred (where seq was + // equal to readIndex) or a value was read that caused a fast forward + // (where seq was greater than readIndex). + // + d.readIndex++ + return result.data, true +} diff --git a/utils/diodes/one_to_one.go b/utils/diodes/one_to_one.go new file mode 100644 index 0000000000..9b628b5a2e --- /dev/null +++ b/utils/diodes/one_to_one.go @@ -0,0 +1,129 @@ +package diodes + +import ( + "sync/atomic" + "unsafe" +) + +// GenericDataType is the data type the diodes operate on. +type GenericDataType unsafe.Pointer + +// Alerter is used to report how many values were overwritten since the +// last write. +type Alerter interface { + Alert(missed int) +} + +// AlertFunc type is an adapter to allow the use of ordinary functions as +// Alert handlers. +type AlertFunc func(missed int) + +// Alert calls f(missed) +func (f AlertFunc) Alert(missed int) { + f(missed) +} + +type bucket struct { + data GenericDataType + seq uint64 // seq is the recorded write index at the time of writing +} + +// OneToOne diode is meant to be used by a single reader and a single writer. +// It is not thread safe if used otherwise. +type OneToOne struct { + buffer []unsafe.Pointer + writeIndex uint64 + readIndex uint64 + alerter Alerter +} + +// NewOneToOne creates a new diode is meant to be used by a single reader and +// a single writer. The alerter is invoked on the read's go-routine. It is +// called when it notices that the writer go-routine has passed it and wrote +// over data. A nil can be used to ignore alerts. +func NewOneToOne(size int, alerter Alerter) *OneToOne { + if alerter == nil { + alerter = AlertFunc(func(int) {}) + } + + return &OneToOne{ + buffer: make([]unsafe.Pointer, size), + alerter: alerter, + } +} + +// Set sets the data in the next slot of the ring buffer. +func (d *OneToOne) Set(data GenericDataType) { + idx := d.writeIndex % uint64(len(d.buffer)) + + newBucket := &bucket{ + data: data, + seq: d.writeIndex, + } + d.writeIndex++ + + atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket)) +} + +// TryNext will attempt to read from the next slot of the ring buffer. +// If there is no data available, it will return (nil, false). +func (d *OneToOne) TryNext() (data GenericDataType, ok bool) { + // Read a value from the ring buffer based on the readIndex. + idx := d.readIndex % uint64(len(d.buffer)) + result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) + + // When the result is nil that means the writer has not had the + // opportunity to write a value into the diode. This value must be ignored + // and the read head must not increment. + if result == nil { + return nil, false + } + + // When the seq value is less than the current read index that means a + // value was read from idx that was previously written but has since has + // been dropped. This value must be ignored and the read head must not + // increment. + // + // The simulation for this scenario assumes the fast forward occurred as + // detailed below. + // + // 5. The reader reads again getting seq 5. It then reads again expecting + // seq 6 but gets seq 2. This is a read of a stale value that was + // effectively "dropped" so the read fails and the read head stays put. 
+ // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 + // + if result.seq < d.readIndex { + return nil, false + } + + // When the seq value is greater than the current read index that means a + // value was read from idx that overwrote the value that was expected to + // be at this idx. This happens when the writer has lapped the reader. The + // reader needs to catch up to the writer so it moves its write head to + // the new seq, effectively dropping the messages that were not read in + // between the two values. + // + // Here is a simulation of this scenario: + // + // 1. Both the read and write heads start at 0. + // `| nil | nil | nil | nil |` r: 0, w: 0 + // 2. The writer fills the buffer. + // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 + // 3. The writer laps the read head. + // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 + // 4. The reader reads the first value, expecting a seq of 0 but reads 4, + // this forces the reader to fast forward to 5. + // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 + // + if result.seq > d.readIndex { + dropped := result.seq - d.readIndex + d.readIndex = result.seq + d.alerter.Alert(int(dropped)) // nolint:gosec + } + + // Only increment read index if a regular read occurred (where seq was + // equal to readIndex) or a value was read that caused a fast forward + // (where seq was greater than readIndex). + d.readIndex++ + return result.data, true +} diff --git a/utils/diodes/poller.go b/utils/diodes/poller.go new file mode 100644 index 0000000000..b8a2880e8c --- /dev/null +++ b/utils/diodes/poller.go @@ -0,0 +1,80 @@ +package diodes + +import ( + "context" + "time" +) + +// Diode is any implementation of a diode. +type Diode interface { + Set(GenericDataType) + TryNext() (GenericDataType, bool) +} + +// Poller will poll a diode until a value is available. +type Poller struct { + Diode + interval time.Duration + ctx context.Context +} + +// PollerConfigOption can be used to setup the poller. +type PollerConfigOption func(*Poller) + +// WithPollingInterval sets the interval at which the diode is queried +// for new data. The default is 10ms. +func WithPollingInterval(interval time.Duration) PollerConfigOption { + return PollerConfigOption(func(c *Poller) { + c.interval = interval + }) +} + +// WithPollingContext sets the context to cancel any retrieval (Next()). It +// will not change any results for adding data (Set()). Default is +// context.Background(). +func WithPollingContext(ctx context.Context) PollerConfigOption { + return PollerConfigOption(func(c *Poller) { + c.ctx = ctx + }) +} + +// NewPoller returns a new Poller that wraps the given diode. +func NewPoller(d Diode, opts ...PollerConfigOption) *Poller { + p := &Poller{ + Diode: d, + interval: 10 * time.Millisecond, + ctx: context.Background(), + } + + for _, o := range opts { + o(p) + } + + return p +} + +// Next polls the diode until data is available or until the context is done. +// If the context is done, then nil will be returned. 
+func (p *Poller) Next() GenericDataType { + for { + data, ok := p.Diode.TryNext() // nolint:staticcheck + if !ok { + if p.isDone() { + return nil + } + + time.Sleep(p.interval) + continue + } + return data + } +} + +func (p *Poller) isDone() bool { + select { + case <-p.ctx.Done(): + return true + default: + return false + } +} diff --git a/utils/diodes/waiter.go b/utils/diodes/waiter.go new file mode 100644 index 0000000000..fc203108da --- /dev/null +++ b/utils/diodes/waiter.go @@ -0,0 +1,71 @@ +package diodes + +import ( + "context" +) + +// Waiter will use a channel signal to alert the reader to when data is +// available. +type Waiter struct { + Diode + c chan struct{} + ctx context.Context +} + +// WaiterConfigOption can be used to setup the waiter. +type WaiterConfigOption func(*Waiter) + +// WithWaiterContext sets the context to cancel any retrieval (Next()). It +// will not change any results for adding data (Set()). Default is +// context.Background(). +func WithWaiterContext(ctx context.Context) WaiterConfigOption { + return WaiterConfigOption(func(c *Waiter) { + c.ctx = ctx + }) +} + +// NewWaiter returns a new Waiter that wraps the given diode. +func NewWaiter(d Diode, opts ...WaiterConfigOption) *Waiter { + w := new(Waiter) + w.Diode = d + w.c = make(chan struct{}, 1) + w.ctx = context.Background() + + for _, opt := range opts { + opt(w) + } + + return w +} + +// Set invokes the wrapped diode's Set with the given data and uses broadcast +// to wake up any readers. +func (w *Waiter) Set(data GenericDataType) { + w.Diode.Set(data) + w.broadcast() +} + +// broadcast sends to the channel if it can. +func (w *Waiter) broadcast() { + select { + case w.c <- struct{}{}: + default: + } +} + +// Next returns the next data point on the wrapped diode. If there is no new +// data, it will wait for Set to be called or the context to be done. If the +// context is done, then nil will be returned. 
+func (w *Waiter) Next() GenericDataType { + for { + data, ok := w.Diode.TryNext() // nolint:staticcheck + if ok { + return data + } + select { + case <-w.ctx.Done(): + return nil + case <-w.c: + } + } +} diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index 22202b1b47..01329568e3 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -4,189 +4,191 @@ import ( "bytes" "context" "fmt" - "io" "iter" - "sync" + "strings" "time" "github.com/ARM-software/golang-utils/utils/commonerrors" + "github.com/ARM-software/golang-utils/utils/diodes" "github.com/ARM-software/golang-utils/utils/parallelisation" ) -type FIFOWriter struct { - io.WriteCloser - source string - mu sync.RWMutex - Logs bytes.Buffer -} +const ( + newLine = '\n' + bufferSize = 10000 +) -func (w *FIFOWriter) SetSource(source string) (err error) { - w.mu.RLock() - defer w.mu.RUnlock() - w.source = source - return +type loggerAlerter struct { + log Loggers } -func (w *FIFOWriter) Write(p []byte) (n int, err error) { - w.mu.RLock() - defer w.mu.RUnlock() - w.Logs.Write(p) - return +func (l *loggerAlerter) Alert(missed int) { + if l.log != nil { + l.log.LogError(fmt.Sprintf("Logger dropped %d messages", missed)) + } } -func (w *FIFOWriter) Close() (err error) { - w.mu.Lock() - defer w.mu.Unlock() - w.Logs.Reset() - return +func newLoggerAlerter(logs Loggers) diodes.Alerter { + return &loggerAlerter{log: logs} } -func (w *FIFOWriter) Read() string { - w.mu.Lock() - defer w.mu.Unlock() - n := w.Logs.Len() - if n == 0 { - return "" +func newFIFODiode(ctx context.Context, ringBufferSize int, pollingPeriod time.Duration, droppedMessagesLogger Loggers) *fifoDiode { + dCtx, cancel := context.WithCancel(ctx) + cancelStore := parallelisation.NewCancelFunctionsStore() + cancelStore.RegisterCancelFunction(cancel) + return &fifoDiode{ + d: diodes.NewPoller(diodes.NewManyToOne(ringBufferSize, newLoggerAlerter(droppedMessagesLogger)), diodes.WithPollingInterval(pollingPeriod), diodes.WithPollingContext(dCtx)), + cancelStore: cancelStore, } - bytes := w.Logs.Next(n) - return string(bytes) } -func (w *FIFOWriter) ReadLines(ctx context.Context) iter.Seq[string] { +type fifoDiode struct { + d *diodes.Poller + cancelStore *parallelisation.CancelFunctionStore +} + +func (d *fifoDiode) Set(data []byte) { + d.d.Set(diodes.GenericDataType(&data)) +} + +func (d *fifoDiode) Close() error { + d.cancelStore.Cancel() + return nil +} + +// LineIterator returns an iterator over lines. It should only be called within the context of the same goroutine. +func (d *fifoDiode) LineIterator(ctx context.Context) iter.Seq[string] { return func(yield func(string) bool) { - var partial []byte - for { - if err := parallelisation.DetermineContextError(ctx); err != nil { + err := IterateOverLines(ctx, func(fCtx context.Context) (b []byte, err error) { + err = parallelisation.DetermineContextError(fCtx) + if err != nil { return } - - buf := func() []byte { - w.mu.Lock() - defer w.mu.Unlock() - defer w.Logs.Reset() - tmp := w.Logs.Bytes() - buf := make([]byte, len(tmp)) - copy(buf, tmp) - return buf - }() - - if len(buf) == 0 { - if err := parallelisation.DetermineContextError(ctx); err != nil { - if len(partial) > 0 { - yield(string(partial)) - } - return - } - - parallelisation.SleepWithContext(ctx, 50*time.Millisecond) - continue + data, has := d.d.TryNext() + if has { + b = *(*[]byte)(data) } + return + }, yield) + if err != nil { + return + } + } +} - if len(partial) > 0 { - buf = append(partial, buf...) 
- partial = nil - } +func cleanseLine(line string) string { + return strings.TrimSuffix(strings.ReplaceAll(line, "\r", ""), string(newLine)) +} - for { - idx := bytes.IndexByte(buf, '\n') - if idx < 0 { - break - } - line := buf[:idx] - - if len(line) > 0 && line[len(line)-1] == '\r' { - line = line[:len(line)-1] - } - buf = buf[idx+1:] - if len(line) == 0 { - continue - } - - if !yield(string(line)) { - return - } +func iterateOverLines(ctx context.Context, b *bytes.Buffer, yield func(string) bool) (err error) { + for { + subErr := parallelisation.DetermineContextError(ctx) + if subErr != nil { + err = subErr + return + } + line, foundErr := b.ReadString(newLine) + if foundErr == nil { + if !yield(line) { + err = commonerrors.ErrEOF + return } - - if len(buf) > 0 { - partial = buf + } else { + b.Reset() + _, subErr = b.Write([]byte(line)) + if subErr != nil { + err = subErr + return } + return + } + } +} + +func IterateOverLines(ctx context.Context, fetchNext func(fCtx context.Context) ([]byte, error), yield func(string) bool) (err error) { + extendedYield := func(s string) bool { + return yield(cleanseLine(s)) + } + b := bytes.NewBuffer(make([]byte, 0, 512)) + for { + subErr := parallelisation.DetermineContextError(ctx) + if subErr != nil { + err = subErr + return + } + nextBuf, subErr := fetchNext(ctx) + if subErr != nil { + err = subErr + return + } + if len(nextBuf) == 0 { + parallelisation.SleepWithContext(ctx, 10*time.Millisecond) + continue + } + _, subErr = b.Write(nextBuf) + if subErr != nil { + err = subErr + return + } + subErr = iterateOverLines(ctx, b, extendedYield) + if subErr != nil { + err = subErr + return } } } type FIFOLoggers struct { - Output WriterWithSource - Error WriterWithSource - LogWriter *FIFOWriter - newline bool + d *fifoDiode + newline bool } -func (l *FIFOLoggers) SetLogSource(source string) error { - err := l.Check() - if err != nil { - return err - } - return l.Output.SetSource(source) +func (l *FIFOLoggers) SetLogSource(_ string) error { + return nil } -func (l *FIFOLoggers) SetLoggerSource(source string) error { - err := l.Check() - if err != nil { - return err - } - return l.Output.SetSource(source) +func (l *FIFOLoggers) SetLoggerSource(_ string) error { + return nil } -func (l *FIFOLoggers) Log(output ...interface{}) { - _, _ = l.Output.Write([]byte(fmt.Sprint(output...))) - if l.newline { - _, _ = l.Output.Write([]byte("\n")) - } +func (l *FIFOLoggers) Log(output ...any) { + l.log(output...) } -func (l *FIFOLoggers) LogError(err ...interface{}) { - _, _ = l.Error.Write([]byte(fmt.Sprint(err...))) +func (l *FIFOLoggers) LogError(err ...any) { + l.log(err...) +} + +func (l *FIFOLoggers) log(args ...any) { + b := bytes.NewBufferString(fmt.Sprint(args...)) if l.newline { - _, _ = l.Output.Write([]byte("\n")) + _, _ = b.Write([]byte{newLine}) } + l.d.Set(b.Bytes()) } func (l *FIFOLoggers) Check() error { - if l.Error == nil || l.Output == nil { - return commonerrors.ErrNoLogger + if l.d == nil { + return commonerrors.UndefinedVariable("FIFO diode") } return nil } -func (l *FIFOLoggers) Read() string { - return l.LogWriter.Read() -} - -func (l *FIFOLoggers) ReadLines(ctx context.Context) iter.Seq[string] { - return l.LogWriter.ReadLines(ctx) +// LineIterator returns an iterator over lines. It should only be called within the context of the same goroutine. 
+func (l *FIFOLoggers) LineIterator(ctx context.Context) iter.Seq[string] { + return l.d.LineIterator(ctx) } // Close closes the logger func (l *FIFOLoggers) Close() (err error) { - return l.LogWriter.Close() + return l.d.Close() } // NewFIFOLogger creates a logger to a bytes buffer. // All messages (whether they are output or error) are merged together. // Once messages have been accessed they are gone func NewFIFOLogger() (loggers *FIFOLoggers, err error) { - l, err := NewNoopLogger("Noop Logger") - if err != nil { - return - } - logWriter := &FIFOWriter{} - - loggers = &FIFOLoggers{ - LogWriter: logWriter, - newline: true, - Output: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), - Error: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), - } + loggers, err = newDefaultFIFOLogger(true) return } @@ -194,16 +196,22 @@ func NewFIFOLogger() (loggers *FIFOLoggers, err error) { // All messages (whether they are output or error) are merged together. // Once messages have been accessed they are gone func NewPlainFIFOLogger() (loggers *FIFOLoggers, err error) { - l, err := NewNoopLogger("Noop Logger") + loggers, err = newDefaultFIFOLogger(false) + return +} + +func newDefaultFIFOLogger(addNewLine bool) (loggers *FIFOLoggers, err error) { + l, err := NewNoopLogger("FIFO") if err != nil { return } + return NewFIFOLoggerWithBuffer(addNewLine, bufferSize, 50*time.Millisecond, l) +} - logWriter := &FIFOWriter{} +func NewFIFOLoggerWithBuffer(addNewLine bool, ringBufferSize int, pollingPeriod time.Duration, droppedMessageLogger Loggers) (loggers *FIFOLoggers, err error) { loggers = &FIFOLoggers{ - LogWriter: logWriter, - Output: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), - Error: NewDiodeWriterForSlowWriter(logWriter, 10000, 50*time.Millisecond, l), + d: newFIFODiode(context.Background(), ringBufferSize, pollingPeriod, droppedMessageLogger), + newline: addNewLine, } return } diff --git a/utils/logs/fifo_logger_test.go b/utils/logs/fifo_logger_test.go index cde2cddc7c..ece2dba43b 100644 --- a/utils/logs/fifo_logger_test.go +++ b/utils/logs/fifo_logger_test.go @@ -1,106 +1,146 @@ package logs import ( + "bytes" "context" - "regexp" + "fmt" + "io" "strings" "testing" "time" + "github.com/go-faker/faker/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/ARM-software/golang-utils/utils/commonerrors" + "github.com/ARM-software/golang-utils/utils/commonerrors/errortest" + "github.com/ARM-software/golang-utils/utils/parallelisation" ) -func TestFIFOLoggerRead(t *testing.T) { - loggers, err := NewFIFOLogger() - require.NoError(t, err) - testLog(t, loggers) - loggers.LogError("Test err") - loggers.Log("Test1") - contents := loggers.Read() - require.NotEmpty(t, contents) - time.Sleep(200 * time.Millisecond) // account for slow polling - require.True(t, strings.Contains(contents, "Test err")) - require.True(t, strings.Contains(contents, "Test1")) - loggers.Log("Test2") - contents = loggers.Read() - require.NotEmpty(t, contents) - require.False(t, strings.Contains(contents, "Test err")) - require.False(t, strings.Contains(contents, "Test1")) - require.True(t, strings.Contains(contents, "Test2")) - contents = loggers.Read() - require.Empty(t, contents) -} +func TestFIFOLoggerLineIterator(t *testing.T) { + t.Run("logger tests", func(t *testing.T) { + loggers, err := NewFIFOLogger() + require.NoError(t, err) + defer func() { _ = loggers.Close() }() + testLog(t, loggers) + }) + t.Run("read lines", func(t 
*testing.T) { + loggers, err := NewFIFOLogger() + require.NoError(t, err) + defer func() { _ = loggers.Close() }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + loggers.LogError("Test\r err\n") + loggers.Log("\rTest1\r") + count := 0 -func TestPlainFIFOLoggerRead(t *testing.T) { - loggers, err := NewPlainFIFOLogger() - require.NoError(t, err) - testLog(t, loggers) - loggers.LogError("Test err") - loggers.Log("Test1") - time.Sleep(200 * time.Millisecond) // account for slow polling - contents := loggers.Read() - require.NotEmpty(t, contents) - require.True(t, strings.Contains(contents, "Test err")) - require.True(t, strings.Contains(contents, "Test1")) - loggers.Log("Test2") - contents = loggers.Read() - require.NotEmpty(t, contents) - require.False(t, strings.Contains(contents, "Test err")) - require.False(t, strings.Contains(contents, "Test1")) - require.True(t, strings.Contains(contents, "Test2")) - contents = loggers.Read() - require.Empty(t, contents) -} + var b strings.Builder + for line := range loggers.LineIterator(ctx) { + _, err := b.WriteString(line + "\n") + require.NoError(t, err) + count++ + } -func TestFIFOLoggerReadlines(t *testing.T) { - loggers, err := NewFIFOLogger() - require.NoError(t, err) - testLog(t, loggers) - loggers.LogError("Test err\n") - loggers.Log("Test1") - count := 0 - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + assert.Equal(t, "Test err\n\nTest1\n", b.String()) + assert.Equal(t, 3, count) // log error added a line + }) +} - var b strings.Builder - for line := range loggers.ReadLines(ctx) { - _, err := b.WriteString(line + "\n") +func TestPlainFIFOLoggerLineIterator(t *testing.T) { + t.Run("logger tests", func(t *testing.T) { + loggers, err := NewPlainFIFOLogger() require.NoError(t, err) - count++ - } - - assert.Regexp(t, regexp.MustCompile(`\[Test\] Error: .* .* Test err\n\[Test\] Output: .* .* Test1\n`), b.String()) - assert.Equal(t, 2, count) -} + defer func() { _ = loggers.Close() }() + testLog(t, loggers) + }) + t.Run("read lines", func(t *testing.T) { + loggers, err := NewPlainFIFOLogger() + require.NoError(t, err) + defer func() { _ = loggers.Close() }() + go func() { + time.Sleep(500 * time.Millisecond) + loggers.LogError("Test err") + loggers.Log("") + time.Sleep(100 * time.Millisecond) + loggers.Log("Test1") + loggers.Log("\n\n\n") + time.Sleep(200 * time.Millisecond) + loggers.Log("Test2\n") + }() -func TestPlainFIFOLoggerReadlines(t *testing.T) { - loggers, err := NewPlainFIFOLogger() - require.NoError(t, err) - testLog(t, loggers) + count := 0 + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() - go func() { - time.Sleep(500 * time.Millisecond) - loggers.LogError("Test err") - loggers.Log("") - time.Sleep(100 * time.Millisecond) - loggers.Log("Test1") - loggers.Log("\n\n\n") - time.Sleep(200 * time.Millisecond) - loggers.Log("Test2\n") - }() + var b strings.Builder + for line := range loggers.LineIterator(ctx) { + _, err := b.WriteString(line + "\n") + require.NoError(t, err) + count++ + } - count := 0 - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() + assert.Equal(t, "Test errTest1\n\n\nTest2\n", b.String()) + assert.Equal(t, 4, count) + }) +} - var b strings.Builder - for line := range loggers.ReadLines(ctx) { - _, err := b.WriteString(line + "\n") - require.NoError(t, err) - count++ +func Test_iterateOverLines(t *testing.T) { + endIncompleteLine := faker.Word() + testLines 
:= fmt.Sprintf("%v\n%v", strings.ReplaceAll(faker.Paragraph(), " ", "/r/n"), endIncompleteLine) + buf := bytes.NewBufferString(testLines) + numberOfLines := strings.Count(testLines, "\n") + lineCounter := 0 + yield := func(string) bool { + lineCounter++ + return true } + t.Run("cancelled", func(t *testing.T) { + cancelCtx, cancel := context.WithCancel(context.Background()) + cancel() + errortest.AssertError(t, iterateOverLines(cancelCtx, buf, yield), commonerrors.ErrCancelled) + assert.Zero(t, lineCounter) + }) + t.Run("success", func(t *testing.T) { + err := iterateOverLines(context.Background(), buf, yield) + require.NoError(t, err) + assert.Equal(t, numberOfLines, lineCounter) + assert.Equal(t, len(endIncompleteLine), buf.Len()) + line, err := buf.ReadString(newLine) + require.Error(t, err) + assert.Equal(t, io.EOF, err) + assert.Equal(t, endIncompleteLine, line) + }) +} - assert.Equal(t, "Test errTest1\nTest2\n", b.String()) - assert.Equal(t, 3, count) +func Test_IterateOverLines(t *testing.T) { + lastIncompleteLine := faker.Sentence() + overallLines := []string{fmt.Sprintf("%v\n%v", faker.Word(), faker.Word()), fmt.Sprintf("%v\n%v", strings.ReplaceAll(faker.Sentence(), " ", "\r"), faker.Name()), fmt.Sprintf("%v\n%v\n%v", faker.DomainName(), faker.IPv4(), lastIncompleteLine)} + expectedLines := strings.Split(strings.ReplaceAll(strings.TrimSuffix(strings.Join(overallLines, ""), "\n"+lastIncompleteLine), "\r", ""), "\n") + index := 0 + nextLine := func(fCtx context.Context) ([]byte, error) { + err := parallelisation.DetermineContextError(fCtx) + if err != nil { + return nil, err + } + if index >= len(overallLines) { + return nil, nil + } + b := []byte(overallLines[index]) + index++ + return b, nil + } + lineCounter := 0 + var readLines []string + yield := func(l string) bool { + lineCounter++ + readLines = append(readLines, l) + return true + } + cctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err := IterateOverLines(cctx, nextLine, yield) + errortest.AssertError(t, err, commonerrors.ErrTimeout, commonerrors.ErrCancelled) + assert.Equal(t, 4, lineCounter) + assert.EqualValues(t, expectedLines, readLines) } From 3cce4a7754c64024335c1e5d7d5d9f65f2934586 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Thu, 5 Jun 2025 12:43:32 +0100 Subject: [PATCH 14/14] Allow FIFO logger to be cancelled and to Wait for command completion --- utils/diodes/poller.go | 4 ++-- utils/logs/fifo_logger.go | 5 +++++ utils/subprocess/executor.go | 8 ++++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/utils/diodes/poller.go b/utils/diodes/poller.go index b8a2880e8c..575837355c 100644 --- a/utils/diodes/poller.go +++ b/utils/diodes/poller.go @@ -59,7 +59,7 @@ func (p *Poller) Next() GenericDataType { for { data, ok := p.Diode.TryNext() // nolint:staticcheck if !ok { - if p.isDone() { + if p.IsDone() { return nil } @@ -70,7 +70,7 @@ func (p *Poller) Next() GenericDataType { } } -func (p *Poller) isDone() bool { +func (p *Poller) IsDone() bool { select { case <-p.ctx.Done(): return true diff --git a/utils/logs/fifo_logger.go b/utils/logs/fifo_logger.go index 01329568e3..a98e34ea45 100644 --- a/utils/logs/fifo_logger.go +++ b/utils/logs/fifo_logger.go @@ -67,6 +67,11 @@ func (d *fifoDiode) LineIterator(ctx context.Context) iter.Seq[string] { data, has := d.d.TryNext() if has { b = *(*[]byte)(data) + return + } + if d.d.IsDone() { + err = commonerrors.ErrEOF + return } return }, yield) diff --git a/utils/subprocess/executor.go 
b/utils/subprocess/executor.go index 35770b65ea..99d96a1872 100644 --- a/utils/subprocess/executor.go +++ b/utils/subprocess/executor.go @@ -192,6 +192,14 @@ func (s *Subprocess) IsOn() bool { return s.isRunning.Load() && s.processMonitoring.IsOn() } +// Wait waits for the command to exit and waits for any copying to +// stdin or copying from stdout or stderr to complete. +// +// The command must have been started by Start. +func (s *Subprocess) Wait() error { + return s.command.cmdWrapper.cmd.Wait() +} + // Start starts the process if not already started. // This method is idempotent. func (s *Subprocess) Start() (err error) {
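
Below is a minimal sketch of how the FIFO logger introduced in this series might be consumed once the patches are applied. It mirrors the usage in fifo_logger_test.go: NewFIFOLogger, Log, LogError, LineIterator and Close come from the patches above, while the main package wrapper and the example messages are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ARM-software/golang-utils/utils/logs"
)

func main() {
	// Create a FIFO logger: output and error messages are merged into a
	// ring buffer and are discarded once they have been read.
	loggers, err := logs.NewFIFOLogger()
	if err != nil {
		panic(err)
	}
	defer func() { _ = loggers.Close() }()

	// Produce a few lines from another goroutine.
	go func() {
		loggers.Log("starting work")
		time.Sleep(100 * time.Millisecond)
		loggers.LogError("something went wrong")
	}()

	// Consume lines until the context expires. LineIterator yields one
	// cleansed line at a time (trailing '\n' and any '\r' removed) and
	// should only be used from a single goroutine.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	for line := range loggers.LineIterator(ctx) {
		fmt.Println("line:", line)
	}
}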