Skip to content

Commit 88fc1e7

Browse files
authored
Merge pull request #75 from mutablelogic/v4
Added text stream implementation
2 parents 69cbaad + da0fc9b commit 88fc1e7

File tree

7 files changed

+254
-84
lines changed

7 files changed

+254
-84
lines changed

pkg/handler/auth/endpoints.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (service *auth) CreateToken(w http.ResponseWriter, r *http.Request) {
105105
var req TokenCreate
106106

107107
// Get the request
108-
if err := httprequest.Read(r, &req); err != nil {
108+
if err := httprequest.Body(&req, r); err != nil {
109109
httpresponse.Error(w, http.StatusBadRequest, err.Error())
110110
return
111111
}

pkg/handler/certmanager/endpoints.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (service *certmanager) reqCreateCA(w http.ResponseWriter, r *http.Request)
193193
var req reqCreateCA
194194

195195
// Get the request
196-
if err := httprequest.Read(r, &req); err != nil {
196+
if err := httprequest.Body(&req, r); err != nil {
197197
httpresponse.Error(w, http.StatusBadRequest, err.Error())
198198
return
199199
}
@@ -211,7 +211,7 @@ func (service *certmanager) reqCreateCert(w http.ResponseWriter, r *http.Request
211211
var req reqCreateCert
212212

213213
// Get the request
214-
if err := httprequest.Read(r, &req); err != nil {
214+
if err := httprequest.Body(&req, r); err != nil {
215215
httpresponse.Error(w, http.StatusBadRequest, err.Error())
216216
return
217217
}

pkg/handler/nginx/endpoints.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (service *nginx) ReadConfig(w http.ResponseWriter, r *http.Request) {
171171
// Create a new configuration
172172
func (service *nginx) CreateConfig(w http.ResponseWriter, r *http.Request) {
173173
var create responseTemplate
174-
if err := httprequest.Read(r, &create); err != nil {
174+
if err := httprequest.Body(&create, r); err != nil {
175175
httpresponse.Error(w, http.StatusBadRequest, err.Error())
176176
return
177177
} else if create.Name == "" {
@@ -264,7 +264,7 @@ func (service *nginx) DeleteConfig(tmpl *folders.Template, w http.ResponseWriter
264264
func (service *nginx) PatchConfig(tmpl *folders.Template, w http.ResponseWriter, r *http.Request) {
265265
var patch responseTemplate
266266
var modified bool
267-
if err := httprequest.Read(r, &patch); err != nil {
267+
if err := httprequest.Body(&patch, r); err != nil {
268268
httpresponse.Error(w, http.StatusBadRequest, err.Error())
269269
return
270270
}

pkg/httpresponse/httpresponse.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ type ErrorResponse struct {
2424
// GLOBALS
2525

2626
const (
27-
ContentTypeKey = "Content-Type"
28-
ContentLengthKey = "Content-Length"
29-
ContentTypeJSON = "application/json"
30-
ContentTypeText = "text/plain"
27+
ContentTypeKey = "Content-Type"
28+
ContentLengthKey = "Content-Length"
29+
ContentTypeJSON = "application/json"
30+
ContentTypeText = "text/plain"
31+
ContentTypeTextStream = "text/event-stream"
3132
)
3233

3334
///////////////////////////////////////////////////////////////////////////////

pkg/httpresponse/textstream.go

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package httpresponse
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"errors"
7+
"io"
8+
"net/http"
9+
"sync"
10+
"time"
11+
)
12+
13+
///////////////////////////////////////////////////////////////////////////////
14+
// TYPES
15+
16+
// TextStream implements a stream of text events
17+
type TextStream struct {
18+
wg sync.WaitGroup
19+
w io.Writer
20+
ch chan *textevent
21+
err error
22+
}
23+
24+
type textevent struct {
25+
name string
26+
data []any
27+
}
28+
29+
///////////////////////////////////////////////////////////////////////////////
30+
// GLOBALS
31+
32+
const (
33+
defaultKeepAlive = 10 * time.Second
34+
)
35+
36+
var (
37+
strPing = "ping"
38+
strEvent = []byte("event: ")
39+
strData = []byte("data: ")
40+
strNewline = []byte("\n")
41+
)
42+
43+
///////////////////////////////////////////////////////////////////////////////
44+
// LIFECYCLE
45+
46+
// Create a new text stream with mimetype text/event-stream
47+
// Additional header tuples can be provided as a series of key-value pairs
48+
func NewTextStream(w http.ResponseWriter, tuples ...string) *TextStream {
49+
// Check parameters
50+
if w == nil {
51+
return nil
52+
}
53+
if len(tuples)%2 != 0 {
54+
return nil
55+
}
56+
57+
// Create a text stream
58+
self := new(TextStream)
59+
self.w = w
60+
self.ch = make(chan *textevent)
61+
62+
// Set the default content type
63+
w.Header().Set(ContentTypeKey, ContentTypeTextStream)
64+
65+
// Set additional headers
66+
for i := 0; i < len(tuples); i += 2 {
67+
w.Header().Set(tuples[i], tuples[i+1])
68+
}
69+
70+
// Write the response, don't know is this is the right one
71+
w.WriteHeader(http.StatusContinue)
72+
73+
// goroutine will write to the response writer until the channel is closed
74+
self.wg.Add(1)
75+
go func() {
76+
defer self.wg.Done()
77+
78+
// Create a ticker for ping messages
79+
ticker := time.NewTimer(100 * time.Millisecond)
80+
defer ticker.Stop()
81+
82+
// Run until the channel is closed
83+
for {
84+
select {
85+
case evt := <-self.ch:
86+
if evt == nil {
87+
return
88+
}
89+
self.emit(evt)
90+
case <-ticker.C:
91+
self.err = errors.Join(self.err, self.emit(&textevent{strPing, nil}))
92+
ticker.Reset(defaultKeepAlive)
93+
}
94+
}
95+
}()
96+
97+
// Return the textstream object
98+
return self
99+
}
100+
101+
// Close the text stream to stop sending ping messages
102+
func (s *TextStream) Close() error {
103+
// Close the channel
104+
close(s.ch)
105+
106+
// Wait for the goroutine to finish
107+
s.wg.Wait()
108+
109+
// Return any errors
110+
return s.err
111+
}
112+
113+
///////////////////////////////////////////////////////////////////////////////
114+
// PUBLIC METHODS
115+
116+
// Write a text event to the stream, and one or more optional data objects
117+
// which are encoded as JSON
118+
func (s *TextStream) Write(name string, data ...any) {
119+
s.ch <- &textevent{name, data}
120+
}
121+
122+
///////////////////////////////////////////////////////////////////////////////
123+
// PRIVATE METHODS
124+
125+
// emit an event to the stream
126+
func (s *TextStream) emit(e *textevent) error {
127+
var result error
128+
129+
// Write the event to the stream
130+
if e.name != "" {
131+
if err := s.write(strEvent, []byte(e.name), strNewline); err != nil {
132+
return err
133+
}
134+
}
135+
136+
// Write the data to the stream
137+
for _, v := range e.data {
138+
if v == nil {
139+
continue
140+
} else if data, err := json.Marshal(v); err != nil {
141+
result = errors.Join(result, err)
142+
} else if err := s.write(strData, data, strNewline); err != nil {
143+
result = errors.Join(result, err)
144+
}
145+
}
146+
147+
// Flush the event
148+
if result == nil {
149+
if err := s.write(strNewline); err != nil {
150+
result = errors.Join(result, err)
151+
}
152+
if w, ok := s.w.(http.Flusher); ok {
153+
w.Flush()
154+
}
155+
}
156+
157+
// Return any errors
158+
return result
159+
}
160+
161+
func (s *TextStream) write(v ...[]byte) error {
162+
if _, err := s.w.Write(bytes.Join(v, nil)); err != nil {
163+
return err
164+
}
165+
return nil
166+
}

pkg/httpresponse/textstream_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package httpresponse_test
2+
3+
import (
4+
"net/http/httptest"
5+
"testing"
6+
"time"
7+
8+
// Packages
9+
"github.com/mutablelogic/go-server/pkg/httpresponse"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func Test_textstream_001(t *testing.T) {
14+
assert := assert.New(t)
15+
16+
t.Run("New", func(t *testing.T) {
17+
resp := httptest.NewRecorder()
18+
ts := httpresponse.NewTextStream(resp)
19+
assert.NotNil(ts)
20+
t.Log(ts)
21+
assert.NoError(ts.Close())
22+
})
23+
24+
t.Run("Ping", func(t *testing.T) {
25+
resp := httptest.NewRecorder()
26+
ts := httpresponse.NewTextStream(resp)
27+
assert.NotNil(ts)
28+
29+
time.Sleep(1 * time.Second)
30+
assert.NoError(ts.Close())
31+
assert.Equal(100, resp.Code)
32+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
33+
assert.Equal("event: ping\n\n", resp.Body.String())
34+
})
35+
36+
t.Run("EventNoData", func(t *testing.T) {
37+
resp := httptest.NewRecorder()
38+
ts := httpresponse.NewTextStream(resp)
39+
assert.NotNil(ts)
40+
41+
ts.Write("foo")
42+
43+
time.Sleep(1 * time.Second)
44+
assert.NoError(ts.Close())
45+
assert.Equal(100, resp.Code)
46+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
47+
assert.Equal("event: foo\n\n"+"event: ping\n\n", resp.Body.String())
48+
})
49+
50+
t.Run("EventData", func(t *testing.T) {
51+
resp := httptest.NewRecorder()
52+
ts := httpresponse.NewTextStream(resp)
53+
assert.NotNil(ts)
54+
55+
ts.Write("foo", "bar")
56+
57+
time.Sleep(1 * time.Second)
58+
assert.NoError(ts.Close())
59+
assert.Equal(100, resp.Code)
60+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
61+
assert.Equal("event: foo\n"+"data: \"bar\"\n\n"+"event: ping\n\n", resp.Body.String())
62+
})
63+
64+
t.Run("EventDataData", func(t *testing.T) {
65+
resp := httptest.NewRecorder()
66+
ts := httpresponse.NewTextStream(resp)
67+
assert.NotNil(ts)
68+
69+
ts.Write("foo", "bar1", "bar2")
70+
71+
time.Sleep(1 * time.Second)
72+
assert.NoError(ts.Close())
73+
assert.Equal(100, resp.Code)
74+
assert.Equal("text/event-stream", resp.Header().Get("Content-Type"))
75+
assert.Equal("event: foo\n"+"data: \"bar1\"\n"+"data: \"bar2\"\n\n"+"event: ping\n\n", resp.Body.String())
76+
})
77+
78+
}

pkg/httpserver/httpresponse/httpresponse.go

-75
This file was deleted.

0 commit comments

Comments
 (0)