-
Notifications
You must be signed in to change notification settings - Fork 333
/
Copy pathCloudwatchLogs.cpp
123 lines (100 loc) · 4.6 KB
/
CloudwatchLogs.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include "Include.h"
#include "CloudwatchLogs.h"
namespace CppInteg {
/**
 * Constructs the logger and initializes the `client` member from the supplied configuration.
 *
 * @param pClientConfig - configuration handed to the `client` member's constructor. The pointer is
 *                        dereferenced unconditionally, so it must not be NULL.
 */
CloudwatchLogs::CloudwatchLogs(ClientConfiguration* pClientConfig)
    : client(*pClientConfig)
{
}
/**
 * Picks the log group / log stream names for this run and creates both through the client.
 *
 * The stream name has the form "<channelName>-<role>-<GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND>",
 * where <role> is "StorageMaster" when isStorage is set, otherwise "master" or "viewer" depending on
 * isMaster. The group name is always LOG_GROUP_NAME.
 *
 * @param channelName - prefix of the generated log stream name.
 * @param region      - not referenced by this function.
 * @param isMaster    - selects the "master" / "viewer" role tag when isStorage is not set.
 * @param isStorage   - when set, the role tag is "StorageMaster" regardless of isMaster.
 *
 * @return retStatus, which starts as STATUS_SUCCESS; a failed CreateLogStream is reported through
 *         CHK_ERR with STATUS_INVALID_OPERATION.
 */
STATUS CloudwatchLogs::init(PCHAR channelName, PCHAR region, BOOL isMaster, BOOL isStorage)
{
    STATUS retStatus = STATUS_SUCCESS;
    CreateLogGroupRequest groupRequest;
    CreateLogStreamRequest streamRequest;
    std::stringstream streamNameBuilder;

    // Storage sessions get their own tag; everything else is tagged by signaling role.
    const char* roleTag = isStorage ? "StorageMaster" : (isMaster ? "master" : "viewer");

    streamNameBuilder << channelName << '-' << roleTag << '-' << GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;

    this->logStreamName = streamNameBuilder.str();
    this->logGroupName = LOG_GROUP_NAME;
    DLOGI("Log stream name: %s", this->logStreamName.c_str());

    // The CreateLogGroup outcome is deliberately ignored: if the group could not be created,
    // CreateLogStream below fails as well. Some CreateLogGroup errors still allow a successful
    // CreateLogStream, e.g. when the log group already exists.
    groupRequest.SetLogGroupName(this->logGroupName);
    this->client.CreateLogGroup(groupRequest);

    streamRequest.SetLogGroupName(this->logGroupName);
    streamRequest.SetLogStreamName(this->logStreamName);
    Aws::CloudWatchLogs::Model::CreateLogStreamOutcome streamOutcome = this->client.CreateLogStream(streamRequest);

    CHK_ERR(streamOutcome.IsSuccess(), STATUS_INVALID_OPERATION, "Failed to create \"%s\" log stream: %s",
            this->logStreamName.c_str(), streamOutcome.GetError().GetMessage().c_str());

CleanUp:
    return retStatus;
}
/**
 * Sends whatever is still buffered by calling flush(TRUE), i.e. through the blocking
 * PutLogEvents path rather than the asynchronous one.
 */
VOID CloudwatchLogs::deinit()
{
    flush(TRUE);
}
/**
 * Appends one message to the in-memory batch and flushes the batch once it holds
 * MAX_CLOUDWATCH_LOG_COUNT events or more.
 *
 * The whole operation runs under sync.mutex. The mutex is recursive, so the nested flush()
 * call can lock it again on the same thread.
 *
 * @param log - message text; it is copied into the log event together with a timestamp of
 *              GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND taken at the time of the call.
 */
VOID CloudwatchLogs::push(string log)
{
    std::lock_guard<std::recursive_mutex> guard(this->sync.mutex);

    // Explicit (pointer, length) construction converts std::string into the SDK string type.
    Aws::String message(log.c_str(), log.size());

    Aws::CloudWatchLogs::Model::InputLogEvent event;
    event.WithMessage(message).WithTimestamp(GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
    this->logs.push_back(event);

    const bool batchIsFull = this->logs.size() >= MAX_CLOUDWATCH_LOG_COUNT;
    if (batchIsFull) {
        this->flush();
    }
}
/**
 * Sends every buffered log event to CloudWatch in a single PutLogEvents request.
 *
 * Steps, all performed while holding sync.mutex:
 *   1. Return immediately when nothing is buffered.
 *   2. Copy the buffered events into the request and empty the buffer.
 *   3. Wait until the previous asynchronous request (if any) has cleared sync.pending.
 *   4. Attach the sequence token stored by the last successful request, when there is one.
 *   5. Issue the request, asynchronously or blocking depending on `sync`.
 *
 * A batch whose request fails is not re-queued; only the error is printed.
 *
 * @param sync - TRUE: use the blocking PutLogEvents call and handle the outcome inline.
 *               FALSE: use PutLogEventsAsync; the outcome is handled in a callback which
 *               clears sync.pending and notifies sync.await.
 */
VOID CloudwatchLogs::flush(BOOL sync)
{
    std::unique_lock<std::recursive_mutex> lock(this->sync.mutex);

    if (this->logs.empty()) {
        return;
    }

    // Build the request straight from the buffer so the batch is copied exactly once. The
    // buffer must be emptied BEFORE waiting below: events pushed while this thread waits then
    // land in the next batch instead of being lost or sent twice.
    Aws::CloudWatchLogs::Model::PutLogEventsRequest request;
    request.WithLogGroupName(this->logGroupName).WithLogStreamName(this->logStreamName).WithLogEvents(this->logs);
    this->logs.clear();

    // Wait until the previous batch has been flushed entirely.
    // NOTE(review): the async callback clears `pending` and notifies without taking sync.mutex.
    // That looks required when flush() is reached through push(), which already holds this
    // recursive mutex. It also means a notification could arrive between the predicate check
    // and the actual wait - confirm whether that window matters in practice.
    auto previousBatchDone = [this] { return !this->sync.pending.load(); };
    this->sync.await.wait(lock, previousBatchDone);

    // The token is read only after the wait, because the previous request's callback is what
    // updates it.
    if (!this->token.empty()) {
        request.SetSequenceToken(this->token);
    }

    if (!sync) {
        auto asyncHandler = [this](const Aws::CloudWatchLogs::CloudWatchLogsClient* cwClientLog,
                                   const Aws::CloudWatchLogs::Model::PutLogEventsRequest& request,
                                   const Aws::CloudWatchLogs::Model::PutLogEventsOutcome& outcome,
                                   const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
            UNUSED_PARAM(cwClientLog);
            UNUSED_PARAM(request);
            UNUSED_PARAM(context);

            if (!outcome.IsSuccess()) {
                // Need to use printf so that we don't get into an infinite loop where we keep flushing
                printf("Failed to push logs: %s\n", outcome.GetError().GetMessage().c_str());
            } else {
                printf("Successfully pushed logs to cloudwatch\n");
                this->token = outcome.GetResult().GetNextSequenceToken();
            }

            // Release the next flush() only after the token has been updated.
            this->sync.pending = FALSE;
            this->sync.await.notify_one();
        };

        // Mark the request as in flight before handing it to the SDK so the callback cannot
        // clear the flag before it is set.
        this->sync.pending = TRUE;
        this->client.PutLogEventsAsync(request, asyncHandler);
    } else {
        auto outcome = this->client.PutLogEvents(request);

        if (!outcome.IsSuccess()) {
            // Need to use printf so that we don't get into an infinite loop where we keep flushing
            printf("Failed to push logs: %s\n", outcome.GetError().GetMessage().c_str());
        } else {
            DLOGS("Successfully pushed logs to cloudwatch");
            this->token = outcome.GetResult().GetNextSequenceToken();
        }
    }
}
} // namespace CppInteg