Skip to content

Commit 89252cd

Browse files
committed
Implement synchronous outlet for zero-copy writes.
1 parent 08024cc commit 89252cd

File tree

7 files changed

+159
-44
lines changed

7 files changed

+159
-44
lines changed

examples/ReceiveDataInChunks.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@
88
int main(int argc, char **argv) {
99
std::cout << "ReceiveDataInChunks" << std::endl;
1010
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
11+
std::cout << "- max_buffered -- duration in msec to buffer" << std::endl;
12+
std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl;
1113

1214
try {
1315

1416
std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
15-
int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360;
17+
double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.;
1618
bool flush = argc > 3;
1719
// resolve the stream of interest & make an inlet
18-
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen);
20+
int32_t buf_samples = (int32_t)(max_buflen * 1000);
21+
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen,
22+
transp_bufsize_thousandths);
1923

2024
// Use set_postprocessing to get the timestamps in a common base clock.
2125
// Do not use if this application will record timestamps to disk -- it is better to

examples/SendDataInChunks.cpp

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ struct fake_device {
4444
pattern.reserve(pattern_samples * n_channels);
4545
for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) {
4646
for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) {
47+
// sin(2*pi*f*t), where f cycles from 1 Hz to Nyquist: srate / 2
48+
double f = (chan_ix + 1) % (int)(srate / 2);
4749
pattern.emplace_back(
4850
offset_0 + chan_ix * offset_step +
49-
magnitude * static_cast<int16_t>(sin(M_PI * chan_ix * sample_ix / n_channels)));
51+
magnitude * static_cast<int16_t>(sin(2 * M_PI * f * sample_ix / srate)));
5052
}
5153
}
5254
last_time = std::chrono::steady_clock::now();
@@ -64,15 +66,15 @@ struct fake_device {
6466
return output;
6567
}
6668

67-
std::size_t get_data(std::vector<int16_t> &buffer) {
69+
std::size_t get_data(std::vector<int16_t> &buffer, bool nodata = false) {
6870
auto now = std::chrono::steady_clock::now();
6971
auto elapsed_nano =
7072
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
7173
int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
7274
elapsed_samples = std::min(elapsed_samples, (int64_t)(buffer.size() / n_channels));
73-
if (false) {
75+
if (nodata) {
7476
// The fastest but no patterns.
75-
memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
77+
// memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
7678
} else {
7779
std::size_t end_sample = head + elapsed_samples;
7880
std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples);
@@ -89,22 +91,26 @@ struct fake_device {
8991

9092
int main(int argc, char **argv) {
9193
std::cout << "SendDataInChunks" << std::endl;
92-
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
94+
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate nodata use_sync" << std::endl;
9395
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
9496
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;
95-
97+
std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into the buffer." << std::endl;
98+
std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl;
99+
96100
std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
97101
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
98102
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
99-
int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
103+
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
100104
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
105+
bool nodata = argc > 7;
106+
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
107+
101108
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
102109
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
103110

104111
try {
105112
// Prepare the LSL stream.
106-
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16);
107-
lsl::stream_outlet outlet(info, 0, max_buffered);
113+
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16, "example-SendDataInChunks");
108114
lsl::xml_element desc = info.desc();
109115
desc.append_child_value("manufacturer", "LSL");
110116
lsl::xml_element chns = desc.append_child("channels");
@@ -114,13 +120,19 @@ int main(int argc, char **argv) {
114120
chn.append_child_value("unit", "microvolts");
115121
chn.append_child_value("type", "EEG");
116122
}
123+
int32_t buf_samples = max_buffered * samplingrate;
124+
lsl::stream_outlet outlet(info, chunk_samples, buf_samples,
125+
transp_bufsize_samples | (do_sync ? transp_sync_blocking: transp_default));
126+
info = outlet.info(); // Refresh info with whatever the outlet captured.
127+
std::cout << "Stream UID: " << info.uid() << std::endl;
117128

118129
// Create a connection to our device.
119130
fake_device my_device(n_channels, (float)samplingrate);
120131

121132
// Prepare buffer to get data from 'device'.
122133
// The buffer should be larger than you think you need. Here we make it 4x as large.
123134
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
135+
std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0);
124136

125137
std::cout << "Now sending data..." << std::endl;
126138

@@ -133,11 +145,12 @@ int main(int argc, char **argv) {
133145
std::this_thread::sleep_until(next_chunk_time);
134146

135147
// Get data from device
136-
std::size_t returned_samples = my_device.get_data(chunk_buffer);
148+
std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata);
137149

138150
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
139151
// other push_chunk methods are easier but slightly slower.
140-
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true);
152+
double ts = lsl::local_clock();
153+
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true);
141154
}
142155

143156
} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }

include/lsl/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ typedef enum {
161161
/// The supplied max_buf should be scaled by 0.001.
162162
transp_bufsize_thousandths = 2,
163163

164+
/// The outlet will use synchronous (blocking) calls to asio to push data
165+
transp_sync_blocking = 4,
166+
164167
// prevent compilers from assuming an instance fits in a single byte
165168
_lsl_transport_options_maxval = 0x7f000000
166169
} lsl_transport_options_t;

src/stream_outlet_impl.cpp

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,22 @@
1212
namespace lsl {
1313

1414
stream_outlet_impl::stream_outlet_impl(
15-
const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity)
15+
const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity, uint32_t flags)
1616
: sample_factory_(std::make_shared<factory>(info.channel_format(), info.channel_count(),
1717
static_cast<uint32_t>(
1818
info.nominal_srate()
1919
? info.nominal_srate() * api_config::get_instance()->outlet_buffer_reserve_ms() /
2020
1000
2121
: api_config::get_instance()->outlet_buffer_reserve_samples()))),
2222
chunk_size_(chunk_size), info_(std::make_shared<stream_info_impl>(info)),
23-
send_buffer_(std::make_shared<send_buffer>(max_capacity)) {
23+
send_buffer_(std::make_shared<send_buffer>(max_capacity)),
24+
do_sync_(flags & transp_sync_blocking) {
25+
26+
if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) {
27+
LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async.");
28+
do_sync_ = false;
29+
}
30+
2431
ensure_lsl_initialized();
2532
const api_config *cfg = api_config::get_instance();
2633

@@ -143,8 +150,24 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
143150
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
144151
sample_p smp(
145152
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
146-
smp->assign_untyped(data);
147-
send_buffer_->push_sample(smp);
153+
if (!do_sync_) {
154+
smp->assign_untyped(data); // Note: Makes a copy!
155+
send_buffer_->push_sample(smp);
156+
} else {
157+
if (timestamp == DEDUCED_TIMESTAMP) {
158+
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
159+
} else {
160+
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
161+
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
162+
}
163+
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
164+
if (pushthrough) {
165+
for (auto &tcp_server : tcp_servers_)
166+
tcp_server->write_all_blocking(sync_buffs_);
167+
sync_buffs_.clear();
168+
}
169+
}
170+
148171
}
149172

150173
bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
@@ -158,8 +181,23 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
158181
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
159182
sample_p smp(
160183
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
161-
smp->assign_typed(data);
162-
send_buffer_->push_sample(smp);
184+
if (!do_sync_) {
185+
smp->assign_typed(data);
186+
send_buffer_->push_sample(smp);
187+
} else {
188+
if (timestamp == DEDUCED_TIMESTAMP) {
189+
sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
190+
} else {
191+
sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
192+
sync_buffs_.push_back(asio::buffer(&timestamp, sizeof(timestamp)));
193+
}
194+
sync_buffs_.push_back(asio::buffer(data, smp->datasize()));
195+
if (pushthrough) {
196+
for (auto &tcp_server : tcp_servers_)
197+
tcp_server->write_all_blocking(sync_buffs_);
198+
sync_buffs_.clear();
199+
}
200+
}
163201
}
164202

165203
template void stream_outlet_impl::enqueue<char>(const char *data, double, bool);

src/stream_outlet_impl.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <string>
1111
#include <thread>
1212
#include <vector>
13+
#include <asio/buffer.hpp>
1314

1415
using asio::ip::tcp;
1516
using asio::ip::udp;
@@ -35,9 +36,11 @@ class stream_outlet_impl {
3536
* @param max_capacity The maximum number of samples buffered for unresponsive receivers. If
3637
* more samples get pushed, the oldest will be dropped. The default is sufficient to hold a bit
3738
* more than 15 minutes of data at 512Hz, while consuming not more than ca. 512MB of RAM.
39+
* @param flags Bitwise-OR'd flags from lsl_transport_options_t
3840
*/
3941
stream_outlet_impl(
40-
const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000);
42+
const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000,
43+
uint32_t flags = transp_default);
4144

4245
/**
4346
* Destructor.
@@ -317,6 +320,8 @@ class stream_outlet_impl {
317320
stream_info_impl_p info_;
318321
/// the single-producer, multiple-receiver send buffer
319322
send_buffer_p send_buffer_;
323+
/// Flag to indicate that push_* operations should be blocking synchronous. false by default.
324+
bool do_sync_;
320325
/// the IO service objects (two per stack: one for UDP and one for TCP)
321326
std::vector<io_context_p> ios_;
322327

@@ -329,6 +334,8 @@ class stream_outlet_impl {
329334
std::vector<udp_server_p> responders_;
330335
/// threads that handle the I/O operations (two per stack: one for UDP and one for TCP)
331336
std::vector<thread_p> io_threads_;
337+
/// buffers used in synchronous call to gather-write data directly to the socket.
338+
std::vector<asio::const_buffer> sync_buffs_;
332339
};
333340

334341
} // namespace lsl

src/tcp_server.cpp

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,10 @@ class client_session : public std::enable_shared_from_this<client_session> {
148148
};
149149

150150
tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
151-
factory_p factory, tcp protocol, int chunk_size)
151+
factory_p factory, tcp protocol, int chunk_size, bool do_sync)
152152
: chunk_size_(chunk_size), shutdown_(false), info_(std::move(info)), io_(std::move(io)),
153153
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)),
154-
acceptor_(std::make_shared<tcp::acceptor>(*io_)) {
154+
acceptor_(std::make_shared<tcp::acceptor>(*io_)), transfer_is_sync_(do_sync) {
155155
// open the server connection
156156
acceptor_->open(protocol);
157157

@@ -222,36 +222,68 @@ void tcp_server::handle_accept_outcome(std::shared_ptr<client_session> newsessio
222222
accept_next_connection();
223223
}
224224

225+
// === synchronous transfer
226+
227+
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> buffs) {
228+
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
229+
std::size_t bytes_sent;
230+
asio::error_code ec;
231+
for (const auto &x : inflight_ready_) {
232+
if (x.second && x.first->is_open()) {
233+
bytes_sent = x.first->send(buffs, 0, ec);
234+
if (ec) {
235+
switch(ec.value()) {
236+
case asio::error::broken_pipe:
237+
case asio::error::connection_reset:
238+
LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
239+
inflight_ready_[x.first] = false;
240+
post(*io_, [x]() {
241+
close_inflight_socket(x);
242+
});
243+
// We leave it up to the client_session destructor to remove the socket.
244+
break;
245+
default:
246+
LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
247+
}
248+
}
249+
}
250+
}
251+
}
252+
225253
// === graceful cancellation of in-flight sockets ===
226254

227255
void tcp_server::register_inflight_socket(const tcp_socket_p &sock) {
228256
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
229-
inflight_.insert(sock);
257+
inflight_ready_.insert({sock, false});
230258
}
231259

232260
void tcp_server::unregister_inflight_socket(const tcp_socket_p &sock) {
233261
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
234-
inflight_.erase(sock);
262+
inflight_ready_[sock] = false;
263+
inflight_ready_.erase(sock);
235264
}
236265

237-
void tcp_server::close_inflight_sockets() {
238-
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
239-
for (const auto &sock : inflight_)
240-
post(*io_, [sock]() {
266+
void tcp_server::close_inflight_socket(std::pair<tcp_socket_p, bool> x) {
267+
try {
268+
if (x.first->is_open()) {
241269
try {
242-
if (sock->is_open()) {
243-
try {
244-
// (in some cases shutdown may fail)
245-
sock->shutdown(sock->shutdown_both);
246-
} catch (...) {}
247-
sock->close();
248-
}
249-
} catch (std::exception &e) {
250-
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
251-
}
252-
});
270+
// (in some cases shutdown may fail)
271+
x.first->shutdown(x.first->shutdown_both);
272+
} catch (...) {}
273+
x.first->close();
274+
}
275+
} catch (std::exception &e) {
276+
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
277+
}
253278
}
254279

280+
void tcp_server::close_inflight_sockets() {
281+
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
282+
for (const auto &x : inflight_ready_) {
283+
inflight_ready_[x.first] = false;
284+
post(*io_, [x]() { close_inflight_socket(x); });
285+
}
286+
}
255287

256288
// === implementation of the client_session class ===
257289

@@ -511,7 +543,11 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
511543
feedbuf_.consume(n);
512544
// register outstanding work at the server (will be unregistered at session destruction)
513545
work_ = std::make_shared<work_p::element_type>(serv_->io_->get_executor());
514-
// spawn a sample transfer thread
546+
serv_->inflight_ready_[sock_] = true;
547+
if (serv_->transfer_is_sync_)
548+
LOG_F(WARNING, "Using synchronous blocking transfers for new client session.");
549+
// spawn a sample transfer thread.
550+
// TODO: only spawn thread in async, but then we need `this` to belong to something else.
515551
std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach();
516552
} catch (std::exception &e) {
517553
LOG_F(WARNING, "Unexpected error while handling the feedheader send outcome: %s", e.what());

0 commit comments

Comments
 (0)