Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json"
"test:e2e": "jest --config ./test/jest-e2e.json",
"migration:run": "ts-node -r tsconfig-paths/register ./node_modules/typeorm/cli.js migration:run -d src/typeorm.config.ts"
},
"dependencies": {
"@nestjs/bull": "^11.0.4",
Expand Down
13 changes: 13 additions & 0 deletions backend/src/typeorm.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { DataSource } from 'typeorm';

export default new DataSource({
type: 'postgres',
host: process.env.DATABASE_HOST ?? 'localhost',
port: Number(process.env.DATABASE_PORT ?? 5432),
username: process.env.DATABASE_USER ?? 'postgres',
password: process.env.DATABASE_PASSWORD ?? 'postgres',
database: process.env.DATABASE_NAME ?? 'smalda',
entities: [__dirname + '/**/*.entity{.ts,.js}'],
migrations: [__dirname + '/migrations/**/*{.ts,.js}'],
synchronize: false,
});
9 changes: 9 additions & 0 deletions backend/test/jest-e2e.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"moduleFileExtensions": ["js", "json", "ts"],
"rootDir": "../",
"testEnvironment": "node",
"testRegex": ".e2e-spec.ts$",
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
}
}
140 changes: 122 additions & 18 deletions contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod cache;
pub mod config;
pub mod hash_validator;
pub mod metrics;
pub mod module;
pub mod rate_limit;
pub mod stellar;

Expand All @@ -24,6 +25,7 @@ use tracing::{info, warn};
use cache::CacheBackend;
use hash_validator::{HashValidator, ValidationError as HashValidationError};
use metrics::MetricsRegistry;
use module::webhook::VerificationWebhookNotifier;
use stellar::{StellarClient, TransactionRecord};

// Application state
Expand All @@ -33,6 +35,7 @@ pub struct AppState {
pub cache: Arc<CacheBackend>,
pub metrics: Arc<MetricsRegistry>,
pub stellar_secret_key: String,
pub notifier: Arc<VerificationWebhookNotifier>,
}

// Request/Response types
Expand Down Expand Up @@ -262,6 +265,17 @@ pub async fn record_transfer(
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}

let anchored_at = Utc::now().timestamp();
state
.notifier
.notify(
"document.transferred",
&req.document_hash,
&transfer_hash,
anchored_at,
)
.await;

let record = TransferRecord {
document_hash: req.document_hash.clone(),
from_owner: req.from_owner.clone(),
Expand Down Expand Up @@ -554,36 +568,126 @@ async fn verify_single_hash(state: &AppState, hash: String) -> BatchVerifyItem {
}
}

pub async fn submit_document(Json(req): Json<SubmitRequest>) -> impl IntoResponse {
pub async fn submit_document(
State(state): State<AppState>,
Json(req): Json<SubmitRequest>,
) -> impl IntoResponse {
let normalized_hash = HashValidator::normalize(&req.document_hash);
if let Err(err) = HashValidator::validate_sha256(&normalized_hash) {
let (status, body) = map_validation_error(err);
return (status, Json(body));
return (status, Json(body)).into_response();
}

// Endpoint behavior not yet implemented; preserve previous BAD_REQUEST semantics.
(
StatusCode::BAD_REQUEST,
Json(ValidationErrorResponse {
error: "submit endpoint not yet implemented".to_string(),
}),
)
let memo = format!(
"SUBMIT:{}",
&normalized_hash[..19.min(normalized_hash.len())]
);
match state.stellar.anchor_transfer(&normalized_hash, &memo).await {
Ok(()) => {
let anchored_at = Utc::now().timestamp();
let tx_hash = format!("0x{}", &normalized_hash[..16]);

let submit_key = format!("submit:{}", normalized_hash);
let record = serde_json::json!({ "tx_hash": tx_hash, "anchored_at": anchored_at });
if let Err(e) = state
.cache
.set_raw(&submit_key, &record.to_string(), 60 * 60 * 24 * 365)
.await
{
warn!("Failed to persist submit record: {}", e);
}

state
.notifier
.notify(
"document.submitted",
&normalized_hash,
&tx_hash,
anchored_at,
)
.await;

(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"transaction_id": tx_hash,
"anchored_at": anchored_at,
"error": null
})),
)
.into_response()
}
Err(e) => {
warn!(
"Stellar anchor failed for submit {}: {}",
normalized_hash, e
);
state.metrics.increment_error_count();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ValidationErrorResponse {
error: format!("stellar anchor failed: {}", e),
}),
)
.into_response()
}
}
}

pub async fn revoke_document(Json(req): Json<RevokeRequest>) -> impl IntoResponse {
pub async fn revoke_document(
State(state): State<AppState>,
Json(req): Json<RevokeRequest>,
) -> impl IntoResponse {
let normalized_hash = HashValidator::normalize(&req.document_hash);
if let Err(err) = HashValidator::validate_sha256(&normalized_hash) {
let (status, body) = map_validation_error(err);
return (status, Json(body));
return (status, Json(body)).into_response();
}

// Endpoint behavior not yet implemented; preserve previous NOT_FOUND semantics.
(
StatusCode::NOT_FOUND,
Json(ValidationErrorResponse {
error: "revoke endpoint not yet implemented".to_string(),
}),
)
let memo = format!(
"REVOKE:{}",
&normalized_hash[..19.min(normalized_hash.len())]
);
match state.stellar.anchor_transfer(&normalized_hash, &memo).await {
Ok(()) => {
let revoked_at = Utc::now().timestamp();
let tx_hash = format!("0x{}", &normalized_hash[..16]);

let revoked_key = format!("revoked:{}", normalized_hash);
if let Err(e) = state.cache.set(&revoked_key, &true, u64::MAX / 2).await {
warn!("Failed to cache revocation: {}", e);
}

state
.notifier
.notify("document.revoked", &normalized_hash, &tx_hash, revoked_at)
.await;

(
StatusCode::OK,
Json(RevokeResponse {
transaction_id: tx_hash,
revoked_at,
}),
)
.into_response()
}
Err(e) => {
warn!(
"Stellar anchor failed for revoke {}: {}",
normalized_hash, e
);
state.metrics.increment_error_count();
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ValidationErrorResponse {
error: format!("stellar anchor failed: {}", e),
}),
)
.into_response()
}
}
}

pub async fn transfer_document(Json(req): Json<TransferRequest>) -> impl IntoResponse {
Expand Down
12 changes: 7 additions & 5 deletions contract/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use stellar_doc_verifier::app;
use stellar_doc_verifier::cache::{CacheBackend, RedisCache};
use stellar_doc_verifier::config::AppConfig;
use stellar_doc_verifier::metrics::MetricsRegistry;
use stellar_doc_verifier::module::cache_warmup::CacheWarmupService;
use stellar_doc_verifier::module::webhook::VerificationWebhookNotifier;
use stellar_doc_verifier::stellar::StellarClient;
use stellar_doc_verifier::*;
use tokio::net::TcpListener;
Expand All @@ -11,10 +13,8 @@ use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load configuration
let config = AppConfig::from_env()?;

// Initialize tracing
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(format!(
"stellar_doc_verifier={},tower_http={}",
Expand All @@ -26,7 +26,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

info!("Starting Stellar Document Verification Service");

// Startup configuration summary (redacting secrets)
info!(
"Configuration: port={}, stellar_horizon_url={}, redis_url={}, rate_limit_per_second={}, rate_limit_burst={}, stellar_max_retries={}, log_level={}, webhook_urls={:?}, stellar_secret_key=[REDACTED], webhook_secret=[REDACTED], cache_verification_ttl={}",
config.port,
Expand All @@ -40,24 +39,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
config.cache_verification_ttl,
);

// Initialize components
let stellar_url = config.stellar_horizon_url.clone();
let redis_url = config.redis_url.clone();

let stellar = Arc::new(StellarClient::new(&stellar_url));
let cache = Arc::new(CacheBackend::Redis(RedisCache::new(&redis_url).await?));
let metrics = Arc::new(MetricsRegistry::new());
let notifier = Arc::new(VerificationWebhookNotifier::new());

// Warm up the cache before accepting connections
CacheWarmupService::run(&cache, &redis_url).await;

let state = AppState {
stellar,
cache,
metrics,
stellar_secret_key: config.stellar_secret_key.clone().unwrap_or_default(),
notifier,
};

let app = app(state);

// Start server
let addr = format!("0.0.0.0:{}", config.port);
info!("Listening on {}", addr);
let listener = TcpListener::bind(&addr).await?;
Expand Down
92 changes: 92 additions & 0 deletions contract/src/module/cache_warmup/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use crate::cache::CacheBackend;
use crate::VerifyResponse;
use redis::AsyncCommands;
use serde::Deserialize;
use std::sync::Arc;
use std::time::Instant;
use tracing::{info, warn};

#[derive(Debug, Deserialize)]
struct SubmitRecord {
tx_hash: String,
}

pub struct CacheWarmupService;

impl CacheWarmupService {
pub async fn run(cache: &Arc<CacheBackend>, redis_url: &str) {
let start = Instant::now();

let client = match redis::Client::open(redis_url) {
Ok(c) => c,
Err(e) => {
warn!("Cache warmup: failed to open Redis client: {}", e);
return;
}
};

let mut conn = match client.get_async_connection().await {
Ok(c) => c,
Err(e) => {
warn!("Cache warmup: Redis unavailable, skipping warmup: {}", e);
return;
}
};

// Scan for all submit:* keys
let keys: Vec<String> = match conn.keys::<_, Vec<String>>("submit:*").await {
Ok(k) => k,
Err(e) => {
warn!("Cache warmup: failed to scan Redis keys: {}", e);
return;
}
};

if keys.is_empty() {
info!("Cache warmup: no submit:* keys found, nothing to warm");
return;
}

// Sort by key name (anchored_at is embedded in the record, but we limit by count)
// Limit to 200 most recent — we sort keys lexicographically as a best-effort ordering
let mut sorted_keys = keys;
sorted_keys.sort();
sorted_keys.truncate(200);

let mut warmed = 0usize;

for key in &sorted_keys {
let hash = key.trim_start_matches("submit:");

let raw: Option<String> = match conn.get(key).await {
Ok(v) => v,
Err(e) => {
warn!("Cache warmup: failed to read key {}: {}", key, e);
continue;
}
};

let record: SubmitRecord = match raw.and_then(|v| serde_json::from_str(&v).ok()) {
Some(r) => r,
None => continue,
};

let entry = VerifyResponse {
verified: true,
transaction_id: Some(record.tx_hash),
timestamp: None,
cached: true,
};

if cache.set(hash, &entry, 3600).await.is_ok() {
warmed += 1;
}
}

info!(
"Cache warmup: warmed {} entries in {:.2}ms",
warmed,
start.elapsed().as_secs_f64() * 1000.0
);
}
}
2 changes: 2 additions & 0 deletions contract/src/module/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod cache_warmup;
pub mod webhook;
Loading
Loading