Skip to content

Commit 751c5c5

Browse files
authored
Pipe: Refactor AirGap receiver with configurable payload size control (#17443) (#17503)
* Pipe: add hot-reloadable AirGap payload size guard to mitigate DoS risk. Introduce a dedicated AirGap receiver payload limit in the pipe config and enforce it before request buffer allocation, so oversized payloads are rejected early and memory pressure is bounded under malicious or malformed inputs. Made-with: Cursor * update * update * update * update * update * spotless (cherry picked from commit 7afedfe)
1 parent c72b967 commit 751c5c5

5 files changed

Lines changed: 132 additions & 3 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,14 +222,22 @@ private boolean checkSum(byte[] bytes) {
222222
}
223223
}
224224

225-
private byte[] readData(final InputStream inputStream) throws IOException {
225+
byte[] readData(final InputStream inputStream) throws IOException {
226226
final int length = readLength(inputStream);
227227

228228
if (length <= 0) {
229229
// Will fail() after checkSum()
230230
return new byte[0];
231231
}
232232

233+
final int maxLength = PipeConfig.getInstance().getPipeAirGapReceiverMaxPayloadSizeInBytes();
234+
if (length > maxLength) {
235+
throw new IOException(
236+
String.format(
237+
"AirGap payload length (%d) exceeds maximum allowed (%d). Closing connection from %s",
238+
length, maxLength, socket.getRemoteSocketAddress()));
239+
}
240+
233241
final byte[] resultBuffer = new byte[length];
234242
readTillFull(inputStream, resultBuffer);
235243
if (isELanguagePayload) {
@@ -238,11 +246,16 @@ private byte[] readData(final InputStream inputStream) throws IOException {
238246
return resultBuffer;
239247
}
240248

249+
private int readLength(final InputStream inputStream) throws IOException {
250+
return readLength(inputStream, false);
251+
}
252+
241253
/**
242254
* Read the length of the following data. The thread may typically block here when there is no
243255
* data to read.
244256
*/
245-
private int readLength(final InputStream inputStream) throws IOException {
257+
private int readLength(final InputStream inputStream, final boolean isELanguage)
258+
throws IOException {
246259
final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
247260
readTillFull(inputStream, doubleIntLengthBytes);
248261

@@ -251,10 +264,16 @@ private int readLength(final InputStream inputStream) throws IOException {
251264
if (Arrays.equals(
252265
doubleIntLengthBytes,
253266
BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * INT_LEN))) {
267+
if (isELanguage) {
268+
throw new IOException(
269+
String.format(
270+
"Detected suspicious nested E-Language prefix. Closing connection from %s",
271+
socket.getRemoteSocketAddress()));
272+
}
254273
isELanguagePayload = true;
255274
skipTillEnough(
256275
inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - 2 * INT_LEN);
257-
return readLength(inputStream);
276+
return readLength(inputStream, true);
258277
}
259278

260279
final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 0, INT_LEN);
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
21+
22+
import org.apache.iotdb.commons.conf.CommonConfig;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
25+
26+
import org.apache.tsfile.utils.BytesUtils;
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
import java.io.ByteArrayInputStream;
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.net.Socket;
34+
35+
public class IoTDBAirGapReceiverTest {
36+
37+
@Test
38+
public void testRejectOversizedAirGapPayload() throws Exception {
39+
final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
40+
final int originalMaxPayload = commonConfig.getPipeAirGapReceiverMaxPayloadSizeInBytes();
41+
42+
try {
43+
commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(16);
44+
final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 1L);
45+
46+
final byte[] oversizedLength = BytesUtils.intToBytes(32);
47+
final InputStream inputStream =
48+
new ByteArrayInputStream(BytesUtils.concatByteArray(oversizedLength, oversizedLength));
49+
50+
final IOException exception =
51+
Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream));
52+
Assert.assertTrue(exception.getMessage().contains("payload length"));
53+
} finally {
54+
commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(originalMaxPayload);
55+
}
56+
}
57+
58+
@Test
59+
public void testRejectNestedELanguagePrefix() throws Exception {
60+
final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 2L);
61+
62+
final InputStream inputStream =
63+
new ByteArrayInputStream(
64+
BytesUtils.concatByteArray(
65+
AirGapELanguageConstant.E_LANGUAGE_PREFIX,
66+
AirGapELanguageConstant.E_LANGUAGE_PREFIX));
67+
68+
final IOException exception =
69+
Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream));
70+
Assert.assertTrue(exception.getMessage().contains("nested E-Language prefix"));
71+
}
72+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,11 @@ public class CommonConfig {
309309
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
310310

311311
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
312+
// Align with default thrift frame size calculation.
313+
private int pipeAirGapReceiverMaxPayloadSizeInBytes =
314+
Math.min(
315+
64 * 1024 * 1024,
316+
(int) Math.min(Runtime.getRuntime().maxMemory() / 64, Integer.MAX_VALUE));
312317
private boolean pipeReceiverLoadConversionEnabled = false;
313318
private volatile long pipePeriodicalLogMinIntervalSeconds = 60;
314319
private volatile long pipeLoggerCacheMaxSizeInBytes = 16 * MB;
@@ -1646,6 +1651,23 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes(
16461651
pipeReceiverReqDecompressedMaxLengthInBytes);
16471652
}
16481653

1654+
public void setPipeAirGapReceiverMaxPayloadSizeInBytes(
1655+
int pipeAirGapReceiverMaxPayloadSizeInBytes) {
1656+
if (pipeAirGapReceiverMaxPayloadSizeInBytes <= 0) {
1657+
logger.info(
1658+
"Ignore invalid pipeAirGapReceiverMaxPayloadSizeInBytes {}, because it must be greater than 0.",
1659+
pipeAirGapReceiverMaxPayloadSizeInBytes);
1660+
return;
1661+
}
1662+
if (this.pipeAirGapReceiverMaxPayloadSizeInBytes == pipeAirGapReceiverMaxPayloadSizeInBytes) {
1663+
return;
1664+
}
1665+
this.pipeAirGapReceiverMaxPayloadSizeInBytes = pipeAirGapReceiverMaxPayloadSizeInBytes;
1666+
logger.info(
1667+
"pipeAirGapReceiverMaxPayloadSizeInBytes is set to {}.",
1668+
pipeAirGapReceiverMaxPayloadSizeInBytes);
1669+
}
1670+
16491671
public boolean isPipeReceiverLoadConversionEnabled() {
16501672
return pipeReceiverLoadConversionEnabled;
16511673
}
@@ -1687,6 +1709,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
16871709
return pipeReceiverReqDecompressedMaxLengthInBytes;
16881710
}
16891711

1712+
public int getPipeAirGapReceiverMaxPayloadSizeInBytes() {
1713+
return pipeAirGapReceiverMaxPayloadSizeInBytes;
1714+
}
1715+
16901716
public double getPipeMetaReportMaxLogNumPerRound() {
16911717
return pipeMetaReportMaxLogNumPerRound;
16921718
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
361361
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
362362
}
363363

364+
public int getPipeAirGapReceiverMaxPayloadSizeInBytes() {
365+
return COMMON_CONFIG.getPipeAirGapReceiverMaxPayloadSizeInBytes();
366+
}
367+
364368
public boolean isPipeReceiverLoadConversionEnabled() {
365369
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
366370
}
@@ -627,6 +631,9 @@ public void printAllConfigs() {
627631
LOGGER.info(
628632
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
629633
getPipeReceiverReqDecompressedMaxLengthInBytes());
634+
LOGGER.info(
635+
"PipeAirGapReceiverMaxPayloadSizeInBytes: {}",
636+
getPipeAirGapReceiverMaxPayloadSizeInBytes());
630637
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());
631638
LOGGER.info(
632639
"PipePeriodicalLogMinIntervalSeconds: {}", getPipePeriodicalLogMinIntervalSeconds());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
464464
properties.getProperty(
465465
"pipe_receiver_req_decompressed_max_length_in_bytes",
466466
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
467+
config.setPipeAirGapReceiverMaxPayloadSizeInBytes(
468+
Integer.parseInt(
469+
properties.getProperty(
470+
"pipe_air_gap_receiver_max_payload_size_in_bytes",
471+
String.valueOf(config.getPipeAirGapReceiverMaxPayloadSizeInBytes()))));
467472
config.setPipeReceiverLoadConversionEnabled(
468473
Boolean.parseBoolean(
469474
properties.getProperty(

0 commit comments

Comments
 (0)