10af5a50e5
parse.js/sync.js/lib/dotenv.js -> .ts (run via tsx), ajout tsconfig.json, eslint config TS-enabled (parser tseslint + resolver typescript), devDeps typescript/typescript-eslint/tsx/eslint-import-resolver-typescript, scripts package.json en tsx + typecheck, .lintstagedrc *.ts, README en tsx. Fix securite: escapeId sur les identifiants de colonnes dans sync.ts (au lieu de backticks bruts) pour ne pas casser/injecter via un en-tete CSV distant. Suppression du todo.txt audit (finding escapeId traite). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
156 lines
5.1 KiB
TypeScript
156 lines
5.1 KiB
TypeScript
import { parse } from 'csv-parse';
|
|
import fs from 'node:fs';
|
|
import zlib from 'node:zlib';
|
|
import { pipeline } from 'node:stream/promises';
|
|
import { Writable } from 'node:stream';
|
|
import mysql from 'mysql2/promise';
|
|
import type { Connection } from 'mysql2/promise';
|
|
import './lib/dotenv.ts';
|
|
|
|
// MySQL connection URL, taken from the MYSQL environment variable.
// Nothing below can run without it, so abort immediately when it is
// missing or empty.
const connectionString = process.env.MYSQL;
if (connectionString === undefined || connectionString === '') {
  throw new Error('MYSQL environment variable not set');
}

// Remote root: the directory listing is read from `${BASE_URL}/` and each
// yearly archive from `${BASE_URL}/<year>/full.csv.gz`.
const BASE_URL = 'https://files.data.gouv.fr/geo-dvf/latest/csv';

// Local directory in which the downloaded `<year>.csv.gz` archives are kept.
const GEODVF_DIR = 'geodvf';
|
|
|
|
// Break the MYSQL connection URL down into the option object that is handed
// to mysql.createConnection().
const url = new URL(connectionString);
const dbConfig = {
  host: url.hostname,
  // Fall back to 3306 when the URL carries no explicit port.
  port: url.port ? Number(url.port) : 3306,
  // URL exposes username, password and pathname in percent-encoded form.
  // Decode all three (not only the password) so that credentials or a
  // database name containing escaped characters reach the driver verbatim.
  user: decodeURIComponent(url.username),
  password: decodeURIComponent(url.password),
  // pathname is "/<database>": drop the leading slash.
  database: decodeURIComponent(url.pathname.slice(1)),
  charset: 'utf8mb4',
};
|
|
|
|
/**
 * Lists the years available on the remote server.
 *
 * Fetches the listing at `${BASE_URL}/` and collects every link of the form
 * `href="YYYY/"` found in the returned HTML.
 *
 * @returns The distinct years found, as numbers in ascending order.
 * @throws Error when the listing request does not answer with a 2xx status.
 */
async function getYearsOnServer(): Promise<number[]> {
  const listingUrl = `${BASE_URL}/`;
  const res = await fetch(listingUrl);
  // Same guard as downloadYear(): without it an error page would be scanned
  // for links, yield no year at all, and the caller would conclude that
  // everything is already up to date.
  if (!res.ok) throw new Error(`HTTP ${res.status} pour ${listingUrl}`);
  const html = await res.text();
  const years = Array.from(html.matchAll(/href="(\d{4})\/"/g), match => Number(match[1]));
  // Deduplicate so that a link repeated in the listing cannot make the same
  // year be imported twice.
  return Array.from(new Set(years)).toSorted((a, b) => a - b);
}
|
|
|
|
/**
 * Lists the years already present in the `dvf` table.
 *
 * Runs a `SELECT DISTINCT YEAR(date_mutation)` query ordered by year and
 * returns the `annee` column of each row.
 *
 * @param connection Open database connection used to run the query.
 * @returns One entry per distinct year, in the order returned by the query.
 */
async function getYearsInDb(connection: Connection) {
  const result = await connection.query('SELECT DISTINCT YEAR(date_mutation) as annee FROM dvf ORDER BY annee');
  // The first element of the query result holds the rows.
  const yearRows = result[0] as { annee: number }[];
  const years: number[] = [];
  for (const { annee } of yearRows) {
    years.push(annee);
  }
  return years;
}
|
|
|
|
/**
 * Makes sure the archive for `year` is available locally and returns its path.
 *
 * If `${GEODVF_DIR}/<year>.csv.gz` already exists it is reused as is.
 * Otherwise `${BASE_URL}/<year>/full.csv.gz` is downloaded. The payload is
 * first written to a `.part` file and only renamed to its final name once the
 * transfer has completed, so an interrupted download can never be mistaken
 * for a complete archive by the "already present" shortcut on a later run.
 *
 * @param year Year whose archive is wanted.
 * @returns Path of the local `.csv.gz` file.
 * @throws Error when the server answers with a non-2xx status or without a
 *   body; stream and filesystem errors are rethrown after the partial file
 *   has been removed.
 */
async function downloadYear(year: number): Promise<string> {
  const file = `${GEODVF_DIR}/${year}.csv.gz`;
  if (fs.existsSync(file)) {
    console.log(` ${file} déjà présent, skip download`);
    return file;
  }

  const url = `${BASE_URL}/${year}/full.csv.gz`;
  console.log(` Téléchargement ${url} ...`);
  const res = await fetch(url);
  if (!res.ok) throw new Error(`HTTP ${res.status} pour ${url}`);
  // Response.body is nullable; fail with a clear message instead of handing
  // null to pipeline().
  if (!res.body) throw new Error(`Réponse sans corps pour ${url}`);

  fs.mkdirSync(GEODVF_DIR, { recursive: true });
  const partial = `${file}.part`;
  try {
    await pipeline(res.body, fs.createWriteStream(partial));
    // Publish under the final name only after every byte has been written.
    fs.renameSync(partial, file);
  } catch (err) {
    // Do not leave a truncated file behind; `force` ignores a missing file.
    fs.rmSync(partial, { force: true });
    throw err;
  }

  const size = fs.statSync(file).size;
  console.log(` Téléchargé ${file} (${(size / 1024 / 1024).toFixed(1)} Mo)`);
  return file;
}
|
|
|
|
/**
 * Streams a gzipped CSV archive into the `dvf` table.
 *
 * The file is gunzipped and parsed with the header row as column names, and
 * the records are inserted in multi-row INSERT statements of up to 5000 rows.
 * Empty CSV fields are stored as NULL. The whole import runs inside one
 * transaction: if anything fails the rows inserted so far are rolled back, so
 * a year is never left half-imported in the table.
 *
 * @param connection Open database connection used for the inserts.
 * @param year Year being imported; only used in the final log line.
 * @param file Path of the `.csv.gz` archive to import.
 * @returns Number of rows inserted.
 * @throws Any read, decompression, parsing or database error, after the
 *   transaction has been rolled back.
 */
async function importYear(connection: Connection, year: number, file: string): Promise<number> {
  console.log(` Parsing et insertion de ${file} ...`);

  const BATCH_SIZE = 5000;
  // Escaped column identifiers, captured from the first record.
  let columns: string[] | null = null;
  let inserted = 0;
  let batch: unknown[][] = [];

  // Stream callbacks expect an Error, whereas a catch variable is `unknown`.
  const toError = (err: unknown): Error => (err instanceof Error ? err : new Error(String(err)));

  // Sends the pending rows as a single multi-row INSERT and resets the batch.
  const flush = async (): Promise<void> => {
    // `columns` is always set before a row is queued; the null test is there
    // so the compiler can narrow the type.
    if (batch.length === 0 || columns === null) return;
    const placeholders = batch.map(row => `(${row.map(() => '?').join(', ')})`).join(', ');
    const sql = `INSERT INTO dvf (${columns.join(', ')}) VALUES ${placeholders}`;
    await connection.query(sql, batch.flat());
    inserted += batch.length;
    // Report progress on the flush that crosses each 100 000-row boundary.
    if (inserted % 100_000 < BATCH_SIZE) {
      process.stdout.write(` ... ${inserted.toLocaleString()} lignes insérées\r`);
    }
    batch = [];
  };

  const parser = parse({ delimiter: ',', columns: true });

  const writer = new Writable({
    objectMode: true,
    async write(record: Record<string, string>, _encoding, callback) {
      try {
        if (columns === null) {
          // Use escapeId rather than hand-written backticks so that a remote
          // CSV header containing a backtick cannot break the identifier.
          columns = Object.keys(record).map(col => connection.escapeId(col));
        }
        batch.push(Object.values(record).map(v => (v === '' ? null : v)));
        if (batch.length >= BATCH_SIZE) {
          await flush();
        }
        callback();
      } catch (err) {
        callback(toError(err));
      }
    },
    async final(callback) {
      try {
        // Insert whatever is left in the last, possibly incomplete, batch.
        await flush();
        callback();
      } catch (err) {
        callback(toError(err));
      }
    },
  });

  await connection.beginTransaction();
  try {
    // Every stage is passed to pipeline() so that an error in any of them,
    // including the file read itself, rejects the promise and tears down the
    // others. A stream joined with .pipe() outside pipeline() would not
    // forward its errors.
    await pipeline(fs.createReadStream(file), zlib.createGunzip(), parser, writer);
    await connection.commit();
  } catch (err) {
    try {
      await connection.rollback();
    } catch {
      // The connection may already be unusable; the original error is the
      // one worth reporting.
    }
    throw err;
  }

  console.log(` ${inserted.toLocaleString()} lignes insérées pour ${year}`);
  return inserted;
}
|
|
|
|
/**
 * Synchronises the database with the years available on the server.
 *
 * Compares the years listed remotely with those present in the database and,
 * for each missing one, downloads the archive and imports it. When a year is
 * given as the first command-line argument, only that year is imported,
 * whether or not it is already in the database.
 *
 * The database connection is closed on every exit path, including failures.
 *
 * @throws Error when the command-line argument is not a positive integer;
 *   errors from the listing, download and import steps are propagated.
 */
async function main(): Promise<void> {
  const yearArg = process.argv[2];
  const forceYear = yearArg ? Number(yearArg) : null;
  // Reject an unusable argument up front: NaN or 0 would otherwise be falsy
  // and silently turn a forced single-year import into a full sync.
  if (forceYear !== null && (!Number.isInteger(forceYear) || forceYear <= 0)) {
    throw new Error(`Année invalide : ${yearArg}`);
  }

  console.log('Connexion à la base...');
  const connection = await mysql.createConnection(dbConfig);

  try {
    console.log('Vérification des années sur le serveur...');
    const serverYears = await getYearsOnServer();
    console.log(`Années disponibles : ${serverYears.join(', ')}`);

    const dbYears = await getYearsInDb(connection);
    console.log(`Années en base : ${dbYears.join(', ')}`);

    let missing = serverYears.filter(y => !dbYears.includes(y));
    if (forceYear !== null) {
      missing = [forceYear];
      console.log(`Mode forcé : import de ${forceYear}`);
    }

    if (missing.length === 0) {
      console.log('Tout est à jour !');
      return;
    }

    console.log(`Années manquantes : ${missing.join(', ')}`);

    // Years are processed one after the other on the single connection.
    for (const year of missing) {
      console.log(`\n=== ${year} ===`);
      const file = await downloadYear(year);
      await importYear(connection, year, file);
    }

    console.log('\nTerminé !');
  } finally {
    // Close the connection on success, early return and failure alike.
    await connection.end();
  }
}
|
|
|
|
// Script entry point: top-level await, so a rejection from main() is not caught here.
await main();
|