sanctions-etl-svc — ingestion + scheduler configurable des listes
Module :
platform/sanctions-etl-svc/(Ktor + Quartz + Temporal CronWorkflow). Producer downstreamsanctions-svc(cf page engineering) etsanctions-admin-ui(cf console admin).ADRs : ADR-006 (stratégie pipeline + air-gap), ADR-030 (moteur de matching), ADR-001 (Temporal pour orchestration), ADR-002 (multi-tenant RLS).
Pages liées : Sanctions screening — moteur, Sanctions admin UI, POC sanctions matcher, Runbook SRE sanctions.
Cette page fixe la spec engineering complète du service d’ingestion des listes de sanctions. Un dev qui le lit doit pouvoir :
- Comprendre les 4 étapes du pipeline (ingestion → normalisation → publication → signal)
- Implémenter chaque connecteur (OFAC, UN, EU, HMT, OpenSanctions, Dow Jones)
- Câbler le scheduler Quartz configurable depuis l’admin UI (cf §3 sanctions-admin-ui.md)
- Gérer les 3 modes de déploiement (SaaS, on-prem connecté, on-prem air-gap)
- Mesurer la santé du service (Prometheus + alertes)
1. Vue d’ensemble
Section intitulée « 1. Vue d’ensemble »Flow nominal :
- Quartz lit
sanctions_sourceau boot et au reload (signal Kafkasanctions.schedule.updatedou polling 60 s). - À l’heure cron, Temporal
CronWorkflowlanceIngestSourceActivity→ connector → fetch HTTP/XML/JSON. - Connector parse le format source →
RawEntity(modèle source-specific). NormalizermappeRawEntity→NormalizedEntity(format VKL v1, schéma FtM-compatible).Publisherupsert dans OpenSearch (indexsanctions-{source}-active) + snapshot MinIO signé Ed25519.sanctions_list_runenregistre run avec stats (added/updated/removed).- Kafka emit
vitakyc.sanctions.lists.updated→sanctions-svcrafraîchit son cache. audit-svcreçoit chaque étape signée (HMAC chain).
2. Modèle de données
Section intitulée « 2. Modèle de données »2.1 Schéma PostgreSQL
Section intitulée « 2.1 Schéma PostgreSQL »-- Migration V1__sanctions_etl.sql
CREATE TABLE sanctions_source ( source_id VARCHAR(32) PRIMARY KEY, -- ex : 'OFAC_SDN', 'OFAC_CONSOLIDATED', 'UN_SC', 'EU_CFSP', 'HMT_OFSI', -- 'OPENSANCTIONS', 'DOW_JONES', 'BCT_TN' display_name VARCHAR(128) NOT NULL, kind VARCHAR(16) NOT NULL CHECK (kind IN ('PUBLIC','COMMERCIAL','REGIONAL')), url TEXT NOT NULL, format VARCHAR(16) NOT NULL CHECK (format IN ('XML','JSON','CSV','FtM_JSON')), enabled BOOLEAN NOT NULL DEFAULT true, cron_full VARCHAR(64) NOT NULL, -- Quartz 6-field, ex '0 0 2 ? * SUN' (full sync hebdo) cron_delta VARCHAR(64) NOT NULL, -- ex '0 0 */4 * * ?' (delta toutes les 4h) air_gap_path TEXT, -- file path local pour mode air-gap last_run_at TIMESTAMPTZ, last_run_id UUID, last_status VARCHAR(16) CHECK (last_status IN ('success','failed','running','skipped')), last_count INTEGER, next_fire_at TIMESTAMPTZ, list_version VARCHAR(32), -- ex 'v2026-04-27.a' updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_by VARCHAR(128) NOT NULL DEFAULT 'system');
CREATE TABLE sanctions_list_run ( run_id UUID PRIMARY KEY, source_id VARCHAR(32) NOT NULL REFERENCES sanctions_source(source_id), kind VARCHAR(8) NOT NULL CHECK (kind IN ('full','delta','retry','manual')), triggered_by VARCHAR(128) NOT NULL, -- 'cron' ou 'manual:user@domain' started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), finished_at TIMESTAMPTZ, duration_ms INTEGER, status VARCHAR(16) NOT NULL CHECK (status IN ('running','success','failed','partial')), fetched_count INTEGER, parsed_count INTEGER, added_count INTEGER, updated_count INTEGER, removed_count INTEGER, error_message TEXT, snapshot_uri TEXT, -- minio://sanctions-snapshots/<source>/<runId>.vkl.json.sig list_version VARCHAR(32));CREATE INDEX run_source_started ON sanctions_list_run(source_id, started_at DESC);CREATE INDEX run_status_started ON sanctions_list_run(status, started_at DESC) WHERE status IN ('failed','partial');
CREATE TABLE sanctions_schedule_audit ( audit_id UUID PRIMARY KEY, source_id VARCHAR(32) NOT NULL REFERENCES sanctions_source(source_id), changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), changed_by VARCHAR(128) NOT NULL, field VARCHAR(32) NOT NULL, -- 'cron_full' | 'cron_delta' | 'enabled' | 'url' ... old_value TEXT, new_value TEXT, approval_user_id VARCHAR(128), -- dual-control : 2e signataire chain_hash CHAR(64) NOT NULL -- HMAC chain (cf audit-svc));2.2 Bootstrap initial (Flyway V1)
Section intitulée « 2.2 Bootstrap initial (Flyway V1) »8 sources pré-configurées (toutes enabled = false par défaut, l’admin les active explicitement) :
source_id | display_name | kind | cron_full | cron_delta |
|---|---|---|---|---|
OFAC_SDN | OFAC SDN | PUBLIC | 0 0 2 ? * SUN | 0 0 */4 * * ? |
OFAC_CONSOLIDATED | OFAC Consolidated (non-SDN) | PUBLIC | 0 0 3 ? * SUN | 0 0 */6 * * ? |
UN_SC | UN Security Council Consolidated | PUBLIC | 0 0 4 ? * SUN | 0 0 */12 * * ? |
EU_CFSP | EU CFSP Consolidated | PUBLIC | 0 0 5 ? * SUN | 0 0 */6 * * ? |
HMT_OFSI | UK HMT OFSI Consolidated | PUBLIC | 0 0 6 ? * SUN | 0 0 */6 * * ? |
OPENSANCTIONS | OpenSanctions agrégat (free) | PUBLIC | 0 0 7 ? * SUN | 0 0 */6 * * ? |
DOW_JONES | Dow Jones Watchlist (option) | COMMERCIAL | 0 0 1 ? * SUN | 0 0 */2 * * ? |
BCT_TN | BCT Tunisie liste nationale | REGIONAL | 0 0 8 ? * MON | - (pas de delta) |
3. Format VKL v1 (VitaKYC Lists)
Section intitulée « 3. Format VKL v1 (VitaKYC Lists) »Source de vérité : ADR-006 §“Format VKL v1”. Récap normatif :
{ "$schema": "https://schemas.vitakyc.io/vkl/v1.json", "list_version": "v2026-05-02.a", "source": "OFAC_SDN", "kind": "full", "fetched_at": "2026-05-02T02:00:00Z", "signature": { "alg": "Ed25519", "key_id": "vitakyc-vkl-2026", "sig": "BASE64..." }, "entities": [ { "ftm_id": "ofac-sdn-32310", "schema": "Person", "primary_name": "Vladimir Putin", "akas": ["Vladimir Vladimirovich Putin", "Путин Владимир Владимирович"], "dates_of_birth": ["1952-10-07"], "places_of_birth": ["Leningrad, USSR"], "citizenships": ["RU"], "categories": ["SAN", "PEP"], "source_lists_hits": [ { "list": "OFAC-SDN", "version": "2026-04-27.a", "added_at": "2022-02-25" } ], "addresses": ["Moscow, Russia"], "associations": [ { "target_ftm_id": "ofac-sdn-32311", "type": "FAMILY", "label": "daughter" } ] } ]}Le format est canonicalisé (RFC 8785) avant signature pour garantir la reproductibilité.
4. Connectors par source
Section intitulée « 4. Connectors par source »4.1 Interface commune
Section intitulée « 4.1 Interface commune »interface SanctionsConnector { val sourceId: String val format: SourceFormat // XML, JSON, CSV, FtM_JSON
/** Télécharge le payload brut depuis l'URL (ou lit depuis air-gap path). */ suspend fun fetch(): RawSanctionsBundle
/** Parse le payload brut vers une liste d'entités source-specific. */ suspend fun parse(bundle: RawSanctionsBundle): List<RawEntity>}
data class RawSanctionsBundle( val sourceId: String, val fetchedAt: Instant, val rawBytes: ByteArray, val sourceVersion: String?, // ex: "2026-04-27.a" depuis header HTTP ou meta XML val sha256: String // pour idempotence)
interface VklNormalizer { /** Mappe une RawEntity (source-specific) vers NormalizedEntity (VKL v1). */ fun normalize(raw: RawEntity, sourceId: String): NormalizedEntity}4.2 Sources publiques (livrées MVP)
Section intitulée « 4.2 Sources publiques (livrées MVP) »| Connector | Endpoint | Format | Particularités |
|---|---|---|---|
OfacSdnConnector | https://www.treasury.gov/ofac/downloads/sdn.xml | XML | + sdn_advanced.xml pour AKA enrichis ; ETag-based 304 |
UnConsolidatedConnector | https://scsanctions.un.org/resources/xml/en/consolidated.xml | XML | parsing UN-style schema, support arabe/cyrillique |
EuCfspConnector | https://webgate.ec.europa.eu/fsd/fsf/public/files/xmlFullSanctionsList/content | XML | nécessite header Accept-Language: fr pour les regimes EU |
HmtOfsiConnector | https://ofsistorage.blob.core.windows.net/publishlive/2022format/ConList.xml | XML | format UK depuis 2022 |
OpenSanctionsConnector | https://data.opensanctions.org/datasets/latest/sanctions/entities.ftm.json | FtM JSON | format FtM natif, mapping direct vers VKL v1 |
4.3 Sources commerciales (option)
Section intitulée « 4.3 Sources commerciales (option) »| Connector | Endpoint | Format | Note |
|---|---|---|---|
DowJonesConnector | https://api.dowjones.com/risk/sanctions/v1/... | JSON | OAuth2, key client-side, ~3,8M entités |
4.4 Sources régionales (V1)
Section intitulée « 4.4 Sources régionales (V1) »| Connector | Endpoint | Format | Note |
|---|---|---|---|
BctTunisiaConnector | https://www.bct.gov.tn/... (PDF) | PDF/scrape | ingestion manuelle V0 ; scrape OCR V1 |
4.5 Mode air-gap
Section intitulée « 4.5 Mode air-gap »Si air_gap_path est défini sur la source, le connector lit le fichier local (/var/lib/vitakyc/sanctions/<source_id>.xml) au lieu de faire un appel HTTP. Le fichier est typiquement déposé via :
- transfert sFTP one-way (banque vers VitaKYC, signed)
- USB chiffré (rotation hebdo/mensuel)
- Bundle signé
vkl-bundle-2026-05-02.tar.gz.sig
Le connector vérifie la signature Ed25519 avant parsing.
5. Scheduler configurable
Section intitulée « 5. Scheduler configurable »5.1 Architecture
Section intitulée « 5.1 Architecture »5.2 API REST
Section intitulée « 5.2 API REST »| Méthode | Endpoint | Auth | Description |
|---|---|---|---|
GET | /v1/sanctions/etl/sources | bearer compliance | tous les sources avec leur état |
GET | /v1/sanctions/etl/sources/:sourceId | bearer compliance | détail d’une source |
PATCH | /v1/sanctions/etl/sources/:sourceId | bearer DSI + compliance (dual) | update cron_full, cron_delta, enabled, url, air_gap_path |
POST | `/v1/sanctions/etl/sources/:sourceId/trigger?kind=full | delta` | bearer DSI |
GET | /v1/sanctions/etl/runs?sourceId=&status=&since=&limit= | bearer compliance | historique des runs (pagination) |
GET | /v1/sanctions/etl/runs/:runId | bearer compliance | détail run (logs + stats) |
GET | /v1/sanctions/etl/schedule | bearer compliance | vue plannification par jour/heure (utilisée par Cron calendar UI) |
GET | /v1/sanctions/etl/snapshots/:sourceId | bearer compliance | liste des snapshots MinIO (10 conservés) |
POST | /v1/sanctions/etl/snapshots/:sourceId/:runId/download | bearer DSI | URL pré-signée MinIO (TTL 5 min) |
5.3 Validation cron
Section intitulée « 5.3 Validation cron »Côté serveur, validation Quartz stricte :
fun validateQuartzCron(expr: String): ValidationResult { return runCatching { CronExpression.validateExpression(expr); CronExpression(expr) } .map { ValidationResult.Ok(it.getNextValidTimeAfter(Instant.now())) } .getOrElse { ValidationResult.Invalid(it.message ?: "invalid cron") }}Refus :
- expression < 1 minute d’écart entre 2 fires (anti-DDoS sources externes)
- chevauchement avec un autre cron du même source (full + delta sur le même slot)
- crons trop fréquents pour OFAC (max 1/h pour respecter rate limit US Treasury)
5.4 Dual-control sur PATCH
Section intitulée « 5.4 Dual-control sur PATCH »Toute modification est protégée par dual-control (2 signataires distincts, rôles différents) :
PATCH /v1/sanctions/etl/sources/OFAC_SDNX-Approval-User-Id: leila.dubois@example.tnX-Approval-Signature: <base64 ed25519 sig>Content-Type: application/json
{ "cron_delta": "0 0 */2 * * ?", "approval_reason": "BCT request — increase frequency" }Le serveur :
- Vérifie que l’auteur (X-User-Id JWT) et l’approbateur (X-Approval-User-Id) sont distincts
- Vérifie que l’un a rôle
complianceET l’autredsi(ouaudit) - Vérifie la signature Ed25519 sur
(sourceId, field, new_value, ts)avec la clé publique de l’approbateur - Insert audit row dans
sanctions_schedule_audit - Émet event Kafka pour reload du Quartz scheduler
5.5 Reload du scheduler
Section intitulée « 5.5 Reload du scheduler »Le scheduler tourne en mémoire. Pour rafraîchir après un PATCH :
- Option A (retenue) : event Kafka
vitakyc.sanctions.schedule.updatedconsommé par chaque pod sanctions-etl-svc →Scheduler.rescheduleJob() - Option B (fallback) : polling
sanctions_sourcetoutes les 60 s (pour les déploiements sans Kafka)
Les jobs en cours de fire au moment du reload continuent jusqu’au bout ; le reload n’affecte que les fires futurs.
6. Publisher
Section intitulée « 6. Publisher »6.1 OpenSearch
Section intitulée « 6.1 OpenSearch »Pour chaque source, un alias sanctions-{source}-active pointe vers l’index actif. Le publisher fait :
- Crée un nouvel index
sanctions-{source}-{timestamp}(suffixe =runId) - Bulk
_indexdes entités normalisées (multi-fields ICU + phonetic + n-gram cf ADR-030) - Vérifie le count via
GET _count→ si OK - Réoriente l’alias
sanctions-{source}-activevers le nouvel index (POST _aliasesatomique) - Marque l’ancien index
delete_after = now() + 30j(rétention)
6.2 Snapshots MinIO
Section intitulée « 6.2 Snapshots MinIO »Chaque run produit un snapshot signé :
- Path :
s3://sanctions-snapshots/<source_id>/<run_id>.vkl.json.sig - Bundle = JSON canonical + signature Ed25519 detached
- Rétention : 10 snapshots glissants par source (purge auto)
- Utilisé pour :
- Audit BCT (rejeu d’un screening N mois plus tard)
- Distribution on-prem air-gap (export hebdo via job dédié)
6.3 Event Kafka
Section intitulée « 6.3 Event Kafka »Topic : vitakyc.sanctions.lists.updated (key = sourceId).
Payload :
{ "schema": "sanctions.lists.updated.v1", "source_id": "OFAC_SDN", "list_version": "v2026-05-02.a", "run_id": "...", "kind": "full", "added_count": 12, "updated_count": 47, "removed_count": 3, "completed_at": "2026-05-02T02:14:33Z"}sanctions-svc consomme cet event pour invalider son cache local (Caffeine 5 min TTL) et recharger les hot lists si besoin.
7. Audit signed
Section intitulée « 7. Audit signed »Chaque étape critique émet un event vers audit-svc (cf page Audit log) :
| Action | Severity | Trigger |
|---|---|---|
SANCTIONS_SOURCE_FETCHED | INFO | Connector.fetch() succès (size, sha256, source_version) |
SANCTIONS_LIST_PUBLISHED | INFO | OpenSearch alias swapped |
SANCTIONS_LIST_FAILED | ALERT | run failed après tous les retries |
SANCTIONS_SCHEDULE_CHANGED | NOTICE | PATCH d’un cron — chaîne avec approval_user_id |
SANCTIONS_MANUAL_TRIGGER | NOTICE | POST /trigger |
SANCTIONS_SNAPSHOT_DOWNLOADED | NOTICE | URL MinIO pré-signée téléchargée |
8. Modes de déploiement
Section intitulée « 8. Modes de déploiement »8.1 SaaS (par défaut)
Section intitulée « 8.1 SaaS (par défaut) »- Connecteurs HTTP actifs, pulls directs depuis sources officielles
- OpenSearch managé (cluster VitaKYC)
- MinIO managé (S3 compatible)
- Kafka multi-tenant
- Quartz lit
sanctions_sourceglobal (pas par tenant — listes communes)
8.2 On-prem connecté
Section intitulée « 8.2 On-prem connecté »- Pulls directs depuis sources officielles (le réseau banque autorise les FQDN de Treasury, EU, UN, OFSI)
- OpenSearch + MinIO + Kafka tournent dans le cluster banque
- Pas de différence fonctionnelle vs SaaS
8.3 On-prem air-gap
Section intitulée « 8.3 On-prem air-gap »enabled = truemaisair_gap_path = '/var/lib/vitakyc/sanctions/<source_id>.xml'- Le connector lit le fichier local au lieu d’HTTP
- Bundle signed transféré via :
- sFTP one-way (depuis VitaKYC vers le client) — recommandé
- USB chiffré (rotation hebdo/mensuelle)
- VKL bundle signé Ed25519, signature vérifiée par le client avant parsing
- Métrique
sanctions_etl_air_gap_lag_hoursexposée pour monitoring de la fraîcheur
9. Performance et capacité
Section intitulée « 9. Performance et capacité »| Metric | Cible MVP | Cible V2 |
|---|---|---|
| Latence p95 ingestion OFAC SDN (~13K entités) | ≤ 90 s | ≤ 45 s |
| Latence p95 ingestion OpenSanctions (~52K entités) | ≤ 6 min | ≤ 3 min |
| Latence p95 ingestion Dow Jones (~3,8M entités) | ≤ 45 min | ≤ 20 min |
| Disponibilité du service | 99,5 % | 99,9 % |
| Lag entre source officielle et OpenSearch (delta) | ≤ 4h | ≤ 1h |
| Échec en série tolérée avant alerte | 2 runs consécutives | 1 run |
10. Sécurité
Section intitulée « 10. Sécurité »- Signatures Ed25519 : VKL bundles + audit chain. Clés stockées dans Vault, rotées tous les 12 mois.
- Vérification SHA-256 : entre fetch et parse pour détecter altération réseau.
- Rate limiting : respect des limits sources (OFAC max 1/h, EU 6/jour, etc.). Throttling natif dans Quartz.
- mTLS : entre
sanctions-etl-svcetaudit-svc/opensearch. - Anti-DDoS : un cron qui boucle est détecté et désactivé automatiquement (3 fires < 1 min → kill).
- Dual-control : tout PATCH ou trigger manuel exige 2 signataires distincts.
- Rétention : runs en DB 90 jours, snapshots MinIO 10 versions, audit events 10 ans WORM.
11. Conformité
Section intitulée « 11. Conformité »| Obligation | Couverture |
|---|---|
| BCT 2024 : reproductibilité screening N+10 ans | Snapshots MinIO + audit signed ✓ |
| FATF Reco. 6 (sanctions financières ciblées) | Sources officielles UN, OFAC, EU, HMT ✓ |
| RGPD art. 17 (droit à l’oubli) | Les données sanctions sont de la donnée publique légalement requise — exception RGPD art. 17.3 (obligation légale). Documenté dans la DPIA. |
| OFAC 50% rule | Couverte indirectement par OpenSanctions agg. (entités liées à > 50%) ; à ré-exécuter manuellement par le compliance officer si OFAC publie une advisory. |
12. Tests
Section intitulée « 12. Tests »12.1 Unit (≥ 25 tests)
Section intitulée « 12.1 Unit (≥ 25 tests) »- Validation Quartz cron : 6-field, 5-field rejeté, intervals min/max
- Normalizer : OFAC/UN/EU/HMT → VKL v1 (1 test par source + roundtrip)
- Connector : parsing XML/JSON sample fixtures (golden tests)
- Dual-control : refuser quand auteur == approbateur ; refuser sans rôle DSI ; refuser sans signature
- Idempotence : re-fetch même
sourceVersion→ skip avec statusskipped - Reload scheduler sur event Kafka
12.2 Integration (≥ 10 tests)
Section intitulée « 12.2 Integration (≥ 10 tests) »- Testcontainers : PostgreSQL + OpenSearch + MinIO + Kafka
- Run E2E ingestion OFAC sample (XML fixture local) → OpenSearch index → alias swap → snapshot signed
- Trigger manuel → audit row inséré + Kafka event émis
- Mode air-gap : VKL bundle signed depuis
/tmp/airgap/ - Échec connector → retry 3× exponential → status
failed→ alerte audit
12.3 Load (V1)
Section intitulée « 12.3 Load (V1) »- Simuler ingestion Dow Jones (3,8M entités) — vérifier mémoire pic < 4 Go, latence < 45 min
- 100 PATCH concurrents sur cron → reload Quartz cohérent
13. Plan migration MVP → V2
Section intitulée « 13. Plan migration MVP → V2 »| Item | MVP | V2 |
|---|---|---|
| Sources publiques (OFAC, UN, EU, HMT, OpenSanctions) | ✓ | ✓ |
| Source régionale BCT TN | scrape manuel | scrape OCR auto |
| Dow Jones | adapter livré, désactivé par défaut | activé selon licence client |
| Quartz scheduler | embarqué dans bio-svc pod | extrait en service dédié multi-instance |
| Reload scheduler | Kafka event | Temporal SignalChannel |
| Snapshots | MinIO | + Cosign + transparency log |
| Air-gap distribution | sFTP manuel | bundle CDN signé + auto-pull script client |
| OFAC 50% rule | manual advisory | auto-derived via OpenSanctions edges |
14. Checklist go-live MVP
Section intitulée « 14. Checklist go-live MVP »- Tables Flyway
sanctions_source+sanctions_list_run+sanctions_schedule_auditmigrées - 6 connecteurs publics opérationnels (OFAC SDN, OFAC Cons, UN, EU, HMT, OpenSanctions) — fixtures XML/JSON sample testés
- Quartz scheduler boot lit
sanctions_sourcecorrectement - API REST 9 endpoints, dual-control PATCH
- Sanctions admin UI (page 12.1, 12.2) consomme
sanctions-etl-svcAPI - Audit signed
SANCTIONS_*events émis et chainés - OpenSearch indices
sanctions-{source}-activepeuplés - MinIO snapshots signés Ed25519, vérifiables côté client
- Kafka event
vitakyc.sanctions.lists.updatedémis et consommé parsanctions-svc - Mode air-gap testé sur 1 tenant pilote on-prem
- Métriques Prometheus :
sanctions_etl_runs_total,sanctions_etl_run_duration_seconds,sanctions_etl_entities_indexed,sanctions_etl_air_gap_lag_hours - Runbook on-call (cf Sanctions monitoring) couvre fail OFAC, fail EU, fail OpenSearch, conflit cron
15. Références
Section intitulée « 15. Références »- ADRs : ADR-001, ADR-002, ADR-006, ADR-030
- Pages : Sanctions admin UI (UI consommatrice §3 Cron calendar), Sanctions screening (moteur de matching), POC sanctions matcher (algos déterministes)
- Standards : OFAC SDN data, UN Consolidated List, EU CFSP Sanctions, OpenSanctions schema, Quartz cron format
- Spec format : VKL v1 (cf ADR-006 §“Format
VKL v1”)
Document de spec sanctions-etl-svc — version 1.0 (2026-05-02). Mises à jour bloquantes nécessitent un ADR.