import fs from 'node:fs';
import { Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import zlib from 'node:zlib';

import { parse } from 'csv-parse';
import mysql from 'mysql2/promise';

// Side-effect import; kept last so it still runs before any code below reads process.env.
import './lib/dotenv.js';

const BASE_URL = 'https://files.data.gouv.fr/geo-dvf/latest/csv';
const GEODVF_DIR = 'geodvf';
/** Number of CSV records sent to MySQL in a single multi-row INSERT. */
const BATCH_SIZE = 5000;

const connectionString = process.env.MYSQL;
if (!connectionString) {
  throw new Error('MYSQL environment variable not set');
}

// Parse the MySQL connection string. URL components are percent-encoded, so
// every credential part is decoded (not only the password).
const url = new URL(connectionString);
const dbConfig = {
  host: url.hostname,
  port: url.port ? Number(url.port) : 3306,
  user: decodeURIComponent(url.username),
  password: decodeURIComponent(url.password),
  database: decodeURIComponent(url.pathname.slice(1)),
  charset: 'utf8mb4',
};

/**
 * Quotes a MySQL identifier with backticks, doubling any embedded backtick.
 *
 * @param {string} name - Raw identifier (here: a CSV header cell).
 * @returns {string} The identifier, safe to interpolate into a SQL statement.
 */
function quoteIdentifier(name) {
  return `\`${name.replaceAll('`', '``')}\``;
}

/**
 * Parses the optional year passed on the command line.
 *
 * @param {string | undefined} arg - Raw CLI argument.
 * @returns {number | null} The year, or null when no argument was given.
 * @throws {Error} When the argument is present but is not a positive integer.
 */
function parseForcedYear(arg) {
  if (!arg) return null;
  const year = Number(arg);
  if (!Number.isInteger(year) || year <= 0) {
    throw new Error(`Année invalide : ${arg}`);
  }
  return year;
}

/**
 * Lists the years published on the server by scraping the `href="YYYY/"`
 * links of the directory index at BASE_URL.
 *
 * @returns {Promise<number[]>} Distinct years in ascending numeric order.
 * @throws {Error} When the index page responds with a non-2xx status.
 */
async function getYearsOnServer() {
  const indexUrl = `${BASE_URL}/`;
  const res = await fetch(indexUrl);
  if (!res.ok) throw new Error(`HTTP ${res.status} pour ${indexUrl}`);
  const html = await res.text();

  // A Set keeps a year from being imported twice if the page links it twice.
  const years = new Set();
  for (const match of html.matchAll(/href="(\d{4})\/"/g)) {
    years.add(Number(match[1]));
  }
  return [...years].sort((a, b) => a - b);
}

/**
 * Lists the distinct years of `date_mutation` present in the `dvf` table.
 *
 * @param {object} connection - Open mysql2 promise connection.
 * @returns {Promise<number[]>} Years as returned by MySQL, in ascending order.
 */
async function getYearsInDb(connection) {
  const [rows] = await connection.query(
    'SELECT DISTINCT YEAR(date_mutation) as annee FROM dvf ORDER BY annee',
  );
  return rows.map((row) => row.annee);
}

/**
 * Downloads the gzipped CSV of one year into GEODVF_DIR unless it is already
 * there.
 *
 * The payload is written to a `.part` file and renamed once complete, so an
 * interrupted download can never be mistaken for a finished file on a later
 * run.
 *
 * @param {number} year - Year to download.
 * @returns {Promise<string>} Path of the local `.csv.gz` file.
 * @throws {Error} On a non-2xx HTTP status or when the transfer fails.
 */
async function downloadYear(year) {
  const file = `${GEODVF_DIR}/${year}.csv.gz`;
  if (fs.existsSync(file)) {
    console.log(` ${file} déjà présent, skip download`);
    return file;
  }

  const sourceUrl = `${BASE_URL}/${year}/full.csv.gz`;
  console.log(` Téléchargement ${sourceUrl} ...`);
  const res = await fetch(sourceUrl);
  if (!res.ok) throw new Error(`HTTP ${res.status} pour ${sourceUrl}`);

  fs.mkdirSync(GEODVF_DIR, { recursive: true });
  const partial = `${file}.part`;
  try {
    await pipeline(res.body, fs.createWriteStream(partial));
    fs.renameSync(partial, file);
  } catch (err) {
    // Do not leave a truncated file behind.
    fs.rmSync(partial, { force: true });
    throw err;
  }

  const { size } = fs.statSync(file);
  console.log(` Téléchargé ${file} (${(size / 1024 / 1024).toFixed(1)} Mo)`);
  return file;
}

/**
 * Streams a gzipped CSV file into the `dvf` table using batched multi-row
 * INSERT statements. Column names come from the CSV header; empty cells are
 * stored as NULL.
 *
 * NOTE(review): rows are inserted without a transaction, so a failure midway
 * leaves a partially imported year that later runs will consider present.
 *
 * @param {object} connection - Open mysql2 promise connection.
 * @param {number} year - Year being imported (used for logging only).
 * @param {string} file - Path of the `.csv.gz` file to import.
 * @returns {Promise<number>} Number of rows inserted.
 */
async function importYear(connection, year, file) {
  console.log(` Parsing et insertion de ${file} ...`);
  let columns = null;
  let inserted = 0;
  let batch = [];

  // Sends the pending batch as one INSERT and resets it.
  const flush = async () => {
    if (batch.length === 0) return;
    const placeholders = batch
      .map((row) => `(${row.map(() => '?').join(', ')})`)
      .join(', ');
    const sql = `INSERT INTO dvf (${columns.join(', ')}) VALUES ${placeholders}`;
    await connection.query(sql, batch.flat());
    inserted += batch.length;
    // Print progress roughly every 100 000 rows.
    if (inserted % 100_000 < BATCH_SIZE) {
      process.stdout.write(` ... ${inserted.toLocaleString()} lignes insérées\r`);
    }
    batch = [];
  };

  const parser = parse({ delimiter: ',', columns: true });

  const writer = new Writable({
    objectMode: true,
    async write(record, _encoding, callback) {
      try {
        // The first record fixes the column list for every INSERT.
        columns ??= Object.keys(record).map(quoteIdentifier);
        batch.push(Object.values(record).map((value) => (value === '' ? null : value)));
        if (batch.length >= BATCH_SIZE) {
          await flush();
        }
        callback();
      } catch (err) {
        callback(err);
      }
    },
    async final(callback) {
      try {
        await flush();
        callback();
      } catch (err) {
        callback(err);
      }
    },
  });

  // Every stage goes through pipeline() so that an error in any of them
  // (including the file read) rejects the promise and destroys the others.
  await pipeline(fs.createReadStream(file), zlib.createGunzip(), parser, writer);

  console.log(` ${inserted.toLocaleString()} lignes insérées pour ${year}`);
  return inserted;
}

/**
 * Entry point: imports every year available on the server but absent from the
 * database, or only the year given as first CLI argument (forced mode).
 *
 * NOTE(review): forced mode does not delete existing rows for that year, so
 * re-importing a year already in the table looks like it would duplicate rows.
 *
 * @returns {Promise<void>}
 */
async function main() {
  const forceYear = parseForcedYear(process.argv[2]);

  console.log('Connexion à la base...');
  const connection = await mysql.createConnection(dbConfig);
  try {
    console.log('Vérification des années sur le serveur...');
    const serverYears = await getYearsOnServer();
    console.log(`Années disponibles : ${serverYears.join(', ')}`);

    const dbYears = await getYearsInDb(connection);
    console.log(`Années en base : ${dbYears.join(', ')}`);

    const known = new Set(dbYears);
    let missing = serverYears.filter((year) => !known.has(year));
    if (forceYear !== null) {
      missing = [forceYear];
      console.log(`Mode forcé : import de ${forceYear}`);
    }

    if (missing.length === 0) {
      console.log('Tout est à jour !');
      return;
    }

    console.log(`Années manquantes : ${missing.join(', ')}`);
    for (const year of missing) {
      console.log(`\n=== ${year} ===`);
      const file = await downloadYear(year);
      await importYear(connection, year, file);
    }
    console.log('\nTerminé !');
  } finally {
    // Always release the connection, including when a step above throws.
    await connection.end();
  }
}

await main();