2121
2222import org .apache .iotdb .commons .path .PartialPath ;
2323import org .apache .iotdb .commons .pipe .sink .payload .thrift .response .PipeTransferFilePieceResp ;
24+ import org .apache .iotdb .db .pipe .processor .twostage .exchange .payload .CombineRequest ;
25+ import org .apache .iotdb .db .pipe .processor .twostage .state .CountState ;
2426import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferDataNodeHandshakeV1Req ;
2527import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferPlanNodeReq ;
2628import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferSchemaSnapshotPieceReq ;
3739import org .apache .iotdb .db .queryengine .plan .planner .plan .node .write .InsertRowNode ;
3840import org .apache .iotdb .db .queryengine .plan .statement .Statement ;
3941import org .apache .iotdb .rpc .RpcUtils ;
42+ import org .apache .iotdb .service .rpc .thrift .TPipeTransferReq ;
4043
4144import org .apache .tsfile .common .conf .TSFileConfig ;
4245import org .apache .tsfile .enums .TSDataType ;
@@ -62,6 +65,61 @@ public class PipeDataNodeThriftRequestTest {
6265
6366 private static final String TIME_PRECISION = "ms" ;
6467
68+ @ Test
69+ public void testCombineRequest () throws Exception {
70+ final CombineRequest req =
71+ CombineRequest .toTPipeTransferReq ("pipe" , 1L , 2 , "combine" , new CountState (123L ));
72+ final CombineRequest deserializeReq = CombineRequest .fromTPipeTransferReq (req );
73+
74+ Assert .assertEquals (req .getVersion (), deserializeReq .getVersion ());
75+ Assert .assertEquals (req .getType (), deserializeReq .getType ());
76+ Assert .assertEquals ("pipe" , deserializeReq .getPipeName ());
77+ Assert .assertEquals (1L , deserializeReq .getCreationTime ());
78+ Assert .assertEquals (2 , deserializeReq .getRegionId ());
79+ Assert .assertEquals ("combine" , deserializeReq .getCombineId ());
80+ Assert .assertTrue (deserializeReq .getState () instanceof CountState );
81+ Assert .assertEquals (123L , ((CountState ) deserializeReq .getState ()).getCount ());
82+ }
83+
84+ @ Test
85+ public void testCombineRequestWithUnexpectedStateClassName () throws Exception {
86+ final CombineRequest req =
87+ CombineRequest .toTPipeTransferReq ("pipe" , 1L , 2 , "combine" , new CountState (123L ));
88+
89+ final ByteBuffer bodyBuffer = req .body .duplicate ();
90+ final String pipeName = ReadWriteIOUtils .readString (bodyBuffer );
91+ final long creationTime = ReadWriteIOUtils .readLong (bodyBuffer );
92+ final int regionId = ReadWriteIOUtils .readInt (bodyBuffer );
93+ final String combineId = ReadWriteIOUtils .readString (bodyBuffer );
94+ ReadWriteIOUtils .readString (bodyBuffer );
95+ final long count = ReadWriteIOUtils .readLong (bodyBuffer );
96+
97+ final ByteBuffer tamperedBody ;
98+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS ();
99+ final DataOutputStream outputStream = new DataOutputStream (byteArrayOutputStream )) {
100+ ReadWriteIOUtils .write (pipeName , outputStream );
101+ ReadWriteIOUtils .write (creationTime , outputStream );
102+ ReadWriteIOUtils .write (regionId , outputStream );
103+ ReadWriteIOUtils .write (combineId , outputStream );
104+ ReadWriteIOUtils .write ("java.lang.String" , outputStream );
105+ ReadWriteIOUtils .write (count , outputStream );
106+ tamperedBody =
107+ ByteBuffer .wrap (byteArrayOutputStream .getBuf (), 0 , byteArrayOutputStream .size ());
108+ }
109+
110+ final TPipeTransferReq tamperedReq = new TPipeTransferReq ();
111+ tamperedReq .version = req .version ;
112+ tamperedReq .type = req .type ;
113+ tamperedReq .body = tamperedBody ;
114+
115+ try {
116+ CombineRequest .fromTPipeTransferReq (tamperedReq );
117+ Assert .fail ("Expected IllegalArgumentException" );
118+ } catch (final IllegalArgumentException e ) {
119+ Assert .assertTrue (e .getMessage ().contains ("Unexpected state class" ));
120+ }
121+ }
122+
65123 @ Test
66124 public void testPipeTransferDataNodeHandshakeReq () throws IOException {
67125 final PipeTransferDataNodeHandshakeV1Req req =
0 commit comments