11
11
"""
12
12
13
13
from cmath import rect
14
- from typing import TYPE_CHECKING , ClassVar , List , Optional
14
+ from typing import TYPE_CHECKING , ClassVar , List , Optional , Tuple
15
15
16
16
import numpy as np
17
17
from scipy .constants import epsilon_0 , h , mu_0
@@ -552,7 +552,7 @@ def _detect(self, power: List[np.ndarray]) -> List[np.ndarray]:
552
552
for source in self .context .sources :
553
553
# we let the laser handle the RIN distribution
554
554
# so the same noise is injected in all the signals
555
- rin = source .get_rin (self .high_fc - self . low_fc )
555
+ rin = source .get_rin (self ._get_bandwidth ()[ 0 ] )
556
556
dist = source .get_rin_dist (i , j )
557
557
558
558
# calculate the standard deviation of the RIN noise
@@ -602,15 +602,15 @@ def _filter(self, signal: np.ndarray) -> np.ndarray:
602
602
signal :
603
603
The signal to filter.
604
604
"""
605
- high = min ( self .high_fc , 0.5 * ( self . context . fs - 1 ) )
605
+ bw = self ._get_bandwidth ( )
606
606
sos = (
607
- butter (6 , high , "lowpass" , fs = self .context .fs , output = "sos" )
608
- if self . low_fc == 0
607
+ butter (6 , bw [ 2 ] , "lowpass" , fs = self .context .fs , output = "sos" )
608
+ if bw [ 1 ] == 0
609
609
else butter (
610
610
6 ,
611
611
[
612
- max ( self . low_fc , self . context . fs / self . context . num_samples * 30 ) ,
613
- high ,
612
+ bw [ 1 ] ,
613
+ bw [ 2 ] ,
614
614
],
615
615
"bandpass" ,
616
616
fs = self .context .fs ,
@@ -619,6 +619,20 @@ def _filter(self, signal: np.ndarray) -> np.ndarray:
619
619
)
620
620
return sosfiltfilt (sos , signal )
621
621
622
+ def _get_bandwidth (self ):
623
+ """Gets the bandwidth of the detector w.r.t.
624
+
625
+ the sampling frequency.
626
+ """
627
+ low = (
628
+ 0
629
+ if self .low_fc == 0
630
+ else max (self .low_fc , self .context .fs / self .context .num_samples * 30 )
631
+ )
632
+ high = min (self .high_fc , 0.5 * (self .context .fs - 1 ))
633
+
634
+ return (high - low , low , high )
635
+
622
636
623
637
class DifferentialDetector (Detector ):
624
638
"""A differential detector takes two connections and provides three outputs
@@ -697,10 +711,8 @@ def _detect(self, powers: List[np.ndarray]) -> List[np.ndarray]:
697
711
for source in self .context .sources :
698
712
# get the RIN specs from the laser to ensure that the
699
713
# same noise is injected across all signals
700
- monitor_rin = source .get_rin (
701
- self .monitor_high_fc - self .monitor_low_fc
702
- )
703
- rf_rin = source .get_rin (self .rf_high_fc - self .rf_low_fc )
714
+ monitor_rin = source .get_rin (self ._get_monitor_bandwidth ()[0 ])
715
+ rf_rin = source .get_rin (self ._get_rf_bandwidth ()[0 ])
704
716
dist = source .get_rin_dist (i , j )
705
717
706
718
# only calculate the noise if there is power
@@ -756,27 +768,24 @@ def _detect(self, powers: List[np.ndarray]) -> List[np.ndarray]:
756
768
self ._monitor (p2 , self .monitor_rin_dists2 ),
757
769
)
758
770
759
- def _filter (self , signal : np .ndarray , low_fc : float , high_fc : float ) -> np .ndarray :
771
+ def _filter (self , signal : np .ndarray , bw : Tuple [ float , float , float ] ) -> np .ndarray :
760
772
"""Filters the signal.
761
773
762
774
Parameters
763
775
----------
764
776
signal :
765
777
The signal to filter.
766
- low_fc :
767
- The lower cut-off frequency.
768
- high_fc :
769
- The higher cut-off frequency.
778
+ bw :
779
+ The bandwidth of the filter. (difference, low_fc, high_fc)
770
780
"""
771
- high = min (high_fc , 0.5 * (self .context .fs - 1 ))
772
781
sos = (
773
- butter (6 , high , "lowpass" , fs = self .context .fs , output = "sos" )
774
- if low_fc == 0
782
+ butter (6 , bw [ 2 ] , "lowpass" , fs = self .context .fs , output = "sos" )
783
+ if bw [ 1 ] == 0
775
784
else butter (
776
785
6 ,
777
786
[
778
- max ( low_fc , self . context . fs / self . context . num_samples * 30 ) ,
779
- high ,
787
+ bw [ 1 ] ,
788
+ bw [ 2 ] ,
780
789
],
781
790
"bandpass" ,
782
791
fs = self .context .fs ,
@@ -785,6 +794,39 @@ def _filter(self, signal: np.ndarray, low_fc: float, high_fc: float) -> np.ndarr
785
794
)
786
795
return sosfiltfilt (sos , signal )
787
796
797
+ def _get_bandwidth (self , low_fc , high_fc ):
798
+ """Gets the bandwidth of the detector w.r.t. the sampling frequency.
799
+
800
+ Parameters
801
+ ----------
802
+ low_fc :
803
+ The lower cut-off frequency.
804
+ high_fc :
805
+ The higher cut-off frequency.
806
+ """
807
+ low = (
808
+ 0
809
+ if low_fc == 0
810
+ else max (low_fc , self .context .fs / self .context .num_samples * 30 )
811
+ )
812
+ high = min (high_fc , 0.5 * (self .context .fs - 1 ))
813
+
814
+ return (high - low , low , high )
815
+
816
+ def _get_monitor_bandwidth (self ):
817
+ """Gets the bandwidth of the monitor w.r.t.
818
+
819
+ the sampling frequency.
820
+ """
821
+ return self ._get_bandwidth (self .monitor_low_fc , self .monitor_high_fc )
822
+
823
+ def _get_rf_bandwidth (self ):
824
+ """Gets the bandwidth of the rf w.r.t.
825
+
826
+ the sampling frequency.
827
+ """
828
+ return self ._get_bandwidth (self .rf_low_fc , self .rf_high_fc )
829
+
788
830
def _monitor (self , power : np .ndarray , rin_dists : np .ndarray ) -> np .ndarray :
789
831
"""Takes a signal and turns it into a monitor output.
790
832
@@ -813,7 +855,7 @@ def _monitor_filter(self, signal: np.ndarray) -> np.ndarray:
813
855
signal :
814
856
The signal to filter.
815
857
"""
816
- return self ._filter (signal , self .monitor_low_fc , self . monitor_high_fc )
858
+ return self ._filter (signal , self ._get_monitor_bandwidth () )
817
859
818
860
def _rf (self , p1 : np .ndarray , p2 : np .ndarray ) -> np .ndarray :
819
861
"""Takes two signals and generates the differential RF signal. p1 - p2.
@@ -850,4 +892,4 @@ def _rf_filter(self, signal: np.ndarray) -> np.ndarray:
850
892
signal :
851
893
The signal to filter.
852
894
"""
853
- return self ._filter (signal , self .rf_low_fc , self . rf_high_fc )
895
+ return self ._filter (signal , self ._get_rf_bandwidth () )
0 commit comments