Skip to content

Commit cbe9798

Browse files
authored
4.x: Introduce FlatMapConfig & remove some overload bloat (#8031)
* 4.x: Introduce FlatMapConfig & remove some overload bloat Also found a Checkstyle plugin bug: checkstyle/checkstyle#19342 * Missed these * Apply checkstyle fixes
1 parent 5756066 commit cbe9798

17 files changed

Lines changed: 233 additions & 188 deletions

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.0-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.1-bin.zip
44
networkTimeout=10000
55
validateDistributionUrl=true
66
zipStoreBase=GRADLE_USER_HOME

gradlew

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.openjdk.jmh.annotations.*;
2020
import org.openjdk.jmh.infra.Blackhole;
2121

22+
import io.reactivex.rxjava4.core.config.FlatMapConfig;
2223
import io.reactivex.rxjava4.functions.Action;
2324
import io.reactivex.rxjava4.internal.functions.Functions;
2425
import io.reactivex.rxjava4.schedulers.Schedulers;
@@ -59,7 +60,7 @@ public void setup() {
5960
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
6061

6162
flatMap = Flowable.fromArray(array)
62-
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
63+
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), new FlatMapConfig(false, maxConcurrency));
6364
}
6465

6566
// @Benchmark

src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.openjdk.jmh.annotations.*;
2020
import org.openjdk.jmh.infra.Blackhole;
2121

22+
import io.reactivex.rxjava4.core.config.FlatMapConfig;
2223
import io.reactivex.rxjava4.internal.functions.Functions;
2324

2425
@SuppressWarnings("exports")
@@ -49,7 +50,7 @@ public void setup() {
4950
.flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency);
5051

5152
flatMap = Flowable.fromArray(array)
52-
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), false, maxConcurrency);
53+
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), new FlatMapConfig(false, maxConcurrency));
5354
}
5455

5556
@Benchmark

src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.util.concurrent.Flow.*;
2222

2323
import io.reactivex.rxjava4.core.*;
24+
import io.reactivex.rxjava4.core.config.FlatMapConfig;
2425
import io.reactivex.rxjava4.flowables.GroupedFlowable;
2526
import io.reactivex.rxjava4.functions.Function;
2627
import io.reactivex.rxjava4.schedulers.Schedulers;
@@ -71,7 +72,7 @@ public Publisher<Integer> apply(Integer v) {
7172
return Flowable.just(v).subscribeOn(Schedulers.computation())
7273
.map(ParallelPerf.this);
7374
}
74-
}, cpu);
75+
}, new FlatMapConfig(cpu));
7576

7677
groupBy = source.groupBy(new Function<Integer, Integer>() {
7778
int i;

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 27 additions & 145 deletions
Large diffs are not rendered by default.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Flowable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Generic configuration block with option to delay errors, change prefetch
21+
* amounts and buffer sizes.
22+
* TODO once value classes are available, make this a record class.
23+
* @since 4.0.0
24+
*/
25+
public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
26+
27+
/**
28+
* Default config: no error delay, {@link Flowable#bufferSize()} sizes.
29+
*/
30+
public FlatMapConfig() {
31+
this(false, Flowable.bufferSize(), Flowable.bufferSize());
32+
}
33+
34+
/**
35+
* Optionally delay error, {@link Flowable#bufferSize()} sizes
36+
* @param delayErrors should the error be delayed?
37+
*/
38+
public FlatMapConfig(boolean delayErrors) {
39+
this(delayErrors, Flowable.bufferSize(), Flowable.bufferSize());
40+
}
41+
42+
/**
43+
* Optionally set the buffer size, no delay errors.
44+
* @param maxConcurrency the maximum number of concurrent flows
45+
*/
46+
public FlatMapConfig(int maxConcurrency) {
47+
this(false,
48+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"),
49+
Flowable.bufferSize());
50+
}
51+
52+
/**
53+
* Optionally delays errors and sets the buffer size too.
54+
* @param delayError should the errors be delayed?
55+
* @param maxConcurrency the maximum number of concurrent flows
56+
*/
57+
public FlatMapConfig(boolean delayErrors, int maxConcurrency) {
58+
this(delayErrors,
59+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"),
60+
Flowable.bufferSize());
61+
}
62+
63+
/**
64+
* Fully customize the configuration.
65+
* @param delayErrors should the errors be delayed
66+
* @param bufferSize what would be the buffer size
67+
* @param prefetch what would be the prefetch amount
68+
*/
69+
public FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
70+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
71+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
72+
this.delayErrors = delayErrors;
73+
this.maxConcurrency = maxConcurrency;
74+
this.bufferSize = bufferSize;
75+
}
76+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Flowable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Generic configuration block with option to delay errors, change prefetch
21+
* amounts and buffer sizes.
22+
* TODO once value classes are available, make this a record class.
23+
* @since 4.0.0
24+
*/
25+
public record GenericConfig(boolean delayError, int bufferSize, int prefetch) {
26+
27+
/**
28+
* Default config: no error delay, {@link Flowable#bufferSize()} sizes.
29+
*/
30+
public GenericConfig() {
31+
this(false, Flowable.bufferSize(), Flowable.bufferSize());
32+
}
33+
34+
/**
35+
* Optionally delay error, {@link Flowable#bufferSize()} sizes.
36+
* @param delayError should the error be delayed?
37+
*/
38+
public GenericConfig(boolean delayError) {
39+
this(delayError, Flowable.bufferSize(), Flowable.bufferSize());
40+
}
41+
42+
/**
43+
* Optionally set the buffer size, no delay errors.
44+
* @param bufferSize the prefetch and the buffer size
45+
*/
46+
public GenericConfig(int bufferSize) {
47+
this(false, ObjectHelper.verifyPositive(bufferSize, "bufferSize"), Flowable.bufferSize());
48+
}
49+
50+
/**
51+
* Optionally delays errors and sets the buffer size too.
52+
* @param delayError should the errors be delayed?
53+
* @param bufferSize the prefetch and the buffer size
54+
*/
55+
public GenericConfig(boolean delayError, int bufferSize) {
56+
this(delayError, ObjectHelper.verifyPositive(bufferSize, "bufferSize"), Flowable.bufferSize());
57+
}
58+
59+
/**
60+
* Fully customize the configuration.
61+
* @param delayError should the errors be delayed
62+
* @param bufferSize what would be the buffer size
63+
* @param prefetch what would be the prefetch amount
64+
*/
65+
public GenericConfig(boolean delayError, int bufferSize, int prefetch) {
66+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
67+
ObjectHelper.verifyPositive(prefetch, "prefetch");
68+
this.delayError = delayError;
69+
this.bufferSize = bufferSize;
70+
this.prefetch = prefetch;
71+
}
72+
}

src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
/**
2020
* Documents a set of operators so that the main Flowable source file is not cluttered.
2121
* @param <T> the element type of the flow
22+
* @since 4.0.0
2223
*/
2324
public sealed interface FlowableDocBasic<T> permits Flowable {
2425
/**

src/main/module/module-info.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
module io.reactivex.rxjava4 {
1515
exports io.reactivex.rxjava4.annotations;
1616
exports io.reactivex.rxjava4.core;
17+
exports io.reactivex.rxjava4.core.docs;
18+
exports io.reactivex.rxjava4.core.config;
1719
exports io.reactivex.rxjava4.disposables;
1820
exports io.reactivex.rxjava4.exceptions;
1921
exports io.reactivex.rxjava4.flowables;

0 commit comments

Comments
 (0)