Skip to content

Commit 8e4a548

Browse files
authored
Merge pull request #2244 from HubSpot/task-lag-guardrail
Avoid Request Locks With Task Lag
2 parents ff09baf + 893e924 commit 8e4a548

11 files changed

Lines changed: 251 additions & 7 deletions

SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityDeployHistoryPersister.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ public void runActionOnPoll() {
135135
}
136136
},
137137
requestId,
138-
getClass().getSimpleName()
138+
getClass().getSimpleName(),
139+
SingularitySchedulerLock.Priority.LOW
139140
);
140141
}
141142

SingularityService/src/main/java/com/hubspot/singularity/data/history/SingularityRequestHistoryPersister.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.atomic.AtomicBoolean;
2121
import java.util.concurrent.atomic.AtomicInteger;
2222
import java.util.concurrent.atomic.AtomicLong;
23+
import java.util.concurrent.locks.ReadWriteLock;
2324
import java.util.concurrent.locks.ReentrantLock;
2425
import javax.inject.Singleton;
2526
import org.slf4j.Logger;
@@ -167,7 +168,8 @@ public void runActionOnPoll() {
167168
}
168169
},
169170
requestHistoryParent.requestId,
170-
"request history purger"
171+
"request history purger",
172+
SingularitySchedulerLock.Priority.LOW
171173
);
172174
}
173175

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.hubspot.singularity.helpers;
2+
3+
import com.google.inject.Inject;
4+
import com.hubspot.singularity.SingularityManagedScheduledExecutorServiceFactory;
5+
import com.hubspot.singularity.SingularityPendingTaskId;
6+
import com.hubspot.singularity.config.SingularityConfiguration;
7+
import com.hubspot.singularity.data.TaskManager;
8+
import java.util.List;
9+
import java.util.concurrent.ConcurrentHashMap;
10+
import java.util.concurrent.ConcurrentMap;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.stream.Collectors;
14+
15+
public class TaskLagGuardrail {
16+
private final TaskManager taskManager;
17+
private final SingularityConfiguration configuration;
18+
private final ScheduledExecutorService executor;
19+
private ConcurrentMap<String, Integer> lateTasksByRequestId;
20+
21+
@Inject
22+
public TaskLagGuardrail(
23+
SingularityConfiguration configuration,
24+
SingularityManagedScheduledExecutorServiceFactory factory,
25+
TaskManager taskManager
26+
) {
27+
this.configuration = configuration;
28+
this.taskManager = taskManager;
29+
this.lateTasksByRequestId = new ConcurrentHashMap<>();
30+
this.executor = factory.getSingleThreaded(getClass().getSimpleName());
31+
executor.scheduleWithFixedDelay(
32+
this::updateLateTasksByRequestId,
33+
0,
34+
configuration.getRequestCacheTtl(),
35+
TimeUnit.SECONDS
36+
);
37+
}
38+
39+
public boolean isLagged(String requestId) {
40+
return lateTasksByRequestId.containsKey(requestId);
41+
}
42+
43+
public void updateLateTasksByRequestId() {
44+
long now = System.currentTimeMillis();
45+
List<SingularityPendingTaskId> allPendingTaskIds = taskManager.getPendingTaskIds();
46+
47+
// not a thread safe assignment, but should be fine for periodic updates
48+
this.lateTasksByRequestId =
49+
allPendingTaskIds
50+
.stream()
51+
.filter(
52+
p ->
53+
now - p.getNextRunAt() > configuration.getDeltaAfterWhichTasksAreLateMillis()
54+
)
55+
.collect(
56+
Collectors.toConcurrentMap(
57+
SingularityPendingTaskId::getRequestId,
58+
p -> 1,
59+
Integer::sum
60+
)
61+
);
62+
}
63+
}

SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.google.inject.multibindings.Multibinder;
1010
import com.google.inject.name.Named;
1111
import com.hubspot.singularity.helpers.MesosProtosUtils;
12+
import com.hubspot.singularity.helpers.TaskLagGuardrail;
1213
import com.hubspot.singularity.hooks.DeployAcceptanceHook;
1314
import java.util.concurrent.atomic.AtomicInteger;
1415
import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +35,7 @@ public void configure() {
3435
bind(SingularityStartup.class).in(Scopes.SINGLETON);
3536
bind(SingularitySchedulerLock.class).in(Scopes.SINGLETON);
3637
bind(SingularityMesosSchedulerClient.class).in(Scopes.SINGLETON);
38+
bind(TaskLagGuardrail.class).in(Scopes.SINGLETON);
3739

3840
Multibinder.newSetBinder(binder(), DeployAcceptanceHook.class);
3941
}

SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularitySchedulerLock.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.inject.Inject;
44
import com.hubspot.mesos.JavaUtils;
5+
import com.hubspot.singularity.helpers.TaskLagGuardrail;
56
import java.util.concurrent.Callable;
67
import java.util.concurrent.ConcurrentHashMap;
78
import java.util.concurrent.TimeUnit;
@@ -18,9 +19,11 @@ public class SingularitySchedulerLock {
1819
private final ReentrantLock stateLock;
1920
private final ReentrantLock offersLock;
2021
private final ConcurrentHashMap<String, ReentrantLock> requestLocks;
22+
private final TaskLagGuardrail taskLag;
2123

2224
@Inject
23-
public SingularitySchedulerLock() {
25+
public SingularitySchedulerLock(TaskLagGuardrail taskLag) {
26+
this.taskLag = taskLag;
2427
this.stateLock = new ReentrantLock();
2528
this.offersLock = new ReentrantLock();
2629
this.requestLocks = new ConcurrentHashMap<>();
@@ -96,7 +99,38 @@ private void unlock(String requestId, String name, long start) {
9699
lock.unlock();
97100
}
98101

102+
/**
103+
* Run the given function with the specified request lock.
104+
*
105+
* @param function The function to run.
106+
* @param requestId Request to lock.
107+
* @param name Description of this request lock.
108+
*/
99109
public void runWithRequestLock(Runnable function, String requestId, String name) {
110+
runWithRequestLock(function, requestId, name, Priority.HIGH);
111+
}
112+
113+
/**
114+
* Run the given function with the specified request lock, unless run with low priority.
115+
* If run with low priority, the function will not run if the request is lagged
116+
* to allow higher priority components to acquire the lock.
117+
*
118+
* @param function The function to run.
119+
* @param requestId Request to lock.
120+
* @param name Description of this request lock.
121+
* @param priority Priority of this request lock.
122+
*/
123+
public void runWithRequestLock(
124+
Runnable function,
125+
String requestId,
126+
String name,
127+
Priority priority
128+
) {
129+
if (priority == Priority.LOW && isLocked(requestId) && taskLag.isLagged(requestId)) {
130+
LOG.info("{} - Skipping low priority lock on {}", name, requestId);
131+
return;
132+
}
133+
100134
long start = lock(requestId, name);
101135
try {
102136
function.run();
@@ -208,4 +242,14 @@ private void unlockOffers(String name, long start) {
208242
LOG.debug("{} - Unlocking offers lock ({})", name, JavaUtils.duration(start));
209243
offersLock.unlock();
210244
}
245+
246+
private boolean isLocked(String requestId) {
247+
ReentrantLock lock = requestLocks.get(requestId);
248+
return lock != null && lock.isLocked();
249+
}
250+
251+
public enum Priority {
252+
LOW,
253+
HIGH
254+
}
211255
}

SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityAutoScaleSpreadAllPoller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ public void runActionOnPoll() {
8787
}
8888
},
8989
requestWithState.getRequest().getId(),
90-
getClass().getSimpleName()
90+
getClass().getSimpleName(),
91+
SingularitySchedulerLock.Priority.LOW
9192
);
9293
}
9394
}

SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityCrashLoopChecker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public void checkCooldowns() {
8383
}
8484
},
8585
cooldownRequest.getRequest().getId(),
86-
getClass().getSimpleName()
86+
getClass().getSimpleName(),
87+
SingularitySchedulerLock.Priority.LOW
8788
),
8889
cooldownExecutor
8990
)

SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityExpiringUserActionPoller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ protected void checkExpiringObjects() {
199199
);
200200
},
201201
requestWithState.get().getRequest().getId(),
202-
getClazz().getSimpleName()
202+
getClazz().getSimpleName(),
203+
SingularitySchedulerLock.Priority.LOW
203204
);
204205
}
205206

SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularitySchedulerModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.inject.Scopes;
55
import com.hubspot.singularity.data.history.SingularityHistoryPurger;
66
import com.hubspot.singularity.helpers.RebalancingHelper;
7+
import com.hubspot.singularity.helpers.TaskLagGuardrail;
78
import com.hubspot.singularity.mesos.SingularityMesosOfferScheduler;
89
import com.hubspot.singularity.mesos.SingularityMesosTaskPrioritizer;
910

SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityUpstreamChecker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ public void syncUpstreams() {
355355
lock.runWithRequestLock(
356356
() -> syncUpstreamsForService(singularityRequest),
357357
singularityRequest.getId(),
358-
getClass().getSimpleName()
358+
getClass().getSimpleName(),
359+
SingularitySchedulerLock.Priority.LOW
359360
);
360361
}
361362
}

0 commit comments

Comments
 (0)