diff --git a/backend/package.json b/backend/package.json index e5bdd22..65b83c0 100644 --- a/backend/package.json +++ b/backend/package.json @@ -17,7 +17,8 @@ "test:watch": "jest --watch", "test:cov": "jest --coverage", "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", - "test:e2e": "jest --config ./test/jest-e2e.json" + "test:e2e": "jest --config ./test/jest-e2e.json", + "migration:run": "ts-node -r tsconfig-paths/register ./node_modules/typeorm/cli.js migration:run -d src/typeorm.config.ts" }, "dependencies": { "@nestjs/bull": "^11.0.4", diff --git a/backend/src/typeorm.config.ts b/backend/src/typeorm.config.ts new file mode 100644 index 0000000..b4dad71 --- /dev/null +++ b/backend/src/typeorm.config.ts @@ -0,0 +1,13 @@ +import { DataSource } from 'typeorm'; + +export default new DataSource({ + type: 'postgres', + host: process.env.DATABASE_HOST ?? 'localhost', + port: Number(process.env.DATABASE_PORT ?? 5432), + username: process.env.DATABASE_USER ?? 'postgres', + password: process.env.DATABASE_PASSWORD ?? 'postgres', + database: process.env.DATABASE_NAME ?? 'smalda', + entities: [__dirname + '/**/*.entity{.ts,.js}'], + migrations: [__dirname + '/migrations/**/*{.ts,.js}'], + synchronize: false, +}); diff --git a/backend/test/jest-e2e.json b/backend/test/jest-e2e.json new file mode 100644 index 0000000..93400cd --- /dev/null +++ b/backend/test/jest-e2e.json @@ -0,0 +1,9 @@ +{ + "moduleFileExtensions": ["js", "json", "ts"], + "rootDir": "../", + "testEnvironment": "node", + "testRegex": ".e2e-spec.ts$", + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + } +} diff --git a/contract/src/lib.rs b/contract/src/lib.rs index fb90570..d9e34af 100644 --- a/contract/src/lib.rs +++ b/contract/src/lib.rs @@ -2,6 +2,7 @@ pub mod cache; pub mod config; pub mod hash_validator; pub mod metrics; +pub mod module; pub mod rate_limit; pub mod stellar; @@ -24,6 +25,7 @@ use tracing::{info, warn}; use cache::CacheBackend; use hash_validator::{HashValidator, ValidationError as HashValidationError}; use metrics::MetricsRegistry; +use module::webhook::VerificationWebhookNotifier; use stellar::{StellarClient, TransactionRecord}; // Application state @@ -33,6 +35,7 @@ pub struct AppState { pub cache: Arc, pub metrics: Arc, pub stellar_secret_key: String, + pub notifier: Arc, } // Request/Response types @@ -262,6 +265,17 @@ pub async fn record_transfer( return Err(StatusCode::INTERNAL_SERVER_ERROR); } + let anchored_at = Utc::now().timestamp(); + state + .notifier + .notify( + "document.transferred", + &req.document_hash, + &transfer_hash, + anchored_at, + ) + .await; + let record = TransferRecord { document_hash: req.document_hash.clone(), from_owner: req.from_owner.clone(), @@ -554,36 +568,126 @@ async fn verify_single_hash(state: &AppState, hash: String) -> BatchVerifyItem { } } -pub async fn submit_document(Json(req): Json) -> impl IntoResponse { +pub async fn submit_document( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { let normalized_hash = HashValidator::normalize(&req.document_hash); if let Err(err) = HashValidator::validate_sha256(&normalized_hash) { let (status, body) = map_validation_error(err); - return (status, Json(body)); + return (status, Json(body)).into_response(); } - // Endpoint behavior not yet implemented; preserve previous BAD_REQUEST semantics. - ( - StatusCode::BAD_REQUEST, - Json(ValidationErrorResponse { - error: "submit endpoint not yet implemented".to_string(), - }), - ) + let memo = format!( + "SUBMIT:{}", + &normalized_hash[..19.min(normalized_hash.len())] + ); + match state.stellar.anchor_transfer(&normalized_hash, &memo).await { + Ok(()) => { + let anchored_at = Utc::now().timestamp(); + let tx_hash = format!("0x{}", &normalized_hash[..16]); + + let submit_key = format!("submit:{}", normalized_hash); + let record = serde_json::json!({ "tx_hash": tx_hash, "anchored_at": anchored_at }); + if let Err(e) = state + .cache + .set_raw(&submit_key, &record.to_string(), 60 * 60 * 24 * 365) + .await + { + warn!("Failed to persist submit record: {}", e); + } + + state + .notifier + .notify( + "document.submitted", + &normalized_hash, + &tx_hash, + anchored_at, + ) + .await; + + ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "transaction_id": tx_hash, + "anchored_at": anchored_at, + "error": null + })), + ) + .into_response() + } + Err(e) => { + warn!( + "Stellar anchor failed for submit {}: {}", + normalized_hash, e + ); + state.metrics.increment_error_count(); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ValidationErrorResponse { + error: format!("stellar anchor failed: {}", e), + }), + ) + .into_response() + } + } } -pub async fn revoke_document(Json(req): Json) -> impl IntoResponse { +pub async fn revoke_document( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { let normalized_hash = HashValidator::normalize(&req.document_hash); if let Err(err) = HashValidator::validate_sha256(&normalized_hash) { let (status, body) = map_validation_error(err); - return (status, Json(body)); + return (status, Json(body)).into_response(); } - // Endpoint behavior not yet implemented; preserve previous NOT_FOUND semantics. - ( - StatusCode::NOT_FOUND, - Json(ValidationErrorResponse { - error: "revoke endpoint not yet implemented".to_string(), - }), - ) + let memo = format!( + "REVOKE:{}", + &normalized_hash[..19.min(normalized_hash.len())] + ); + match state.stellar.anchor_transfer(&normalized_hash, &memo).await { + Ok(()) => { + let revoked_at = Utc::now().timestamp(); + let tx_hash = format!("0x{}", &normalized_hash[..16]); + + let revoked_key = format!("revoked:{}", normalized_hash); + if let Err(e) = state.cache.set(&revoked_key, &true, u64::MAX / 2).await { + warn!("Failed to cache revocation: {}", e); + } + + state + .notifier + .notify("document.revoked", &normalized_hash, &tx_hash, revoked_at) + .await; + + ( + StatusCode::OK, + Json(RevokeResponse { + transaction_id: tx_hash, + revoked_at, + }), + ) + .into_response() + } + Err(e) => { + warn!( + "Stellar anchor failed for revoke {}: {}", + normalized_hash, e + ); + state.metrics.increment_error_count(); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ValidationErrorResponse { + error: format!("stellar anchor failed: {}", e), + }), + ) + .into_response() + } + } } pub async fn transfer_document(Json(req): Json) -> impl IntoResponse { diff --git a/contract/src/main.rs b/contract/src/main.rs index 380140e..b0c42bd 100644 --- a/contract/src/main.rs +++ b/contract/src/main.rs @@ -3,6 +3,8 @@ use stellar_doc_verifier::app; use stellar_doc_verifier::cache::{CacheBackend, RedisCache}; use stellar_doc_verifier::config::AppConfig; use stellar_doc_verifier::metrics::MetricsRegistry; +use stellar_doc_verifier::module::cache_warmup::CacheWarmupService; +use stellar_doc_verifier::module::webhook::VerificationWebhookNotifier; use stellar_doc_verifier::stellar::StellarClient; use stellar_doc_verifier::*; use tokio::net::TcpListener; @@ -11,10 +13,8 @@ use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { - // Load configuration let config = AppConfig::from_env()?; - // Initialize tracing let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { EnvFilter::new(format!( "stellar_doc_verifier={},tower_http={}", @@ -26,7 +26,6 @@ async fn main() -> Result<(), Box> { info!("Starting Stellar Document Verification Service"); - // Startup configuration summary (redacting secrets) info!( "Configuration: port={}, stellar_horizon_url={}, redis_url={}, rate_limit_per_second={}, rate_limit_burst={}, stellar_max_retries={}, log_level={}, webhook_urls={:?}, stellar_secret_key=[REDACTED], webhook_secret=[REDACTED], cache_verification_ttl={}", config.port, @@ -40,24 +39,27 @@ async fn main() -> Result<(), Box> { config.cache_verification_ttl, ); - // Initialize components let stellar_url = config.stellar_horizon_url.clone(); let redis_url = config.redis_url.clone(); let stellar = Arc::new(StellarClient::new(&stellar_url)); let cache = Arc::new(CacheBackend::Redis(RedisCache::new(&redis_url).await?)); let metrics = Arc::new(MetricsRegistry::new()); + let notifier = Arc::new(VerificationWebhookNotifier::new()); + + // Warm up the cache before accepting connections + CacheWarmupService::run(&cache, &redis_url).await; let state = AppState { stellar, cache, metrics, stellar_secret_key: config.stellar_secret_key.clone().unwrap_or_default(), + notifier, }; let app = app(state); - // Start server let addr = format!("0.0.0.0:{}", config.port); info!("Listening on {}", addr); let listener = TcpListener::bind(&addr).await?; diff --git a/contract/src/module/cache_warmup/mod.rs b/contract/src/module/cache_warmup/mod.rs new file mode 100644 index 0000000..21453dc --- /dev/null +++ b/contract/src/module/cache_warmup/mod.rs @@ -0,0 +1,92 @@ +use crate::cache::CacheBackend; +use crate::VerifyResponse; +use redis::AsyncCommands; +use serde::Deserialize; +use std::sync::Arc; +use std::time::Instant; +use tracing::{info, warn}; + +#[derive(Debug, Deserialize)] +struct SubmitRecord { + tx_hash: String, +} + +pub struct CacheWarmupService; + +impl CacheWarmupService { + pub async fn run(cache: &Arc, redis_url: &str) { + let start = Instant::now(); + + let client = match redis::Client::open(redis_url) { + Ok(c) => c, + Err(e) => { + warn!("Cache warmup: failed to open Redis client: {}", e); + return; + } + }; + + let mut conn = match client.get_async_connection().await { + Ok(c) => c, + Err(e) => { + warn!("Cache warmup: Redis unavailable, skipping warmup: {}", e); + return; + } + }; + + // Scan for all submit:* keys + let keys: Vec = match conn.keys::<_, Vec>("submit:*").await { + Ok(k) => k, + Err(e) => { + warn!("Cache warmup: failed to scan Redis keys: {}", e); + return; + } + }; + + if keys.is_empty() { + info!("Cache warmup: no submit:* keys found, nothing to warm"); + return; + } + + // Sort by key name (anchored_at is embedded in the record, but we limit by count) + // Limit to 200 most recent — we sort keys lexicographically as a best-effort ordering + let mut sorted_keys = keys; + sorted_keys.sort(); + sorted_keys.truncate(200); + + let mut warmed = 0usize; + + for key in &sorted_keys { + let hash = key.trim_start_matches("submit:"); + + let raw: Option = match conn.get(key).await { + Ok(v) => v, + Err(e) => { + warn!("Cache warmup: failed to read key {}: {}", key, e); + continue; + } + }; + + let record: SubmitRecord = match raw.and_then(|v| serde_json::from_str(&v).ok()) { + Some(r) => r, + None => continue, + }; + + let entry = VerifyResponse { + verified: true, + transaction_id: Some(record.tx_hash), + timestamp: None, + cached: true, + }; + + if cache.set(hash, &entry, 3600).await.is_ok() { + warmed += 1; + } + } + + info!( + "Cache warmup: warmed {} entries in {:.2}ms", + warmed, + start.elapsed().as_secs_f64() * 1000.0 + ); + } +} diff --git a/contract/src/module/mod.rs b/contract/src/module/mod.rs new file mode 100644 index 0000000..ed702a1 --- /dev/null +++ b/contract/src/module/mod.rs @@ -0,0 +1,2 @@ +pub mod cache_warmup; +pub mod webhook; diff --git a/contract/src/module/webhook/mod.rs b/contract/src/module/webhook/mod.rs new file mode 100644 index 0000000..1ef6c03 --- /dev/null +++ b/contract/src/module/webhook/mod.rs @@ -0,0 +1,84 @@ +use reqwest::Client; +use serde_json::json; +use std::env; +use std::time::Duration; +use tracing::{info, warn}; + +pub struct VerificationWebhookNotifier { + client: Client, + url: Option, +} + +impl VerificationWebhookNotifier { + pub fn new() -> Self { + let url = match env::var("WEBHOOK_URL") { + Ok(u) if !u.is_empty() => { + info!("Webhook notifier enabled: {}", u); + Some(u) + } + _ => { + warn!("WEBHOOK_URL not set — webhook notifier is disabled"); + None + } + }; + + let client = Client::builder() + .timeout(Duration::from_secs(5)) + .build() + .expect("failed to build reqwest client"); + + Self { client, url } + } + + pub async fn notify( + &self, + event_type: &str, + document_hash: &str, + tx_hash: &str, + timestamp: i64, + ) { + let url = match &self.url { + Some(u) => u.clone(), + None => return, + }; + + let payload = json!({ + "event_type": event_type, + "document_hash": document_hash, + "tx_hash": tx_hash, + "timestamp": timestamp, + "service": "smalda-contract" + }); + + if let Err(e) = self.send_with_retry(&url, &payload).await { + warn!("Webhook delivery failed after retry: {}", e); + } + } + + async fn send_with_retry(&self, url: &str, payload: &serde_json::Value) -> Result<(), String> { + match self.send(url, payload).await { + Ok(()) => Ok(()), + Err(e) => { + warn!( + "Webhook delivery failed (attempt 1): {} — retrying in 2s", + e + ); + tokio::time::sleep(Duration::from_secs(2)).await; + self.send(url, payload) + .await + .map_err(|e2| format!("{}", e2)) + } + } + } + + async fn send(&self, url: &str, payload: &serde_json::Value) -> Result<(), reqwest::Error> { + self.client.post(url).json(payload).send().await?; + Ok(()) + } +} + +impl Default for VerificationWebhookNotifier { + fn default() -> Self { + Self::new() + } +}