Skip to content

Commit f87cafb

Browse files
rb/lib/thrift/processor: Refactor the processor to handle properly unary requests
1 parent e5ee833 commit f87cafb

File tree

2 files changed

+137
-30
lines changed

2 files changed

+137
-30
lines changed

compiler/cpp/src/generate/t_rb_generator.cc

+41-30
Original file line numberDiff line numberDiff line change
@@ -826,9 +826,9 @@ void t_rb_generator::generate_service(t_service* tservice) {
826826
f_service_.indent() << "NAMESPACE = '" << tservice->get_program()->get_namespace("*") << "'.freeze" << endl << endl;
827827

828828
// Generate the three main parts of the service (well, two for now in PHP)
829+
generate_service_helpers(tservice);
829830
generate_service_client(tservice);
830831
generate_service_server(tservice);
831-
generate_service_helpers(tservice);
832832

833833
f_service_.indent() << "::Thrift.register_service_type(self)"<< endl << endl;
834834

@@ -1019,6 +1019,16 @@ void t_rb_generator::generate_service_server(t_service* tservice) {
10191019
f_service_.indent_down();
10201020
f_service_.indent() << "end" << endl << endl;
10211021

1022+
f_service_.indent() << "METHODS = {" << endl;
1023+
f_service_.indent_up();
1024+
1025+
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
1026+
f_service_.indent() << "'" << (*f_iter)->get_name() << "' => { args_klass: " << capitalize((*f_iter)->get_name()) << "_args, oneway: " << ((*f_iter)->is_oneway() ? "true" : "false") << "}," << endl;
1027+
}
1028+
1029+
f_service_.indent_down();
1030+
f_service_.indent() << "}.freeze" << endl << endl;
1031+
10221032
// Generate the process subfunctions
10231033
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
10241034
generate_process_function(tservice, *f_iter);
@@ -1035,34 +1045,20 @@ void t_rb_generator::generate_service_server(t_service* tservice) {
10351045
*/
10361046
void t_rb_generator::generate_process_function(t_service* tservice, t_function* tfunction) {
10371047
(void)tservice;
1038-
// Open function
1039-
f_service_.indent() << "def process_" << tfunction->get_name() << "(seqid, iprot, oprot)" << endl;
1040-
1041-
f_service_.indent_up();
1042-
1043-
string argsname = capitalize(tfunction->get_name()) + "_args";
10441048
string resultname = capitalize(tfunction->get_name()) + "_result";
10451049

1046-
f_service_.indent() << "args = read_args(iprot, " << argsname << ")" << endl;
1047-
1048-
f_service_.indent();
1049-
1050-
if (!tfunction->is_oneway()) {
1051-
f_service_ << "result = ";
1052-
}
1053-
f_service_ << "@middleware.handle_" << (tfunction->is_oneway() ? "unary" : "binary") << "('" << tfunction->get_name() << "', args) do |args|" << endl;
1054-
f_service_.indent_up();
1055-
10561050
t_struct* xs = tfunction->get_xceptions();
10571051
const std::vector<t_field*>& xceptions = xs->get_members();
10581052
vector<t_field*>::const_iterator x_iter;
10591053

1060-
// Declare result for non oneway function
1054+
// Open function
1055+
f_service_.indent() << "def execute_" << tfunction->get_name() << "(args)" << endl;
1056+
f_service_.indent_up();
1057+
10611058
if (!tfunction->is_oneway()) {
1062-
f_service_.indent() << "result = " << resultname << ".new()" << endl;
1059+
f_service_.indent() << "result = " << resultname << ".new()" << endl << endl;
10631060
}
10641061

1065-
// Try block for a function with exceptions
10661062
if (xceptions.size() > 0) {
10671063
f_service_.indent() << "begin" << endl;
10681064
f_service_.indent_up();
@@ -1104,24 +1100,39 @@ void t_rb_generator::generate_process_function(t_service* tservice, t_function*
11041100
f_service_.indent() << "end" << endl;
11051101
}
11061102

1107-
// Shortcut out here for oneway functions
1103+
f_service_.indent() << endl;
1104+
11081105
if (tfunction->is_oneway()) {
11091106
f_service_.indent() << "nil" << endl;
1110-
f_service_.indent_down();
1107+
} else {
1108+
f_service_.indent() << "result" << endl;
1109+
}
11111110

1112-
f_service_.indent() << "end" << endl;
1113-
f_service_.indent_down();
1111+
f_service_.indent_down();
1112+
f_service_.indent() << "end" << endl << endl;
11141113

1115-
f_service_.indent() << "end" << endl << endl;
1116-
return;
1117-
}
1114+
f_service_.indent() << "def process_" << tfunction->get_name() << "(seqid, iprot, oprot)" << endl;
1115+
1116+
f_service_.indent_up();
1117+
1118+
string argsname = capitalize(tfunction->get_name()) + "_args";
11181119

1120+
f_service_.indent() << "args = read_args(iprot, " << argsname << ")" << endl;
11191121

1120-
f_service_.indent() << "result" << endl;
1122+
f_service_.indent();
1123+
1124+
if (!tfunction->is_oneway()) {
1125+
f_service_ << "result = ";
1126+
}
1127+
f_service_ << "@middleware.handle_" << (tfunction->is_oneway() ? "unary" : "binary") << "('" << tfunction->get_name() << "', args) do |args|" << endl;
1128+
f_service_.indent_up();
1129+
f_service_.indent() << "execute_" << tfunction->get_name() << "(args)" << endl;
11211130
f_service_.indent_down();
11221131
f_service_.indent() << "end" << endl << endl;
1123-
f_service_.indent() << "write_result(result, oprot, '" << tfunction->get_name() << "', seqid)"
1124-
<< endl;
1132+
if (!tfunction->is_oneway()) {
1133+
f_service_.indent() << "write_result(result, oprot, '" << tfunction->get_name() << "', seqid)"
1134+
<< endl;
1135+
}
11251136

11261137
// Close function
11271138
f_service_.indent_down();

lib/rb/lib/thrift/processor.rb

+96
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,109 @@
1919

2020
module Thrift
2121
module Processor
22+
class BaseProcessorFunction
23+
def initialize(fname, middleware, args_klass, method)
24+
@fname = fname
25+
@middleware = middleware
26+
@args_klass = args_klass
27+
@method = method
28+
end
29+
30+
protected
31+
32+
def read_args(iprot)
33+
args = @args_klass.new
34+
35+
args.read(iprot)
36+
iprot.read_message_end
37+
38+
args
39+
end
40+
end
41+
42+
class BinaryProcessorFunction < BaseProcessorFunction
43+
def process(seqid, iprot, oprot)
44+
execute(seqid, iprot, oprot)
45+
46+
true
47+
end
48+
49+
private
50+
51+
def execute(seqid, iprot, oprot)
52+
args = read_args(iprot)
53+
54+
result = @middleware.handle_binary(@fname, args) do |args|
55+
@method.call(args)
56+
end
57+
58+
write_result(result, oprot, seqid)
59+
rescue => e
60+
write_exception(e, oprot, seqid)
61+
end
62+
63+
def write_exception(exception, oprot, seqid)
64+
oprot.write_message_begin(@fname, MessageTypes::EXCEPTION, seqid)
65+
66+
unless exception.is_a? ApplicationException
67+
exception = ApplicationException.new(
68+
ApplicationException::INTERNAL_ERROR,
69+
"Internal error processing #{@fname}: #{exception.class}: #{exception}"
70+
)
71+
end
72+
73+
exception.write(oprot)
74+
oprot.write_message_end
75+
oprot.trans.flush
76+
end
77+
78+
def write_result(result, oprot, seqid)
79+
oprot.write_message_begin(@fname, MessageTypes::REPLY, seqid)
80+
result.write(oprot)
81+
oprot.write_message_end
82+
oprot.trans.flush
83+
end
84+
end
85+
86+
class UnaryProcessorFunction < BaseProcessorFunction
87+
def process(_seqid, iprot, _oprot)
88+
args = read_args(iprot)
89+
90+
@middleware.handle_unary(@fname, args) do |args|
91+
@method.call(args)
92+
end
93+
94+
true
95+
end
96+
end
97+
2298
def initialize(handler, middlewares = [])
2399
@handler = handler
24100
@middleware = Middleware.wrap(middlewares)
101+
102+
@functions = if self.class.const_defined?(:METHODS)
103+
self.class::METHODS.reduce({}) do |acc, (key, args)|
104+
klass = args[:oneway] ? UnaryProcessorFunction : BinaryProcessorFunction
105+
106+
acc.merge key => klass.new(
107+
key,
108+
@middleware,
109+
args[:args_klass],
110+
method("execute_#{key}")
111+
)
112+
end
113+
end || {}
25114
end
26115

27116
def process(iprot, oprot)
28117
name, _type, seqid = iprot.read_message_begin
118+
119+
func = @functions[name]
120+
121+
return func.process(seqid, iprot, oprot) if func
122+
123+
# TODO: once all the stubs will be generated w thrift >=2.5 the next lines
124+
# can be deleted
29125
if respond_to?("process_#{name}")
30126
begin
31127
send("process_#{name}", seqid, iprot, oprot)

0 commit comments

Comments
 (0)