nxtgauge-backend-rust/apps/leads/src/lead_requests.rs
2026-04-18 18:30:56 +02:00

540 lines
17 KiB
Rust

use crate::AppState;
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use std::sync::Arc;
use uuid::Uuid;
/// Query-string parameters accepted by the paginated list endpoints in this module.
#[derive(Debug, Deserialize)]
pub struct PaginationQuery {
/// Requested page number; the handlers fall back to 1 when it is absent.
pub page: Option<i64>,
/// Requested page size; the handlers fall back to 20 when it is absent.
pub limit: Option<i64>,
/// Optional value some handlers use to filter on `lead_requests.status`.
pub status: Option<String>,
}
/// JSON body accepted by the `/send` endpoint.
#[derive(Debug, Deserialize)]
pub struct SendLeadRequestPayload {
/// Identifier of the lead the request is being sent for (looked up in `leads`).
pub lead_id: Uuid,
/// Optional free-text message stored in the request's `message` column.
pub message: Option<String>,
}
/// Row shape decoded from the `lead_requests` table by the queries in this module.
#[derive(Debug, FromRow)]
pub struct LeadRequestRow {
/// Primary identifier of the lead request.
pub id: Uuid,
/// Lead this request refers to.
pub lead_id: Uuid,
/// `user_role_profiles.id` of the profile that sent the request.
pub user_role_profile_id: Uuid,
/// User id of the customer the request is addressed to (copied from the lead on creation).
pub customer_user_id: Uuid,
/// Status string; this module writes and filters on `PENDING`, `ACCEPTED` and `REJECTED`.
pub status: String,
/// Tracecoin amount recorded when the request was created.
pub tracecoins_reserved: i32,
/// Optional message supplied by the sender.
pub message: Option<String>,
/// Deadline after which the accept handler refuses the request (set to creation time + 24 hours).
pub expires_at: chrono::DateTime<chrono::Utc>,
/// Set by the accept handler when the request is accepted.
pub accepted_at: Option<chrono::DateTime<chrono::Utc>>,
/// Set by the reject handler when the request is rejected.
pub rejected_at: Option<chrono::DateTime<chrono::Utc>>,
/// Reason stored by the reject handler.
pub rejected_reason: Option<String>,
/// Creation timestamp; list queries order by it.
pub created_at: chrono::DateTime<chrono::Utc>,
}
/// JSON representation of a lead request returned to API clients.
#[derive(Debug, Serialize)]
pub struct LeadRequestResponse {
/// Primary identifier of the lead request.
pub id: Uuid,
/// Lead this request refers to.
pub lead_id: Uuid,
/// Profile id of the sender.
pub user_role_profile_id: Uuid,
/// User id of the customer the request is addressed to.
pub customer_user_id: Uuid,
/// Display name of the sender; `lead_request_to_response` currently leaves this as `None`.
pub professional_name: Option<String>,
/// Role of the sender; `lead_request_to_response` currently leaves this as `None`.
pub professional_role: Option<String>,
/// Display name of the customer; `lead_request_to_response` currently leaves this as `None`.
pub customer_name: Option<String>,
/// Title of the lead; `lead_request_to_response` currently leaves this as `None`.
pub lead_title: Option<String>,
/// Status string copied from the database row.
pub status: String,
/// Tracecoin amount recorded for the request.
pub tracecoins_reserved: i32,
/// Optional message supplied by the sender.
pub message: Option<String>,
/// Expiry deadline of the request.
pub expires_at: chrono::DateTime<chrono::Utc>,
/// Acceptance timestamp, if any.
pub accepted_at: Option<chrono::DateTime<chrono::Utc>>,
/// Rejection timestamp, if any.
pub rejected_at: Option<chrono::DateTime<chrono::Utc>>,
/// Rejection reason, if any.
pub rejected_reason: Option<String>,
/// Creation timestamp.
pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Builds the router exposing every lead-request endpoint defined in this module.
///
/// The paths are relative; the caller decides where the router is mounted.
pub fn router() -> Router<Arc<AppState>> {
    let routes: Router<Arc<AppState>> = Router::new();

    // Collection listing and request creation.
    let routes = routes
        .route("/", get(list_lead_requests))
        .route("/send", post(send_lead_request));

    // Decisions on a single request.
    let routes = routes
        .route("/{id}/accept", post(accept_lead_request))
        .route("/{id}/reject", post(reject_lead_request));

    // Views scoped to the current user.
    routes
        .route("/my-requests", get(my_requests))
        .route("/my-pending", get(my_pending_requests))
        .route("/customer/{lead_id}", get(get_customer_lead_requests))
}
fn lead_request_to_response(row: LeadRequestRow) -> LeadRequestResponse {
LeadRequestResponse {
id: row.id,
lead_id: row.lead_id,
user_role_profile_id: row.user_role_profile_id,
customer_user_id: row.customer_user_id,
professional_name: None,
professional_role: None,
customer_name: None,
lead_title: None,
status: row.status,
tracecoins_reserved: row.tracecoins_reserved,
message: row.message,
expires_at: row.expires_at,
accepted_at: row.accepted_at,
rejected_at: row.rejected_at,
rejected_reason: row.rejected_reason,
created_at: row.created_at,
}
}
/// Lists lead requests, newest first, with optional status filtering and pagination.
///
/// Query parameters (see [`PaginationQuery`]):
/// * `page`   – 1-based page number, defaults to 1; values below 1 are treated as 1.
/// * `limit`  – page size, defaults to 20; clamped to `1..=100`.
/// * `status` – when present, only rows whose `status` equals this value are returned.
///
/// Responds with `200` and `{ "data": [...], "pagination": { "page", "limit" } }`
/// (reporting the effective, clamped values), or `500` with the database error text.
async fn list_lead_requests(
    State(state): State<Arc<AppState>>,
    Query(q): Query<PaginationQuery>,
) -> impl IntoResponse {
    // Upper bound on the page size so one request cannot pull the whole table.
    const MAX_LIMIT: i64 = 100;

    // Client-supplied values are untrusted: negative numbers would produce an
    // invalid LIMIT/OFFSET and huge ones could overflow the multiplication.
    let page = q.page.unwrap_or(1).max(1);
    let limit = q.limit.unwrap_or(20).clamp(1, MAX_LIMIT);
    let offset = (page - 1).saturating_mul(limit);

    // The status filter is passed as a bind parameter (never interpolated into
    // the SQL text); a NULL parameter disables the filter.
    let requests = match sqlx::query_as::<_, LeadRequestRow>(
        r#"
        SELECT lr.* FROM lead_requests lr
        WHERE ($1::text IS NULL OR lr.status = $1)
        ORDER BY lr.created_at DESC
        LIMIT $2 OFFSET $3
        "#,
    )
    .bind(q.status.as_deref())
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await
    {
        Ok(r) => r,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    let requests: Vec<LeadRequestResponse> =
        requests.into_iter().map(lead_request_to_response).collect();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "data": requests,
            "pagination": { "page": page, "limit": limit }
        })),
    )
        .into_response()
}
async fn send_lead_request(
State(state): State<Arc<AppState>>,
axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
Json(payload): Json<SendLeadRequestPayload>,
) -> impl IntoResponse {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default();
let user_role_profile_id = match sqlx::query_scalar::<_, Uuid>(
"SELECT id FROM user_role_profiles WHERE user_id = $1 LIMIT 1"
)
.bind(user_id)
.fetch_optional(&state.pool)
.await
{
Ok(Some(id)) => id,
Ok(None) => return (StatusCode::NOT_FOUND, "Professional profile not found. Please complete your profile first.").into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
let lead = match sqlx::query_as::<_, (Uuid, String, Uuid, String, i32)>(
"SELECT id, title, customer_user_id, status, COALESCE(current_acceptances, 0) FROM leads WHERE id = $1"
)
.bind(payload.lead_id)
.fetch_optional(&state.pool)
.await
{
Ok(Some(l)) => l,
Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
if lead.3 != "OPEN" {
return (StatusCode::BAD_REQUEST, "Lead is not open for requests").into_response();
}
if lead.4 >= 10 {
return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances").into_response();
}
let duplicate = match sqlx::query_scalar::<_, Uuid>(
"SELECT id FROM lead_requests WHERE lead_id = $1 AND user_role_profile_id = $2 AND status IN ('PENDING', 'ACCEPTED')"
)
.bind(payload.lead_id)
.bind(user_role_profile_id)
.fetch_optional(&state.pool)
.await
{
Ok(Some(_)) => true,
Ok(None) => false,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
if duplicate {
return (StatusCode::CONFLICT, "You have already sent a request for this lead").into_response();
}
let request_count: (i64,) = match sqlx::query_as(
"SELECT COUNT(*) FROM lead_requests WHERE lead_id = $1 AND status IN ('PENDING', 'ACCEPTED')"
)
.bind(payload.lead_id)
.fetch_one(&state.pool)
.await
{
Ok(c) => c,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
if request_count.0 >= 20 {
return (StatusCode::CONFLICT, "Lead has reached maximum requests").into_response();
}
let wallet = match sqlx::query_as::<_, (Uuid, i64)>(
"SELECT id, balance FROM tracecoin_wallets WHERE user_id = $1"
)
.bind(user_id)
.fetch_optional(&state.pool)
.await
{
Ok(Some(w)) => w,
Ok(None) => return (StatusCode::BAD_REQUEST, "Wallet not found. Please contact support.").into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
let tracecoins_cost = 25;
if wallet.1 < tracecoins_cost as i64 {
return (StatusCode::PAYMENT_REQUIRED, format!("Insufficient balance. You need at least {} Tracecoins.", tracecoins_cost)).into_response();
}
let expires_at = chrono::Utc::now() + chrono::Duration::hours(24);
let result = sqlx::query_as::<_, LeadRequestRow>(
r#"
INSERT INTO lead_requests (lead_id, user_role_profile_id, customer_user_id, status, tracecoins_reserved, message, expires_at)
VALUES ($1, $2, $3, 'PENDING', $4, $5, $6)
RETURNING *
"#
)
.bind(payload.lead_id)
.bind(user_role_profile_id)
.bind(lead.2)
.bind(tracecoins_cost)
.bind(&payload.message)
.bind(expires_at)
.fetch_one(&state.pool)
.await;
match result {
Ok(req) => {
let _ = sqlx::query(
r#"
UPDATE tracecoin_wallets SET
balance = balance - $1,
reserved = COALESCE(reserved, 0) + $1,
updated_at = NOW()
WHERE user_id = $2
"#
)
.bind(tracecoins_cost as i64)
.bind(user_id)
.execute(&state.pool)
.await;
let _ = sqlx::query(
r#"
INSERT INTO notifications (user_id, title, body, notification_type, reference_id)
VALUES ($1, $2, $3, $4, $5)
"#
)
.bind(lead.2)
.bind("New Lead Request")
.bind("You have a new lead request. Please review and respond within 24 hours.")
.bind("LEAD_REQUEST")
.bind(req.id)
.execute(&state.pool)
.await;
let response = lead_request_to_response(req);
(StatusCode::CREATED, Json(response)).into_response()
}
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Accepts a pending lead request on behalf of the customer it is addressed to.
///
/// Flow:
/// 1. Load the `PENDING` request (`404` if missing or already processed).
/// 2. Require the caller to be the request's customer (`403`) and the request
///    to be unexpired (`400`).
/// 3. Require the lead to exist (`404`) and to have fewer than 10 acceptances (`400`).
/// 4. Inside a single transaction: mark the request `ACCEPTED`, increment the
///    lead's acceptance counter, consume the reserved Tracecoins from the
///    *sender's* wallet, and close the lead once it reaches 10 acceptances.
/// 5. After commit, notify the sender (best effort).
///
/// Responds with `200` and a confirmation JSON object, or `500` with the
/// database error text on unexpected failures.
async fn accept_lead_request(
    State(state): State<Arc<AppState>>,
    Path(id): Path<Uuid>,
    axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
    /// A lead is closed once it has this many accepted requests.
    const MAX_ACCEPTANCES: i32 = 10;

    // NOTE(review): the caller identity is a hard-coded placeholder UUID;
    // presumably this is meant to come from authentication — confirm.
    let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default();

    let request = match sqlx::query_as::<_, LeadRequestRow>(
        "SELECT * FROM lead_requests WHERE id = $1 AND status = 'PENDING'",
    )
    .bind(id)
    .fetch_optional(&state.pool)
    .await
    {
        Ok(Some(r)) => r,
        Ok(None) => {
            return (StatusCode::NOT_FOUND, "Lead request not found or already processed")
                .into_response()
        }
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    if request.customer_user_id != user_id {
        return (StatusCode::FORBIDDEN, "You are not authorized to accept this request")
            .into_response();
    }
    if request.expires_at < chrono::Utc::now() {
        return (StatusCode::BAD_REQUEST, "This request has expired").into_response();
    }

    let current_acceptances = match sqlx::query_scalar::<_, i32>(
        "SELECT COALESCE(current_acceptances, 0) FROM leads WHERE id = $1",
    )
    .bind(request.lead_id)
    .fetch_optional(&state.pool)
    .await
    {
        Ok(Some(n)) => n,
        Ok(None) => return (StatusCode::NOT_FOUND, "Lead not found").into_response(),
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };
    if current_acceptances >= MAX_ACCEPTANCES {
        return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances").into_response();
    }

    // The coins were reserved in the wallet of the user who SENT the request,
    // so the wallet update and the notification must target that user, not
    // the customer performing the acceptance.
    let professional_user_id = match sqlx::query_scalar::<_, Uuid>(
        "SELECT user_id FROM user_role_profiles WHERE id = $1",
    )
    .bind(request.user_role_profile_id)
    .fetch_optional(&state.pool)
    .await
    {
        Ok(Some(uid)) => uid,
        Ok(None) => {
            return (StatusCode::NOT_FOUND, "Professional profile not found").into_response()
        }
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    // All state changes happen atomically. Returning early drops the
    // transaction, which rolls it back.
    let mut tx = match state.pool.begin().await {
        Ok(tx) => tx,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    // The status guard prevents double-processing when two decisions race.
    let accepted = sqlx::query(
        "UPDATE lead_requests SET status = 'ACCEPTED', accepted_at = NOW(), updated_at = NOW() WHERE id = $1 AND status = 'PENDING'",
    )
    .bind(id)
    .execute(&mut *tx)
    .await;
    match accepted {
        Ok(done) if done.rows_affected() == 0 => {
            return (StatusCode::NOT_FOUND, "Lead request not found or already processed")
                .into_response()
        }
        Ok(_) => {}
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }

    // COALESCE keeps the counter from staying NULL; the `< $2` guard enforces
    // the cap even if another acceptance was committed since the check above.
    let incremented = sqlx::query_scalar::<_, i32>(
        r#"
        UPDATE leads SET
            current_acceptances = COALESCE(current_acceptances, 0) + 1,
            updated_at = NOW()
        WHERE id = $1 AND COALESCE(current_acceptances, 0) < $2
        RETURNING current_acceptances
        "#,
    )
    .bind(request.lead_id)
    .bind(MAX_ACCEPTANCES)
    .fetch_optional(&mut *tx)
    .await;
    let new_total = match incremented {
        Ok(Some(n)) => n,
        Ok(None) => {
            return (StatusCode::BAD_REQUEST, "Lead has reached maximum acceptances")
                .into_response()
        }
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    // Consume the reservation: the coins leave `reserved` and are not returned
    // to `balance`.
    let consumed = sqlx::query(
        r#"
        UPDATE tracecoin_wallets SET
            reserved = COALESCE(reserved, 0) - $1,
            updated_at = NOW()
        WHERE user_id = $2
        "#,
    )
    .bind(i64::from(request.tracecoins_reserved))
    .bind(professional_user_id)
    .execute(&mut *tx)
    .await;
    if let Err(e) = consumed {
        return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
    }

    if new_total >= MAX_ACCEPTANCES {
        let closed =
            sqlx::query("UPDATE leads SET status = 'CLOSED', updated_at = NOW() WHERE id = $1")
                .bind(request.lead_id)
                .execute(&mut *tx)
                .await;
        if let Err(e) = closed {
            return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    }

    if let Err(e) = tx.commit().await {
        return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
    }

    // Notification is best effort: the acceptance is already committed, so a
    // failure here is reported on stderr instead of failing the call.
    let notified = sqlx::query(
        r#"
        INSERT INTO notifications (user_id, title, body, notification_type, reference_id)
        VALUES ($1, $2, $3, $4, $5)
        "#,
    )
    .bind(professional_user_id)
    .bind("Lead Request Accepted")
    .bind("Your lead request has been accepted! Contact details have been shared.")
    .bind("LEAD_REQUEST")
    .bind(id)
    .execute(&state.pool)
    .await;
    if let Err(e) = notified {
        eprintln!("failed to store notification for lead request {}: {}", id, e);
    }

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "message": "Lead request accepted successfully",
            "contact_details_shared": true
        })),
    )
        .into_response()
}
async fn reject_lead_request(
State(state): State<Arc<AppState>>,
Path(id): Path<Uuid>,
axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default();
let request = match sqlx::query_as::<_, LeadRequestRow>(
"SELECT * FROM lead_requests WHERE id = $1 AND status = 'PENDING'"
)
.bind(id)
.fetch_optional(&state.pool)
.await
{
Ok(Some(r)) => r,
Ok(None) => return (StatusCode::NOT_FOUND, "Lead request not found or already processed").into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
if request.customer_user_id != user_id {
return (StatusCode::FORBIDDEN, "You are not authorized to reject this request").into_response();
}
let _ = sqlx::query(
"UPDATE lead_requests SET status = 'REJECTED', rejected_at = NOW(), rejected_reason = 'Rejected by customer', updated_at = NOW() WHERE id = $1"
)
.bind(id)
.execute(&state.pool)
.await;
let _ = sqlx::query(
r#"
UPDATE tracecoin_wallets SET
balance = balance + $1,
reserved = reserved - $1,
updated_at = NOW()
WHERE user_id = $2
"#
)
.bind(request.tracecoins_reserved as i64)
.bind(user_id)
.execute(&state.pool)
.await;
let _ = sqlx::query(
r#"
INSERT INTO notifications (user_id, title, body, notification_type, reference_id)
VALUES ($1, $2, $3, $4, $5)
"#
)
.bind(user_id)
.bind("Lead Request Rejected")
.bind("Your lead request was not accepted. Tracecoins have been refunded.")
.bind("LEAD_REQUEST")
.bind(id)
.execute(&state.pool)
.await;
(StatusCode::OK, Json(serde_json::json!({
"message": "Lead request rejected. Tracecoins refunded.",
"refunded": request.tracecoins_reserved
}))).into_response()
}
/// Lists the lead requests sent by the current user, newest first.
///
/// Query parameters (see [`PaginationQuery`]):
/// * `page`   – 1-based page number, defaults to 1; values below 1 are treated as 1.
/// * `limit`  – page size, defaults to 20; clamped to `1..=100`.
/// * `status` – when present, only rows whose `status` equals this value are returned.
///
/// Responds with `200` and `{ "data": [...], "pagination": { "page", "limit" } }`
/// (reporting the effective, clamped values), or `500` with the database error text.
async fn my_requests(
    State(state): State<Arc<AppState>>,
    Query(q): Query<PaginationQuery>,
    axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
    // Upper bound on the page size so one request cannot pull the whole table.
    const MAX_LIMIT: i64 = 100;

    // NOTE(review): the caller identity is a hard-coded placeholder UUID;
    // presumably this is meant to come from authentication — confirm.
    let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default();

    // Client-supplied values are untrusted: negative numbers would produce an
    // invalid LIMIT/OFFSET and huge ones could overflow the multiplication.
    let page = q.page.unwrap_or(1).max(1);
    let limit = q.limit.unwrap_or(20).clamp(1, MAX_LIMIT);
    let offset = (page - 1).saturating_mul(limit);

    // The status filter is passed as a bind parameter (never interpolated into
    // the SQL text); a NULL parameter disables the filter.
    let requests = match sqlx::query_as::<_, LeadRequestRow>(
        r#"
        SELECT lr.* FROM lead_requests lr
        JOIN user_role_profiles urp ON urp.id = lr.user_role_profile_id
        WHERE urp.user_id = $1 AND ($2::text IS NULL OR lr.status = $2)
        ORDER BY lr.created_at DESC
        LIMIT $3 OFFSET $4
        "#,
    )
    .bind(user_id)
    .bind(q.status.as_deref())
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await
    {
        Ok(r) => r,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    let requests: Vec<LeadRequestResponse> =
        requests.into_iter().map(lead_request_to_response).collect();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "data": requests,
            "pagination": { "page": page, "limit": limit }
        })),
    )
        .into_response()
}
async fn my_pending_requests(
State(state): State<Arc<AppState>>,
axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default();
let requests = match sqlx::query_as::<_, LeadRequestRow>(
r#"
SELECT lr.* FROM lead_requests lr
WHERE lr.customer_user_id = $1 AND lr.status = 'PENDING'
ORDER BY lr.expires_at ASC
"#
)
.bind(user_id)
.fetch_all(&state.pool)
.await
{
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
};
let requests: Vec<LeadRequestResponse> = requests.into_iter().map(lead_request_to_response).collect();
(StatusCode::OK, Json(serde_json::json!({
"data": requests
}))).into_response()
}
/// Lists the requests received for one lead owned by the current customer, newest first.
///
/// Path parameter `lead_id` selects the lead; only rows whose
/// `customer_user_id` matches the caller are returned.
///
/// Query parameters (see [`PaginationQuery`]; `status` is not used here):
/// * `page`  – 1-based page number, defaults to 1; values below 1 are treated as 1.
/// * `limit` – page size, defaults to 20; clamped to `1..=100`.
///
/// Responds with `200` and `{ "data": [...], "pagination": { "page", "limit" } }`
/// (reporting the effective, clamped values), or `500` with the database error text.
async fn get_customer_lead_requests(
    State(state): State<Arc<AppState>>,
    Path(lead_id): Path<Uuid>,
    Query(q): Query<PaginationQuery>,
    axum::extract::ConnectInfo(_addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
) -> impl IntoResponse {
    // Upper bound on the page size so one request cannot pull the whole table.
    const MAX_LIMIT: i64 = 100;

    // NOTE(review): the caller identity is a hard-coded placeholder UUID;
    // presumably this is meant to come from authentication — confirm.
    let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap_or_default();

    // Client-supplied values are untrusted: negative numbers would produce an
    // invalid LIMIT/OFFSET and huge ones could overflow the multiplication.
    let page = q.page.unwrap_or(1).max(1);
    let limit = q.limit.unwrap_or(20).clamp(1, MAX_LIMIT);
    let offset = (page - 1).saturating_mul(limit);

    let requests = match sqlx::query_as::<_, LeadRequestRow>(
        r#"
        SELECT lr.* FROM lead_requests lr
        WHERE lr.lead_id = $1 AND lr.customer_user_id = $2
        ORDER BY lr.created_at DESC
        LIMIT $3 OFFSET $4
        "#,
    )
    .bind(lead_id)
    .bind(user_id)
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await
    {
        Ok(r) => r,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };

    let requests: Vec<LeadRequestResponse> =
        requests.into_iter().map(lead_request_to_response).collect();

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "data": requests,
            "pagination": { "page": page, "limit": limit }
        })),
    )
        .into_response()
}