- Update leads service to use 'leads' table - Update extension models to use user_role_profile_id - Update ProfessionalRepository to work with new schema - Create TracecoinWalletRepository for wallet operations - Update all handlers to use new model fields - Rename Application fields (job_seeker_id -> applicant_user_id) - Update cron tasks for new schema - Fix compilation errors across all services
105 lines
3.3 KiB
Rust
105 lines
3.3 KiB
Rust
use std::path::Path;
|
|
use anyhow::{Context, Result};
|
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
tracing_subscriber::registry()
|
|
.with(tracing_subscriber::EnvFilter::new(
|
|
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
|
|
))
|
|
.with(tracing_subscriber::fmt::layer())
|
|
.init();
|
|
|
|
let database_url = std::env::var("DATABASE_URL")
|
|
.context("DATABASE_URL must be set")?;
|
|
|
|
tracing::info!("Connecting to database...");
|
|
let pool = sqlx::postgres::PgPoolOptions::new()
|
|
.max_connections(1)
|
|
.connect(&database_url)
|
|
.await
|
|
.context("Failed to connect to database")?;
|
|
tracing::info!("Connected to database");
|
|
|
|
let drop_existing = std::env::var("DROP_EXISTING_TABLES")
|
|
.unwrap_or_default()
|
|
.to_lowercase();
|
|
|
|
if drop_existing == "true" || drop_existing == "1" || drop_existing == "yes" {
|
|
drop_all_tables(&pool).await?;
|
|
}
|
|
|
|
let migrations_dir = std::env::var("MIGRATIONS_DIR")
|
|
.unwrap_or_else(|_| "/migrations".to_string());
|
|
|
|
run_migrations(&pool, &migrations_dir).await?;
|
|
|
|
tracing::info!("All migrations completed successfully!");
|
|
Ok(())
|
|
}
|
|
|
|
async fn drop_all_tables(pool: &sqlx::PgPool) -> Result<()> {
|
|
tracing::warn!("DROP_EXISTING_TABLES is enabled - dropping all tables!");
|
|
|
|
let rows: Vec<(String,)> = sqlx::query_as("SELECT tablename FROM pg_tables WHERE schemaname = 'public'")
|
|
.fetch_all(pool)
|
|
.await?;
|
|
|
|
if rows.is_empty() {
|
|
tracing::info!("No tables to drop");
|
|
return Ok(());
|
|
}
|
|
|
|
tracing::info!("Found {} tables to drop", rows.len());
|
|
|
|
for (table_name,) in rows {
|
|
let sql = format!("DROP TABLE IF EXISTS \"{}\" CASCADE", table_name);
|
|
tracing::info!("Dropping table: {}", table_name);
|
|
sqlx::raw_sql(&sql).execute(pool).await?;
|
|
}
|
|
|
|
tracing::info!("All tables dropped successfully");
|
|
Ok(())
|
|
}
|
|
|
|
async fn run_migrations(pool: &sqlx::PgPool, migrations_dir: &str) -> Result<()> {
|
|
let migrations_path = Path::new(migrations_dir);
|
|
|
|
if !migrations_path.exists() {
|
|
tracing::warn!("Migrations directory does not exist: {}", migrations_dir);
|
|
return Ok(());
|
|
}
|
|
|
|
let mut entries: Vec<_> = std::fs::read_dir(migrations_path)?
|
|
.filter_map(|e| e.ok())
|
|
.filter(|e| {
|
|
let name = e.file_name();
|
|
let name_str = name.to_string_lossy();
|
|
name_str.ends_with(".up.sql")
|
|
})
|
|
.collect();
|
|
|
|
entries.sort_by_key(|e| e.file_name());
|
|
|
|
tracing::info!("Found {} migration files", entries.len());
|
|
|
|
for entry in entries {
|
|
let file_name = entry.file_name();
|
|
let file_path = entry.path();
|
|
|
|
tracing::info!("Applying migration: {}", file_name.to_string_lossy());
|
|
|
|
let sql = std::fs::read_to_string(&file_path)
|
|
.with_context(|| format!("Failed to read migration: {}", file_name.to_string_lossy()))?;
|
|
|
|
sqlx::raw_sql(&sql)
|
|
.execute(pool)
|
|
.await
|
|
.with_context(|| format!("Failed to execute migration: {}", file_name.to_string_lossy()))?;
|
|
|
|
tracing::info!("Applied migration: {}", file_name.to_string_lossy());
|
|
}
|
|
|
|
Ok(())
|
|
}
|