Files
dvf/sync.js

158 lines
4.8 KiB
JavaScript

import { parse } from 'csv-parse';
import fs from 'node:fs';
import zlib from 'node:zlib';
import { pipeline } from 'node:stream/promises';
import { Writable } from 'node:stream';
import mysql from 'mysql2/promise';
import './lib/dotenv.js';
const BASE_URL = 'https://files.data.gouv.fr/geo-dvf/latest/csv';
const GEODVF_DIR = 'geodvf';

// Connection string, expected to look like mysql://user:password@host:port/database.
const connectionString = process.env.MYSQL;
if (!connectionString) {
  throw new Error('MYSQL environment variable not set');
}

// Split the connection URL into the options object passed to mysql.createConnection().
const url = new URL(connectionString);
const dbConfig = {
  host: url.hostname,
  // URL#port is the empty string when the URL does not specify one.
  port: url.port ? Number(url.port) : 3306,
  // The URL parser keeps userinfo and path percent-encoded, so every component is decoded,
  // not only the password.
  user: decodeURIComponent(url.username),
  password: decodeURIComponent(url.password),
  database: decodeURIComponent(url.pathname.slice(1)),
  charset: 'utf8mb4',
};
/**
 * Lists the year directories published on the GeoDVF server.
 *
 * Scrapes the HTML index at `${BASE_URL}/` for links of the form `href="YYYY/"`.
 *
 * @returns {Promise<number[]>} distinct years, in ascending numeric order
 * @throws {Error} when the index page answers with a non-2xx status
 */
async function getYearsOnServer() {
  const url = `${BASE_URL}/`;
  const res = await fetch(url);
  // Without this check an error page would silently look like "no years available".
  if (!res.ok) throw new Error(`HTTP ${res.status} pour ${url}`);
  const html = await res.text();
  // A Set keeps each year once even if the index links it several times; otherwise the
  // caller would import that year more than once.
  const years = new Set();
  for (const match of html.matchAll(/href="(\d{4})\/"/g)) {
    years.add(Number(match[1]));
  }
  // Explicit numeric comparator: the default sort compares elements as strings.
  return [...years].sort((a, b) => a - b);
}
/**
 * Lists the distinct years (taken from `date_mutation`) already present in the `dvf` table.
 *
 * @param connection - open mysql2/promise connection
 * @returns {Promise<Array>} the `annee` column of each result row, in the query's order
 *   (ascending)
 */
async function getYearsInDb(connection) {
  const sql = 'SELECT DISTINCT YEAR(date_mutation) as annee FROM dvf ORDER BY annee';
  const [resultRows] = await connection.query(sql);
  const years = [];
  for (const { annee } of resultRows) {
    years.push(annee);
  }
  return years;
}
/**
 * Makes the full GeoDVF CSV export for `year` available locally, downloading it if needed.
 *
 * The archive is first streamed to a `.part` file. It is renamed to its final name only once
 * the transfer has completed, so an interrupted download is never mistaken for a finished one
 * by the existence check on the next run.
 *
 * @param {number} year - year directory to fetch under BASE_URL
 * @returns {Promise<string>} path of the local `.csv.gz` file
 * @throws {Error} on a non-2xx HTTP response or a failed transfer
 */
async function downloadYear(year) {
  const file = `${GEODVF_DIR}/${year}.csv.gz`;
  if (fs.existsSync(file)) {
    console.log(` ${file} déjà présent, skip download`);
    return file;
  }
  const url = `${BASE_URL}/${year}/full.csv.gz`;
  console.log(` Téléchargement ${url} ...`);
  const res = await fetch(url);
  if (!res.ok) throw new Error(`HTTP ${res.status} pour ${url}`);
  fs.mkdirSync(GEODVF_DIR, { recursive: true });
  const partial = `${file}.part`;
  try {
    await pipeline(res.body, fs.createWriteStream(partial));
    fs.renameSync(partial, file);
  } catch (err) {
    // Discard the incomplete download so the next run fetches it again.
    fs.rmSync(partial, { force: true });
    throw err;
  }
  const size = fs.statSync(file).size;
  console.log(` Téléchargé ${file} (${(size / 1024 / 1024).toFixed(1)} Mo)`);
  return file;
}
/**
 * Streams a gzipped GeoDVF CSV file into the `dvf` table using multi-row INSERTs.
 *
 * Column names are taken from the CSV header (keys of the first parsed record), so the header
 * is assumed to match the table's columns. Rows are buffered and flushed every BATCH_SIZE
 * records. Empty CSV cells are inserted as NULL.
 *
 * @param connection - open mysql2/promise connection
 * @param {number} year - year being imported (only used in the log message)
 * @param {string} file - path of the `.csv.gz` file
 * @returns {Promise<number>} number of rows inserted
 */
async function importYear(connection, year, file) {
  console.log(` Parsing et insertion de ${file} ...`);
  const BATCH_SIZE = 5000;
  let columns = null;
  let inserted = 0;
  let batch = [];

  // Quotes a header name as a MySQL identifier. Embedded backticks are doubled so that a
  // malformed header in the downloaded file cannot break out of the quoted name.
  const quoteIdentifier = (name) => `\`${name.replaceAll('`', '``')}\``;

  // Sends the buffered rows as one multi-row INSERT, then empties the buffer.
  const flush = async () => {
    if (batch.length === 0) return;
    const placeholders = batch.map((row) => `(${row.map(() => '?').join(', ')})`).join(', ');
    const sql = `INSERT INTO dvf (${columns.join(', ')}) VALUES ${placeholders}`;
    await connection.query(sql, batch.flat());
    inserted += batch.length;
    // `inserted` grows by BATCH_SIZE per full batch, so this logs about every 100 000 rows.
    if (inserted % 100_000 < BATCH_SIZE) {
      process.stdout.write(` ... ${inserted.toLocaleString()} lignes insérées\r`);
    }
    batch = [];
  };

  // `columns: true` makes the parser emit one object per row, keyed by the header names.
  const parser = parse({ delimiter: ',', columns: true });
  const writer = new Writable({
    objectMode: true,
    // The callback is only invoked after a pending flush completes, which applies
    // backpressure to the parser while an INSERT is in flight.
    async write(record, _encoding, callback) {
      try {
        if (!columns) {
          columns = Object.keys(record).map(quoteIdentifier);
        }
        batch.push(Object.values(record).map((v) => (v === '' ? null : v)));
        if (batch.length >= BATCH_SIZE) {
          await flush();
        }
        callback();
      } catch (err) {
        callback(err);
      }
    },
    // Inserts the last, partially filled batch once the parser has ended.
    async final(callback) {
      try {
        await flush();
        callback();
      } catch (err) {
        callback(err);
      }
    },
  });

  // Every stage is handed to pipeline() so that a read error (missing or unreadable file)
  // rejects the import. `.pipe()` does not forward errors from the file stream, which would
  // leave the pipeline waiting forever.
  await pipeline(fs.createReadStream(file), zlib.createGunzip(), parser, writer);
  console.log(` ${inserted.toLocaleString()} lignes insérées pour ${year}`);
  return inserted;
}
/**
 * Entry point.
 *
 * Imports into `dvf` every year listed on the GeoDVF server that is not yet in the table.
 * When a year is given as the first CLI argument ("forced" mode), only that year is imported.
 *
 * In forced mode, rows already stored for that year are deleted (after its download succeeded)
 * before importing. Re-running a year, e.g. after an interrupted import, therefore replaces its
 * data instead of duplicating it.
 *
 * @throws {Error} when the CLI argument is not an integer year, or on any download/DB error
 */
async function main() {
  const arg = process.argv[2];
  const forceYear = arg ? Number(arg) : null;
  if (forceYear !== null && !Number.isInteger(forceYear)) {
    // Previously a non-numeric argument silently fell back to the normal mode.
    throw new Error(`Année invalide : ${arg}`);
  }
  console.log('Connexion à la base...');
  const connection = await mysql.createConnection(dbConfig);
  try {
    console.log('Vérification des années sur le serveur...');
    const serverYears = await getYearsOnServer();
    console.log(`Années disponibles : ${serverYears.join(', ')}`);
    const dbYears = await getYearsInDb(connection);
    console.log(`Années en base : ${dbYears.join(', ')}`);
    let missing;
    if (forceYear !== null) {
      missing = [forceYear];
      console.log(`Mode forcé : import de ${forceYear}`);
    } else {
      missing = serverYears.filter((y) => !dbYears.includes(y));
    }
    if (missing.length === 0) {
      console.log('Tout est à jour !');
      return;
    }
    console.log(`Années manquantes : ${missing.join(', ')}`);
    for (const year of missing) {
      console.log(`\n=== ${year} ===`);
      const file = await downloadYear(year);
      // Only reachable in forced mode: avoid inserting a second copy of the year's rows.
      if (dbYears.includes(year)) {
        console.log(` Suppression des lignes existantes pour ${year} ...`);
        await connection.query(
          'DELETE FROM dvf WHERE date_mutation >= ? AND date_mutation < ?',
          [`${year}-01-01`, `${year + 1}-01-01`],
        );
      }
      await importYear(connection, year, file);
    }
    console.log('\nTerminé !');
  } finally {
    // Close the connection on success, early return, and error alike.
    await connection.end();
  }
}
await main();