Skip to content

Commit 06ae183

Browse files
zhangwenchao-123my-ship-it
authored andcommitted
Implement copy to directory table and support copy from/to program directory table.
This commit, we support some directory features as following: 1. support copy to directory table, the grammar is such as: COPY BINARY DIRECTORY TABLE <directory_table_name> 'relative_path' to 'dest_path'; 2. support copy from/to directory table program 3. allow create/drop index on directory table 4. support download mode in gpdirtable tool Authored-by: Zhang Wenchao zwcpostgres@gmail.com
1 parent 08c5844 commit 06ae183

18 files changed

Lines changed: 1125 additions & 227 deletions

File tree

gpMgmt/bin/gpdirtableload

Lines changed: 136 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -67,23 +67,32 @@ def parseargs():
6767

6868
parser.add_argument('--database', '-d', default="gpadmin",
6969
help='Database to connect to')
70-
parser.add_argument('--dest-path', help='Path relative to the table root directory')
70+
parser.add_argument('--mode', choices=['upload', 'download'], default="upload",
71+
help='Upload or download file to/from directory table')
72+
parser.add_argument('--dest-path', help='In upload mode, this means path relative to '
73+
'the table root directory, while in download '
74+
'mode, means directory to download')
7175

7276
parser.add_argument('--force-password-auth', default=False, action='store_true',
7377
help='Force a password prompt')
7478

7579
parser.add_argument('--host', default="localhost",
7680
help='Host to connect to')
77-
parser.add_argument('--input-file', help='Input files or directory')
81+
parser.add_argument('--input-file', help='In upload mode, this means input files or '
82+
'directory, while in download mode, means '
83+
'which directory table to download')
7884

7985
parser.add_argument('--logfile', help='Log output to logfile')
8086

87+
parser.add_argument('--tag', help='In download mode, only download the same tag files')
88+
parser.add_argument('--force-write', default=False, action='store_true',
89+
help='In download mode, force write files when files have existed')
90+
8191
parser.add_argument('--port', '-p', type=int, default="5432",
8292
help='Port to connect to')
8393
parser.add_argument('--stop-on-error', default=False,
8494
help='Stop loading files when an error occurs')
8595
parser.add_argument('--table', '-t', help='Directory table to load to')
86-
parser.add_argument('--tag', help='Tag name')
8796
parser.add_argument('--tasks', '-T', type=int, default="1",
8897
help='The maximum number of files that concurrently loads')
8998
parser.add_argument('--user', '-U', default="gpadmin",
@@ -155,13 +164,21 @@ class gpdirtableload:
155164
self.options.qv = self.INFO
156165
self.startTimestamp = time.time()
157166
self.pool = None
167+
self.upload = True
158168

159169
# set default log level
160170
if self.options.verbose is not None:
161171
self.options.qv = self.DEBUG
162172
else:
163173
self.options.qv = self.INFO
164174

175+
# set load from/to
176+
if self.options.mode is not None and self.options.mode == 'download':
177+
self.upload = False
178+
179+
if self.options.dest_path is None:
180+
self.log(self.ERROR, '--dest-path must be set')
181+
165182
# default to gpAdminLogs for a log file, may be overwritten
166183
if self.options.logfile is None:
167184
self.options.logfile = os.path.join(os.environ.get('HOME', '.'), 'gpAdminLogs')
@@ -334,16 +351,32 @@ class gpdirtableload:
334351
self.allFiles.append(filepath)
335352
self.numFiles = 1
336353

354+
def collectAllFilesToDownload(self):
355+
self.allFilesToDownload = []
356+
self.numFiles = 0
357+
358+
qry = "SELECT relative_path FROM %s " % self.options.table
359+
360+
if self.options.tag:
361+
qry += "WHERE tag = \'%s\'" % self.options.tag
362+
363+
self.allFilesToDownload = [s[0] for s in
364+
self.db.query(qry).getresult()]
365+
self.numFiles = len(self.allFilesToDownload)
366+
337367
def confirmWorkers(self):
338368
if self.numFiles < self.options.tasks:
339369
self.numWorkers = self.numFiles
340370
else:
341371
self.numWorkers = self.options.tasks
342372

343-
def startLoadFiles(self):
373+
def startUploadFiles(self):
344374
"""
345-
startLoadFiles
375+
startUploadFiles
346376
"""
377+
if self.options.input_file is None:
378+
self.log(self.ERROR, '--input-file must be set in upload mode')
379+
347380
self.pool = WorkerPool(numWorkers=self.numWorkers, should_stop=self.options.stop_on_error)
348381

349382
srcfile = None
@@ -357,7 +390,7 @@ class gpdirtableload:
357390
self.log(self.ERROR, 'cannot find greenplum environment ' +
358391
'file: environment misconfigured')
359392

360-
cmdstrbase = "source %s ;"
393+
cmdstrbase = "source %s ;" % srcfile
361394

362395
cmdstrbase += "export PGPASSWORD=%s ; psql " % self.options.password
363396

@@ -401,13 +434,94 @@ class gpdirtableload:
401434
self.pool.haltWork()
402435
self.pool.joinWorkers()
403436

404-
def run2(self):
437+
def startDownloadFiles(self):
438+
"""
439+
startDownloadFiles
440+
"""
441+
self.pool = WorkerPool(numWorkers=self.numWorkers, should_stop=self.options.stop_on_error)
442+
443+
if not self.options.dest_path:
444+
self.log(self.ERROR, 'dest-path is not set.')
445+
if (not os.path.exists(self.options.dest_path)):
446+
self.log(self.ERROR, 'Directory %s does not exist.' % self.options.dest_path)
447+
if (not os.path.isdir(self.options.dest_path)):
448+
self.log(self.ERROR, 'File path %s is not a directory.' %self.options.dest_path)
449+
450+
srcfile = None
451+
if os.environ.get('GPHOME_LOADERS'):
452+
srcfile = os.path.join(os.environ.get('GPHOME_LOADERS'),
453+
'greenplum_loaders_path.sh')
454+
elif os.environ.get('GPHOME'):
455+
srcfile = os.path.join(os.environ.get('GPHOME'),
456+
'greenplum_path.sh')
457+
if (not (srcfile and os.path.exists(srcfile))):
458+
self.log(self.ERROR, 'cannot find greenplum environment ' +
459+
'file: environment misconfigured')
460+
461+
cmdstrbase = "source %s ;" % srcfile
462+
463+
cmdstrbase += "export PGPASSWORD=%s ; psql " % self.options.password
464+
465+
if self.options.database != None:
466+
cmdstrbase += "-d %s " % self.options.database
467+
if self.options.host != None:
468+
cmdstrbase += "-h %s " % self.options.host
469+
if self.options.port != 0:
470+
cmdstrbase += "-p %d " % self.options.port
471+
if self.options.user != None:
472+
cmdstrbase += "-U %s " % self.options.user
473+
474+
try:
475+
for file in self.allFilesToDownload:
476+
fullpath = self.options.dest_path + '/' + file
477+
if (os.path.exists(fullpath) and not self.options.force_write):
478+
if (not os.path.isdir(fullpath)):
479+
continue
480+
else:
481+
self.log(self.ERROR, 'file directory %s has existed' % fullpath)
482+
483+
filedir = os.path.dirname(fullpath)
484+
if (not os.path.exists(filedir)):
485+
os.makedirs(filedir, exist_ok=True)
486+
487+
cmdstr = cmdstrbase
488+
cmdstr += '-c \"copy binary directory table %s \'%s\' to \'%s\' \"' % (self.options.table, file, fullpath)
489+
490+
cmd = Command(name='download directory table', ctxt=LOCAL, cmdStr=cmdstr)
491+
self.pool.addCommand(cmd)
492+
self.pool.join()
493+
items = self.pool.getCompletedItems()
494+
for i in items:
495+
if not i.was_successful():
496+
self.log(self.ERROR, 'failed download directory table %s to %s, msg:%s' %
497+
(self.options.table, self.options.dest_path, i.get_results().stderr))
498+
self.pool.check_results()
499+
except Exception as err:
500+
self.log(self.ERROR, 'errors in job:')
501+
self.log(self.ERROR, err.__str__())
502+
self.log(self.ERROR, 'exiting early')
503+
finally:
504+
self.pool.haltWork()
505+
self.pool.joinWorkers()
506+
507+
def run_upload(self):
405508
try:
406509
start = time.time()
407510
self.collectAllFiles()
408511
self.confirmWorkers()
409512
self.setup_connection()
410-
self.startLoadFiles()
513+
self.startUploadFiles()
514+
self.log(self.INFO, 'running time: %.2f seconds' % (time.time() - start))
515+
except Exception as e:
516+
raise
517+
518+
def run_download(self):
519+
try:
520+
start = time.time()
521+
self.setup_connection()
522+
self.collectAllFilesToDownload()
523+
self.confirmWorkers()
524+
self.startDownloadFiles()
411525
self.log(self.INFO, 'running time: %.2f seconds' % (time.time() - start))
412526
except Exception as e:
413527
raise
@@ -422,24 +536,27 @@ class gpdirtableload:
422536
signal.signal(signal.SIGHUP, signal.SIG_IGN)
423537

424538
try:
425-
try:
426-
self.run2()
427-
except Exception:
428-
traceback.print_exc(file=self.logfile)
429-
self.logfile.flush()
430-
self.exitValue = 2
431-
if (self.options.qv > self.INFO):
432-
traceback.print_exc()
433-
else:
434-
self.log(self.ERROR, "unexpected error -- backtrace " +
435-
"written to log file")
539+
if self.upload == True:
540+
self.run_upload()
541+
else:
542+
self.run_download()
543+
except (Exception, SystemExit):
544+
traceback.print_exc(file=self.logfile)
545+
self.logfile.flush()
546+
self.exitValue = 2
547+
if (self.options.qv > self.INFO):
548+
traceback.print_exc()
549+
else:
550+
self.log(self.ERROR, "unexpected error -- backtrace " +
551+
"written to log file")
436552
finally:
437553
if self.exitValue == 0:
438554
self.log(self.INFO, 'gpdirtableload succeeded')
439555
elif self.exitValue == 1:
440556
self.log(self.INFO, 'gpdirtableload succeeded with warnings')
441557
else:
442558
self.log(self.INFO, 'gpdirtableload failed')
559+
os._exit(self.exitValue)
443560

444561

445562
if __name__ == '__main__':

src/backend/commands/copy.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,18 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
472472
*/
473473
PG_TRY();
474474
{
475-
cstate = BeginCopyTo(pstate, rel, query, relid,
476-
stmt->filename, stmt->is_program,
477-
stmt->attlist, options);
475+
if (rel && rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE)
476+
{
477+
cstate = BeginCopyToDirectoryTable(pstate, stmt->filename, stmt->dirfilename,
478+
rel, stmt->is_program, options);
479+
}
480+
481+
else
482+
{
483+
cstate = BeginCopyTo(pstate, rel, query, relid,
484+
stmt->filename, stmt->is_program,
485+
stmt->attlist, options);
486+
}
478487

479488
/*
480489
* "copy t to file on segment" CopyDispatchOnSegment

0 commit comments

Comments
 (0)