Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/actions/exa-cluster/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ runs:

sudo openssl req -x509 -new -nodes -key ~/sqlx1/etc/ssl/ssl.ca.key -sha256 -days 3650 -out ~/sqlx1/etc/ssl/ssl.ca -subj "/CN=exacluster.local"
sudo openssl req -new -key ~/sqlx1/etc/ssl/ssl.key -out ~/ssl.csr -subj "/CN=exacluster.local"
sudo openssl x509 -req -in ~/ssl.csr -CA ~/sqlx1/etc/ssl/ssl.ca -CAkey ~/sqlx1/etc/ssl/ssl.ca.key -CAcreateserial -out ~/sqlx1/etc/ssl/ssl.crt -days 365 -sha256 -extfile ~/v3.ext
sudo openssl x509 -req -in ~/ssl.csr -CA ~/sqlx1/etc/ssl/ssl.ca -CAkey ~/sqlx1/etc/ssl/ssl.ca.key -CAcreateserial \
-out ~/sqlx1/etc/ssl/ssl.crt -days 365 -sha256 -extfile ~/v3.ext

shell: bash

- name: Start Exasol cluster
run: |
for (( i=1; i<=${{ inputs.num-nodes }}; i++ )); do
docker run --name sqlx$i --detach --network=sqlx --ip 10.10.10.1$i --privileged --stop-timeout 120 -v ~/sqlx$i:/exa exasol/docker-db:${{ inputs.exasol-version }} init-sc --node-id 1$i
docker run --name sqlx$i --detach --network=sqlx --ip 10.10.10.1$i --privileged --stop-timeout 120 \
-v ~/sqlx$i:/exa exasol/docker-db:${{ inputs.exasol-version }} init-sc --node-id 1$i
done
shell: bash

Expand All @@ -88,7 +90,11 @@ runs:
FINGERPRINT=`sudo openssl x509 -in ~/sqlx1/etc/ssl/ssl.crt -outform DER | sudo openssl dgst -sha256 | awk '{print $2}'`

while [[ $RC -ne 0 ]]; do
docker exec -e FINGERPRINT="$FINGERPRINT" sqlx1 /bin/bash -c '`find /usr/opt -name exaplus | head -1` -c localhost/$FINGERPRINT:8563 -u sys -p exasol -sql "SELECT * FROM DUAL;"' &>/dev/null
docker exec -e FINGERPRINT="$FINGERPRINT" sqlx1 /bin/bash -c '
$(which exaplus || find /usr/opt -name exaplus | head -1) \
-c localhost/$FINGERPRINT:8563 -u sys -p exasol -sql "SELECT * FROM DUAL;"
' &>/dev/null

RC=$?
sleep 5
done
Expand Down
51 changes: 50 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ env:
EXASOL_VERSION: latest-7.1
NUM_NODES: 4
TESTS_TIMEOUT: 15
EXA_CLUSTER_SETUP_TIMEOUT: 15
EXA_CLUSTER_SETUP_TIMEOUT: 10

jobs:
format:
Expand Down Expand Up @@ -125,6 +125,55 @@ jobs:
env:
DATABASE_URL: ${{ steps.exa-cluster.outputs.url }}
SQLX_OFFLINE: true

io_tests_latest:
name: IO tests on latest Exasol (TLS only)
needs: clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Free disk space
uses: ./.github/actions/free-space

- name: Create Exasol cluster
timeout-minutes: ${{ fromJSON(env.EXA_CLUSTER_SETUP_TIMEOUT) }}
id: exa-cluster
uses: ./.github/actions/exa-cluster
with:
exasol-version: latest
num-nodes: 1

- uses: dtolnay/rust-toolchain@1.86.0
- uses: Swatinem/rust-cache@v2

- name: Test IO combos (NativeTLS, no compression)
timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }}
run: cargo test --features runtime-tokio,etl,tls-native-tls -- it_works_with_io_combo_tls --ignored --nocapture
env:
DATABASE_URL: ${{ steps.exa-cluster.outputs.url }}
SQLX_OFFLINE: true

- name: Test IO combos (NativeTLS, with compression)
timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }}
run: cargo test --features runtime-tokio,etl,tls-native-tls,compression -- it_works_with_io_combo_tls --ignored --nocapture
env:
DATABASE_URL: ${{ steps.exa-cluster.outputs.url }}
SQLX_OFFLINE: true

- name: Test IO combos (Rustls, no compression)
timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }}
run: cargo test --features runtime-tokio,etl,tls-rustls-aws-lc-rs -- it_works_with_io_combo_tls --ignored --nocapture
env:
DATABASE_URL: ${{ steps.exa-cluster.outputs.url }}
SQLX_OFFLINE: true

- name: Test IO combos (Rustls, with compression)
timeout-minutes: ${{ fromJSON(env.TESTS_TIMEOUT) }}
run: cargo test --features runtime-tokio,etl,tls-rustls-aws-lc-rs,compression -- it_works_with_io_combo_tls --ignored --nocapture
env:
DATABASE_URL: ${{ steps.exa-cluster.outputs.url }}
SQLX_OFFLINE: true

driver_tests:
name: Driver tests
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,16 @@ rustls = { version = "0.23", default-features = false, features = [
"std",
"tls12",
] }
semver = { version = "1", default-features = false, features = [
"serde",
"std",
] }
serde = { version = "1", default-features = false, features = ["derive", "rc"] }
serde_json = { version = "1", default-features = false, features = [
"std",
"raw_value",
] }
sha2 = { version = "0.10", default-features = false, features = ["std"] }
sqlx = { version = "0.9.0-alpha.1", default-features = false }
sqlx-cli = { version = "0.9.0-alpha.1", default-features = false }
sqlx-core = { version = "0.9.0-alpha.1", default-features = false, features = [
Expand Down
4 changes: 3 additions & 1 deletion sqlx-exasol-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ time = ["sqlx-core/time", "dep:time"]
uuid = ["sqlx-core/uuid", "dep:uuid"]

# TLS features
tls = ["dep:rcgen"]
tls = ["dep:rcgen", "dep:sha2"]
native-tls = [
"dep:native-tls",
"sqlx-core/_tls-native-tls",
Expand All @@ -52,6 +52,7 @@ futures-util = { workspace = true }
futures-core = { workspace = true }
rand = { workspace = true }
rsa = { workspace = true }
semver = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sqlx-core = { workspace = true }
Expand All @@ -73,6 +74,7 @@ native-tls = { workspace = true, optional = true }
rcgen = { workspace = true, optional = true }
rustls = { workspace = true, optional = true }
rust_decimal = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
sqlx-macros-core = { workspace = true, optional = true }
time = { workspace = true, optional = true }
uuid = { workspace = true, optional = true }
Expand Down
10 changes: 8 additions & 2 deletions sqlx-exasol-impl/src/connection/etl/export/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,13 @@ impl EtlJob for ExportBuilder<'_> {
ExportService::new(chan_tx)
}

fn query(&self, addrs: Vec<SocketAddrV4>, with_tls: bool, with_compression: bool) -> String {
fn query(
&self,
addrs: Vec<SocketAddrV4>,
with_tls: bool,
with_compression: bool,
public_key: Option<String>,
) -> String {
let mut query = String::new();

if let Some(comment) = self.comment {
Expand All @@ -182,7 +188,7 @@ impl EtlJob for ExportBuilder<'_> {
query.push(' ');

query.push_str(" INTO CSV ");
Self::append_files(&mut query, addrs, with_tls, with_compression);
Self::append_files(&mut query, addrs, with_tls, with_compression, public_key);

if let Some(enc) = self.encoding {
Self::push_key_value(&mut query, "ENCODING", enc);
Expand Down
10 changes: 8 additions & 2 deletions sqlx-exasol-impl/src/connection/etl/import/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,13 @@ impl EtlJob for ImportBuilder<'_> {
ImportService::new(chan_tx)
}

fn query(&self, addrs: Vec<SocketAddrV4>, with_tls: bool, with_compression: bool) -> String {
fn query(
&self,
addrs: Vec<SocketAddrV4>,
with_tls: bool,
with_compression: bool,
public_key: Option<String>,
) -> String {
let mut query = String::new();

if let Some(comment) = self.comment {
Expand Down Expand Up @@ -199,7 +205,7 @@ impl EtlJob for ImportBuilder<'_> {
}

query.push_str("FROM CSV ");
Self::append_files(&mut query, addrs, with_tls, with_compression);
Self::append_files(&mut query, addrs, with_tls, with_compression, public_key);

if let Some(enc) = self.encoding {
Self::push_key_value(&mut query, "ENCODING", enc);
Expand Down
11 changes: 7 additions & 4 deletions sqlx-exasol-impl/src/connection/etl/job/maybe_tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ pub enum WithMaybeTlsSocketMaker {
}

impl WithMaybeTlsSocketMaker {
#[allow(unused_variables, reason = "conditionally compiled")]
pub fn new(with_tls: bool) -> SqlxResult<Self> {
pub fn new(
#[allow(unused_variables, reason = "conditionally compiled")] with_tls: bool,
#[allow(unused_variables, reason = "conditionally compiled")] with_pub_key: bool,
) -> SqlxResult<(Self, Option<String>)> {
#[cfg(feature = "tls")]
if with_tls {
return tls::WithTlsSocketMaker::new().map(Self::Tls);
let (wsm, public_key) = tls::WithTlsSocketMaker::new(with_pub_key)?;
return Ok((Self::Tls(wsm), public_key));
}

Ok(Self::NonTls)
Ok((Self::NonTls, None))
}
}

Expand Down
14 changes: 11 additions & 3 deletions sqlx-exasol-impl/src/connection/etl/job/maybe_tls/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ mod sync_socket;

use std::io;

use base64::{engine::general_purpose::STANDARD as STD_BASE64_ENGINE, Engine};
use rcgen::{CertificateParams, KeyPair};
use rsa::{
pkcs8::{EncodePrivateKey, LineEnding},
RsaPrivateKey,
};
use sha2::{Digest, Sha256};
use sqlx_core::net::{Socket, WithSocket};

use crate::{
Expand All @@ -28,7 +30,7 @@ pub enum WithTlsSocketMaker {
}

impl WithTlsSocketMaker {
pub fn new() -> SqlxResult<WithTlsSocketMaker> {
pub fn new(with_pub_key: bool) -> SqlxResult<(WithTlsSocketMaker, Option<String>)> {
let bits = 2048;
let private_key =
RsaPrivateKey::new(&mut rand::thread_rng(), bits).map_err(ToSqlxError::to_sqlx_err)?;
Expand All @@ -39,16 +41,22 @@ impl WithTlsSocketMaker {
.map_err(SqlxError::Tls)?;

let key_pair = KeyPair::from_pem(&key).map_err(ToSqlxError::to_sqlx_err)?;
let public_key = with_pub_key
.then(|| key_pair.public_key_der())
.map(Sha256::digest)
.map(|data| STD_BASE64_ENGINE.encode(data));
let cert = CertificateParams::default()
.self_signed(&key_pair)
.map_err(ToSqlxError::to_sqlx_err)?;

#[cfg(feature = "rustls")]
return rustls::WithRustlsSocketMaker::new(&cert, &key_pair).map(Self::Rustls);
return rustls::WithRustlsSocketMaker::new(&cert, &key_pair)
.map(|wsm| (Self::Rustls(wsm), public_key));

#[cfg(feature = "native-tls")]
#[allow(unreachable_code, reason = "conditionally compiled")]
return native_tls::WithNativeTlsSocketMaker::new(&cert, &key_pair).map(Self::NativeTls);
return native_tls::WithNativeTlsSocketMaker::new(&cert, &key_pair)
.map(|wsm| (Self::NativeTls(wsm), public_key));
}
}

Expand Down
41 changes: 29 additions & 12 deletions sqlx-exasol-impl/src/connection/etl/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod maybe_tls;
mod socket_addr;

use std::{
fmt::Write,
future::Future,
io,
net::{IpAddr, SocketAddr, SocketAddrV4},
Expand All @@ -10,6 +11,7 @@ use std::{
use flume::{Receiver, Sender};
use futures_core::future::BoxFuture;
use hyper::server::conn::http1::Connection;
use semver::Version;
use sqlx_core::{
net::WithSocket,
sql_str::{AssertSqlSafe, SqlSafeStr},
Expand Down Expand Up @@ -54,6 +56,10 @@ pub trait EtlJob: Sized + Send + Sync {
const HTTP_SCHEME: &'static str = "http";
const HTTPS_SCHEME: &'static str = "https";

/// Version since the public key from the self-signed certificate used in ETL jobs must be
/// included in the query. Note that this is also conditioned by whether TLS is used.
const PK_FP_VER: Version = Version::new(8, 32, 0);

const JOB_TYPE: &'static str;

/// The type of worker that will be created for this job.
Expand Down Expand Up @@ -146,7 +152,13 @@ pub trait EtlJob: Sized + Send + Sync {
}
}

fn query(&self, addrs: Vec<SocketAddrV4>, with_tls: bool, with_compression: bool) -> String;
fn query(
&self,
addrs: Vec<SocketAddrV4>,
with_tls: bool,
with_compression: bool,
public_key: Option<String>,
) -> String;

/// Builds an ETL job, returning the query future and the ETL workers.
///
Expand All @@ -167,11 +179,13 @@ pub trait EtlJob: Sized + Send + Sync {
.await?
.into();

let with_pub_key = conn.session_info().release_version() >= &Self::PK_FP_VER;
let with_tls = conn.attributes().encryption_enabled();
let with_compression = self
.use_compression()
.unwrap_or(conn.attributes().compression_enabled());
let wsm = WithMaybeTlsSocketMaker::new(with_tls)?;

let (wsm, public_key) = WithMaybeTlsSocketMaker::new(with_tls, with_pub_key)?;

// Get the internal Exasol node addresses and the workers
let JobComponents {
Expand All @@ -181,7 +195,8 @@ pub trait EtlJob: Sized + Send + Sync {
} = self.connect(wsm, ips, port, with_compression).await?;

// Query execution driving future to be returned and awaited alongside the worker IO
let query = AssertSqlSafe(self.query(addrs, with_tls, with_compression)).into_sql_str();
let query = AssertSqlSafe(self.query(addrs, with_tls, with_compression, public_key))
.into_sql_str();
let query_future = ExecuteEtl(ExaRoundtrip::new(Execute(query))).future(&mut conn.ws);

Ok((EtlQuery::new(query_future, conn_futures), workers))
Expand All @@ -195,6 +210,7 @@ pub trait EtlJob: Sized + Send + Sync {
addrs: Vec<SocketAddrV4>,
with_tls: bool,
with_compression: bool,
public_key: Option<String>,
) {
let prefix = if with_tls {
Self::HTTPS_SCHEME
Expand All @@ -208,16 +224,17 @@ pub trait EtlJob: Sized + Send + Sync {
Self::CSV_FILE_EXT
};

let public_key = public_key
.map(|pk| format!("PUBLIC KEY 'sha256//{pk}'"))
.unwrap_or_default();

for (idx, addr) in addrs.into_iter().enumerate() {
let filename = format!(
"AT '{}://{}' FILE '{}_{:0>5}.{}'\n",
prefix,
addr,
Self::JOB_TYPE,
idx,
ext
);
query.push_str(&filename);
writeln!(
query,
"AT '{prefix}://{addr}' {public_key} FILE '{job_type}_{idx:0>5}.{ext}'",
job_type = Self::JOB_TYPE
)
.expect("writing to a String cannot fail");
}
}

Expand Down
20 changes: 8 additions & 12 deletions sqlx-exasol-impl/src/connection/etl/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,18 @@ impl Future for EtlQuery<'_> {
type Output = SqlxResult<ExaQueryResult>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// The query future is what we're ultimately interested in.
// If it's done, we can return its result.
if !self.query.is_terminated() && self.query.poll_unpin(cx).is_ready() {
if let Some(query_res) = Pin::new(&mut self.query).take_output() {
return Poll::Ready(query_res);
}
if !self.bootstrap.is_terminated() {
let _ = self.bootstrap.poll_unpin(cx);
}

// The bootstrap futures run in the background to set up the servers.
// We need to poll them to make progress and check for errors.
if !self.bootstrap.is_terminated() && self.bootstrap.poll_unpin(cx).is_ready() {
// If the bootstrap futures resulted in an error, propagate it.
Pin::new(&mut self.bootstrap).take_output().transpose()?;
if !self.query.is_terminated() {
let _ = self.query.poll_unpin(cx);
}

Poll::Pending
// Give priority to query errors
let query_res = Pin::new(&mut self.query).take_output().transpose()?;
Pin::new(&mut self.bootstrap).take_output().transpose()?;
query_res.map(Ok).map_or(Poll::Pending, Poll::Ready)
}
}

Expand Down
Loading