Skip to content

Commit

Permalink
Fix SyslogLogger to be generally thread and task safe. See #176.
Browse files Browse the repository at this point in the history
Uses an asynchronous writer thread to write the log output and avoids any yielding within the thread/task that performs the logging call.

Note that this change breaks the API to create a SyslogLogger instance, but since the previous implementation was so fundamentally broken, we can assume that nobody was using it.
  • Loading branch information
s-ludwig committed Jul 27, 2024
1 parent 9279edf commit a16f68f
Showing 1 changed file with 147 additions and 60 deletions.
207 changes: 147 additions & 60 deletions source/vibe/core/log.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
module vibe.core.log;

import vibe.core.args;
import vibe.core.concurrency : ScopedLock, lock;
import vibe.core.concurrency : ScopedLock, lock, isWeaklyIsolated;
import vibe.core.sync;

import std.algorithm;
Expand Down Expand Up @@ -630,21 +630,60 @@ final class HTMLLogger : Logger {
}


import std.conv;
/**
Construct a SyslogLogger.
The log messages are sent to the given OutputStream stream using the given
Facility facility.Optionally the appName and hostName can be set. The
appName defaults to null. The hostName defaults to hostName().
Note that the passed stream's write function must not use logging with
a level for that this Logger's acceptsLevel returns true. Because this
Logger uses the stream's write function when it logs and would hence
log forevermore.
*/
SyslogLogger!StreamCallback createSyslogLogger(StreamCallback)(StreamCallback stream_callback,
SyslogFacility facility, string app_name = null, string host_name = hostName())
if (isWeaklyIsolated!StreamCallback)
{
return new SyslogLogger!StreamCallback(stream_callback, facility, app_name, host_name);
}

/**
A logger that logs in syslog format according to RFC 5424.
Messages can be logged to files (via file streams) or over the network (via
TCP or SSL streams).
Params:
StreamCallback = Callable that takes no arguments and returns an
`OutputStream` of some kind that will be used to write the log
output in Syslog format.
Known_issues:
The current implementation does not write individual lines atomically.
Thus, when logging from multiple tasks or threads concurrently, log
lines may get mixed up.
Standards: Conforms to RFC 5424.
*/
final class SyslogLogger(OutputStream) : Logger {
final class SyslogLogger(StreamCallback) : Logger
if (isWeaklyIsolated!StreamCallback)
{
import std.conv : hexString;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import vibe.container.ringbuffer : RingBuffer;

private {
string m_hostName;
string m_appName;
OutputStream m_ostream;
SyslogFacility m_facility;
Thread m_writeThread;
StreamCallback m_streamCallback;
Mutex m_bufferMutex;
Condition m_bufferCondition;
RingBuffer!ubyte m_buffer;
}

deprecated("Use `SyslogFacility` instead.")
Expand All @@ -670,25 +709,31 @@ final class SyslogLogger(OutputStream) : Logger {
///
private enum BOM = hexString!"EFBBBF";

/**
Construct a SyslogLogger.
The log messages are sent to the given OutputStream stream using the given
Facility facility.Optionally the appName and hostName can be set. The
appName defaults to null. The hostName defaults to hostName().
Note that the passed stream's write function must not use logging with
a level for that this Logger's acceptsLevel returns true. Because this
Logger uses the stream's write function when it logs and would hence
log forevermore.
*/
this(OutputStream stream, SyslogFacility facility, string appName = null, string hostName = hostName())
private this(StreamCallback stream_callback, SyslogFacility facility, string appName = null, string hostName = hostName())
{
m_hostName = hostName != "" ? hostName : NILVALUE;
m_appName = appName != "" ? appName : NILVALUE;
m_ostream = stream;
m_facility = facility;
m_streamCallback = stream_callback;
this.minLevel = LogLevel.debug_;

m_buffer.capacity = 1024;
m_bufferMutex = new Mutex;
m_bufferCondition = new Condition(m_bufferMutex);

m_writeThread = new Thread(&writeThreadFunc);
m_writeThread.start();
}

void dispose()
{
{
auto l = scopedMutexLock(m_bufferMutex);
while (!m_buffer.empty) m_bufferCondition.wait();
m_buffer.capacity = 0;
}
m_bufferCondition.notifyAll();
m_writeThread.join();
}

/**
Expand Down Expand Up @@ -730,66 +775,101 @@ final class SyslogLogger(OutputStream) : Logger {

auto text = msg.text;
import std.format : formattedWrite;
auto str = StreamOutputRange!OutputStream(m_ostream);
auto str = BufferOutputRange(this);
str.formattedWrite(SYSLOG_MESSAGE_FORMAT_VERSION1, priVal,
timestamp, m_hostName, BOM ~ m_appName, procId, msgId,
structuredData, BOM);
}

override void put(scope const(char)[] text)
@trusted {
m_ostream.write(text);
writeToBuffer(cast(const(ubyte)[])text);
}

override void endLine()
@trusted {
m_ostream.write("\n");
m_ostream.flush();
writeToBuffer(cast(const(ubyte)[])"\n");
}

private struct StreamOutputRange(OutputStream)
private void writeToBuffer(scope const(ubyte)[] bts)
@safe {
while (bts.length > 0) {
bool was_empty;
{
auto l = scopedMutexLock(m_bufferMutex);
while (m_buffer.full) {
if (m_buffer.capacity == 0) return; // write thread failed
() @trusted { m_bufferCondition.wait(); } ();
}
was_empty = m_buffer.empty;
auto chunk = min(bts.length, m_buffer.freeSpace);
m_buffer.putBack(bts[0 .. chunk]);
bts = bts[chunk .. $];
}
if (was_empty) () @trusted { m_bufferCondition.notifyAll(); } ();
}
}

private void writeThreadFunc()
@safe nothrow {
ubyte[4096] buf;


try {
auto ostr = m_streamCallback();

while (true) {
size_t chunk;
bool was_full, is_empty;

{
auto l = scopedMutexLock(m_bufferMutex);
while (m_buffer.empty) {
if (m_buffer.capacity == 0) {
ostr.finalize();
return;
}
() @trusted { m_bufferCondition.wait(); } ();
}
chunk = min(buf.length, m_buffer.length);
was_full = m_buffer.full;
m_buffer.read(buf[0 .. chunk]);
is_empty = m_buffer.empty;
}

if (was_full || is_empty) () @trusted { m_bufferCondition.notifyAll(); } ();

ostr.write(buf[0 .. chunk]);
}
} catch (Exception e) {
logException(e, "SyslogLogger failed to write output");
auto l = scopedMutexLock(m_bufferMutex);
m_buffer.capacity = 0;
try () @trusted { m_bufferCondition.notifyAll(); } ();
catch (Exception e2) assert(false, e2.msg);
}
}

private static struct BufferOutputRange
{
private {
OutputStream m_stream;
size_t m_fill = 0;
char[256] m_data = void;
SyslogLogger m_logger;
}

@safe:

@disable this(this);

this(OutputStream stream) { m_stream = stream; }
~this() { flush(); }
void flush()
{
if (m_fill == 0) return;
m_stream.write(m_data[0 .. m_fill]);
m_fill = 0;
}
this(SyslogLogger logger) { m_logger = logger; }

void put(char bt)
{
m_data[m_fill++] = bt;
if (m_fill >= m_data.length) flush();
ubyte[1] bts;
bts[0] = bt;
m_logger.writeToBuffer(bts[]);
}

void put(const(char)[] bts)
{
// avoid writing more chunks than necessary
if (bts.length + m_fill >= m_data.length * 2) {
flush();
m_stream.write(bts);
return;
}

while (bts.length) {
auto len = min(m_data.length - m_fill, bts.length);
m_data[m_fill .. m_fill + len] = bts[0 .. len];
m_fill += len;
bts = bts[len .. $];
if (m_fill >= m_data.length) flush();
}
m_logger.writeToBuffer(cast(const(ubyte)[])bts);
}
}
}
Expand Down Expand Up @@ -825,8 +905,13 @@ enum SyslogFacility {
unittest
{
import vibe.core.file;
auto fstream = createTempFile();
auto logger = new SyslogLogger!FileStream(fstream, SyslogFacility.local1, "appname", null);
align(16) static __gshared string gpath;
static FileStream createOutput() @trusted {
auto fstream = createTempFile();
atomicStore(gpath, fstream.path.toString());
return fstream;
}
auto logger = createSyslogLogger(&createOutput, SyslogFacility.local1, "appname", null);
LogLine msg;
import std.datetime;
import core.thread;
Expand All @@ -840,14 +925,14 @@ unittest
logger.put("αβγ");
logger.endLine();
}
auto path = fstream.path;
destroy(logger);
fstream.close();
logger.dispose();
auto path = atomicLoad(gpath);

import std.file;
import std.string;
auto lines = splitLines(readText(path.toString()), KeepTerminator.yes);
alias BOM = SyslogLogger!FileStream.BOM;
auto txt = readText(path);
auto lines = splitLines(txt, KeepTerminator.yes);
alias BOM = logger.BOM;
assert(lines.length == 7);
assert(lines[0] == "<143>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n");
assert(lines[1] == "<142>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n");
Expand All @@ -856,7 +941,7 @@ unittest
assert(lines[4] == "<139>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n");
assert(lines[5] == "<138>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n");
assert(lines[6] == "<137>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n");
removeFile(path.toString());
removeFile(path);
}


Expand All @@ -865,6 +950,8 @@ unittest
/// If the host name cannot be determined the function returns null.
private string hostName()
{
import std.conv : to;

string hostName;

version (Posix) {
Expand Down

0 comments on commit a16f68f

Please sign in to comment.