diff --git a/Cargo.toml b/Cargo.toml index 03e40b4e..bf4f1cb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -254,6 +254,7 @@ futures-util = { workspace = true } paste = { workspace = true } serde = { workspace = true } time = { workspace = true } +tokio = { workspace = true } url = { workspace = true } rand = { workspace = true } diff --git a/tests/etl.rs b/tests/etl.rs index 140d4738..22c4d0df 100644 --- a/tests/etl.rs +++ b/tests/etl.rs @@ -17,44 +17,30 @@ use sqlx_exasol::{ const NUM_ROWS: usize = 500_000; -test_etl_single_threaded!( - "simple", - "TEST_ETL", - ExportBuilder::new_from_table("TEST_ETL", None), - ImportBuilder::new("TEST_ETL", None) -); - -test_etl_multi_threaded!( +test_etl!( "simple", "TEST_ETL", ExportBuilder::new_from_table("TEST_ETL", None), ImportBuilder::new("TEST_ETL", None) ); -test_etl_single_threaded!( +test_etl!( "query_export", "TEST_ETL", ExportBuilder::new_from_query("SELECT * FROM TEST_ETL"), ImportBuilder::new("TEST_ETL", None) ); -test_etl_single_threaded!( - "multiple_workers", - 0, - "TEST_ETL", - ExportBuilder::new_from_table("TEST_ETL", None), - ImportBuilder::new("TEST_ETL", None) -); - -test_etl_multi_threaded!( +test_etl!( "multiple_workers", 0, "TEST_ETL", + |(r, w)| pipe(r, w), ExportBuilder::new_from_table("TEST_ETL", None), ImportBuilder::new("TEST_ETL", None) ); -test_etl_single_threaded!( +test_etl!( "all_arguments", "TEST_ETL", ExportBuilder::new_from_table("TEST_ETL", None) @@ -81,7 +67,6 @@ test_etl_single_threaded!( ); test_etl!( - "single_threaded", "writer_flush_first", 1, "TEST_ETL", @@ -143,6 +128,69 @@ async fn test_etl_with_schema( } } +#[ignore] +#[cfg(feature = "runtime-tokio")] +#[sqlx_exasol::test] +async fn test_etl_multi_threaded( + pool_opts: PoolOptions, + exa_opts: ExaConnectOptions, +) -> Result<(), BoxDynError> { + { + let pool = pool_opts.min_connections(2).connect_with(exa_opts).await?; + let mut conn1 = pool.acquire().await?; + let mut conn2 = pool.acquire().await?; + + sqlx_exasol::query("CREATE TABLE TEST_ETL ( col VARCHAR(200) );") + .execute(&mut *conn1) + .await?; + + sqlx_exasol::query("INSERT INTO TEST_ETL VALUES (?)") + .bind(vec!["dummy"; NUM_ROWS]) + .execute(&mut *conn1) + .await?; + + let schema = conn1.attributes().current_schema().unwrap().to_owned(); + + let (export_fut, readers) = (ExportBuilder::new_from_table("TEST_ETL", Some(&schema))) + .build(&mut conn1) + .await?; + + let (import_fut, writers) = (ImportBuilder::new("TEST_ETL", Some(&schema))) + .build(&mut conn2) + .await?; + + let handle = std::thread::spawn(move || { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + iter::zip(readers, writers) + .map(|(r, w)| tokio::spawn(pipe(r, w))) + .collect::>() + .join_all() + .await + }) + }); + + let (export_res, import_res) = try_join(export_fut, import_fut).await?; + for res in handle.join().unwrap() { + res??; + } + + assert_eq!(NUM_ROWS as u64, export_res.rows_affected(), "exported rows"); + assert_eq!(NUM_ROWS as u64, import_res.rows_affected(), "imported rows"); + + let num_rows: i64 = sqlx_exasol::query_scalar("SELECT COUNT(*) FROM TEST_ETL") + .fetch_one(&mut *conn1) + .await?; + + assert_eq!(num_rows, 2 * NUM_ROWS as i64, "export + import rows"); + + Ok(()) + } +} + // ########################################## // ################ Failures ################ // ########################################## diff --git a/tests/macros.rs b/tests/macros.rs index decfd214..ad17312a 100644 --- a/tests/macros.rs +++ b/tests/macros.rs @@ -135,11 +135,15 @@ macro_rules! test_type_invalid { #[macro_export] macro_rules! test_etl { - ($kind:literal, $name:literal, $num_workers:expr, $table:literal, $proc:expr, $export:expr, $import:expr) => { + ($name:literal, $table:literal, $export:expr, $import:expr) => { + $crate::test_etl!($name, 1, $table, |(r, w)| pipe(r, w), $export, $import); + }; + + ($name:literal, $num_workers:expr, $table:literal, $proc:expr, $export:expr, $import:expr) => { paste::item! { #[ignore] #[sqlx_exasol::test] - async fn [< test_etl_ $kind _ $name >](pool_opts: PoolOptions, exa_opts: ExaConnectOptions) -> Result<(), BoxDynError> { + async fn [< test_etl_ $name >](pool_opts: PoolOptions, exa_opts: ExaConnectOptions) -> Result<(), BoxDynError> { let pool = pool_opts.min_connections(2).connect_with(exa_opts).await?; let mut conn1 = pool.acquire().await?; @@ -179,44 +183,6 @@ macro_rules! test_etl { }; } -#[macro_export] -macro_rules! test_etl_single_threaded { - ($name:literal, $table:literal, $export:expr, $import:expr) => { - $crate::test_etl_single_threaded!($name, 1, $table, $export, $import); - }; - - ($name:literal, $num_workers:expr, $table:literal, $export:expr, $import:expr) => { - $crate::test_etl!( - "single_threaded", - $name, - $num_workers, - $table, - |(r, w)| pipe(r, w), - $export, - $import - ); - }; -} - -#[macro_export] -macro_rules! test_etl_multi_threaded { - ($name:literal, $table:literal, $export:expr, $import:expr) => { - $crate::test_etl_multi_threaded!($name, 1, $table, $export, $import); - }; - - ($name:literal, $num_workers:expr, $table:literal, $export:expr, $import:expr) => { - $crate::test_etl!( - "multi_threaded", - $name, - $num_workers, - $table, - |(r, w)| sqlx_exasol::__rt::spawn(pipe(r, w)), - $export, - $import - ); - }; -} - #[macro_export] macro_rules! test_compile_time_type { ($col:ident, $ty:ty, $value:expr, $insert:expr, $select:expr) => {