1
1
import ./ bindings
2
- import std/ [strformat]
2
+ import std/ [strformat, logging ]
3
3
4
4
# Unofficial easier-for-Nim API
5
5
13
13
14
14
ZConnectionImpl* {.pure, final.} = object
15
15
# # A Zmq connection. Since ``ZContext`` and ``ZSocket`` are pointers, it is highly recommended to **not** copy ``ZConnection``.
16
- context* : ZContext # # Zmq context from C-bindings.
16
+ context* : ZContext # # Zmq context from C-bindings.
17
17
socket* : ZSocket # # Zmq socket from C-bindings.
18
18
ownctx: bool # Boolean indicating if the connection owns the Zmq context
19
19
alive: bool # Boolean indicating if the connection has been closed
@@ -32,16 +32,39 @@ proc zmqError*() {.noinline, noreturn.} =
32
32
e.msg = & " Error: { e.error} . " & $ strerror(e.error)
33
33
raise e
34
34
35
+ var shouldLogEagainError = false
36
+
37
+ proc enableLogEagain* () =
38
+ # # Enable logging EAGAIN error in ZMQ calls
39
+ shouldLogEagainError = true
40
+
41
+ proc disableLogEagain* () =
42
+ # # Disable logging EAGAIN error in ZMQ calls
43
+ shouldLogEagainError = false
44
+
35
45
proc zmqErrorExceptEAGAIN() =
36
46
var e: ref ZmqError
37
47
new(e)
38
48
e.error = errno()
49
+ let errmsg = $ strerror(e.error)
39
50
if e.error == ZMQ_EAGAIN:
40
- discard
51
+ if shouldLogEagainError:
52
+ if logging.getHandlers().len() > 0 :
53
+ warn(errmsg)
54
+ else :
55
+ echo(errmsg)
56
+ else :
57
+ discard
41
58
else :
42
- e.msg = & " Error: { e.error} . " & $ strerror(e.error)
59
+ e.msg = & " Error: { e.error} . " & errmsg
43
60
raise e
44
61
62
+ template defaultFlag() : ZSendRecvOptions =
63
+ when defined(defaultFlagDontWait):
64
+ DONTWAIT
65
+ else :
66
+ NOFLAGS
67
+
45
68
#[
46
69
# Context related proc
47
70
]#
@@ -135,7 +158,6 @@ proc getsockopt*[T: SomeOrdinal|string](c: ZConnection, option: ZSockOptions): T
135
158
## Check http://api.zeromq.org/4-2:zmq-setsockopt
136
159
getsockopt[T](c.socket, option)
137
160
138
-
139
161
#[
140
162
Destructor
141
163
]#
@@ -305,7 +327,7 @@ proc close*(c: ZConnection, linger: int = 500) =
305
327
306
328
# Send / Receive
307
329
# Send with ZSocket type
308
- proc send*(s: ZSocket, msg: string , flags: ZSendRecvOptions = NOFLAGS ) =
330
+ proc send*(s: ZSocket, msg: string , flags: ZSendRecvOptions = defaultFlag() ) =
309
331
## Sends a message through the socket.
310
332
var m: ZMsg
311
333
if msg_init(m, msg.len) != 0:
@@ -330,7 +352,7 @@ proc sendAll*(s: ZSocket, msg: varargs[string]) =
330
352
inc(i)
331
353
s.send(msg[i])
332
354
333
- proc send* (c: ZConnection, msg: string , flags: ZSendRecvOptions = NOFLAGS ) =
355
+ proc send* (c: ZConnection, msg: string , flags: ZSendRecvOptions = defaultFlag() ) =
334
356
# # Sends a message over the connection.
335
357
send(c.socket, msg, flags)
336
358
@@ -339,7 +361,7 @@ proc sendAll*(c: ZConnection, msg: varargs[string]) =
339
361
sendAll(c.socket, msg)
340
362
341
363
# receive with ZSocket type
342
- proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
364
+ proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = defaultFlag() ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
343
365
result .moreAvailable = false
344
366
result .msgAvailable = false
345
367
@@ -364,7 +386,7 @@ proc receiveImpl(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila
364
386
if msg_close(m) != 0 :
365
387
zmqError()
366
388
367
- proc waitForReceive* (s: ZSocket, timeout: int = - 2 , flags: ZSendRecvOptions = NOFLAGS ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
389
+ proc waitForReceive* (s: ZSocket, timeout: int = - 2 , flags: ZSendRecvOptions = defaultFlag() ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
368
390
# # Set RCVTIMEO for the socket and wait until a message is available.
369
391
# # This function is blocking.
370
392
# #
@@ -392,7 +414,7 @@ proc waitForReceive*(s: ZSocket, timeout: int = -2, flags: ZSendRecvOptions = NO
392
414
if shouldUpdateTimeout:
393
415
s.setsockopt(RCVTIMEO, curtimeout.cint )
394
416
395
- proc tryReceive* (s: ZSocket, flags: ZSendRecvOptions = NOFLAGS ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
417
+ proc tryReceive* (s: ZSocket, flags: ZSendRecvOptions = defaultFlag() ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
396
418
# # Receives a message from a socket in a non-blocking way.
397
419
# #
398
420
# # Indicate whether a message was received or EAGAIN occured by ``msgAvailable``
@@ -406,13 +428,13 @@ proc tryReceive*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): tuple[msgAvaila
406
428
if (status and ZMQ_POLLIN) != 0 :
407
429
result = receiveImpl(s, flags)
408
430
409
- proc receive* (s: ZSocket, flags: ZSendRecvOptions = NOFLAGS ): string =
431
+ proc receive* (s: ZSocket, flags: ZSendRecvOptions = defaultFlag() ): string =
410
432
# # Receive a message on socket.
411
433
#
412
434
# # Return an empty string on EAGAIN
413
435
receiveImpl(s, flags).msg
414
436
415
- proc receiveAll* (s: ZSocket, flags: ZSendRecvOptions = NOFLAGS ): seq [string ] =
437
+ proc receiveAll* (s: ZSocket, flags: ZSendRecvOptions = defaultFlag() ): seq [string ] =
416
438
# # Receive all parts of a message
417
439
# #
418
440
# # If EAGAIN occurs without any data being received, it will be an empty seq
@@ -425,7 +447,7 @@ proc receiveAll*(s: ZSocket, flags: ZSendRecvOptions = NOFLAGS): seq[string] =
425
447
else :
426
448
expectMessage = false
427
449
428
- proc waitForReceive* (c: ZConnection, timeout: int = - 1 , flags: ZSendRecvOptions = NOFLAGS ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
450
+ proc waitForReceive* (c: ZConnection, timeout: int = - 1 , flags: ZSendRecvOptions = defaultFlag() ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
429
451
# # Set RCVTIMEO for the socket and wait until a message is available.
430
452
# # This function is blocking.
431
453
# #
@@ -438,19 +460,19 @@ proc waitForReceive*(c: ZConnection, timeout: int = -1, flags: ZSendRecvOptions
438
460
# # Indicate if more parts are needed to be received by ``moreAvailable``
439
461
waitForReceive(c.socket, timeout, flags)
440
462
441
- proc tryReceive* (c: ZConnection, flags: ZSendRecvOptions = NOFLAGS ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
463
+ proc tryReceive* (c: ZConnection, flags: ZSendRecvOptions = defaultFlag() ): tuple [msgAvailable: bool , moreAvailable: bool , msg: string ] =
442
464
# # Receives a message from a socket in a non-blocking way.
443
465
# #
444
466
# # Indicate whether a message was received or EAGAIN occured by ``msgAvailable``
445
467
# #
446
468
# # Indicate if more parts are needed to be received by ``moreAvailable``
447
469
tryReceive(c.socket, flags)
448
470
449
- proc receive* (c: ZConnection, flags: ZSendRecvOptions = NOFLAGS ): string =
471
+ proc receive* (c: ZConnection, flags: ZSendRecvOptions = defaultFlag() ): string =
450
472
# # Receive data over the connection
451
473
receive(c.socket, flags)
452
474
453
- proc receiveAll* (c: ZConnection, flags: ZSendRecvOptions = NOFLAGS ): seq [string ] =
475
+ proc receiveAll* (c: ZConnection, flags: ZSendRecvOptions = defaultFlag() ): seq [string ] =
454
476
# # Receive all parts of a message
455
477
# #
456
478
# # If EAGAIN occurs without any data being received, it will be an empty seq
0 commit comments