@@ -651,7 +651,7 @@ def send(self,s,mtype,group,runid,desc,data):
651
651
full_desc = desc + fill_desc
652
652
buf += full_desc .encode ()
653
653
buf += sdata
654
- s .send (buf )
654
+ s .sendall (buf )
655
655
656
656
657
657
def _check_sec_message (self ,recv_sec_message ):
@@ -660,9 +660,21 @@ def _check_sec_message(self,recv_sec_message):
660
660
format (recv_sec_message ,self .sec_message ))
661
661
662
662
class PyPestWorker (object ):
663
+ """a pure python worker for pest++. the pest++ master doesnt even know...
663
664
665
+ Args:
666
+ pst (str or pyemu.Pst): something about a control file
667
+ host (str): master hostname or IPv4 address
668
+ port (int): port number that the master is listening on
669
+ timeout (float): number of seconds to sleep at different points in the process.
670
+ if you have lots of pars and/obs, a longer sleep can be helpful, but if you make this smaller,
671
+ the worker responds faster...'it depends'
672
+ verbose (bool): flag to echo what's going on to stdout
673
+ socket_timeout (float): number of seconds that the socket should wait before giving up.
674
+ generally, this can be a big number...
675
+ """
664
676
665
- def __init__ (self , pst , host , port , timeout = 0.1 ,verbose = True ):
677
+ def __init__ (self , pst , host , port , timeout = 0.25 ,verbose = True , socket_timeout = None ):
666
678
self .host = host
667
679
self .port = port
668
680
self ._pst_arg = pst
@@ -673,7 +685,9 @@ def __init__(self, pst, host, port, timeout=0.1,verbose=True):
673
685
self .verbose = bool (verbose )
674
686
self .par_names = None
675
687
self .obs_names = None
676
-
688
+ if socket_timeout is None :
689
+ socket_timeout = timeout * 100
690
+ self .socket_timeout = socket_timeout
677
691
self .par_values = None
678
692
self .max_reconnect_attempts = 10
679
693
self ._process_pst ()
@@ -695,23 +709,19 @@ def _process_pst(self):
695
709
696
710
697
711
def connect (self ,is_reconnect = False ):
698
- self .message ("trying to connect to {0}:{1}..." .format (self .host ,self .port ))
712
+ self .message ("trying to connect to {0}:{1}..." .format (self .host ,self .port ), echo = True )
699
713
self .s = None
700
714
c = 0
701
715
while True :
702
716
try :
703
717
time .sleep (self .timeout )
704
- print ("." , end = '' )
705
718
c += 1
706
- if c % 75 == 0 :
707
- print ('' )
708
- print (c )
709
719
if is_reconnect and c > self .max_reconnect_attempts :
710
720
print ("max reconnect attempts reached..." )
711
721
return False
712
722
self .s = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
713
723
self .s .connect ((self .host , self .port ))
714
- self .message ("connected to {0}:{1}" .format (self .host ,self .port ))
724
+ self .message ("connected to {0}:{1}" .format (self .host ,self .port ), echo = True )
715
725
break
716
726
717
727
except ConnectionRefusedError :
@@ -723,8 +733,8 @@ def connect(self,is_reconnect=False):
723
733
return True
724
734
725
735
726
- def message (self ,msg ):
727
- if self .verbose :
736
+ def message (self ,msg , echo = False ):
737
+ if self .verbose or echo :
728
738
print (str (datetime .now ())+ " : " + msg )
729
739
730
740
@@ -745,7 +755,7 @@ def send(self,mtype,group,runid,desc="",data=0):
745
755
return True
746
756
747
757
def listen (self ,lock = None ,send_lock = None ):
748
- self .s .settimeout (self .timeout )
758
+ self .s .settimeout (self .socket_timeout )
749
759
failed_reconnect = False
750
760
while True :
751
761
time .sleep (self .timeout )
@@ -757,9 +767,13 @@ def listen(self,lock=None,send_lock=None):
757
767
if not success :
758
768
print ("...exiting" )
759
769
time .sleep (self .timeout )
770
+ # set the teminate flag so that the get_pars() look will exit
771
+ self ._lock .acquire ()
772
+ self .net_pack .mtype = 14
773
+ self ._lock .release ()
760
774
return
761
775
else :
762
- print ("...reconnect successfully..." )
776
+ print ("...reconnected successfully..." )
763
777
continue
764
778
765
779
if n > 0 :
0 commit comments