forked from rewardStyle/kinetic
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkinesis_reader.go
366 lines (328 loc) · 13.5 KB
/
kinesis_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
package kinetic
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)
// Limits and defaults used when configuring a KinesisReader.
const (
// kinesisReaderMaxBatchSize is the largest batch size accepted by KinesisReaderBatchSize; it is also the
// default batch size assigned by defaultKinesisReaderOptions.
kinesisReaderMaxBatchSize = 10000
// kinesisReaderDefaultTransactionCountLimit is the default number of GetRecords transactions allowed per second.
kinesisReaderDefaultTransactionCountLimit = 5
// kinesisReaderDefaultTransmissionSizeLimit is the default transmission size allowed per second for GetRecords
// calls (presumably bytes, i.e. roughly 2 MB/s -- confirm against the Kinesis per-shard read quota).
kinesisReaderDefaultTransmissionSizeLimit = 2000000
)
// kinesisReaderOptions is a struct that holds all of the KinesisReader's configurable parameters. It is embedded
// in KinesisReader, so the functional options (KinesisReaderOptionsFn) set these fields directly on the reader.
type kinesisReaderOptions struct {
batchSize int // maximum records per GetRecords call (used as the request Limit by GetRecords)
transactionCountLimit int // maximum transactions per second for GetRecords calls
transmissionSizeLimit int // maximum transmission size per second for GetRecords calls
shardIterator *ShardIterator // shard iterator settings used when calling the Kinesis GetShardIterator API
responseReadTimeout time.Duration // response read timeout applied to each GetRecordsRequest API call
logLevel aws.LogLevelType // log level for configuring the LogHelper's log level
Stats ConsumerStatsCollector // stats collection mechanism
}
// defaultKinesisReaderOptions builds the option set a KinesisReader starts with before any functional options
// are applied: the maximum batch size, the default transaction-count and transmission-size limits, a fresh
// ShardIterator, a one second response read timeout and a no-op stats collector. The log level is left at its
// zero value.
func defaultKinesisReaderOptions() *kinesisReaderOptions {
	opts := new(kinesisReaderOptions)
	opts.batchSize = kinesisReaderMaxBatchSize
	opts.transactionCountLimit = kinesisReaderDefaultTransactionCountLimit
	opts.transmissionSizeLimit = kinesisReaderDefaultTransmissionSizeLimit
	opts.shardIterator = NewShardIterator()
	opts.responseReadTimeout = time.Second
	opts.Stats = &NilConsumerStatsCollector{}
	return opts
}
// KinesisReaderOptionsFn is the function signature for the functional options used to configure a KinesisReader.
// An option receives the reader being constructed, mutates its settings, and returns a non-nil error when the
// supplied value is invalid.
type KinesisReaderOptionsFn func(*KinesisReader) error
// KinesisReaderBatchSize returns an option that sets the maximum number of records requested per GetRecords
// call. When applied, the option fails with ErrInvalidBatchSize unless size lies in the range
// [1, kinesisReaderMaxBatchSize]; in that case the reader's batch size is left unchanged.
func KinesisReaderBatchSize(size int) KinesisReaderOptionsFn {
	return func(reader *KinesisReader) error {
		if size <= 0 || size > kinesisReaderMaxBatchSize {
			return ErrInvalidBatchSize
		}
		reader.batchSize = size
		return nil
	}
}
// KinesisReaderTransactionCountLimit returns an option that sets how many GetRecords transactions the
// KinesisReader may perform per second. When applied, the option fails with ErrInvalidTransactionCountLimit
// if count is not positive, leaving the current limit untouched.
func KinesisReaderTransactionCountLimit(count int) KinesisReaderOptionsFn {
	return func(reader *KinesisReader) error {
		if count <= 0 {
			return ErrInvalidTransactionCountLimit
		}
		reader.transactionCountLimit = count
		return nil
	}
}
// KinesisReaderTransmissionSizeLimit returns an option that sets the transmission size the KinesisReader may
// consume per second. When applied, the option fails with ErrInvalidTransmissionSizeLimit if size is not
// positive, leaving the current limit untouched.
func KinesisReaderTransmissionSizeLimit(size int) KinesisReaderOptionsFn {
	return func(reader *KinesisReader) error {
		if size <= 0 {
			return ErrInvalidTransmissionSizeLimit
		}
		reader.transmissionSizeLimit = size
		return nil
	}
}
// KinesisReaderShardIterator returns an option that replaces the KinesisReader's shard iterator settings with
// shardIterator. The value is stored as given (no validation is performed) and the option never fails.
func KinesisReaderShardIterator(shardIterator *ShardIterator) KinesisReaderOptionsFn {
	apply := func(reader *KinesisReader) error {
		reader.shardIterator = shardIterator
		return nil
	}
	return apply
}
// KinesisReaderResponseReadTimeout returns an option that sets the response read timeout the KinesisReader
// applies to each GetRecords request. The value is stored as given and the option never fails.
func KinesisReaderResponseReadTimeout(timeout time.Duration) KinesisReaderOptionsFn {
	apply := func(reader *KinesisReader) error {
		reader.responseReadTimeout = timeout
		return nil
	}
	return apply
}
// KinesisReaderLogLevel returns an option that sets the KinesisReader's log level. Only bits 16-31 of ll are
// retained; all other bits are cleared before the value is stored (presumably those bits carry this package's
// own log levels -- confirm against the LogHelper definitions). The option never fails.
func KinesisReaderLogLevel(ll aws.LogLevelType) KinesisReaderOptionsFn {
	const levelMask aws.LogLevelType = 0xffff0000
	return func(reader *KinesisReader) error {
		reader.logLevel = ll & levelMask
		return nil
	}
}
// KinesisReaderStats returns an option that installs sc as the KinesisReader's stats collector. The value is
// stored as given (no validation is performed) and the option never fails.
func KinesisReaderStats(sc ConsumerStatsCollector) KinesisReaderOptionsFn {
	apply := func(reader *KinesisReader) error {
		reader.Stats = sc
		return nil
	}
	return apply
}
// KinesisReader handles the API to read records from Kinesis. It reads from a single shard of a single stream
// and embeds both its configurable options and a LogHelper. Its iterator state is mutated without locking, so
// only one read may be in flight at a time.
type KinesisReader struct {
*kinesisReaderOptions
*LogHelper
stream string // name of AWS Kinesis Stream to stream from
shard string // shardID of AWS Kinesis Stream to stream from
txnCountRateLimiter *rate.Limiter // rate limiter to limit the number of transactions per second
txSizeRateLimiter *rate.Limiter // rate limiter to limit the transmission size per second
nextShardIterator string // shard iterator to use for the next GetRecords request; empty until one is obtained
client kinesisiface.KinesisAPI // client to Kinesis API
}
// NewKinesisReader creates a new KinesisReader object which implements the StreamReader interface to read records
// from Kinesis.
//
// c is the AWS configuration used to create the session and Kinesis client; its Logger (if any) becomes the
// reader's logger. stream and shard identify the Kinesis stream and shard to read from. optionFns are applied in
// order on top of the defaults from defaultKinesisReaderOptions.
//
// An error is returned if the AWS session cannot be created or if any option rejects its value; in the latter case
// construction stops at the first failing option instead of silently falling back to the default.
func NewKinesisReader(c *aws.Config, stream string, shard string, optionFns ...KinesisReaderOptionsFn) (*KinesisReader, error) {
	sess, err := session.NewSession(c)
	if err != nil {
		return nil, err
	}

	kinesisReader := &KinesisReader{
		kinesisReaderOptions: defaultKinesisReaderOptions(),
		stream:               stream,
		shard:                shard,
		client:               kinesis.New(sess),
	}
	for _, optionFn := range optionFns {
		// Surface invalid options to the caller rather than continuing with a configuration they did not ask for.
		if err := optionFn(kinesisReader); err != nil {
			return nil, err
		}
	}

	// The rate limiters are built after the options so that they reflect any overridden limits.
	kinesisReader.txnCountRateLimiter = rate.NewLimiter(rate.Limit(kinesisReader.transactionCountLimit), 1)
	kinesisReader.txSizeRateLimiter = rate.NewLimiter(rate.Limit(kinesisReader.transmissionSizeLimit), kinesisReader.transmissionSizeLimit)

	// Tolerate a nil config: there is simply no logger to inherit in that case.
	var logger aws.Logger
	if c != nil {
		logger = c.Logger
	}
	kinesisReader.LogHelper = &LogHelper{
		LogLevel: kinesisReader.logLevel,
		Logger:   logger,
	}

	return kinesisReader, nil
}
// ensureShardIterator lazily obtains a shard iterator. If nextShardIterator is already set it does nothing;
// otherwise it calls the GetShardIterator API with the configured ShardIteratorType (plus any applicable
// StartingSequenceNumber or Timestamp) and stores the iterator from the response.
//
// It returns the API error (after logging it), ErrNilGetShardIteratorResponse or ErrNilShardIterator when the
// response is missing or incomplete, or whatever setNextShardIterator returns.
//
// Not thread-safe. Only called from getRecords. Care must be taken to ensure that only one call to Listen and
// Retrieve/RetrieveFn can be running at a time.
func (r *KinesisReader) ensureShardIterator() error {
	if len(r.nextShardIterator) > 0 {
		// An iterator is already available; nothing to fetch.
		return nil
	}

	input := &kinesis.GetShardIteratorInput{
		ShardId:                aws.String(r.shard),                           // Required
		ShardIteratorType:      aws.String(r.shardIterator.shardIteratorType), // Required
		StreamName:             aws.String(r.stream),                          // Required
		StartingSequenceNumber: r.shardIterator.getStartingSequenceNumber(),
		Timestamp:              r.shardIterator.getTimestamp(),
	}
	resp, err := r.client.GetShardIterator(input)

	switch {
	case err != nil:
		r.LogError(err)
		return err
	case resp == nil:
		return ErrNilGetShardIteratorResponse
	case resp.ShardIterator == nil:
		return ErrNilShardIterator
	}
	return r.setNextShardIterator(*resp.ShardIterator)
}
// setNextShardIterator records shardIterator as the iterator to use for the next GetRecords call. An empty value
// is rejected with ErrEmptyShardIterator and leaves the current iterator untouched.
//
// Not thread-safe. Only called from getRecords (and ensureShardIterator, which is called from getRecords). Care
// must be taken to ensure that only one call to Listen and Retrieve/RetrieveFn can be running at a time.
func (r *KinesisReader) setNextShardIterator(shardIterator string) error {
	if shardIterator == "" {
		return ErrEmptyShardIterator
	}

	r.nextShardIterator = shardIterator
	return nil
}
// setSequenceNumber points the reader's shardIterator at sequenceNumber (the last delivered message) by calling
// AtSequenceNumber on it. The stored value only matters when getShardIterator has to be called again (say, to
// refresh the shard iterator). An empty sequenceNumber is rejected with ErrEmptySequenceNumber and changes
// nothing.
//
// Not thread-safe. Only called from getRecords. Care must be taken to ensure that only one call to Listen and
// Retrieve/RetrieveFn can be running at a time.
func (r *KinesisReader) setSequenceNumber(sequenceNumber string) error {
	if sequenceNumber == "" {
		return ErrEmptySequenceNumber
	}

	r.shardIterator.AtSequenceNumber(sequenceNumber)
	return nil
}
// getRecords performs a single GetRecords call against the reader's shard and hands every returned record to fn.
//
// The steps are:
//  1. Make sure a shard iterator is available (ensureShardIterator).
//  2. Build the GetRecords request with batchSize as its Limit, apply the response read timeout and install
//     handlers that collect timing stats and the size of the response.
//  3. Send the request and deliver each non-nil record to fn, remembering the sequence number of every delivered
//     record so that a refreshed iterator can resume from it.
//  4. Store the NextShardIterator from the response for the following call.
//  5. Wait on both rate limiters (one transaction, size bytes) before returning.
//
// It returns the number of records delivered to fn, the size in bytes of the GetRecords response as reported by
// its Content-Length (0 when unknown), and an error if the iterator could not be obtained, the request failed,
// the response was nil, or ctx was done while records were being delivered. In the last case the remaining records
// of the batch are dropped and the stored next shard iterator is not advanced.
//
// Not thread-safe. Care must be taken to ensure that only one call to Listen and Retrieve/RetrieveFn can be
// running at a time.
func (r *KinesisReader) getRecords(ctx context.Context, fn messageHandler, batchSize int) (count int, size int, err error) {
	if err = r.ensureShardIterator(); err != nil {
		r.LogError("Error calling ensureShardIterator(): ", err)
		return count, size, err
	}

	// We use the GetRecordsRequest method of creating requests to allow for registering custom handlers for better
	// control over the API request.
	var startReadTime time.Time
	var startUnmarshalTime time.Time
	start := time.Now()
	req, resp := r.client.GetRecordsRequest(&kinesis.GetRecordsInput{
		Limit:         aws.Int64(int64(batchSize)),
		ShardIterator: aws.String(r.nextShardIterator),
	})
	req.ApplyOptions(request.WithResponseReadTimeout(r.responseReadTimeout))

	// If debug is turned on, add some handlers for GetRecords logging
	if r.LogLevel.AtLeast(LogDebug) {
		req.Handlers.Send.PushBack(func(req *request.Request) {
			r.LogDebug("Finished getRecords Send, took", time.Since(start))
		})
	}

	// Here, we insert a handler to be called after the Send handler and before the Unmarshal handler in the
	// aws-go-sdk library.
	//
	// The Send handler will call http.Client.Do() on the request, which blocks until the response headers have been
	// read before returning an HTTPResponse.
	//
	// The Unmarshal handler will ultimately call ioutil.ReadAll() on the HTTPResponse.Body stream.
	//
	// Our handler wraps the HTTPResponse.Body with our own ReadCloser so that we can implement stats collection
	req.Handlers.Unmarshal.PushFront(func(req *request.Request) {
		r.LogDebug("Started getRecords Unmarshal, took", time.Since(start))
		startReadTime = time.Now()

		req.HTTPResponse.Body = &ReadCloserWrapper{
			ReadCloser: req.HTTPResponse.Body,
			OnCloseFn: func() {
				r.Stats.UpdateGetRecordsReadResponseDuration(time.Since(startReadTime))
				r.LogDebug("Finished GetRecords body read, took", time.Since(start))
				startUnmarshalTime = time.Now()
			},
		}
	})

	req.Handlers.Unmarshal.PushBack(func(req *request.Request) {
		// The transmission size limit is about the data read from the shard, so measure the response rather than
		// the (tiny) request body. ContentLength is -1 when the length is unknown; ignore it in that case.
		if req.HTTPResponse != nil && req.HTTPResponse.ContentLength > 0 {
			size += int(req.HTTPResponse.ContentLength)
		}
		r.Stats.UpdateGetRecordsUnmarshalDuration(time.Since(startUnmarshalTime))
		r.LogDebug("Finished GetRecords Unmarshal, took", time.Since(start))
	})

	// Send the GetRecords request
	r.LogDebug("Starting GetRecords Build/Sign request, took", time.Since(start))
	r.Stats.AddGetRecordsCalled(1)
	if err = req.Send(); err != nil {
		r.LogError("Error getting records:", err)
		// Not every error is guaranteed to be an awserr.Error, so check the assertion instead of panicking.
		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException {
			r.Stats.AddReadProvisionedThroughputExceeded(1)
		} else {
			r.LogDebug("Received AWS error:", err.Error())
		}
		return count, size, err
	}
	r.Stats.UpdateGetRecordsDuration(time.Since(start))

	// Validate the response before touching any of its fields.
	if resp == nil {
		return count, size, ErrNilGetRecordsResponse
	}

	// Process Records
	r.LogDebug(fmt.Sprintf("Finished GetRecords request, %d records from shard %s, took %v\n", len(resp.Records), r.shard, time.Since(start)))
	r.Stats.UpdateBatchSize(len(resp.Records))
	for _, record := range resp.Records {
		if record == nil {
			continue
		}

		// Allow (only) a pipeOfDeath to trigger an instance shutdown of the loop to deliver messages.
		// Otherwise, a normal cancellation will not prevent getRecords from completing the delivery of
		// the current batch of records.
		select {
		case <-ctx.Done():
			r.LogInfo(fmt.Sprintf("getRecords received ctx.Done() while delivering messages, %d delivered, ~%d dropped", count, len(resp.Records)-count))
			return count, size, ctx.Err()
		default:
			fn(FromRecord(record))
			count++
			r.Stats.AddConsumed(1)
			if record.SequenceNumber != nil {
				// We can safely ignore if this call returns error, as if we somehow receive an
				// empty sequence number from AWS, we will simply not set it. At worst, this
				// causes us to reprocess this record if we happen to refresh the iterator.
				r.setSequenceNumber(*record.SequenceNumber)
			}
		}
	}

	if resp.NextShardIterator != nil {
		// TODO: According to AWS docs:
		// http://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#GetRecordsOutput
		//
		// NextShardIterator: The next position in the shard from which to start sequentially reading data
		// records. If set to null, the shard has been closed and the requested iterator will not return any
		// more data.
		//
		// When dealing with streams that will merge or split, we need to detect that the shard has closed and
		// notify the client library.
		//
		// TODO: I don't know if we should be ignoring an error returned by setShardIterator in case of an empty
		// shard iterator in the response. There isn't much we can do, and the best path for recovery may be
		// simply to reprocess the batch and see if we get a valid NextShardIterator from AWS the next time
		// around.
		r.setNextShardIterator(*resp.NextShardIterator)
	}

	// Pay for this call with both rate limiters concurrently, so the total wait is the longer of the two rather
	// than their sum.
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := r.txnCountRateLimiter.Wait(ctx); err != nil {
			r.LogError("Error occured waiting for transaction count tokens")
		}
	}()
	go func() {
		defer wg.Done()
		// WaitN fails outright when asked for more tokens than the limiter's burst, so draw a response that is
		// larger than the burst in burst-sized chunks.
		burst := r.txSizeRateLimiter.Burst()
		for remaining := size; remaining > 0; {
			chunk := remaining
			if burst > 0 && chunk > burst {
				chunk = burst
			}
			if err := r.txSizeRateLimiter.WaitN(ctx, chunk); err != nil {
				r.LogError("Error occured waiting for transmission size tokens")
				return
			}
			remaining -= chunk
		}
	}()
	wg.Wait()

	return count, size, nil
}
// GetRecord calls getRecords with a batch size of 1, so at most one record is requested from Kinesis and passed
// to fn. It returns the error reported by getRecords, if any.
func (r *KinesisReader) GetRecord(ctx context.Context, fn messageHandler) error {
	const singleRecord = 1
	if _, _, err := r.getRecords(ctx, fn, singleRecord); err != nil {
		return err
	}
	return nil
}
// GetRecords calls getRecords with the reader's configured batch size and passes each returned record to fn. It
// returns the error reported by getRecords, if any.
func (r *KinesisReader) GetRecords(ctx context.Context, fn messageHandler) error {
	if _, _, err := r.getRecords(ctx, fn, r.batchSize); err != nil {
		return err
	}
	return nil
}