Skip to content

Commit 7478110

Browse files
authored
[To dev/1.3] Pipe: Fixed some potential OPC UA problems (#17393) & Added IT for `` in opc (#17394)
* fix * sptls * fix * remove-side * opti
1 parent 4b946d4 commit 7478110

5 files changed

Lines changed: 42 additions & 23 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,8 @@ public void start() {
461461
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() + "m",
462462
"-Djdk.nio.maxCachedBufferSize=262144",
463463
"-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + killPoints.toString(),
464-
"-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
464+
"-Dsun.jnu.encoding=UTF-8",
465+
"-Dfile.encoding=UTF-8",
465466
"-cp",
466467
server_node_lib_path));
467468
addStartCmdParams(startCmd);

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void testOPCUAServerSink() throws Exception {
7070
try (final SyncConfigNodeIServiceClient client =
7171
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
7272

73-
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
73+
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, `1`) values (1, 1)", null);
7474

7575
final Map<String, String> sinkAttributes = new HashMap<>();
7676

@@ -108,7 +108,9 @@ public void testOPCUAServerSink() throws Exception {
108108
}
109109
}
110110
value =
111-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
111+
opcUaClient
112+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/`1`"))
113+
.get();
112114
Assert.assertEquals(new Variant(1.0), value.getValue());
113115
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
114116
opcUaClient.disconnect().get();
@@ -119,12 +121,12 @@ public void testOPCUAServerSink() throws Exception {
119121
TestUtils.executeNonQueries(
120122
env,
121123
Arrays.asList(
122-
"create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
123-
"create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)",
124-
"create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)",
125-
"insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)",
126-
"insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)",
127-
"insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"),
124+
"create aligned timeSeries root.db.`123`(value double, quality boolean, other int32)",
125+
"create aligned timeSeries root.db.`1231`(value double, quality boolean, other int32)",
126+
"create aligned timeSeries root.db.`1232`(value double, quality boolean, other int32)",
127+
"insert into root.db.`123`(time, value, quality, other) values (0, 0, true, 1)",
128+
"insert into root.db.`1231`(time, value, quality, other) values (0, 0, true, 1)",
129+
"insert into root.db.`1232`(time, value, quality, other) values (0, 0, true, 1)"),
128130
null);
129131

130132
while (true) {
@@ -164,31 +166,33 @@ public void testOPCUAServerSink() throws Exception {
164166
TestUtils.executeNonQueries(
165167
env,
166168
Arrays.asList(
167-
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
168-
"insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)",
169-
"insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"),
169+
"insert into root.db.`123`(time, value, quality, other) values (1, 1, false, 1)",
170+
"insert into root.db.`1231`(time, value, quality, other) values (1, 1, false, 1)",
171+
"insert into root.db.`1232`(time, value, quality, other) values (1, 1, false, 1)"),
170172
null);
171173

172174
long startTime = System.currentTimeMillis();
173175
while (true) {
174176
try {
175177
value =
176-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
178+
opcUaClient
179+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`123`"))
180+
.get();
177181
Assert.assertEquals(new Variant(1.0), value.getValue());
178182
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
179183
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
180184

181185
value =
182186
opcUaClient
183-
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1"))
187+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`1231`"))
184188
.get();
185189
Assert.assertEquals(new Variant(1.0), value.getValue());
186190
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
187191
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
188192

189193
value =
190194
opcUaClient
191-
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2"))
195+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`1232`"))
192196
.get();
193197
Assert.assertEquals(new Variant(1.0), value.getValue());
194198
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -202,14 +206,16 @@ public void testOPCUAServerSink() throws Exception {
202206
}
203207

204208
TestUtils.executeNonQuery(
205-
env, "insert into root.db.opc(time, quality) values (2, true)", null);
206-
TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
209+
env, "insert into root.db.`123`(time, quality) values (2, true)", null);
210+
TestUtils.executeNonQuery(env, "insert into root.db.`123`(time, value) values (2, 2)", null);
207211

208212
startTime = System.currentTimeMillis();
209213
while (true) {
210214
try {
211215
value =
212-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
216+
opcUaClient
217+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`123`"))
218+
.get();
213219
Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
214220
Assert.assertEquals(new Variant(2.0), value.getValue());
215221
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import java.io.File;
3232
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.io.OutputStream;
3335
import java.nio.file.Files;
3436
import java.nio.file.Path;
3537
import java.security.Key;
@@ -61,8 +63,8 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
6163
LOGGER.info("Loading KeyStore at {}", serverKeyStore);
6264

6365
if (serverKeyStore.exists()) {
64-
try {
65-
keyStore.load(Files.newInputStream(serverKeyStore.toPath()), password);
66+
try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
67+
keyStore.load(is, password);
6668
} catch (final IOException e) {
6769
LOGGER.warn("Load keyStore failed, the existing keyStore may be stale, re-constructing...");
6870
FileUtils.deleteFileOrDirectory(serverKeyStore);
@@ -105,7 +107,9 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
105107

106108
keyStore.setKeyEntry(
107109
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
108-
keyStore.store(Files.newOutputStream(serverKeyStore.toPath()), password);
110+
try (final OutputStream os = Files.newOutputStream(serverKeyStore.toPath())) {
111+
keyStore.store(os, password);
112+
}
109113
}
110114

111115
final Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
@@ -114,6 +118,10 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
114118

115119
final PublicKey serverPublicKey = serverCertificate.getPublicKey();
116120
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey);
121+
} else {
122+
throw new Exception(
123+
"Invalid keyStore, the serverPrivateKey is "
124+
+ (serverPrivateKey != null ? serverPrivateKey.getClass().getSimpleName() : "null"));
117125
}
118126

119127
return this;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ public void startup() {
9191

9292
@Override
9393
public void shutdown() {
94-
getServer().shutdown();
95-
builder.close();
94+
try {
95+
getServer().shutdown();
96+
} finally {
97+
builder.close();
98+
}
9699
}
97100
});
98101
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymous
117117
return this;
118118
}
119119

120+
// Must be a modifiable set.
120121
public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> securityPolicies) {
121122
this.securityPolicies = securityPolicies;
122123
return this;

0 commit comments

Comments
 (0)