Skip to content

Commit 8eef30d

Browse files
committed
Run persisters in parallel
1 parent e26beb8 commit 8eef30d

5 files changed

Lines changed: 183 additions & 116 deletions

File tree

SingularityService/src/main/java/com/hubspot/singularity/config/SingularityConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ public class SingularityConfiguration extends Configuration {
261261

262262
private long persistHistoryEverySeconds = TimeUnit.MINUTES.toSeconds(2);
263263

264+
private int historyPollerConcurrency = 5;
265+
264266
private int maxPendingImmediatePersists = 200;
265267

266268
private long reconcileAgentsEveryMinutes = TimeUnit.HOURS.toMinutes(1);
@@ -2109,4 +2111,12 @@ public int getRequestCacheTtl() {
21092111
public void setRequestCacheTtl(int requestCacheTtlInSeconds) {
21102112
this.requestCacheTtlInSeconds = requestCacheTtlInSeconds;
21112113
}
2114+
2115+
public int getHistoryPollerConcurrency() {
2116+
return historyPollerConcurrency;
2117+
}
2118+
2119+
public void setHistoryPollerConcurrency(int historyPollerConcurrency) {
2120+
this.historyPollerConcurrency = historyPollerConcurrency;
2121+
}
21122122
}

SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityDeployHistoryPersister.java

Lines changed: 77 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@
66
import com.hubspot.singularity.SingularityDeleteResult;
77
import com.hubspot.singularity.SingularityDeployHistory;
88
import com.hubspot.singularity.SingularityDeployKey;
9+
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
910
import com.hubspot.singularity.SingularityRequestDeployState;
11+
import com.hubspot.singularity.async.CompletableFutures;
1012
import com.hubspot.singularity.config.SingularityConfiguration;
1113
import com.hubspot.singularity.data.DeployManager;
1214
import com.hubspot.singularity.mesos.SingularitySchedulerLock;
15+
import java.util.ArrayList;
1316
import java.util.Collections;
1417
import java.util.Comparator;
1518
import java.util.List;
1619
import java.util.Map;
1720
import java.util.Optional;
21+
import java.util.concurrent.CompletableFuture;
1822
import java.util.concurrent.TimeUnit;
1923
import java.util.concurrent.atomic.AtomicBoolean;
2024
import java.util.concurrent.atomic.AtomicLong;
@@ -42,12 +46,13 @@ public SingularityDeployHistoryPersister(
4246
DeployManager deployManager,
4347
HistoryManager historyManager,
4448
SingularitySchedulerLock schedulerLock,
49+
SingularityManagedThreadPoolFactory managedThreadPoolFactory,
4550
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
4651
@Named(
4752
SingularityHistoryModule.LAST_DEPLOY_PERSISTER_SUCCESS
4853
) AtomicLong lastPersisterSuccess
4954
) {
50-
super(configuration, persisterLock, lastPersisterSuccess);
55+
super(configuration, persisterLock, lastPersisterSuccess, managedThreadPoolFactory);
5156
this.schedulerLock = schedulerLock;
5257
this.deployManager = deployManager;
5358
this.historyManager = historyManager;
@@ -72,74 +77,84 @@ public void runActionOnPoll() {
7277
Collectors.groupingBy(SingularityDeployKey::getRequestId, Collectors.toList())
7378
);
7479

80+
List<CompletableFuture<Void>> futures = new ArrayList<>();
7581
for (String requestId : deployManager
7682
.getAllRequestDeployStatesByRequestId()
7783
.keySet()) {
78-
LOG.debug("Checking deploy histories to persist for request {}", requestId);
79-
schedulerLock.runWithRequestLock(
80-
() -> {
81-
Optional<SingularityRequestDeployState> deployState = deployManager.getRequestDeployState(
82-
requestId
83-
);
84-
85-
if (!deployState.isPresent()) {
86-
LOG.warn("No request deploy state for {}", requestId);
87-
return;
88-
}
89-
90-
int i = 0;
91-
List<SingularityDeployHistory> deployHistories = allDeployIdsByRequest
92-
.getOrDefault(requestId, Collections.emptyList())
93-
.stream()
94-
.map(
95-
deployKey ->
96-
deployManager.getDeployHistory(
97-
deployKey.getRequestId(),
98-
deployKey.getDeployId(),
99-
true
100-
)
101-
)
102-
.filter(Optional::isPresent)
103-
.map(Optional::get)
104-
.sorted(
105-
Comparator
106-
.comparingLong(
107-
SingularityDeployHistory::getCreateTimestampForCalculatingHistoryAge
108-
)
109-
.reversed()
110-
)
111-
.collect(Collectors.toList());
112-
113-
for (SingularityDeployHistory deployHistory : deployHistories) {
114-
numTotal.increment();
115-
if (
116-
!shouldTransferDeploy(
117-
requestId,
118-
deployState.get(),
119-
deployHistory.getDeployMarker().getDeployId()
120-
)
121-
) {
122-
continue;
123-
}
124-
125-
LOG.info(
126-
"Persisting deploy {} for request {}",
127-
deployHistory.getDeployMarker().getDeployId(),
128-
requestId
84+
futures.add(
85+
CompletableFuture.runAsync(
86+
() -> {
87+
LOG.debug("Checking deploy histories to persist for request {}", requestId);
88+
schedulerLock.runWithRequestLock(
89+
() -> {
90+
Optional<SingularityRequestDeployState> deployState = deployManager.getRequestDeployState(
91+
requestId
92+
);
93+
94+
if (!deployState.isPresent()) {
95+
LOG.warn("No request deploy state for {}", requestId);
96+
return;
97+
}
98+
99+
int i = 0;
100+
List<SingularityDeployHistory> deployHistories = allDeployIdsByRequest
101+
.getOrDefault(requestId, Collections.emptyList())
102+
.stream()
103+
.map(
104+
deployKey ->
105+
deployManager.getDeployHistory(
106+
deployKey.getRequestId(),
107+
deployKey.getDeployId(),
108+
true
109+
)
110+
)
111+
.filter(Optional::isPresent)
112+
.map(Optional::get)
113+
.sorted(
114+
Comparator
115+
.comparingLong(
116+
SingularityDeployHistory::getCreateTimestampForCalculatingHistoryAge
117+
)
118+
.reversed()
119+
)
120+
.collect(Collectors.toList());
121+
122+
for (SingularityDeployHistory deployHistory : deployHistories) {
123+
numTotal.increment();
124+
if (
125+
!shouldTransferDeploy(
126+
requestId,
127+
deployState.get(),
128+
deployHistory.getDeployMarker().getDeployId()
129+
)
130+
) {
131+
continue;
132+
}
133+
134+
LOG.info(
135+
"Persisting deploy {} for request {}",
136+
deployHistory.getDeployMarker().getDeployId(),
137+
requestId
138+
);
139+
if (moveToHistoryOrCheckForPurge(deployHistory, i++)) {
140+
numTransferred.increment();
141+
} else {
142+
persisterSuccess.getAndSet(false);
143+
}
144+
}
145+
},
146+
requestId,
147+
getClass().getSimpleName(),
148+
SingularitySchedulerLock.Priority.LOW
129149
);
130-
if (moveToHistoryOrCheckForPurge(deployHistory, i++)) {
131-
numTransferred.increment();
132-
} else {
133-
persisterSuccess.getAndSet(false);
134-
}
135-
}
136-
},
137-
requestId,
138-
getClass().getSimpleName(),
139-
SingularitySchedulerLock.Priority.LOW
150+
},
151+
persisterExecutor
152+
)
140153
);
141154
}
142155

156+
CompletableFutures.allOf(futures).join();
157+
143158
LOG.info(
144159
"Transferred {} out of {} deploys in {}",
145160
numTransferred,

SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityHistoryPersister.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import com.hubspot.mesos.JavaUtils;
44
import com.hubspot.singularity.SingularityDeleteResult;
55
import com.hubspot.singularity.SingularityHistoryItem;
6+
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
67
import com.hubspot.singularity.config.SingularityConfiguration;
78
import com.hubspot.singularity.scheduler.SingularityLeaderOnlyPoller;
89
import java.util.Optional;
10+
import java.util.concurrent.ExecutorService;
911
import java.util.concurrent.TimeUnit;
1012
import java.util.concurrent.atomic.AtomicLong;
1113
import java.util.concurrent.locks.ReentrantLock;
@@ -20,18 +22,24 @@ public abstract class SingularityHistoryPersister<T extends SingularityHistoryIt
2022

2123
protected final SingularityConfiguration configuration;
2224
protected final ReentrantLock persisterLock;
23-
25+
protected final ExecutorService persisterExecutor;
2426
protected final AtomicLong lastPersisterSuccess;
2527

2628
public SingularityHistoryPersister(
2729
SingularityConfiguration configuration,
2830
ReentrantLock persisterLock,
29-
AtomicLong lastPersisterSuccess
31+
AtomicLong lastPersisterSuccess,
32+
SingularityManagedThreadPoolFactory managedThreadPoolFactory
3033
) {
3134
super(configuration.getPersistHistoryEverySeconds(), TimeUnit.SECONDS);
3235
this.configuration = configuration;
3336
this.persisterLock = persisterLock;
3437
this.lastPersisterSuccess = lastPersisterSuccess;
38+
this.persisterExecutor =
39+
managedThreadPoolFactory.get(
40+
"persister",
41+
configuration.getHistoryPollerConcurrency()
42+
);
3543
}
3644

3745
@Override

SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityRequestHistoryPersister.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import com.hubspot.mesos.JavaUtils;
77
import com.hubspot.singularity.SingularityDeleteResult;
88
import com.hubspot.singularity.SingularityHistoryItem;
9+
import com.hubspot.singularity.SingularityManagedThreadPoolFactory;
910
import com.hubspot.singularity.SingularityRequestHistory;
11+
import com.hubspot.singularity.async.CompletableFutures;
1012
import com.hubspot.singularity.config.SingularityConfiguration;
1113
import com.hubspot.singularity.data.RequestManager;
1214
import com.hubspot.singularity.data.history.SingularityRequestHistoryPersister.SingularityRequestHistoryParent;
@@ -16,6 +18,7 @@
1618
import java.util.List;
1719
import java.util.Objects;
1820
import java.util.Optional;
21+
import java.util.concurrent.CompletableFuture;
1922
import java.util.concurrent.TimeUnit;
2023
import java.util.concurrent.atomic.AtomicBoolean;
2124
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,12 +46,13 @@ public SingularityRequestHistoryPersister(
4346
RequestManager requestManager,
4447
HistoryManager historyManager,
4548
SingularitySchedulerLock lock,
49+
SingularityManagedThreadPoolFactory managedThreadPoolFactory,
4650
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
4751
@Named(
4852
SingularityHistoryModule.LAST_REQUEST_PERSISTER_SUCCESS
4953
) AtomicLong lastPersisterSuccess
5054
) {
51-
super(configuration, persisterLock, lastPersisterSuccess);
55+
super(configuration, persisterLock, lastPersisterSuccess, managedThreadPoolFactory);
5256
this.requestManager = requestManager;
5357
this.historyManager = historyManager;
5458
this.lock = lock;
@@ -158,21 +162,34 @@ public void runActionOnPoll() {
158162
Collections.sort(requestHistoryParents, Collections.reverseOrder()); // createdAt descending
159163

160164
AtomicInteger i = new AtomicInteger();
165+
List<CompletableFuture<Void>> futures = new ArrayList<>();
161166
for (SingularityRequestHistoryParent requestHistoryParent : requestHistoryParents) {
162-
lock.runWithRequestLock(
163-
() -> {
164-
if (moveToHistoryOrCheckForPurge(requestHistoryParent, i.getAndIncrement())) {
165-
numHistoryTransferred.getAndAdd(requestHistoryParent.history.size());
166-
} else {
167-
persisterSuccess.getAndSet(false);
168-
}
169-
},
170-
requestHistoryParent.requestId,
171-
"request history purger",
172-
SingularitySchedulerLock.Priority.LOW
167+
futures.add(
168+
CompletableFuture.runAsync(
169+
() ->
170+
lock.runWithRequestLock(
171+
() -> {
172+
if (
173+
moveToHistoryOrCheckForPurge(
174+
requestHistoryParent,
175+
i.getAndIncrement()
176+
)
177+
) {
178+
numHistoryTransferred.getAndAdd(requestHistoryParent.history.size());
179+
} else {
180+
persisterSuccess.getAndSet(false);
181+
}
182+
},
183+
requestHistoryParent.requestId,
184+
"request history purger",
185+
SingularitySchedulerLock.Priority.LOW
186+
),
187+
persisterExecutor
188+
)
173189
);
174190
}
175191

192+
CompletableFutures.allOf(futures).join();
176193
LOG.info(
177194
"Transferred {} history updates for {} requests in {}",
178195
numHistoryTransferred,

0 commit comments

Comments
 (0)