@@ -182,7 +182,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
182
182
183
183
private SplitManager splitManager ;
184
184
185
- private volatile boolean started = false ;
185
/**
 * Lifecycle state of this load manager.
 */
enum State {
    /** The initial state. The load manager is also put back into this state at the end of {@code close()}. */
    INIT,

    /** Entered from {@link #INIT} at the end of a successful {@code start()}. */
    RUNNING,

    /**
     * The visibility of the current broker is being removed from other brokers. While in this state, the broker
     * cannot play as a leader or as a follower.
     */
    DISABLED,
}
192
+ private final AtomicReference <State > state = new AtomicReference <>(State .INIT );
186
193
187
194
private boolean configuredSystemTopics = false ;
188
195
@@ -210,7 +217,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
210
217
* Get all the bundles that are owned by this broker.
211
218
*/
212
219
public CompletableFuture <Set <NamespaceBundle >> getOwnedServiceUnitsAsync () {
213
- if (! started ) {
220
+ if (state . get () == State . INIT ) {
214
221
log .warn ("Failed to get owned service units, load manager is not started." );
215
222
return CompletableFuture .completedFuture (Collections .emptySet ());
216
223
}
@@ -373,7 +380,7 @@ public static CompletableFuture<Optional<BrokerLookupData>> getAssignedBrokerLoo
373
380
374
381
@ Override
375
382
public void start () throws PulsarServerException {
376
- if (this . started ) {
383
+ if (state . get () != State . INIT ) {
377
384
return ;
378
385
}
379
386
try {
@@ -471,7 +478,9 @@ public void start() throws PulsarServerException {
471
478
472
479
this .splitScheduler .start ();
473
480
this .initWaiter .complete (true );
474
- this .started = true ;
481
+ if (!state .compareAndSet (State .INIT , State .RUNNING )) {
482
+ failForUnexpectedState ("start" );
483
+ }
475
484
log .info ("Started load manager." );
476
485
} catch (Throwable e ) {
477
486
failStarting (e );
@@ -643,21 +652,17 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
643
652
filter .filterAsync (availableBrokerCandidates , bundle , context );
644
653
futures .add (future );
645
654
}
646
- CompletableFuture <Optional <String >> result = new CompletableFuture <>();
647
- FutureUtil .waitForAll (futures ).whenComplete ((__ , ex ) -> {
648
- if (ex != null ) {
649
- // TODO: We may need to revisit this error case.
650
- log .error ("Failed to filter out brokers when select bundle: {}" , bundle , ex );
651
- }
655
+ return FutureUtil .waitForAll (futures ).exceptionally (e -> {
656
+ // TODO: We may need to revisit this error case.
657
+ log .error ("Failed to filter out brokers when select bundle: {}" , bundle , e );
658
+ return null ;
659
+ }).thenApply (__ -> {
652
660
if (availableBrokerCandidates .isEmpty ()) {
653
- result .complete (Optional .empty ());
654
- return ;
661
+ return Optional .empty ();
655
662
}
656
663
Set <String > candidateBrokers = availableBrokerCandidates .keySet ();
657
-
658
- result .complete (getBrokerSelectionStrategy ().select (candidateBrokers , bundle , context ));
664
+ return getBrokerSelectionStrategy ().select (candidateBrokers , bundle , context );
659
665
});
660
- return result ;
661
666
});
662
667
}
663
668
@@ -695,6 +700,9 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
695
700
boolean force ,
696
701
long timeout ,
697
702
TimeUnit timeoutUnit ) {
703
+ if (state .get () == State .INIT ) {
704
+ return CompletableFuture .completedFuture (null );
705
+ }
698
706
if (NamespaceService .isSLAOrHeartbeatNamespace (bundle .getNamespaceObject ().toString ())) {
699
707
log .info ("Skip unloading namespace bundle: {}." , bundle );
700
708
return CompletableFuture .completedFuture (null );
@@ -783,28 +791,13 @@ private CompletableFuture<Void> splitAsync(SplitDecision decision,
783
791
784
792
@ Override
785
793
public void close () throws PulsarServerException {
786
- if (! this . started ) {
794
+ if (state . get () == State . INIT ) {
787
795
return ;
788
796
}
789
797
try {
790
- if (brokerLoadDataReportTask != null ) {
791
- brokerLoadDataReportTask .cancel (true );
792
- }
793
-
794
- if (topBundlesLoadDataReportTask != null ) {
795
- topBundlesLoadDataReportTask .cancel (true );
796
- }
797
-
798
- if (monitorTask != null ) {
799
- monitorTask .cancel (true );
800
- }
801
-
802
- this .brokerLoadDataStore .close ();
803
- this .topBundlesLoadDataStore .close ();
798
+ stopLoadDataReportTasks ();
804
799
this .unloadScheduler .close ();
805
800
this .splitScheduler .close ();
806
- } catch (IOException ex ) {
807
- throw new PulsarServerException (ex );
808
801
} finally {
809
802
try {
810
803
this .brokerRegistry .close ();
@@ -818,14 +811,36 @@ public void close() throws PulsarServerException {
818
811
} catch (Exception e ) {
819
812
throw new PulsarServerException (e );
820
813
} finally {
821
- this . started = false ;
814
+ state . set ( State . INIT ) ;
822
815
}
823
816
}
824
817
825
818
}
826
819
}
827
820
}
828
821
822
+ private void stopLoadDataReportTasks () {
823
+ if (brokerLoadDataReportTask != null ) {
824
+ brokerLoadDataReportTask .cancel (true );
825
+ }
826
+ if (topBundlesLoadDataReportTask != null ) {
827
+ topBundlesLoadDataReportTask .cancel (true );
828
+ }
829
+ if (monitorTask != null ) {
830
+ monitorTask .cancel (true );
831
+ }
832
+ try {
833
+ brokerLoadDataStore .shutdown ();
834
+ } catch (IOException e ) {
835
+ log .warn ("Failed to shutdown brokerLoadDataStore" , e );
836
+ }
837
+ try {
838
+ topBundlesLoadDataStore .shutdown ();
839
+ } catch (IOException e ) {
840
+ log .warn ("Failed to shutdown topBundlesLoadDataStore" , e );
841
+ }
842
+ }
843
+
829
844
public static boolean isInternalTopic (String topic ) {
830
845
return INTERNAL_TOPICS .contains (topic )
831
846
|| topic .startsWith (TOPIC )
@@ -841,13 +856,16 @@ synchronized void playLeader() {
841
856
boolean becameFollower = false ;
842
857
while (!Thread .currentThread ().isInterrupted ()) {
843
858
try {
844
- if (!initWaiter .get ()) {
859
+ if (!initWaiter .get () || disabled () ) {
845
860
return ;
846
861
}
847
862
if (!serviceUnitStateChannel .isChannelOwner ()) {
848
863
becameFollower = true ;
849
864
break ;
850
865
}
866
+ if (disabled ()) {
867
+ return ;
868
+ }
851
869
// Confirm the system topics have been created or create them if they do not exist.
852
870
// If the leader has changed, the new leader need to reset
853
871
// the local brokerService.topics (by this topic creations).
@@ -859,6 +877,11 @@ synchronized void playLeader() {
859
877
serviceUnitStateChannel .scheduleOwnershipMonitor ();
860
878
break ;
861
879
} catch (Throwable e ) {
880
+ if (disabled ()) {
881
+ log .warn ("The broker:{} failed to set the role but exit because it's disabled" ,
882
+ pulsar .getBrokerId (), e );
883
+ return ;
884
+ }
862
885
log .warn ("The broker:{} failed to set the role. Retrying {} th ..." ,
863
886
pulsar .getBrokerId (), ++retry , e );
864
887
try {
@@ -870,6 +893,9 @@ synchronized void playLeader() {
870
893
}
871
894
}
872
895
}
896
+ if (disabled ()) {
897
+ return ;
898
+ }
873
899
874
900
if (becameFollower ) {
875
901
log .warn ("The broker:{} became follower while initializing leader role." , pulsar .getBrokerId ());
@@ -893,13 +919,16 @@ synchronized void playFollower() {
893
919
boolean becameLeader = false ;
894
920
while (!Thread .currentThread ().isInterrupted ()) {
895
921
try {
896
- if (!initWaiter .get ()) {
922
+ if (!initWaiter .get () || disabled () ) {
897
923
return ;
898
924
}
899
925
if (serviceUnitStateChannel .isChannelOwner ()) {
900
926
becameLeader = true ;
901
927
break ;
902
928
}
929
+ if (disabled ()) {
930
+ return ;
931
+ }
903
932
unloadScheduler .close ();
904
933
serviceUnitStateChannel .cancelOwnershipMonitor ();
905
934
closeInternalTopics ();
@@ -908,6 +937,11 @@ synchronized void playFollower() {
908
937
topBundlesLoadDataStore .startProducer ();
909
938
break ;
910
939
} catch (Throwable e ) {
940
+ if (disabled ()) {
941
+ log .warn ("The broker:{} failed to set the role but exit because it's disabled" ,
942
+ pulsar .getBrokerId (), e );
943
+ return ;
944
+ }
911
945
log .warn ("The broker:{} failed to set the role. Retrying {} th ..." ,
912
946
pulsar .getBrokerId (), ++retry , e );
913
947
try {
@@ -919,6 +953,9 @@ synchronized void playFollower() {
919
953
}
920
954
}
921
955
}
956
+ if (disabled ()) {
957
+ return ;
958
+ }
922
959
923
960
if (becameLeader ) {
924
961
log .warn ("This broker:{} became leader while initializing follower role." , pulsar .getBrokerId ());
@@ -997,9 +1034,20 @@ protected void monitor() {
997
1034
}
998
1035
999
1036
public void disableBroker () throws Exception {
1037
+ // TopicDoesNotExistException might be thrown and it's not recoverable. Switch to the DISABLED state first so that
1038
+ // playFollower() or playLeader() can exit quickly.
1039
+ if (!state .compareAndSet (State .RUNNING , State .DISABLED )) {
1040
+ failForUnexpectedState ("disableBroker" );
1041
+ }
1042
+ stopLoadDataReportTasks ();
1000
1043
serviceUnitStateChannel .cleanOwnerships ();
1001
- leaderElectionService .close ();
1002
1044
brokerRegistry .unregister ();
1045
+ leaderElectionService .close ();
1046
+ final var availableBrokers = brokerRegistry .getAvailableBrokersAsync ()
1047
+ .get (conf .getMetadataStoreOperationTimeoutSeconds (), TimeUnit .SECONDS );
1048
+ if (availableBrokers .isEmpty ()) {
1049
+ close ();
1050
+ }
1003
1051
// Close the internal topics (if owned any) after giving up the possible leader role,
1004
1052
// so that the subsequent lookups could hit the next leader.
1005
1053
closeInternalTopics ();
@@ -1033,4 +1081,16 @@ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
1033
1081
protected ServiceUnitStateChannel createServiceUnitStateChannel (PulsarService pulsar ) {
1034
1082
return new ServiceUnitStateChannelImpl (pulsar );
1035
1083
}
1084
+
1085
+ private void failForUnexpectedState (String msg ) {
1086
+ throw new IllegalStateException ("Failed to " + msg + ", state: " + state .get ());
1087
+ }
1088
+
1089
+ boolean running () {
1090
+ return state .get () == State .RUNNING ;
1091
+ }
1092
+
1093
+ private boolean disabled () {
1094
+ return state .get () == State .DISABLED ;
1095
+ }
1036
1096
}
0 commit comments