Skip to content

Commit d243a10

Browse files
CaideyipiHTHou
authored andcommitted
[To dev/1.3] Fixed the REST partial insert & Pipe: Added partial insert IT (#17339)
* try * ' * no-bomb * side-effect * Revert "side-effect" This reverts commit cd104ea. * under-flow * fix * Fix rest partial insert serde error * fix testSpecialPartialInsert * add IT * fix IT * ignore flaky test --------- Co-authored-by: HTHou <haonan@apache.org>
1 parent 82ed55f commit d243a10

6 files changed

Lines changed: 213 additions & 4 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,14 @@ public Connection getConnection(String username, String password) throws SQLExce
440440
this);
441441
}
442442

443+
@Override
444+
public Connection getAvailableConnection(String username, String password) throws SQLException {
445+
return new ClusterTestConnection(
446+
getWriteConnection(null, username, password),
447+
getOneAvailableReadConnection(null, username, password),
448+
this);
449+
}
450+
443451
@Override
444452
public Connection getConnection(
445453
final DataNodeWrapper dataNodeWrapper, final String username, final String password)
@@ -656,6 +664,23 @@ protected List<NodeConnection> getReadConnections(
656664
return readConnRequestDelegate.requestAll();
657665
}
658666

667+
protected List<NodeConnection> getOneAvailableReadConnection(
668+
final Constant.Version version, final String username, final String password)
669+
throws SQLException {
670+
final List<DataNodeWrapper> dataNodeWrapperListCopy = new ArrayList<>(dataNodeWrapperList);
671+
Collections.shuffle(dataNodeWrapperListCopy);
672+
SQLException lastException = null;
673+
for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
674+
try {
675+
return getReadConnections(version, dataNode, username, password);
676+
} catch (final SQLException e) {
677+
lastException = e;
678+
}
679+
}
680+
logger.error("Failed to get connection from any DataNode, last exception is ", lastException);
681+
throw lastException;
682+
}
683+
659684
// use this to avoid some runtimeExceptions when try to get jdbc connections.
660685
// because it is hard to add retry and handle exception when getting jdbc connections in
661686
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.

integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ public Connection getConnection(String username, String password) throws SQLExce
136136
return connection;
137137
}
138138

139+
@Override
140+
public Connection getAvailableConnection(String username, String password) throws SQLException {
141+
throw new UnsupportedOperationException();
142+
}
143+
139144
@Override
140145
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
141146
DataNodeWrapper dataNode, String username, String password) {

integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ Connection getConnection(Constant.Version version, String username, String passw
120120
Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password)
121121
throws SQLException;
122122

123+
default Connection getAvailableConnection() throws SQLException {
124+
return getAvailableConnection(SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
125+
}
126+
127+
Connection getAvailableConnection(String username, String password) throws SQLException;
128+
123129
default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
124130
throws SQLException {
125131
return getWriteOnlyConnectionWithSpecifiedDataNode(

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
2222
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
2324
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2425
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2526
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -41,18 +42,27 @@
4142
import org.junit.After;
4243
import org.junit.Assert;
4344
import org.junit.Before;
45+
import org.junit.Ignore;
4446
import org.junit.Test;
4547
import org.junit.experimental.categories.Category;
4648
import org.junit.runner.RunWith;
4749

4850
import java.io.IOException;
4951
import java.nio.charset.Charset;
5052
import java.nio.charset.StandardCharsets;
53+
import java.sql.Connection;
54+
import java.sql.ResultSet;
55+
import java.sql.ResultSetMetaData;
56+
import java.sql.SQLException;
57+
import java.sql.Statement;
5158
import java.util.ArrayList;
5259
import java.util.Base64;
5360
import java.util.List;
5461
import java.util.Map;
62+
import java.util.concurrent.TimeUnit;
5563

64+
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
65+
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
5666
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL;
5767
import static org.junit.Assert.assertEquals;
5868
import static org.junit.Assert.assertTrue;
@@ -78,7 +88,7 @@ public void tearDown() throws Exception {
7888
EnvFactory.getEnv().cleanClusterEnvironment();
7989
}
8090

81-
private String getAuthorization(String username, String password) {
91+
public static String getAuthorization(String username, String password) {
8292
return Base64.getEncoder()
8393
.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
8494
}
@@ -128,7 +138,7 @@ public void ping() {
128138
}
129139
}
130140

131-
private HttpPost getHttpPost(String url) {
141+
public static HttpPost getHttpPost(String url) {
132142
HttpPost httpPost = new HttpPost(url);
133143
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
134144
httpPost.setHeader("Accept", "application/json");
@@ -242,6 +252,101 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http
242252
}
243253
}
244254

255+
@Ignore // Flaky test
256+
@Test
257+
public void errorInsertRecords() throws SQLException, InterruptedException {
258+
SimpleEnv simpleEnv = new SimpleEnv();
259+
simpleEnv
260+
.getConfig()
261+
.getCommonConfig()
262+
.setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
263+
.setSchemaReplicationFactor(3)
264+
.setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
265+
.setDataReplicationFactor(2);
266+
simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
267+
simpleEnv.initClusterEnvironment(1, 3);
268+
269+
CloseableHttpResponse response = null;
270+
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
271+
try {
272+
HttpPost httpPost =
273+
getHttpPost(
274+
"http://"
275+
+ simpleEnv.getDataNodeWrapper(0).getIp()
276+
+ ":"
277+
+ simpleEnv.getDataNodeWrapper(0).getRestServicePort()
278+
+ "/rest/v2/insertRecords");
279+
String json =
280+
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
281+
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
282+
for (int i = 0; i < 30; i++) {
283+
try {
284+
response = httpClient.execute(httpPost);
285+
break;
286+
} catch (Exception e) {
287+
if (i == 29) {
288+
throw e;
289+
}
290+
try {
291+
Thread.sleep(1000);
292+
} catch (InterruptedException ex) {
293+
throw new RuntimeException(ex);
294+
}
295+
}
296+
}
297+
298+
HttpEntity responseEntity = response.getEntity();
299+
String message = EntityUtils.toString(responseEntity, "utf-8");
300+
JsonObject result = JsonParser.parseString(message).getAsJsonObject();
301+
assertEquals(507, Integer.parseInt(result.get("code").toString()));
302+
} catch (IOException e) {
303+
e.printStackTrace();
304+
fail(e.getMessage());
305+
} finally {
306+
try {
307+
if (response != null) {
308+
response.close();
309+
}
310+
} catch (IOException e) {
311+
e.printStackTrace();
312+
fail(e.getMessage());
313+
}
314+
}
315+
TimeUnit.SECONDS.sleep(5);
316+
317+
try {
318+
for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
319+
dataNodeWrapper.stop();
320+
try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
321+
Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
322+
int count = 0;
323+
try (ResultSet resultSet =
324+
statementAfterNodeDown.executeQuery(
325+
"select s88, s77, s66, s55, s44, s33 from root.s1")) {
326+
ResultSetMetaData metaData = resultSet.getMetaData();
327+
while (resultSet.next()) {
328+
StringBuilder row = new StringBuilder();
329+
for (int i = 0; i < metaData.getColumnCount(); i++) {
330+
row.append(resultSet.getString(i + 1)).append(",");
331+
}
332+
System.out.println(row);
333+
count++;
334+
}
335+
}
336+
assertEquals(3, count);
337+
}
338+
dataNodeWrapper.start();
339+
TimeUnit.SECONDS.sleep(1);
340+
}
341+
} catch (SQLException e) {
342+
if (!e.getMessage().contains("Maybe server is down")) {
343+
throw e;
344+
}
345+
} finally {
346+
simpleEnv.cleanClusterEnvironment();
347+
}
348+
}
349+
245350
public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
246351
CloseableHttpResponse response = null;
247352
try {

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,16 @@
2828
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
2929
import org.apache.iotdb.rpc.TSStatusCode;
3030

31+
import org.apache.http.client.methods.HttpPost;
32+
import org.apache.http.entity.StringEntity;
33+
import org.apache.http.impl.client.CloseableHttpClient;
34+
import org.apache.http.impl.client.HttpClientBuilder;
3135
import org.junit.Assert;
3236
import org.junit.Test;
3337
import org.junit.experimental.categories.Category;
3438
import org.junit.runner.RunWith;
3539

40+
import java.nio.charset.Charset;
3641
import java.sql.Connection;
3742
import java.sql.Statement;
3843
import java.util.Arrays;
@@ -42,11 +47,20 @@
4247
import java.util.Map;
4348
import java.util.function.Consumer;
4449

50+
import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;
51+
4552
@RunWith(IoTDBTestRunner.class)
4653
@Category({MultiClusterIT2AutoCreateSchema.class})
4754
public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
55+
56+
@Override
57+
protected void setupConfig() {
58+
super.setupConfig();
59+
senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
60+
}
61+
4862
@Test
49-
public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
63+
public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
5064
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
5165

5266
final String receiverIp = receiverDataNode.getIp();
@@ -196,7 +210,7 @@ private void testSinkFormat(final String format) throws Exception {
196210
}
197211

198212
@Test
199-
public void testLegacyConnector() throws Exception {
213+
public void testLegacySink() throws Exception {
200214
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
201215

202216
final String receiverIp = receiverDataNode.getIp();
@@ -503,4 +517,53 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
503517
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
504518
}
505519
}
520+
521+
@Test
522+
public void testSpecialPartialInsert() throws Exception {
523+
try (final Connection connection = senderEnv.getConnection();
524+
final Statement statement = connection.createStatement()) {
525+
statement.execute(
526+
String.format(
527+
"create pipe a2b with sink ('node-urls'='%s')",
528+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
529+
}
530+
531+
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
532+
533+
HttpPost httpPost =
534+
getHttpPost(
535+
"http://"
536+
+ senderEnv.getDataNodeWrapper(0).getIp()
537+
+ ":"
538+
+ senderEnv.getDataNodeWrapper(0).getRestServicePort()
539+
+ "/rest/v2/insertRecords");
540+
String json =
541+
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
542+
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
543+
for (int i = 0; i < 30; i++) {
544+
try {
545+
httpClient.execute(httpPost);
546+
break;
547+
} catch (final Exception e) {
548+
if (i == 29) {
549+
throw e;
550+
}
551+
try {
552+
Thread.sleep(1000);
553+
} catch (InterruptedException ex) {
554+
throw new RuntimeException(ex);
555+
}
556+
}
557+
}
558+
559+
TestUtils.assertDataEventuallyOnEnv(
560+
receiverEnv,
561+
"select s88, s77, s66, s55, s44, s33 from root.s1",
562+
"Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
563+
new HashSet<>(
564+
Arrays.asList(
565+
"1635232113960,null,null,null,null,1,",
566+
"1635232151960,null,null,2.0,2.1,null,",
567+
"1635232143960,6.0,4.0,null,null,null,")));
568+
}
506569
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,11 @@ public PlanNode visitInsertRows(
714714
insertRowStatement.getTime(),
715715
insertRowStatement.getValues(),
716716
insertRowStatement.isNeedInferType());
717+
if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
718+
for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
719+
insertRowNode.markFailedMeasurement(index);
720+
}
721+
}
717722
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
718723
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
719724
}

0 commit comments

Comments
 (0)