Skip to content

Commit 046cab5

Browse files
committed
Upgrade to JGraphT 1.5.3
1 parent 6b32b2f commit 046cab5

18 files changed

+57
-83
lines changed

DEPENDENCY-LICENSES

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ List of third-party dependencies grouped by their license type.
405405

406406
Eclipse Public License (EPL) 1.0, GNU Lesser General Public License Version 2.1, February 1999
407407

408-
* JGraphT - Core (org.jgrapht:jgrapht-core:0.9.0 - http://www.jgrapht.org/jgrapht-core)
408+
* JGraphT - Core (org.jgrapht:jgrapht-core:1.5.3 - http://www.jgrapht.org/jgrapht-core)
409409

410410
Eclipse Public License 2.0, GNU General Public License, version 2 with the GNU Classpath Exception
411411

LICENSE-binary

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ The license texts of these dependencies can be found in the licenses directory.
899899

900900
Eclipse Public License (EPL) 1.0, GNU Lesser General Public License Version 2.1, February 1999
901901

902-
* JGraphT - Core (org.jgrapht:jgrapht-core:0.9.0 - http://www.jgrapht.org/jgrapht-core)
902+
* JGraphT - Core (org.jgrapht:jgrapht-core:1.5.3 - http://www.jgrapht.org/jgrapht-core)
903903

904904
Eclipse Public License v. 2.0, GNU General Public License, version 2 with the GNU Classpath Exception
905905

storm-client/src/jvm/org/apache/storm/streams/ProcessorBolt.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import java.util.List;
1616
import java.util.Map;
1717
import org.apache.storm.shade.com.google.common.collect.Multimap;
18-
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
18+
import org.apache.storm.shade.org.jgrapht.Graph;
1919
import org.apache.storm.task.OutputCollector;
2020
import org.apache.storm.task.TopologyContext;
2121
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -28,7 +28,7 @@
2828
class ProcessorBolt extends BaseRichBolt implements StreamBolt {
2929
private final ProcessorBoltDelegate delegate;
3030

31-
ProcessorBolt(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
31+
ProcessorBolt(String id, Graph<Node, Edge> graph, List<ProcessorNode> nodes) {
3232
delegate = new ProcessorBoltDelegate(id, graph, nodes);
3333
}
3434

storm-client/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.apache.storm.shade.com.google.common.collect.HashBasedTable;
2727
import org.apache.storm.shade.com.google.common.collect.Multimap;
2828
import org.apache.storm.shade.com.google.common.collect.Table;
29-
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
30-
import org.apache.storm.shade.org.jgrapht.graph.DirectedSubgraph;
29+
import org.apache.storm.shade.org.jgrapht.Graph;
30+
import org.apache.storm.shade.org.jgrapht.graph.AsSubgraph;
3131
import org.apache.storm.shade.org.jgrapht.traverse.TopologicalOrderIterator;
3232
import org.apache.storm.streams.processors.ChainedProcessorContext;
3333
import org.apache.storm.streams.processors.EmittingProcessorContext;
@@ -45,7 +45,7 @@
4545
class ProcessorBoltDelegate implements Serializable {
4646
private static final Logger LOG = LoggerFactory.getLogger(ProcessorBoltDelegate.class);
4747
private final String id;
48-
private final DirectedGraph<Node, Edge> graph;
48+
private final Graph<Node, Edge> graph;
4949
private final List<ProcessorNode> nodes;
5050
private final List<ProcessorNode> outgoingProcessors = new ArrayList<>();
5151
private final Set<EmittingProcessorContext> emittingProcessorContexts = new HashSet<>();
@@ -57,7 +57,7 @@ class ProcessorBoltDelegate implements Serializable {
5757
private Multimap<String, ProcessorNode> streamToInitialProcessors;
5858
private String timestampField;
5959

60-
ProcessorBoltDelegate(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
60+
ProcessorBoltDelegate(String id, Graph<Node, Edge> graph, List<ProcessorNode> nodes) {
6161
this.id = id;
6262
this.graph = graph;
6363
this.nodes = new ArrayList<>(nodes);
@@ -79,7 +79,7 @@ void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollec
7979
this.topoConf = topoConf;
8080
topologyContext = context;
8181
outputCollector = collector;
82-
DirectedSubgraph<Node, Edge> subgraph = new DirectedSubgraph<>(graph, new HashSet<>(nodes), null);
82+
AsSubgraph<Node, Edge> subgraph = new AsSubgraph<>(graph, new HashSet<>(nodes), null);
8383
TopologicalOrderIterator<Node, Edge> it = new TopologicalOrderIterator<>(subgraph);
8484
while (it.hasNext()) {
8585
Node node = it.next();

storm-client/src/jvm/org/apache/storm/streams/StatefulProcessorBolt.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import java.util.Map;
1818
import java.util.Set;
1919
import org.apache.storm.shade.com.google.common.collect.Multimap;
20-
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
20+
import org.apache.storm.shade.org.jgrapht.Graph;
2121
import org.apache.storm.state.KeyValueState;
2222
import org.apache.storm.streams.processors.StatefulProcessor;
2323
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
@@ -35,7 +35,7 @@ class StatefulProcessorBolt<K, V> extends BaseStatefulBolt<KeyValueState<K, V>>
3535
// can be UpdateStateByKey or StateQuery processors
3636
private final Set<StatefulProcessor<K, V>> statefulProcessors;
3737

38-
StatefulProcessorBolt(String boltId, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
38+
StatefulProcessorBolt(String boltId, Graph<Node, Edge> graph, List<ProcessorNode> nodes) {
3939
delegate = new ProcessorBoltDelegate(boltId, graph, nodes);
4040
statefulProcessors = getStatefulProcessors(nodes);
4141
}

storm-client/src/jvm/org/apache/storm/streams/StreamBuilder.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.HashSet;
2121
import java.util.List;
2222
import java.util.Map;
23-
import java.util.PriorityQueue;
2423
import java.util.Set;
2524
import java.util.stream.Collectors;
2625
import org.apache.storm.annotation.InterfaceStability;
@@ -68,7 +67,7 @@ public class StreamBuilder {
6867
* Creates a new {@link StreamBuilder}.
6968
*/
7069
public StreamBuilder() {
71-
graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory());
70+
graph = new DefaultDirectedGraph<>(null, null, false);
7271
}
7372

7473
/**
@@ -158,7 +157,7 @@ public StormTopology build() {
158157
nodeGroupingInfo.clear();
159158
windowInfo.clear();
160159
curGroup.clear();
161-
TopologicalOrderIterator<Node, Edge> iterator = new TopologicalOrderIterator<>(graph, queue());
160+
TopologicalOrderIterator<Node, Edge> iterator = new TopologicalOrderIterator<>(graph, priorityComparator());
162161
TopologyBuilder topologyBuilder = new TopologyBuilder();
163162
while (iterator.hasNext()) {
164163
Node node = iterator.next();
@@ -196,7 +195,7 @@ Node addNode(Node parent, Node child, String parentStreamId) {
196195

197196
Node addNode(Node parent, Node child, String parentStreamId, int parallelism) {
198197
graph.addVertex(child);
199-
graph.addEdge(parent, child);
198+
graph.addEdge(parent, child, new Edge(parent, child));
200199
child.setParallelism(parallelism);
201200
if (parent instanceof WindowNode || parent instanceof PartitionNode) {
202201
child.addParentStream(parentNode(parent), parentStreamId);
@@ -236,9 +235,8 @@ Node insert(Node parent, Node child) {
236235
return newChild;
237236
}
238237

239-
private PriorityQueue<Node> queue() {
240-
// min-heap
241-
return new PriorityQueue<>(new Comparator<Node>() {
238+
private Comparator<Node> priorityComparator() {
239+
return new Comparator<Node>() {
242240
/*
243241
* Nodes in the descending order of priority.
244242
* ProcessorNode has higher priority than partition and window nodes
@@ -279,7 +277,7 @@ private int getPriority(Node node) {
279277
}
280278
return Integer.MAX_VALUE;
281279
}
282-
});
280+
};
283281
}
284282

285283
private void handleProcessorNode(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {

storm-client/src/jvm/org/apache/storm/streams/StreamUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
import java.util.ArrayList;
1818
import java.util.List;
19-
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
19+
import org.apache.storm.shade.org.jgrapht.Graph;
2020
import org.apache.storm.tuple.Fields;
2121

2222
public class StreamUtil {
2323
@SuppressWarnings("unchecked")
24-
public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node) {
24+
public static <T> List<T> getParents(Graph<Node, Edge> graph, Node node) {
2525
List<Edge> incoming = new ArrayList<>(graph.incomingEdgesOf(node));
2626
List<T> ret = new ArrayList<>();
2727
for (Edge e : incoming) {
@@ -31,7 +31,7 @@ public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node)
3131
}
3232

3333
@SuppressWarnings("unchecked")
34-
public static <T> List<T> getChildren(DirectedGraph<Node, Edge> graph, Node node) {
34+
public static <T> List<T> getChildren(Graph<Node, Edge> graph, Node node) {
3535
List<Edge> outgoing = new ArrayList<>(graph.outgoingEdgesOf(node));
3636
List<T> ret = new ArrayList<>();
3737
for (Edge e : outgoing) {

storm-client/src/jvm/org/apache/storm/streams/StreamsEdgeFactory.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

storm-client/src/jvm/org/apache/storm/streams/WindowedProcessorBolt.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.List;
1919
import java.util.Map;
2020
import org.apache.storm.shade.com.google.common.collect.Multimap;
21-
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
21+
import org.apache.storm.shade.org.jgrapht.Graph;
2222
import org.apache.storm.streams.windowing.SlidingWindows;
2323
import org.apache.storm.streams.windowing.TumblingWindows;
2424
import org.apache.storm.streams.windowing.Window;
@@ -39,7 +39,7 @@ class WindowedProcessorBolt extends BaseWindowedBolt implements StreamBolt {
3939
private final ProcessorBoltDelegate delegate;
4040
private final Window<?, ?> window;
4141

42-
WindowedProcessorBolt(String id, DirectedGraph<Node, Edge> graph,
42+
WindowedProcessorBolt(String id, Graph<Node, Edge> graph,
4343
List<ProcessorNode> nodes,
4444
Window<?, ?> window) {
4545
delegate = new ProcessorBoltDelegate(id, graph, nodes);

storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@
3838
import org.apache.storm.generated.SharedMemory;
3939
import org.apache.storm.generated.StormTopology;
4040
import org.apache.storm.grouping.CustomStreamGrouping;
41-
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
42-
import org.apache.storm.shade.org.jgrapht.UndirectedGraph;
43-
import org.apache.storm.shade.org.jgrapht.alg.ConnectivityInspector;
41+
import org.apache.storm.shade.org.jgrapht.Graph;
42+
import org.apache.storm.shade.org.jgrapht.alg.connectivity.ConnectivityInspector;
4443
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
4544
import org.apache.storm.shade.org.jgrapht.graph.Pseudograph;
4645
import org.apache.storm.topology.BoltDeclarer;
@@ -103,7 +102,7 @@ public class TridentTopology {
103102
Map<String, Number> masterCoordResources = new HashMap<>();
104103

105104
public TridentTopology() {
106-
this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
105+
this(new DefaultDirectedGraph<Node, IndexedEdge>(null, new ErrorEdgeFactory(), false),
107106
new LinkedHashMap<String, List<Node>>(),
108107
new UniqueIdGen());
109108
}
@@ -333,9 +332,9 @@ private static Set<String> committerBatches(Group g, Map<Node, String> batchGrou
333332
return ret;
334333
}
335334

336-
private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper,
335+
private static Map<Group, Integer> getGroupParallelisms(Graph<Node, IndexedEdge> graph, GraphGrouper grouper,
337336
Collection<Group> groups) {
338-
UndirectedGraph<Group, Object> equivs = new Pseudograph<>(Object.class);
337+
Graph<Group, Object> equivs = new Pseudograph<>(Object.class);
339338
for (Group g : groups) {
340339
equivs.addVertex(g);
341340
}
@@ -440,7 +439,7 @@ private static boolean isIdentityPartition(PartitionNode n) {
440439
return false;
441440
}
442441

443-
private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
442+
private static void addEdge(Graph g, Object source, Object target, int index) {
444443
g.addEdge(source, target, new IndexedEdge(source, target, index));
445444
}
446445

0 commit comments

Comments
 (0)