Aller au contenu

sanctions-etl-svc — ingestion + scheduler configurable des listes

Module : platform/sanctions-etl-svc/ (Ktor + Quartz + Temporal CronWorkflow). Producer downstream sanctions-svc (cf page engineering) et sanctions-admin-ui (cf console admin).

ADRs : ADR-006 (stratégie pipeline + air-gap), ADR-030 (moteur de matching), ADR-001 (Temporal pour orchestration), ADR-002 (multi-tenant RLS).

Pages liées : Sanctions screening — moteur, Sanctions admin UI, POC sanctions matcher, Runbook SRE sanctions.

Cette page fixe la spec engineering complète du service d’ingestion des listes de sanctions. Un dev qui le lit doit pouvoir :

  1. Comprendre les 4 étapes du pipeline (ingestion → normalisation → publication → signal)
  2. Implémenter chaque connecteur (OFAC, UN, EU, HMT, OpenSanctions, Dow Jones)
  3. Câbler le scheduler Quartz configurable depuis l’admin UI (cf §3 sanctions-admin-ui.md)
  4. Gérer les 3 modes de déploiement (SaaS, on-prem connecté, on-prem air-gap)
  5. Mesurer la santé du service (Prometheus + alertes)

Flow nominal :

  1. Quartz lit sanctions_source au boot et au reload (signal Kafka sanctions.schedule.updated ou polling 60 s).
  2. À l’heure cron, Temporal CronWorkflow lance IngestSourceActivity → connector → fetch HTTP/XML/JSON.
  3. Connector parse le format source → RawEntity (modèle source-specific).
  4. Normalizer mappe RawEntityNormalizedEntity (format VKL v1, schéma FtM-compatible).
  5. Publisher upsert dans OpenSearch (index sanctions-{source}-active) + snapshot MinIO signé Ed25519.
  6. sanctions_list_run enregistre run avec stats (added/updated/removed).
  7. Kafka emit vitakyc.sanctions.lists.updatedsanctions-svc rafraîchit son cache.
  8. audit-svc reçoit chaque étape signée (HMAC chain).

-- Migration V1__sanctions_etl.sql
CREATE TABLE sanctions_source (
source_id VARCHAR(32) PRIMARY KEY,
-- ex : 'OFAC_SDN', 'OFAC_CONSOLIDATED', 'UN_SC', 'EU_CFSP', 'HMT_OFSI',
-- 'OPENSANCTIONS', 'DOW_JONES', 'BCT_TN'
display_name VARCHAR(128) NOT NULL,
kind VARCHAR(16) NOT NULL CHECK (kind IN ('PUBLIC','COMMERCIAL','REGIONAL')),
url TEXT NOT NULL,
format VARCHAR(16) NOT NULL CHECK (format IN ('XML','JSON','CSV','FtM_JSON')),
enabled BOOLEAN NOT NULL DEFAULT true,
cron_full VARCHAR(64) NOT NULL, -- Quartz 6-field, ex '0 0 2 ? * SUN' (full sync hebdo)
cron_delta VARCHAR(64) NOT NULL, -- ex '0 0 */4 * * ?' (delta toutes les 4h)
air_gap_path TEXT, -- file path local pour mode air-gap
last_run_at TIMESTAMPTZ,
last_run_id UUID,
last_status VARCHAR(16) CHECK (last_status IN ('success','failed','running','skipped')),
last_count INTEGER,
next_fire_at TIMESTAMPTZ,
list_version VARCHAR(32), -- ex 'v2026-04-27.a'
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_by VARCHAR(128) NOT NULL DEFAULT 'system'
);
CREATE TABLE sanctions_list_run (
run_id UUID PRIMARY KEY,
source_id VARCHAR(32) NOT NULL REFERENCES sanctions_source(source_id),
kind VARCHAR(8) NOT NULL CHECK (kind IN ('full','delta','retry','manual')),
triggered_by VARCHAR(128) NOT NULL, -- 'cron' ou 'manual:user@domain'
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
duration_ms INTEGER,
status VARCHAR(16) NOT NULL CHECK (status IN ('running','success','failed','partial')),
fetched_count INTEGER,
parsed_count INTEGER,
added_count INTEGER,
updated_count INTEGER,
removed_count INTEGER,
error_message TEXT,
snapshot_uri TEXT, -- minio://sanctions-snapshots/<source>/<runId>.vkl.json.sig
list_version VARCHAR(32)
);
CREATE INDEX run_source_started ON sanctions_list_run(source_id, started_at DESC);
CREATE INDEX run_status_started ON sanctions_list_run(status, started_at DESC) WHERE status IN ('failed','partial');
CREATE TABLE sanctions_schedule_audit (
audit_id UUID PRIMARY KEY,
source_id VARCHAR(32) NOT NULL REFERENCES sanctions_source(source_id),
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
changed_by VARCHAR(128) NOT NULL,
field VARCHAR(32) NOT NULL, -- 'cron_full' | 'cron_delta' | 'enabled' | 'url' ...
old_value TEXT,
new_value TEXT,
approval_user_id VARCHAR(128), -- dual-control : 2e signataire
chain_hash CHAR(64) NOT NULL -- HMAC chain (cf audit-svc)
);

8 sources pré-configurées (toutes enabled = false par défaut, l’admin les active explicitement) :

source_iddisplay_namekindcron_fullcron_delta
OFAC_SDNOFAC SDNPUBLIC0 0 2 ? * SUN0 0 */4 * * ?
OFAC_CONSOLIDATEDOFAC Consolidated (non-SDN)PUBLIC0 0 3 ? * SUN0 0 */6 * * ?
UN_SCUN Security Council ConsolidatedPUBLIC0 0 4 ? * SUN0 0 */12 * * ?
EU_CFSPEU CFSP ConsolidatedPUBLIC0 0 5 ? * SUN0 0 */6 * * ?
HMT_OFSIUK HMT OFSI ConsolidatedPUBLIC0 0 6 ? * SUN0 0 */6 * * ?
OPENSANCTIONSOpenSanctions agrégat (free)PUBLIC0 0 7 ? * SUN0 0 */6 * * ?
DOW_JONESDow Jones Watchlist (option)COMMERCIAL0 0 1 ? * SUN0 0 */2 * * ?
BCT_TNBCT Tunisie liste nationaleREGIONAL0 0 8 ? * MON- (pas de delta)

Source de vérité : ADR-006 §“Format VKL v1”. Récap normatif :

{
"$schema": "https://schemas.vitakyc.io/vkl/v1.json",
"list_version": "v2026-05-02.a",
"source": "OFAC_SDN",
"kind": "full",
"fetched_at": "2026-05-02T02:00:00Z",
"signature": {
"alg": "Ed25519",
"key_id": "vitakyc-vkl-2026",
"sig": "BASE64..."
},
"entities": [
{
"ftm_id": "ofac-sdn-32310",
"schema": "Person",
"primary_name": "Vladimir Putin",
"akas": ["Vladimir Vladimirovich Putin", "Путин Владимир Владимирович"],
"dates_of_birth": ["1952-10-07"],
"places_of_birth": ["Leningrad, USSR"],
"citizenships": ["RU"],
"categories": ["SAN", "PEP"],
"source_lists_hits": [
{ "list": "OFAC-SDN", "version": "2026-04-27.a", "added_at": "2022-02-25" }
],
"addresses": ["Moscow, Russia"],
"associations": [
{ "target_ftm_id": "ofac-sdn-32311", "type": "FAMILY", "label": "daughter" }
]
}
]
}

Le format est canonicalisé (RFC 8785) avant signature pour garantir la reproductibilité.


interface SanctionsConnector {
val sourceId: String
val format: SourceFormat // XML, JSON, CSV, FtM_JSON
/** Télécharge le payload brut depuis l'URL (ou lit depuis air-gap path). */
suspend fun fetch(): RawSanctionsBundle
/** Parse le payload brut vers une liste d'entités source-specific. */
suspend fun parse(bundle: RawSanctionsBundle): List<RawEntity>
}
data class RawSanctionsBundle(
val sourceId: String,
val fetchedAt: Instant,
val rawBytes: ByteArray,
val sourceVersion: String?, // ex: "2026-04-27.a" depuis header HTTP ou meta XML
val sha256: String // pour idempotence
)
interface VklNormalizer {
/** Mappe une RawEntity (source-specific) vers NormalizedEntity (VKL v1). */
fun normalize(raw: RawEntity, sourceId: String): NormalizedEntity
}
ConnectorEndpointFormatParticularités
OfacSdnConnectorhttps://www.treasury.gov/ofac/downloads/sdn.xmlXML+ sdn_advanced.xml pour AKA enrichis ; ETag-based 304
UnConsolidatedConnectorhttps://scsanctions.un.org/resources/xml/en/consolidated.xmlXMLparsing UN-style schema, support arabe/cyrillique
EuCfspConnectorhttps://webgate.ec.europa.eu/fsd/fsf/public/files/xmlFullSanctionsList/contentXMLnécessite header Accept-Language: fr pour les regimes EU
HmtOfsiConnectorhttps://ofsistorage.blob.core.windows.net/publishlive/2022format/ConList.xmlXMLformat UK depuis 2022
OpenSanctionsConnectorhttps://data.opensanctions.org/datasets/latest/sanctions/entities.ftm.jsonFtM JSONformat FtM natif, mapping direct vers VKL v1
ConnectorEndpointFormatNote
DowJonesConnectorhttps://api.dowjones.com/risk/sanctions/v1/...JSONOAuth2, key client-side, ~3,8M entités
ConnectorEndpointFormatNote
BctTunisiaConnectorhttps://www.bct.gov.tn/... (PDF)PDF/scrapeingestion manuelle V0 ; scrape OCR V1

Si air_gap_path est défini sur la source, le connector lit le fichier local (/var/lib/vitakyc/sanctions/<source_id>.xml) au lieu de faire un appel HTTP. Le fichier est typiquement déposé via :

  • transfert sFTP one-way (banque vers VitaKYC, signed)
  • USB chiffré (rotation hebdo/mensuel)
  • Bundle signé vkl-bundle-2026-05-02.tar.gz.sig

Le connector vérifie la signature Ed25519 avant parsing.


MéthodeEndpointAuthDescription
GET/v1/sanctions/etl/sourcesbearer compliancetous les sources avec leur état
GET/v1/sanctions/etl/sources/:sourceIdbearer compliancedétail d’une source
PATCH/v1/sanctions/etl/sources/:sourceIdbearer DSI + compliance (dual)update cron_full, cron_delta, enabled, url, air_gap_path
POST`/v1/sanctions/etl/sources/:sourceId/trigger?kind=fulldelta`bearer DSI
GET/v1/sanctions/etl/runs?sourceId=&status=&since=&limit=bearer compliancehistorique des runs (pagination)
GET/v1/sanctions/etl/runs/:runIdbearer compliancedétail run (logs + stats)
GET/v1/sanctions/etl/schedulebearer compliancevue plannification par jour/heure (utilisée par Cron calendar UI)
GET/v1/sanctions/etl/snapshots/:sourceIdbearer complianceliste des snapshots MinIO (10 conservés)
POST/v1/sanctions/etl/snapshots/:sourceId/:runId/downloadbearer DSIURL pré-signée MinIO (TTL 5 min)

Côté serveur, validation Quartz stricte :

fun validateQuartzCron(expr: String): ValidationResult {
return runCatching { CronExpression.validateExpression(expr); CronExpression(expr) }
.map { ValidationResult.Ok(it.getNextValidTimeAfter(Instant.now())) }
.getOrElse { ValidationResult.Invalid(it.message ?: "invalid cron") }
}

Refus :

  • expression < 1 minute d’écart entre 2 fires (anti-DDoS sources externes)
  • chevauchement avec un autre cron du même source (full + delta sur le même slot)
  • crons trop fréquents pour OFAC (max 1/h pour respecter rate limit US Treasury)

Toute modification est protégée par dual-control (2 signataires distincts, rôles différents) :

PATCH /v1/sanctions/etl/sources/OFAC_SDN
X-Approval-User-Id: leila.dubois@example.tn
X-Approval-Signature: <base64 ed25519 sig>
Content-Type: application/json
{ "cron_delta": "0 0 */2 * * ?", "approval_reason": "BCT request — increase frequency" }

Le serveur :

  1. Vérifie que l’auteur (X-User-Id JWT) et l’approbateur (X-Approval-User-Id) sont distincts
  2. Vérifie que l’un a rôle compliance ET l’autre dsi (ou audit)
  3. Vérifie la signature Ed25519 sur (sourceId, field, new_value, ts) avec la clé publique de l’approbateur
  4. Insert audit row dans sanctions_schedule_audit
  5. Émet event Kafka pour reload du Quartz scheduler

Le scheduler tourne en mémoire. Pour rafraîchir après un PATCH :

  • Option A (retenue) : event Kafka vitakyc.sanctions.schedule.updated consommé par chaque pod sanctions-etl-svc → Scheduler.rescheduleJob()
  • Option B (fallback) : polling sanctions_source toutes les 60 s (pour les déploiements sans Kafka)

Les jobs en cours de fire au moment du reload continuent jusqu’au bout ; le reload n’affecte que les fires futurs.


Pour chaque source, un alias sanctions-{source}-active pointe vers l’index actif. Le publisher fait :

  1. Crée un nouvel index sanctions-{source}-{timestamp} (suffixe = runId)
  2. Bulk _index des entités normalisées (multi-fields ICU + phonetic + n-gram cf ADR-030)
  3. Vérifie le count via GET _count → si OK
  4. Réoriente l’alias sanctions-{source}-active vers le nouvel index (POST _aliases atomique)
  5. Marque l’ancien index delete_after = now() + 30j (rétention)

Chaque run produit un snapshot signé :

  • Path : s3://sanctions-snapshots/<source_id>/<run_id>.vkl.json.sig
  • Bundle = JSON canonical + signature Ed25519 detached
  • Rétention : 10 snapshots glissants par source (purge auto)
  • Utilisé pour :
    • Audit BCT (rejeu d’un screening N mois plus tard)
    • Distribution on-prem air-gap (export hebdo via job dédié)

Topic : vitakyc.sanctions.lists.updated (key = sourceId).

Payload :

{
"schema": "sanctions.lists.updated.v1",
"source_id": "OFAC_SDN",
"list_version": "v2026-05-02.a",
"run_id": "...",
"kind": "full",
"added_count": 12,
"updated_count": 47,
"removed_count": 3,
"completed_at": "2026-05-02T02:14:33Z"
}

sanctions-svc consomme cet event pour invalider son cache local (Caffeine 5 min TTL) et recharger les hot lists si besoin.


Chaque étape critique émet un event vers audit-svc (cf page Audit log) :

ActionSeverityTrigger
SANCTIONS_SOURCE_FETCHEDINFOConnector.fetch() succès (size, sha256, source_version)
SANCTIONS_LIST_PUBLISHEDINFOOpenSearch alias swapped
SANCTIONS_LIST_FAILEDALERTrun failed après tous les retries
SANCTIONS_SCHEDULE_CHANGEDNOTICEPATCH d’un cron — chaîne avec approval_user_id
SANCTIONS_MANUAL_TRIGGERNOTICEPOST /trigger
SANCTIONS_SNAPSHOT_DOWNLOADEDNOTICEURL MinIO pré-signée téléchargée

  • Connecteurs HTTP actifs, pulls directs depuis sources officielles
  • OpenSearch managé (cluster VitaKYC)
  • MinIO managé (S3 compatible)
  • Kafka multi-tenant
  • Quartz lit sanctions_source global (pas par tenant — listes communes)
  • Pulls directs depuis sources officielles (le réseau banque autorise les FQDN de Treasury, EU, UN, OFSI)
  • OpenSearch + MinIO + Kafka tournent dans le cluster banque
  • Pas de différence fonctionnelle vs SaaS
  • enabled = true mais air_gap_path = '/var/lib/vitakyc/sanctions/<source_id>.xml'
  • Le connector lit le fichier local au lieu d’HTTP
  • Bundle signed transféré via :
    • sFTP one-way (depuis VitaKYC vers le client) — recommandé
    • USB chiffré (rotation hebdo/mensuelle)
  • VKL bundle signé Ed25519, signature vérifiée par le client avant parsing
  • Métrique sanctions_etl_air_gap_lag_hours exposée pour monitoring de la fraîcheur

MetricCible MVPCible V2
Latence p95 ingestion OFAC SDN (~13K entités)≤ 90 s≤ 45 s
Latence p95 ingestion OpenSanctions (~52K entités)≤ 6 min≤ 3 min
Latence p95 ingestion Dow Jones (~3,8M entités)≤ 45 min≤ 20 min
Disponibilité du service99,5 %99,9 %
Lag entre source officielle et OpenSearch (delta)≤ 4h≤ 1h
Échec en série tolérée avant alerte2 runs consécutives1 run

  • Signatures Ed25519 : VKL bundles + audit chain. Clés stockées dans Vault, rotées tous les 12 mois.
  • Vérification SHA-256 : entre fetch et parse pour détecter altération réseau.
  • Rate limiting : respect des limits sources (OFAC max 1/h, EU 6/jour, etc.). Throttling natif dans Quartz.
  • mTLS : entre sanctions-etl-svc et audit-svc/opensearch.
  • Anti-DDoS : un cron qui boucle est détecté et désactivé automatiquement (3 fires < 1 min → kill).
  • Dual-control : tout PATCH ou trigger manuel exige 2 signataires distincts.
  • Rétention : runs en DB 90 jours, snapshots MinIO 10 versions, audit events 10 ans WORM.

ObligationCouverture
BCT 2024 : reproductibilité screening N+10 ansSnapshots MinIO + audit signed ✓
FATF Reco. 6 (sanctions financières ciblées)Sources officielles UN, OFAC, EU, HMT ✓
RGPD art. 17 (droit à l’oubli)Les données sanctions sont de la donnée publique légalement requise — exception RGPD art. 17.3 (obligation légale). Documenté dans la DPIA.
OFAC 50% ruleCouverte indirectement par OpenSanctions agg. (entités liées à > 50%) ; à ré-exécuter manuellement par le compliance officer si OFAC publie une advisory.

  • Validation Quartz cron : 6-field, 5-field rejeté, intervals min/max
  • Normalizer : OFAC/UN/EU/HMT → VKL v1 (1 test par source + roundtrip)
  • Connector : parsing XML/JSON sample fixtures (golden tests)
  • Dual-control : refuser quand auteur == approbateur ; refuser sans rôle DSI ; refuser sans signature
  • Idempotence : re-fetch même sourceVersion → skip avec status skipped
  • Reload scheduler sur event Kafka
  • Testcontainers : PostgreSQL + OpenSearch + MinIO + Kafka
  • Run E2E ingestion OFAC sample (XML fixture local) → OpenSearch index → alias swap → snapshot signed
  • Trigger manuel → audit row inséré + Kafka event émis
  • Mode air-gap : VKL bundle signed depuis /tmp/airgap/
  • Échec connector → retry 3× exponential → status failed → alerte audit
  • Simuler ingestion Dow Jones (3,8M entités) — vérifier mémoire pic < 4 Go, latence < 45 min
  • 100 PATCH concurrents sur cron → reload Quartz cohérent

ItemMVPV2
Sources publiques (OFAC, UN, EU, HMT, OpenSanctions)
Source régionale BCT TNscrape manuelscrape OCR auto
Dow Jonesadapter livré, désactivé par défautactivé selon licence client
Quartz schedulerembarqué dans bio-svc podextrait en service dédié multi-instance
Reload schedulerKafka eventTemporal SignalChannel
SnapshotsMinIO+ Cosign + transparency log
Air-gap distributionsFTP manuelbundle CDN signé + auto-pull script client
OFAC 50% rulemanual advisoryauto-derived via OpenSanctions edges

  • Tables Flyway sanctions_source + sanctions_list_run + sanctions_schedule_audit migrées
  • 6 connecteurs publics opérationnels (OFAC SDN, OFAC Cons, UN, EU, HMT, OpenSanctions) — fixtures XML/JSON sample testés
  • Quartz scheduler boot lit sanctions_source correctement
  • API REST 9 endpoints, dual-control PATCH
  • Sanctions admin UI (page 12.1, 12.2) consomme sanctions-etl-svc API
  • Audit signed SANCTIONS_* events émis et chainés
  • OpenSearch indices sanctions-{source}-active peuplés
  • MinIO snapshots signés Ed25519, vérifiables côté client
  • Kafka event vitakyc.sanctions.lists.updated émis et consommé par sanctions-svc
  • Mode air-gap testé sur 1 tenant pilote on-prem
  • Métriques Prometheus : sanctions_etl_runs_total, sanctions_etl_run_duration_seconds, sanctions_etl_entities_indexed, sanctions_etl_air_gap_lag_hours
  • Runbook on-call (cf Sanctions monitoring) couvre fail OFAC, fail EU, fail OpenSearch, conflit cron


Document de spec sanctions-etl-svc — version 1.0 (2026-05-02). Mises à jour bloquantes nécessitent un ADR.