diff --git a/.github/actions/exa-cluster/action.yaml b/.github/actions/exa-cluster/action.yaml index ff7f3959..ef6246f7 100644 --- a/.github/actions/exa-cluster/action.yaml +++ b/.github/actions/exa-cluster/action.yaml @@ -69,14 +69,16 @@ runs: sudo openssl req -x509 -new -nodes -key ~/sqlx1/etc/ssl/ssl.ca.key -sha256 -days 3650 -out ~/sqlx1/etc/ssl/ssl.ca -subj "/CN=exacluster.local" sudo openssl req -new -key ~/sqlx1/etc/ssl/ssl.key -out ~/ssl.csr -subj "/CN=exacluster.local" - sudo openssl x509 -req -in ~/ssl.csr -CA ~/sqlx1/etc/ssl/ssl.ca -CAkey ~/sqlx1/etc/ssl/ssl.ca.key -CAcreateserial -out ~/sqlx1/etc/ssl/ssl.crt -days 365 -sha256 -extfile ~/v3.ext + sudo openssl x509 -req -in ~/ssl.csr -CA ~/sqlx1/etc/ssl/ssl.ca -CAkey ~/sqlx1/etc/ssl/ssl.ca.key -CAcreateserial \ + -out ~/sqlx1/etc/ssl/ssl.crt -days 365 -sha256 -extfile ~/v3.ext shell: bash - name: Start Exasol cluster run: | for (( i=1; i<=${{ inputs.num-nodes }}; i++ )); do - docker run --name sqlx$i --detach --network=sqlx --ip 10.10.10.1$i --privileged --stop-timeout 120 -v ~/sqlx$i:/exa exasol/docker-db:${{ inputs.exasol-version }} init-sc --node-id 1$i + docker run --name sqlx$i --detach --network=sqlx --ip 10.10.10.1$i --privileged --stop-timeout 120 \ + -v ~/sqlx$i:/exa exasol/docker-db:${{ inputs.exasol-version }} init-sc --node-id 1$i done shell: bash @@ -88,7 +90,11 @@ runs: FINGERPRINT=`sudo openssl x509 -in ~/sqlx1/etc/ssl/ssl.crt -outform DER | sudo openssl dgst -sha256 | awk '{print $2}'` while [[ $RC -ne 0 ]]; do - docker exec -e FINGERPRINT="$FINGERPRINT" sqlx1 /bin/bash -c '`find /usr/opt -name exaplus | head -1` -c localhost/$FINGERPRINT:8563 -u sys -p exasol -sql "SELECT * FROM DUAL;"' &>/dev/null + docker exec -e FINGERPRINT="$FINGERPRINT" sqlx1 /bin/bash -c ' + $(which exaplus || find /usr/opt -name exaplus | head -1) \ + -c localhost/$FINGERPRINT:8563 -u sys -p exasol -sql "SELECT * FROM DUAL;" + ' &>/dev/null + RC=$? sleep 5 done diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 306951f7..bb2c2807 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -14,7 +14,7 @@ env: EXASOL_VERSION: latest-7.1 NUM_NODES: 4 TESTS_TIMEOUT: 15 - EXA_CLUSTER_SETUP_TIMEOUT: 15 + EXA_CLUSTER_SETUP_TIMEOUT: 10 jobs: format: @@ -125,6 +125,55 @@ jobs: env: DATABASE_URL: ${{ steps.exa-cluster.outputs.url }} SQLX_OFFLINE: true + + io_tests_latest: + name: IO tests on latest Exasol (TLS only) + needs: clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Free disk space + uses: ./.github/actions/free-space + + - name: Create Exasol cluster + timeout-minutes: ${{ fromJSON(env.EXA_CLUSTER_SETUP_TIMEOUT) }} + id: exa-cluster + uses: ./.github/actions/exa-cluster + with: + exasol-version: latest + num-nodes: 1 + + - uses: dtolnay/rust-toolchain@1.86.0 + - uses: Swatinem/rust-cache@v2 + + - name: Test IO combos (NativeTLS, no compression) + timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }} + run: cargo test --features runtime-tokio,etl,tls-native-tls -- it_works_with_io_combo_tls --ignored --nocapture + env: + DATABASE_URL: ${{ steps.exa-cluster.outputs.url }} + SQLX_OFFLINE: true + + - name: Test IO combos (NativeTLS, with compression) + timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }} + run: cargo test --features runtime-tokio,etl,tls-native-tls,compression -- it_works_with_io_combo_tls --ignored --nocapture + env: + DATABASE_URL: ${{ steps.exa-cluster.outputs.url }} + SQLX_OFFLINE: true + + - name: Test IO combos (Rustls, no compression) + timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }} + run: cargo test --features runtime-tokio,etl,tls-rustls-aws-lc-rs -- it_works_with_io_combo_tls --ignored --nocapture + env: + DATABASE_URL: ${{ steps.exa-cluster.outputs.url }} + SQLX_OFFLINE: true + + - name: Test IO combos (Rustls, with compression) + timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }} + run: cargo test --features runtime-tokio,etl,tls-rustls-aws-lc-rs,compression -- it_works_with_io_combo_tls --ignored --nocapture + env: + DATABASE_URL: ${{ steps.exa-cluster.outputs.url }} + SQLX_OFFLINE: true driver_tests: name: Driver tests diff --git a/Cargo.toml b/Cargo.toml index 9a8c2756..32273a51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,11 +152,16 @@ rustls = { version = "0.23", default-features = false, features = [ "std", "tls12", ] } +semver = { version = "1", default-features = false, features = [ + "serde", + "std", +] } serde = { version = "1", default-features = false, features = ["derive", "rc"] } serde_json = { version = "1", default-features = false, features = [ "std", "raw_value", ] } +sha2 = { version = "0.10", default-features = false, features = ["std"] } sqlx = { version = "0.9.0-alpha.1", default-features = false } sqlx-cli = { version = "0.9.0-alpha.1", default-features = false } sqlx-core = { version = "0.9.0-alpha.1", default-features = false, features = [ diff --git a/sqlx-exasol-impl/Cargo.toml b/sqlx-exasol-impl/Cargo.toml index 5198b959..28a290f0 100644 --- a/sqlx-exasol-impl/Cargo.toml +++ b/sqlx-exasol-impl/Cargo.toml @@ -27,7 +27,7 @@ time = ["sqlx-core/time", "dep:time"] uuid = ["sqlx-core/uuid", "dep:uuid"] # TLS features -tls = ["dep:rcgen"] +tls = ["dep:rcgen", "dep:sha2"] native-tls = [ "dep:native-tls", "sqlx-core/_tls-native-tls", @@ -52,6 +52,7 @@ futures-util = { workspace = true } futures-core = { workspace = true } rand = { workspace = true } rsa = { workspace = true } +semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sqlx-core = { workspace = true } @@ -73,6 +74,7 @@ native-tls = { workspace = true, optional = true } rcgen = { workspace = true, optional = true } rustls = { workspace = true, optional = true } rust_decimal = { workspace = true, optional = true } +sha2 = { workspace = true, optional = true } sqlx-macros-core = { workspace = true, optional = true } time = { workspace = true, optional = true } uuid = { workspace = true, optional = true } diff --git a/sqlx-exasol-impl/src/connection/etl/export/options.rs b/sqlx-exasol-impl/src/connection/etl/export/options.rs index 9677f33d..7aa15e2a 100644 --- a/sqlx-exasol-impl/src/connection/etl/export/options.rs +++ b/sqlx-exasol-impl/src/connection/etl/export/options.rs @@ -155,7 +155,13 @@ impl EtlJob for ExportBuilder<'_> { ExportService::new(chan_tx) } - fn query(&self, addrs: Vec, with_tls: bool, with_compression: bool) -> String { + fn query( + &self, + addrs: Vec, + with_tls: bool, + with_compression: bool, + public_key: Option, + ) -> String { let mut query = String::new(); if let Some(comment) = self.comment { @@ -182,7 +188,7 @@ impl EtlJob for ExportBuilder<'_> { query.push(' '); query.push_str(" INTO CSV "); - Self::append_files(&mut query, addrs, with_tls, with_compression); + Self::append_files(&mut query, addrs, with_tls, with_compression, public_key); if let Some(enc) = self.encoding { Self::push_key_value(&mut query, "ENCODING", enc); diff --git a/sqlx-exasol-impl/src/connection/etl/import/options.rs b/sqlx-exasol-impl/src/connection/etl/import/options.rs index 3b8de96f..8e72db04 100644 --- a/sqlx-exasol-impl/src/connection/etl/import/options.rs +++ b/sqlx-exasol-impl/src/connection/etl/import/options.rs @@ -169,7 +169,13 @@ impl EtlJob for ImportBuilder<'_> { ImportService::new(chan_tx) } - fn query(&self, addrs: Vec, with_tls: bool, with_compression: bool) -> String { + fn query( + &self, + addrs: Vec, + with_tls: bool, + with_compression: bool, + public_key: Option, + ) -> String { let mut query = String::new(); if let Some(comment) = self.comment { @@ -199,7 +205,7 @@ impl EtlJob for ImportBuilder<'_> { } query.push_str("FROM CSV "); - Self::append_files(&mut query, addrs, with_tls, with_compression); + Self::append_files(&mut query, addrs, with_tls, with_compression, public_key); if let Some(enc) = self.encoding { Self::push_key_value(&mut query, "ENCODING", enc); diff --git a/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/mod.rs b/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/mod.rs index e67ab818..6c37f969 100644 --- a/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/mod.rs +++ b/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/mod.rs @@ -19,14 +19,17 @@ pub enum WithMaybeTlsSocketMaker { } impl WithMaybeTlsSocketMaker { - #[allow(unused_variables, reason = "conditionally compiled")] - pub fn new(with_tls: bool) -> SqlxResult { + pub fn new( + #[allow(unused_variables, reason = "conditionally compiled")] with_tls: bool, + #[allow(unused_variables, reason = "conditionally compiled")] with_pub_key: bool, + ) -> SqlxResult<(Self, Option)> { #[cfg(feature = "tls")] if with_tls { - return tls::WithTlsSocketMaker::new().map(Self::Tls); + let (wsm, public_key) = tls::WithTlsSocketMaker::new(with_pub_key)?; + return Ok((Self::Tls(wsm), public_key)); } - Ok(Self::NonTls) + Ok((Self::NonTls, None)) } } diff --git a/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/tls/mod.rs b/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/tls/mod.rs index d3309e42..9e62cdf2 100644 --- a/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/tls/mod.rs +++ b/sqlx-exasol-impl/src/connection/etl/job/maybe_tls/tls/mod.rs @@ -6,11 +6,13 @@ mod sync_socket; use std::io; +use base64::{engine::general_purpose::STANDARD as STD_BASE64_ENGINE, Engine}; use rcgen::{CertificateParams, KeyPair}; use rsa::{ pkcs8::{EncodePrivateKey, LineEnding}, RsaPrivateKey, }; +use sha2::{Digest, Sha256}; use sqlx_core::net::{Socket, WithSocket}; use crate::{ @@ -28,7 +30,7 @@ pub enum WithTlsSocketMaker { } impl WithTlsSocketMaker { - pub fn new() -> SqlxResult { + pub fn new(with_pub_key: bool) -> SqlxResult<(WithTlsSocketMaker, Option)> { let bits = 2048; let private_key = RsaPrivateKey::new(&mut rand::thread_rng(), bits).map_err(ToSqlxError::to_sqlx_err)?; @@ -39,16 +41,22 @@ impl WithTlsSocketMaker { .map_err(SqlxError::Tls)?; let key_pair = KeyPair::from_pem(&key).map_err(ToSqlxError::to_sqlx_err)?; + let public_key = with_pub_key + .then(|| key_pair.public_key_der()) + .map(Sha256::digest) + .map(|data| STD_BASE64_ENGINE.encode(data)); let cert = CertificateParams::default() .self_signed(&key_pair) .map_err(ToSqlxError::to_sqlx_err)?; #[cfg(feature = "rustls")] - return rustls::WithRustlsSocketMaker::new(&cert, &key_pair).map(Self::Rustls); + return rustls::WithRustlsSocketMaker::new(&cert, &key_pair) + .map(|wsm| (Self::Rustls(wsm), public_key)); #[cfg(feature = "native-tls")] #[allow(unreachable_code, reason = "conditionally compiled")] - return native_tls::WithNativeTlsSocketMaker::new(&cert, &key_pair).map(Self::NativeTls); + return native_tls::WithNativeTlsSocketMaker::new(&cert, &key_pair) + .map(|wsm| (Self::NativeTls(wsm), public_key)); } } diff --git a/sqlx-exasol-impl/src/connection/etl/job/mod.rs b/sqlx-exasol-impl/src/connection/etl/job/mod.rs index 2e7c60fc..8ca95875 100644 --- a/sqlx-exasol-impl/src/connection/etl/job/mod.rs +++ b/sqlx-exasol-impl/src/connection/etl/job/mod.rs @@ -2,6 +2,7 @@ pub mod maybe_tls; mod socket_addr; use std::{ + fmt::Write, future::Future, io, net::{IpAddr, SocketAddr, SocketAddrV4}, @@ -10,6 +11,7 @@ use std::{ use flume::{Receiver, Sender}; use futures_core::future::BoxFuture; use hyper::server::conn::http1::Connection; +use semver::Version; use sqlx_core::{ net::WithSocket, sql_str::{AssertSqlSafe, SqlSafeStr}, @@ -54,6 +56,10 @@ pub trait EtlJob: Sized + Send + Sync { const HTTP_SCHEME: &'static str = "http"; const HTTPS_SCHEME: &'static str = "https"; + /// Version since the public key from the self-signed certificate used in ETL jobs must be + /// included in the query. Note that this is also conditioned by whether TLS is used. + const PK_FP_VER: Version = Version::new(8, 32, 0); + const JOB_TYPE: &'static str; /// The type of worker that will be created for this job. @@ -146,7 +152,13 @@ pub trait EtlJob: Sized + Send + Sync { } } - fn query(&self, addrs: Vec, with_tls: bool, with_compression: bool) -> String; + fn query( + &self, + addrs: Vec, + with_tls: bool, + with_compression: bool, + public_key: Option, + ) -> String; /// Builds an ETL job, returning the query future and the ETL workers. /// @@ -167,11 +179,13 @@ pub trait EtlJob: Sized + Send + Sync { .await? .into(); + let with_pub_key = conn.session_info().release_version() >= &Self::PK_FP_VER; let with_tls = conn.attributes().encryption_enabled(); let with_compression = self .use_compression() .unwrap_or(conn.attributes().compression_enabled()); - let wsm = WithMaybeTlsSocketMaker::new(with_tls)?; + + let (wsm, public_key) = WithMaybeTlsSocketMaker::new(with_tls, with_pub_key)?; // Get the internal Exasol node addresses and the workers let JobComponents { @@ -181,7 +195,8 @@ pub trait EtlJob: Sized + Send + Sync { } = self.connect(wsm, ips, port, with_compression).await?; // Query execution driving future to be returned and awaited alongside the worker IO - let query = AssertSqlSafe(self.query(addrs, with_tls, with_compression)).into_sql_str(); + let query = AssertSqlSafe(self.query(addrs, with_tls, with_compression, public_key)) + .into_sql_str(); let query_future = ExecuteEtl(ExaRoundtrip::new(Execute(query))).future(&mut conn.ws); Ok((EtlQuery::new(query_future, conn_futures), workers)) @@ -195,6 +210,7 @@ pub trait EtlJob: Sized + Send + Sync { addrs: Vec, with_tls: bool, with_compression: bool, + public_key: Option, ) { let prefix = if with_tls { Self::HTTPS_SCHEME @@ -208,16 +224,17 @@ pub trait EtlJob: Sized + Send + Sync { Self::CSV_FILE_EXT }; + let public_key = public_key + .map(|pk| format!("PUBLIC KEY 'sha256//{pk}'")) + .unwrap_or_default(); + for (idx, addr) in addrs.into_iter().enumerate() { - let filename = format!( - "AT '{}://{}' FILE '{}_{:0>5}.{}'\n", - prefix, - addr, - Self::JOB_TYPE, - idx, - ext - ); - query.push_str(&filename); + writeln!( + query, + "AT '{prefix}://{addr}' {public_key} FILE '{job_type}_{idx:0>5}.{ext}'", + job_type = Self::JOB_TYPE + ) + .expect("writing to a String cannot fail"); } } diff --git a/sqlx-exasol-impl/src/connection/etl/query.rs b/sqlx-exasol-impl/src/connection/etl/query.rs index 6f0f8672..4ca3a64c 100644 --- a/sqlx-exasol-impl/src/connection/etl/query.rs +++ b/sqlx-exasol-impl/src/connection/etl/query.rs @@ -57,22 +57,18 @@ impl Future for EtlQuery<'_> { type Output = SqlxResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // The query future is what we're ultimately interested in. - // If it's done, we can return its result. - if !self.query.is_terminated() && self.query.poll_unpin(cx).is_ready() { - if let Some(query_res) = Pin::new(&mut self.query).take_output() { - return Poll::Ready(query_res); - } + if !self.bootstrap.is_terminated() { + let _ = self.bootstrap.poll_unpin(cx); } - // The bootstrap futures run in the background to set up the servers. - // We need to poll them to make progress and check for errors. - if !self.bootstrap.is_terminated() && self.bootstrap.poll_unpin(cx).is_ready() { - // If the bootstrap futures resulted in an error, propagate it. - Pin::new(&mut self.bootstrap).take_output().transpose()?; + if !self.query.is_terminated() { + let _ = self.query.poll_unpin(cx); } - Poll::Pending + // Give priority to query errors + let query_res = Pin::new(&mut self.query).take_output().transpose()?; + Pin::new(&mut self.bootstrap).take_output().transpose()?; + query_res.map(Ok).map_or(Poll::Pending, Poll::Ready) } } diff --git a/sqlx-exasol-impl/src/responses/session_info.rs b/sqlx-exasol-impl/src/responses/session_info.rs index 4ba861d1..37ebef7a 100644 --- a/sqlx-exasol-impl/src/responses/session_info.rs +++ b/sqlx-exasol-impl/src/responses/session_info.rs @@ -1,3 +1,4 @@ +use semver::Version; use serde::Deserialize; use crate::options::ProtocolVersion; @@ -8,7 +9,7 @@ use crate::options::ProtocolVersion; pub struct SessionInfo { protocol_version: ProtocolVersion, session_id: u64, - release_version: String, + release_version: Version, database_name: String, product_name: String, max_data_message_size: u64, @@ -31,7 +32,7 @@ impl SessionInfo { } #[must_use] - pub fn release_version(&self) -> &str { + pub fn release_version(&self) -> &Version { &self.release_version } diff --git a/tests/io.rs b/tests/io.rs index e7062b91..29f53581 100644 --- a/tests/io.rs +++ b/tests/io.rs @@ -29,7 +29,7 @@ async fn it_works_with_io_combo_disabled( #[ignore] #[sqlx_exasol::test] -async fn it_works_with_io_combo_preferred( +async fn it_works_with_io_combo_tls_preferred( pool_opts: PoolOptions, exa_opts: ExaConnectOptions, ) -> Result<(), BoxDynError> { @@ -54,7 +54,7 @@ async fn it_works_with_io_combo_preferred( #[ignore] #[allow(unreachable_code, reason = "conditionally compiled")] #[sqlx_exasol::test] -async fn it_works_with_io_combo_required( +async fn it_works_with_io_combo_tls_required( pool_opts: PoolOptions, exa_opts: ExaConnectOptions, ) -> Result<(), BoxDynError> {