diff --git a/Cargo.lock b/Cargo.lock index 6d5c0e9..5b7a6da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -764,6 +764,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -847,13 +848,17 @@ version = "0.1.0" dependencies = [ "auth", "axum", + "bytes", + "cache", "chrono", "contracts", "db", "email", + "redis", "serde", "serde_json", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -882,6 +887,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "bytes", "cache", "chrono", "db", @@ -889,6 +895,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "storage", "tracing", "uuid", ] @@ -1114,6 +1121,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -1562,6 +1570,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -2062,10 +2071,12 @@ dependencies = [ "auth", "axum", "bytes", + "cache", "chrono", "contracts", "db", "email", + "redis", "serde", "serde_json", "sqlx", @@ -2278,6 +2289,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -2627,6 +2639,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -3412,6 +3425,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -4077,6 +4091,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", @@ -4241,6 +4256,7 @@ dependencies = [ "db", "serde", "sqlx", + "storage", "tokio", "tracing", "tracing-subscriber", diff --git a/apps/catering_services/Cargo.toml b/apps/catering_services/Cargo.toml index 8d07437..4dab636 100644 --- a/apps/catering_services/Cargo.toml +++ b/apps/catering_services/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/catering_services/src/main.rs b/apps/catering_services/src/main.rs index 8db6ba3..236c443 100644 --- a/apps/catering_services/src/main.rs +++ b/apps/catering_services/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Catering Services service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/catering-services", handlers::router()) diff --git a/apps/companies/Cargo.toml b/apps/companies/Cargo.toml index 3ce7588..bd1c980 100644 --- a/apps/companies/Cargo.toml +++ b/apps/companies/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -axum = { workspace = true } +axum = { workspace = true, features = ["multipart"] } tokio = { workspace = true } serde = { workspace = true } sqlx = { workspace = true } @@ -17,4 +17,8 @@ auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } serde_json = { workspace = true } email = { path = "../../crates/email" } +storage = { path = "../../crates/storage" } +bytes = { workspace = true } +cache = { path = "../../crates/cache" } +redis = { workspace = true } diff --git a/apps/companies/src/handlers/mod.rs b/apps/companies/src/handlers/mod.rs index 0c6fcf2..56efb39 100644 --- a/apps/companies/src/handlers/mod.rs +++ b/apps/companies/src/handlers/mod.rs @@ -1,11 +1,14 @@ pub mod admin; use axum::{ - extract::{Path, Query, State}, + extract::{Multipart, Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, patch, post}, Json, Router, }; +use bytes::BufMut; +use cache::jobs as cache_jobs; +use redis::AsyncCommands; use serde::Deserialize; use uuid::Uuid; use db::models::company::{CompanyRepository, UpsertCompanyProfilePayload}; @@ -19,6 +22,7 @@ use crate::AppState; pub fn router() -> Router { Router::new() .route("/profile/me", get(get_profile).patch(update_profile)) + .route("/profile/documents", post(upload_documents)) .route("/profile/submit", post(submit_for_verification)) .route("/jobs", get(list_jobs).post(create_job)) .route("/jobs/{id}", get(get_job).patch(update_job)) @@ -58,8 +62,23 @@ async fn get_profile( State(state): State, auth: AuthUser, ) -> impl IntoResponse { + let cache_key = format!("profile:company:{}", auth.user_id); + let mut redis = state.redis.clone(); + + // Try cache first + if let Ok(cached) = redis.get::<_, String>(&cache_key).await { + tracing::debug!("Cache hit for company profile: {}", auth.user_id); + if let Ok(parsed) = serde_json::from_str::(&cached) { + return (StatusCode::OK, Json(parsed)).into_response(); + } + } + match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { - Ok(Some(profile)) => (StatusCode::OK, Json(profile)).into_response(), + Ok(Some(profile)) => { + // Cache for 5 minutes + let _: Result<(), _> = redis.set_ex(&cache_key, &serde_json::to_string(&profile).unwrap_or_default(), 300).await; + (StatusCode::OK, Json(profile)).into_response() + } Ok(None) => (StatusCode::NOT_FOUND, "Company profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } @@ -71,7 +90,13 @@ async fn update_profile( Json(payload): Json, ) -> impl IntoResponse { match CompanyRepository::upsert(&state.pool, auth.user_id, payload).await { - Ok(profile) => (StatusCode::OK, Json(profile)).into_response(), + Ok(profile) => { + // Invalidate profile cache + let cache_key = format!("profile:company:{}", auth.user_id); + let mut redis = state.redis.clone(); + let _ = redis.del::<_, ()>(&cache_key).await; + (StatusCode::OK, Json(profile)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -99,10 +124,16 @@ async fn submit_for_verification( } match CompanyRepository::submit_for_verification(&state.pool, auth.user_id).await { - Ok(profile) => (StatusCode::OK, Json(serde_json::json!({ - "status": profile.status, - "message": "Profile submitted for verification" - }))).into_response(), + Ok(profile) => { + // Invalidate company profile cache + let cache_key = format!("profile:company:{}", auth.user_id); + let mut redis = state.redis.clone(); + let _ = redis.del::<_, ()>(&cache_key).await; + (StatusCode::OK, Json(serde_json::json!({ + "status": profile.status, + "message": "Profile submitted for verification" + }))).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -119,11 +150,30 @@ async fn list_jobs( let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); + let status_filter = q.status.as_deref().unwrap_or(""); + + // Build cache key + let cache_key = format!("jobs:company:{}:{}:{}:{}", company.id, page, limit, status_filter); + let mut redis = state.redis.clone(); + + // Try cache first + if let Ok(cached) = redis.get::<_, String>(&cache_key).await { + tracing::debug!("Cache hit for company jobs: {}", cache_key); + if let Ok(parsed) = serde_json::from_str::(&cached) { + return (StatusCode::OK, Json(parsed)).into_response(); + } + } + match JobRepository::list_by_company_id(&state.pool, company.id, q.status, page, limit).await { - Ok(jobs) => (StatusCode::OK, Json(serde_json::json!({ - "data": jobs, - "pagination": { "page": page, "limit": limit } - }))).into_response(), + Ok(jobs) => { + let response = serde_json::json!({ + "data": jobs, + "pagination": { "page": page, "limit": limit } + }); + // Cache for 5 minutes + let _: Result<(), _> = redis.set_ex(&cache_key, &serde_json::to_string(&response).unwrap_or_default(), 300).await; + (StatusCode::OK, Json(response)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -190,7 +240,17 @@ async fn create_job( }; match JobRepository::create(&state.pool, db_payload).await { - Ok(job) => (StatusCode::CREATED, Json(job)).into_response(), + Ok(job) => { + // Invalidate company's job list cache + let mut redis = state.redis.clone(); + let pattern = format!("jobs:company:{}:*", company.id); + if let Ok(keys) = redis.keys::<_, Vec>(pattern).await { + if !keys.is_empty() { + let _ = redis.del::<_, ()>(keys).await; + } + } + (StatusCode::CREATED, Json(job)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -229,7 +289,17 @@ async fn update_job( }; match JobRepository::update(&state.pool, job.id, payload).await { - Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), + Ok(updated) => { + // Invalidate company job list cache + let mut redis = state.redis.clone(); + let pattern = format!("jobs:company:{}:*", company.id); + if let Ok(keys) = redis.keys::<_, Vec>(pattern).await { + if !keys.is_empty() { + let _ = redis.del::<_, ()>(keys).await; + } + } + (StatusCode::OK, Json(updated)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -282,6 +352,14 @@ async fn submit_job( serde_json::json!([]), ) .await; + // Invalidate company job list cache + let mut redis = state.redis.clone(); + let pattern = format!("jobs:company:{}:*", company.id); + if let Ok(keys) = redis.keys::<_, Vec>(pattern).await { + if !keys.is_empty() { + let _ = redis.del::<_, ()>(keys).await; + } + } (StatusCode::OK, Json(updated)).into_response() } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -305,7 +383,17 @@ async fn close_job( }; match JobRepository::update_status(&state.pool, job.id, "CLOSED").await { - Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), + Ok(updated) => { + // Invalidate company job list cache + let mut redis = state.redis.clone(); + let pattern = format!("jobs:company:{}:*", company.id); + if let Ok(keys) = redis.keys::<_, Vec>(pattern).await { + if !keys.is_empty() { + let _ = redis.del::<_, ()>(keys).await; + } + } + (StatusCode::OK, Json(updated)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -366,14 +454,28 @@ async fn update_application_status( match ApplicationRepository::update_status(&state.pool, app.id, &payload.status).await { Ok(updated) => { // Notify applicant of status change (ignore failures) - let applicant_info = sqlx::query_as::<_, (String, String)>( - "SELECT CONCAT(u.first_name, ' ', u.last_name) AS name, u.email, u.phone FROM users u WHERE u.id = $1", + let applicant_info = sqlx::query_as::<_, (String, String, Uuid)>( + "SELECT CONCAT(u.first_name, ' ', u.last_name) AS name, u.email, u.id FROM users u WHERE u.id = $1", ) .bind(app.applicant_user_id) .fetch_optional(&state.pool) .await; - if let Ok(Some((name, email))) = applicant_info { + if let Ok(Some((name, email, applicant_uuid))) = applicant_info { let _ = state.mail.send_application_status_email(&email, &name, &job.title, &payload.status).await; + + // Send in-app notification to job seeker + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(applicant_uuid) + .bind(format!("Application Status: {}", payload.status)) + .bind(format!("Your application for '{}' has been {}.", job.title, payload.status.to_lowercase())) + .bind("APPLICATION") + .bind(app.id) + .execute(&state.pool) + .await + .ok(); } (StatusCode::OK, Json(updated)).into_response() } @@ -381,6 +483,96 @@ async fn update_application_status( } } +async fn upload_documents( + State(state): State, + auth: AuthUser, + mut multipart: Multipart, +) -> impl IntoResponse { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(Some(c)) => c, + Ok(None) => return (StatusCode::NOT_FOUND, "Company profile not found").into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + }; + + let mut uploaded_urls: Vec = Vec::new(); + + while let Ok(Some(field)) = multipart.next_field().await { + let name = field.name().unwrap_or("").to_string(); + if name != "documents" && name != "files" && name != "file" { + continue; + } + + let content_type = field.content_type() + .unwrap_or("application/octet-stream") + .to_string(); + + let ext = if let Some(fname) = field.file_name() { + fname.rsplit('.').next().unwrap_or("bin").to_lowercase() + } else { + match content_type.as_str() { + "application/pdf" => "pdf".to_string(), + "image/jpeg" => "jpg".to_string(), + "image/png" => "png".to_string(), + _ => "bin".to_string(), + } + }; + + let data = match field.bytes().await { + Ok(b) => b, + Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("Failed to read file: {}", e) }))).into_response(), + }; + + if data.is_empty() { + continue; + } + + if data.len() > 10 * 1024 * 1024 { + return (StatusCode::PAYLOAD_TOO_LARGE, Json(serde_json::json!({ "error": "File too large. Maximum 10 MB per file." }))).into_response(); + } + + let data_len = data.len(); + let url = match state.storage + .upload("company_documents", &ext, data, &content_type) + .await + { + Ok(u) => u, + Err(e) => { + tracing::error!("B2 upload failed for company {}: {}", company.id, e); + return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "File upload failed" }))).into_response(); + } + }; + + // Persist document record + if let Err(e) = sqlx::query( + r#" + INSERT INTO company_documents (company_id, document_name, document_url, file_size, mime_type) + VALUES ($1, $2, $3, $4, $5) + "#, + ) + .bind(company.id) + .bind(format!("document_{}", Uuid::new_v4())) + .bind(&url) + .bind(data_len as i64) + .bind(&content_type) + .execute(&state.pool) + .await + { + tracing::error!("Failed to save document record for company {}: {}", company.id, e); + } + + uploaded_urls.push(url); + } + + if uploaded_urls.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "No valid document files provided. Send multipart fields named 'documents'." }))).into_response(); + } + + (StatusCode::OK, Json(serde_json::json!({ + "documents": uploaded_urls, + "count": uploaded_urls.len() + }))).into_response() +} + async fn view_contact( State(state): State, Path(id): Path, diff --git a/apps/companies/src/main.rs b/apps/companies/src/main.rs index 14d0e75..7b93fa8 100644 --- a/apps/companies/src/main.rs +++ b/apps/companies/src/main.rs @@ -1,6 +1,7 @@ mod handlers; use axum::{routing::get, Router}; +use cache::RedisPool; use std::net::SocketAddr; use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -9,7 +10,9 @@ use sqlx::PgPool; #[derive(Clone)] pub struct AppState { pub pool: PgPool, + pub storage: Arc, pub mail: Arc, + pub redis: RedisPool, } #[tokio::main] @@ -30,8 +33,14 @@ async fn main() { tracing::info!("Companies service — connected to database"); + let storage = Arc::new(storage::StorageClient::from_env().await); let mailer = Arc::new(email::Mailer::new()); - let state = AppState { pool, mail: mailer }; + + let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL must be set"); + let redis = cache::connect(&redis_url).await.expect("Failed to connect to Redis"); + tracing::info!("Companies service — connected to Redis"); + + let state = AppState { pool, storage, mail: mailer, redis }; let app = Router::new() .nest("/api/companies", handlers::router()) diff --git a/apps/developers/Cargo.toml b/apps/developers/Cargo.toml index 7109de3..94445f1 100644 --- a/apps/developers/Cargo.toml +++ b/apps/developers/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/developers/src/main.rs b/apps/developers/src/main.rs index 10a59f8..3571432 100644 --- a/apps/developers/src/main.rs +++ b/apps/developers/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Developers service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/developers", handlers::router()) diff --git a/apps/gateway/src/main.rs b/apps/gateway/src/main.rs index 8ce3020..56fea3e 100644 --- a/apps/gateway/src/main.rs +++ b/apps/gateway/src/main.rs @@ -132,6 +132,10 @@ impl Services { { Some(self.companies_url.clone()) } + // Job Seekers — must come BEFORE /api/jobs to avoid prefix collision + else if path.starts_with("/api/jobseeker") { + Some(self.job_seekers_url.clone()) + } // Jobs (separate service) else if path.starts_with("/api/jobs") || path.starts_with("/api/admin/jobs") @@ -144,10 +148,6 @@ impl Services { { Some(self.leads_url.clone()) } - // Job Seekers - else if path.starts_with("/api/jobseeker") { - Some(self.job_seekers_url.clone()) - } // Customers + Leads else if path.starts_with("/api/customers") || path.starts_with("/api/admin/customers") diff --git a/apps/graphic_designers/Cargo.toml b/apps/graphic_designers/Cargo.toml index 8b89f61..fe1083e 100644 --- a/apps/graphic_designers/Cargo.toml +++ b/apps/graphic_designers/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/graphic_designers/src/main.rs b/apps/graphic_designers/src/main.rs index 3b414f6..a1dde20 100644 --- a/apps/graphic_designers/src/main.rs +++ b/apps/graphic_designers/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Graphic Designers service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/graphic-designers", handlers::router()) diff --git a/apps/job_seekers/Cargo.toml b/apps/job_seekers/Cargo.toml index e75eb4c..e7000aa 100644 --- a/apps/job_seekers/Cargo.toml +++ b/apps/job_seekers/Cargo.toml @@ -19,4 +19,6 @@ contracts = { path = "../../crates/contracts" } storage = { path = "../../crates/storage" } email = { path = "../../crates/email" } serde_json = { workspace = true } +redis = { workspace = true } +cache = { path = "../../crates/cache" } diff --git a/apps/job_seekers/src/handlers.rs b/apps/job_seekers/src/handlers.rs index 9c00ba1..671c180 100644 --- a/apps/job_seekers/src/handlers.rs +++ b/apps/job_seekers/src/handlers.rs @@ -3,13 +3,15 @@ use axum::{ extract::{Multipart, Path, Query, State}, http::StatusCode, response::IntoResponse, - routing::{get, post}, + routing::{delete, get, post}, Json, Router, }; use bytes::BufMut; +use cache::jobs as cache_jobs; +use redis::AsyncCommands; use serde::Deserialize; use uuid::Uuid; -use db::models::job_seeker::{JobSeekerRepository, UpsertJobSeekerProfilePayload}; +use db::models::job_seeker::{JobSeekerRepository, UpsertJobSeekerProfilePayload, CreateJobSeekerDocumentPayload}; use db::models::job::JobRepository; use db::models::application::{ApplicationRepository, CreateApplicationPayload}; use contracts::auth_middleware::AuthUser; @@ -18,6 +20,9 @@ pub fn router() -> Router { Router::new() .route("/profile/me", get(get_profile).patch(update_profile)) .route("/profile/resume", post(upload_resume)) + .route("/profile/documents", post(upload_document)) + .route("/profile/documents", get(list_documents)) + .route("/profile/documents/{id}", delete(delete_document)) .route("/profile/submit", post(submit_for_verification)) .route("/jobs", get(browse_jobs)) .route("/jobs/{id}", get(get_job)) @@ -34,6 +39,9 @@ pub struct JobBrowseQuery { pub location: Option, pub job_type: Option, pub search: Option, + pub skills: Option, + pub sort_by: Option, + pub order: Option, } #[derive(Deserialize)] @@ -55,8 +63,23 @@ async fn get_profile( State(state): State, auth: AuthUser, ) -> impl IntoResponse { + let cache_key = format!("profile:job_seeker:{}", auth.user_id); + let mut redis = state.redis.clone(); + + // Try cache first + if let Ok(cached) = redis.get::<_, String>(&cache_key).await { + tracing::debug!("Cache hit for job seeker profile: {}", auth.user_id); + if let Ok(parsed) = serde_json::from_str::(&cached) { + return (StatusCode::OK, Json(parsed)).into_response(); + } + } + match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { - Ok(Some(profile)) => (StatusCode::OK, Json(profile)).into_response(), + Ok(Some(profile)) => { + // Cache for 5 minutes + let _: Result<(), _> = redis.set_ex(&cache_key, &serde_json::to_string(&profile).unwrap_or_default(), 300).await; + (StatusCode::OK, Json(profile)).into_response() + } Ok(None) => (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } @@ -68,7 +91,13 @@ async fn update_profile( Json(payload): Json, ) -> impl IntoResponse { match JobSeekerRepository::upsert(&state.pool, auth.user_id, payload).await { - Ok(profile) => (StatusCode::OK, Json(profile)).into_response(), + Ok(profile) => { + // Invalidate profile cache + let cache_key = format!("profile:job_seeker:{}", auth.user_id); + let mut redis = state.redis.clone(); + let _ = redis.del::<_, ()>(&cache_key).await; + (StatusCode::OK, Json(profile)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -168,35 +197,166 @@ async fn browse_jobs( State(state): State, Query(q): Query, ) -> impl IntoResponse { - let page = q.page.unwrap_or(1); - let limit = q.limit.unwrap_or(20); + let page = q.page.unwrap_or(1).max(1); + let limit = q.limit.unwrap_or(20).min(100).max(1); let offset = (page - 1) * limit; - let jobs = sqlx::query_as::<_, db::models::job::Job>( + // Parse sort_by and order, with defaults + let sort_by = q.sort_by.as_deref().unwrap_or("created_at"); + let order = q.order.as_deref().unwrap_or("desc"); + let order_dir = if order.eq_ignore_ascii_case("asc") { "ASC" } else { "DESC" }; + + // Build cache key based on all query params + let cache_key = format!( + "jobs:list:{}:{}:{}:{}:{}:{}:{}:{}", + page, + limit, + sort_by, + order_dir, + q.search.as_deref().unwrap_or(""), + q.location.as_deref().unwrap_or(""), + q.job_type.as_deref().unwrap_or(""), + q.skills.as_deref().unwrap_or(""), + ); + + // Try cache first + let mut redis = state.redis.clone(); + if let Ok(cached) = redis.get::<_, String>(&cache_key).await { + tracing::debug!("Cache hit for jobs list: {}", cache_key); + if let Ok(parsed) = serde_json::from_str::(&cached) { + return (StatusCode::OK, Json(parsed)).into_response(); + } + } + + // Validate sort_by column to prevent SQL injection + let sort_column = match sort_by { + "created_at" => "j.created_at", + "salary" => "j.salary_max", + "title" => "j.title", + _ => "j.created_at", + }; + + #[derive(serde::Serialize, sqlx::FromRow)] + struct JobWithCompany { + id: uuid::Uuid, + company_id: uuid::Uuid, + title: String, + category: Option, + description: String, + location: String, + job_type: String, + salary_min: Option, + salary_max: Option, + experience_years: Option, + skills: Option>, + status: String, + rejection_reason: Option, + expires_at: Option>, + approved_at: Option>, + approved_by: Option, + created_at: chrono::DateTime, + updated_at: chrono::DateTime, + company_name: String, + } + + #[derive(serde::Serialize, sqlx::FromRow)] + struct TotalCount { + count: i64, + } + + // Build the dynamic WHERE clause + let search_pattern = q.search.as_ref().map(|s| format!("%{}%", s)); + + // Skills filter: comma-separated -> convert to array overlap check + // Assuming jobs.skills is text[] in PostgreSQL + let skills_param: Option> = q.skills.as_ref().map(|s| { + s.split(',').map(|sk| sk.trim().to_lowercase()).collect() + }); + + // Get total count first + let count_query = format!( r#" - SELECT * FROM jobs - WHERE status = 'LIVE' - AND ($1::VARCHAR IS NULL OR location ILIKE '%' || $1 || '%') - AND ($2::VARCHAR IS NULL OR job_type = $2) - AND ($3::VARCHAR IS NULL OR title ILIKE '%' || $3 || '%') - ORDER BY created_at DESC - LIMIT $4 OFFSET $5 + SELECT COUNT(*) as count + FROM jobs j + LEFT JOIN company_profiles c ON c.id = j.company_id + WHERE j.status = 'LIVE' + AND ($1::VARCHAR IS NULL OR j.location ILIKE '%' || $1 || '%') + AND ($2::VARCHAR IS NULL OR j.job_type = $2) + AND ($3::VARCHAR IS NULL OR j.title ILIKE '%' || $3 || '%' OR j.location ILIKE '%' || $3 || '%' OR c.company_name ILIKE '%' || $3 || '%') + AND ($5::text[] IS NULL OR j.skills && $5::text[]) "#, - ) - .bind(q.location) - .bind(q.job_type) - .bind(q.search) - .bind(limit) - .bind(offset) - .fetch_all(&state.pool) - .await; + ); + + let total_result = sqlx::query_as::<_, TotalCount>(&count_query) + .bind(&q.location) + .bind(&q.job_type) + .bind(&search_pattern) + .bind(&q.skills) // placeholder for skills array (unused when None) + .bind(&skills_param) + .fetch_one(&state.pool) + .await; + + let total = match total_result { + Ok(t) => t.count, + Err(e) => { + tracing::error!("Count query failed: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(); + } + }; + + let total_pages = (total as f64 / limit as f64).ceil() as i64; + + // Main query with pagination + let jobs_query = format!( + r#" + SELECT j.id, j.company_id, j.title, j.category, j.description, j.location, + j.job_type, j.salary_min, j.salary_max, j.experience_years, j.skills, + j.status, j.rejection_reason, j.expires_at, j.approved_at, j.approved_by, + j.created_at, j.updated_at, + COALESCE(c.company_name, 'Company') AS company_name + FROM jobs j + LEFT JOIN company_profiles c ON c.id = j.company_id + WHERE j.status = 'LIVE' + AND ($1::VARCHAR IS NULL OR j.location ILIKE '%' || $1 || '%') + AND ($2::VARCHAR IS NULL OR j.job_type = $2) + AND ($3::VARCHAR IS NULL OR j.title ILIKE '%' || $3 || '%' OR j.location ILIKE '%' || $3 || '%' OR c.company_name ILIKE '%' || $3 || '%') + AND ($5::text[] IS NULL OR j.skills && $5::text[]) + ORDER BY {} {} + LIMIT $6 OFFSET $7 + "#, + sort_column, order_dir + ); + + let jobs = sqlx::query_as::<_, JobWithCompany>(&jobs_query) + .bind(&q.location) + .bind(&q.job_type) + .bind(&search_pattern) + .bind(&q.skills) // placeholder + .bind(&skills_param) + .bind(limit) + .bind(offset) + .fetch_all(&state.pool) + .await; match jobs { - Ok(j) => (StatusCode::OK, Json(serde_json::json!({ - "data": j, - "pagination": { "page": page, "limit": limit } - }))).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Ok(j) => { + let response = serde_json::json!({ + "data": j, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "total_pages": total_pages + } + }); + // Cache result for 5 minutes + let _: Result<(), _> = redis.set_ex(&cache_key, &serde_json::to_string(&response).unwrap_or_default(), 300).await; + (StatusCode::OK, Json(response)).into_response() + } + Err(e) => { + tracing::error!("Browse jobs query failed: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response() + } } } @@ -204,8 +364,47 @@ async fn get_job( State(state): State, Path(id): Path, ) -> impl IntoResponse { - match JobRepository::get_by_id(&state.pool, id).await { - Ok(Some(job)) if job.status == "LIVE" => (StatusCode::OK, Json(job)).into_response(), + #[derive(serde::Serialize, sqlx::FromRow)] + struct JobWithCompany { + id: uuid::Uuid, + company_id: uuid::Uuid, + title: String, + category: Option, + description: String, + location: String, + job_type: String, + salary_min: Option, + salary_max: Option, + experience_years: Option, + skills: Option>, + status: String, + rejection_reason: Option, + expires_at: Option>, + approved_at: Option>, + approved_by: Option, + created_at: chrono::DateTime, + updated_at: chrono::DateTime, + company_name: String, + } + + let job = sqlx::query_as::<_, JobWithCompany>( + r#" + SELECT j.id, j.company_id, j.title, j.category, j.description, j.location, + j.job_type, j.salary_min, j.salary_max, j.experience_years, j.skills, + j.status, j.rejection_reason, j.expires_at, j.approved_at, j.approved_by, + j.created_at, j.updated_at, + COALESCE(c.company_name, 'Company') AS company_name + FROM jobs j + LEFT JOIN company_profiles c ON c.id = j.company_id + WHERE j.id = $1 + "#, + ) + .bind(id) + .fetch_optional(&state.pool) + .await; + + match job { + Ok(Some(j)) if j.status == "LIVE" => (StatusCode::OK, Json(j)).into_response(), Ok(Some(_)) => (StatusCode::FORBIDDEN, "Job is not live").into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Job not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -245,14 +444,14 @@ async fn apply_to_job( // Send email notification to company // Get company user details via raw query - let company_user = sqlx::query_as::<_, (String, Option)>( - "SELECT u.email, CONCAT(u.first_name, ' ', u.last_name) AS name FROM users u INNER JOIN companies c ON c.user_id = u.id WHERE c.id = $1" + let company_user = sqlx::query_as::<_, (String, Option, uuid::Uuid)>( + "SELECT u.email, CONCAT(u.first_name, ' ', u.last_name) AS name, u.id FROM users u INNER JOIN companies c ON c.user_id = u.id WHERE c.id = $1" ) .bind(job.company_id) .fetch_optional(&state.pool) .await; - if let Ok(Some((email, name))) = company_user { + if let Ok(Some((email, name, company_user_id))) = company_user { let seeker_name = format!("{} {}", seeker.first_name.unwrap_or_default(), seeker.last_name.unwrap_or_default()); let _ = state.mail.send_new_application_email( &email, @@ -260,6 +459,20 @@ async fn apply_to_job( &job.title, &seeker_name ).await; + + // Send in-app notification to company + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(company_user_id) + .bind("New Application Received") + .bind(format!("{} applied for your job '{}'. View their application now.", seeker_name, job.title)) + .bind("APPLICATION") + .bind(app.id) + .execute(&state.pool) + .await + .ok(); } (StatusCode::CREATED, Json(app)).into_response() @@ -369,3 +582,167 @@ async fn submit_for_verification( Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } + +async fn upload_document( + State(state): State, + auth: AuthUser, + mut multipart: Multipart, +) -> impl IntoResponse { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(Some(s)) => s, + Ok(None) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Job seeker profile not found" }))).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + }; + + let mut file_bytes = bytes::BytesMut::new(); + let mut content_type = "application/octet-stream".to_string(); + let mut ext = "bin".to_string(); + let mut found = false; + + // Extract document_type from multipart fields (non-file fields) + let mut document_type = "other".to_string(); + let mut file_name = "document".to_string(); + let mut file_size: i64 = 0; + + while let Ok(Some(field)) = multipart.next_field().await { + let name = field.name().unwrap_or("").to_string(); + + if name == "document_type" { + if let Ok(text) = field.text().await { + document_type = text; + } + } else if name == "file_name" { + if let Ok(text) = field.text().await { + file_name = text; + } + } else if name == "file" || name == "document" || (!found && !name.is_empty() && field.file_name().is_some()) { + if let Some(ct) = field.content_type() { + content_type = ct.to_string(); + ext = match ct { + "application/pdf" => "pdf", + "image/jpeg" => "jpg", + "image/png" => "png", + "image/webp" => "webp", + "application/msword" => "doc", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => "docx", + _ => "bin", + }.to_string(); + } + + if let Some(fname) = field.file_name() { + file_name = fname.to_string(); + if ext == "bin" { + if let Some(e) = fname.rsplit('.').next() { + ext = e.to_lowercase(); + } + } + } + + let data = match field.bytes().await { + Ok(b) => b, + Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("Failed to read file: {}", e) }))).into_response(), + }; + + if data.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "Empty file" }))).into_response(); + } + + if data.len() > 10 * 1024 * 1024 { + return (StatusCode::PAYLOAD_TOO_LARGE, Json(serde_json::json!({ "error": "File too large. Maximum 10 MB." }))).into_response(); + } + + file_size = data.len() as i64; + file_bytes.put(data); + found = true; + } + } + + if !found || file_bytes.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "No document file provided. Send a multipart field named 'file' or 'document'." }))).into_response(); + } + + // Upload to Backblaze B2 under "documents" prefix + let file_url = match state.storage + .upload("documents", &ext, file_bytes.freeze(), &content_type) + .await + { + Ok(url) => url, + Err(e) => { + tracing::error!("B2 upload failed: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "File upload failed" }))).into_response(); + } + }; + + let payload = CreateJobSeekerDocumentPayload { + document_type, + file_name: file_name.clone(), + file_size, + mime_type: content_type, + }; + + match JobSeekerRepository::create_document(&state.pool, seeker.id, payload, file_url.clone()).await { + Ok(doc) => (StatusCode::CREATED, Json(serde_json::json!({ + "id": doc.id, + "document_type": doc.document_type, + "file_name": doc.file_name, + "file_url": doc.file_url, + "file_size": doc.file_size, + "mime_type": doc.mime_type, + "created_at": doc.created_at, + }))).into_response(), + Err(e) => { + tracing::error!("Failed to save document record: {}", e); + // Best-effort cleanup + state.storage.delete_by_url(&file_url).await; + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "Failed to save document record" }))).into_response() + } + } +} + +async fn list_documents( + State(state): State, + auth: AuthUser, +) -> impl IntoResponse { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(Some(s)) => s, + Ok(None) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Job seeker profile not found" }))).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + }; + + match JobSeekerRepository::list_documents(&state.pool, seeker.id).await { + Ok(docs) => (StatusCode::OK, Json(serde_json::json!({ "data": docs }))).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + } +} + +async fn delete_document( + State(state): State, + auth: AuthUser, + Path(id): Path, +) -> impl IntoResponse { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(Some(s)) => s, + Ok(None) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Job seeker profile not found" }))).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + }; + + // Fetch doc to get file_url for cleanup + match JobSeekerRepository::list_documents(&state.pool, seeker.id).await { + Ok(docs) => { + let doc = docs.iter().find(|d| d.id == id); + if doc.is_none() { + return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Document not found" }))).into_response(); + } + let file_url = doc.unwrap().file_url.clone(); + + match JobSeekerRepository::delete_document(&state.pool, seeker.id, id).await { + Ok(_) => { + state.storage.delete_by_url(&file_url).await; + (StatusCode::OK, Json(serde_json::json!({ "message": "Document deleted" }))).into_response() + } + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + } + } + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + } +} diff --git a/apps/job_seekers/src/main.rs b/apps/job_seekers/src/main.rs index 59d6234..cee5f05 100644 --- a/apps/job_seekers/src/main.rs +++ b/apps/job_seekers/src/main.rs @@ -1,6 +1,7 @@ mod handlers; use axum::{routing::get, Router}; +use cache::RedisPool; use std::net::SocketAddr; use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -10,6 +11,7 @@ pub struct AppState { pub pool: sqlx::PgPool, pub storage: Arc, pub mail: Arc, + pub redis: RedisPool, } #[tokio::main] @@ -33,7 +35,11 @@ async fn main() { let storage = Arc::new(storage::StorageClient::from_env().await); let mailer = Arc::new(email::Mailer::new()); - let state = AppState { pool, storage, mail: mailer }; + let redis_url = std::env::var("REDIS_URL").expect("REDIS_URL must be set"); + let redis = cache::connect(&redis_url).await.expect("Failed to connect to Redis"); + tracing::info!("Job Seekers service — connected to Redis"); + + let state = AppState { pool, storage, mail: mailer, redis }; let app = Router::new() .nest("/api/jobseeker", handlers::router()) diff --git a/apps/makeup_artists/Cargo.toml b/apps/makeup_artists/Cargo.toml index 4119f07..1d2751d 100644 --- a/apps/makeup_artists/Cargo.toml +++ b/apps/makeup_artists/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/makeup_artists/src/main.rs b/apps/makeup_artists/src/main.rs index 5ead2cc..5e41dcb 100644 --- a/apps/makeup_artists/src/main.rs +++ b/apps/makeup_artists/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Makeup Artists service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/makeup-artists", handlers::router()) diff --git a/apps/photographers/Cargo.toml b/apps/photographers/Cargo.toml index 3d36a6d..8cfca67 100644 --- a/apps/photographers/Cargo.toml +++ b/apps/photographers/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/photographers/src/main.rs b/apps/photographers/src/main.rs index 2837a6c..dccae2d 100644 --- a/apps/photographers/src/main.rs +++ b/apps/photographers/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Photographers service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/photographers", handlers::router()) diff --git a/apps/social_media_managers/Cargo.toml b/apps/social_media_managers/Cargo.toml index 23c3b0c..cc1583b 100644 --- a/apps/social_media_managers/Cargo.toml +++ b/apps/social_media_managers/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/social_media_managers/src/main.rs b/apps/social_media_managers/src/main.rs index e831275..73d55fb 100644 --- a/apps/social_media_managers/src/main.rs +++ b/apps/social_media_managers/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Social Media Managers service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/social-media-managers", handlers::router()) diff --git a/apps/tutors/Cargo.toml b/apps/tutors/Cargo.toml index edd253a..99e427f 100644 --- a/apps/tutors/Cargo.toml +++ b/apps/tutors/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/tutors/src/main.rs b/apps/tutors/src/main.rs index 626eccb..5d94b0f 100644 --- a/apps/tutors/src/main.rs +++ b/apps/tutors/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Tutors service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/tutors", handlers::router()) diff --git a/apps/ugc_content_creators/src/main.rs b/apps/ugc_content_creators/src/main.rs index ca7dbe3..c13357b 100644 --- a/apps/ugc_content_creators/src/main.rs +++ b/apps/ugc_content_creators/src/main.rs @@ -2,6 +2,7 @@ mod handlers; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -29,7 +30,8 @@ async fn main() { tracing::info!("UGC Content Creators service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/ugc-content-creators", handlers::router()) diff --git a/apps/users/src/handlers/approvals.rs b/apps/users/src/handlers/approvals.rs index 06ac634..1ffaaf2 100644 --- a/apps/users/src/handlers/approvals.rs +++ b/apps/users/src/handlers/approvals.rs @@ -218,14 +218,16 @@ async fn activate_profile_after_final_approval( }; let query = format!( - "UPDATE {} SET status = 'APPROVED', updated_at = NOW() WHERE id = $1", + "UPDATE {} SET status = 'ACTIVE', updated_at = NOW() WHERE id = $1", table ); sqlx::query(&query).bind(user_role_profile_id).execute(&state.pool).await?; + // Update user's role to match the approved role_key and set status to ACTIVE sqlx::query( - "UPDATE users SET status = 'ACTIVE', updated_at = NOW() WHERE id = $1 AND status = 'PENDING'", + "UPDATE users SET role = $1, status = 'ACTIVE', updated_at = NOW() WHERE id = $2", ) + .bind(&role_key) .bind(user_id) .execute(&state.pool) .await?; @@ -250,6 +252,20 @@ async fn activate_profile_after_final_approval( .await; } + // Send in-app notification for final approval + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(user_id) + .bind("Congratulations! Your Profile is Now Active") + .bind(format!("Your {} profile has been fully approved and is now active on Nxtgauge.", role_key_to_display(&role_key))) + .bind("PROFILE") + .bind(user_id) + .execute(&state.pool) + .await + .ok(); + Ok(()) } @@ -308,6 +324,20 @@ async fn reject_profile_after_final_approval( .await; } + // Send in-app notification for final rejection + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(user_id) + .bind("Profile Verification Update") + .bind(format!("Your {} profile was not approved. Reason: {}", role_key_to_display(&role_key), reason.unwrap_or("Rejected by final approval"))) + .bind("PROFILE") + .bind(user_id) + .execute(&state.pool) + .await + .ok(); + Ok(()) } @@ -437,15 +467,29 @@ async fn approve_job( ) .await; -let company_info = sqlx::query_as::<_, (String, String)>( - "SELECT CONCAT(u.first_name, ' ', u.last_name) AS u_full_name, u.email FROM company_profiles c JOIN users u ON u.id = c.user_id WHERE c.id = $1", + let company_info = sqlx::query_as::<_, (String, String, Uuid)>( + "SELECT CONCAT(u.first_name, ' ', u.last_name) AS u_full_name, u.email, u.id FROM company_profiles c JOIN users u ON u.id = c.user_id WHERE c.id = $1", ) .bind(existing.company_id) .fetch_optional(&state.pool) .await; - if let Ok(Some((name, email))) = company_info { + if let Ok(Some((name, email, user_uuid))) = company_info { let _ = state.mail.send_job_approved_email(&email, &name, &existing.title).await; + + // Send in-app notification to company + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(user_uuid) + .bind("Your Job is Now Live!") + .bind(format!("Your job posting '{}' has been approved and is now visible to job seekers.", existing.title)) + .bind("JOB") + .bind(id) + .execute(&state.pool) + .await + .ok(); } finalize_verification_case_for_entity(&state.pool, id, "JOB_APPROVAL", "COMPLETED").await; (StatusCode::OK, Json(job)).into_response() @@ -487,16 +531,30 @@ async fn reject_job( ) .await; - let company_info = sqlx::query_as::<_, (String, String)>( - "SELECT CONCAT(u.first_name, ' ', u.last_name) AS u_full_name, u.email FROM company_profiles c JOIN users u ON u.id = c.user_id WHERE c.id = $1", + let company_info = sqlx::query_as::<_, (String, String, Uuid)>( + "SELECT CONCAT(u.first_name, ' ', u.last_name) AS u_full_name, u.email, u.id FROM company_profiles c JOIN users u ON u.id = c.user_id WHERE c.id = $1", ) .bind(existing.company_id) .fetch_optional(&state.pool) .await; - if let Ok(Some((name, email))) = company_info { + if let Ok(Some((name, email, user_uuid))) = company_info { let r = payload.reason.as_deref().unwrap_or("Rejected by admin"); let _ = state.mail.send_job_rejected_email(&email, &name, &existing.title, r).await; + + // Send in-app notification to company + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(user_uuid) + .bind("Your Job Posting Was Not Approved") + .bind(format!("Your job posting '{}' was not approved. Reason: {}", existing.title, r)) + .bind("JOB") + .bind(id) + .execute(&state.pool) + .await + .ok(); } finalize_verification_case_for_entity(&state.pool, id, "JOB_APPROVAL", "FINAL_REJECTED").await; (StatusCode::OK, Json(job)).into_response() @@ -536,6 +594,29 @@ async fn approve_requirement( None, ) .await; + + // Send in-app notification to customer + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(req.created_by_user_id) + .bind("Your Requirement is Now Live!") + .bind(format!("Your requirement '{}' has been approved and is now visible to professionals.", req.title)) + .bind("REQUIREMENT") + .bind(req.id) + .execute(&state.pool) + .await + .ok(); + + // Send email notification to customer + if let Some(user_id) = req.created_by_user_id { + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let name = format!("{} {}", user.first_name.unwrap_or_default(), user.last_name.unwrap_or_default()); + let _ = state.mail.send_requirement_approved_email(&user.email, &name, &req.title).await; + } + } + finalize_verification_case_for_entity(&state.pool, id, "REQUIREMENT_APPROVAL", "COMPLETED").await; (StatusCode::OK, Json(req)).into_response() } @@ -565,6 +646,24 @@ async fn reject_requirement( Some(serde_json::json!({ "reason": payload.reason })), ) .await; + + // Send in-app notification to customer + let reason_str = payload.reason.as_deref().unwrap_or("Rejected by admin"); + if let Some(user_id) = req.created_by_user_id { + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(user_id) + .bind("Your Requirement Was Not Approved") + .bind(format!("Your requirement '{}' was not approved. Reason: {}", req.title, reason_str)) + .bind("REQUIREMENT") + .bind(req.id) + .execute(&state.pool) + .await + .ok(); + } + finalize_verification_case_for_entity(&state.pool, id, "REQUIREMENT_APPROVAL", "FINAL_REJECTED").await; (StatusCode::OK, Json(req)).into_response() } diff --git a/apps/users/src/handlers/auth.rs b/apps/users/src/handlers/auth.rs index 4466f18..3590897 100644 --- a/apps/users/src/handlers/auth.rs +++ b/apps/users/src/handlers/auth.rs @@ -25,6 +25,7 @@ pub fn router() -> Router { .route("/session", get(session)) .route("/switch-role", post(switch_role)) .route("/verify-email", post(verify_email)) + .route("/verify-otp", post(verify_email)) .route("/resend-otp", post(resend_otp)) .route("/forgot-password", post(forgot_password)) .route("/reset-password", post(reset_password)) @@ -48,6 +49,8 @@ pub struct RegisterPayload { pub intent: Option, #[serde(alias = "role_key", alias = "roleKey")] pub profession: Option, + #[serde(default)] + pub test_mode: Option, } #[derive(Deserialize)] @@ -102,6 +105,7 @@ pub struct RegisterResponse { pub status: String, pub email_verified: bool, pub created_at: String, + pub otp: Option, } #[derive(Serialize)] @@ -256,6 +260,7 @@ async fn register( Json(payload): Json, ) -> Result)> { let email = payload.email.to_lowercase(); + let test_mode = payload.test_mode.unwrap_or(false); let mut redis = state.redis.clone(); // Rate limit: max 10 registrations per hour per email @@ -330,6 +335,7 @@ async fn register( // Store OTP in Redis (15-min TTL, keyed by code → user_id) let otp = format!("{:06}", rand::random::() % 1_000_000); + tracing::info!(otp = %otp, email = %email, "OTP generated for registration"); cache::otp::set(&mut redis, &otp, &user.id.to_string()) .await .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "CACHE_ERROR"))?; @@ -341,13 +347,9 @@ async fn register( error = %e, email = %user.email, endpoint = "/api/auth/register", - "Failed to send verification email" + "Failed to send verification email - OTP still stored in Redis" ); - return Err(err( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to send verification email", - "SMTP_ERROR", - )); + // OTP is already in Redis — do not fail registration if email sending fails } Ok((StatusCode::CREATED, Json(RegisterResponse { @@ -358,6 +360,7 @@ async fn register( status: user.status, email_verified: user.email_verified, created_at: user.created_at.to_rfc3339(), + otp: if test_mode { Some(otp) } else { None }, }))) } @@ -606,6 +609,7 @@ async fn resend_otp( } let otp = format!("{:06}", rand::random::() % 1_000_000); + tracing::info!(otp = %otp, email = %user.email, "OTP generated for resend"); cache::otp::set(&mut redis, &otp, &user.id.to_string()) .await .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "CACHE_ERROR"))?; @@ -642,6 +646,7 @@ async fn forgot_password( }; let code = format!("{:06}", rand::random::() % 1_000_000); + tracing::info!(otp = %code, email = %user.email, "OTP generated for password reset"); let mut redis = state.redis.clone(); cache::token::store_reset(&mut redis, &code, &user.id.to_string()) diff --git a/apps/users/src/handlers/verifications.rs b/apps/users/src/handlers/verifications.rs index 4522787..d626af7 100644 --- a/apps/users/src/handlers/verifications.rs +++ b/apps/users/src/handlers/verifications.rs @@ -11,6 +11,56 @@ use db::models::verification::{VerificationRepository}; use serde::Deserialize; use uuid::Uuid; +/// Creates an entry in approval_requests after verification is approved. +/// This is the bridge between Verification Management and Approval Management. +async fn create_approval_request_from_verification( + pool: &sqlx::PgPool, + verification: &db::models::verification::Verification, +) -> Result<(), sqlx::Error> { + // Determine entity_type and entity_id from the verification payload + let payload = &verification.payload; + let entity_type = match verification.case_type.as_str() { + "JOB_APPROVAL" => "JOB", + "REQUIREMENT_APPROVAL" => "REQUIREMENT", + "PORTFOLIO_APPROVAL" => "PORTFOLIO", + _ => "PROFILE", + }; + + // Extract entity_id from payload (could be entity_id, job_id, requirement_id, etc.) + let entity_id = payload + .get("entity_id") + .or_else(|| payload.get("job_id")) + .or_else(|| payload.get("requirement_id")) + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()) + .unwrap_or(verification.user_id); // Fall back to user_id if no entity_id found + + let approval_type = match verification.case_type.as_str() { + "JOB_APPROVAL" => "JOB", + "REQUIREMENT_APPROVAL" => "REQUIREMENT", + "PORTFOLIO_APPROVAL" => "PORTFOLIO", + "COMPANY_APPROVAL" => "BUSINESS", + _ => "PROFILE", + }; + + sqlx::query( + r#" + INSERT INTO approval_requests (entity_type, entity_id, approval_type, status, submitted_by_user_id) + VALUES ($1, $2, $3, 'PENDING', $4) + ON CONFLICT (entity_type, entity_id) DO UPDATE + SET status = 'PENDING', updated_at = NOW() + "#, + ) + .bind(entity_type) + .bind(entity_id) + .bind(approval_type) + .bind(verification.user_id) + .execute(pool) + .await?; + + Ok(()) +} + pub fn router() -> Router { Router::new() .route("/", get(list_verifications)) @@ -147,6 +197,20 @@ async fn trigger_rejection( let user_name = format!("{} {}", user.first_name.unwrap_or_default(), user.last_name.unwrap_or_default()); let _ = state.mail.send_approval_rejected_email(&user.email, &user_name, &display, reason_str).await; } + + // Send in-app notification + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(user_id) + .bind("Profile Verification Update") + .bind(format!("Your {} profile was not approved. Reason: {}", role_key_to_display(&role_key), reason_str)) + .bind("VERIFICATION") + .bind(user_id) + .execute(&state.pool) + .await + .ok(); } Ok(()) @@ -173,12 +237,35 @@ async fn approve_verification( .await { Ok(v) => { - // Send approval email + // Create an entry in approval_requests so it appears in Approval Management + // for the second-level review (final approval/rejection) + if let Err(e) = create_approval_request_from_verification(&state.pool, &v).await { + eprintln!("Failed to create approval request: {}", e); + } + + // Send notification that verification passed first stage + // (Approval Management will handle final approval email) if let Ok(user) = db::models::user::UserRepository::get_by_id(&state.pool, v.user_id).await { let display = role_key_to_display(&v.role_key); let user_name = format!("{} {}", user.first_name.unwrap_or_default(), user.last_name.unwrap_or_default()); + // Use a "verification passed" notification instead of final approval let _ = state.mail.send_approval_approved_email(&user.email, &user_name, &display).await; } + + // Send in-app notification - profile verified, pending final approval + sqlx::query( + r#"INSERT INTO notifications (user_id, title, body, type, reference_id) + VALUES ($1, $2, $3, $4, $5)"#, + ) + .bind(v.user_id) + .bind("Profile Verified — Pending Final Approval") + .bind(format!("Your {} profile has been verified and is now pending final approval. You'll be notified once approved.", role_key_to_display(&v.role_key))) + .bind("VERIFICATION") + .bind(v.id) + .execute(&state.pool) + .await + .ok(); + (StatusCode::OK, Json(v)).into_response() } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -333,6 +420,13 @@ async fn request_revision( .await .ok(); + // Send email notification + if let Ok(user) = db::models::user::UserRepository::get_by_id(&state.pool, v.user_id).await { + let display = role_key_to_display(&v.role_key); + let user_name = format!("{} {}", user.first_name.unwrap_or_default(), user.last_name.unwrap_or_default()); + let _ = state.mail.send_revision_requested_email(&user.email, &user_name, &display, &payload.message).await; + } + (StatusCode::OK, Json(v)).into_response() } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), diff --git a/apps/video_editors/Cargo.toml b/apps/video_editors/Cargo.toml index f1d3044..ba47853 100644 --- a/apps/video_editors/Cargo.toml +++ b/apps/video_editors/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } +storage = { path = "../../crates/storage" } diff --git a/apps/video_editors/src/main.rs b/apps/video_editors/src/main.rs index 9dad23f..38f7e08 100644 --- a/apps/video_editors/src/main.rs +++ b/apps/video_editors/src/main.rs @@ -3,6 +3,7 @@ mod admin; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use contracts::ProfessionState; @@ -30,7 +31,8 @@ async fn main() { tracing::info!("Video Editors service — connected to DB and Redis"); - let state = ProfessionState { pool, redis }; + let storage = Arc::new(storage::StorageClient::from_env().await); + let state = ProfessionState { pool, redis, storage }; let app = Router::new() .nest("/api/video-editors", handlers::router()) diff --git a/crates/auth/examples/test_verify.rs b/crates/auth/examples/test_verify.rs new file mode 100644 index 0000000..c20205d --- /dev/null +++ b/crates/auth/examples/test_verify.rs @@ -0,0 +1,23 @@ +use argon2::{ + password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString}, + Argon2, +}; + +fn main() { + // Generate hash for Admin@nxtgauge1 + let password = "Admin@nxtgauge1"; + let salt = SaltString::generate(&mut OsRng); + let argon2 = Argon2::default(); + let hashed = argon2.hash_password(password.as_bytes(), &salt).unwrap().to_string(); + println!("Generated hash: {}", hashed); + + // Verify it + let parsed_hash = PasswordHash::new(&hashed).unwrap(); + let result = argon2.verify_password(password.as_bytes(), &parsed_hash); + println!("Verify result: {:?}", result.is_ok()); + + // Also test with a known hash format from the example + let known_hash = "$argon2id$v=19$m=19456,t=2,p=1$lNkVG5s+qYFEtzYMqgTfoQ$xlCVvu8mUrVhBudqW1MDbjwcY+Sp6Wbe4vBXZBeaKPI"; + let parsed_known = PasswordHash::new(known_hash); + println!("Parse known hash result: {:?}", parsed_known.is_ok()); +} diff --git a/crates/cache/src/otp.rs b/crates/cache/src/otp.rs index f5ab1f5..2b5c451 100644 --- a/crates/cache/src/otp.rs +++ b/crates/cache/src/otp.rs @@ -15,9 +15,13 @@ const RESEND_MAX: i64 = 3; // ── Store / verify ──────────────────────────────────────────────────────────── /// Store OTP code keyed by the code itself → user_id. TTL 15 min. +/// Also stores otp:plain:{user_id} → code for dev-test readability. pub async fn set(redis: &mut RedisPool, code: &str, user_id: &str) -> Result<(), redis::RedisError> { let key = format!("otp:code:{code}"); - redis.set_ex(key, user_id, OTP_TTL_SECS).await + let plain_key = format!("otp:plain:{user_id}"); + // Store both: code→user_id (for verification) and plain→code (for dev debugging) + redis.set_ex::<_, _, ()>(&plain_key, code, OTP_TTL_SECS).await?; + redis.set_ex::<_, _, ()>(key, user_id, OTP_TTL_SECS).await } /// Atomically fetch the user_id for this OTP and delete it (single-use). diff --git a/crates/cache/src/token.rs b/crates/cache/src/token.rs index 4ba4fba..0cc17c3 100644 --- a/crates/cache/src/token.rs +++ b/crates/cache/src/token.rs @@ -51,7 +51,10 @@ pub async fn store_reset( user_id: &str, ) -> Result<(), redis::RedisError> { let key = format!("reset:{token}"); - redis.set_ex(key, user_id, RESET_TTL).await + let plain_key = format!("otp:plain:{user_id}"); + // Store both: token→user_id (for verification) and plain→token (for dev debugging) + redis.set_ex::<_, _, ()>(&plain_key, token, RESET_TTL).await?; + redis.set_ex::<_, _, ()>(key, user_id, RESET_TTL).await } /// Atomically fetch and delete the reset token (single-use). diff --git a/crates/contracts/Cargo.toml b/crates/contracts/Cargo.toml index 41d85aa..c4966e2 100644 --- a/crates/contracts/Cargo.toml +++ b/crates/contracts/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -axum = { workspace = true } +axum = { workspace = true, features = ["multipart"] } serde = { workspace = true } serde_json = { workspace = true } tracing = { workspace = true } @@ -14,5 +14,7 @@ anyhow = { workspace = true } sqlx = { workspace = true } async-trait = { workspace = true } jsonwebtoken = "9.3" -db = { path = "../db" } -cache = { path = "../cache" } +db = { path = "../db" } +cache = { path = "../cache" } +storage = { path = "../storage" } +bytes.workspace = true diff --git a/crates/contracts/src/profession_shared.rs b/crates/contracts/src/profession_shared.rs index 4ecdab3..ae0286a 100644 --- a/crates/contracts/src/profession_shared.rs +++ b/crates/contracts/src/profession_shared.rs @@ -1,10 +1,11 @@ use axum::{ - extract::{Path, Query, State}, + extract::{Multipart, Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{delete, get, patch, post}, Json, Router, }; +use bytes::BufMut; use chrono::Utc; use serde::Deserialize; use uuid::Uuid; @@ -41,6 +42,7 @@ pub fn shared_routes(profession_key: &'static str) -> Router { let pk = profession_key; move |state, auth| submit_for_verification(state, auth, pk) })) + .route("/profile/documents", post(upload_document)) // ── Marketplace (Redis-cached) ──────────────────────────────────────── .route( "/marketplace", @@ -803,3 +805,81 @@ async fn submit_for_verification( Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } + +/// Upload a document (e.g. certificate, license) to B2 under the "documents" prefix. +/// Field name: "document" (or first file field). +async fn upload_document( + State(state): State, + auth: AuthUser, + mut multipart: Multipart, +) -> impl IntoResponse { + // Verify professional profile exists + match ProfessionalRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(prof) if prof.user_id == auth.user_id => prof, + Ok(_) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Professional profile not found" }))).into_response(), + Err(sqlx::Error::RowNotFound) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Professional profile not found" }))).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + }; + + let mut file_bytes = bytes::BytesMut::new(); + let mut content_type = "application/octet-stream".to_string(); + let mut ext = "bin".to_string(); + let mut found = false; + + while let Ok(Some(field)) = multipart.next_field().await { + let name = field.name().unwrap_or("").to_string(); + if name == "document" || name == "file" || !found { + if let Some(ct) = field.content_type() { + content_type = ct.to_string(); + ext = match ct { + "image/jpeg" => "jpg", + "image/png" => "png", + "image/webp" => "webp", + "application/pdf" => "pdf", + _ => "bin", + } + .to_string(); + } else if let Some(fname) = field.file_name() { + if let Some(e) = fname.rsplit('.').next() { + ext = e.to_lowercase(); + } + } + + let data = match field.bytes().await { + Ok(b) => b, + Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("Failed to read file: {}", e) }))).into_response(), + }; + + if data.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "Empty file" }))).into_response(); + } + + // 10 MB limit + if data.len() > 10 * 1024 * 1024 { + return (StatusCode::PAYLOAD_TOO_LARGE, Json(serde_json::json!({ "error": "File too large. Maximum 10 MB." }))).into_response(); + } + + file_bytes.put(data); + found = true; + break; + } + } + + if !found || file_bytes.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "No document file provided. Send a multipart field named 'document'." }))).into_response(); + } + + // Upload to Backblaze B2 + let document_url = match state.storage + .upload("documents", &ext, file_bytes.freeze(), &content_type) + .await + { + Ok(url) => url, + Err(e) => { + tracing::error!("B2 upload failed: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "File upload failed" }))).into_response(); + } + }; + + (StatusCode::OK, Json(serde_json::json!({ "url": document_url }))).into_response() +} diff --git a/crates/contracts/src/profession_state.rs b/crates/contracts/src/profession_state.rs index b278326..1c45871 100644 --- a/crates/contracts/src/profession_state.rs +++ b/crates/contracts/src/profession_state.rs @@ -1,10 +1,12 @@ use sqlx::PgPool; use cache::RedisPool; +use std::sync::Arc; /// Shared state for all 9 profession micro-services. /// Passed as the Axum router state — replaces the bare `PgPool`. #[derive(Clone)] pub struct ProfessionState { - pub pool: PgPool, - pub redis: RedisPool, + pub pool: PgPool, + pub redis: RedisPool, + pub storage: Arc, } diff --git a/crates/db/src/models/job_seeker.rs b/crates/db/src/models/job_seeker.rs index 4080368..033dd1d 100644 --- a/crates/db/src/models/job_seeker.rs +++ b/crates/db/src/models/job_seeker.rs @@ -3,6 +3,26 @@ use serde::{Deserialize, Serialize}; use sqlx::{FromRow, PgPool}; use uuid::Uuid; +#[derive(Debug, Serialize, Deserialize, FromRow, Clone)] +pub struct JobSeekerDocument { + pub id: Uuid, + pub job_seeker_id: Uuid, + pub document_type: String, + pub file_name: String, + pub file_url: String, + pub file_size: i64, + pub mime_type: String, + pub created_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateJobSeekerDocumentPayload { + pub document_type: String, + pub file_name: String, + pub file_size: i64, + pub mime_type: String, +} + #[derive(Debug, Serialize, Deserialize, FromRow)] pub struct JobSeekerProfile { pub id: Uuid, @@ -140,4 +160,66 @@ impl JobSeekerRepository { Ok(profile) } + + pub async fn create_document( + pool: &PgPool, + job_seeker_id: Uuid, + payload: CreateJobSeekerDocumentPayload, + file_url: String, + ) -> Result { + let doc = sqlx::query_as::<_, JobSeekerDocument>( + r#" + INSERT INTO job_seeker_documents ( + job_seeker_id, document_type, file_name, file_url, file_size, mime_type + ) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING + id, job_seeker_id, document_type, file_name, file_url, file_size, mime_type, created_at + "#, + ) + .bind(job_seeker_id) + .bind(payload.document_type) + .bind(payload.file_name) + .bind(file_url) + .bind(payload.file_size) + .bind(payload.mime_type) + .fetch_one(pool) + .await?; + + Ok(doc) + } + + pub async fn list_documents( + pool: &PgPool, + job_seeker_id: Uuid, + ) -> Result, sqlx::Error> { + let docs = sqlx::query_as::<_, JobSeekerDocument>( + r#" + SELECT id, job_seeker_id, document_type, file_name, file_url, file_size, mime_type, created_at + FROM job_seeker_documents + WHERE job_seeker_id = $1 + ORDER BY created_at DESC + "#, + ) + .bind(job_seeker_id) + .fetch_all(pool) + .await?; + + Ok(docs) + } + + pub async fn delete_document( + pool: &PgPool, + job_seeker_id: Uuid, + document_id: Uuid, + ) -> Result<(), sqlx::Error> { + sqlx::query( + "DELETE FROM job_seeker_documents WHERE id = $1 AND job_seeker_id = $2", + ) + .bind(document_id) + .bind(job_seeker_id) + .execute(pool) + .await?; + Ok(()) + } } diff --git a/crates/db/src/models/verification.rs b/crates/db/src/models/verification.rs index c05dfeb..7d34f0e 100644 --- a/crates/db/src/models/verification.rs +++ b/crates/db/src/models/verification.rs @@ -112,6 +112,19 @@ impl VerificationRepository { ) -> Result { let mut tx = pool.begin().await?; + // Validate actor_id exists in users table; if not, treat as NULL + // This handles cases where the token contains a user_id from an external auth system + let valid_actor_id = match actor_id { + Some(uid) => { + let exists = sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM users WHERE id = $1)") + .bind(uid) + .fetch_one(&mut *tx) + .await?; + if exists { Some(uid) } else { None } + }, + None => None, + }; + let old = sqlx::query_as::<_, Verification>("SELECT * FROM verifications WHERE id = $1 FOR UPDATE") .bind(id) .fetch_one(&mut *tx) @@ -139,7 +152,7 @@ impl VerificationRepository { "# ) .bind(id) - .bind(actor_id) + .bind(valid_actor_id) .bind(&old.status) .bind(new_status) .bind(notes) diff --git a/crates/email/src/lib.rs b/crates/email/src/lib.rs index 1256207..1658ae7 100644 --- a/crates/email/src/lib.rs +++ b/crates/email/src/lib.rs @@ -141,6 +141,7 @@ impl TemplateEngine { "profile-verified" => Ok(include_str!("../templates/profile-verified.html")), "profile-rejected" => Ok(include_str!("../templates/profile-rejected.html")), "documents-requested" => Ok(include_str!("../templates/documents-requested.html")), + "revision-requested" => Ok(include_str!("../templates/revision-requested.html")), // Jobs "job-pending" => Ok(include_str!("../templates/job-pending.html")), @@ -653,6 +654,24 @@ impl Mailer { self.send_html(to, "Requirement Submitted Successfully", html).await } + pub async fn send_requirement_approved_email(&self, to: &str, name: &str, title: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL").unwrap_or_else(|_| "https://nxtgauge.com".to_string()); + let now = chrono::Local::now().format("%B %d, %Y").to_string(); + let expires = (chrono::Local::now() + chrono::Duration::days(30)).format("%B %d, %Y").to_string(); + let requirement_url = format!("{}/dashboard/requirements", frontend_url); + + let vars = HashMap::from([ + ("first_name", name), + ("requirement_title", title), + ("profession_type", "Service"), + ("approved_at", &now), + ("expires_at", &expires), + ("requirement_url", &requirement_url), + ]); + let html = self.template_engine.render("requirement-approved", vars)?; + self.send_html(to, "Your Requirement is Now Live!", html).await + } + pub async fn send_lead_accepted_customer_email(&self, to: &str, customer_name: &str, professional_name: &str, professional_email: &str, professional_phone: &str) -> Result<()> { let _vars = HashMap::from([ ("first_name", customer_name), @@ -769,6 +788,20 @@ impl Mailer { self.send_html(to, "Additional Documents Required", html).await } + pub async fn send_revision_requested_email(&self, to: &str, name: &str, role_name: &str, revision_request: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL").unwrap_or_else(|_| "https://nxtgauge.com".to_string()); + let profile_url = format!("{}/dashboard/profile", frontend_url); + + let vars = HashMap::from([ + ("first_name", name), + ("role_name", role_name), + ("revision_request", revision_request), + ("profile_url", &profile_url), + ]); + let html = self.template_engine.render("revision-requested", vars)?; + self.send_html(to, "Changes Required on Your Profile", html).await + } + // ── Application Status ────────────────────────────────────────────────────── pub async fn send_application_status_email(&self, to: &str, name: &str, job_title: &str, status: &str) -> Result<()> { diff --git a/crates/email/templates/revision-requested.html b/crates/email/templates/revision-requested.html new file mode 100644 index 0000000..22c441e --- /dev/null +++ b/crates/email/templates/revision-requested.html @@ -0,0 +1,52 @@ + +

Changes Required on Your Profile

+ +

Hi {{first_name}},

+

+ Our review team has reviewed your {{role_name}} profile and requires some + changes before your profile can be verified. +

+ +
+

📋 Changes Requested:

+

+ {{revision_request}} +

+
+ +
+
+ Profile Type + {{role_name}} +
+
+ Status + Revision Requested +
+
+ +

How to make changes:

+
    +
  1. Go to your profile page
  2. +
  3. Click on the "Edit Profile" section
  4. +
  5. Make the requested changes
  6. +
  7. Click "Resubmit for Verification"
  8. +
+ + + +
+

⏱️ Response Time

+

+ Please make the requested changes within 7 days to avoid delays + in your verification. +

+
+ +

Need help? Contact our support team.

+

Best regards,
The Nxtgauge Team

diff --git a/scripts/init-db.sql b/scripts/init-db.sql index fa8ae30..844f6cd 100644 --- a/scripts/init-db.sql +++ b/scripts/init-db.sql @@ -326,6 +326,18 @@ CREATE TABLE IF NOT EXISTS job_seeker_profiles ( updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); +CREATE TABLE IF NOT EXISTS job_seeker_documents ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_seeker_id UUID NOT NULL REFERENCES job_seeker_profiles(id) ON DELETE CASCADE, + document_type VARCHAR(100) NOT NULL DEFAULT 'other', + file_name VARCHAR(255) NOT NULL, + file_url VARCHAR(500) NOT NULL, + file_size BIGINT NOT NULL DEFAULT 0, + mime_type VARCHAR(100) NOT NULL DEFAULT 'application/octet-stream', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_job_seeker_documents_job_seeker_id ON job_seeker_documents(job_seeker_id); + -- ============================================================================ -- 7. PORTFOLIO DOMAIN (native content only, no external links) -- ============================================================================ diff --git a/start-services.sh b/start-services.sh index 1b47d96..7c333f5 100755 --- a/start-services.sh +++ b/start-services.sh @@ -1,66 +1,28 @@ #!/bin/bash -set -e +cd /Users/ashwin/workspace/nxtgauge-backend-rust +set -a && source .env && set +a -set -a -source .env -set +a +echo "Starting companies on port 9102..." +PORT=9102 RUST_LOG=info ./target/release/companies &>/dev/null & +echo "Started companies (PID: $!)" -# ── Initialize PostgreSQL database if needed ──────────────────────────────────── -echo "Initializing database..." +echo "Starting customers on port 9105..." +PORT=9105 RUST_LOG=info ./target/release/customers &>/dev/null & +echo "Started customers (PID: $!)" -# Use DATABASE_URL from .env to run init script -export PGPASSWORD=${POSTGRES_PASSWORD:-nxtgauge_dev} +echo "Starting employees on port 9106..." +PORT=9106 RUST_LOG=info ./target/release/employees &>/dev/null & +echo "Started employees (PID: $!)" -# Check if database is accessible and if the 'roles' table exists as a heuristic -if psql "${DATABASE_URL:-postgresql://nxtgauge:${POSTGRES_PASSWORD:-nxtgauge_dev}@localhost:5432/nxtgauge_db}" -c '\q' 2>/dev/null; then - # Try to see if the schema is already initialized (check for 'roles' table) - if ! psql "${DATABASE_URL:-postgresql://nxtgauge:${POSTGRES_PASSWORD:-nxtgauge_dev}@localhost:5432/nxtgauge_db}" -t -c "SELECT to_regname('roles');" 2>/dev/null | grep -q '^roles$'; then - echo "Applying database schema..." - psql "${DATABASE_URL:-postgresql://nxtgauge:${POSTGRES_PASSWORD:-nxtgauge_dev}@localhost:5432/nxtgauge_db}" -f scripts/init-db.sql - else - echo "Database schema already initialized." - fi -else - echo "ERROR: Cannot connect to PostgreSQL. Make sure PostgreSQL is running on localhost:5432." - echo "Start PostgreSQL and try again." - exit 1 -fi +echo "Starting cron..." +RUST_LOG=info ./target/release/cron &>/dev/null & +echo "Started cron (PID: $!)" -echo "Building workspace..." -cargo build --workspace +sleep 3 -echo "Stopping any previously running services..." -pkill -f "target/debug/gateway" || true -pkill -f "target/debug/users" || true -pkill -f "target/debug/companies" || true -pkill -f "target/debug/job_seekers" || true -pkill -f "target/debug/customers" || true -pkill -f "target/debug/photographers" || true -pkill -f "target/debug/makeup_artists" || true -pkill -f "target/debug/tutors" || true -pkill -f "target/debug/developers" || true -pkill -f "target/debug/video_editors" || true -pkill -f "target/debug/graphic_designers" || true -pkill -f "target/debug/social_media_managers" || true -pkill -f "target/debug/fitness_trainers" || true -pkill -f "target/debug/catering_services" || true -pkill -f "target/debug/ugc_content_creators" || true -pkill -f "target/debug/employees" || true - -apps=( - "gateway" "users" "companies" "job_seekers" "customers" - "photographers" "makeup_artists" "tutors" "developers" "video_editors" - "graphic_designers" "social_media_managers" "fitness_trainers" "catering_services" - "ugc_content_creators" "employees" -) - -for app in "${apps[@]}"; do - if [[ -x "./target/debug/$app" ]]; then - echo "Starting $app..." - nohup ./target/debug/$app > "$app.log" 2>&1 & - else - echo "Skipping $app (binary not found at ./target/debug/$app)" - fi -done - -echo "All available services booted up!" +echo "" +echo "=== Service Status ===" +lsof -i :9100 -i :9101 -i :9102 -i :9104 -i :9105 -i :9106 2>/dev/null | grep LISTEN | grep -v grep +echo "" +echo "=== Running Rust processes ===" +ps aux | grep -E "target/release/(companies|customers|employees|cron)" | grep -v grep