3636#include "cdb/cdbaocsam.h"
3737#include "cdb/cdbappendonlyam.h"
3838#include "cdb/cdbdisp.h"
39+ #include "cdb/cdbhash.h"
40+ #include "cdb/cdbutil.h"
3941#include "cdb/cdbvars.h"
4042#include "commands/copy.h"
4143#include "commands/copyfrom_internal.h"
@@ -726,7 +728,7 @@ formDirTableSlot(CopyFromState cstate,
726728 char * field [5 ];
727729 FmgrInfo * in_functions = cstate -> in_functions ;
728730 Oid * typioparams = cstate -> typioparams ;
729- List * attnumlist = cstate -> qd_attnumlist ;
731+ List * attnumlist = cstate -> attnumlist ;
730732 pg_time_t stampTime = (pg_time_t ) time (NULL );
731733 char lastModified [128 ];
732734
@@ -783,6 +785,7 @@ CopyFromDirectoryTable(CopyFromState cstate)
783785 char buffer [DIR_FILE_BUFF_SIZE ];
784786 int64 processed = 0 ;
785787 int64 fileSize = 0 ;
788+ UFile * file ;
786789 CdbCopy * cdbCopy = NULL ;
787790 char * dirTablePath ;
788791 char * orgiFileName ;
@@ -795,6 +798,7 @@ CopyFromDirectoryTable(CopyFromState cstate)
795798 DirectoryTable * dirTable ;
796799 pg_cryptohash_ctx * hashCtx ;
797800 uint8 md5Sum [MD5_DIGEST_LENGTH ];
801+ char errorMessage [256 ];
798802 GpDistributionData * distData = NULL ; /* distribution data used to compute target seg */
799803
800804 /*
@@ -989,8 +993,6 @@ CopyFromDirectoryTable(CopyFromState cstate)
989993 List * recheckIndexes = NIL ;
990994 CommandId mycid = GetCurrentCommandId (true);
991995 MemoryContext oldcontext = CurrentMemoryContext ;
992- char errorMessage [256 ];
993- UFile * file ;
994996 bool has_tuple = false;
995997 bool update_indexes ;
996998
@@ -1164,9 +1166,124 @@ CopyFromDirectoryTable(CopyFromState cstate)
11641166 }
11651167 else
11661168 {
1167- ereport (ERROR ,
1168- (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
1169- errmsg ("This copy from dispatch mode is not supported." )));
1169+ List * recheckIndexes = NIL ;
1170+ CommandId mycid = GetCurrentCommandId (true);
1171+ bool update_indexes ;
1172+
1173+ formDirTableSlot (cstate ,
1174+ dirTable -> spcId ,
1175+ relaFileName ,
1176+ 0 ,
1177+ NULL ,
1178+ cstate -> opts .tags ,
1179+ myslot -> tts_values ,
1180+ myslot -> tts_isnull );
1181+ ExecStoreVirtualTuple (myslot );
1182+
1183+ /* OK, store the tuple and create index entries for it */
1184+ table_tuple_insert (resultRelInfo -> ri_RelationDesc ,
1185+ myslot , mycid , 0 , NULL );
1186+
1187+ recheckIndexes = ExecInsertIndexTuples (resultRelInfo ,
1188+ myslot ,
1189+ estate ,
1190+ false,
1191+ false,
1192+ NULL ,
1193+ NIL );
1194+
1195+ /* AFTER ROW INSERT Triggers */
1196+ ExecARInsertTriggers (estate , resultRelInfo , myslot ,
1197+ recheckIndexes , cstate -> transition_capture );
1198+
1199+ list_free (recheckIndexes );
1200+
1201+ CommandCounterIncrement ();
1202+
1203+ if (UFileExists (dirTable -> spcId , orgiFileName ))
1204+ {
1205+ UFileUnlink (dirTable -> spcId , orgiFileName );
1206+ }
1207+
1208+ if (orgiFileDir )
1209+ UFileEnsurePath (dirTable -> spcId , orgiFileDir );
1210+
1211+ file = UFileOpen (dirTable -> spcId ,
1212+ orgiFileName ,
1213+ O_CREAT | O_WRONLY ,
1214+ errorMessage ,
1215+ sizeof (errorMessage ));
1216+
1217+ if (file == NULL )
1218+ ereport (ERROR ,
1219+ (errcode (ERRCODE_IO_ERROR ),
1220+ errmsg ("failed to open file \"%s\": %s" , orgiFileName , errorMessage )));
1221+
1222+ /* Delete uploaded file when the transaction fails */
1223+ UFileAddPendingDelete (cstate -> rel , dirTable -> spcId , orgiFileName , false);
1224+
1225+ hashCtx = pg_cryptohash_create (PG_MD5 );
1226+ if (hashCtx == NULL )
1227+ ereport (ERROR ,
1228+ (errcode (ERRCODE_OUT_OF_MEMORY ),
1229+ errmsg ("failed to create md5hash context: out of memory" )));
1230+ pg_cryptohash_init (hashCtx );
1231+
1232+ for (;;)
1233+ {
1234+ CHECK_FOR_INTERRUPTS ();
1235+
1236+ bytesRead = CopyReadBinaryData (cstate , buffer , DIR_FILE_BUFF_SIZE );
1237+
1238+ if (bytesRead > 0 )
1239+ {
1240+ if (UFileWrite (file , buffer , bytesRead ) == -1 )
1241+ ereport (ERROR ,
1242+ (errcode (ERRCODE_IO_ERROR ),
1243+ errmsg ("failed to write file \"%s\": %s" , orgiFileName , UFileGetLastError (file ))));
1244+
1245+ fileSize += bytesRead ;
1246+ pg_cryptohash_update (hashCtx , (const uint8 * ) buffer , bytesRead );
1247+ }
1248+
1249+ if (bytesRead != DIR_FILE_BUFF_SIZE )
1250+ {
1251+ break ;
1252+ }
1253+ }
1254+
1255+ if (UFileSync (file ) != 0 )
1256+ ereport (ERROR ,
1257+ (errcode (ERRCODE_IO_ERROR ),
1258+ errmsg ("unable to sync file \"%s\": %s" , glob_copystmt -> dirfilename , UFileGetLastError (file ))));
1259+
1260+ UFileClose (file );
1261+
1262+ pg_cryptohash_final (hashCtx , md5Sum , sizeof (md5Sum ));
1263+ pg_cryptohash_free (hashCtx );
1264+ bytesToHex (md5Sum , hexMd5Sum );
1265+
1266+ myslot -> tts_values [1 ] = Int64GetDatum (fileSize );
1267+ myslot -> tts_values [3 ] = CStringGetTextDatum ((char * ) hexMd5Sum );
1268+ myslot -> tts_isnull [3 ] = false;
1269+
1270+ simple_table_tuple_update (resultRelInfo -> ri_RelationDesc , & myslot -> tts_tid , myslot ,
1271+ estate -> es_snapshot , & update_indexes );
1272+
1273+ ExecClearTuple (myslot );
1274+
1275+ /*
1276+ * We count only tuples not suppressed by a BEFORE INSERT trigger
1277+ * or FDW; this is the same definition used by nodeModifyTable.c
1278+ * for counting tuples inserted by an INSERT command. Update
1279+ * progress of the COPY command as well.
1280+ *
1281+ * MPP: incrementing this counter here only matters for utility
1282+ * mode. in dispatch mode only the dispatcher COPY collects row
1283+ * count, so this counter is meaningless.
1284+ */
1285+ pgstat_progress_update_param (PROGRESS_COPY_TUPLES_PROCESSED ,
1286+ ++ processed );
11701287 }
11711288
11721289 cstate -> filename = NULL ;
@@ -1273,6 +1390,8 @@ BeginCopyFromDirectoryTable(ParseState *pstate,
12731390 cstate -> dispatch_mode = COPY_DISPATCH ;
12741391 else if (Gp_role == GP_ROLE_EXECUTE )
12751392 cstate -> dispatch_mode = COPY_EXECUTOR ;
1393+ else
1394+ cstate -> dispatch_mode = COPY_DIRECT ;
12761395
12771396 cstate -> cur_relname = RelationGetRelationName (cstate -> rel );
12781397 cstate -> cur_lineno = 0 ;
0 commit comments