diff --git a/Cargo.toml b/Cargo.toml index 651b877..9c74e91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "crates/auth", "crates/storage", "crates/cache", + "crates/email", ] [workspace.package] @@ -44,3 +45,4 @@ chrono = { version = "0.4", features = ["serde"] } lettre = { version = "0.11", default-features = false, features = ["tokio1-rustls-tls", "smtp-transport", "builder", "serde"] } redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] } async-trait = "0.1" +bytes = "1" diff --git a/apps/companies/Cargo.toml b/apps/companies/Cargo.toml index 9deb171..3ce7588 100644 --- a/apps/companies/Cargo.toml +++ b/apps/companies/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } serde_json = { workspace = true } +email = { path = "../../crates/email" } diff --git a/apps/companies/src/handlers.rs b/apps/companies/src/handlers.rs index 773ceae..f6cdcff 100644 --- a/apps/companies/src/handlers.rs +++ b/apps/companies/src/handlers.rs @@ -6,14 +6,15 @@ use axum::{ Json, Router, }; use serde::Deserialize; -use sqlx::PgPool; use uuid::Uuid; use db::models::company::{CompanyRepository, UpsertCompanyProfilePayload}; use db::models::job::{JobRepository, CreateJobPayload as DbCreateJobPayload, UpdateJobPayload as DbUpdateJobPayload}; use db::models::application::ApplicationRepository; +use db::models::user::UserRepository; use contracts::auth_middleware::AuthUser; +use crate::AppState; -pub fn router() -> Router { +pub fn router() -> Router { Router::new() .route("/profile/me", get(get_profile).patch(update_profile)) .route("/jobs", get(list_jobs).post(create_job)) @@ -51,10 +52,10 @@ pub struct UpdateApplicationStatusPayload { } async fn get_profile( - State(pool): State, + State(state): State, auth: AuthUser, ) -> impl IntoResponse { - match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(profile)) => (StatusCode::OK, Json(profile)).into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Company profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -62,29 +63,29 @@ async fn get_profile( } async fn update_profile( - State(pool): State, + State(state): State, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - match CompanyRepository::upsert(&pool, auth.user_id, payload).await { + match CompanyRepository::upsert(&state.pool, auth.user_id, payload).await { Ok(profile) => (StatusCode::OK, Json(profile)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn list_jobs( - State(pool): State, + State(state): State, auth: AuthUser, Query(q): Query, ) -> impl IntoResponse { - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Company not found").into_response(), }; let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); - match JobRepository::list_by_company_id(&pool, company.id, q.status, page, limit).await { + match JobRepository::list_by_company_id(&state.pool, company.id, q.status, page, limit).await { Ok(jobs) => (StatusCode::OK, Json(serde_json::json!({ "data": jobs, "pagination": { "page": page, "limit": limit } @@ -94,11 +95,11 @@ async fn list_jobs( } async fn create_job( - State(pool): State, + State(state): State, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Company not found").into_response(), }; @@ -120,18 +121,18 @@ async fn create_job( skills: payload.skills, }; - match JobRepository::create(&pool, db_payload).await { + match JobRepository::create(&state.pool, db_payload).await { Ok(job) => (StatusCode::CREATED, Json(job)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn get_job( - State(pool): State, + State(state): State, Path(id): Path, _auth: AuthUser, ) -> impl IntoResponse { - match JobRepository::get_by_id(&pool, id).await { + match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(job)) => (StatusCode::OK, Json(job)).into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Job not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -139,13 +140,12 @@ async fn get_job( } async fn update_job( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - // Basic verification: does job belong to auth user's company? - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Company not found").into_response(), }; @@ -154,29 +154,29 @@ async fn update_job( return (StatusCode::FORBIDDEN, "Company profile approval is required before submitting jobs").into_response(); } - let job = match JobRepository::get_by_id(&pool, id).await { + let job = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(j)) if j.company_id == company.id => j, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Job not found").into_response(), }; - match JobRepository::update(&pool, job.id, payload).await { + match JobRepository::update(&state.pool, job.id, payload).await { Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn submit_job( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, ) -> impl IntoResponse { - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Company not found").into_response(), }; - - let job = match JobRepository::get_by_id(&pool, id).await { + + let job = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(j)) if j.company_id == company.id => j, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Job not found").into_response(), @@ -186,46 +186,52 @@ async fn submit_job( return (StatusCode::BAD_REQUEST, "Job already submitted or live").into_response(); } - match JobRepository::update_status(&pool, job.id, "PENDING_APPROVAL").await { - Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), + match JobRepository::update_status(&state.pool, job.id, "PENDING_APPROVAL").await { + Ok(updated) => { + // Fire email to company user (ignore failures) + if let Ok(user) = UserRepository::get_by_id(&state.pool, auth.user_id).await { + let _ = state.mail.send_job_submitted_email(&user.email, &user.full_name, &updated.title).await; + } + (StatusCode::OK, Json(updated)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn close_job( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, ) -> impl IntoResponse { - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Company not found").into_response(), }; - - let job = match JobRepository::get_by_id(&pool, id).await { + + let job = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(j)) if j.company_id == company.id => j, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Job not found").into_response(), }; - match JobRepository::update_status(&pool, job.id, "CLOSED").await { + match JobRepository::update_status(&state.pool, job.id, "CLOSED").await { Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn list_applications( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, Query(q): Query, ) -> impl IntoResponse { - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Company not found").into_response(), }; - - let job = match JobRepository::get_by_id(&pool, id).await { + + let job = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(j)) if j.company_id == company.id => j, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Job not found").into_response(), @@ -233,7 +239,7 @@ async fn list_applications( let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); - match ApplicationRepository::list_by_job_id(&pool, job.id, q.status, page, limit).await { + match ApplicationRepository::list_by_job_id(&state.pool, job.id, q.status, page, limit).await { Ok(apps) => (StatusCode::OK, Json(serde_json::json!({ "data": apps, "pagination": { "page": page, "limit": limit } @@ -243,22 +249,22 @@ async fn list_applications( } async fn update_application_status( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - let app = match ApplicationRepository::get_by_id(&pool, id).await { + let app = match ApplicationRepository::get_by_id(&state.pool, id).await { Ok(Some(a)) => a, _ => return (StatusCode::NOT_FOUND, "Application not found").into_response(), }; - let job = match JobRepository::get_by_id(&pool, app.job_id).await { + let job = match JobRepository::get_by_id(&state.pool, app.job_id).await { Ok(Some(j)) => j, _ => return (StatusCode::INTERNAL_SERVER_ERROR, "Job lost").into_response(), }; - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::FORBIDDEN, "Access denied").into_response(), }; @@ -267,28 +273,40 @@ async fn update_application_status( return (StatusCode::FORBIDDEN, "Access denied").into_response(); } - match ApplicationRepository::update_status(&pool, app.id, &payload.status).await { - Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), + match ApplicationRepository::update_status(&state.pool, app.id, &payload.status).await { + Ok(updated) => { + // Notify applicant of status change (ignore failures) + let applicant_info = sqlx::query_as::<_, (String, String)>( + "SELECT u.full_name, u.email FROM users u INNER JOIN job_seekers js ON js.user_id = u.id WHERE js.id = $1", + ) + .bind(app.job_seeker_id) + .fetch_optional(&state.pool) + .await; + if let Ok(Some((name, email))) = applicant_info { + let _ = state.mail.send_application_status_email(&email, &name, &job.title, &payload.status).await; + } + (StatusCode::OK, Json(updated)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn view_contact( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, ) -> impl IntoResponse { - let app = match ApplicationRepository::get_by_id(&pool, id).await { + let app = match ApplicationRepository::get_by_id(&state.pool, id).await { Ok(Some(a)) => a, _ => return (StatusCode::NOT_FOUND, "Application not found").into_response(), }; - let job = match JobRepository::get_by_id(&pool, app.job_id).await { + let job = match JobRepository::get_by_id(&state.pool, app.job_id).await { Ok(Some(j)) => j, _ => return (StatusCode::INTERNAL_SERVER_ERROR, "Job lost").into_response(), }; - let company = match CompanyRepository::get_by_user_id(&pool, auth.user_id).await { + let company = match CompanyRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::FORBIDDEN, "Access denied").into_response(), }; @@ -297,15 +315,78 @@ async fn view_contact( return (StatusCode::FORBIDDEN, "Access denied").into_response(); } - // TODO: logic to deduct quota + fetch job seeker contact info from users table - // For now, just mark viewed and return placeholder - let _ = ApplicationRepository::mark_contact_viewed(&pool, app.id).await; + // If contact was already viewed for this application, return info without deducting again + if !app.contact_viewed { + let total_remaining = company.free_contact_views + company.purchased_contact_views; + if total_remaining <= 0 { + return ( + StatusCode::PAYMENT_REQUIRED, + Json(serde_json::json!({ + "error": "Contact view quota exhausted. Please purchase a package.", + "code": "QUOTA_EXHAUSTED" + })), + ) + .into_response(); + } - (StatusCode::OK, Json(serde_json::json!({ - "application_id": id.to_string(), - "full_name": "Applicant Contact Info Locked", - "email": "hidden@example.com", - "phone": "+91 0000000000", - "message": "Contact revealed" - }))).into_response() + // Deduct from free views first, then purchased + let sql = if company.free_contact_views > 0 { + "UPDATE companies SET free_contact_views = free_contact_views - 1 WHERE id = $1" + } else { + "UPDATE companies SET purchased_contact_views = purchased_contact_views - 1 WHERE id = $1" + }; + + if let Err(e) = sqlx::query(sql).bind(company.id).execute(&state.pool).await { + tracing::error!("Failed to deduct contact view quota: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to deduct quota").into_response(); + } + + if let Err(e) = ApplicationRepository::mark_contact_viewed(&state.pool, app.id).await { + tracing::error!("Failed to mark contact viewed: {}", e); + } + } + + // Fetch job seeker contact info via job_seeker_id → job_seekers.user_id → users + let contact = sqlx::query_as::<_, (Option, String, Option)>( + r#" + SELECT u.full_name, u.email, u.phone + FROM users u + INNER JOIN job_seekers js ON js.user_id = u.id + WHERE js.id = $1 + "#, + ) + .bind(app.job_seeker_id) + .fetch_optional(&state.pool) + .await; + + match contact { + Ok(Some((full_name, email, phone))) => { + // Fetch updated quota to return to client + let updated_company = CompanyRepository::get_by_user_id(&state.pool, auth.user_id) + .await + .ok() + .flatten(); + let (free_remaining, purchased_remaining) = updated_company + .map(|c| (c.free_contact_views, c.purchased_contact_views)) + .unwrap_or((0, 0)); + + (StatusCode::OK, Json(serde_json::json!({ + "application_id": id, + "full_name": full_name, + "email": email, + "phone": phone, + "quota": { + "free_remaining": free_remaining, + "purchased_remaining": purchased_remaining, + "total_remaining": free_remaining + purchased_remaining + } + }))) + .into_response() + } + Ok(None) => (StatusCode::NOT_FOUND, "Applicant not found").into_response(), + Err(e) => { + tracing::error!("Failed to fetch applicant contact: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to fetch contact info").into_response() + } + } } diff --git a/apps/companies/src/main.rs b/apps/companies/src/main.rs index 6dea422..f011ea2 100644 --- a/apps/companies/src/main.rs +++ b/apps/companies/src/main.rs @@ -2,7 +2,15 @@ mod handlers; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use sqlx::PgPool; + +#[derive(Clone)] +pub struct AppState { + pub pool: PgPool, + pub mail: Arc, +} #[tokio::main] async fn main() { @@ -22,10 +30,13 @@ async fn main() { tracing::info!("Companies service — connected to database"); + let mailer = Arc::new(email::Mailer::new()); + let state = AppState { pool, mail: mailer }; + let app = Router::new() .nest("/api/companies", handlers::router()) .route("/health", get(|| async { "Companies OK" })) - .with_state(pool); + .with_state(state); let port: u16 = std::env::var("PORT") .unwrap_or_else(|_| "8081".to_string()) diff --git a/apps/customers/Cargo.toml b/apps/customers/Cargo.toml index 54aed40..578fdcf 100644 --- a/apps/customers/Cargo.toml +++ b/apps/customers/Cargo.toml @@ -16,4 +16,5 @@ db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } serde_json = { workspace = true } +email = { path = "../../crates/email" } diff --git a/apps/customers/src/handlers.rs b/apps/customers/src/handlers.rs index 995e14b..6bf8b59 100644 --- a/apps/customers/src/handlers.rs +++ b/apps/customers/src/handlers.rs @@ -6,15 +6,16 @@ use axum::{ Json, Router, }; use serde::Deserialize; -use sqlx::PgPool; use uuid::Uuid; use db::models::customer::{CustomerRepository, UpsertCustomerProfilePayload}; use db::models::professional::ProfessionalRepository; use db::models::requirement::{RequirementRepository, CreateRequirementPayload as DbCreateRequirementPayload, UpdateRequirementPayload as DbUpdateRequirementPayload}; use db::models::lead_request::LeadRequestRepository; +use db::models::user::UserRepository; use contracts::auth_middleware::AuthUser; +use crate::AppState; -pub fn router() -> Router { +pub fn router() -> Router { Router::new() .route("/profile/me", get(get_profile).patch(update_profile)) .route("/requirements", get(list_requirements).post(create_requirement)) @@ -48,10 +49,10 @@ pub struct RejectRequestPayload { } async fn get_profile( - State(pool): State, + State(state): State, auth: AuthUser, ) -> impl IntoResponse { - match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(profile)) => (StatusCode::OK, Json(profile)).into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Customer profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -59,29 +60,29 @@ async fn get_profile( } async fn update_profile( - State(pool): State, + State(state): State, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - match CustomerRepository::upsert(&pool, auth.user_id, payload).await { + match CustomerRepository::upsert(&state.pool, auth.user_id, payload).await { Ok(profile) => (StatusCode::OK, Json(profile)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn list_requirements( - State(pool): State, + State(state): State, auth: AuthUser, Query(q): Query, ) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), }; let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); - match RequirementRepository::list_by_customer_id(&pool, customer.id, page, limit).await { + match RequirementRepository::list_by_customer_id(&state.pool, customer.id, page, limit).await { Ok(reqs) => (StatusCode::OK, Json(serde_json::json!({ "data": reqs, "pagination": { "page": page, "limit": limit } @@ -91,11 +92,11 @@ async fn list_requirements( } async fn create_requirement( - State(pool): State, + State(state): State, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), }; @@ -121,9 +122,9 @@ async fn create_requirement( extra_data_json: payload.extra_data_json, }; - match RequirementRepository::create(&pool, db_payload).await { + match RequirementRepository::create(&state.pool, db_payload).await { Ok(req) => { - let _ = CustomerRepository::update_active_requirement_count(&pool, customer.id, 1).await; + let _ = CustomerRepository::update_active_requirement_count(&state.pool, customer.id, 1).await; (StatusCode::CREATED, Json(req)).into_response() }, Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -131,11 +132,11 @@ async fn create_requirement( } async fn get_requirement( - State(pool): State, + State(state): State, Path(id): Path, _auth: AuthUser, ) -> impl IntoResponse { - match RequirementRepository::get_by_id(&pool, id).await { + match RequirementRepository::get_by_id(&state.pool, id).await { Ok(Some(req)) => (StatusCode::OK, Json(req)).into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Requirement not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -143,40 +144,76 @@ async fn get_requirement( } async fn update_requirement( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), }; - - let req = match RequirementRepository::get_by_id(&pool, id).await { + + let req = match RequirementRepository::get_by_id(&state.pool, id).await { Ok(Some(r)) if r.customer_id == customer.id => r, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), }; - match RequirementRepository::update(&pool, req.id, payload).await { + match RequirementRepository::update(&state.pool, req.id, payload).await { Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } +async fn submit_requirement( + State(state): State, + Path(id): Path, + auth: AuthUser, +) -> impl IntoResponse { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(Some(c)) => c, + _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), + }; + + if customer.status != "APPROVED" { + return (StatusCode::FORBIDDEN, "Customer profile approval is required before submitting requirements").into_response(); + } + + let req = match RequirementRepository::get_by_id(&state.pool, id).await { + Ok(Some(r)) if r.customer_id == customer.id => r, + Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), + _ => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), + }; + + if req.status != "DRAFT" { + return (StatusCode::BAD_REQUEST, "Requirement already submitted or closed").into_response(); + } + + match RequirementRepository::update_status(&state.pool, req.id, "PENDING_APPROVAL").await { + Ok(updated) => { + // Fire email to customer (ignore failures) + if let Ok(user) = UserRepository::get_by_id(&state.pool, auth.user_id).await { + let _ = state.mail.send_requirement_submitted_email(&user.email, &user.full_name, &updated.title).await; + } + (StatusCode::OK, Json(updated)).into_response() + } + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + } +} + async fn list_requests( - State(pool): State, + State(state): State, Path(id): Path, auth: AuthUser, Query(q): Query, ) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), }; - - let req = match RequirementRepository::get_by_id(&pool, id).await { + + let req = match RequirementRepository::get_by_id(&state.pool, id).await { Ok(Some(r)) if r.customer_id == customer.id => r, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), @@ -184,7 +221,7 @@ async fn list_requests( let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); - match LeadRequestRepository::list_by_requirement_id(&pool, req.id, page, limit).await { + match LeadRequestRepository::list_by_requirement_id(&state.pool, req.id, page, limit).await { Ok(leads) => (StatusCode::OK, Json(serde_json::json!({ "data": leads, "pagination": { "page": page, "limit": limit } @@ -194,22 +231,22 @@ async fn list_requests( } async fn approve_request( - State(pool): State, + State(state): State, Path((req_id, lead_id)): Path<(Uuid, Uuid)>, auth: AuthUser, ) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), }; - - let req = match RequirementRepository::get_by_id(&pool, req_id).await { + + let req = match RequirementRepository::get_by_id(&state.pool, req_id).await { Ok(Some(r)) if r.customer_id == customer.id => r, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), }; - let lead = match LeadRequestRepository::get_by_id(&pool, lead_id).await { + let lead = match LeadRequestRepository::get_by_id(&state.pool, lead_id).await { Ok(Some(l)) if l.requirement_id == req.id => l, _ => return (StatusCode::NOT_FOUND, "Lead request not found").into_response(), }; @@ -218,16 +255,16 @@ async fn approve_request( return (StatusCode::BAD_REQUEST, "Lead already resolved").into_response(); } - match LeadRequestRepository::update_status(&pool, lead.id, "ACCEPTED").await { + match LeadRequestRepository::update_status(&state.pool, lead.id, "ACCEPTED").await { Ok(updated) => { - let prof_user_id = match ProfessionalRepository::get_user_id_by_professional_id(&pool, lead.professional_id).await { + let prof_user_id = match ProfessionalRepository::get_user_id_by_professional_id(&state.pool, lead.professional_id).await { Ok(Some(user_id)) => user_id, Ok(None) => return (StatusCode::NOT_FOUND, "Professional not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; match ProfessionalRepository::try_debit_reserved_tracecoins( - &pool, + &state.pool, prof_user_id, lead.tracecoins_reserved, lead.id, @@ -237,13 +274,27 @@ async fn approve_request( Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } - let req_after = match RequirementRepository::increment_accepted_count_and_get(&pool, req.id).await { + let req_after = match RequirementRepository::increment_accepted_count_and_get(&state.pool, req.id).await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if req_after.accepted_count >= 10 && req_after.status != "CLOSED" { - let _ = RequirementRepository::update_status(&pool, req.id, "CLOSED").await; + let _ = RequirementRepository::update_status(&state.pool, req.id, "CLOSED").await; + } + + // Send contact-exchange emails to both parties (ignore failures) + let customer_user = UserRepository::get_by_id(&state.pool, auth.user_id).await.ok(); + let professional_user = UserRepository::get_by_id(&state.pool, prof_user_id).await.ok(); + if let (Some(cust), Some(prof)) = (customer_user, professional_user) { + let cust_phone = cust.phone.as_deref().unwrap_or("N/A"); + let prof_phone = prof.phone.as_deref().unwrap_or("N/A"); + let _ = state.mail.send_lead_accepted_professional_email( + &prof.email, &prof.full_name, &cust.full_name, &cust.email, cust_phone, + ).await; + let _ = state.mail.send_lead_accepted_customer_email( + &cust.email, &cust.full_name, &prof.full_name, &prof.email, prof_phone, + ).await; } (StatusCode::OK, Json(serde_json::json!({ @@ -257,23 +308,23 @@ async fn approve_request( } async fn reject_request( - State(pool): State, + State(state): State, Path((req_id, lead_id)): Path<(Uuid, Uuid)>, auth: AuthUser, Json(_payload): Json, ) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { + let customer = match CustomerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(c)) => c, _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), }; - - let req = match RequirementRepository::get_by_id(&pool, req_id).await { + + let req = match RequirementRepository::get_by_id(&state.pool, req_id).await { Ok(Some(r)) if r.customer_id == customer.id => r, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), }; - let lead = match LeadRequestRepository::get_by_id(&pool, lead_id).await { + let lead = match LeadRequestRepository::get_by_id(&state.pool, lead_id).await { Ok(Some(l)) if l.requirement_id == req.id => l, _ => return (StatusCode::NOT_FOUND, "Lead request not found").into_response(), }; @@ -282,16 +333,16 @@ async fn reject_request( return (StatusCode::BAD_REQUEST, "Lead already resolved").into_response(); } - match LeadRequestRepository::update_status(&pool, lead.id, "REJECTED").await { + match LeadRequestRepository::update_status(&state.pool, lead.id, "REJECTED").await { Ok(updated) => { - let prof_user_id = match ProfessionalRepository::get_user_id_by_professional_id(&pool, lead.professional_id).await { + let prof_user_id = match ProfessionalRepository::get_user_id_by_professional_id(&state.pool, lead.professional_id).await { Ok(Some(user_id)) => user_id, Ok(None) => return (StatusCode::NOT_FOUND, "Professional not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; match ProfessionalRepository::try_release_reserved_tracecoins( - &pool, + &state.pool, prof_user_id, lead.tracecoins_reserved, lead.id, @@ -302,38 +353,15 @@ async fn reject_request( Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } + // Notify professional their request was rejected (ignore failures) + if let Ok(prof_user) = UserRepository::get_by_id(&state.pool, prof_user_id).await { + let _ = state.mail.send_lead_rejected_email( + &prof_user.email, &prof_user.full_name, &req.title, + ).await; + } + (StatusCode::OK, Json(updated)).into_response() }, Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } - -async fn submit_requirement( - State(pool): State, - Path(id): Path, - auth: AuthUser, -) -> impl IntoResponse { - let customer = match CustomerRepository::get_by_user_id(&pool, auth.user_id).await { - Ok(Some(c)) => c, - _ => return (StatusCode::NOT_FOUND, "Customer not found").into_response(), - }; - - if customer.status != "APPROVED" { - return (StatusCode::FORBIDDEN, "Customer profile approval is required before submitting requirements").into_response(); - } - - let req = match RequirementRepository::get_by_id(&pool, id).await { - Ok(Some(r)) if r.customer_id == customer.id => r, - Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), - _ => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), - }; - - if req.status != "DRAFT" { - return (StatusCode::BAD_REQUEST, "Requirement already submitted or closed").into_response(); - } - - match RequirementRepository::update_status(&pool, req.id, "PENDING_APPROVAL").await { - Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), - } -} diff --git a/apps/customers/src/main.rs b/apps/customers/src/main.rs index 1d8dee9..0c43581 100644 --- a/apps/customers/src/main.rs +++ b/apps/customers/src/main.rs @@ -2,7 +2,15 @@ mod handlers; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use sqlx::PgPool; + +#[derive(Clone)] +pub struct AppState { + pub pool: PgPool, + pub mail: Arc, +} #[tokio::main] async fn main() { @@ -22,10 +30,13 @@ async fn main() { tracing::info!("Customers service — connected to database"); + let mailer = Arc::new(email::Mailer::new()); + let state = AppState { pool, mail: mailer }; + let app = Router::new() .nest("/api/customers", handlers::router()) .route("/health", get(|| async { "Customers OK" })) - .with_state(pool); + .with_state(state); let port: u16 = std::env::var("PORT") .unwrap_or_else(|_| "8083".to_string()) diff --git a/apps/job_seekers/Cargo.toml b/apps/job_seekers/Cargo.toml index af28103..613b658 100644 --- a/apps/job_seekers/Cargo.toml +++ b/apps/job_seekers/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -axum = { workspace = true } +axum = { workspace = true, features = ["multipart"] } tokio = { workspace = true } serde = { workspace = true } sqlx = { workspace = true } @@ -12,8 +12,10 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } +bytes = { workspace = true } db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } contracts = { path = "../../crates/contracts" } +storage = { path = "../../crates/storage" } serde_json = { workspace = true } diff --git a/apps/job_seekers/src/handlers.rs b/apps/job_seekers/src/handlers.rs index 5c5e263..ff3c356 100644 --- a/apps/job_seekers/src/handlers.rs +++ b/apps/job_seekers/src/handlers.rs @@ -1,19 +1,20 @@ +use crate::AppState; use axum::{ - extract::{Path, Query, State}, + extract::{Multipart, Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, post}, Json, Router, }; +use bytes::BufMut; use serde::Deserialize; -use sqlx::PgPool; use uuid::Uuid; use db::models::job_seeker::{JobSeekerRepository, UpsertJobSeekerProfilePayload}; use db::models::job::JobRepository; use db::models::application::{ApplicationRepository, CreateApplicationPayload}; use contracts::auth_middleware::AuthUser; -pub fn router() -> Router { +pub fn router() -> Router { Router::new() .route("/profile/me", get(get_profile).patch(update_profile)) .route("/profile/resume", post(upload_resume)) @@ -40,11 +41,19 @@ pub struct ApplyRequest { pub resume_url: Option, } +#[derive(Deserialize)] +pub struct PaginationQuery { + pub page: Option, + pub limit: Option, +} + +// ── Handlers ────────────────────────────────────────────────────────────────── + async fn get_profile( - State(pool): State, + State(state): State, auth: AuthUser, ) -> impl IntoResponse { - match JobSeekerRepository::get_by_user_id(&pool, auth.user_id).await { + match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(profile)) => (StatusCode::OK, Json(profile)).into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), @@ -52,40 +61,120 @@ async fn get_profile( } async fn update_profile( - State(pool): State, + State(state): State, auth: AuthUser, Json(payload): Json, ) -> impl IntoResponse { - match JobSeekerRepository::upsert(&pool, auth.user_id, payload).await { + match JobSeekerRepository::upsert(&state.pool, auth.user_id, payload).await { Ok(profile) => (StatusCode::OK, Json(profile)).into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn upload_resume( - State(_pool): State, - _auth: AuthUser, + State(state): State, + auth: AuthUser, + mut multipart: Multipart, ) -> impl IntoResponse { - // TODO: multipart upload handler - (StatusCode::OK, Json(serde_json::json!({ "resume_url": null }))) + // Find the job seeker profile first so we have the profile id to update. + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(Some(s)) => s, + Ok(None) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Job seeker profile not found" }))).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + }; + + // Read the multipart field named "resume" (or the first field if unnamed). + let mut file_bytes = bytes::BytesMut::new(); + let mut content_type = "application/octet-stream".to_string(); + let mut ext = "pdf".to_string(); + let mut found = false; + + while let Ok(Some(field)) = multipart.next_field().await { + let name = field.name().unwrap_or("").to_string(); + if name == "resume" || name == "file" || !found { + // Detect content type and extension from the field + if let Some(ct) = field.content_type() { + content_type = ct.to_string(); + ext = match ct { + "application/pdf" => "pdf", + "application/msword" => "doc", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => "docx", + _ => "pdf", + } + .to_string(); + } else if let Some(fname) = field.file_name() { + if let Some(e) = fname.rsplit('.').next() { + ext = e.to_lowercase(); + } + } + + let data = match field.bytes().await { + Ok(b) => b, + Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": format!("Failed to read file: {}", e) }))).into_response(), + }; + + if data.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "Empty file" }))).into_response(); + } + + // 10 MB limit + if data.len() > 10 * 1024 * 1024 { + return (StatusCode::PAYLOAD_TOO_LARGE, Json(serde_json::json!({ "error": "File too large. Maximum 10 MB." }))).into_response(); + } + + file_bytes.put(data); + found = true; + break; + } + } + + if !found || file_bytes.is_empty() { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "No resume file provided. Send a multipart field named 'resume'." }))).into_response(); + } + + // Upload to Backblaze B2 + let resume_url = match state.storage + .upload("resume", &ext, file_bytes.freeze(), &content_type) + .await + { + Ok(url) => url, + Err(e) => { + tracing::error!("B2 upload failed: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "File upload failed" }))).into_response(); + } + }; + + // Save resume_url to job seeker profile + let update_result = sqlx::query( + "UPDATE job_seekers SET resume_url = $1 WHERE id = $2" + ) + .bind(&resume_url) + .bind(seeker.id) + .execute(&state.pool) + .await; + + match update_result { + Ok(_) => (StatusCode::OK, Json(serde_json::json!({ "resume_url": resume_url }))).into_response(), + Err(e) => { + tracing::error!("Failed to save resume_url to profile: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "Uploaded but failed to save URL" }))).into_response() + } + } } async fn browse_jobs( - State(pool): State, + State(state): State, Query(q): Query, ) -> impl IntoResponse { - // Public feed of LIVE jobs - // Note: This logic should ideally be in JobRepository but for now it's simple listing let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; - // Filter by LIVE status for public browse let jobs = sqlx::query_as!( db::models::job::Job, r#" - SELECT * FROM jobs - WHERE status = 'LIVE' + SELECT * FROM jobs + WHERE status = 'LIVE' AND ($1::VARCHAR IS NULL OR location ILIKE '%' || $1 || '%') AND ($2::VARCHAR IS NULL OR job_type = $2) AND ($3::VARCHAR IS NULL OR title ILIKE '%' || $3 || '%') @@ -98,7 +187,7 @@ async fn browse_jobs( limit, offset ) - .fetch_all(&pool) + .fetch_all(&state.pool) .await; match jobs { @@ -111,10 +200,10 @@ async fn browse_jobs( } async fn get_job( - State(pool): State, + State(state): State, Path(id): Path, ) -> impl IntoResponse { - match JobRepository::get_by_id(&pool, id).await { + match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(job)) if job.status == "LIVE" => (StatusCode::OK, Json(job)).into_response(), Ok(Some(_)) => (StatusCode::FORBIDDEN, "Job is not live").into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Job not found").into_response(), @@ -123,17 +212,17 @@ async fn get_job( } async fn apply_to_job( - State(pool): State, + State(state): State, auth: AuthUser, Path(id): Path, Json(payload): Json, ) -> impl IntoResponse { - let seeker = match JobSeekerRepository::get_by_user_id(&pool, auth.user_id).await { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(s)) => s, _ => return (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), }; - let job = match JobRepository::get_by_id(&pool, id).await { + let job = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(j)) if j.status == "LIVE" => j, Ok(Some(_)) => return (StatusCode::BAD_REQUEST, "Job is not live").into_response(), _ => return (StatusCode::NOT_FOUND, "Job not found").into_response(), @@ -150,11 +239,11 @@ async fn apply_to_job( resume_url: payload.resume_url.or(seeker.resume_url), }; - match ApplicationRepository::create(&pool, db_payload).await { + match ApplicationRepository::create(&state.pool, db_payload).await { Ok(app) => { - let _ = JobSeekerRepository::update_active_application_count(&pool, seeker.id, 1).await; + let _ = JobSeekerRepository::update_active_application_count(&state.pool, seeker.id, 1).await; (StatusCode::CREATED, Json(app)).into_response() - }, + } Err(e) => { if e.to_string().contains("unique") { (StatusCode::CONFLICT, "Already applied to this job").into_response() @@ -166,11 +255,11 @@ async fn apply_to_job( } async fn list_my_applications( - State(pool): State, + State(state): State, auth: AuthUser, Query(q): Query, ) -> impl IntoResponse { - let seeker = match JobSeekerRepository::get_by_user_id(&pool, auth.user_id).await { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(s)) => s, _ => return (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), }; @@ -178,7 +267,7 @@ async fn list_my_applications( let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); - match ApplicationRepository::list_by_job_seeker_id(&pool, seeker.id, page, limit).await { + match ApplicationRepository::list_by_job_seeker_id(&state.pool, seeker.id, page, limit).await { Ok(apps) => (StatusCode::OK, Json(serde_json::json!({ "data": apps, "pagination": { "page": page, "limit": limit } @@ -188,16 +277,16 @@ async fn list_my_applications( } async fn get_my_application( - State(pool): State, + State(state): State, auth: AuthUser, Path(id): Path, ) -> impl IntoResponse { - let seeker = match JobSeekerRepository::get_by_user_id(&pool, auth.user_id).await { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(s)) => s, _ => return (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), }; - match ApplicationRepository::get_by_id(&pool, id).await { + match ApplicationRepository::get_by_id(&state.pool, id).await { Ok(Some(app)) if app.job_seeker_id == seeker.id => (StatusCode::OK, Json(app)).into_response(), Ok(Some(_)) => (StatusCode::FORBIDDEN, "Access denied").into_response(), Ok(None) => (StatusCode::NOT_FOUND, "Application not found").into_response(), @@ -206,33 +295,26 @@ async fn get_my_application( } async fn withdraw_application( - State(pool): State, + State(state): State, auth: AuthUser, Path(id): Path, ) -> impl IntoResponse { - let seeker = match JobSeekerRepository::get_by_user_id(&pool, auth.user_id).await { + let seeker = match JobSeekerRepository::get_by_user_id(&state.pool, auth.user_id).await { Ok(Some(s)) => s, _ => return (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), }; - let app = match ApplicationRepository::get_by_id(&pool, id).await { + let app = match ApplicationRepository::get_by_id(&state.pool, id).await { Ok(Some(a)) if a.job_seeker_id == seeker.id => a, Ok(Some(_)) => return (StatusCode::FORBIDDEN, "Access denied").into_response(), _ => return (StatusCode::NOT_FOUND, "Application not found").into_response(), }; - match ApplicationRepository::update_status(&pool, app.id, "WITHDRAWN").await { + match ApplicationRepository::update_status(&state.pool, app.id, "WITHDRAWN").await { Ok(updated) => { - let _ = JobSeekerRepository::update_active_application_count(&pool, seeker.id, -1).await; + let _ = JobSeekerRepository::update_active_application_count(&state.pool, seeker.id, -1).await; (StatusCode::OK, Json(updated)).into_response() - }, + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } - -#[derive(Deserialize)] -pub struct PaginationQuery { - pub page: Option, - pub limit: Option, -} - diff --git a/apps/job_seekers/src/main.rs b/apps/job_seekers/src/main.rs index 3ac33f3..9467f5a 100644 --- a/apps/job_seekers/src/main.rs +++ b/apps/job_seekers/src/main.rs @@ -2,8 +2,15 @@ mod handlers; use axum::{routing::get, Router}; use std::net::SocketAddr; +use std::sync::Arc; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +#[derive(Clone)] +pub struct AppState { + pub pool: sqlx::PgPool, + pub storage: Arc, +} + #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -22,10 +29,14 @@ async fn main() { tracing::info!("Job Seekers service — connected to database"); + let storage = Arc::new(storage::StorageClient::from_env().await); + + let state = AppState { pool, storage }; + let app = Router::new() .nest("/api/jobseeker", handlers::router()) .route("/health", get(|| async { "Job Seekers OK" })) - .with_state(pool); + .with_state(state); let port: u16 = std::env::var("PORT") .unwrap_or_else(|_| "8082".to_string()) diff --git a/apps/users/Cargo.toml b/apps/users/Cargo.toml index c19ec47..3428eda 100644 --- a/apps/users/Cargo.toml +++ b/apps/users/Cargo.toml @@ -15,7 +15,7 @@ uuid = { workspace = true } chrono = { workspace = true } db = { path = "../../crates/db" } auth = { path = "../../crates/auth" } -lettre = { workspace = true } +email = { path = "../../crates/email" } contracts = { path = "../../crates/contracts" } cache = { path = "../../crates/cache" } rand = "0.8" diff --git a/apps/users/src/handlers/approvals.rs b/apps/users/src/handlers/approvals.rs index 9d12673..a20e3b6 100644 --- a/apps/users/src/handlers/approvals.rs +++ b/apps/users/src/handlers/approvals.rs @@ -386,6 +386,9 @@ async fn approve_company_profile( .await { Ok(result) if result.rows_affected() > 0 => { + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_approved_email(&user.email, &user.full_name, "Company").await; + } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "APPROVED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Company profile not found").into_response(), @@ -397,10 +400,12 @@ async fn reject_company_profile( auth: AuthUser, State(state): State, Path(user_id): Path, + Json(payload): Json, ) -> impl IntoResponse { if let Err(e) = require_admin(&auth) { return e.into_response(); } + let reason = payload.reason.unwrap_or_else(|| "Profile rejected".to_string()); match sqlx::query!( "UPDATE company_profiles SET status = 'REJECTED', updated_at = NOW() WHERE user_id = $1", user_id @@ -409,6 +414,9 @@ async fn reject_company_profile( .await { Ok(result) if result.rows_affected() > 0 => { + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_rejected_email(&user.email, &user.full_name, "Company", &reason).await; + } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "REJECTED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Company profile not found").into_response(), @@ -432,6 +440,9 @@ async fn approve_customer_profile( .await { Ok(result) if result.rows_affected() > 0 => { + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_approved_email(&user.email, &user.full_name, "Customer").await; + } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "APPROVED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Customer profile not found").into_response(), @@ -443,10 +454,12 @@ async fn reject_customer_profile( auth: AuthUser, State(state): State, Path(user_id): Path, + Json(payload): Json, ) -> impl IntoResponse { if let Err(e) = require_admin(&auth) { return e.into_response(); } + let reason = payload.reason.unwrap_or_else(|| "Profile rejected".to_string()); match sqlx::query!( "UPDATE customer_profiles SET status = 'REJECTED', updated_at = NOW() WHERE user_id = $1", user_id @@ -455,6 +468,9 @@ async fn reject_customer_profile( .await { Ok(result) if result.rows_affected() > 0 => { + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_rejected_email(&user.email, &user.full_name, "Customer", &reason).await; + } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "REJECTED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Customer profile not found").into_response(), @@ -477,6 +493,21 @@ fn professional_profile_table(role_key: &str) -> Option<&'static str> { } } +fn role_key_to_display(role_key: &str) -> &'static str { + match role_key { + "PHOTOGRAPHER" => "Photographer", + "MAKEUP_ARTIST" => "Makeup Artist", + "TUTOR" => "Tutor", + "DEVELOPER" => "Developer", + "VIDEO_EDITOR" => "Video Editor", + "GRAPHIC_DESIGNER" => "Graphic Designer", + "SOCIAL_MEDIA_MANAGER" => "Social Media Manager", + "FITNESS_TRAINER" => "Fitness Trainer", + "CATERING_SERVICES" => "Catering Services", + _ => role_key, + } +} + async fn approve_professional_profile( auth: AuthUser, State(state): State, @@ -494,11 +525,13 @@ async fn approve_professional_profile( table ); match sqlx::query(&query).bind(user_id).execute(&state.pool).await { - Ok(result) if result.rows_affected() > 0 => ( - StatusCode::OK, - Json(serde_json::json!({ "user_id": user_id, "role_key": role_key, "status": "APPROVED" })), - ) - .into_response(), + Ok(result) if result.rows_affected() > 0 => { + let display = role_key_to_display(&role_key); + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_approved_email(&user.email, &user.full_name, display).await; + } + (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "role_key": role_key, "status": "APPROVED" }))).into_response() + } Ok(_) => (StatusCode::NOT_FOUND, "Professional profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } @@ -517,21 +550,24 @@ async fn reject_professional_profile( let Some(table) = professional_profile_table(&role_key) else { return (StatusCode::BAD_REQUEST, "Unsupported professional role_key").into_response(); }; + let reason = payload.reason.unwrap_or_else(|| "Profile rejected".to_string()); let query = format!( "UPDATE {} SET status = 'REJECTED', rejection_reason = $2, updated_at = NOW() WHERE user_id = $1", table ); match sqlx::query(&query) .bind(user_id) - .bind(payload.reason.unwrap_or_else(|| "Profile rejected".to_string())) + .bind(&reason) .execute(&state.pool) .await { - Ok(result) if result.rows_affected() > 0 => ( - StatusCode::OK, - Json(serde_json::json!({ "user_id": user_id, "role_key": role_key, "status": "REJECTED" })), - ) - .into_response(), + Ok(result) if result.rows_affected() > 0 => { + let display = role_key_to_display(&role_key); + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_rejected_email(&user.email, &user.full_name, display, &reason).await; + } + (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "role_key": role_key, "status": "REJECTED" }))).into_response() + } Ok(_) => (StatusCode::NOT_FOUND, "Professional profile not found").into_response(), Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } @@ -557,7 +593,19 @@ async fn approve_job( } match JobRepository::approve(&state.pool, id, auth.user_id).await { - Ok(job) => (StatusCode::OK, Json(job)).into_response(), + Ok(job) => { + // Notify company user (ignore failures) + let company_info = sqlx::query_as::<_, (String, String)>( + "SELECT u.full_name, u.email FROM companies c JOIN users u ON u.id = c.user_id WHERE c.id = $1", + ) + .bind(existing.company_id) + .fetch_optional(&state.pool) + .await; + if let Ok(Some((name, email))) = company_info { + let _ = state.mail.send_job_approved_email(&email, &name, &existing.title).await; + } + (StatusCode::OK, Json(job)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -582,8 +630,21 @@ async fn reject_job( return (StatusCode::BAD_REQUEST, "Job is not pending approval").into_response(); } + let reason = payload.reason.clone(); match JobRepository::reject(&state.pool, id, payload.reason).await { - Ok(job) => (StatusCode::OK, Json(job)).into_response(), + Ok(job) => { + let company_info = sqlx::query_as::<_, (String, String)>( + "SELECT u.full_name, u.email FROM companies c JOIN users u ON u.id = c.user_id WHERE c.id = $1", + ) + .bind(existing.company_id) + .fetch_optional(&state.pool) + .await; + if let Ok(Some((name, email))) = company_info { + let r = reason.as_deref().unwrap_or("Rejected by admin"); + let _ = state.mail.send_job_rejected_email(&email, &name, &existing.title, r).await; + } + (StatusCode::OK, Json(job)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } @@ -608,7 +669,18 @@ async fn approve_requirement( } match RequirementRepository::approve(&state.pool, id, auth.user_id).await { - Ok(req) => (StatusCode::OK, Json(req)).into_response(), + Ok(req) => { + let customer_info = sqlx::query_as::<_, (String, String)>( + "SELECT u.full_name, u.email FROM customers c JOIN users u ON u.id = c.user_id WHERE c.id = $1", + ) + .bind(existing.customer_id) + .fetch_optional(&state.pool) + .await; + if let Ok(Some((name, email))) = customer_info { + let _ = state.mail.send_requirement_approved_email(&email, &name, &existing.title).await; + } + (StatusCode::OK, Json(req)).into_response() + } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } diff --git a/apps/users/src/handlers/auth.rs b/apps/users/src/handlers/auth.rs index 17c29ae..d12b988 100644 --- a/apps/users/src/handlers/auth.rs +++ b/apps/users/src/handlers/auth.rs @@ -498,6 +498,10 @@ async fn reset_password( .await .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "DB_ERROR"))?; + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_password_changed_email(&user.email, &user.full_name).await; + } + Ok((StatusCode::OK, Json(serde_json::json!({ "message": "Password reset successfully" })))) } @@ -528,6 +532,8 @@ async fn change_password( .await .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "DB_ERROR"))?; + let _ = state.mail.send_password_changed_email(&user.email, &user.full_name).await; + Ok((StatusCode::OK, Json(serde_json::json!({ "message": "Password changed successfully" })))) } diff --git a/apps/users/src/handlers/config.rs b/apps/users/src/handlers/config.rs index b49c635..6ff2b67 100644 --- a/apps/users/src/handlers/config.rs +++ b/apps/users/src/handlers/config.rs @@ -303,11 +303,84 @@ async fn get_my_runtime_config( } obj.insert("permissions".to_string(), serde_json::Value::Object(permissions_obj)); } + } else { + // EXTERNAL role: derive enabled_modules and sidebar config from the active dashboard_config. + // Falls back to the runtime_config's own enabled_modules if no dashboard_config exists. + let dash_config = ConfigRepository::get_active_dashboard_config( + &state.pool, + role.id, + "EXTERNAL", + ) + .await + .ok(); + + if let Some(dash) = dash_config { + let config_json = &dash.config_json; + + // Extract sidebar_items (the admin saves this key as snake_case in the API). + // Try both "sidebar_items" and "sidebarItems" for forward-compatibility. + let sidebar_items: Vec = config_json + .get("sidebar_items") + .or_else(|| config_json.get("sidebarItems")) + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(str::to_string)) + .collect() + }) + .unwrap_or_default(); + + // Map display-name sidebar items to the module keys the frontend MODULE_NAV_MAP understands. + let enabled_modules: Vec = sidebar_items + .iter() + .filter_map(|item| sidebar_item_to_module_key(item.as_str())) + .map(str::to_string) + .collect(); + + if let Some(obj) = response.as_object_mut() { + // Only overwrite enabled_modules if the dashboard config has sidebar items defined. + if !enabled_modules.is_empty() { + obj.insert("enabled_modules".to_string(), serde_json::json!(enabled_modules)); + } + + // Include the full dashboard config so the frontend can read widgets/tabs directly. + obj.insert("dashboard_config".to_string(), config_json.clone()); + } + } } Ok((StatusCode::OK, Json(response))) } +/// Maps an admin-configured sidebar display name to the module key used in the frontend's +/// MODULE_NAV_MAP. Returns None for items that don't map to a routable module (e.g. Logout). +fn sidebar_item_to_module_key(item: &str) -> Option<&'static str> { + let normalized = item.trim().to_lowercase(); + match normalized.as_str() { + "my dashboard" | "dashboard" => Some("dashboard"), + "my profile" | "profile" => Some("profile"), + "my portfolio" | "portfolio" => Some("portfolio"), + "leads" | "my leads" => Some("leads"), + "my responses" | "responses" | "my requests" => Some("leads"), + "received responses" | "shortlisted responses" => Some("marketplace"), + "marketplace" => Some("marketplace"), + "jobs" | "job postings" => Some("job_postings"), + "applications" | "my applications" + | "shortlisted candidates" => Some("applications"), + "my requirements" | "requirements" + | "post requirement" => Some("requirements"), + "credits" | "tracecoins" | "wallet" => Some("wallet"), + "services" | "my services" => Some("services"), + "explore nxtgauge" | "explore" => Some("onboarding"), + "verification" | "verification status" => Some("verification"), + "notifications" => Some("notifications"), + "help center" | "support" => Some("support"), + "settings" => Some("settings"), + "switch services" | "logout" => None, // UI-only, not a module + _ => None, + } +} + fn parse_permission_key(key: &str) -> Option<(String, String)> { // Format: MODULE:Action (e.g. DEPARTMENT_MANAGEMENT:View) if let Some((module, action)) = key.split_once(':') { diff --git a/apps/users/src/handlers/notifications.rs b/apps/users/src/handlers/notifications.rs index d7321b4..709a8ce 100644 --- a/apps/users/src/handlers/notifications.rs +++ b/apps/users/src/handlers/notifications.rs @@ -1,12 +1,13 @@ use crate::AppState; use axum::{ - extract::{Path, State}, + extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, patch}, Json, Router, }; -use serde::Serialize; +use contracts::auth_middleware::AuthUser; +use serde::{Deserialize, Serialize}; use uuid::Uuid; pub fn router() -> Router { @@ -17,13 +18,24 @@ pub fn router() -> Router { .route("/read-all", patch(mark_all_read)) } +// ── Query params ────────────────────────────────────────────────────────────── + +#[derive(Deserialize)] +struct ListQuery { + page: Option, + limit: Option, +} + +// ── Response types ──────────────────────────────────────────────────────────── + #[derive(Serialize)] pub struct NotificationDto { - pub id: String, + pub id: Uuid, pub title: String, pub body: Option, #[serde(rename = "type")] pub notification_type: Option, + pub reference_id: Option, pub is_read: bool, pub created_at: String, } @@ -42,44 +54,154 @@ pub struct Pagination { pub total_pages: i64, } -// TODO: Replace with real JWT extractor middleware -// For now this handler is a placeholder that shows the expected shape. +// ── Handlers ────────────────────────────────────────────────────────────────── async fn list_notifications( + auth: AuthUser, State(state): State, - // TODO: axum::extract::Query for page/limit - // TODO: JWT middleware to get user_id + Query(params): Query, ) -> impl IntoResponse { - let _ = state; - ( - StatusCode::OK, - Json(PaginatedResponse:: { - data: vec![], - pagination: Pagination { - page: 1, - limit: 20, - total: 0, - total_pages: 0, - }, - }), + let page = params.page.unwrap_or(1).max(1); + let limit = params.limit.unwrap_or(20).clamp(1, 100); + let offset = (page - 1) * limit; + + let rows = sqlx::query!( + r#" + SELECT id, title, body, type AS notification_type, + reference_id, is_read, created_at + FROM notifications + WHERE user_id = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + "#, + auth.user_id, + limit, + offset ) + .fetch_all(&state.pool) + .await; + + let total: i64 = sqlx::query_scalar!( + "SELECT COUNT(*) FROM notifications WHERE user_id = $1", + auth.user_id + ) + .fetch_one(&state.pool) + .await + .unwrap_or(Some(0)) + .unwrap_or(0); + + match rows { + Ok(rows) => { + let data = rows + .into_iter() + .map(|r| NotificationDto { + id: r.id, + title: r.title, + body: r.body, + notification_type: r.notification_type, + reference_id: r.reference_id, + is_read: r.is_read, + created_at: r.created_at.to_rfc3339(), + }) + .collect(); + + let total_pages = if total == 0 { 1 } else { (total + limit - 1) / limit }; + + ( + StatusCode::OK, + Json(PaginatedResponse { + data, + pagination: Pagination { page, limit, total, total_pages }, + }), + ) + .into_response() + } + Err(e) => { + tracing::error!("Failed to fetch notifications: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": "Failed to fetch notifications" })), + ) + .into_response() + } + } } -async fn unread_count(State(state): State) -> impl IntoResponse { - let _ = state; - (StatusCode::OK, Json(serde_json::json!({ "unread_count": 0 }))) +async fn unread_count( + auth: AuthUser, + State(state): State, +) -> impl IntoResponse { + let count = sqlx::query_scalar!( + "SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND is_read = false", + auth.user_id + ) + .fetch_one(&state.pool) + .await + .unwrap_or(Some(0)) + .unwrap_or(0); + + (StatusCode::OK, Json(serde_json::json!({ "unread_count": count }))) } async fn mark_read( + auth: AuthUser, State(state): State, Path(id): Path, ) -> impl IntoResponse { - let _ = state; - (StatusCode::OK, Json(serde_json::json!({ "id": id.to_string(), "is_read": true }))) + let result = sqlx::query!( + "UPDATE notifications SET is_read = true WHERE id = $1 AND user_id = $2", + id, + auth.user_id + ) + .execute(&state.pool) + .await; + + match result { + Ok(r) if r.rows_affected() == 0 => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ "error": "Notification not found" })), + ) + .into_response(), + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ "id": id, "is_read": true })), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to mark notification as read: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": "Failed to update notification" })), + ) + .into_response() + } + } } -async fn mark_all_read(State(state): State) -> impl IntoResponse { - let _ = state; - (StatusCode::OK, Json(serde_json::json!({ "message": "All notifications marked as read" }))) -} +async fn mark_all_read( + auth: AuthUser, + State(state): State, +) -> impl IntoResponse { + let result = sqlx::query!( + "UPDATE notifications SET is_read = true WHERE user_id = $1 AND is_read = false", + auth.user_id + ) + .execute(&state.pool) + .await; + match result { + Ok(r) => ( + StatusCode::OK, + Json(serde_json::json!({ "updated": r.rows_affected() })), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to mark all notifications as read: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": "Failed to update notifications" })), + ) + .into_response() + } + } +} diff --git a/apps/users/src/mail.rs b/apps/users/src/mail.rs index cc58961..a03f1e6 100644 --- a/apps/users/src/mail.rs +++ b/apps/users/src/mail.rs @@ -1,100 +1,2 @@ -use lettre::transport::smtp::authentication::Credentials; -use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor}; -use anyhow::Result; -use std::env; - -pub struct Mailer { - transport: Option>, - from_email: String, - from_name: String, -} - -impl Mailer { - pub fn new() -> Self { - // SMTP is optional — if vars are missing, emails are silently skipped. - // The service still starts so development works without a real SMTP server. - let smtp_host = env::var("SMTP_HOST").ok(); - let smtp_user = env::var("SMTP_USER").ok(); - let smtp_pass = env::var("SMTP_PASS").ok(); - let smtp_port: u16 = env::var("SMTP_PORT") - .ok() - .and_then(|p| p.parse().ok()) - .unwrap_or(587); - - let from_email = env::var("SMTP_FROM_EMAIL") - .unwrap_or_else(|_| "noreply@nxtgauge.com".to_string()); - let from_name = env::var("SMTP_FROM_NAME") - .unwrap_or_else(|_| "NXTGAUGE".to_string()); - - let transport = match (smtp_host, smtp_user, smtp_pass) { - (Some(host), Some(user), Some(pass)) => { - let credentials = Credentials::new(user, pass); - match AsyncSmtpTransport::::starttls_relay(&host) { - Ok(builder) => { - let t = builder.port(smtp_port).credentials(credentials).build(); - tracing::info!("SMTP transport configured (host={}:{})", host, smtp_port); - Some(t) - } - Err(e) => { - tracing::warn!("SMTP transport init failed: {} — emails will be skipped", e); - None - } - } - } - _ => { - tracing::warn!( - "SMTP_HOST / SMTP_USER / SMTP_PASS not all set — email sending is disabled" - ); - None - } - }; - - Self { transport, from_email, from_name } - } - - pub async fn send_verification_email(&self, to_email: &str, full_name: &str, otp: &str) -> Result<()> { - let Some(transport) = &self.transport else { - tracing::debug!("SMTP disabled — skipping verification email to {}", to_email); - return Ok(()); - }; - - let body = format!( - "Hello {},\n\nYour verification code for NXTGAUGE is: {}\n\nThis code expires in 15 minutes.\n\nRegards,\nThe NXTGAUGE Team", - full_name, otp - ); - - let email = Message::builder() - .from(format!("{} <{}>", self.from_name, self.from_email).parse()?) - .to(to_email.parse()?) - .subject("Verify your NXTGAUGE account") - .body(body)?; - - transport.send(email).await?; - Ok(()) - } - - pub async fn send_password_reset_email(&self, to_email: &str, full_name: &str, token: &str) -> Result<()> { - let Some(transport) = &self.transport else { - tracing::debug!("SMTP disabled — skipping password reset email to {}", to_email); - return Ok(()); - }; - - let frontend_url = env::var("FRONTEND_URL") - .unwrap_or_else(|_| "http://localhost:3000".to_string()); - let reset_link = format!("{}/reset-password?token={}", frontend_url, token); - - let body = format!( - "Hello {},\n\nYou requested a password reset. Click the link below:\n\n{}\n\nIf you did not request this, please ignore this email.\n\nRegards,\nThe NXTGAUGE Team", - full_name, reset_link - ); - - let email = Message::builder() - .from(format!("{} <{}>", self.from_name, self.from_email).parse()?) - .to(to_email.parse()?) - .subject("Reset your NXTGAUGE password") - .body(body)?; - - transport.send(email).await?; - Ok(()) - } -} +// Re-export from shared email crate so existing `use crate::mail::Mailer` imports keep working. +pub use email::Mailer; diff --git a/crates/contracts/src/profession_shared.rs b/crates/contracts/src/profession_shared.rs index 06431a3..8ee1cb8 100644 --- a/crates/contracts/src/profession_shared.rs +++ b/crates/contracts/src/profession_shared.rs @@ -321,38 +321,277 @@ async fn wallet_balance(State(state): State, auth: AuthUser) -> } } -// ── Stub handlers ───────────────────────────────────────────────────────────── +// ── Lead request handlers ───────────────────────────────────────────────────── + +#[derive(Deserialize)] +struct RequestsQuery { + page: Option, + limit: Option, + status: Option, +} async fn my_requests( - _s: State, - _a: AuthUser, - _q: Query, + State(state): State, + auth: AuthUser, + Query(q): Query, ) -> impl IntoResponse { - (StatusCode::OK, Json(serde_json::json!({ "data": [] }))) + let prof = match ProfessionalRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(p) => p, + Err(_) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Professional profile not found" }))).into_response(), + }; + + let page = q.page.unwrap_or(1).max(1); + let limit = q.limit.unwrap_or(20).clamp(1, 100); + let offset = (page - 1) * limit; + + let rows = if let Some(ref status) = q.status { + sqlx::query_as::<_, db::models::lead_request::LeadRequest>( + "SELECT * FROM lead_requests WHERE professional_id = $1 AND status = $2 ORDER BY requested_at DESC LIMIT $3 OFFSET $4" + ) + .bind(prof.id) + .bind(status) + .bind(limit) + .bind(offset) + .fetch_all(&state.pool) + .await + } else { + sqlx::query_as::<_, db::models::lead_request::LeadRequest>( + "SELECT * FROM lead_requests WHERE professional_id = $1 ORDER BY requested_at DESC LIMIT $2 OFFSET $3" + ) + .bind(prof.id) + .bind(limit) + .bind(offset) + .fetch_all(&state.pool) + .await + }; + + let total: i64 = if let Some(ref status) = q.status { + sqlx::query_scalar("SELECT COUNT(*) FROM lead_requests WHERE professional_id = $1 AND status = $2") + .bind(prof.id).bind(status).fetch_one(&state.pool).await.unwrap_or(0) + } else { + sqlx::query_scalar("SELECT COUNT(*) FROM lead_requests WHERE professional_id = $1") + .bind(prof.id).fetch_one(&state.pool).await.unwrap_or(0) + }; + + match rows { + Ok(data) => { + let total_pages = if total == 0 { 1 } else { (total + limit - 1) / limit }; + (StatusCode::OK, Json(serde_json::json!({ + "data": data, + "pagination": { "page": page, "limit": limit, "total": total, "total_pages": total_pages } + }))).into_response() + } + Err(e) => { + tracing::error!("Failed to fetch lead requests: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "Failed to fetch requests" }))).into_response() + } + } } async fn cancel_request( - _s: State, - _a: AuthUser, - _p: Path, + State(state): State, + auth: AuthUser, + Path(id): Path, ) -> impl IntoResponse { - (StatusCode::OK, Json(serde_json::json!({ "message": "Cancelled" }))) + let prof = match ProfessionalRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(p) => p, + Err(_) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Professional profile not found" }))).into_response(), + }; + + let lead = match LeadRequestRepository::get_by_id(&state.pool, id).await { + Ok(Some(l)) => l, + Ok(None) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Lead request not found" }))).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + }; + + if lead.professional_id != prof.id { + return (StatusCode::FORBIDDEN, Json(serde_json::json!({ "error": "Access denied" }))).into_response(); + } + + if lead.status != "PENDING" { + return (StatusCode::BAD_REQUEST, Json(serde_json::json!({ + "error": format!("Cannot cancel a request with status '{}'", lead.status) + }))).into_response(); + } + + // Release reserved Tracecoins back to balance + if lead.tracecoins_reserved > 0 { + let _ = ProfessionalRepository::try_release_reserved_tracecoins( + &state.pool, + auth.user_id, + lead.tracecoins_reserved, + lead.id, + "LEAD_CANCELLED", + ).await; + } + + match LeadRequestRepository::update_status(&state.pool, id, "CANCELLED").await { + Ok(updated) => (StatusCode::OK, Json(updated)).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e.to_string() }))).into_response(), + } } async fn accepted_leads( - _s: State, - _a: AuthUser, - _q: Query, + State(state): State, + auth: AuthUser, + Query(q): Query, ) -> impl IntoResponse { - (StatusCode::OK, Json(serde_json::json!({ "data": [] }))) + let prof = match ProfessionalRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(p) => p, + Err(_) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Professional profile not found" }))).into_response(), + }; + + let page = q.page.unwrap_or(1).max(1); + let limit = q.limit.unwrap_or(20).clamp(1, 100); + let offset = (page - 1) * limit; + + // Join lead_requests → requirements → customers → users to get full contact info + let rows = sqlx::query( + r#" + SELECT + lr.id AS lead_id, + lr.status, + lr.requested_at, + lr.resolved_at, + r.id AS requirement_id, + r.title AS requirement_title, + r.description AS requirement_description, + r.location AS requirement_location, + r.profession_key, + u.full_name AS customer_name, + u.email AS customer_email, + u.phone AS customer_phone + FROM lead_requests lr + INNER JOIN requirements r ON r.id = lr.requirement_id + INNER JOIN customers c ON c.id = r.customer_id + INNER JOIN users u ON u.id = c.user_id + WHERE lr.professional_id = $1 + AND lr.status = 'ACCEPTED' + ORDER BY lr.resolved_at DESC + LIMIT $2 OFFSET $3 + "# + ) + .bind(prof.id) + .bind(limit) + .bind(offset) + .fetch_all(&state.pool) + .await; + + let total: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM lead_requests WHERE professional_id = $1 AND status = 'ACCEPTED'" + ) + .bind(prof.id) + .fetch_one(&state.pool) + .await + .unwrap_or(0); + + match rows { + Ok(rows) => { + use sqlx::Row; + let data: Vec = rows.iter().map(|row| { + serde_json::json!({ + "lead_id": row.get::("lead_id"), + "status": row.get::("status"), + "requested_at": row.get::, _>("requested_at"), + "resolved_at": row.try_get::, _>("resolved_at").ok(), + "requirement_id": row.get::("requirement_id"), + "requirement_title": row.get::("requirement_title"), + "requirement_description": row.try_get::("requirement_description").ok(), + "requirement_location": row.try_get::("requirement_location").ok(), + "profession_key": row.get::("profession_key"), + "customer": { + "name": row.try_get::("customer_name").ok(), + "email": row.get::("customer_email"), + "phone": row.try_get::("customer_phone").ok(), + } + }) + }).collect(); + + let total_pages = if total == 0 { 1 } else { (total + limit - 1) / limit }; + (StatusCode::OK, Json(serde_json::json!({ + "data": data, + "pagination": { "page": page, "limit": limit, "total": total, "total_pages": total_pages } + }))).into_response() + } + Err(e) => { + tracing::error!("Failed to fetch accepted leads: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "Failed to fetch accepted leads" }))).into_response() + } + } } async fn accepted_lead_detail( - _s: State, - _a: AuthUser, + State(state): State, + auth: AuthUser, Path(id): Path, ) -> impl IntoResponse { - (StatusCode::OK, Json(serde_json::json!({ "id": id.to_string() }))) + let prof = match ProfessionalRepository::get_by_user_id(&state.pool, auth.user_id).await { + Ok(p) => p, + Err(_) => return (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Professional profile not found" }))).into_response(), + }; + + let row = sqlx::query( + r#" + SELECT + lr.id AS lead_id, + lr.status, + lr.tracecoins_reserved, + lr.requested_at, + lr.resolved_at, + r.id AS requirement_id, + r.title AS requirement_title, + r.description AS requirement_description, + r.location AS requirement_location, + r.profession_key, + r.custom_fields, + u.full_name AS customer_name, + u.email AS customer_email, + u.phone AS customer_phone + FROM lead_requests lr + INNER JOIN requirements r ON r.id = lr.requirement_id + INNER JOIN customers c ON c.id = r.customer_id + INNER JOIN users u ON u.id = c.user_id + WHERE lr.id = $1 + AND lr.professional_id = $2 + AND lr.status = 'ACCEPTED' + "# + ) + .bind(id) + .bind(prof.id) + .fetch_optional(&state.pool) + .await; + + match row { + Ok(Some(row)) => { + use sqlx::Row; + let data = serde_json::json!({ + "lead_id": row.get::("lead_id"), + "status": row.get::("status"), + "tracecoins_reserved": row.get::("tracecoins_reserved"), + "requested_at": row.get::, _>("requested_at"), + "resolved_at": row.try_get::, _>("resolved_at").ok(), + "requirement": { + "id": row.get::("requirement_id"), + "title": row.get::("requirement_title"), + "description": row.try_get::("requirement_description").ok(), + "location": row.try_get::("requirement_location").ok(), + "profession_key": row.get::("profession_key"), + "custom_fields": row.try_get::("custom_fields").ok(), + }, + "customer": { + "name": row.try_get::("customer_name").ok(), + "email": row.get::("customer_email"), + "phone": row.try_get::("customer_phone").ok(), + } + }); + (StatusCode::OK, Json(data)).into_response() + } + Ok(None) => (StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Accepted lead not found" }))).into_response(), + Err(e) => { + tracing::error!("Failed to fetch accepted lead detail: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": "Failed to fetch lead detail" }))).into_response() + } + } } async fn create_portfolio_item( diff --git a/crates/email/Cargo.toml b/crates/email/Cargo.toml new file mode 100644 index 0000000..a8fed60 --- /dev/null +++ b/crates/email/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "email" +version = "0.1.0" +edition = "2021" + +[dependencies] +lettre = { workspace = true } +anyhow = { workspace = true } +tracing = { workspace = true } diff --git a/crates/email/src/lib.rs b/crates/email/src/lib.rs new file mode 100644 index 0000000..936efad --- /dev/null +++ b/crates/email/src/lib.rs @@ -0,0 +1,319 @@ +use anyhow::Result; +use lettre::{ + transport::smtp::authentication::Credentials, AsyncSmtpTransport, AsyncTransport, + Message, Tokio1Executor, +}; +use std::env; + +// ── Mailer ──────────────────────────────────────────────────────────────────── + +pub struct Mailer { + transport: Option>, + from_email: String, + from_name: String, +} + +impl Mailer { + /// Build from environment variables. SMTP is optional — if vars are missing emails are + /// silently skipped so development works without a real SMTP server. + pub fn new() -> Self { + let smtp_host = env::var("SMTP_HOST").ok(); + let smtp_user = env::var("SMTP_USER").ok(); + let smtp_pass = env::var("SMTP_PASS").ok(); + let smtp_port: u16 = env::var("SMTP_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(587); + + let from_email = env::var("SMTP_FROM_EMAIL") + .unwrap_or_else(|_| "noreply@nxtgauge.com".to_string()); + let from_name = env::var("SMTP_FROM_NAME") + .unwrap_or_else(|_| "NXTGAUGE".to_string()); + + let transport = match (smtp_host, smtp_user, smtp_pass) { + (Some(host), Some(user), Some(pass)) => { + let creds = Credentials::new(user, pass); + match AsyncSmtpTransport::::starttls_relay(&host) { + Ok(builder) => { + let t = builder.port(smtp_port).credentials(creds).build(); + tracing::info!("SMTP transport configured (host={}:{})", host, smtp_port); + Some(t) + } + Err(e) => { + tracing::warn!("SMTP transport init failed: {} — emails disabled", e); + None + } + } + } + _ => { + tracing::warn!("SMTP_HOST/SMTP_USER/SMTP_PASS not all set — email disabled"); + None + } + }; + + Self { transport, from_email, from_name } + } + + async fn send(&self, to: &str, subject: &str, body: String) -> Result<()> { + let Some(transport) = &self.transport else { + tracing::debug!("SMTP disabled — skipping email to {} (subject: {})", to, subject); + return Ok(()); + }; + let email = Message::builder() + .from(format!("{} <{}>", self.from_name, self.from_email).parse()?) + .to(to.parse()?) + .subject(subject) + .body(body)?; + transport.send(email).await?; + Ok(()) + } + + // ── Auth ────────────────────────────────────────────────────────────────── + + pub async fn send_verification_email(&self, to: &str, name: &str, otp: &str) -> Result<()> { + self.send( + to, + "Verify your NXTGAUGE account", + format!( + "Hello {},\n\nYour verification code is: {}\n\nThis code expires in 15 minutes.\n\nRegards,\nThe NXTGAUGE Team", + name, otp + ), + ).await + } + + pub async fn send_password_reset_email(&self, to: &str, name: &str, token: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + self.send( + to, + "Reset your NXTGAUGE password", + format!( + "Hello {},\n\nClick to reset your password:\n{}/reset-password?token={}\n\nIf you did not request this, ignore this email.\n\nRegards,\nThe NXTGAUGE Team", + name, frontend_url, token + ), + ).await + } + + pub async fn send_password_changed_email(&self, to: &str, name: &str) -> Result<()> { + self.send( + to, + "Your NXTGAUGE password was changed", + format!( + "Hello {},\n\nYour password was successfully changed. If you did not do this, contact support immediately.\n\nRegards,\nThe NXTGAUGE Team", + name + ), + ).await + } + + pub async fn send_account_suspended_email(&self, to: &str, name: &str, reason: &str) -> Result<()> { + self.send( + to, + "Your NXTGAUGE account has been suspended", + format!( + "Hello {},\n\nYour account has been suspended.\n\nReason: {}\n\nIf you believe this is a mistake, contact support.\n\nRegards,\nThe NXTGAUGE Team", + name, reason + ), + ).await + } + + // ── Onboarding & Approvals ──────────────────────────────────────────────── + + pub async fn send_onboarding_submitted_email(&self, to: &str, name: &str, role: &str) -> Result<()> { + self.send( + to, + "Your NXTGAUGE profile is under review", + format!( + "Hello {},\n\nThank you for submitting your {} profile on NXTGAUGE. Our team will review it within 1–2 business days.\n\nYou will receive an email once your profile is approved.\n\nRegards,\nThe NXTGAUGE Team", + name, role + ), + ).await + } + + pub async fn send_approval_approved_email(&self, to: &str, name: &str, role: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + self.send( + to, + "Your NXTGAUGE profile is approved!", + format!( + "Hello {},\n\nGreat news! Your {} profile on NXTGAUGE has been approved. You can now access your full dashboard.\n\n{}/dashboard\n\nRegards,\nThe NXTGAUGE Team", + name, role, frontend_url + ), + ).await + } + + pub async fn send_approval_rejected_email(&self, to: &str, name: &str, role: &str, reason: &str) -> Result<()> { + self.send( + to, + "Update required on your NXTGAUGE profile", + format!( + "Hello {},\n\nUnfortunately, we were unable to approve your {} profile at this time.\n\nReason: {}\n\nPlease update your profile and resubmit. If you have questions, contact support.\n\nRegards,\nThe NXTGAUGE Team", + name, role, reason + ), + ).await + } + + // ── Jobs (Company) ──────────────────────────────────────────────────────── + + pub async fn send_job_submitted_email(&self, to: &str, company_name: &str, job_title: &str) -> Result<()> { + self.send( + to, + "Your job posting is under review", + format!( + "Hello {},\n\nYour job posting \"{}\" has been submitted for review. It will go live once our team approves it (usually within 24 hours).\n\nRegards,\nThe NXTGAUGE Team", + company_name, job_title + ), + ).await + } + + pub async fn send_job_approved_email(&self, to: &str, company_name: &str, job_title: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + self.send( + to, + "Your job posting is now live!", + format!( + "Hello {},\n\nYour job posting \"{}\" has been approved and is now live on NXTGAUGE.\n\n{}/dashboard/jobs\n\nRegards,\nThe NXTGAUGE Team", + company_name, job_title, frontend_url + ), + ).await + } + + pub async fn send_job_rejected_email(&self, to: &str, company_name: &str, job_title: &str, reason: &str) -> Result<()> { + self.send( + to, + "Your job posting needs updates", + format!( + "Hello {},\n\nYour job posting \"{}\" could not be approved.\n\nReason: {}\n\nPlease update and resubmit from your dashboard.\n\nRegards,\nThe NXTGAUGE Team", + company_name, job_title, reason + ), + ).await + } + + pub async fn send_new_application_email(&self, to: &str, company_name: &str, job_title: &str, applicant_name: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + self.send( + to, + &format!("New application for \"{}\"", job_title), + format!( + "Hello {},\n\n{} has applied for your job posting \"{}\".\n\nReview the application:\n{}/dashboard/jobs\n\nRegards,\nThe NXTGAUGE Team", + company_name, applicant_name, job_title, frontend_url + ), + ).await + } + + pub async fn send_application_status_email(&self, to: &str, applicant_name: &str, job_title: &str, status: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + let status_label = match status { + "SHORTLISTED" => "shortlisted", + "INTERVIEW" => "selected for an interview", + "OFFERED" => "offered the position", + "HIRED" => "hired", + "REJECTED" => "not selected at this time", + _ => status, + }; + self.send( + to, + &format!("Update on your application for \"{}\"", job_title), + format!( + "Hello {},\n\nYour application for \"{}\" has been updated: you have been {}.\n\nView details:\n{}/dashboard/applications\n\nRegards,\nThe NXTGAUGE Team", + applicant_name, job_title, status_label, frontend_url + ), + ).await + } + + // ── Requirements (Customer) ─────────────────────────────────────────────── + + pub async fn send_requirement_submitted_email(&self, to: &str, name: &str, title: &str) -> Result<()> { + self.send( + to, + "Your requirement is under review", + format!( + "Hello {},\n\nYour requirement \"{}\" has been submitted for review and will go live once approved.\n\nRegards,\nThe NXTGAUGE Team", + name, title + ), + ).await + } + + pub async fn send_requirement_approved_email(&self, to: &str, name: &str, title: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + self.send( + to, + "Your requirement is now live!", + format!( + "Hello {},\n\nYour requirement \"{}\" is now live and professionals can send you requests.\n\n{}/dashboard/requirements\n\nRegards,\nThe NXTGAUGE Team", + name, title, frontend_url + ), + ).await + } + + pub async fn send_lead_request_received_email(&self, to: &str, customer_name: &str, requirement_title: &str, professional_name: &str) -> Result<()> { + let frontend_url = env::var("FRONTEND_URL") + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + self.send( + to, + &format!("New request for \"{}\"", requirement_title), + format!( + "Hello {},\n\n{} is interested in your requirement \"{}\" and has sent a request to connect.\n\nReview and accept/reject:\n{}/dashboard/requirements\n\nRegards,\nThe NXTGAUGE Team", + customer_name, professional_name, requirement_title, frontend_url + ), + ).await + } + + pub async fn send_lead_accepted_professional_email(&self, to: &str, professional_name: &str, customer_name: &str, customer_email: &str, customer_phone: &str) -> Result<()> { + self.send( + to, + "Your lead request was accepted!", + format!( + "Hello {},\n\n{} has accepted your request. Here are their contact details:\n\nEmail: {}\nPhone: {}\n\nPlease reach out to them directly.\n\nRegards,\nThe NXTGAUGE Team", + professional_name, customer_name, customer_email, customer_phone + ), + ).await + } + + pub async fn send_lead_accepted_customer_email(&self, to: &str, customer_name: &str, professional_name: &str, professional_email: &str, professional_phone: &str) -> Result<()> { + self.send( + to, + "You accepted a professional request", + format!( + "Hello {},\n\nYou accepted {}'s request. Here are their contact details:\n\nEmail: {}\nPhone: {}\n\nRegards,\nThe NXTGAUGE Team", + customer_name, professional_name, professional_email, professional_phone + ), + ).await + } + + pub async fn send_lead_rejected_email(&self, to: &str, professional_name: &str, requirement_title: &str) -> Result<()> { + self.send( + to, + "Your lead request was not accepted", + format!( + "Hello {},\n\nYour request for the requirement \"{}\" was not accepted this time. Your Tracecoins have been returned to your wallet.\n\nKeep exploring the marketplace for more opportunities!\n\n{}/dashboard/marketplace\n\nRegards,\nThe NXTGAUGE Team", + professional_name, + requirement_title, + env::var("FRONTEND_URL").unwrap_or_else(|_| "http://localhost:3000".to_string()) + ), + ).await + } + + // ── Tracecoins ──────────────────────────────────────────────────────────── + + pub async fn send_manual_credit_email(&self, to: &str, name: &str, amount: i32, reason: &str) -> Result<()> { + self.send( + to, + "Tracecoins credited to your account", + format!( + "Hello {},\n\n{} Tracecoins have been credited to your NXTGAUGE wallet.\n\nReason: {}\n\nRegards,\nThe NXTGAUGE Team", + name, amount, reason + ), + ).await + } +} + +impl Default for Mailer { + fn default() -> Self { + Self::new() + } +}