@@ -1367,6 +1367,18 @@ void Phase2_ConvertToSQL()
13671367
13681368 void Phase2_ProcessRowData ( RowData rowData , ref string currentInsertHeader , StringBuilder currentSqlBuilder , ref long currentSqlByteLength , ref bool isFirstRowInStatement , RowsDataExportMode exportMode )
13691369 {
1370+ if ( exportMode == RowsDataExportMode . Update )
1371+ {
1372+ Phase2_ProcessRowData_Update ( rowData , currentSqlBuilder ) ;
1373+ return ;
1374+ }
1375+
1376+ if ( exportMode == RowsDataExportMode . OnDuplicateKeyUpdate )
1377+ {
1378+ Phase2_ProcessRowData_OnDuplicateKeyUpdate ( rowData , ref currentInsertHeader , currentSqlBuilder ) ;
1379+ return ;
1380+ }
1381+
13701382 // Generate insert header if needed
13711383 if ( currentInsertHeader == null )
13721384 {
@@ -1435,6 +1447,172 @@ void Phase2_ProcessRowData(RowData rowData, ref string currentInsertHeader, Stri
14351447 currentSqlByteLength += sqlValueByteLength ;
14361448 }
14371449
1450+ void Phase2_ProcessRowData_Update ( RowData rowData , StringBuilder currentSqlBuilder )
1451+ {
1452+ MySqlTable table = rowData . Table ;
1453+
1454+ // If all columns are primary keys or all are non-primary, fall back to INSERT
1455+ bool allPrimaryField = true ;
1456+ bool allNonPrimaryField = true ;
1457+ foreach ( var col in table . Columns )
1458+ {
1459+ if ( ! col . IsPrimaryKey ) allPrimaryField = false ;
1460+ if ( col . IsPrimaryKey ) allNonPrimaryField = false ;
1461+ }
1462+
1463+ if ( allPrimaryField || allNonPrimaryField )
1464+ {
1465+ // Emit as a single-row INSERT
1466+ string header = Export_GetInsertStatementHeaderFromRowData ( RowsDataExportMode . Insert , rowData ) ;
1467+ StringBuilder sbVal = new StringBuilder ( ) ;
1468+ Export_GetValueStringFromRowData ( rowData , sbVal ) ;
1469+
1470+ currentSqlBuilder . Append ( header ) ;
1471+ currentSqlBuilder . Append ( sbVal ) ;
1472+ currentSqlBuilder . AppendLine ( ";" ) ;
1473+
1474+ _sqlStatementCollection . Add ( new SqlStatement
1475+ {
1476+ SQL = currentSqlBuilder . ToString ( ) ,
1477+ IsComplete = true ,
1478+ FlushRequired = false
1479+ } ) ;
1480+ currentSqlBuilder . Clear ( ) ;
1481+ return ;
1482+ }
1483+
1484+ // UPDATE `table` SET col=val,... WHERE pk=val AND ...;
1485+ currentSqlBuilder . Append ( "UPDATE `" ) ;
1486+ currentSqlBuilder . Append ( QueryExpress . EscapeIdentifier ( rowData . TableName ) ) ;
1487+ currentSqlBuilder . Append ( "` SET " ) ;
1488+ Phase2_GetUpdateStringFromRowData ( rowData , currentSqlBuilder ) ;
1489+ currentSqlBuilder . Append ( " WHERE " ) ;
1490+ Phase2_GetConditionStringFromRowData ( rowData , currentSqlBuilder ) ;
1491+ currentSqlBuilder . AppendLine ( ";" ) ;
1492+
1493+ _sqlStatementCollection . Add ( new SqlStatement
1494+ {
1495+ SQL = currentSqlBuilder . ToString ( ) ,
1496+ IsComplete = true ,
1497+ FlushRequired = false
1498+ } ) ;
1499+ currentSqlBuilder . Clear ( ) ;
1500+ }
1501+
1502+ void Phase2_ProcessRowData_OnDuplicateKeyUpdate ( RowData rowData , ref string currentInsertHeader , StringBuilder currentSqlBuilder )
1503+ {
1504+ MySqlTable table = rowData . Table ;
1505+
1506+ // If all columns are primary keys, fall back to INSERT (no non-PK columns to update)
1507+ bool allPrimaryField = true ;
1508+ foreach ( var col in table . Columns )
1509+ {
1510+ if ( ! col . IsPrimaryKey )
1511+ {
1512+ allPrimaryField = false ;
1513+ break ;
1514+ }
1515+ }
1516+
1517+ if ( allPrimaryField )
1518+ {
1519+ // Emit as a single-row INSERT
1520+ string header = Export_GetInsertStatementHeaderFromRowData ( RowsDataExportMode . Insert , rowData ) ;
1521+ StringBuilder sbVal = new StringBuilder ( ) ;
1522+ Export_GetValueStringFromRowData ( rowData , sbVal ) ;
1523+
1524+ currentSqlBuilder . Append ( header ) ;
1525+ currentSqlBuilder . Append ( sbVal ) ;
1526+ currentSqlBuilder . AppendLine ( ";" ) ;
1527+
1528+ _sqlStatementCollection . Add ( new SqlStatement
1529+ {
1530+ SQL = currentSqlBuilder . ToString ( ) ,
1531+ IsComplete = true ,
1532+ FlushRequired = false
1533+ } ) ;
1534+ currentSqlBuilder . Clear ( ) ;
1535+ return ;
1536+ }
1537+
1538+ // INSERT INTO `table`(cols) VALUES(vals) ON DUPLICATE KEY UPDATE col=val,...;
1539+ if ( currentInsertHeader == null )
1540+ {
1541+ currentInsertHeader = Export_GetInsertStatementHeaderFromRowData ( RowsDataExportMode . Insert , rowData ) ;
1542+ }
1543+
1544+ currentSqlBuilder . Append ( currentInsertHeader ) ;
1545+
1546+ StringBuilder sbValue = new StringBuilder ( ) ;
1547+ Export_GetValueStringFromRowData ( rowData , sbValue ) ;
1548+ currentSqlBuilder . Append ( sbValue ) ;
1549+
1550+ currentSqlBuilder . Append ( " ON DUPLICATE KEY UPDATE " ) ;
1551+ Phase2_GetUpdateStringFromRowData ( rowData , currentSqlBuilder ) ;
1552+ currentSqlBuilder . AppendLine ( ";" ) ;
1553+
1554+ _sqlStatementCollection . Add ( new SqlStatement
1555+ {
1556+ SQL = currentSqlBuilder . ToString ( ) ,
1557+ IsComplete = true ,
1558+ FlushRequired = false
1559+ } ) ;
1560+ currentSqlBuilder . Clear ( ) ;
1561+ }
1562+
1563+ void Phase2_GetUpdateStringFromRowData ( RowData rowData , StringBuilder sb )
1564+ {
1565+ bool isFirst = true ;
1566+
1567+ for ( int i = 0 ; i < rowData . ColumnNames . Length ; i ++ )
1568+ {
1569+ string columnName = rowData . ColumnNames [ i ] ;
1570+ var col = rowData . Table . Columns [ columnName ] ;
1571+
1572+ if ( col . IsGeneratedColumn )
1573+ continue ;
1574+
1575+ if ( ! col . IsPrimaryKey )
1576+ {
1577+ if ( isFirst )
1578+ isFirst = false ;
1579+ else
1580+ sb . Append ( "," ) ;
1581+
1582+ sb . Append ( "`" ) ;
1583+ sb . Append ( QueryExpress . EscapeIdentifier ( columnName ) ) ;
1584+ sb . Append ( "`=" ) ;
1585+
1586+ QueryExpress . ConvertToSqlFormat ( sb , rowData . Values [ i ] , col , true , true ) ;
1587+ }
1588+ }
1589+ }
1590+
1591+ void Phase2_GetConditionStringFromRowData ( RowData rowData , StringBuilder sb )
1592+ {
1593+ bool isFirst = true ;
1594+
1595+ for ( int i = 0 ; i < rowData . ColumnNames . Length ; i ++ )
1596+ {
1597+ string columnName = rowData . ColumnNames [ i ] ;
1598+ var col = rowData . Table . Columns [ columnName ] ;
1599+
1600+ if ( col . IsPrimaryKey )
1601+ {
1602+ if ( isFirst )
1603+ isFirst = false ;
1604+ else
1605+ sb . Append ( " and " ) ;
1606+
1607+ sb . Append ( "`" ) ;
1608+ sb . Append ( QueryExpress . EscapeIdentifier ( columnName ) ) ;
1609+ sb . Append ( "`=" ) ;
1610+
1611+ QueryExpress . ConvertToSqlFormat ( sb , rowData . Values [ i ] , col , true , true ) ;
1612+ }
1613+ }
1614+ }
1615+
14381616 void Phase3_WriteOutput ( )
14391617 {
14401618 try
@@ -2704,70 +2882,59 @@ void ReportProgress()
27042882 public void StopAllProcess ( )
27052883 {
27062884 stopProcess = true ;
2707- Command ? . Cancel ( ) ;
2708- timerReport ? . Stop ( ) ;
2885+ try { Command ? . Cancel ( ) ; } catch { }
2886+ try { timerReport ? . Stop ( ) ; } catch { }
27092887
2710- // Signal BlockingCollections to stop
27112888 try
27122889 {
2713- _rowDataCollection ? . CompleteAdding ( ) ;
2714- _sqlStatementCollection ? . CompleteAdding ( ) ;
2715- _importQueryCollection ? . CompleteAdding ( ) ;
2890+ if ( _rowDataCollection != null && ! _rowDataCollection . IsAddingCompleted )
2891+ _rowDataCollection . CompleteAdding ( ) ;
27162892 }
2717- catch
2893+ catch { }
2894+
2895+ try
2896+ {
2897+ if ( _sqlStatementCollection != null && ! _sqlStatementCollection . IsAddingCompleted )
2898+ _sqlStatementCollection . CompleteAdding ( ) ;
2899+ }
2900+ catch { }
2901+
2902+ try
27182903 {
2719- // Ignore exceptions during cleanup
2904+ if ( _importQueryCollection != null && ! _importQueryCollection . IsAddingCompleted )
2905+ _importQueryCollection . CompleteAdding ( ) ;
27202906 }
2907+ catch { }
27212908 }
27222909
27232910 // Override Dispose to handle parallel processing cleanup
27242911 public void Dispose ( )
27252912 {
2726- StopAllProcess ( ) ;
2913+ try { StopAllProcess ( ) ; } catch { }
27272914
27282915 // Wait for tasks to complete with timeout
27292916 try
27302917 {
2731- if ( _phase1Task != null || _phase2Task != null || _phase3Task != null || _importPhase1Task != null || _importPhase2Task != null )
2732- {
2733- var tasks = new List < Task > ( ) ;
2734- if ( _phase1Task != null ) tasks . Add ( _phase1Task ) ;
2735- if ( _phase2Task != null ) tasks . Add ( _phase2Task ) ;
2736- if ( _phase3Task != null ) tasks . Add ( _phase3Task ) ;
2737- if ( _importPhase1Task != null ) tasks . Add ( _importPhase1Task ) ;
2738- if ( _importPhase2Task != null ) tasks . Add ( _importPhase2Task ) ;
2739-
2918+ var tasks = new List < Task > ( ) ;
2919+ if ( _phase1Task != null ) tasks . Add ( _phase1Task ) ;
2920+ if ( _phase2Task != null ) tasks . Add ( _phase2Task ) ;
2921+ if ( _phase3Task != null ) tasks . Add ( _phase3Task ) ;
2922+ if ( _importPhase1Task != null ) tasks . Add ( _importPhase1Task ) ;
2923+ if ( _importPhase2Task != null ) tasks . Add ( _importPhase2Task ) ;
2924+ if ( tasks . Count > 0 )
27402925 Task . WaitAll ( tasks . ToArray ( ) , TimeSpan . FromSeconds ( 10 ) ) ;
2741- }
27422926 }
2743- catch
2744- {
2745- // Ignore timeout exceptions
2746- }
2747-
2748- // Dispose BlockingCollections
2749- _rowDataCollection ? . Dispose ( ) ;
2750- _sqlStatementCollection ? . Dispose ( ) ;
2751- _importQueryCollection ? . Dispose ( ) ;
2927+ catch { }
27522928
2753- if ( timerReport != null )
2754- {
2755- timerReport . Stop ( ) ;
2756- timerReport . Dispose ( ) ;
2757- timerReport = null ;
2758- }
2929+ try { _rowDataCollection ? . Dispose ( ) ; } catch { }
2930+ try { _sqlStatementCollection ? . Dispose ( ) ; } catch { }
2931+ try { _importQueryCollection ? . Dispose ( ) ; } catch { }
27592932
2760- if ( textWriter != null )
2761- {
2762- textWriter . Dispose ( ) ;
2763- textWriter = null ;
2764- }
2933+ try { timerReport ? . Stop ( ) ; } catch { }
2934+ try { timerReport ? . Dispose ( ) ; timerReport = null ; } catch { }
27652935
2766- if ( textReader != null )
2767- {
2768- textReader . Dispose ( ) ;
2769- textReader = null ;
2770- }
2936+ try { textWriter ? . Dispose ( ) ; textWriter = null ; } catch { }
2937+ try { textReader ? . Dispose ( ) ; textReader = null ; } catch { }
27712938 }
27722939 }
27732940}
0 commit comments