use crate::AppState; use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::{get, post}, Json, Router, }; use serde::{Deserialize, Serialize}; use sqlx::FromRow; use std::sync::Arc; use uuid::Uuid; #[derive(Debug, Deserialize)] pub struct PaginationQuery { pub page: Option, pub limit: Option, pub status: Option, } #[derive(Debug, Deserialize)] pub struct SendLeadRequestPayload { pub lead_id: Uuid, pub message: Option, } #[derive(Debug, Deserialize)] pub struct SendLeadRequestAiPayload { pub lead_id: Uuid, pub user_id: Uuid, pub profession_key: String, } #[derive(Debug, FromRow)] pub struct LeadRequestRow { pub id: Uuid, pub lead_id: Uuid, pub user_role_profile_id: Uuid, pub customer_user_id: Uuid, pub status: String, pub tracecoins_reserved: i32, pub message: Option, pub expires_at: chrono::DateTime, pub accepted_at: Option>, pub rejected_at: Option>, pub rejected_reason: Option, pub created_at: chrono::DateTime, } #[derive(Debug, Serialize)] pub struct LeadRequestResponse { pub id: Uuid, pub lead_id: Uuid, pub user_role_profile_id: Uuid, pub customer_user_id: Uuid, pub professional_name: Option, pub professional_role: Option, pub customer_name: Option, pub lead_title: Option, pub status: String, pub tracecoins_reserved: i32, pub message: Option, pub expires_at: chrono::DateTime, pub accepted_at: Option>, pub rejected_at: Option>, pub rejected_reason: Option, pub created_at: chrono::DateTime, } pub fn router() -> Router> { Router::new() .route("/", get(list_lead_requests)) .route("/send", post(send_lead_request)) .route("/send-ai", post(send_lead_request_ai)) .route("/{id}/accept", post(accept_lead_request)) .route("/{id}/reject", post(reject_lead_request)) .route("/my-requests", get(my_requests)) .route("/my-pending", get(my_pending_requests)) .route("/customer/{lead_id}", get(get_customer_lead_requests)) } fn lead_request_to_response(row: LeadRequestRow) -> LeadRequestResponse { LeadRequestResponse { id: row.id, lead_id: row.lead_id, user_role_profile_id: row.user_role_profile_id, customer_user_id: row.customer_user_id, professional_name: None, professional_role: None, customer_name: None, lead_title: None, status: row.status, tracecoins_reserved: row.tracecoins_reserved, message: row.message, expires_at: row.expires_at, accepted_at: row.accepted_at, rejected_at: row.rejected_at, rejected_reason: row.rejected_reason, created_at: row.created_at, } } async fn list_lead_requests( State(state): State>, Query(q): Query, ) -> impl IntoResponse { let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; let status_filter = q.status .as_ref() .map(|s| format!("AND lr.status = '{}'", s)) .unwrap_or_default(); let requests = match sqlx::query_as::<_, LeadRequestRow>(&format!( r#" SELECT lr.* FROM lead_requests lr WHERE 1=1 {} ORDER BY lr.created_at DESC LIMIT {} OFFSET {} "#, status_filter, limit, offset )) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests, "pagination": { "page": page, "limit": limit } }))).into_response() } async fn send_lead_request( State(state): State>, axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo, Json(payload): Json, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let user_role_profile_id = match sqlx::query_scalar::<_, Uuid>( "SELECT id FROM user_role_profiles WHERE user_id = $1 LIMIT 1" ) .bind(user_id) .fetch_optional(&state.pool) .await { Ok(Some(id)) => id, Ok(None) => return (StatusCode::NOT_FOUND, "Professional profile not found. Please complete your profile first.").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let lead = match sqlx::query_as::<_, (Uuid, String, Uuid, String, i32)>( "SELECT id, title, customer_user_id, status, COALESCE(current_acceptances, 0) FROM leads WHERE id = $1" ) .bind(payload.lead_id) .fetch_optional(&state.pool) .await { Ok(Some(l)) => l, Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if lead.3 != "OPEN" { return (StatusCode::BAD_REQUEST, "Lead is not open for requests").into_response(); } if lead.4 >= 10 { return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances").into_response(); } let duplicate = match sqlx::query_scalar::<_, Uuid>( "SELECT id FROM lead_requests WHERE lead_id = $1 AND user_role_profile_id = $2 AND status IN ('PENDING', 'ACCEPTED')" ) .bind(payload.lead_id) .bind(user_role_profile_id) .fetch_optional(&state.pool) .await { Ok(Some(_)) => true, Ok(None) => false, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if duplicate { return (StatusCode::CONFLICT, "You have already sent a request for this lead").into_response(); } let request_count: (i64,) = match sqlx::query_as( "SELECT COUNT(*) FROM lead_requests WHERE lead_id = $1 AND status IN ('PENDING', 'ACCEPTED')" ) .bind(payload.lead_id) .fetch_one(&state.pool) .await { Ok(c) => c, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if request_count.0 >= 20 { return (StatusCode::CONFLICT, "Lead has reached maximum requests").into_response(); } let wallet = match sqlx::query_as::<_, (Uuid, i64)>( "SELECT id, balance FROM tracecoin_wallets WHERE user_id = $1" ) .bind(user_id) .fetch_optional(&state.pool) .await { Ok(Some(w)) => w, Ok(None) => return (StatusCode::BAD_REQUEST, "Wallet not found. Please contact support.").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let tracecoins_cost = 25; if wallet.1 < tracecoins_cost as i64 { return (StatusCode::PAYMENT_REQUIRED, format!("Insufficient balance. You need at least {} Tracecoins.", tracecoins_cost)).into_response(); } let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); let result = sqlx::query_as::<_, LeadRequestRow>( r#" INSERT INTO lead_requests (lead_id, user_role_profile_id, customer_user_id, status, tracecoins_reserved, message, expires_at) VALUES ($1, $2, $3, 'PENDING', $4, $5, $6) RETURNING * "# ) .bind(payload.lead_id) .bind(user_role_profile_id) .bind(lead.2) .bind(tracecoins_cost) .bind(&payload.message) .bind(expires_at) .fetch_one(&state.pool) .await; match result { Ok(req) => { let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET balance = balance - $1, reserved = COALESCE(reserved, 0) + $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(tracecoins_cost as i64) .bind(user_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(lead.2) .bind("New Lead Request") .bind("You have a new lead request. Please review and respond within 24 hours.") .bind("LEAD_REQUEST") .bind(req.id) .execute(&state.pool) .await; let response = lead_request_to_response(req); (StatusCode::CREATED, Json(response)).into_response() } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn send_lead_request_ai( State(state): State>, Json(payload): Json, ) -> impl IntoResponse { let user_id = payload.user_id; let lead = match sqlx::query_as::<_, (Uuid, String, String, String, String, Option, Option)>( "SELECT id, title, description, location, profession_key, budget_min, budget_max FROM leads WHERE id = $1" ) .bind(payload.lead_id) .fetch_optional(&state.pool) .await { Ok(Some(l)) => l, Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if lead.4 != payload.profession_key { return (StatusCode::BAD_REQUEST, "Lead profession does not match your profile").into_response(); } let user_role_profile_id: Uuid = match sqlx::query_scalar::<_, Uuid>( "SELECT id FROM user_role_profiles WHERE user_id = $1 LIMIT 1" ) .bind(user_id) .fetch_optional(&state.pool) .await { Ok(Some(id)) => id, Ok(None) => return (StatusCode::NOT_FOUND, "Professional profile not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let existing = match sqlx::query_scalar::<_, Uuid>( "SELECT id FROM lead_requests WHERE lead_id = $1 AND user_role_profile_id = $2 AND status IN ('PENDING', 'ACCEPTED')" ) .bind(payload.lead_id) .bind(user_role_profile_id) .fetch_optional(&state.pool) .await { Ok(Some(_)) => true, Ok(None) => false, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if existing { return (StatusCode::CONFLICT, "You have already sent a request for this lead").into_response(); } let wallet = match sqlx::query_as::<_, (Uuid, i64)>( "SELECT id, balance FROM tracecoin_wallets WHERE user_id = $1" ) .bind(user_id) .fetch_optional(&state.pool) .await { Ok(Some(w)) => w, Ok(None) => return (StatusCode::BAD_REQUEST, "Wallet not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let tracecoins_cost = 30; if wallet.1 < tracecoins_cost as i64 { return (StatusCode::PAYMENT_REQUIRED, format!("Insufficient balance. You need {} Tracecoins.", tracecoins_cost)).into_response(); } let budget = match (lead.5, lead.6) { (Some(min), Some(max)) => format!("Budget: ₹{}-₹{}", min, max), (Some(min), None) => format!("Budget: ₹{} onwards", min), _ => "Budget: Not specified".to_string(), }; let prompt = format!( "You are a professional {} responding to a potential client's lead/request.\n\n\ IMPORTANT: Do NOT include phone number, email, or any contact information in your response. \ Clients pay to view contact details through the platform.\n\n\ LEAD DETAILS:\n\ Title: {}\n\ Description: {}\n\ Location: {}\n\ {}\n\n\ Write a professional, friendly message (max 150 words) expressing your interest and qualifications. \ Mention relevant experience and ask any clarifying questions. Be concise and compelling.", payload.profession_key.replace("_", " "), lead.1, lead.2, lead.3, budget ); let ai_message = match generate_ai_message(&state.http_client, &state.ollama_base_url, &state.ollama_model, &prompt).await { Ok(msg) => msg, Err(e) => { tracing::error!("AI message generation failed: {}", e); return (StatusCode::INTERNAL_SERVER_ERROR, "AI generation failed").into_response(); } }; let expires_at = chrono::Utc::now() + chrono::Duration::hours(24); let result = sqlx::query_as::<_, LeadRequestRow>( r#" INSERT INTO lead_requests (lead_id, user_role_profile_id, customer_user_id, status, tracecoins_reserved, message, expires_at) VALUES ($1, $2, $3, 'PENDING', $4, $5, $6) RETURNING * "# ) .bind(payload.lead_id) .bind(user_role_profile_id) .bind(user_id) .bind(tracecoins_cost) .bind(&ai_message) .bind(expires_at) .fetch_one(&state.pool) .await; match result { Ok(req) => { let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET balance = balance - $1, reserved = COALESCE(reserved, 0) + $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(tracecoins_cost as i64) .bind(user_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(user_id) .bind("AI Auto-Respond Sent") .bind("Your AI-assisted response has been sent to the customer.") .bind("LEAD_REQUEST") .bind(req.id) .execute(&state.pool) .await; let response = lead_request_to_response(req); (StatusCode::CREATED, Json(response)).into_response() } Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), } } async fn generate_ai_message( client: &reqwest::Client, base_url: &str, model: &str, prompt: &str, ) -> Result { #[derive(Serialize)] struct GenerateRequest<'a> { model: &'a str, prompt: String, stream: bool, } #[derive(Deserialize)] struct GenerateResponse { response: String, } let url = format!("{}/api/generate", base_url.trim_end_matches('/')); let req = GenerateRequest { model, prompt: prompt.to_string(), stream: false, }; let response = client .post(&url) .json(&req) .send() .await .map_err(|e| format!("ollama request failed: {}", e))?; if !response.status().is_success() { return Err(format!("ollama returned status: {}", response.status())); } let result: GenerateResponse = response .json() .await .map_err(|e| format!("failed to parse ollama response: {}", e))?; Ok(result.response.trim().to_string()) } async fn accept_lead_request( State(state): State>, Path(id): Path, axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let request = match sqlx::query_as::<_, LeadRequestRow>( "SELECT * FROM lead_requests WHERE id = $1 AND status = 'PENDING'" ) .bind(id) .fetch_optional(&state.pool) .await { Ok(Some(r)) => r, Ok(None) => return (StatusCode::NOT_FOUND, "Lead request not found or already processed").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if request.customer_user_id != user_id { return (StatusCode::FORBIDDEN, "You are not authorized to accept this request").into_response(); } if request.expires_at < chrono::Utc::now() { return (StatusCode::BAD_REQUEST, "This request has expired").into_response(); } let lead_acceptances: (i32,) = match sqlx::query_as( "SELECT COALESCE(current_acceptances, 0) FROM leads WHERE id = $1" ) .bind(request.lead_id) .fetch_optional(&state.pool) .await { Ok(Some(l)) => l, Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if lead_acceptances.0 >= 10 { return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances").into_response(); } let _ = sqlx::query( "UPDATE lead_requests SET status = 'ACCEPTED', accepted_at = NOW(), updated_at = NOW() WHERE id = $1" ) .bind(id) .execute(&state.pool) .await; let _ = sqlx::query( "UPDATE leads SET current_acceptances = current_acceptances + 1, updated_at = NOW() WHERE id = $1" ) .bind(request.lead_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET reserved = reserved - $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(request.tracecoins_reserved as i64) .bind(user_id) .execute(&state.pool) .await; if lead_acceptances.0 + 1 >= 10 { let _ = sqlx::query("UPDATE leads SET status = 'CLOSED', updated_at = NOW() WHERE id = $1") .bind(request.lead_id) .execute(&state.pool) .await; } let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(user_id) .bind("Lead Request Accepted") .bind("Your lead request has been accepted! Contact details have been shared.") .bind("LEAD_REQUEST") .bind(id) .execute(&state.pool) .await; (StatusCode::OK, Json(serde_json::json!({ "message": "Lead request accepted successfully", "contact_details_shared": true }))).into_response() } async fn reject_lead_request( State(state): State>, Path(id): Path, axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let request = match sqlx::query_as::<_, LeadRequestRow>( "SELECT * FROM lead_requests WHERE id = $1 AND status = 'PENDING'" ) .bind(id) .fetch_optional(&state.pool) .await { Ok(Some(r)) => r, Ok(None) => return (StatusCode::NOT_FOUND, "Lead request not found or already processed").into_response(), Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; if request.customer_user_id != user_id { return (StatusCode::FORBIDDEN, "You are not authorized to reject this request").into_response(); } let _ = sqlx::query( "UPDATE lead_requests SET status = 'REJECTED', rejected_at = NOW(), rejected_reason = 'Rejected by customer', updated_at = NOW() WHERE id = $1" ) .bind(id) .execute(&state.pool) .await; let _ = sqlx::query( r#" UPDATE tracecoin_wallets SET balance = balance + $1, reserved = reserved - $1, updated_at = NOW() WHERE user_id = $2 "# ) .bind(request.tracecoins_reserved as i64) .bind(user_id) .execute(&state.pool) .await; let _ = sqlx::query( r#" INSERT INTO notifications (user_id, title, body, notification_type, reference_id) VALUES ($1, $2, $3, $4, $5) "# ) .bind(user_id) .bind("Lead Request Rejected") .bind("Your lead request was not accepted. Tracecoins have been refunded.") .bind("LEAD_REQUEST") .bind(id) .execute(&state.pool) .await; (StatusCode::OK, Json(serde_json::json!({ "message": "Lead request rejected. Tracecoins refunded.", "refunded": request.tracecoins_reserved }))).into_response() } async fn my_requests( State(state): State>, Query(q): Query, axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; let status_filter = q.status .as_ref() .map(|s| format!("AND lr.status = '{}'", s)) .unwrap_or_default(); let requests = match sqlx::query_as::<_, LeadRequestRow>(&format!( r#" SELECT lr.* FROM lead_requests lr JOIN user_role_profiles urp ON urp.id = lr.user_role_profile_id WHERE urp.user_id = $1 {} ORDER BY lr.created_at DESC LIMIT {} OFFSET {} "#, status_filter, limit, offset )) .bind(user_id) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests, "pagination": { "page": page, "limit": limit } }))).into_response() } async fn my_pending_requests( State(state): State>, axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let requests = match sqlx::query_as::<_, LeadRequestRow>( r#" SELECT lr.* FROM lead_requests lr WHERE lr.customer_user_id = $1 AND lr.status = 'PENDING' ORDER BY lr.expires_at ASC "# ) .bind(user_id) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests }))).into_response() } async fn get_customer_lead_requests( State(state): State>, Path(lead_id): Path, Query(q): Query, axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo, ) -> impl IntoResponse { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default(); let page = q.page.unwrap_or(1); let limit = q.limit.unwrap_or(20); let offset = (page - 1) * limit; let requests = match sqlx::query_as::<_, LeadRequestRow>(&format!( r#" SELECT lr.* FROM lead_requests lr WHERE lr.lead_id = $1 AND lr.customer_user_id = $2 ORDER BY lr.created_at DESC LIMIT {} OFFSET {} "#, limit, offset )) .bind(lead_id) .bind(user_id) .fetch_all(&state.pool) .await { Ok(r) => r, Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), }; let requests: Vec = requests.into_iter().map(lead_request_to_response).collect(); (StatusCode::OK, Json(serde_json::json!({ "data": requests, "pagination": { "page": page, "limit": limit } }))).into_response() }