
Commit ed64982

Use FDW to query multiple servers as shards (#320)
This commit lets users query multiple clusters as external shards. The FDW treats each data source as a whole without knowing its internal structure, which keeps requesters and data sources decoupled and preserves generality.

Add a new catalog table, pg_foreign_table_seg, to enable multiple shards in a foreign table. The foreign table is treated as a sharded table with a strewn locus, and each QE scanning the foreign table gets one shard from pg_foreign_table_seg. Because the size of the computing cluster and the number of shards of the foreign table may differ, use a flexible gang to generate the same number of scan nodes as there are foreign table shards.

Because the data bandwidth between data centers is limited, we need to reduce FDW data transmission as much as possible. Pushing execution nodes down to the remote end wherever possible reduces data transmission. If all tables of a subtree are distributed over the same collection of foreign servers, the subtree can be pushed down. In mpp-fdw, however, we must also check whether a table joins only with the shard on the same foreign server. So a new system attribute, gp_foreign_server, was added to the foreign table. If the user adds "t1.gp_foreign_server = t2.gp_foreign_server" to the join condition, the join is pushed down.

We can only push down the first stage of a two-stage aggregation. Multi-stage aggregation uses intermediate types. For some aggregates, such as count, min, max, and sum, the intermediate type is an external type that can be output externally, and the intermediate and final types are identical. Others, such as avg, use a more complex internal intermediate type that differs from the final type and must be converted by a final function. Since the local node in the FDW acts as a standard client when exchanging data with the remote server, these internal types cannot be transmitted.
So aggregate functions such as "avg" are not pushed down for now.
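The aggregation restriction can be illustrated with a small conceptual model. This is only a sketch in plain Python, not the postgres_fdw code: the shard names reuse mpps1 to mpps3 from the regression test, while the row values and the helper names `remote_first_stage` and `local_final_stage` are made up for illustration. It assumes that avg's intermediate state is, conceptually, a (sum, count) pair.

```python
# Conceptual model only: plain Python standing in for the remote shards and
# the local "Finalize Aggregate" step. Row values are hypothetical.
SHARDS = {
    "mpps1": [1, 2, 3],
    "mpps2": [10],
    "mpps3": [4, 6],
}


def remote_first_stage(rows):
    """First-stage results a remote server can return as ordinary values."""
    return {"count": len(rows), "sum": sum(rows), "min": min(rows), "max": max(rows)}


def local_final_stage(partials):
    """Second stage on the local node: combine one partial result per shard."""
    return {
        "count": sum(p["count"] for p in partials),
        "sum": sum(p["sum"] for p in partials),
        "min": min(p["min"] for p in partials),
        "max": max(p["max"] for p in partials),
    }


partials = [remote_first_stage(rows) for rows in SHARDS.values()]
combined = local_final_stage(partials)

# avg is different: its intermediate state is conceptually a (sum, count)
# pair. If each shard could only hand back its finished average, combining
# those averages locally would give the wrong answer.
per_shard_avg = [sum(rows) / len(rows) for rows in SHARDS.values()]
avg_of_avgs = sum(per_shard_avg) / len(per_shard_avg)
correct_avg = combined["sum"] / combined["count"]

print(combined)
print(f"avg of per-shard avgs: {avg_of_avgs:.3f}, correct avg: {correct_avg:.3f}")
```

In this model count, sum, min, and max combine correctly from ordinary per-shard values, while the two avg figures disagree, which is the reason given above for leaving avg on the local side.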
1 parent 56caaab commit ed64982

55 files changed (+1386 -121 lines changed)

contrib/postgres_fdw/Makefile

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ SHLIB_LINK_INTERNAL = -Wl,-Bsymbolic -Wl,-Bstatic -Wl,-Bstatic $(libpq) -Wl,-Bdy
 EXTENSION = postgres_fdw
 DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
 
-REGRESS = gp_postgres_fdw #postgres_fdw
+REGRESS = gp_postgres_fdw mpp_postgres_fdw #postgres_fdw
 
 ifdef USE_PGXS
 PG_CONFIG = pg_config
Lines changed: 389 additions & 0 deletions
@@ -0,0 +1,389 @@
+CREATE EXTENSION IF NOT EXISTS postgres_fdw;
+NOTICE: extension "postgres_fdw" already exists, skipping
+CREATE SERVER testserver2 FOREIGN DATA WRAPPER postgres_fdw;
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER mpps1 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname 'fdw1',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+        EXECUTE $$CREATE SERVER mpps2 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname 'fdw2',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+        EXECUTE $$CREATE SERVER mpps3 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname 'fdw3',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+    END;
+$d$;
+
+CREATE USER MAPPING FOR CURRENT_USER SERVER mpps1;
+CREATE USER MAPPING FOR CURRENT_USER SERVER mpps2;
+CREATE USER MAPPING FOR CURRENT_USER SERVER mpps3;
+DROP DATABASE IF EXISTS fdw1;
+DROP DATABASE IF EXISTS fdw2;
+DROP DATABASE IF EXISTS fdw3;
+CREATE DATABASE fdw1;
+CREATE DATABASE fdw2;
+CREATE DATABASE fdw3;
+\c fdw1
+create table t1(a int, b text);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+create table t2(a int, b text);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t1 values(1, 'fdw1');
+insert into t2 values(1, 'fdw1');
+\c fdw2
+create table t1(a int, b text);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+create table t2(a int, b text);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t1 values(1, 'fdw2');
+insert into t2 values(1, 'fdw2');
+\c fdw3
+create table t1(a int, b text);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+create table t2(a int, b text);
+NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into t1 values(1, 'fdw3');
+insert into t2 values(1, 'fdw3');
+\c contrib_regression
+CREATE FOREIGN TABLE fs1 (
+    a int,
+    b text
+)
+    SERVER mpps1
+    OPTIONS (schema_name 'public', table_name 't1', mpp_execute 'all segments');
+ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs1;
+explain (costs off) select * from fs1;
+QUERY PLAN
+------------------------------------------
+Gather Motion 1:1 (slice1; segments: 1)
+  -> Foreign Scan on fs1
+Optimizer: Postgres query optimizer
+(3 rows)
+
+select * from fs1;
+a | b
+---+------
+1 | fdw1
+(1 row)
+
+ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs1;
+explain (costs off) select * from fs1;
+QUERY PLAN
+------------------------------------------
+Gather Motion 2:1 (slice1; segments: 2)
+  -> Foreign Scan on fs1
+Optimizer: Postgres query optimizer
+(3 rows)
+
+select * from fs1;
+a | b
+---+------
+1 | fdw1
+1 | fdw2
+(2 rows)
+
+explain (costs off) select count(*) from fs1;
+QUERY PLAN
+------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 2:1 (slice1; segments: 2)
+        -> Foreign Scan
+              Relations: Aggregate on (fs1)
+Optimizer: Postgres query optimizer
+(5 rows)
+
+select count(*) from fs1;
+count
+-------
+2
+(1 row)
+
+select count(*),b from fs1 group by b;
+count | b
+-------+------
+1 | fdw2
+1 | fdw1
+(2 rows)
+
+ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs1;
+explain (costs off) select * from fs1;
+QUERY PLAN
+------------------------------------------
+Gather Motion 3:1 (slice1; segments: 3)
+  -> Foreign Scan on fs1
+Optimizer: Postgres query optimizer
+(3 rows)
+
+select * from fs1;
+a | b
+---+------
+1 | fdw2
+1 | fdw1
+1 | fdw3
+(3 rows)
+
+explain (costs off) select count(*) from fs1;
+QUERY PLAN
+------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 3:1 (slice1; segments: 3)
+        -> Foreign Scan
+              Relations: Aggregate on (fs1)
+Optimizer: Postgres query optimizer
+(5 rows)
+
+select count(*) from fs1;
+count
+-------
+3
+(1 row)
+
+select count(*),b from fs1 group by b;
+count | b
+-------+------
+1 | fdw2
+1 | fdw1
+1 | fdw3
+(3 rows)
+
+----------------------
+-- Test join push down
+----------------------
+CREATE FOREIGN TABLE fs2 (
+    a int,
+    b text
+)
+    SERVER mpps1
+    OPTIONS (schema_name 'public', table_name 't2', mpp_execute 'all segments');
+ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs2;
+ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs2;
+ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs2;
+explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a;
+QUERY PLAN
+------------------------------------------------------------------
+Gather Motion 3:1 (slice1; segments: 3)
+  -> Hash Join
+        Hash Cond: (fs1.a = fs2.a)
+        -> Redistribute Motion 3:3 (slice2; segments: 3)
+              Hash Key: fs1.a
+              -> Foreign Scan on fs1
+        -> Hash
+              -> Redistribute Motion 3:3 (slice3; segments: 3)
+                    Hash Key: fs2.a
+                    -> Foreign Scan on fs2
+Optimizer: Postgres query optimizer
+(11 rows)
+
+select * from fs1,fs2 where fs1.a = fs2.a;
+a | b | a | b
+---+------+---+------
+1 | fdw1 | 1 | fdw2
+1 | fdw1 | 1 | fdw1
+1 | fdw1 | 1 | fdw3
+1 | fdw2 | 1 | fdw2
+1 | fdw2 | 1 | fdw1
+1 | fdw2 | 1 | fdw3
+1 | fdw3 | 1 | fdw2
+1 | fdw3 | 1 | fdw1
+1 | fdw3 | 1 | fdw3
+(9 rows)
+
+explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+QUERY PLAN
+-------------------------------------------
+Gather Motion 3:1 (slice1; segments: 3)
+  -> Foreign Scan
+        Relations: (fs1) INNER JOIN (fs2)
+Optimizer: Postgres query optimizer
+(4 rows)
+
+select * from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+a | b | a | b
+---+------+---+------
+1 | fdw3 | 1 | fdw3
+1 | fdw1 | 1 | fdw1
+1 | fdw2 | 1 | fdw2
+(3 rows)
+
+explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a;
+QUERY PLAN
+------------------------------------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 3:1 (slice1; segments: 3)
+        -> Partial Aggregate
+              -> Hash Join
+                    Hash Cond: (fs1.a = fs2.a)
+                    -> Redistribute Motion 3:3 (slice2; segments: 3)
+                          Hash Key: fs1.a
+                          -> Foreign Scan on fs1
+                    -> Hash
+                          -> Redistribute Motion 3:3 (slice3; segments: 3)
+                                Hash Key: fs2.a
+                                -> Foreign Scan on fs2
+Optimizer: Postgres query optimizer
+(13 rows)
+
+select count(*) from fs1,fs2 where fs1.a = fs2.a;
+count
+-------
+9
+(1 row)
+
+explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+QUERY PLAN
+----------------------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 3:1 (slice1; segments: 3)
+        -> Foreign Scan
+              Relations: Aggregate on ((fs1) INNER JOIN (fs2))
+Optimizer: Postgres query optimizer
+(5 rows)
+
+select count(*) from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+count
+-------
+3
+(1 row)
+
+----------------------------
+-- Test with enable parallel
+----------------------------
+set enable_parallel to true;
+explain (costs off) select * from fs1;
+QUERY PLAN
+------------------------------------------
+Gather Motion 3:1 (slice1; segments: 3)
+  -> Foreign Scan on fs1
+Optimizer: Postgres query optimizer
+(3 rows)
+
+select * from fs1;
+a | b
+---+------
+1 | fdw1
+1 | fdw2
+1 | fdw3
+(3 rows)
+
+explain (costs off) select count(*) from fs1;
+QUERY PLAN
+------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 3:1 (slice1; segments: 3)
+        -> Foreign Scan
+              Relations: Aggregate on (fs1)
+Optimizer: Postgres query optimizer
+(5 rows)
+
+select count(*) from fs1;
+count
+-------
+3
+(1 row)
+
+explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a;
+QUERY PLAN
+------------------------------------------------------------------
+Gather Motion 3:1 (slice1; segments: 3)
+  -> Hash Join
+        Hash Cond: (fs1.a = fs2.a)
+        -> Redistribute Motion 3:3 (slice2; segments: 3)
+              Hash Key: fs1.a
+              -> Foreign Scan on fs1
+        -> Hash
+              -> Redistribute Motion 3:3 (slice3; segments: 3)
+                    Hash Key: fs2.a
+                    -> Foreign Scan on fs2
+Optimizer: Postgres query optimizer
+(11 rows)
+
+select * from fs1,fs2 where fs1.a = fs2.a;
+a | b | a | b
+---+------+---+------
+1 | fdw1 | 1 | fdw1
+1 | fdw1 | 1 | fdw3
+1 | fdw1 | 1 | fdw2
+1 | fdw3 | 1 | fdw1
+1 | fdw3 | 1 | fdw3
+1 | fdw3 | 1 | fdw2
+1 | fdw2 | 1 | fdw1
+1 | fdw2 | 1 | fdw3
+1 | fdw2 | 1 | fdw2
+(9 rows)
+
+select count(*),b from fs1 group by b;
+count | b
+-------+------
+1 | fdw2
+1 | fdw1
+1 | fdw3
+(3 rows)
+
+explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+QUERY PLAN
+-------------------------------------------
+Gather Motion 3:1 (slice1; segments: 3)
+  -> Foreign Scan
+        Relations: (fs1) INNER JOIN (fs2)
+Optimizer: Postgres query optimizer
+(4 rows)
+
+select * from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+a | b | a | b
+---+------+---+------
+1 | fdw3 | 1 | fdw3
+1 | fdw2 | 1 | fdw2
+1 | fdw1 | 1 | fdw1
+(3 rows)
+
+explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a;
+QUERY PLAN
+------------------------------------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 3:1 (slice1; segments: 3)
+        -> Partial Aggregate
+              -> Hash Join
+                    Hash Cond: (fs1.a = fs2.a)
+                    -> Redistribute Motion 3:3 (slice2; segments: 3)
+                          Hash Key: fs1.a
+                          -> Foreign Scan on fs1
+                    -> Hash
+                          -> Redistribute Motion 3:3 (slice3; segments: 3)
+                                Hash Key: fs2.a
+                                -> Foreign Scan on fs2
+Optimizer: Postgres query optimizer
+(13 rows)
+
+select count(*) from fs1,fs2 where fs1.a = fs2.a;
+count
+-------
+9
+(1 row)
+
+explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+QUERY PLAN
+----------------------------------------------------------------
+Finalize Aggregate
+  -> Gather Motion 3:1 (slice1; segments: 3)
+        -> Foreign Scan
+              Relations: Aggregate on ((fs1) INNER JOIN (fs2))
+Optimizer: Postgres query optimizer
+(5 rows)
+
+select count(*) from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
+count
+-------
+3
+(1 row)
+
+reset enable_parallel;
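
The 9-row versus 3-row results in the expected output above can be reproduced with a small model of why the gp_foreign_server condition makes a join safe to push down. This is a sketch in plain Python under stated assumptions, not the planner logic: the data mirrors the t1 and t2 rows inserted in the test, the server tag stands in for gp_foreign_server, and the function names are hypothetical.

```python
from itertools import product

# One row per table per remote database, as in the regression test setup.
SERVERS = ["mpps1", "mpps2", "mpps3"]
T1 = {"mpps1": [(1, "fdw1")], "mpps2": [(1, "fdw2")], "mpps3": [(1, "fdw3")]}
T2 = {"mpps1": [(1, "fdw1")], "mpps2": [(1, "fdw2")], "mpps3": [(1, "fdw3")]}


def scan(table):
    """Scan all shards, tagging each row with its server (models gp_foreign_server)."""
    return [(srv, a, b) for srv in SERVERS for (a, b) in table[srv]]


def join_on_a():
    """fs1.a = fs2.a: rows from different servers can match, so the join runs locally."""
    return [(l, r) for l, r in product(scan(T1), scan(T2)) if l[1] == r[1]]


def join_on_a_and_server():
    """fs1.a = fs2.a AND fs1.gp_foreign_server = fs2.gp_foreign_server."""
    return [(l, r) for l, r in join_on_a() if l[0] == r[0]]


def join_pushed_down():
    """Each server joins only its own rows; the local node just concatenates."""
    out = []
    for srv in SERVERS:
        for (a1, b1), (a2, b2) in product(T1[srv], T2[srv]):
            if a1 == a2:
                out.append(((srv, a1, b1), (srv, a2, b2)))
    return out


print(len(join_on_a()))             # cross-server matches included
print(len(join_on_a_and_server()))  # same-server matches only
print(sorted(join_on_a_and_server()) == sorted(join_pushed_down()))
```

With the same-server condition, every matching pair lives on one server, so running the join remotely on each shard and concatenating the results gives the same rows as joining locally. Without it, cross-server pairs would be lost.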
