Skip to content

Commit 35fafd2

Browse files
lss602726449HuSen8891
authored and committed
Enhancement: Add gpshrink to support elastic scaling
In order to support gpshrink, similar to gpexpand, we first support "alter table <tablename> shrink table to <segnum>" to redistribute data onto a specific number of segments. The gpshrink implementation is mainly divided into two stages, similar to gpexpand: 1. Collect the tables that need to be shrunk and write them into gpshrink.status_detail. 2. Perform data redistribution on the tables that need to be shrunk, and delete the specific segments from gp_segment_configuration.
1 parent a6d62a3 commit 35fafd2

17 files changed

Lines changed: 3613 additions & 26 deletions

File tree

.github/workflows/build.yml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,49 @@ jobs:
125125
/code/gpdb_src/src/test/isolation/expected/
126126
/code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/
127127
/code/gpdb_src/gpAux/gpdemo/datadirs/singlenodedir/demoDataDir-1/log/
128+
icw-expandshrink-test:
129+
needs: build
130+
runs-on: [ self-hosted, example ]
131+
env:
132+
MAKE_TEST_COMMAND: "-C src/test/isolation2 installcheck-expandshrink"
133+
TEST_OS: "centos"
134+
DUMP_DB: "true"
135+
steps:
136+
- uses: actions/checkout@v3
137+
with:
138+
path: "gpdb_src"
139+
- uses: actions/download-artifact@v3
140+
with:
141+
name: cbdb-variables
142+
path: /opt/
143+
- uses: actions/download-artifact@v3
144+
with:
145+
name: cbdb-package
146+
path: /opt/
147+
- name: Run icw-test script
148+
run: |
149+
mkdir /code
150+
cp -a gpdb_src/ /code
151+
cd /code
152+
echo $GITHUB_RUN_ID > gpdb_src/BUILD_NUMBER
153+
gpdb_src/hd-ci/icw_cbdb.bash $FTS_MODE
154+
- uses: actions/upload-artifact@v3
155+
if: failure()
156+
with:
157+
name: cbdb-icw-expandshrink-test-log
158+
path: |
159+
/code/gpdb_src/src/test/isolation2/regression.out
160+
/code/gpdb_src/src/test/isolation2/regression.diffs
161+
/code/gpdb_src/src/test/isolation2/results/
162+
/code/gpdb_src/src/test/isolation2/expected/
163+
/code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/
164+
/code/gpdb_src/gpAux/gpdemo/datadirs/qddir/demoDataDir-1/log/
165+
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast1/demoDataDir0/log/
166+
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast2/demoDataDir1/log/
167+
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast3/demoDataDir2/log/
168+
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror1/demoDataDir0/log/
169+
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror2/demoDataDir1/log/
170+
/code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror3/demoDataDir2/log/
128171
icw-orca-test:
129172
needs: build
130173
runs-on: [self-hosted, example]

gpMgmt/bin/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SUBDIRS += ifaddrs
1313
$(recurse)
1414

1515
PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \
16-
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpinitstandby \
16+
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \
1717
gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \
1818
gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \
1919
gpstate gpstop minirepro gpmemwatcher gpmemreport gpdemo
@@ -194,7 +194,7 @@ clean distclean:
194194
rm -rf *.pyc
195195
rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \
196196
gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \
197-
gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \
197+
gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \
198198
gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \
199199
gpstartc gpstatec gpstopc minireproc
200200
rm -f gpconfig_modules/gucs_disallowed_in_file.txt

gpMgmt/bin/gpexpand

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,8 @@ class gpexpand:
11091109
tblspc_info = {}
11101110

11111111
for oid in tblspc_oids:
1112+
if oid not in tblspc_oid_names:
1113+
continue
11121114
location = os.path.dirname(os.readlink(os.path.join(coordinator_tblspc_dir,
11131115
oid)))
11141116
tblspc_info[oid] = {"location": location,
@@ -1222,6 +1224,15 @@ class gpexpand:
12221224
coordinator_tblspc_dir = self.gparray.coordinator.getSegmentTableSpaceDirectory()
12231225
if not os.listdir(coordinator_tblspc_dir):
12241226
return None
1227+
1228+
tblspc_oids = os.listdir(coordinator_tblspc_dir)
1229+
tblspc_oid_names = self.get_tablespace_oid_names()
1230+
flag = False
1231+
for oid in tblspc_oids:
1232+
if oid in tblspc_oid_names:
1233+
flag = True
1234+
if not flag:
1235+
return None
12251236

12261237
if not self.options.filename:
12271238
raise ExpansionError('Missing tablespace input file')
@@ -1385,6 +1396,25 @@ class gpexpand:
13851396
self.pool.join()
13861397
self.pool.check_results()
13871398

1399+
1400+
for i in range(1,12):
1401+
flag = True
1402+
for segment in newSegments:
1403+
if seg.isSegmentMirror() == True:
1404+
continue
1405+
1406+
cmd = Command('pg_isready for segment',
1407+
"pg_isready -q -h %s -p %d -d %s" % (segment.getSegmentHostName(), segment.getSegmentPort(), segment.getSegmentDataDirectory()))
1408+
cmd.run()
1409+
rc = cmd.get_return_code()
1410+
if rc != 0:
1411+
flag &= False
1412+
if flag:
1413+
break
1414+
time.sleep(10)
1415+
self.logger.info("Waiting for segment ready last for %s second" % (i*10))
1416+
1417+
13881418
"""
13891419
Build the list of delete statements based on the COORDINATOR_ONLY_TABLES
13901420
defined in gpcatalog.py

gpMgmt/bin/gppylib/gparray.py

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def __equal(self, other, ignoreAttr=[]):
148148
return True
149149

150150
def __eq__(self, other):
151-
return self.__equal(other)
151+
return self.__equal(other, ['mode'])
152152

153153

154154
def __hash__(self):
@@ -429,6 +429,9 @@ def __str__(self):
429429
return "(Primary: %s, Mirror: %s)" % (str(self.primaryDB),
430430
str(self.mirrorDB))
431431

432+
def __eq__(self, other):
433+
return self.primaryDB == other.primaryDB and self.mirrorDB == other.mirrorDB
434+
432435
# --------------------------------------------------------------------
433436
def addPrimary(self,segDB):
434437
self.primaryDB=segDB
@@ -799,6 +802,7 @@ def __init__(self, segments, segmentsAsLoadedFromDb=None):
799802
self.standbyCoordinator = None
800803
self.segmentPairs = []
801804
self.expansionSegmentPairs=[]
805+
self.shrinkSegmentPairs=[]
802806
self.numPrimarySegments = 0
803807

804808
self.recoveredSegmentDbids = []
@@ -1045,7 +1049,7 @@ def dumpToFile(self, filename):
10451049
fp.close()
10461050

10471051
# --------------------------------------------------------------------
1048-
def getDbList(self, includeExpansionSegs=False):
1052+
def getDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
10491053
"""
10501054
Return a list of all Segment objects that make up the array
10511055
"""
@@ -1054,8 +1058,8 @@ def getDbList(self, includeExpansionSegs=False):
10541058
dbs.append(self.coordinator)
10551059
if self.standbyCoordinator:
10561060
dbs.append(self.standbyCoordinator)
1057-
if includeExpansionSegs:
1058-
dbs.extend(self.getSegDbList(True))
1061+
if includeExpansionSegs or removeShrinkSegs:
1062+
dbs.extend(self.getSegDbList(includeExpansionSegs, removeShrinkSegs))
10591063
else:
10601064
dbs.extend(self.getSegDbList())
10611065
return dbs
@@ -1105,23 +1109,29 @@ def getDbIdToPeerMap(self):
11051109

11061110

11071111
# --------------------------------------------------------------------
1108-
def getSegDbList(self, includeExpansionSegs=False):
1112+
def getSegDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
11091113
"""Return a list of all Segment objects for all segments in the array"""
11101114
dbs=[]
11111115
for segPair in self.segmentPairs:
11121116
dbs.extend(segPair.get_dbs())
11131117
if includeExpansionSegs:
11141118
for segPair in self.expansionSegmentPairs:
11151119
dbs.extend(segPair.get_dbs())
1120+
if removeShrinkSegs:
1121+
for segPair in self.shrinkSegmentPairs:
1122+
dbs = list(filter(lambda x: segPair.primaryDB != x and segPair.mirrorDB != x, dbs))
11161123
return dbs
11171124

11181125
# --------------------------------------------------------------------
1119-
def getSegmentList(self, includeExpansionSegs=False):
1126+
def getSegmentList(self, includeExpansionSegs=False, removeShrinkSegs=False):
11201127
"""Return a list of SegmentPair objects for all segments in the array"""
11211128
dbs=[]
11221129
dbs.extend(self.segmentPairs)
11231130
if includeExpansionSegs:
11241131
dbs.extend(self.expansionSegmentPairs)
1132+
if removeShrinkSegs:
1133+
for segPair in self.shrinkSegmentPairs:
1134+
dbs.remove(segPair)
11251135
return dbs
11261136

11271137
# --------------------------------------------------------------------
@@ -1148,6 +1158,21 @@ def getExpansionSegPairList(self):
11481158
"""Returns a list of all SegmentPair objects that make up the new segments
11491159
of an expansion"""
11501160
return self.expansionSegmentPairs
1161+
1162+
# --------------------------------------------------------------------
1163+
def getShrinkSegDbList(self):
1164+
"""Returns a list of all Segment objects that make up the new segments
1165+
of an expansion"""
1166+
dbs=[]
1167+
for segPair in self.shrinkSegmentPairs:
1168+
dbs.extend(segPair.get_dbs())
1169+
return dbs
1170+
1171+
# --------------------------------------------------------------------
1172+
def getShrinkSegPairList(self):
1173+
"""Returns a list of all SegmentPair objects that make up the new segments
1174+
of an expansion"""
1175+
return self.shrinkSegmentPairs
11511176

11521177
# --------------------------------------------------------------------
11531178
def getSegmentContainingDb(self, db):
@@ -1164,6 +1189,15 @@ def getExpansionSegmentContainingDb(self, db):
11641189
if db.getSegmentDbId() == segDb.getSegmentDbId():
11651190
return segPair
11661191
return None
1192+
1193+
# --------------------------------------------------------------------
1194+
def getShrinkSegmentContainingDb(self, db):
1195+
for segPair in self.shrinkSegmentPairs:
1196+
for segDb in segPair.get_dbs():
1197+
if db.getSegmentDbId() == segDb.getSegmentDbId():
1198+
return segPair
1199+
return None
1200+
11671201
# --------------------------------------------------------------------
11681202
def get_invalid_segdbs(self):
11691203
dbs=[]
@@ -1488,6 +1522,37 @@ def addExpansionSeg(self, content, preferred_role, dbid, role,
14881522
else:
14891523
seg.addMirror(segdb)
14901524

1525+
# --------------------------------------------------------------------
1526+
def addShrinkSeg(self, content, preferred_role, dbid, role,
1527+
hostname, address, port, datadir):
1528+
"""
1529+
Add a segment to the gparray as an shrink segment.
1530+
1531+
Note: may work better to construct the new Segment in gpshrink and
1532+
simply pass it in.
1533+
"""
1534+
1535+
segdb = Segment(content = content,
1536+
preferred_role = preferred_role,
1537+
dbid = dbid,
1538+
role = role,
1539+
mode = MODE_SYNCHRONIZED,
1540+
status = STATUS_UP,
1541+
hostname = hostname,
1542+
address = address,
1543+
port = port,
1544+
datadir = datadir)
1545+
1546+
if preferred_role == ROLE_PRIMARY:
1547+
self.shrinkSegmentPairs.append(SegmentPair())
1548+
seg = self.shrinkSegmentPairs[-1]
1549+
if seg.primaryDB:
1550+
raise Exception('Duplicate content id for primary segment')
1551+
seg.addPrimary(segdb)
1552+
else:
1553+
seg = self.shrinkSegmentPairs[-1]
1554+
seg.addMirror(segdb)
1555+
14911556
# --------------------------------------------------------------------
14921557
def reOrderExpansionSegs(self):
14931558
"""
@@ -1595,6 +1660,52 @@ def validateExpansionSegs(self):
15951660
else:
15961661
used_ports[hostname] = []
15971662
used_ports[hostname].append(db.port)
1663+
1664+
# --------------------------------------------------------------------
1665+
def validateShrinkSegs(self):
1666+
""" Checks the segments added for various inconsistencies and errors.
1667+
"""
1668+
1669+
# make sure we have added at least one segment
1670+
if len(self.shrinkSegmentPairs) == 0:
1671+
raise Exception('No shrink segments defined')
1672+
1673+
totalsize = len(self.segmentPairs)
1674+
removesize = len(self.shrinkSegmentPairs)
1675+
1676+
if removesize >= totalsize:
1677+
self.logger.error('removed segment num %d more than or equal to total segment num %d', removesize, totalsize)
1678+
exit(1)
1679+
elif removesize < 1:
1680+
self.logger.error('removed segment num %d less than 1', removesize)
1681+
exit(1)
1682+
1683+
for segPair in self.shrinkSegmentPairs:
1684+
if self.hasMirrors:
1685+
if segPair.mirrorDB is None:
1686+
raise Exception('primaryDB and mirrorDB should be removed simultaneously')
1687+
1688+
if segPair.primaryDB.content != segPair.mirrorDB.content:
1689+
raise Exception('primaryDB content is not equal mirrorDB content')
1690+
1691+
# If shrinkSegmentPairs not in the segmentPairs raise exception
1692+
flag = False
1693+
for segPair_ in self.segmentPairs :
1694+
if segPair_ == segPair :
1695+
flag = True
1696+
1697+
if flag == False:
1698+
raise Exception('Shrink segments not in the gp_segment_configuration table')
1699+
1700+
# If shrinkSegmentPairs is not the last n segment.
1701+
self.shrinkSegmentPairs.sort(key=lambda segPair: segPair.primaryDB.content)
1702+
1703+
if self.shrinkSegmentPairs[-1].primaryDB.content != self.get_max_contentid():
1704+
raise Exception('please remove segment from max contentid')
1705+
1706+
if self.shrinkSegmentPairs[0].primaryDB.content != self.get_max_contentid()-len(self.shrinkSegmentPairs)+1:
1707+
raise Exception('please remove segment in continuous contentid')
1708+
15981709

15991710
# --------------------------------------------------------------------
16001711
def addExpansionHosts(self, hosts, mirror_type):

gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary):
5454
self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()])
5555

5656
# 'goalsegmap' reflects the desired state of the catalog
57-
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)])
57+
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)])
5858

5959
# find mirrors and primaries to remove
6060
self.mirror_to_remove = [

0 commit comments

Comments
 (0)