use axum::{ extract::State, http::StatusCode, routing::{get, post}, Json, Router, }; use contracts::auth_middleware::AuthUser; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use uuid::Uuid; use sqlx::postgres::PgPool; use sqlx::FromRow; pub mod packages; #[derive(Clone)] struct AppState { beeceptor_url: String, client: reqwest::Client, pool: PgPool, } #[derive(Debug, Serialize, Deserialize)] struct CreateOrderRequest { amount: u64, currency: Option, package_id: Option, user_id: Option, } #[derive(Debug, Serialize)] struct CreateOrderResponse { order_id: String, amount: u64, currency: String, status: String, } #[derive(Debug, Serialize, Deserialize)] struct VerifyPaymentRequest { order_id: String, payment_id: String, signature: Option, } #[derive(Debug, Serialize)] struct VerifyPaymentResponse { verified: bool, payment_id: String, status: String, message: String, } #[derive(Debug, Serialize)] struct PaymentStatusResponse { payment_id: String, status: String, amount: u64, currency: String, } #[derive(Debug, FromRow)] struct PricingPackageRow { tracecoins_amount: i32, } #[derive(Debug, FromRow)] struct PaymentRow { id: Uuid, user_id: Uuid, package_id: Option, tracecoins_credited: Option, } async fn create_order( auth: AuthUser, State(state): State, Json(payload): Json, ) -> Result, (StatusCode, String)> { tracing::info!("Creating payment order: amount={}", payload.amount); let package_id_str = payload.package_id.as_ref().ok_or((StatusCode::BAD_REQUEST, "package_id is required".to_string()))?; let package_id = Uuid::parse_str(package_id_str).map_err(|_| (StatusCode::BAD_REQUEST, "Invalid package id".to_string()))?; let package = sqlx::query_as::<_, PricingPackageRow>( "SELECT tracecoins_amount FROM pricing_packages WHERE id = $1 AND is_active = true", ) .bind(package_id) .fetch_optional(&state.pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; let package = package.ok_or((StatusCode::BAD_REQUEST, "Invalid or inactive package".to_string()))?; let tracecoins_credited = package.tracecoins_amount; let resp = state .client .post(&state.beeceptor_url) .header("Content-Type", "application/json") .json(&serde_json::json!({ "amount": payload.amount, "currency": payload.currency.as_deref().unwrap_or("INR"), "package_id": package_id_str, "user_id": auth.user_id.to_string(), })) .send() .await .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Beeceptor error: {}", e)))?; let status = resp.status(); let body: serde_json::Value = resp .json() .await .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Parse error: {}", e)))?; if !status.is_success() { return Err(( StatusCode::BAD_REQUEST, body.get("message") .and_then(|m| m.as_str()) .unwrap_or("Order creation failed") .to_string(), )); } let order_id = body .get("order_id") .and_then(|v| v.as_str()) .unwrap_or("mock_order_123") .to_string(); sqlx::query( r#" INSERT INTO payments (user_id, package_id, razorpay_order_id, amount, tracecoins_credited, status) VALUES ($1, $2, $3, $4, $5, 'PENDING') "#, ) .bind(auth.user_id) .bind(package_id) .bind(&order_id) .bind(payload.amount as i64) .bind(tracecoins_credited) .execute(&state.pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; Ok(Json(CreateOrderResponse { order_id, amount: payload.amount, currency: payload.currency.unwrap_or("INR".to_string()), status: "created".to_string(), })) } async fn verify_payment( auth: AuthUser, State(state): State, Json(payload): Json, ) -> Result, (StatusCode, String)> { tracing::info!("Verifying payment: order_id={}", payload.order_id); let verify_url = format!("{}/verify", state.beeceptor_url.trim_end_matches('/')); let resp = state .client .post(&verify_url) .header("Content-Type", "application/json") .json(&payload) .send() .await .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Beeceptor error: {}", e)))?; let status = resp.status(); let body: serde_json::Value = resp .json() .await .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Parse error: {}", e)))?; if !status.is_success() { return Err(( StatusCode::BAD_REQUEST, body.get("message") .and_then(|m| m.as_str()) .unwrap_or("Verification failed") .to_string(), )); } let payment = sqlx::query_as::<_, PaymentRow>( r#" SELECT id, user_id, package_id, tracecoins_credited FROM payments WHERE razorpay_order_id = $1 AND status = 'PENDING' "#, ) .bind(&payload.order_id) .fetch_optional(&state.pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; let payment = match payment { Some(p) => p, None => return Err((StatusCode::NOT_FOUND, "Payment not found or already processed".to_string())), }; if payment.user_id != auth.user_id { return Err((StatusCode::FORBIDDEN, "Payment does not belong to user".to_string())); } let tracecoins = payment.tracecoins_credited.unwrap_or(0); sqlx::query( r#" UPDATE payments SET status = 'SUCCESS', razorpay_payment_id = $1, verified_at = NOW() WHERE id = $2 "#, ) .bind(&payload.payment_id) .bind(payment.id) .execute(&state.pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; sqlx::query( r#" INSERT INTO tracecoin_wallets (user_id, balance, reserved) VALUES ($1, $2, 0) ON CONFLICT (user_id) DO UPDATE SET balance = tracecoin_wallets.balance + excluded.balance "#, ) .bind(payment.user_id) .bind(tracecoins as i64) .execute(&state.pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; if let Ok(Some(wallet_id)) = sqlx::query_scalar::<_, Uuid>( "SELECT id FROM tracecoin_wallets WHERE user_id = $1" ) .bind(payment.user_id) .fetch_optional(&state.pool) .await { sqlx::query( r#" INSERT INTO tracecoin_ledger (wallet_id, transaction_type, amount, balance_after, reference_type, reference_id, description) VALUES ($1, 'CREDIT', $2, $2, 'PAYMENT', $3, 'Package purchase') "#, ) .bind(wallet_id) .bind(tracecoins as i64) .bind(payment.id) .execute(&state.pool) .await .ok(); } let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "#, ) .bind(payment.user_id) .bind("Tracecoins Purchased Successfully") .bind(format!("Your {} Tracecoin package has been credited to your wallet.", tracecoins)) .bind("PAYMENT") .bind(payment.id) .execute(&state.pool) .await .ok(); Ok(Json(VerifyPaymentResponse { verified: true, payment_id: payload.payment_id, status: "success".to_string(), message: "Payment verified successfully".to_string(), })) } async fn get_payment_status( State(state): State, axum::extract::Path(payment_id): axum::extract::Path, ) -> Result, (StatusCode, String)> { tracing::info!("Getting payment status: payment_id={}", payment_id); let status_url = format!("{}/{}", state.beeceptor_url.trim_end_matches('/'), payment_id); let resp = state .client .get(&status_url) .send() .await .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Beeceptor error: {}", e)))?; let status = resp.status(); let body: serde_json::Value = resp .json() .await .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Parse error: {}", e)))?; if !status.is_success() { return Ok(Json(PaymentStatusResponse { payment_id, status: "not_found".to_string(), amount: 0, currency: "INR".to_string(), })); } let amount = body.get("amount").and_then(|v| v.as_u64()).unwrap_or(0); let currency = body .get("currency") .and_then(|v| v.as_str()) .unwrap_or("INR") .to_string(); let status_str = body .get("status") .and_then(|v| v.as_str()) .unwrap_or("unknown") .to_string(); Ok(Json(PaymentStatusResponse { payment_id, status: status_str, amount, currency, })) } #[tokio::main] async fn main() { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); let beeceptor_url = std::env::var("BEECEPTOR_URL") .unwrap_or_else(|_| "https://nxtgauge.free.beeceptor.com".to_string()); let db_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://postgres:password@localhost:5432/nxtgauge".to_string()); let pool = PgPool::connect(&db_url) .await .expect("Failed to connect to database"); let state = AppState { beeceptor_url, client: reqwest::Client::new(), pool, }; let app = Router::new() .route("/api/payments/create-order", post(create_order)) .route("/api/payments/verify", post(verify_payment)) .route("/api/payments/{id}/status", get(get_payment_status)) .nest("/api/packages", packages::router()) .with_state(state); let port: u16 = std::env::var("PORT") .unwrap_or_else(|_| "9116".to_string()) .parse() .expect("PORT must be a valid u16"); let addr = SocketAddr::from(([0, 0, 0, 0], port)); tracing::info!("Payments service listening on {}", addr); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); axum::serve(listener, app).await.unwrap(); }