Skip to content

Commit 58f94e2

Browse files
authored
Pipe: Implemented OPC Sink for outer server & Set configuration and changed the default value of the server security policies & Made the default quality configurable and does not throw when non-value/quality measurement is encountered (#16944) (#17367)
fix Pipe: Fixed the OPC UA client connection problem (#17083) * fix * IT (cherry picked from commit 82f7ca6) spt Optimized the logger when table does not exist in DN heartbeat && Pipe: Fixed the OPC UA Sink key getter logic and potentail NPE when closing client && Load: Fixed the missing schema writing for "root" table (#17063) * root-fix * f * fix * rest * spls * gsa * fix (cherry picked from commit 5101489) fix Pipe: Implemented OPC Sink for outer server & Set configuration and changed the default value of the server security policies & Made the default quality configurable and does not throw when non-value/quality measurement is encountered (#16944) * pj * cj * bone * fix * fix * framework * fix * trilog * framework * fix * fix * yl * stack-client * fix * might * sleep-removal * cleaning * fix * sec-dir * cleaning * remove-poison * f * fix * clean-sit * sit-comp * object * many-clean * sit-sit * fix * fix * fix * ref * sit * partial * security-policies * check-equals * check-err * fix * compile-fix * adjust * ut * refactor * fix_and_IT * fix * placeholder * rollback * eliminate-fault * pw * fix * f * fix (cherry picked from commit cb18a95)
1 parent 4903f3c commit 58f94e2

15 files changed

Lines changed: 1530 additions & 149 deletions

File tree

integration-test/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@
185185
<dependency>
186186
<groupId>org.bouncycastle</groupId>
187187
<artifactId>bcprov-jdk18on</artifactId>
188-
<scope>test</scope>
189188
</dependency>
190189
<dependency>
191190
<groupId>junit</groupId>

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
3131

3232
@Before
3333
public void setUp() {
34-
MultiEnvFactory.createEnv(2);
34+
MultiEnvFactory.createEnv(1);
3535
env = MultiEnvFactory.getEnv(0);
3636
env.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
3737
// 10 min, assert that the operations will not time out

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 237 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,67 +20,280 @@
2020
package org.apache.iotdb.pipe.it.single;
2121

2222
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
23+
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
2324
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
2425
import org.apache.iotdb.db.it.utils.TestUtils;
26+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
27+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
28+
import org.apache.iotdb.it.env.cluster.EnvUtils;
2529
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2630
import org.apache.iotdb.itbase.category.MultiClusterIT1;
31+
import org.apache.iotdb.pipe.api.exception.PipeException;
2732
import org.apache.iotdb.rpc.TSStatusCode;
2833

34+
import org.apache.tsfile.common.conf.TSFileConfig;
35+
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
36+
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
37+
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
38+
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
39+
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
40+
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
41+
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
42+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
43+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
44+
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
45+
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
2946
import org.junit.Assert;
3047
import org.junit.Test;
3148
import org.junit.experimental.categories.Category;
3249
import org.junit.runner.RunWith;
3350

51+
import java.io.File;
52+
import java.net.ConnectException;
53+
import java.util.Arrays;
3454
import java.util.Collections;
3555
import java.util.HashMap;
3656
import java.util.Map;
57+
import java.util.Objects;
58+
import java.util.UUID;
59+
60+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
61+
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
3762

3863
@RunWith(IoTDBTestRunner.class)
3964
@Category({MultiClusterIT1.class})
4065
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
4166
@Test
42-
public void testOPCUASink() throws Exception {
67+
public void testOPCUAServerSink() throws Exception {
68+
int tcpPort = -1;
69+
int httpsPort = -1;
4370
try (final SyncConfigNodeIServiceClient client =
4471
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
4572

46-
TestUtils.executeNonQuery(env, "insert into root.db.d1(time,s1) values (1,1)", null);
73+
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
4774

48-
final Map<String, String> connectorAttributes = new HashMap<>();
49-
connectorAttributes.put("sink", "opc-ua-sink");
50-
connectorAttributes.put("opcua.model", "client-server");
75+
final Map<String, String> sinkAttributes = new HashMap<>();
5176

52-
Assert.assertEquals(
53-
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
54-
client
55-
.createPipe(
56-
new TCreatePipeReq("testPipe", connectorAttributes)
57-
.setExtractorAttributes(Collections.emptyMap())
58-
.setProcessorAttributes(Collections.emptyMap()))
59-
.getCode());
77+
sinkAttributes.put("sink", "opc-ua-sink");
78+
sinkAttributes.put("model", "client-server");
79+
sinkAttributes.put("opcua.security-policy", "None");
80+
81+
OpcUaClient opcUaClient;
82+
DataValue value;
83+
while (true) {
84+
final int[] ports = EnvUtils.searchAvailablePorts();
85+
tcpPort = ports[0];
86+
httpsPort = ports[1];
87+
sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
88+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
89+
90+
Assert.assertEquals(
91+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
92+
client
93+
.createPipe(
94+
new TCreatePipeReq("testPipe", sinkAttributes)
95+
.setExtractorAttributes(Collections.singletonMap("user", "root"))
96+
.setProcessorAttributes(Collections.emptyMap()))
97+
.getCode());
98+
99+
try {
100+
opcUaClient =
101+
getOpcUaClient(
102+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
103+
} catch (final PipeException e) {
104+
if (e.getCause() instanceof ConnectException) {
105+
continue;
106+
} else {
107+
throw e;
108+
}
109+
}
110+
value =
111+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
112+
Assert.assertEquals(new Variant(1.0), value.getValue());
113+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
114+
opcUaClient.disconnect().get();
115+
break;
116+
}
117+
118+
// Create the region first to avoid tsFile parsing
119+
TestUtils.executeNonQueries(
120+
env,
121+
Arrays.asList(
122+
"create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
123+
"create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)",
124+
"create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)",
125+
"insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)",
126+
"insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)",
127+
"insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"),
128+
null);
129+
130+
while (true) {
131+
final int[] ports = EnvUtils.searchAvailablePorts();
132+
tcpPort = ports[0];
133+
httpsPort = ports[1];
134+
sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
135+
sinkAttributes.put("https.port", Integer.toString(httpsPort));
136+
sinkAttributes.put("with-quality", "true");
137+
138+
Assert.assertEquals(
139+
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
140+
client
141+
.alterPipe(
142+
new TAlterPipeReq()
143+
.setPipeName("testPipe")
144+
.setIsReplaceAllConnectorAttributes(true)
145+
.setConnectorAttributes(sinkAttributes)
146+
.setProcessorAttributes(Collections.emptyMap())
147+
.setExtractorAttributes(Collections.emptyMap()))
148+
.getCode());
149+
try {
150+
opcUaClient =
151+
getOpcUaClient(
152+
"opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
153+
} catch (final PipeException e) {
154+
if (e.getCause() instanceof ConnectException) {
155+
continue;
156+
} else {
157+
throw e;
158+
}
159+
}
160+
break;
161+
}
162+
163+
// Test multiple regions
164+
TestUtils.executeNonQueries(
165+
env,
166+
Arrays.asList(
167+
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
168+
"insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)",
169+
"insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"),
170+
null);
171+
172+
long startTime = System.currentTimeMillis();
173+
while (true) {
174+
try {
175+
value =
176+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
177+
Assert.assertEquals(new Variant(1.0), value.getValue());
178+
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
179+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
180+
181+
value =
182+
opcUaClient
183+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1"))
184+
.get();
185+
Assert.assertEquals(new Variant(1.0), value.getValue());
186+
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
187+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
188+
189+
value =
190+
opcUaClient
191+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2"))
192+
.get();
193+
Assert.assertEquals(new Variant(1.0), value.getValue());
194+
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
195+
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
196+
break;
197+
} catch (final Throwable t) {
198+
if (System.currentTimeMillis() - startTime > 10_000L) {
199+
throw t;
200+
}
201+
}
202+
}
203+
204+
TestUtils.executeNonQuery(
205+
env, "insert into root.db.opc(time, quality) values (2, true)", null);
206+
TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
207+
208+
startTime = System.currentTimeMillis();
209+
while (true) {
210+
try {
211+
value =
212+
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
213+
Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
214+
Assert.assertEquals(new Variant(2.0), value.getValue());
215+
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
216+
break;
217+
} catch (final Throwable t) {
218+
if (System.currentTimeMillis() - startTime > 10_000L) {
219+
throw t;
220+
}
221+
}
222+
}
223+
224+
opcUaClient.disconnect().get();
60225
Assert.assertEquals(
61226
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
62227

63228
// Test reconstruction
64-
connectorAttributes.put("password", "test");
229+
sinkAttributes.put("password", "test");
230+
sinkAttributes.put("security-policy", "basic256sha256");
65231
Assert.assertEquals(
66232
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
67233
client
68234
.createPipe(
69-
new TCreatePipeReq("testPipe", connectorAttributes)
235+
new TCreatePipeReq("testPipe", sinkAttributes)
70236
.setExtractorAttributes(Collections.emptyMap())
71237
.setProcessorAttributes(Collections.emptyMap()))
72238
.getCode());
73239

240+
// Banned none, only allows basic256sha256
241+
final int finalTcpPort = tcpPort;
242+
Assert.assertThrows(
243+
PipeException.class,
244+
() ->
245+
getOpcUaClient(
246+
"opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
247+
SecurityPolicy.None,
248+
"root",
249+
"root"));
250+
74251
// Test conflict
75-
connectorAttributes.put("password", "conflict");
76-
Assert.assertEquals(
77-
TSStatusCode.PIPE_ERROR.getStatusCode(),
78-
client
79-
.createPipe(
80-
new TCreatePipeReq("testPipe", connectorAttributes)
81-
.setExtractorAttributes(Collections.emptyMap())
82-
.setProcessorAttributes(Collections.emptyMap()))
83-
.getCode());
252+
sinkAttributes.put("password", "conflict");
253+
try {
254+
TestUtils.executeNonQuery(
255+
env,
256+
String.format(
257+
"create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')",
258+
tcpPort, httpsPort),
259+
null);
260+
Assert.fail();
261+
} catch (final Exception e) {
262+
Assert.assertEquals(
263+
String.format(
264+
"org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port %s and https port %s's password **** conflicts to the new password ****, reject reusing.",
265+
tcpPort, httpsPort),
266+
e.getMessage());
267+
}
268+
} finally {
269+
if (tcpPort >= 0) {
270+
final String lockPath = EnvUtils.getLockFilePath(tcpPort);
271+
if (!new File(lockPath).delete()) {
272+
System.out.printf("Delete lock file %s failed%n", lockPath);
273+
}
274+
}
84275
}
85276
}
277+
278+
private static OpcUaClient getOpcUaClient(
279+
final String nodeUrl,
280+
final SecurityPolicy policy,
281+
final String userName,
282+
final String password) {
283+
final IoTDBOpcUaClient client;
284+
285+
final IdentityProvider provider =
286+
Objects.nonNull(userName)
287+
? new UsernameProvider(userName, password)
288+
: new AnonymousProvider();
289+
290+
final String securityDir =
291+
CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
292+
+ File.separatorChar
293+
+ UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
294+
295+
client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
296+
new ClientRunner(client, securityDir, password, userName, 10).run();
297+
return client.getClient();
298+
}
86299
}

0 commit comments

Comments
 (0)