66import com .hubspot .singularity .SingularityDeleteResult ;
77import com .hubspot .singularity .SingularityDeployHistory ;
88import com .hubspot .singularity .SingularityDeployKey ;
9+ import com .hubspot .singularity .SingularityManagedThreadPoolFactory ;
910import com .hubspot .singularity .SingularityRequestDeployState ;
11+ import com .hubspot .singularity .async .CompletableFutures ;
1012import com .hubspot .singularity .config .SingularityConfiguration ;
1113import com .hubspot .singularity .data .DeployManager ;
1214import com .hubspot .singularity .mesos .SingularitySchedulerLock ;
15+ import java .util .ArrayList ;
1316import java .util .Collections ;
1417import java .util .Comparator ;
1518import java .util .List ;
1619import java .util .Map ;
1720import java .util .Optional ;
21+ import java .util .concurrent .CompletableFuture ;
1822import java .util .concurrent .TimeUnit ;
1923import java .util .concurrent .atomic .AtomicBoolean ;
2024import java .util .concurrent .atomic .AtomicLong ;
@@ -42,12 +46,13 @@ public SingularityDeployHistoryPersister(
4246 DeployManager deployManager ,
4347 HistoryManager historyManager ,
4448 SingularitySchedulerLock schedulerLock ,
49+ SingularityManagedThreadPoolFactory managedThreadPoolFactory ,
4550 @ Named (SingularityHistoryModule .PERSISTER_LOCK ) ReentrantLock persisterLock ,
4651 @ Named (
4752 SingularityHistoryModule .LAST_DEPLOY_PERSISTER_SUCCESS
4853 ) AtomicLong lastPersisterSuccess
4954 ) {
50- super (configuration , persisterLock , lastPersisterSuccess );
55+ super (configuration , persisterLock , lastPersisterSuccess , managedThreadPoolFactory );
5156 this .schedulerLock = schedulerLock ;
5257 this .deployManager = deployManager ;
5358 this .historyManager = historyManager ;
@@ -72,75 +77,85 @@ public void runActionOnPoll() {
7277 Collectors .groupingBy (SingularityDeployKey ::getRequestId , Collectors .toList ())
7378 );
7479
80+ List <CompletableFuture <Void >> futures = new ArrayList <>();
7581 for (String requestId : deployManager
7682 .getAllRequestDeployStatesByRequestId ()
7783 .keySet ()) {
78- LOG .debug ("Checking deploy histories to persist for request {}" , requestId );
79- schedulerLock .runWithRequestLock (
80- () -> {
81- Optional <SingularityRequestDeployState > deployState = deployManager .getRequestDeployState (
82- requestId
83- );
84-
85- if (!deployState .isPresent ()) {
86- LOG .warn ("No request deploy state for {}" , requestId );
87- return ;
88- }
89-
90- int i = 0 ;
91- List <SingularityDeployHistory > deployHistories = allDeployIdsByRequest
92- .getOrDefault (requestId , Collections .emptyList ())
93- .stream ()
94- .map (
95- deployKey ->
96- deployManager .getDeployHistory (
97- deployKey .getRequestId (),
98- deployKey .getDeployId (),
99- true
100- )
101- )
102- .filter (Optional ::isPresent )
103- .map (Optional ::get )
104- .sorted (
105- Comparator
106- .comparingLong (
107- SingularityDeployHistory ::getCreateTimestampForCalculatingHistoryAge
108- )
109- .reversed ()
110- )
111- .collect (Collectors .toList ());
112-
113- for (SingularityDeployHistory deployHistory : deployHistories ) {
114- numTotal .increment ();
115- if (
116- !shouldTransferDeploy (
117- requestId ,
118- deployState .get (),
119- deployHistory .getDeployMarker ().getDeployId ()
120- )
121- ) {
122- continue ;
123- }
124-
125- LOG .info (
126- "Persisting deploy {} for request {}" ,
127- deployHistory .getDeployMarker ().getDeployId (),
128- requestId
84+ futures .add (
85+ CompletableFuture .runAsync (
86+ () -> {
87+ LOG .debug ("Checking deploy histories to persist for request {}" , requestId );
88+ schedulerLock .runWithRequestLock (
89+ () -> {
90+ Optional <SingularityRequestDeployState > deployState = deployManager .getRequestDeployState (
91+ requestId
92+ );
93+
94+ if (!deployState .isPresent ()) {
95+ LOG .warn ("No request deploy state for {}" , requestId );
96+ return ;
97+ }
98+
99+ int i = 0 ;
100+ List <SingularityDeployHistory > deployHistories = allDeployIdsByRequest
101+ .getOrDefault (requestId , Collections .emptyList ())
102+ .stream ()
103+ .map (
104+ deployKey ->
105+ deployManager .getDeployHistory (
106+ deployKey .getRequestId (),
107+ deployKey .getDeployId (),
108+ true
109+ )
110+ )
111+ .filter (Optional ::isPresent )
112+ .map (Optional ::get )
113+ .sorted (
114+ Comparator
115+ .comparingLong (
116+ SingularityDeployHistory ::getCreateTimestampForCalculatingHistoryAge
117+ )
118+ .reversed ()
119+ )
120+ .collect (Collectors .toList ());
121+
122+ for (SingularityDeployHistory deployHistory : deployHistories ) {
123+ numTotal .increment ();
124+ if (
125+ !shouldTransferDeploy (
126+ requestId ,
127+ deployState .get (),
128+ deployHistory .getDeployMarker ().getDeployId ()
129+ )
130+ ) {
131+ continue ;
132+ }
133+
134+ LOG .info (
135+ "Persisting deploy {} for request {}" ,
136+ deployHistory .getDeployMarker ().getDeployId (),
137+ requestId
138+ );
139+ if (moveToHistoryOrCheckForPurge (deployHistory , i ++)) {
140+ numTransferred .increment ();
141+ } else {
142+ LOG .error ("Deploy History Persister failed on {}" , deployHistory );
143+ persisterSuccess .getAndSet (false );
144+ }
145+ }
146+ },
147+ requestId ,
148+ getClass ().getSimpleName (),
149+ SingularitySchedulerLock .Priority .LOW
129150 );
130- if (moveToHistoryOrCheckForPurge (deployHistory , i ++)) {
131- numTransferred .increment ();
132- } else {
133- LOG .error ("Deploy History Persister failed on {}" , deployHistory );
134- persisterSuccess .getAndSet (false );
135- }
136- }
137- },
138- requestId ,
139- getClass ().getSimpleName (),
140- SingularitySchedulerLock .Priority .LOW
151+ },
152+ persisterExecutor
153+ )
141154 );
142155 }
143156
157+ CompletableFutures .allOf (futures ).join ();
158+
144159 LOG .info (
145160 "Transferred {} out of {} deploys in {}" ,
146161 numTransferred ,
0 commit comments