use crate::AppState; use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, post}, Json, Router, }; use serde::{Deserialize, Serialize}; use sqlx::FromRow; use std::sync::Arc; use uuid::Uuid; #[derive(Debug, Deserialize)] pub struct PaginationQuery { pub page: Option, pub limit: Option, pub status: Option, } #[derive(Debug, Deserialize)] pub struct SendLeadRequestPayload { pub lead_id: Uuid, pub message: Option, } #[derive(Debug, FromRow)] pub struct LeadRequestRow { pub id: Uuid, pub lead_id: Uuid, pub user_role_profile_id: Uuid, pub customer_user_id: Uuid, pub status: String, pub tracecoins_reserved: i32, pub message: Option, pub expires_at: chrono::DateTime, pub accepted_at: Option>, pub rejected_at: Option>, pub rejected_reason: Option, pub created_at: chrono::DateTime, } #[derive(Debug, Serialize)] pub struct LeadRequestResponse { pub id: Uuid, pub lead_id: Uuid, pub user_role_profile_id: Uuid, pub customer_user_id: Uuid, pub professional_name: Option, pub professional_role: Option, pub customer_name: Option, pub lead_title: Option, pub status: String, pub tracecoins_reserved: i32, pub message: Option, pub expires_at: chrono::DateTime, pub accepted_at: Option>, pub rejected_at: Option>, pub rejected_reason: Option, pub created_at: chrono::DateTime, } pub fn router() -> Router> { Router::new() .route("/", get(list_lead_requests)) .route("/send", post(send_lead_request)) .route("/{id}/accept", post(accept_lead_request)) .route("/{id}/reject", post(reject_lead_request)) .route("/my-requests", get(my_requests)) .route("/my-pending", get(my_pending_requests)) .route("/customer/{lead_id}", get(get_customer_lead_requests)) } fn lead_request_to_response(row: LeadRequestRow) -> LeadRequestResponse { LeadRequestResponse { id: row.id, lead_id: row.lead_id, user_role_profile_id: row.user_role_profile_id, customer_user_id: row.customer_user_id, professional_name: None, professional_role: None, customer_name: None, lead_title: None, status: row.status, tracecoins_reserved: row.tracecoins_reserved, message: row.message, expires_at: row.expires_at, accepted_at: row.accepted_at, rejected_at: row.rejected_at, rejected_reason: row.rejected_reason, created_at: row.created_at, } } async fn list_lead_requests( State(state): State>, Query(q): Query, ) -> impl IntoResponse { let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; let status_filter = q.status .as_ref() .map(|s| format!("AND lr.status = '{}'", s)) .unwrap_or_default(); let requests = match sqlx::query_as::<_, LeadRequestRow>(&format!( r#" SELECT lr.* FROM lead_requests lr WHERE 1=1 {} ORDER BY lr.created_at DESC LIMIT {} OFFSET {} "#, status_filter, limit, offset )) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests, "pagination": { "page": page, "limit": limit } }))).into_response() } async fn send_lead_request( State(state): State>, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, Json(payload): Json, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let user_role_profile_id = match sqlx::query_scalar::<_, Uuid>( "SELECT id FROM user_role_profiles WHERE user_id = $1 LIMIT 1" ) .bind(user_id) .fetch_optional(&state.pool) .await { Ok(Some(id)) => id, Ok(None) => return (StatusCode::NOT_FOUND, "Professional profile not found. Please complete your profile first.").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let lead = match sqlx::query_as::<_, (Uuid, String, Uuid, String, i32)>( "SELECT id, title, customer_user_id, status, COALESCE(current_acceptances, 0) FROM leads WHERE id = $1" ) .bind(payload.lead_id) .fetch_optional(&state.pool) .await { Ok(Some(l)) => l, Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if lead.3 != "OPEN" { return (StatusCode::BAD_REQUEST, "Lead is not open for requests").into_response(); } if lead.4 >= 10 { return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances").into_response(); } let duplicate = match sqlx::query_scalar::<_, Uuid>( "SELECT id FROM lead_requests WHERE lead_id = $1 AND user_role_profile_id = $2 AND status IN ('PENDING', 'ACCEPTED')" ) .bind(payload.lead_id) .bind(user_role_profile_id) .fetch_optional(&state.pool) .await { Ok(Some(_)) => true, Ok(None) => false, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if duplicate { return (StatusCode::CONFLICT, "You have already sent a request for this lead").into_response(); } let request_count: (i64,) = match sqlx::query_as( "SELECT COUNT(*) FROM lead_requests WHERE lead_id = $1 AND status IN ('PENDING', 'ACCEPTED')" ) .bind(payload.lead_id) .fetch_one(&state.pool) .await { Ok(c) => c, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if request_count.0 >= 20 { return (StatusCode::CONFLICT, "Lead has reached maximum requests").into_response(); } let wallet = match sqlx::query_as::<_, (Uuid, i64)>( "SELECT id, balance FROM tracecoin_wallets WHERE user_id = $1" ) .bind(user_id) .fetch_optional(&state.pool) .await { Ok(Some(w)) => w, Ok(None) => return (StatusCode::BAD_REQUEST, "Wallet not found. Please contact support.").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let tracecoins_cost = 25; if wallet.1 < tracecoins_cost as i64 { return (StatusCode::PAYMENT_REQUIRED, format!("Insufficient balance. You need at least {} Tracecoins.", tracecoins_cost)).into_response(); } let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); let result = sqlx::query_as::<_, LeadRequestRow>( r#" INSERT INTO lead_requests (lead_id, user_role_profile_id, customer_user_id, status, tracecoins_reserved, message, expires_at) VALUES ($1, $2, $3, 'PENDING', $4, $5, $6) RETURNING * "# ) .bind(payload.lead_id) .bind(user_role_profile_id) .bind(lead.2) .bind(tracecoins_cost) .bind(&payload.message) .bind(expires_at) .fetch_one(&state.pool) .await; match result { Ok(req) => { let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET balance = balance - $1, reserved = COALESCE(reserved, 0) + $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(tracecoins_cost as i64) .bind(user_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(lead.2) .bind("New Lead Request") .bind("You have a new lead request. Please review and respond within 24 hours.") .bind("LEAD_REQUEST") .bind(req.id) .execute(&state.pool) .await; let response = lead_request_to_response(req); (StatusCode::CREATED, Json(response)).into_response() } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn accept_lead_request( State(state): State>, Path(id): Path, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let request = match sqlx::query_as::<_, LeadRequestRow>( "SELECT * FROM lead_requests WHERE id = $1 AND status = 'PENDING'" ) .bind(id) .fetch_optional(&state.pool) .await { Ok(Some(r)) => r, Ok(None) => return (StatusCode::NOT_FOUND, "Lead request not found or already processed").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if request.customer_user_id != user_id { return (StatusCode::FORBIDDEN, "You are not authorized to accept this request").into_response(); } if request.expires_at < chrono::Utc::now() { return (StatusCode::BAD_REQUEST, "This request has expired").into_response(); } let lead_acceptances: (i32,) = match sqlx::query_as( "SELECT COALESCE(current_acceptances, 0) FROM leads WHERE id = $1" ) .bind(request.lead_id) .fetch_optional(&state.pool) .await { Ok(Some(l)) => l, Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if lead_acceptances.0 >= 10 { return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances").into_response(); } let _ = sqlx::query( "UPDATE lead_requests SET status = 'ACCEPTED', accepted_at = NOW(), updated_at = NOW() WHERE id = $1" ) .bind(id) .execute(&state.pool) .await; let _ = sqlx::query( "UPDATE leads SET current_acceptances = current_acceptances + 1, updated_at = NOW() WHERE id = $1" ) .bind(request.lead_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET reserved = reserved - $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(request.tracecoins_reserved as i64) .bind(user_id) .execute(&state.pool) .await; if lead_acceptances.0 + 1 >= 10 { let _ = sqlx::query("UPDATE leads SET status = 'CLOSED', updated_at = NOW() WHERE id = $1") .bind(request.lead_id) .execute(&state.pool) .await; } let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(user_id) .bind("Lead Request Accepted") .bind("Your lead request has been accepted! Contact details have been shared.") .bind("LEAD_REQUEST") .bind(id) .execute(&state.pool) .await; (StatusCode::OK, Json(serde_json::json!({ "message": "Lead request accepted successfully", "contact_details_shared": true }))).into_response() } async fn reject_lead_request( State(state): State>, Path(id): Path, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let request = match sqlx::query_as::<_, LeadRequestRow>( "SELECT * FROM lead_requests WHERE id = $1 AND status = 'PENDING'" ) .bind(id) .fetch_optional(&state.pool) .await { Ok(Some(r)) => r, Ok(None) => return (StatusCode::NOT_FOUND, "Lead request not found or already processed").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if request.customer_user_id != user_id { return (StatusCode::FORBIDDEN, "You are not authorized to reject this request").into_response(); } let _ = sqlx::query( "UPDATE lead_requests SET status = 'REJECTED', rejected_at = NOW(), rejected_reason = 'Rejected by customer', updated_at = NOW() WHERE id = $1" ) .bind(id) .execute(&state.pool) .await; let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET balance = balance + $1, reserved = reserved - $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(request.tracecoins_reserved as i64) .bind(user_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(user_id) .bind("Lead Request Rejected") .bind("Your lead request was not accepted. Tracecoins have been refunded.") .bind("LEAD_REQUEST") .bind(id) .execute(&state.pool) .await; (StatusCode::OK, Json(serde_json::json!({ "message": "Lead request rejected. Tracecoins refunded.", "refunded": request.tracecoins_reserved }))).into_response() } async fn my_requests( State(state): State>, Query(q): Query, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; let status_filter = q.status .as_ref() .map(|s| format!("AND lr.status = '{}'", s)) .unwrap_or_default(); let requests = match sqlx::query_as::<_, LeadRequestRow>(&format!( r#" SELECT lr.* FROM lead_requests lr JOIN user_role_profiles urp ON urp.id = lr.user_role_profile_id WHERE urp.user_id = $1 {} ORDER BY lr.created_at DESC LIMIT {} OFFSET {} "#, status_filter, limit, offset )) .bind(user_id) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests, "pagination": { "page": page, "limit": limit } }))).into_response() } async fn my_pending_requests( State(state): State>, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let requests = match sqlx::query_as::<_, LeadRequestRow>( r#" SELECT lr.* FROM lead_requests lr WHERE lr.customer_user_id = $1 AND lr.status = 'PENDING' ORDER BY lr.expires_at ASC "# ) .bind(user_id) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests }))).into_response() } async fn get_customer_lead_requests( State(state): State>, Path(lead_id): Path, Query(q): Query, axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; let requests = match sqlx::query_as::<_, LeadRequestRow>(&format!( r#" SELECT lr.* FROM lead_requests lr WHERE lr.lead_id = $1 AND lr.customer_user_id = $2 ORDER BY lr.created_at DESC LIMIT {} OFFSET {} "#, limit, offset )) .bind(lead_id) .bind(user_id) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests, "pagination": { "page": page, "limit": limit } }))).into_response() }