Skip to content

Commit 106acbc

Browse files
authored
[FLINK-39387][Test] Fix flaky scheduler benchmark tests caused by thread assertion failure in async TDD creation
1 parent ef532a2 commit 106acbc

3 files changed

Lines changed: 73 additions & 54 deletions

File tree

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.concurrent;
20+
21+
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
22+
23+
import java.util.concurrent.Callable;
24+
import java.util.concurrent.ScheduledFuture;
25+
import java.util.concurrent.TimeUnit;
26+
27+
/**
28+
* A synchronous {@link ComponentMainThreadExecutor} that executes tasks directly on the calling
29+
* thread without performing strict thread identity checks.
30+
*
31+
* <p>Unlike {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does
32+
* not assert that the current thread is the main thread, avoiding flaky test failures when {@link
33+
* java.util.concurrent.CompletableFuture} callbacks are dispatched from background threads.
34+
*/
35+
public class NoMainThreadCheckComponentMainThreadExecutor implements ComponentMainThreadExecutor {
36+
37+
private final DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
38+
39+
@Override
40+
public void assertRunningInMainThread() {
41+
// No-op: Skip thread assertion to avoid flaky test failures
42+
}
43+
44+
@Override
45+
public void execute(Runnable command) {
46+
executor.execute(command);
47+
}
48+
49+
@Override
50+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
51+
return executor.schedule(command, delay, unit);
52+
}
53+
54+
@Override
55+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
56+
return executor.schedule(callable, delay, unit);
57+
}
58+
59+
@Override
60+
public ScheduledFuture<?> scheduleAtFixedRate(
61+
Runnable command, long initialDelay, long period, TimeUnit unit) {
62+
return executor.scheduleAtFixedRate(command, initialDelay, period, unit);
63+
}
64+
65+
@Override
66+
public ScheduledFuture<?> scheduleWithFixedDelay(
67+
Runnable command, long initialDelay, long delay, TimeUnit unit) {
68+
return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
69+
}
70+
}

flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,13 @@
1818

1919
package org.apache.flink.runtime.operators.coordination;
2020

21-
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
21+
import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
2222
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
2323
import org.apache.flink.runtime.jobgraph.JobVertexID;
2424
import org.apache.flink.runtime.messages.Acknowledge;
2525
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
26-
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
2726
import org.apache.flink.util.SerializedValue;
2827
import org.apache.flink.util.concurrent.FutureUtils;
29-
import org.apache.flink.util.concurrent.ScheduledExecutor;
30-
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
31-
32-
import javax.annotation.Nonnull;
3328

3429
import java.io.IOException;
3530
import java.util.ArrayList;
@@ -39,8 +34,6 @@
3934
import java.util.Objects;
4035
import java.util.concurrent.Callable;
4136
import java.util.concurrent.CompletableFuture;
42-
import java.util.concurrent.ScheduledFuture;
43-
import java.util.concurrent.TimeUnit;
4437
import java.util.stream.Collectors;
4538

4639
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -242,48 +235,4 @@ public List<Throwable> getTaskFailoverReasons() {
242235
return taskFailoverReasons;
243236
}
244237
}
245-
246-
/**
247-
* An implementation of {@link ComponentMainThreadExecutor} that executes Runnables with a
248-
* wrapped {@link ScheduledExecutor} and disables {@link #assertRunningInMainThread()} checks.
249-
*/
250-
private static class NoMainThreadCheckComponentMainThreadExecutor
251-
implements ComponentMainThreadExecutor {
252-
private final ScheduledExecutor scheduledExecutor;
253-
254-
private NoMainThreadCheckComponentMainThreadExecutor() {
255-
this.scheduledExecutor =
256-
new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService());
257-
}
258-
259-
@Override
260-
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
261-
return scheduledExecutor.schedule(command, delay, unit);
262-
}
263-
264-
@Override
265-
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
266-
return scheduledExecutor.schedule(callable, delay, unit);
267-
}
268-
269-
@Override
270-
public ScheduledFuture<?> scheduleAtFixedRate(
271-
Runnable command, long initialDelay, long period, TimeUnit unit) {
272-
return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
273-
}
274-
275-
@Override
276-
public ScheduledFuture<?> scheduleWithFixedDelay(
277-
Runnable command, long initialDelay, long delay, TimeUnit unit) {
278-
return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, delay, unit);
279-
}
280-
281-
@Override
282-
public void assertRunningInMainThread() {}
283-
284-
@Override
285-
public void execute(@Nonnull Runnable command) {
286-
scheduledExecutor.execute(command);
287-
}
288-
}
289238
}

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.api.common.ExecutionConfig;
2222
import org.apache.flink.runtime.JobException;
2323
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
24-
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
24+
import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
2525
import org.apache.flink.runtime.execution.ExecutionState;
2626
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
2727
import org.apache.flink.runtime.executiongraph.Execution;
@@ -111,7 +111,7 @@ public static DefaultScheduler createAndInitScheduler(
111111
final JobGraph jobGraph = createJobGraph(jobVertices, jobConfiguration);
112112

113113
final ComponentMainThreadExecutor mainThreadExecutor =
114-
ComponentMainThreadExecutorServiceAdapter.forMainThread();
114+
new NoMainThreadCheckComponentMainThreadExecutor();
115115

116116
DefaultSchedulerBuilder schedulerBuilder =
117117
new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService)

0 commit comments

Comments
 (0)