diff --git a/source/vibe/core/log.d b/source/vibe/core/log.d index a01b661..6140a2b 100644 --- a/source/vibe/core/log.d +++ b/source/vibe/core/log.d @@ -8,7 +8,7 @@ module vibe.core.log; import vibe.core.args; -import vibe.core.concurrency : ScopedLock, lock; +import vibe.core.concurrency : ScopedLock, lock, isWeaklyIsolated; import vibe.core.sync; import std.algorithm; @@ -630,21 +630,60 @@ final class HTMLLogger : Logger { } -import std.conv; +/** + Construct a SyslogLogger. + + The log messages are sent to the given OutputStream stream using the given + Facility facility.Optionally the appName and hostName can be set. The + appName defaults to null. The hostName defaults to hostName(). + + Note that the passed stream's write function must not use logging with + a level for that this Logger's acceptsLevel returns true. Because this + Logger uses the stream's write function when it logs and would hence + log forevermore. +*/ +SyslogLogger!StreamCallback createSyslogLogger(StreamCallback)(StreamCallback stream_callback, + SyslogFacility facility, string app_name = null, string host_name = hostName()) + if (isWeaklyIsolated!StreamCallback) +{ + return new SyslogLogger!StreamCallback(stream_callback, facility, app_name, host_name); +} + /** A logger that logs in syslog format according to RFC 5424. Messages can be logged to files (via file streams) or over the network (via TCP or SSL streams). + Params: + StreamCallback = Callable that takes no arguments and returns an + `OutputStream` of some kind that will be used to write the log + output in Syslog format. + + Known_issues: + The current implementation does not write individual lines atomically. + Thus, when logging from multiple tasks or threads concurrently, log + lines may get mixed up. + Standards: Conforms to RFC 5424. */ -final class SyslogLogger(OutputStream) : Logger { +final class SyslogLogger(StreamCallback) : Logger + if (isWeaklyIsolated!StreamCallback) +{ + import std.conv : hexString; + import core.sync.condition : Condition; + import core.sync.mutex : Mutex; + import vibe.container.ringbuffer : RingBuffer; + private { string m_hostName; string m_appName; - OutputStream m_ostream; SyslogFacility m_facility; + Thread m_writeThread; + StreamCallback m_streamCallback; + Mutex m_bufferMutex; + Condition m_bufferCondition; + RingBuffer!ubyte m_buffer; } deprecated("Use `SyslogFacility` instead.") @@ -670,25 +709,31 @@ final class SyslogLogger(OutputStream) : Logger { /// private enum BOM = hexString!"EFBBBF"; - /** - Construct a SyslogLogger. - - The log messages are sent to the given OutputStream stream using the given - Facility facility.Optionally the appName and hostName can be set. The - appName defaults to null. The hostName defaults to hostName(). - - Note that the passed stream's write function must not use logging with - a level for that this Logger's acceptsLevel returns true. Because this - Logger uses the stream's write function when it logs and would hence - log forevermore. - */ - this(OutputStream stream, SyslogFacility facility, string appName = null, string hostName = hostName()) + private this(StreamCallback stream_callback, SyslogFacility facility, string appName = null, string hostName = hostName()) { m_hostName = hostName != "" ? hostName : NILVALUE; m_appName = appName != "" ? appName : NILVALUE; - m_ostream = stream; m_facility = facility; + m_streamCallback = stream_callback; this.minLevel = LogLevel.debug_; + + m_buffer.capacity = 1024; + m_bufferMutex = new Mutex; + m_bufferCondition = new Condition(m_bufferMutex); + + m_writeThread = new Thread(&writeThreadFunc); + m_writeThread.start(); + } + + void dispose() + { + { + auto l = scopedMutexLock(m_bufferMutex); + while (!m_buffer.empty) m_bufferCondition.wait(); + m_buffer.capacity = 0; + } + m_bufferCondition.notifyAll(); + m_writeThread.join(); } /** @@ -730,7 +775,7 @@ final class SyslogLogger(OutputStream) : Logger { auto text = msg.text; import std.format : formattedWrite; - auto str = StreamOutputRange!OutputStream(m_ostream); + auto str = BufferOutputRange(this); str.formattedWrite(SYSLOG_MESSAGE_FORMAT_VERSION1, priVal, timestamp, m_hostName, BOM ~ m_appName, procId, msgId, structuredData, BOM); @@ -738,58 +783,93 @@ final class SyslogLogger(OutputStream) : Logger { override void put(scope const(char)[] text) @trusted { - m_ostream.write(text); + writeToBuffer(cast(const(ubyte)[])text); } override void endLine() @trusted { - m_ostream.write("\n"); - m_ostream.flush(); + writeToBuffer(cast(const(ubyte)[])"\n"); } - private struct StreamOutputRange(OutputStream) + private void writeToBuffer(scope const(ubyte)[] bts) + @safe { + while (bts.length > 0) { + bool was_empty; + { + auto l = scopedMutexLock(m_bufferMutex); + while (m_buffer.full) { + if (m_buffer.capacity == 0) return; // write thread failed + () @trusted { m_bufferCondition.wait(); } (); + } + was_empty = m_buffer.empty; + auto chunk = min(bts.length, m_buffer.freeSpace); + m_buffer.putBack(bts[0 .. chunk]); + bts = bts[chunk .. $]; + } + if (was_empty) () @trusted { m_bufferCondition.notifyAll(); } (); + } + } + + private void writeThreadFunc() + @safe nothrow { + ubyte[4096] buf; + + + try { + auto ostr = m_streamCallback(); + + while (true) { + size_t chunk; + bool was_full, is_empty; + + { + auto l = scopedMutexLock(m_bufferMutex); + while (m_buffer.empty) { + if (m_buffer.capacity == 0) { + ostr.finalize(); + return; + } + () @trusted { m_bufferCondition.wait(); } (); + } + chunk = min(buf.length, m_buffer.length); + was_full = m_buffer.full; + m_buffer.read(buf[0 .. chunk]); + is_empty = m_buffer.empty; + } + + if (was_full || is_empty) () @trusted { m_bufferCondition.notifyAll(); } (); + + ostr.write(buf[0 .. chunk]); + } + } catch (Exception e) { + logException(e, "SyslogLogger failed to write output"); + auto l = scopedMutexLock(m_bufferMutex); + m_buffer.capacity = 0; + try () @trusted { m_bufferCondition.notifyAll(); } (); + catch (Exception e2) assert(false, e2.msg); + } + } + + private static struct BufferOutputRange { private { - OutputStream m_stream; - size_t m_fill = 0; - char[256] m_data = void; + SyslogLogger m_logger; } @safe: - @disable this(this); - - this(OutputStream stream) { m_stream = stream; } - ~this() { flush(); } - void flush() - { - if (m_fill == 0) return; - m_stream.write(m_data[0 .. m_fill]); - m_fill = 0; - } + this(SyslogLogger logger) { m_logger = logger; } void put(char bt) { - m_data[m_fill++] = bt; - if (m_fill >= m_data.length) flush(); + ubyte[1] bts; + bts[0] = bt; + m_logger.writeToBuffer(bts[]); } void put(const(char)[] bts) { - // avoid writing more chunks than necessary - if (bts.length + m_fill >= m_data.length * 2) { - flush(); - m_stream.write(bts); - return; - } - - while (bts.length) { - auto len = min(m_data.length - m_fill, bts.length); - m_data[m_fill .. m_fill + len] = bts[0 .. len]; - m_fill += len; - bts = bts[len .. $]; - if (m_fill >= m_data.length) flush(); - } + m_logger.writeToBuffer(cast(const(ubyte)[])bts); } } } @@ -825,8 +905,13 @@ enum SyslogFacility { unittest { import vibe.core.file; - auto fstream = createTempFile(); - auto logger = new SyslogLogger!FileStream(fstream, SyslogFacility.local1, "appname", null); + align(16) static __gshared string gpath; + static FileStream createOutput() @trusted { + auto fstream = createTempFile(); + atomicStore(gpath, fstream.path.toString()); + return fstream; + } + auto logger = createSyslogLogger(&createOutput, SyslogFacility.local1, "appname", null); LogLine msg; import std.datetime; import core.thread; @@ -840,14 +925,14 @@ unittest logger.put("αβγ"); logger.endLine(); } - auto path = fstream.path; - destroy(logger); - fstream.close(); + logger.dispose(); + auto path = atomicLoad(gpath); import std.file; import std.string; - auto lines = splitLines(readText(path.toString()), KeepTerminator.yes); - alias BOM = SyslogLogger!FileStream.BOM; + auto txt = readText(path); + auto lines = splitLines(txt, KeepTerminator.yes); + alias BOM = logger.BOM; assert(lines.length == 7); assert(lines[0] == "<143>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n"); assert(lines[1] == "<142>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n"); @@ -856,7 +941,7 @@ unittest assert(lines[4] == "<139>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n"); assert(lines[5] == "<138>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n"); assert(lines[6] == "<137>1 0000-01-01T00:00:00.000001 - " ~ BOM ~ "appname - - - " ~ BOM ~ "αβγ\n"); - removeFile(path.toString()); + removeFile(path); } @@ -865,6 +950,8 @@ unittest /// If the host name cannot be determined the function returns null. private string hostName() { + import std.conv : to; + string hostName; version (Posix) {