1414import com .hubspot .singularity .config .SingularityConfiguration ;
1515import com .hubspot .singularity .data .DeployManager ;
1616import com .hubspot .singularity .data .TaskManager ;
17+ import com .hubspot .singularity .mesos .SingularitySchedulerLock ;
1718import java .util .ArrayList ;
1819import java .util .Comparator ;
1920import java .util .List ;
@@ -42,6 +43,7 @@ public class SingularityTaskHistoryPersister
4243 private final DeployManager deployManager ;
4344 private final HistoryManager historyManager ;
4445 private final int agentReregisterTimeoutSeconds ;
46+ private final SingularitySchedulerLock singularitySchedulerLock ;
4547
4648 @ Inject
4749 public SingularityTaskHistoryPersister (
@@ -50,6 +52,7 @@ public SingularityTaskHistoryPersister(
5052 DeployManager deployManager ,
5153 HistoryManager historyManager ,
5254 SingularityManagedThreadPoolFactory managedThreadPoolFactory ,
55+ SingularitySchedulerLock singularitySchedulerLock ,
5356 @ Named (SingularityHistoryModule .PERSISTER_LOCK ) ReentrantLock persisterLock ,
5457 @ Named (
5558 SingularityHistoryModule .LAST_TASK_PERSISTER_SUCCESS
@@ -61,6 +64,7 @@ public SingularityTaskHistoryPersister(
6164 this .deployManager = deployManager ;
6265 this .agentReregisterTimeoutSeconds =
6366 configuration .getMesosConfiguration ().getAgentReregisterTimeoutSeconds ();
67+ this .singularitySchedulerLock = singularitySchedulerLock ;
6468 }
6569
6670 @ Override
@@ -88,22 +92,30 @@ public void runActionOnPoll() {
8892 () -> {
8993 try {
9094 LOG .debug ("Checking request {}" , requestId );
91- List <SingularityTaskId > taskIds = taskManager .getTaskIdsForRequest (
92- requestId
95+ List <SingularityTaskId > taskIds = singularitySchedulerLock .runWithRequestLockAndReturn (
96+ () -> {
97+ List <SingularityTaskId > ids = taskManager .getTaskIdsForRequest (
98+ requestId
99+ );
100+ ids .removeAll (taskManager .getActiveTaskIdsForRequest (requestId ));
101+ ids .removeAll (taskManager .getLBCleanupTasks ());
102+ List <SingularityPendingDeploy > pendingDeploys = deployManager .getPendingDeploys ();
103+ ids =
104+ ids
105+ .stream ()
106+ .filter (
107+ taskId ->
108+ !isPartOfPendingDeploy (pendingDeploys , taskId ) &&
109+ !couldReturnWithRecoveredAgent (taskId )
110+ )
111+ .sorted (SingularityTaskId .STARTED_AT_COMPARATOR_DESC )
112+ .collect (Collectors .toList ());
113+ return ids ;
114+ },
115+ requestId ,
116+ "task history persister fetch"
93117 );
94- taskIds .removeAll (taskManager .getActiveTaskIdsForRequest (requestId ));
95- taskIds .removeAll (taskManager .getLBCleanupTasks ());
96- List <SingularityPendingDeploy > pendingDeploys = deployManager .getPendingDeploys ();
97- taskIds =
98- taskIds
99- .stream ()
100- .filter (
101- taskId ->
102- !isPartOfPendingDeploy (pendingDeploys , taskId ) &&
103- !couldReturnWithRecoveredAgent (taskId )
104- )
105- .sorted (SingularityTaskId .STARTED_AT_COMPARATOR_DESC )
106- .collect (Collectors .toList ());
118+
107119 int forRequest = 0 ;
108120 int transferred = 0 ;
109121 for (SingularityTaskId taskId : taskIds ) {
0 commit comments