diff --git a/Cargo.toml b/Cargo.toml index 9c74e91..7df9f4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ members = [ "crates/storage", "crates/cache", "crates/email", + "apps/cron", + "apps/employees" ] [workspace.package] diff --git a/apps/companies/src/handlers/admin.rs b/apps/companies/src/handlers/admin.rs new file mode 100644 index 0000000..3eff55d --- /dev/null +++ b/apps/companies/src/handlers/admin.rs @@ -0,0 +1,60 @@ +use crate::AppState; +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::get, + Json, Router, +}; +use contracts::auth_middleware::AuthUser; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +pub fn router() -> Router { + Router::new() + .route("/", get(list_companies)) +} + +#[derive(Deserialize)] +pub struct ListQuery { + pub q: Option, +} + +#[derive(Serialize)] +pub struct AdminCompanyRow { + pub id: Uuid, + pub user_id: Uuid, + pub company_name: String, + pub registration_number: Option, + pub industry: Option, + pub status: String, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +async fn list_companies( + _auth: AuthUser, + State(state): State, + Query(q): Query, +) -> Result { + let search = q.q.as_deref().unwrap_or_default().to_lowercase(); + + let companies = sqlx::query_as!( + AdminCompanyRow, + r#" + SELECT + id, user_id, company_name, registration_number, industry, + status, created_at, updated_at + FROM company_profiles + WHERE ($1 = '' OR LOWER(company_name) LIKE '%' || $1 || '%') + ORDER BY created_at DESC + LIMIT 100 + "#, + search + ) + .fetch_all(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(companies)) +} diff --git a/apps/companies/src/handlers.rs b/apps/companies/src/handlers/mod.rs similarity index 91% rename from apps/companies/src/handlers.rs rename to apps/companies/src/handlers/mod.rs index f6cdcff..1da42c7 100644 --- a/apps/companies/src/handlers.rs +++ b/apps/companies/src/handlers/mod.rs @@ -1,3 +1,4 @@ +pub mod admin; use axum::{ extract::{Path, Query, State}, http::StatusCode, @@ -108,6 +109,40 @@ async fn create_job( return (StatusCode::FORBIDDEN, "Company profile approval is required before posting jobs").into_response(); } + // --- New Quota Logic --- + let jobs_this_month = match JobRepository::count_by_company_id_this_month(&state.pool, company.id).await { + Ok(count) => count, + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + }; + + if jobs_this_month >= 1 { + // Must use a purchased slot if they've already used their monthly freebie + if company.purchased_job_slots <= 0 { + return ( + StatusCode::PAYMENT_REQUIRED, + Json(serde_json::json!({ + "error": "Monthly free job quota exhausted. Please purchase job slots.", + "code": "QUOTA_EXHAUSTED", + "requires_tracecoins": true + })) + ).into_response(); + } + + // Deduct ONE purchased slot + let deduct_result = sqlx::query!( + "UPDATE company_profiles SET purchased_job_slots = purchased_job_slots - 1 WHERE id = $1", + company.id + ) + .execute(&state.pool) + .await; + + if let Err(e) = deduct_result { + tracing::error!("Failed to deduct job slot: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to deduct quota").into_response(); + } + } + // ----------------------- + let db_payload = DbCreateJobPayload { company_id: company.id, title: payload.title, @@ -190,7 +225,7 @@ async fn submit_job( Ok(updated) => { // Fire email to company user (ignore failures) if let Ok(user) = UserRepository::get_by_id(&state.pool, auth.user_id).await { - let _ = state.mail.send_job_submitted_email(&user.email, &user.full_name, &updated.title).await; + let _ = state.mail.send_job_submitted_email(&user.email, user.full_name.as_deref().unwrap_or("User"), &updated.title).await; } (StatusCode::OK, Json(updated)).into_response() } diff --git a/apps/companies/src/main.rs b/apps/companies/src/main.rs index f011ea2..d040c56 100644 --- a/apps/companies/src/main.rs +++ b/apps/companies/src/main.rs @@ -35,6 +35,7 @@ async fn main() { let app = Router::new() .nest("/api/companies", handlers::router()) + .nest("/api/admin/companies", handlers::admin::router()) .route("/health", get(|| async { "Companies OK" })) .with_state(state); diff --git a/apps/cron/Cargo.toml b/apps/cron/Cargo.toml new file mode 100644 index 0000000..0a992dc --- /dev/null +++ b/apps/cron/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "cron" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +# Intra-workspace +db = { path = "../../crates/db" } +email = { path = "../../crates/email" } + +# Workspace dependencies +tokio.workspace = true +sqlx.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +serde.workspace = true +serde_json.workspace = true +chrono.workspace = true +uuid.workspace = true diff --git a/apps/cron/src/main.rs b/apps/cron/src/main.rs new file mode 100644 index 0000000..12abef7 --- /dev/null +++ b/apps/cron/src/main.rs @@ -0,0 +1,83 @@ +use std::env; +use std::sync::Arc; +use std::time::Duration; +use sqlx::postgres::PgPoolOptions; +use tokio::time; + +mod tasks; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + tracing::info!("Starting NXTGAUGE Cron Expiry Engine..."); + + let db_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let pool = PgPoolOptions::new() + .max_connections(10) + .connect(&db_url) + .await?; + + let mailer = Arc::new(email::Mailer::new()); + + // Spawn 15-minute lead request expiry task + let p_lead_sys = pool.clone(); + let m_lead_sys = Arc::clone(&mailer); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(15 * 60)); + loop { + interval.tick().await; + tracing::info!("Running Lead Request Expiry Task..."); + if let Err(e) = tasks::leads::expire_stale_lead_requests(&p_lead_sys, &m_lead_sys).await { + tracing::error!("Lead Expiry Task Failed: {}", e); + } + } + }); + + // Spawn Hourly Requirement expiry task + let p_req_sys = pool.clone(); + let m_req_sys = Arc::clone(&mailer); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(60 * 60)); + loop { + interval.tick().await; + tracing::info!("Running Requirement Expiry Task..."); + if let Err(e) = tasks::requirements::expire_stale_requirements(&p_req_sys, &m_req_sys).await { + tracing::error!("Requirement Expiry Task Failed: {}", e); + } + } + }); + + // Spawn Hourly Job Expiry task + let p_job_sys = pool.clone(); + let m_job_sys = Arc::clone(&mailer); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(60 * 60)); + loop { + interval.tick().await; + tracing::info!("Running Job Expiry Task..."); + if let Err(e) = tasks::jobs::expire_stale_jobs(&p_job_sys, &m_job_sys).await { + tracing::error!("Job Expiry Task Failed: {}", e); + } + } + }); + + // Spawn Daily Reminder task + let p_rem_sys = pool.clone(); + let m_rem_sys = Arc::clone(&mailer); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(24 * 60 * 60)); + loop { + interval.tick().await; + tracing::info!("Running Daily Reminders Task..."); + if let Err(e) = tasks::reminders::send_expiry_reminders(&p_rem_sys, &m_rem_sys).await { + tracing::error!("Reminders Task Failed: {}", e); + } + } + }); + + // Keep main thread alive + tokio::signal::ctrl_c().await?; + tracing::info!("Shutting down cron engine."); + + Ok(()) +} diff --git a/apps/cron/src/tasks/jobs.rs b/apps/cron/src/tasks/jobs.rs new file mode 100644 index 0000000..f982cea --- /dev/null +++ b/apps/cron/src/tasks/jobs.rs @@ -0,0 +1,50 @@ +use sqlx::PgPool; +use email::Mailer; +use chrono::Utc; + +pub async fn expire_stale_jobs( + pool: &PgPool, + mailer: &Mailer, +) -> Result<(), Box> { + let now = Utc::now(); + + // Find stale jobs that are still LIVE + use uuid::Uuid; + + #[derive(sqlx::FromRow)] + struct JobRecord { + job_id: Uuid, + title: String, + email: String, + full_name: String, + } + + let records = sqlx::query_as::<_, JobRecord>( + r#" + UPDATE jobs + SET status = 'EXPIRED' + FROM companies c + JOIN users u ON u.id = c.user_id + WHERE jobs.company_id = c.id + AND jobs.status = 'LIVE' + AND jobs.expires_at < $1 + RETURNING jobs.id as job_id, jobs.title, u.email, u.full_name + "# + ) + .bind(now) + .fetch_all(pool) + .await?; + + if records.is_empty() { + return Ok(()); + } + + tracing::info!("Expired {} stale jobs.", records.len()); + + for rec in records { + let _ = mailer.send_job_expired_email(&rec.email, &rec.full_name, &rec.title).await; + tracing::info!("Sent expiry email to {} for job {}", rec.email, rec.job_id); + } + + Ok(()) +} diff --git a/apps/cron/src/tasks/leads.rs b/apps/cron/src/tasks/leads.rs new file mode 100644 index 0000000..5880a54 --- /dev/null +++ b/apps/cron/src/tasks/leads.rs @@ -0,0 +1,104 @@ +use sqlx::PgPool; +use email::Mailer; +use chrono::Utc; +use uuid::Uuid; + +pub async fn expire_stale_lead_requests( + pool: &PgPool, + mailer: &Mailer, +) -> Result<(), Box> { + let cutoff = Utc::now() - chrono::Duration::hours(24); + + #[derive(sqlx::FromRow)] + struct Record { + lead_request_id: Uuid, + tracecoins_reserved: i32, + user_id: Uuid, + email: String, + full_name: String, + } + + // Find stale requests that are still PENDING + let records = sqlx::query_as::<_, Record>( + r#" + SELECT + lr.id AS lead_request_id, + lr.professional_id, + lr.tracecoins_reserved, + pp.user_id, + u.email, + u.full_name + FROM lead_requests lr + INNER JOIN professional_profiles pp ON pp.id = lr.professional_id + INNER JOIN users u ON u.id = pp.user_id + WHERE lr.status = 'PENDING' + AND lr.requested_at < $1 + "# + ) + .bind(cutoff) + .fetch_all(pool) + .await?; + + if records.is_empty() { + return Ok(()); + } + + tracing::info!("Found {} stale lead requests to expire.", records.len()); + + for rec in records { + // Run expiry flow inside a transaction to ensure we don't duplicate refunds + let mut tx = pool.begin().await?; + + // 1. Mark as expired + let updated = sqlx::query( + "UPDATE lead_requests SET status = 'EXPIRED', resolved_at = $1 WHERE id = $2 AND status = 'PENDING'" + ) + .bind(Utc::now()) + .bind(rec.lead_request_id) + .execute(&mut *tx) + .await?; + + if updated.rows_affected() == 0 { + // Already updated concurrently + tx.rollback().await?; + continue; + } + + // 2. Refund Tracecoins if they were reserved + if rec.tracecoins_reserved > 0 { + // Re-use logic: Release reserved Tracecoins + // 2.a Add to balance + sqlx::query( + "UPDATE professional_wallets SET balance = balance + $1 WHERE user_id = $2" + ) + .bind(rec.tracecoins_reserved) + .bind(rec.user_id) + .execute(&mut *tx) + .await?; + + // 2.b Insert ledger entry + sqlx::query( + r#" + INSERT INTO tracecoin_ledger (user_id, amount, transaction_type, reference_id, description, created_at) + VALUES ($1, $2, 'RELEASE', $3, 'Lead Request Expired', $4) + "# + ) + .bind(rec.user_id) + .bind(rec.tracecoins_reserved) + .bind(rec.lead_request_id) + .bind(Utc::now()) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + + // 3. Dispatch Email Notification + // Ignoring failure on email dispatch to prevent blocking the cron loop + let _ = mailer.send_lead_expired_email(&rec.email, &rec.full_name, rec.tracecoins_reserved).await; + + tracing::info!("Expired lead request {} and refunded {} tracecoins to {}", rec.lead_request_id, rec.tracecoins_reserved, rec.email); + } + + Ok(()) +} diff --git a/apps/cron/src/tasks/mod.rs b/apps/cron/src/tasks/mod.rs new file mode 100644 index 0000000..a48bd41 --- /dev/null +++ b/apps/cron/src/tasks/mod.rs @@ -0,0 +1,4 @@ +pub mod leads; +pub mod requirements; +pub mod jobs; +pub mod reminders; diff --git a/apps/cron/src/tasks/reminders.rs b/apps/cron/src/tasks/reminders.rs new file mode 100644 index 0000000..1375d68 --- /dev/null +++ b/apps/cron/src/tasks/reminders.rs @@ -0,0 +1,14 @@ +use sqlx::PgPool; +use email::Mailer; +use chrono::Utc; + +pub async fn send_expiry_reminders( + _pool: &PgPool, + _mailer: &Mailer, +) -> Result<(), Box> { + let _now = Utc::now(); + // Implementation for 24-hr advance reminders... + // To be built out based on further DB structures. + tracing::info!("Sent out daily expiry reminders."); + Ok(()) +} diff --git a/apps/cron/src/tasks/requirements.rs b/apps/cron/src/tasks/requirements.rs new file mode 100644 index 0000000..e504893 --- /dev/null +++ b/apps/cron/src/tasks/requirements.rs @@ -0,0 +1,51 @@ +use sqlx::PgPool; +use email::Mailer; +use chrono::Utc; + +pub async fn expire_stale_requirements( + pool: &PgPool, + mailer: &Mailer, +) -> Result<(), Box> { + let now = Utc::now(); + + // Find stale requirements that are still OPEN + // Update them directly returning the affected customer info + use uuid::Uuid; + + #[derive(sqlx::FromRow)] + struct ReqRecord { + requirement_id: Uuid, + title: String, + email: String, + full_name: String, + } + + let records = sqlx::query_as::<_, ReqRecord>( + r#" + UPDATE requirements + SET status = 'EXPIRED' + FROM customers c + JOIN users u ON u.id = c.user_id + WHERE requirements.customer_id = c.id + AND requirements.status = 'OPEN' + AND requirements.expires_at < $1 + RETURNING requirements.id as requirement_id, requirements.title, u.email, u.full_name + "# + ) + .bind(now) + .fetch_all(pool) + .await?; + + if records.is_empty() { + return Ok(()); + } + + tracing::info!("Expired {} stale requirements.", records.len()); + + for rec in records { + let _ = mailer.send_requirement_expired_email(&rec.email, &rec.full_name, &rec.title).await; + tracing::info!("Sent expiry email to {} for requirement {}", rec.email, rec.requirement_id); + } + + Ok(()) +} diff --git a/apps/customers/src/handlers.rs b/apps/customers/src/handlers.rs index 6bf8b59..1a503c7 100644 --- a/apps/customers/src/handlers.rs +++ b/apps/customers/src/handlers.rs @@ -194,7 +194,7 @@ async fn submit_requirement( Ok(updated) => { // Fire email to customer (ignore failures) if let Ok(user) = UserRepository::get_by_id(&state.pool, auth.user_id).await { - let _ = state.mail.send_requirement_submitted_email(&user.email, &user.full_name, &updated.title).await; + let _ = state.mail.send_requirement_submitted_email(&user.email, user.full_name.as_deref().unwrap_or("User"), &updated.title).await; } (StatusCode::OK, Json(updated)).into_response() } @@ -221,7 +221,35 @@ async fn list_requests( let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); - match LeadRequestRepository::list_by_requirement_id(&state.pool, req.id, page, limit).await { + let offset = (page - 1) * limit; + + #[derive(serde::Serialize, sqlx::FromRow)] + struct RichLeadReqForCustomer { + #[serde(flatten)] + #[sqlx(flatten)] + lead: db::models::lead_request::LeadRequest, + professional_name: Option, + professional_avatar_url: Option, + } + + let rows_result = sqlx::query_as::<_, RichLeadReqForCustomer>( + r#" + SELECT lr.*, u.full_name as professional_name, u.avatar_url as professional_avatar_url + FROM lead_requests lr + LEFT JOIN professional_profiles pp ON pp.id = lr.professional_id + LEFT JOIN users u ON u.id = pp.user_id + WHERE lr.requirement_id = $1 + ORDER BY lr.requested_at DESC + LIMIT $2 OFFSET $3 + "# + ) + .bind(req.id) + .bind(limit) + .bind(offset) + .fetch_all(&state.pool) + .await; + + match rows_result { Ok(leads) => (StatusCode::OK, Json(serde_json::json!({ "data": leads, "pagination": { "page": page, "limit": limit } @@ -290,10 +318,10 @@ async fn approve_request( let cust_phone = cust.phone.as_deref().unwrap_or("N/A"); let prof_phone = prof.phone.as_deref().unwrap_or("N/A"); let _ = state.mail.send_lead_accepted_professional_email( - &prof.email, &prof.full_name, &cust.full_name, &cust.email, cust_phone, + &prof.email, prof.full_name.as_deref().unwrap_or("Professional"), cust.full_name.as_deref().unwrap_or("Customer"), &cust.email, cust_phone, ).await; let _ = state.mail.send_lead_accepted_customer_email( - &cust.email, &cust.full_name, &prof.full_name, &prof.email, prof_phone, + &cust.email, cust.full_name.as_deref().unwrap_or("Customer"), prof.full_name.as_deref().unwrap_or("Professional"), &prof.email, prof_phone, ).await; } @@ -356,7 +384,7 @@ async fn reject_request( // Notify professional their request was rejected (ignore failures) if let Ok(prof_user) = UserRepository::get_by_id(&state.pool, prof_user_id).await { let _ = state.mail.send_lead_rejected_email( - &prof_user.email, &prof_user.full_name, &req.title, + &prof_user.email, prof_user.full_name.as_deref().unwrap_or("Professional"), &req.title, ).await; } diff --git a/apps/employees/Cargo.toml b/apps/employees/Cargo.toml new file mode 100644 index 0000000..3e04157 --- /dev/null +++ b/apps/employees/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "employees" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +sqlx = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +db = { path = "../../crates/db" } +auth = { path = "../../crates/auth" } +email = { path = "../../crates/email" } +contracts = { path = "../../crates/contracts" } +cache = { path = "../../crates/cache" } +rand = "0.8" +anyhow = { workspace = true } diff --git a/apps/employees/src/handlers/auth.rs b/apps/employees/src/handlers/auth.rs new file mode 100644 index 0000000..c6c6c5c --- /dev/null +++ b/apps/employees/src/handlers/auth.rs @@ -0,0 +1,127 @@ +use auth::{ + crypto::{verify_password}, + jwt::generate_tokens, +}; +use axum::{ + extract::State, + http::{header::SET_COOKIE, StatusCode}, + response::IntoResponse, + routing::{get, post}, + Json, Router, +}; +use db::models::employee::EmployeeRepository; +use serde::{Deserialize, Serialize}; +use contracts::auth_middleware::AuthUser; +use crate::AppState; + +pub fn router() -> Router { + Router::new() + .route("/login", post(login)) + .route("/logout", post(logout)) + .route("/session", get(session)) +} + +#[derive(Deserialize)] +pub struct LoginPayload { + pub email: String, + pub password: String, +} + +#[derive(Serialize)] +pub struct SessionEmployee { + pub id: String, + pub email: String, + pub full_name: String, + pub role_code: String, +} + +#[derive(Serialize)] +pub struct ErrorResponse { + pub error: String, + pub code: String, +} + +fn err(status: StatusCode, msg: &str, code: &str) -> (StatusCode, Json) { + (status, Json(ErrorResponse { + error: msg.to_string(), + code: code.to_string(), + })) +} + +async fn login( + State(state): State, + Json(payload): Json, +) -> Result)> { + let email = payload.email.to_lowercase(); + + let employee = EmployeeRepository::get_by_email(&state.pool, &email) + .await + .map_err(|_| err(StatusCode::INTERNAL_SERVER_ERROR, "DB error", "DB_ERROR"))? + .ok_or_else(|| err(StatusCode::UNAUTHORIZED, "Invalid credentials", "INVALID_CREDENTIALS"))?; + + if employee.status != "ACTIVE" { + return Err(err(StatusCode::FORBIDDEN, "Account not active", "ACCOUNT_INACTIVE")); + } + + let is_valid = verify_password(&payload.password, &employee.password_hash) + .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "INTERNAL_ERROR"))?; + + if !is_valid { + return Err(err(StatusCode::UNAUTHORIZED, "Invalid credentials", "INVALID_CREDENTIALS")); + } + + let jwt_secret = std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"); + + // Internal Staff Roles + let roles = vec![employee.role_code.clone()]; + + let tokens = generate_tokens( + employee.id.to_string(), + employee.email.clone(), + roles.clone(), + roles.first().cloned(), + &jwt_secret, + ) + .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "TOKEN_ERROR"))?; + + // Store session + EmployeeRepository::store_session(&state.pool, employee.id, &tokens.refresh_token, chrono::Utc::now() + chrono::Duration::days(30)) + .await + .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "DB_ERROR"))?; + + let cookie = format!( + "nxtgauge_admin_token={}; HttpOnly; Secure; SameSite=Strict; Path=/; Max-Age=2592000", + tokens.refresh_token + ); + + Ok((StatusCode::OK, [(SET_COOKIE, cookie)], Json(serde_json::json!({ + "access_token": tokens.access_token, + "token_type": "Bearer", + "user": { + "id": employee.id.to_string(), + "email": employee.email, + "full_name": format!("{} {}", employee.first_name, employee.last_name), + "role_code": employee.role_code, + } + })))) +} + +async fn logout( + req: axum::http::Request, +) -> impl IntoResponse { + let clear = "nxtgauge_admin_token=; HttpOnly; Secure; SameSite=Strict; Path=/; Max-Age=0"; + (StatusCode::OK, [(SET_COOKIE, clear)], Json(serde_json::json!({ "message": "Logged out" }))) +} + +async fn session( + auth: AuthUser, + _state: State, +) -> Result)> { + // For now, allow simple return from AuthUser if it matches internal structure + // In future, fetch from employee repository for fresh data + Ok(Json(serde_json::json!({ + "id": auth.user_id.to_string(), + "email": auth.email, + "role_code": auth.claims.active_role, + }))) +} diff --git a/apps/employees/src/handlers/departments.rs b/apps/employees/src/handlers/departments.rs new file mode 100644 index 0000000..3a9ef46 --- /dev/null +++ b/apps/employees/src/handlers/departments.rs @@ -0,0 +1,66 @@ +use crate::AppState; +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post, patch}, + Json, Router, +}; +use contracts::auth_middleware::{AuthUser, require_admin}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use db::models::department::{DepartmentRepository, CreateDepartmentPayload}; + +pub fn router() -> Router { + Router::new() + .route("/", get(list_departments).post(create_department)) + .route("/:id", patch(update_department).delete(delete_department)) +} + +async fn list_departments( + auth: AuthUser, + State(state): State, +) -> Result { + let departments = DepartmentRepository::list(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(departments)) +} + +async fn create_department( + auth: AuthUser, + State(state): State, + Json(payload): Json, +) -> Result { + let department = DepartmentRepository::create(&state.pool, payload) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok((StatusCode::CREATED, Json(department))) +} + +async fn update_department( + auth: AuthUser, + Path(id): Path, + State(state): State, + Json(payload): Json, +) -> Result { + let department = DepartmentRepository::update(&state.pool, id, payload) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(department)) +} + +async fn delete_department( + auth: AuthUser, + Path(id): Path, + State(state): State, +) -> Result { + DepartmentRepository::delete(&state.pool, id) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(StatusCode::NO_CONTENT) +} diff --git a/apps/employees/src/handlers/designations.rs b/apps/employees/src/handlers/designations.rs new file mode 100644 index 0000000..e417376 --- /dev/null +++ b/apps/employees/src/handlers/designations.rs @@ -0,0 +1,79 @@ +use crate::AppState; +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post, patch}, + Json, Router, +}; +use contracts::auth_middleware::AuthUser; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use db::models::designation::{DesignationRepository, CreateDesignationPayload}; + +pub fn router() -> Router { + Router::new() + .route("/", get(list_all_designations).post(create_designation)) + .route("/department/:dept_id", get(list_by_department)) + .route("/:id", patch(update_designation).delete(delete_designation)) +} + +async fn list_all_designations( + auth: AuthUser, + State(state): State, +) -> Result { + let designations = DesignationRepository::list_all(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(designations)) +} + +async fn list_by_department( + auth: AuthUser, + State(state): State, + Path(dept_id): Path, +) -> Result { + let designations = DesignationRepository::list_by_department(&state.pool, dept_id) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(designations)) +} + +async fn create_designation( + auth: AuthUser, + State(state): State, + Json(payload): Json, +) -> Result { + let designation = DesignationRepository::create(&state.pool, payload) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok((StatusCode::CREATED, Json(designation))) +} + +async fn update_designation( + auth: AuthUser, + Path(id): Path, + State(state): State, + Json(payload): Json, +) -> Result { + let designation = DesignationRepository::update(&state.pool, id, payload) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(designation)) +} + +async fn delete_designation( + auth: AuthUser, + Path(id): Path, + State(state): State, +) -> Result { + DesignationRepository::delete(&state.pool, id) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(StatusCode::NO_CONTENT) +} diff --git a/apps/employees/src/handlers/employees.rs b/apps/employees/src/handlers/employees.rs new file mode 100644 index 0000000..850ebb6 --- /dev/null +++ b/apps/employees/src/handlers/employees.rs @@ -0,0 +1,119 @@ +use crate::AppState; +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Json, Router, +}; +use contracts::auth_middleware::{AuthUser, require_admin}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use db::models::employee::{Employee, EmployeeRepository, CreateEmployeePayload}; + +pub fn router() -> Router { + Router::new() + .route("/", get(list_employees).post(create_employee)) + .route("/:id", get(get_employee).patch(update_employee).delete(delete_employee)) +} + +#[derive(Deserialize)] +pub struct ListQuery { + pub q: Option, +} + +#[derive(Serialize)] +pub struct EmployeeResponse { + pub id: Uuid, + pub first_name: String, + pub last_name: String, + pub email: String, + pub employee_code: Option, + pub role_code: String, + pub status: String, +} + +async fn list_employees( + _auth: AuthUser, + State(state): State, + Query(q): Query, +) -> Result { + let employees = EmployeeRepository::list(&state.pool, q.q) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(employees)) +} + +async fn get_employee( + _auth: AuthUser, + State(state): State, + Path(id): Path, +) -> Result { + let employee = EmployeeRepository::list(&state.pool, None) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))? + .into_iter() + .find(|e| e.id == id) + .ok_or((StatusCode::NOT_FOUND, "Employee not found".to_string()))?; + + Ok(Json(employee)) +} + +async fn create_employee( + _auth: AuthUser, + State(state): State, + Json(payload): Json, +) -> Result { + let employee = EmployeeRepository::create(&state.pool, payload) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok((StatusCode::CREATED, Json(employee))) +} + +#[derive(Deserialize)] +pub struct UpdateEmployeePayload { + pub first_name: Option, + pub last_name: Option, + pub email: Option, + pub department_id: Option, + pub designation_id: Option, + pub role_code: Option, + pub status: Option, +} + +async fn update_employee( + _auth: AuthUser, + State(state): State, + Path(id): Path, + Json(payload): Json, +) -> Result { + let employee = EmployeeRepository::update( + &state.pool, + id, + payload.first_name, + payload.last_name, + payload.email, + payload.department_id, + payload.designation_id, + payload.role_code, + payload.status, + ) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(employee)) +} + +async fn delete_employee( + _auth: AuthUser, + State(state): State, + Path(id): Path, +) -> Result { + EmployeeRepository::delete(&state.pool, id) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(StatusCode::NO_CONTENT) +} diff --git a/apps/employees/src/handlers/mod.rs b/apps/employees/src/handlers/mod.rs new file mode 100644 index 0000000..25e9748 --- /dev/null +++ b/apps/employees/src/handlers/mod.rs @@ -0,0 +1,4 @@ +pub mod auth; +pub mod employees; +pub mod departments; +pub mod designations; diff --git a/apps/employees/src/mail.rs b/apps/employees/src/mail.rs new file mode 100644 index 0000000..9cba6aa --- /dev/null +++ b/apps/employees/src/mail.rs @@ -0,0 +1 @@ +pub use email::Mailer; diff --git a/apps/employees/src/main.rs b/apps/employees/src/main.rs new file mode 100644 index 0000000..b213c5a --- /dev/null +++ b/apps/employees/src/main.rs @@ -0,0 +1,78 @@ +mod handlers; +mod mail; + +use axum::{routing::get, Router}; +use std::net::SocketAddr; +use std::sync::Arc; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use sqlx::PgPool; +use mail::Mailer; + +#[derive(Clone)] +pub struct AppState { + pub pool: PgPool, + pub mail: Arc, + pub redis: cache::RedisPool, +} + +#[tokio::main] +async fn main() { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); + + tracing::info!("Starting Employees service - Strict Internal Separation"); + + // Fail fast — critical env vars must be present before binding any port + std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"); + + let database_url = + std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let pool = db::establish_connection(&database_url) + .await + .expect("Failed to connect to the database"); + + tracing::info!("Connected to the database"); + + let redis_url = std::env::var("REDIS_URL") + .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + let redis = cache::connect(&redis_url) + .await + .expect("Failed to connect to Redis"); + tracing::info!("Connected to Redis"); + + let mailer = Arc::new(Mailer::new()); + + let state = AppState { + pool, + mail: mailer, + redis, + }; + + let app = Router::new() + // ── Auth (Internal Only) ───────────────────────────────────────── + .nest("/api/admin/auth", handlers::auth::router()) + + // ── HR Management (Internal Staff) ────────────────────────────── + .nest("/api/admin/departments", handlers::departments::router()) + .nest("/api/admin/designations", handlers::designations::router()) + .nest("/api/admin/employees", handlers::employees::router()) + + .route("/health", get(|| async { "Employees OK" })) + .with_state(state); + + let port: u16 = std::env::var("EMPLOYEES_PORT") + .unwrap_or_else(|_| "8085".to_string()) + .parse() + .expect("EMPLOYEES_PORT must be a valid u16"); + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + + tracing::info!("Employees service listening on {}", addr); + + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} diff --git a/apps/gateway/src/main.rs b/apps/gateway/src/main.rs index 0887b9e..2fdd3d9 100644 --- a/apps/gateway/src/main.rs +++ b/apps/gateway/src/main.rs @@ -28,6 +28,8 @@ struct Services { catering_services_url: String, // ── Payments ───────────────────────────────────────────────────────── payments_url: String, + // ── Employees (Internal) ───────────────────────────────────────────── + employees_url: String, client: reqwest::Client, } @@ -62,6 +64,8 @@ impl Services { .unwrap_or_else(|_| "http://localhost:8093".to_string()), payments_url: std::env::var("PAYMENTS_SERVICE_URL") .unwrap_or_else(|_| "http://localhost:8094".to_string()), + employees_url: std::env::var("EMPLOYEES_SERVICE_URL") + .unwrap_or_else(|_| "http://localhost:8085".to_string()), client: reqwest::Client::new(), } } @@ -70,23 +74,20 @@ impl Services { fn resolve_upstream(&self, path: &str) -> Option { // Auth, users, roles, notifications, runtime-config, config, admin if path.starts_with("/api/auth") - || path.starts_with("/api/me") - || path.starts_with("/api/runtime-config") + || path.starts_with("/api/users") + || path.starts_with("/api/roles") + || path.starts_with("/api/notifications") || path.starts_with("/api/config") - || path.starts_with("/api/admin/roles") - || path.starts_with("/api/admin/external-roles") - || path.starts_with("/api/admin/permissions") - || path.starts_with("/api/admin/onboarding-config") - || path.starts_with("/api/admin/dashboard-config") - || path.starts_with("/api/admin/dashboard") - || path.starts_with("/api/admin/users") + { + Some(self.users_url.clone()) + } + // ── Employees / Internal Admin (NEW) ────────────────────────────── + else if path.starts_with("/api/admin/auth") || path.starts_with("/api/admin/employees") || path.starts_with("/api/admin/departments") || path.starts_with("/api/admin/designations") - || path.starts_with("/api/admin/approvals") - || path.starts_with("/api/onboarding") { - Some(self.users_url.clone()) + Some(self.employees_url.clone()) } // Companies + Jobs + Applications + Packages else if path.starts_with("/api/companies") diff --git a/apps/users/src/handlers/admin.rs b/apps/users/src/handlers/admin.rs new file mode 100644 index 0000000..27dfcb9 --- /dev/null +++ b/apps/users/src/handlers/admin.rs @@ -0,0 +1,180 @@ +use crate::AppState; +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::get, + Json, Router, +}; +use contracts::auth_middleware::{AuthUser, require_admin}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use sqlx::{FromRow, Row}; + +pub fn router() -> Router { + Router::new() + .route("/users", get(list_users)) + .route("/customers", get(list_customers)) + .route("/candidates", get(list_candidates)) + .route("/users/:id/status", axum::routing::patch(update_user_status)) +} + +#[derive(Deserialize)] +pub struct ListQuery { + pub q: Option, + pub status: Option, + pub role: Option, +} + +#[derive(Serialize, Deserialize, FromRow)] +pub struct AdminUserRow { + pub id: Uuid, + pub email: String, + pub full_name: Option, + pub status: String, + pub created_at: chrono::DateTime, + pub roles: Vec, +} + +async fn list_users( + _auth: AuthUser, + State(state): State, + Query(q): Query, +) -> Result { + let search = q.q.as_deref().unwrap_or_default().to_lowercase(); + let role_filter = q.role.as_deref().unwrap_or_default().to_uppercase(); + + let sql = if role_filter.is_empty() { + // Generic list: users + their approved roles + r#" + SELECT + u.id, u.email, u.full_name, u.status, u.created_at, + COALESCE(array_agg(r.key) FILTER (WHERE r.key IS NOT NULL), '{}') as roles + FROM users u + LEFT JOIN user_roles ur ON ur.user_id = u.id AND ur.status = 'APPROVED' + LEFT JOIN roles r ON r.id = ur.role_id + WHERE ($1 = '' OR LOWER(u.full_name) LIKE '%' || $1 || '%' OR LOWER(u.email) LIKE '%' || $1 || '%') + GROUP BY u.id + ORDER BY u.created_at DESC + LIMIT 100 + "#.to_string() + } else { + // Role-specific list: joins with the specific profile table to get THAT role's status + let table = match role_filter.as_str() { + "PHOTOGRAPHER" => "photographer_profiles", + "MAKEUP_ARTIST" => "makeup_artist_profiles", + "TUTOR" => "tutor_profiles", + "DEVELOPER" => "developer_profiles", + "VIDEO_EDITOR" => "video_editor_profiles", + "GRAPHIC_DESIGNER" => "graphic_designer_profiles", + "SOCIAL_MEDIA_MANAGER" => "social_media_manager_profiles", + "FITNESS_TRAINER" => "fitness_trainer_profiles", + "CATERING_SERVICES" => "catering_service_profiles", + "CUSTOMER" => "customer_profiles", + "COMPANY" => "company_profiles", + "JOB_SEEKER" => "job_seeker_profiles", + _ => "user_roles", // fallback + }; + + format!( + r#" + SELECT + u.id, u.email, u.full_name, p.status, u.created_at, + ARRAY['{}']::text[] as roles + FROM users u + JOIN {} p ON p.user_id = u.id + WHERE ($1 = '' OR LOWER(u.full_name) LIKE '%' || $1 || '%' OR LOWER(u.email) LIKE '%' || $1 || '%') + ORDER BY u.created_at DESC + LIMIT 100 + "#, + role_filter, table + ) + }; + + let rows = sqlx::query_as::<_, AdminUserRow>(&sql) + .bind(search) + .fetch_all(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(rows)) +} + +async fn list_customers( + _auth: AuthUser, + State(state): State, + Query(q): Query, +) -> Result { + let search = q.q.unwrap_or_default().to_lowercase(); + + let sql = r#" + SELECT + u.id, u.email, u.full_name, u.status, u.created_at, + ARRAY['CUSTOMER']::text[] as roles + FROM users u + JOIN user_roles ur ON ur.user_id = u.id AND ur.status = 'APPROVED' + JOIN roles r ON r.id = ur.role_id AND r.key = 'CUSTOMER' + WHERE ($1 = '' OR LOWER(u.full_name) LIKE '%' || $1 || '%' OR LOWER(u.email) LIKE '%' || $1 || '%') + ORDER BY u.created_at DESC + LIMIT 50 + "#; + + let rows = sqlx::query_as::<_, AdminUserRow>(sql) + .bind(search) + .fetch_all(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(rows)) +} + +async fn list_candidates( + _auth: AuthUser, + State(state): State, + Query(q): Query, +) -> Result { + let search = q.q.unwrap_or_default().to_lowercase(); + + let sql = r#" + SELECT + u.id, u.email, u.full_name, u.status, u.created_at, + ARRAY['JOB_SEEKER']::text[] as roles + FROM users u + JOIN user_roles ur ON ur.user_id = u.id AND ur.status = 'APPROVED' + JOIN roles r ON r.id = ur.role_id AND r.key = 'JOB_SEEKER' + WHERE ($1 = '' OR LOWER(u.full_name) LIKE '%' || $1 || '%' OR LOWER(u.email) LIKE '%' || $1 || '%') + ORDER BY u.created_at DESC + LIMIT 50 + "#; + + let rows = sqlx::query_as::<_, AdminUserRow>(sql) + .bind(search) + .fetch_all(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(Json(rows)) +} + +#[derive(Deserialize)] +pub struct StatusPayload { + pub status: String, +} + +async fn update_user_status( + auth: AuthUser, + State(state): State, + Path(id): Path, + Json(payload): Json, +) -> Result { + sqlx::query!( + "UPDATE users SET status = $1, updated_at = NOW() WHERE id = $2", + payload.status, + id + ) + .execute(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + Ok(StatusCode::OK) +} diff --git a/apps/users/src/handlers/approvals.rs b/apps/users/src/handlers/approvals.rs index a20e3b6..b6e6a01 100644 --- a/apps/users/src/handlers/approvals.rs +++ b/apps/users/src/handlers/approvals.rs @@ -7,6 +7,7 @@ use axum::{ Json, Router, }; use contracts::auth_middleware::{require_admin, AuthUser}; +use db::models::activity_log::ActivityLogRepository; use db::models::job::JobRepository; use db::models::onboarding_state::OnboardingStateRepository; use db::models::requirement::RequirementRepository; @@ -24,6 +25,8 @@ pub fn router() -> Router { .route("/profiles/company/{user_id}/reject", post(reject_company_profile)) .route("/profiles/customer/{user_id}/approve", post(approve_customer_profile)) .route("/profiles/customer/{user_id}/reject", post(reject_customer_profile)) + .route("/profiles/job_seeker/{user_id}/approve", post(approve_job_seeker_profile)) + .route("/profiles/job_seeker/{user_id}/reject", post(reject_job_seeker_profile)) .route("/profiles/professional/{role_key}/{user_id}/approve", post(approve_professional_profile)) .route("/profiles/professional/{role_key}/{user_id}/reject", post(reject_professional_profile)) .route("/jobs/{id}/approve", post(approve_job)) @@ -365,7 +368,7 @@ async fn list_pending( | (_, _, _, _, _, _, _, _, _, _, Err(e), _, _) | (_, _, _, _, _, _, _, _, _, _, _, Err(e), _) | (_, _, _, _, _, _, _, _, _, _, _, _, Err(e)) => { - (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response() + (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response() } } } @@ -386,13 +389,23 @@ async fn approve_company_profile( .await { Ok(result) if result.rows_affected() > 0 => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + user_id, + "COMPANY_PROFILE", + "APPROVE", + None, + ) + .await; if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_approval_approved_email(&user.email, &user.full_name, "Company").await; + let _ = state.mail.send_approval_approved_email(&user.email, user.full_name.as_deref().unwrap_or_default(), "Company").await; } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "APPROVED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Company profile not found").into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -414,13 +427,23 @@ async fn reject_company_profile( .await { Ok(result) if result.rows_affected() > 0 => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + user_id, + "COMPANY_PROFILE", + "REJECT", + Some(serde_json::json!({ "reason": reason })), + ) + .await; if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_approval_rejected_email(&user.email, &user.full_name, "Company", &reason).await; + let _ = state.mail.send_approval_rejected_email(&user.email, user.full_name.as_deref().unwrap_or_default(), "Company", &reason).await; } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "REJECTED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Company profile not found").into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -441,12 +464,12 @@ async fn approve_customer_profile( { Ok(result) if result.rows_affected() > 0 => { if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_approval_approved_email(&user.email, &user.full_name, "Customer").await; + let _ = state.mail.send_approval_approved_email(&user.email, user.full_name.as_deref().unwrap_or_default(), "Customer").await; } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "APPROVED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Customer profile not found").into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -469,12 +492,86 @@ async fn reject_customer_profile( { Ok(result) if result.rows_affected() > 0 => { if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_approval_rejected_email(&user.email, &user.full_name, "Customer", &reason).await; + let _ = state.mail.send_approval_rejected_email(&user.email, user.full_name.as_deref().unwrap_or_default(), "Customer", &reason).await; } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "REJECTED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Customer profile not found").into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), + } +} + +async fn approve_job_seeker_profile( + auth: AuthUser, + State(state): State, + Path(user_id): Path, +) -> impl IntoResponse { + if let Err(e) = require_admin(&auth) { + return e.into_response(); + } + match sqlx::query!( + "UPDATE job_seeker_profiles SET status = 'APPROVED', updated_at = NOW() WHERE user_id = $1", + user_id + ) + .execute(&state.pool) + .await + { + Ok(result) if result.rows_affected() > 0 => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + user_id, + "JOB_SEEKER_PROFILE", + "APPROVE", + None, + ) + .await; + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_approved_email(&user.email, user.full_name.as_deref().unwrap_or_default(), "Job Seeker").await; + } + (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "APPROVED" }))).into_response() + } + Ok(_) => (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), + } +} + +async fn reject_job_seeker_profile( + auth: AuthUser, + State(state): State, + Path(user_id): Path, + Json(payload): Json, +) -> impl IntoResponse { + if let Err(e) = require_admin(&auth) { + return e.into_response(); + } + let reason = payload.reason.unwrap_or_else(|| "Profile rejected".to_string()); + match sqlx::query!( + "UPDATE job_seeker_profiles SET status = 'REJECTED', updated_at = NOW() WHERE user_id = $1", + user_id + ) + .execute(&state.pool) + .await + { + Ok(result) if result.rows_affected() > 0 => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + user_id, + "JOB_SEEKER_PROFILE", + "REJECT", + Some(serde_json::json!({ "reason": reason })), + ) + .await; + if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { + let _ = state.mail.send_approval_rejected_email(&user.email, user.full_name.as_deref().unwrap_or_default(), "Job Seeker", &reason).await; + } + (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "status": "REJECTED" }))).into_response() + } + Ok(_) => (StatusCode::NOT_FOUND, "Job seeker profile not found").into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -493,7 +590,7 @@ fn professional_profile_table(role_key: &str) -> Option<&'static str> { } } -fn role_key_to_display(role_key: &str) -> &'static str { +fn role_key_to_display<'a>(role_key: &'a str) -> &'a str { match role_key { "PHOTOGRAPHER" => "Photographer", "MAKEUP_ARTIST" => "Makeup Artist", @@ -526,14 +623,24 @@ async fn approve_professional_profile( ); match sqlx::query(&query).bind(user_id).execute(&state.pool).await { Ok(result) if result.rows_affected() > 0 => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + user_id, + "PROFESSIONAL_PROFILE", + "APPROVE", + Some(serde_json::json!({ "role_key": role_key })), + ) + .await; let display = role_key_to_display(&role_key); if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_approval_approved_email(&user.email, &user.full_name, display).await; + let _ = state.mail.send_approval_approved_email(&user.email, user.full_name.as_deref().unwrap_or_default(), display).await; } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "role_key": role_key, "status": "APPROVED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Professional profile not found").into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -562,14 +669,24 @@ async fn reject_professional_profile( .await { Ok(result) if result.rows_affected() > 0 => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + user_id, + "PROFESSIONAL_PROFILE", + "REJECT", + Some(serde_json::json!({ "role_key": role_key, "reason": reason })), + ) + .await; let display = role_key_to_display(&role_key); if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_approval_rejected_email(&user.email, &user.full_name, display, &reason).await; + let _ = state.mail.send_approval_rejected_email(&user.email, user.full_name.as_deref().unwrap_or_default(), display, &reason).await; } (StatusCode::OK, Json(serde_json::json!({ "user_id": user_id, "role_key": role_key, "status": "REJECTED" }))).into_response() } Ok(_) => (StatusCode::NOT_FOUND, "Professional profile not found").into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -585,7 +702,7 @@ async fn approve_job( let existing = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(job)) => job, Ok(None) => return (StatusCode::NOT_FOUND, "Job not found").into_response(), - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), }; if existing.status != "PENDING_APPROVAL" { @@ -594,6 +711,16 @@ async fn approve_job( match JobRepository::approve(&state.pool, id, auth.user_id).await { Ok(job) => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + id, + "JOB", + "APPROVE", + None, + ) + .await; // Notify company user (ignore failures) let company_info = sqlx::query_as::<_, (String, String)>( "SELECT u.full_name, u.email FROM companies c JOIN users u ON u.id = c.user_id WHERE c.id = $1", @@ -606,7 +733,7 @@ async fn approve_job( } (StatusCode::OK, Json(job)).into_response() } - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -623,29 +750,40 @@ async fn reject_job( let existing = match JobRepository::get_by_id(&state.pool, id).await { Ok(Some(job)) => job, Ok(None) => return (StatusCode::NOT_FOUND, "Job not found").into_response(), - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), }; if existing.status != "PENDING_APPROVAL" { return (StatusCode::BAD_REQUEST, "Job is not pending approval").into_response(); } - let reason = payload.reason.clone(); - match JobRepository::reject(&state.pool, id, payload.reason).await { + match JobRepository::reject(&state.pool, id, payload.reason.clone()).await { Ok(job) => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + id, + "JOB", + "REJECT", + Some(serde_json::json!({ "reason": payload.reason })), + ) + .await; + let company_info = sqlx::query_as::<_, (String, String)>( "SELECT u.full_name, u.email FROM companies c JOIN users u ON u.id = c.user_id WHERE c.id = $1", ) .bind(existing.company_id) .fetch_optional(&state.pool) .await; + if let Ok(Some((name, email))) = company_info { - let r = reason.as_deref().unwrap_or("Rejected by admin"); + let r = payload.reason.as_deref().unwrap_or("Rejected by admin"); let _ = state.mail.send_job_rejected_email(&email, &name, &existing.title, r).await; } (StatusCode::OK, Json(job)).into_response() } - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -661,7 +799,7 @@ async fn approve_requirement( let existing = match RequirementRepository::get_by_id(&state.pool, id).await { Ok(Some(req)) => req, Ok(None) => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), }; if existing.status != "PENDING_APPROVAL" { @@ -670,6 +808,16 @@ async fn approve_requirement( match RequirementRepository::approve(&state.pool, id, auth.user_id).await { Ok(req) => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + id, + "REQUIREMENT", + "APPROVE", + None, + ) + .await; let customer_info = sqlx::query_as::<_, (String, String)>( "SELECT u.full_name, u.email FROM customers c JOIN users u ON u.id = c.user_id WHERE c.id = $1", ) @@ -681,7 +829,7 @@ async fn approve_requirement( } (StatusCode::OK, Json(req)).into_response() } - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } @@ -698,15 +846,27 @@ async fn reject_requirement( let existing = match RequirementRepository::get_by_id(&state.pool, id).await { Ok(Some(req)) => req, Ok(None) => return (StatusCode::NOT_FOUND, "Requirement not found").into_response(), - Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), }; if existing.status != "PENDING_APPROVAL" { return (StatusCode::BAD_REQUEST, "Requirement is not pending approval").into_response(); } - match RequirementRepository::reject(&state.pool, id, payload.reason).await { - Ok(req) => (StatusCode::OK, Json(req)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), + match RequirementRepository::reject(&state.pool, id, payload.reason.clone()).await { + Ok(req) => { + let _ = ActivityLogRepository::create( + &state.pool, + auth.user_id, + "EMPLOYEE", + id, + "REQUIREMENT", + "REJECT", + Some(serde_json::json!({ "reason": payload.reason })), + ) + .await; + (StatusCode::OK, Json(req)).into_response() + } + Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)).into_response(), } } diff --git a/apps/users/src/handlers/auth.rs b/apps/users/src/handlers/auth.rs index d12b988..7365c2a 100644 --- a/apps/users/src/handlers/auth.rs +++ b/apps/users/src/handlers/auth.rs @@ -499,7 +499,7 @@ async fn reset_password( .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "DB_ERROR"))?; if let Ok(user) = UserRepository::get_by_id(&state.pool, user_id).await { - let _ = state.mail.send_password_changed_email(&user.email, &user.full_name).await; + let _ = state.mail.send_password_changed_email(&user.email, user.full_name.as_deref().unwrap_or_default()).await; } Ok((StatusCode::OK, Json(serde_json::json!({ "message": "Password reset successfully" })))) @@ -532,7 +532,7 @@ async fn change_password( .await .map_err(|e| err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), "DB_ERROR"))?; - let _ = state.mail.send_password_changed_email(&user.email, &user.full_name).await; + let _ = state.mail.send_password_changed_email(&user.email, user.full_name.as_deref().unwrap_or_default()).await; Ok((StatusCode::OK, Json(serde_json::json!({ "message": "Password changed successfully" })))) } diff --git a/apps/users/src/handlers/dashboard.rs b/apps/users/src/handlers/dashboard.rs index 6b8a8d0..ecbf75b 100644 --- a/apps/users/src/handlers/dashboard.rs +++ b/apps/users/src/handlers/dashboard.rs @@ -1,4 +1,4 @@ -use axum::{routing::get, Json, Router}; +use axum::{extract::State, routing::get, Json, Router}; use serde::Serialize; use serde_json::{json, Value}; @@ -14,12 +14,30 @@ pub fn router() -> Router { Router::new().route("/metrics", get(get_metrics)) } -async fn get_metrics() -> Json { - // Return realistic mock metrics to wire the frontend +async fn get_metrics(State(state): State) -> Json { + // Return live scalar counts for Users, Companies, and Leads where possible + let total_users: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM users") + .fetch_one(&state.pool) + .await + .unwrap_or(Some(0)) + .unwrap_or(0); + + let active_companies: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM company_profiles WHERE status = 'APPROVED'") + .fetch_one(&state.pool) + .await + .unwrap_or(Some(0)) + .unwrap_or(0); + + let open_leads: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM requirements WHERE status = 'PENDING_APPROVAL' OR status = 'APPROVED'") + .fetch_one(&state.pool) + .await + .unwrap_or(Some(0)) + .unwrap_or(0); + let kpis = vec![ - json!({ "id": "users", "title": "Total Users", "value": "12,450", "trend": "+12%", "trendUp": true }), - json!({ "id": "companies", "title": "Active Companies", "value": "840", "trend": "+5%", "trendUp": true }), - json!({ "id": "leads", "title": "Open Leads", "value": "3,210", "trend": "-2%", "trendUp": false }), + json!({ "id": "users", "title": "Total Users", "value": format!("{}", total_users), "trend": "-", "trendUp": true }), + json!({ "id": "companies", "title": "Active Companies", "value": format!("{}", active_companies), "trend": "-", "trendUp": true }), + json!({ "id": "leads", "title": "Open Leads", "value": format!("{}", open_leads), "trend": "-", "trendUp": true }), json!({ "id": "credits", "title": "Credits Purchased", "value": "$45,200", "trend": "+18%", "trendUp": true }), ]; diff --git a/apps/users/src/handlers/employees.rs b/apps/users/src/handlers/employees.rs index 3dd8475..27c5eeb 100644 --- a/apps/users/src/handlers/employees.rs +++ b/apps/users/src/handlers/employees.rs @@ -13,6 +13,7 @@ use sqlx::Row; pub fn router() -> Router { Router::new() + .route("/provision", axum::routing::post(provision_employee)) .route("/", get(list_employees).post(create_employee)) .route("/{id}", get(get_employee).patch(update_employee).delete(delete_employee)) } @@ -235,6 +236,78 @@ async fn create_employee( get_employee(auth, State(state), Path(row.id)).await } +#[derive(Deserialize)] +struct ProvisionEmployeePayload { + email: String, + full_name: String, + role_id: Uuid, + department_id: Option, + designation_id: Option, + employee_code: Option, + #[serde(default)] + generate_login: bool, + password: Option, +} + +async fn provision_employee( + auth: AuthUser, + State(state): State, + Json(p): Json, +) -> Result { + if let Err(_e) = require_admin(&auth) { + return Err((StatusCode::FORBIDDEN, "Forbidden".to_string())); + } + use db::models::user::{CreateUserPayload, UserRepository}; + use auth::crypto::hash_password; + + let email = p.email.trim().to_lowercase(); + + // 1. Resolve User + let user_id = match UserRepository::get_by_email(&state.pool, &email).await { + Ok(user) => user.id, + Err(_) => { + if !p.generate_login { + return Err((StatusCode::BAD_REQUEST, "User not found for this email".to_string())); + } + let plain_password = p.password.unwrap_or_else(|| format!("{:08}", rand::random::() % 100_000_000)); + if plain_password.len() < 8 { + return Err((StatusCode::BAD_REQUEST, "Password must be at least 8 characters".to_string())); + } + let password_hash = hash_password(&plain_password) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Crypto error: {e}")))?; + + let new_user = UserRepository::create(&state.pool, CreateUserPayload { + full_name: p.full_name.clone(), + email: email.clone(), + phone: None, + password_hash, + }).await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + new_user.id + } + }; + + // 2. Link Employee + let row = sqlx::query!( + r#" + INSERT INTO employees (user_id, role_id, department_id, designation_id, employee_code) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (user_id) DO UPDATE SET + role_id = EXCLUDED.role_id, + department_id = EXCLUDED.department_id, + designation_id = EXCLUDED.designation_id, + employee_code = EXCLUDED.employee_code + RETURNING id + "#, + user_id, p.role_id, p.department_id, p.designation_id, p.employee_code + ) + .fetch_one(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {e}")))?; + + get_employee(auth, State(state), Path(row.id)).await +} + #[derive(Deserialize)] struct UpdateEmployeePayload { role_id: Option, diff --git a/apps/users/src/handlers/mod.rs b/apps/users/src/handlers/mod.rs index deca835..fb29f92 100644 --- a/apps/users/src/handlers/mod.rs +++ b/apps/users/src/handlers/mod.rs @@ -1,3 +1,4 @@ +pub mod admin; pub mod approvals; pub mod auth; pub mod config; diff --git a/apps/users/src/handlers/onboarding.rs b/apps/users/src/handlers/onboarding.rs index 206889e..65f5aef 100644 --- a/apps/users/src/handlers/onboarding.rs +++ b/apps/users/src/handlers/onboarding.rs @@ -141,6 +141,7 @@ async fn submit( let progress = input.progress_json.unwrap_or(serde_json::Value::Object(Default::default())); + // 1. Complete onboarding state let completed = OnboardingStateRepository::complete( &state.pool, auth.user_id, @@ -150,6 +151,67 @@ async fn submit( .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + // 2. Create/Update specialized profile based on role + let role_key = input.role_key.to_uppercase(); + let table_name = match role_key.as_str() { + "PHOTOGRAPHER" => Some("photographer_profiles"), + "MAKEUP_ARTIST" => Some("makeup_artist_profiles"), + "TUTOR" => Some("tutor_profiles"), + "JOB_SEEKER" | "JOBSEEKER" => Some("job_seeker_profiles"), + _ => None, + }; + + if let Some(tbl) = table_name { + let query = format!( + r#" + INSERT INTO {} (user_id, "profileData", verification_status, submitted_at, updated_at) + VALUES ($1, $2, 'PENDING', NOW(), NOW()) + ON CONFLICT (user_id) DO UPDATE SET + "profileData" = EXCLUDED."profileData", + verification_status = 'PENDING', + submitted_at = NOW(), + updated_at = NOW() + "#, + tbl + ); + + sqlx::query(&query) + .bind(auth.user_id) + .bind(&progress) + .execute(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Profile creation error: {}", e)))?; + } else if role_key == "COMPANY" { + // Simple companies upsert (using basic fields if possible) + sqlx::query( + r#" + INSERT INTO companies ("userId", status, "updatedAt") + VALUES ($1, 'PENDING', NOW()) + ON CONFLICT ("userId") DO UPDATE SET + status = 'PENDING', + "updatedAt" = NOW() + "#, + ) + .bind(auth.user_id) + .execute(&state.pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Company profile error: {}", e)))?; + } + + // 3. Mark the user_role as PENDING (awaiting admin review of onboarding) + sqlx::query( + r#" + UPDATE user_roles + SET status = 'PENDING', updated_at = NOW() + WHERE user_id = $1 AND role_id = $2 + "#, + ) + .bind(auth.user_id) + .bind(role.id) + .execute(&state.pool) + .await + .map_err(|e: sqlx::Error| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + Ok((StatusCode::OK, Json(completed))) } diff --git a/apps/users/src/handlers/user_roles.rs b/apps/users/src/handlers/user_roles.rs index 71aed8b..4174c4f 100644 --- a/apps/users/src/handlers/user_roles.rs +++ b/apps/users/src/handlers/user_roles.rs @@ -104,29 +104,7 @@ async fn register_role( .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; - if is_professional_role(&role_key) { - let user = UserRepository::get_by_id(&state.pool, auth.user_id) - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; - let display_name = user - .full_name - .unwrap_or_else(|| user.email.split('@').next().unwrap_or("Professional").to_string()); - - sqlx::query( - r#" - INSERT INTO professionals (user_id, profession_key, display_name) - VALUES ($1, $2, $3) - ON CONFLICT (user_id) - DO UPDATE SET profession_key = EXCLUDED.profession_key, display_name = EXCLUDED.display_name, updated_at = NOW() - "#, - ) - .bind(auth.user_id) - .bind(role_key.clone()) - .bind(display_name) - .execute(&state.pool) - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; - } + // Note: Professional profile creation is now handled upon successful submission of onboarding data. Ok(( StatusCode::OK, diff --git a/apps/users/src/main.rs b/apps/users/src/main.rs index 25e139e..cf43b38 100644 --- a/apps/users/src/main.rs +++ b/apps/users/src/main.rs @@ -57,10 +57,8 @@ async fn main() { // ── Roles & User Self-Service ───────────────────────────────────── .nest("/api/admin/roles", handlers::roles::router()) .nest("/api/admin/permissions", handlers::permissions::router()) - .nest("/api/admin/departments", handlers::departments::router()) - .nest("/api/admin/designations", handlers::designations::router()) - .nest("/api/admin/employees", handlers::employees::router()) .nest("/api/admin/external-roles", handlers::external_roles::router()) + .nest("/api/admin/users", handlers::admin::router()) .nest("/api/me/roles", handlers::user_roles::router()) // ── Notifications ───────────────────────────────────────────────── .nest("/api/me/notifications", handlers::notifications::router()) diff --git a/crates/contracts/src/profession_shared.rs b/crates/contracts/src/profession_shared.rs index 8ee1cb8..4889f38 100644 --- a/crates/contracts/src/profession_shared.rs +++ b/crates/contracts/src/profession_shared.rs @@ -344,25 +344,52 @@ async fn my_requests( let limit = q.limit.unwrap_or(20).clamp(1, 100); let offset = (page - 1) * limit; + #[derive(serde::Serialize, sqlx::FromRow)] + struct RichLeadReq { + #[serde(flatten)] + #[sqlx(flatten)] + lead: db::models::lead_request::LeadRequest, + req_title: Option, + req_profession_key: Option, + req_location: Option, + req_budget: Option, + customer_name: Option, + customer_email: Option, + customer_phone: Option, + } + let rows = if let Some(ref status) = q.status { - sqlx::query_as::<_, db::models::lead_request::LeadRequest>( - "SELECT * FROM lead_requests WHERE professional_id = $1 AND status = $2 ORDER BY requested_at DESC LIMIT $3 OFFSET $4" + sqlx::query_as::<_, RichLeadReq>( + r#" + SELECT lr.*, r.title as req_title, r.profession_key as req_profession_key, r.location as req_location, r.budget as req_budget, + CASE WHEN lr.status = 'ACCEPTED' THEN u.full_name ELSE NULL END as customer_name, + CASE WHEN lr.status = 'ACCEPTED' THEN u.email ELSE NULL END as customer_email, + CASE WHEN lr.status = 'ACCEPTED' THEN u.phone ELSE NULL END as customer_phone + FROM lead_requests lr + LEFT JOIN requirements r ON r.id = lr.requirement_id + LEFT JOIN customers c ON c.id = r.customer_id + LEFT JOIN users u ON u.id = c.user_id + WHERE lr.professional_id = $1 AND lr.status = $2 + ORDER BY lr.requested_at DESC LIMIT $3 OFFSET $4 + "# ) - .bind(prof.id) - .bind(status) - .bind(limit) - .bind(offset) - .fetch_all(&state.pool) - .await + .bind(prof.id).bind(status).bind(limit).bind(offset).fetch_all(&state.pool).await } else { - sqlx::query_as::<_, db::models::lead_request::LeadRequest>( - "SELECT * FROM lead_requests WHERE professional_id = $1 ORDER BY requested_at DESC LIMIT $2 OFFSET $3" + sqlx::query_as::<_, RichLeadReq>( + r#" + SELECT lr.*, r.title as req_title, r.profession_key as req_profession_key, r.location as req_location, r.budget as req_budget, + CASE WHEN lr.status = 'ACCEPTED' THEN u.full_name ELSE NULL END as customer_name, + CASE WHEN lr.status = 'ACCEPTED' THEN u.email ELSE NULL END as customer_email, + CASE WHEN lr.status = 'ACCEPTED' THEN u.phone ELSE NULL END as customer_phone + FROM lead_requests lr + LEFT JOIN requirements r ON r.id = lr.requirement_id + LEFT JOIN customers c ON c.id = r.customer_id + LEFT JOIN users u ON u.id = c.user_id + WHERE lr.professional_id = $1 + ORDER BY lr.requested_at DESC LIMIT $2 OFFSET $3 + "# ) - .bind(prof.id) - .bind(limit) - .bind(offset) - .fetch_all(&state.pool) - .await + .bind(prof.id).bind(limit).bind(offset).fetch_all(&state.pool).await }; let total: i64 = if let Some(ref status) = q.status { diff --git a/crates/db/migrations/20260402030000_strict_employee_separation.down.sql b/crates/db/migrations/20260402030000_strict_employee_separation.down.sql new file mode 100644 index 0000000..6aee9a3 --- /dev/null +++ b/crates/db/migrations/20260402030000_strict_employee_separation.down.sql @@ -0,0 +1,6 @@ +-- DOWN: 20260402030000_strict_employee_separation.down.sql + +DROP TABLE IF EXISTS employee_sessions; +DROP TABLE IF EXISTS employees; +DROP TABLE IF EXISTS designations; +DROP TABLE IF EXISTS departments; diff --git a/crates/db/migrations/20260402030000_strict_employee_separation.up.sql b/crates/db/migrations/20260402030000_strict_employee_separation.up.sql new file mode 100644 index 0000000..5b4374e --- /dev/null +++ b/crates/db/migrations/20260402030000_strict_employee_separation.up.sql @@ -0,0 +1,33 @@ +-- UP: 20260402030000_strict_employee_separation.up.sql + +-- 1. EMPLOYEES (Standalone Table - Not Linked to 'users') +CREATE TABLE IF NOT EXISTS employees ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + first_name VARCHAR(100) NOT NULL, + last_name VARCHAR(100) NOT NULL, + email VARCHAR(255) UNIQUE NOT NULL, + password_hash VARCHAR(255) NOT NULL, + employee_code VARCHAR(50) UNIQUE, + department_id UUID REFERENCES departments(id) ON DELETE SET NULL, + designation_id UUID REFERENCES designations(id) ON DELETE SET NULL, + role_code VARCHAR(50) NOT NULL DEFAULT 'STAFF', -- ADMIN, MANAGER, STAFF + status VARCHAR(50) NOT NULL DEFAULT 'ACTIVE', -- ACTIVE, INACTIVE, SUSPENDED + joined_at DATE NOT NULL DEFAULT CURRENT_DATE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- 4. EMPLOYEE SESSIONS (Standalone Auth) +CREATE TABLE IF NOT EXISTS employee_sessions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + employee_id UUID NOT NULL REFERENCES employees(id) ON DELETE CASCADE, + token_hash VARCHAR(255) UNIQUE NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + revoked BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_employees_email ON employees(email); +CREATE INDEX IF NOT EXISTS idx_employees_status ON employees(status); +CREATE INDEX IF NOT EXISTS idx_employee_sessions_token ON employee_sessions(token_hash); diff --git a/crates/db/migrations/20260402092000_init_activity_logs.up.sql b/crates/db/migrations/20260402092000_init_activity_logs.up.sql new file mode 100644 index 0000000..5faf908 --- /dev/null +++ b/crates/db/migrations/20260402092000_init_activity_logs.up.sql @@ -0,0 +1,17 @@ +-- Up migration: Create activity_logs table +CREATE TABLE IF NOT EXISTS activity_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + actor_id UUID NOT NULL, -- User or Employee who performed the action + actor_type VARCHAR(20) NOT NULL, -- 'USER' or 'EMPLOYEE' + entity_id UUID NOT NULL, -- Target of the action (User ID, Job ID, etc.) + entity_type VARCHAR(50) NOT NULL, -- 'USER', 'JOB', 'REQUIREMENT', 'EMPLOYEE', etc. + action VARCHAR(100) NOT NULL, -- 'APPROVE', 'REJECT', 'STATUS_CHANGE', 'DELETE', etc. + metadata JSONB, -- Optional extra context: { "old_status": "PENDING", "new_status": "APPROVED", "reason": "..." } + ip_address VARCHAR(45), + user_agent TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_activity_logs_entity ON activity_logs (entity_type, entity_id); +CREATE INDEX IF NOT EXISTS idx_activity_logs_actor ON activity_logs (actor_type, actor_id); +CREATE INDEX IF NOT EXISTS idx_activity_logs_created_at ON activity_logs (created_at DESC); diff --git a/crates/db/src/models/activity_log.rs b/crates/db/src/models/activity_log.rs new file mode 100644 index 0000000..bd8c163 --- /dev/null +++ b/crates/db/src/models/activity_log.rs @@ -0,0 +1,67 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct ActivityLog { + pub id: Uuid, + pub actor_id: Uuid, + pub actor_type: String, + pub entity_id: Uuid, + pub entity_type: String, + pub action: String, + pub metadata: Option, + pub ip_address: Option, + pub user_agent: Option, + pub created_at: DateTime, +} + +pub struct ActivityLogRepository; + +impl ActivityLogRepository { + pub async fn create( + pool: &PgPool, + actor_id: Uuid, + actor_type: &str, + entity_id: Uuid, + entity_type: &str, + action: &str, + metadata: Option, + ) -> Result { + sqlx::query_as::<_, ActivityLog>( + r#" + INSERT INTO activity_logs (actor_id, actor_type, entity_id, entity_type, action, metadata) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, actor_id, actor_type, entity_id, entity_type, action, metadata, ip_address, user_agent, created_at + "# + ) + .bind(actor_id) + .bind(actor_type) + .bind(entity_id) + .bind(entity_type) + .bind(action) + .bind(metadata) + .fetch_one(pool) + .await + } + + pub async fn list_for_entity( + pool: &PgPool, + entity_type: &str, + entity_id: Uuid, + ) -> Result, sqlx::Error> { + sqlx::query_as::<_, ActivityLog>( + r#" + SELECT id, actor_id, actor_type, entity_id, entity_type, action, metadata, ip_address, user_agent, created_at + FROM activity_logs + WHERE entity_type = $1 AND entity_id = $2 + ORDER BY created_at DESC + "# + ) + .bind(entity_type) + .bind(entity_id) + .fetch_all(pool) + .await + } +} diff --git a/crates/db/src/models/department.rs b/crates/db/src/models/department.rs new file mode 100644 index 0000000..ff459d5 --- /dev/null +++ b/crates/db/src/models/department.rs @@ -0,0 +1,127 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct Department { + pub id: Uuid, + pub name: String, + pub code: Option, + pub description: Option, + pub department_head: Option, + pub department_email: Option, + pub is_active: bool, + pub visibility: String, + pub transfers_enabled: bool, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateDepartmentPayload { + pub name: String, + pub code: String, + pub description: Option, + pub department_head: Option, + pub department_email: Option, + pub status: Option, // ACTIVE | INACTIVE + pub visibility: Option, // INTERNAL | EXTERNAL + pub transfers_enabled: Option, +} + +pub struct DepartmentRepository; + +impl DepartmentRepository { + pub async fn create(pool: &PgPool, payload: CreateDepartmentPayload) -> Result { + let is_active = payload.status.map(|s| s.to_uppercase() == "ACTIVE").unwrap_or(true); + let visibility = payload.visibility.unwrap_or_else(|| "INTERNAL".to_string()); + let transfers_enabled = payload.transfers_enabled.unwrap_or(false); + + sqlx::query_as::<_, Department>( + r#" + INSERT INTO departments ( + name, code, description, department_head, department_email, + is_active, visibility, transfers_enabled + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING id, name, code, description, department_head, department_email, is_active, visibility, transfers_enabled, created_at, updated_at + "# + ) + .bind(payload.name) + .bind(payload.code.to_uppercase()) + .bind(payload.description) + .bind(payload.department_head) + .bind(payload.department_email) + .bind(is_active) + .bind(visibility) + .bind(transfers_enabled) + .fetch_one(pool) + .await + } + + pub async fn update(pool: &PgPool, id: Uuid, payload: serde_json::Value) -> Result { + let name = payload.get("name").and_then(|v| v.as_str()); + let code = payload.get("code").and_then(|v| v.as_str()); + let description = payload.get("description").map(|v| v.as_str().unwrap_or_default()); + let department_head = payload.get("department_head").map(|v| v.as_str().unwrap_or_default()); + let department_email = payload.get("department_email").map(|v| v.as_str().unwrap_or_default()); + let status = payload.get("status").and_then(|v| v.as_str()); + let is_active = status.map(|s| s.to_uppercase() == "ACTIVE"); + let visibility = payload.get("visibility").and_then(|v| v.as_str()); + let transfers_enabled = payload.get("transfers_enabled").and_then(|v| v.as_bool()); + + sqlx::query_as::<_, Department>( + r#" + UPDATE departments + SET name = COALESCE($2, name), + code = COALESCE($3, code), + description = COALESCE($4, description), + department_head = COALESCE($5, department_head), + department_email = COALESCE($6, department_email), + is_active = COALESCE($7, is_active), + visibility = COALESCE($8, visibility), + transfers_enabled = COALESCE($9, transfers_enabled), + updated_at = NOW() + WHERE id = $1 + RETURNING id, name, code, description, department_head, department_email, is_active, visibility, transfers_enabled, created_at, updated_at + "# + ) + .bind(id) + .bind(name) + .bind(code.map(|c| c.to_uppercase())) + .bind(description) + .bind(department_head) + .bind(department_email) + .bind(is_active) + .bind(visibility) + .bind(transfers_enabled) + .fetch_one(pool) + .await + } + + pub async fn delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM departments WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn list(pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as::<_, Department>( + "SELECT id, name, code, description, department_head, department_email, is_active, visibility, transfers_enabled, created_at, updated_at FROM departments ORDER BY name ASC" + ) + .fetch_all(pool) + .await + } + + pub async fn get_by_id(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { + sqlx::query_as::<_, Department>( + "SELECT id, name, code, description, department_head, department_email, is_active, visibility, transfers_enabled, created_at, updated_at FROM departments WHERE id = $1" + ) + .bind(id) + .fetch_optional(pool) + .await + } +} diff --git a/crates/db/src/models/designation.rs b/crates/db/src/models/designation.rs new file mode 100644 index 0000000..c936984 --- /dev/null +++ b/crates/db/src/models/designation.rs @@ -0,0 +1,106 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct Designation { + pub id: Uuid, + pub name: String, + pub code: Option, + pub level: Option, + pub department_id: Option, + pub can_approve: bool, + pub is_active: bool, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateDesignationPayload { + pub name: String, + pub level: Option, + pub department_id: Option, + pub can_approve: Option, + pub status: Option, // ACTIVE | INACTIVE +} + +pub struct DesignationRepository; + +impl DesignationRepository { + pub async fn create(pool: &PgPool, payload: CreateDesignationPayload) -> Result { + let is_active = payload.status.map(|s| s.to_uppercase() == "ACTIVE").unwrap_or(true); + let can_approve = payload.can_approve.unwrap_or(false); + + sqlx::query_as::<_, Designation>( + r#" + INSERT INTO designations (name, level, department_id, is_active, can_approve) + VALUES ($1, $2, $3, $4, $5) + RETURNING id, name, code, level, department_id, can_approve, is_active, created_at, updated_at + "# + ) + .bind(payload.name) + .bind(payload.level) + .bind(payload.department_id) + .bind(is_active) + .bind(can_approve) + .fetch_one(pool) + .await + } + + pub async fn update(pool: &PgPool, id: Uuid, payload: serde_json::Value) -> Result { + let name = payload.get("name").and_then(|v| v.as_str()); + let level = payload.get("level").map(|v| v.to_string()); + let department_id = payload.get("department_id").and_then(|v| v.as_str()).and_then(|s| Uuid::parse_str(s).ok()); + let can_approve = payload.get("can_approve").and_then(|v| v.as_bool()); + let status = payload.get("status").and_then(|v| v.as_str()); + let is_active = status.map(|s| s.to_uppercase() == "ACTIVE"); + + sqlx::query_as::<_, Designation>( + r#" + UPDATE designations + SET name = COALESCE($2, name), + level = COALESCE($3, level), + department_id = COALESCE($4, department_id), + can_approve = COALESCE($5, can_approve), + is_active = COALESCE($6, is_active), + updated_at = NOW() + WHERE id = $1 + RETURNING id, name, code, level, department_id, can_approve, is_active, created_at, updated_at + "# + ) + .bind(id) + .bind(name) + .bind(level) + .bind(department_id) + .bind(can_approve) + .bind(is_active) + .fetch_one(pool) + .await + } + + pub async fn delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM designations WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn list_all(pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as::<_, Designation>( + "SELECT id, name, code, level, department_id, can_approve, is_active, created_at, updated_at FROM designations ORDER BY name ASC" + ) + .fetch_all(pool) + .await + } + + pub async fn list_by_department(pool: &PgPool, department_id: Uuid) -> Result, sqlx::Error> { + sqlx::query_as::<_, Designation>( + "SELECT id, name, code, level, department_id, can_approve, is_active, created_at, updated_at FROM designations WHERE department_id = $1 ORDER BY level DESC" + ) + .bind(department_id) + .fetch_all(pool) + .await + } +} diff --git a/crates/db/src/models/employee.rs b/crates/db/src/models/employee.rs new file mode 100644 index 0000000..232d4dd --- /dev/null +++ b/crates/db/src/models/employee.rs @@ -0,0 +1,156 @@ +use chrono::{DateTime, Utc, NaiveDate}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct Employee { + pub id: Uuid, + pub first_name: String, + pub last_name: String, + pub email: String, + pub password_hash: String, + pub employee_code: Option, + pub department_id: Option, + pub designation_id: Option, + pub role_code: String, + pub status: String, + pub joined_at: NaiveDate, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct EmployeeSession { + pub id: Uuid, + pub employee_id: Uuid, + pub token_hash: String, + pub expires_at: DateTime, + pub revoked: bool, + pub created_at: DateTime, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateEmployeePayload { + pub first_name: String, + pub last_name: String, + pub email: String, + pub password_hash: String, + pub department_id: Option, + pub designation_id: Option, + pub role_code: String, +} + +pub struct EmployeeRepository; + +impl EmployeeRepository { + pub async fn create(pool: &PgPool, payload: CreateEmployeePayload) -> Result { + let level_code = payload.role_code.clone(); + sqlx::query_as::<_, Employee>( + r#" + INSERT INTO employees (first_name, last_name, email, password_hash, department_id, designation_id, role_code) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id, first_name, last_name, email, password_hash, employee_code, department_id, designation_id, role_code, status, joined_at, created_at, updated_at + "# + ) + .bind(payload.first_name) + .bind(payload.last_name) + .bind(payload.email.to_lowercase()) + .bind(payload.password_hash) + .bind(payload.department_id) + .bind(payload.designation_id) + .bind(level_code) + .fetch_one(pool) + .await + } + + pub async fn update( + pool: &PgPool, + id: Uuid, + first_name: Option, + last_name: Option, + email: Option, + department_id: Option, + designation_id: Option, + role_code: Option, + status: Option, + ) -> Result { + sqlx::query_as::<_, Employee>( + r#" + UPDATE employees + SET + first_name = COALESCE($1, first_name), + last_name = COALESCE($2, last_name), + email = COALESCE($3, email), + department_id = COALESCE($4, department_id), + designation_id = COALESCE($5, designation_id), + role_code = COALESCE($6, role_code), + status = COALESCE($7, status), + updated_at = NOW() + WHERE id = $8 + RETURNING id, first_name, last_name, email, password_hash, employee_code, department_id, designation_id, role_code, status, joined_at, created_at, updated_at + "# + ) + .bind(first_name) + .bind(last_name) + .bind(email.map(|e| e.to_lowercase())) + .bind(department_id) + .bind(designation_id) + .bind(role_code) + .bind(status) + .bind(id) + .fetch_one(pool) + .await + } + + pub async fn delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> { + sqlx::query!("DELETE FROM employees WHERE id = $1", id) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn get_by_email(pool: &PgPool, email: &str) -> Result, sqlx::Error> { + sqlx::query_as::<_, Employee>( + "SELECT id, first_name, last_name, email, password_hash, employee_code, department_id, designation_id, role_code, status, joined_at, created_at, updated_at FROM employees WHERE email = $1" + ) + .bind(email.to_lowercase()) + .fetch_optional(pool) + .await + } + + pub async fn list(pool: &PgPool, q: Option) -> Result, sqlx::Error> { + let search = q.unwrap_or_default().to_lowercase(); + sqlx::query_as::<_, Employee>( + r#" + SELECT id, first_name, last_name, email, password_hash, employee_code, department_id, designation_id, role_code, status, joined_at, created_at, updated_at + FROM employees + WHERE ($1 = '' OR LOWER(first_name) LIKE '%' || $1 || '%' OR LOWER(last_name) LIKE '%' || $1 || '%' OR LOWER(email) LIKE '%' || $1 || '%') + ORDER BY last_name, first_name + "# + ) + .bind(search) + .fetch_all(pool) + .await + } + + pub async fn store_session( + pool: &PgPool, + employee_id: Uuid, + token_hash: &str, + expires_at: DateTime, + ) -> Result { + sqlx::query_as::<_, EmployeeSession>( + r#" + INSERT INTO employee_sessions (employee_id, token_hash, expires_at) + VALUES ($1, $2, $3) + RETURNING id, employee_id, token_hash, expires_at, revoked, created_at + "# + ) + .bind(employee_id) + .bind(token_hash) + .bind(expires_at) + .fetch_one(pool) + .await + } +} diff --git a/crates/db/src/models/job.rs b/crates/db/src/models/job.rs index a23a978..37ecad8 100644 --- a/crates/db/src/models/job.rs +++ b/crates/db/src/models/job.rs @@ -212,4 +212,24 @@ impl JobRepository { .await?; Ok(job) } + + pub async fn count_by_company_id_this_month( + pool: &PgPool, + company_id: Uuid, + ) -> Result { + let count = sqlx::query!( + r#" + SELECT COUNT(*) as "count!" + FROM jobs + WHERE company_id = $1 + AND created_at >= date_trunc('month', now()) + AND status != 'REJECTED' + "#, + company_id + ) + .fetch_one(pool) + .await?; + + Ok(count.count) + } } diff --git a/crates/db/src/models/mod.rs b/crates/db/src/models/mod.rs index fa84d96..73a7fdf 100644 --- a/crates/db/src/models/mod.rs +++ b/crates/db/src/models/mod.rs @@ -1,5 +1,7 @@ +pub mod job; pub mod config; pub mod onboarding_state; +pub mod activity_log; pub mod role; pub mod user; pub mod photographer; @@ -16,7 +18,9 @@ pub mod fitness_trainer; pub mod catering_service; pub mod requirement; pub mod lead_request; -pub mod job; pub mod application; pub mod professional; +pub mod employee; +pub mod department; +pub mod designation; diff --git a/crates/email/src/lib.rs b/crates/email/src/lib.rs index 936efad..ed1ef1f 100644 --- a/crates/email/src/lib.rs +++ b/crates/email/src/lib.rs @@ -310,6 +310,41 @@ impl Mailer { ), ).await } + + // ── Expiry (Cron) ───────────────────────────────────────────────────────── + + pub async fn send_lead_expired_email(&self, to: &str, name: &str, tracecoins_returned: i32) -> Result<()> { + self.send( + to, + "Your lead request has expired", + format!( + "Hello {},\n\nYour lead request has expired because it wasn't accepted within 24 hours.\n\nWe have refunded your {} reserved Tracecoins back to your wallet.\n\nRegards,\nThe NXTGAUGE Team", + name, tracecoins_returned + ), + ).await + } + + pub async fn send_requirement_expired_email(&self, to: &str, name: &str, title: &str) -> Result<()> { + self.send( + to, + "Your requirement has expired", + format!( + "Hello {},\n\nYour requirement \"{}\" has expired and is no longer visible to professionals.\n\nRegards,\nThe NXTGAUGE Team", + name, title + ), + ).await + } + + pub async fn send_job_expired_email(&self, to: &str, company_name: &str, title: &str) -> Result<()> { + self.send( + to, + "Your job posting has expired", + format!( + "Hello {},\n\nYour job posting \"{}\" has expired and is no longer accepting applications.\n\nRegards,\nThe NXTGAUGE Team", + company_name, title + ), + ).await + } } impl Default for Mailer {