-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathevents.go
111 lines (94 loc) · 2.51 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/cilium/hive"
"github.com/cilium/hive/cell"
"github.com/cilium/hive/job"
"github.com/cilium/hive/script"
"github.com/cilium/stream"
)
// eventsCell provides the ExampleEvents API for subscribing
// to a stream of example events.
var eventsCell = cell.Module(
"example-events",
"Provides a stream of example events",
cell.Provide(
newExampleEvents,
watchEventsCommand,
),
)
type ExampleEvent struct {
Message string
}
type ExampleEvents interface {
stream.Observable[ExampleEvent]
}
type exampleEventSource struct {
stream.Observable[ExampleEvent]
emit func(ExampleEvent) // Emits an item to 'src'
complete func(error) // Completes 'src'
}
func (es *exampleEventSource) emitter(ctx context.Context, health cell.Health) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
defer es.complete(nil)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
es.emit(makeEvent())
}
}
}
// makeEvent generates a random event
func makeEvent() ExampleEvent {
var prefixes = []string{
"Thrusters set to",
"Main engine damage at",
"Laser power set to",
"Remaining hypercannon fuel:",
"Reserve of peanut butter sandwiches:",
"Crew morale at",
"Elevator music volume now set to",
"Mission completion: ",
}
prefixIdx := rand.Intn(len(prefixes))
percentage := rand.Intn(100)
return ExampleEvent{
Message: fmt.Sprintf("%s %d%%", prefixes[prefixIdx], percentage),
}
}
func newExampleEvents(lc cell.Lifecycle, jobs job.Registry, health cell.Health) ExampleEvents {
es := &exampleEventSource{}
// Multicast() constructs a one-to-many observable to which items can be emitted.
es.Observable, es.emit, es.complete = stream.Multicast[ExampleEvent]()
// Create a new job group and add emitter as a one-shot job.
g := jobs.NewGroup(health)
g.Add(job.OneShot("emitter", es.emitter))
// Add the group to the lifecycle to be started and stopped.
lc.Append(g)
return es
}
// watchEventsCommand defines the hive script command "events" that subscribes
// to events.
func watchEventsCommand(ee ExampleEvents) hive.ScriptCmdOut {
return hive.NewScriptCmd(
"events",
script.Command(
script.CmdUsage{Summary: "Watch events"},
func(s *script.State, args ...string) (script.WaitFunc, error) {
for e := range stream.ToChannel(s.Context(), ee) {
s.Logf("%s\n", e)
s.FlushLog()
}
return nil, nil
},
),
)
}