Skip to content

Commit 665b6fc

Browse files
authored
Multi-threaded support (WorkerThreads support) (#4)
* Replaced HolyThread with DispatchThread * Dividing Ndb objects between worker threads * FIX: Enabled sending back message to in WorkerThread * Added ping command * Add redis-server to dependencies * Bumped number of connections
1 parent 4093b87 commit 665b6fc

14 files changed

+379
-247
lines changed

.devcontainer/devcontainer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
}
2727
}
2828
},
29-
"postCreateCommand": "sudo apt-get update && sudo apt-get install -y build-essential libprotobuf-dev protobuf-compiler",
29+
"postCreateCommand": "sudo apt-get update && sudo apt-get install -y build-essential redis-server libprotobuf-dev protobuf-compiler",
3030
"containerEnv": {
3131
"RONDB_PATH": "/workspaces/pink/rondb-22.10.5-linux-glibc2.28-arm64_v8"
3232
// Do this manually:

pink/examples/binlog_parser_test.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
3131
std::string one_command = "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$2\r\nab\r\n";
3232

3333
std::string binlog_body;
34-
slash::PutFixed16(&binlog_body, 1); // type
34+
// slash::PutFixed16(&binlog_body, 1); // type
3535
slash::PutFixed32(&binlog_body, 0); //exec_time
3636
slash::PutFixed32(&binlog_body, 10); // server_id
3737
slash::PutFixed64(&binlog_body, 0); // logic_id
@@ -42,7 +42,7 @@ int main(int argc, char* argv[]) {
4242
binlog_body.append(one_command);
4343

4444
std::string header;
45-
slash::PutFixed16(&header, 2);
45+
// slash::PutFixed16(&header, 2);
4646
slash::PutFixed32(&header, binlog_body.size());
4747

4848
std::string command = header + binlog_body;

pink/examples/mydispatch_srv.cc

+84-64
Original file line numberDiff line numberDiff line change
@@ -16,84 +16,104 @@
1616

1717
using namespace pink;
1818

19-
class MyConn: public PbConn {
20-
public:
21-
MyConn(int fd, const std::string& ip_port, Thread *thread,
22-
void* worker_specific_data);
23-
virtual ~MyConn();
24-
protected:
25-
virtual int DealMessage();
26-
27-
private:
28-
myproto::Ping ping_;
29-
myproto::PingRes ping_res_;
19+
class MyConn : public PbConn
20+
{
21+
public:
22+
MyConn(
23+
int fd,
24+
const std::string &ip_port,
25+
Thread *thread,
26+
void *worker_specific_data);
27+
virtual ~MyConn();
28+
29+
protected:
30+
virtual int DealMessage();
31+
32+
private:
33+
myproto::Ping ping_;
34+
myproto::PingRes ping_res_;
3035
};
3136

32-
MyConn::MyConn(int fd, const std::string& ip_port, Thread *thread,
33-
void* worker_specific_data)
34-
: PbConn(fd, ip_port, thread) {
35-
// Handle worker_specific_data ...
37+
MyConn::MyConn(
38+
int fd,
39+
const std::string &ip_port,
40+
Thread *thread,
41+
void *worker_specific_data)
42+
: PbConn(fd, ip_port, thread)
43+
{
44+
// Handle worker_specific_data ...
3645
}
3746

38-
MyConn::~MyConn() {
47+
MyConn::~MyConn()
48+
{
3949
}
4050

41-
int MyConn::DealMessage() {
42-
printf("In the myconn DealMessage branch\n");
43-
ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_);
44-
ping_res_.Clear();
45-
ping_res_.set_res(11234);
46-
ping_res_.set_mess("heiheidfdfdf");
47-
printf ("DealMessage receive (%s)\n", ping_res_.mess().c_str());
48-
std::string res;
49-
ping_res_.SerializeToString(&res);
50-
WriteResp(res);
51-
return 0;
51+
int MyConn::DealMessage()
52+
{
53+
printf("In the myconn DealMessage branch\n");
54+
ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_);
55+
ping_res_.Clear();
56+
ping_res_.set_res(11234);
57+
ping_res_.set_mess("heiheidfdfdf");
58+
printf("DealMessage receive (%s)\n", ping_res_.mess().c_str());
59+
std::string res;
60+
ping_res_.SerializeToString(&res);
61+
WriteResp(res);
62+
return 0;
5263
}
5364

54-
class MyConnFactory : public ConnFactory {
55-
public:
56-
virtual std::shared_ptr<PinkConn> NewPinkConn(int connfd, const std::string &ip_port,
57-
Thread *thread,
58-
void* worker_specific_data,
59-
PinkEpoll* pink_epoll) const {
60-
return std::make_shared<MyConn>(connfd, ip_port, thread, worker_specific_data);
61-
}
65+
class MyConnFactory : public ConnFactory
66+
{
67+
public:
68+
virtual std::shared_ptr<PinkConn> NewPinkConn(
69+
int connfd,
70+
const std::string &ip_port,
71+
Thread *thread,
72+
void *worker_specific_data,
73+
PinkEpoll *pink_epoll) const
74+
{
75+
return std::make_shared<MyConn>(connfd, ip_port, thread, worker_specific_data);
76+
}
6277
};
6378

6479
static std::atomic<bool> running(false);
6580

66-
static void IntSigHandle(const int sig) {
67-
printf("Catch Signal %d, cleanup...\n", sig);
68-
running.store(false);
69-
printf("server Exit");
81+
static void IntSigHandle(const int sig)
82+
{
83+
printf("Catch Signal %d, cleanup...\n", sig);
84+
running.store(false);
85+
printf("server Exit");
7086
}
7187

72-
static void SignalSetup() {
73-
signal(SIGHUP, SIG_IGN);
74-
signal(SIGPIPE, SIG_IGN);
75-
signal(SIGINT, &IntSigHandle);
76-
signal(SIGQUIT, &IntSigHandle);
77-
signal(SIGTERM, &IntSigHandle);
88+
static void SignalSetup()
89+
{
90+
signal(SIGHUP, SIG_IGN);
91+
signal(SIGPIPE, SIG_IGN);
92+
signal(SIGINT, &IntSigHandle);
93+
signal(SIGQUIT, &IntSigHandle);
94+
signal(SIGTERM, &IntSigHandle);
7895
}
7996

80-
int main() {
81-
SignalSetup();
82-
ConnFactory *my_conn_factory = new MyConnFactory();
83-
ServerThread *st = NewDispatchThread(9211, 10, my_conn_factory, 1000);
84-
85-
if (st->StartThread() != 0) {
86-
printf("StartThread error happened!\n");
87-
exit(-1);
88-
}
89-
running.store(true);
90-
while (running.load()) {
91-
sleep(1);
92-
}
93-
st->StopThread();
94-
95-
delete st;
96-
delete my_conn_factory;
97-
98-
return 0;
97+
int main()
98+
{
99+
SignalSetup();
100+
ConnFactory *my_conn_factory = new MyConnFactory();
101+
ServerThread *st = NewDispatchThread(9211, 10, my_conn_factory, 1000);
102+
103+
if (st->StartThread() != 0)
104+
{
105+
printf("StartThread error happened!\n");
106+
exit(-1);
107+
}
108+
running.store(true);
109+
while (running.load())
110+
{
111+
sleep(1);
112+
}
113+
st->StopThread();
114+
115+
delete st;
116+
delete my_conn_factory;
117+
118+
return 0;
99119
}

0 commit comments

Comments
 (0)