Aller au contenu

Sanctions screening — OpenSearch broad + re-ranker Kotlin + RCA

Module : sanctions-svc (microservice JVM Kotlin + OpenSearch packagé). Plus pipeline ETL sanctions-etl-svc.

ADRs : ADR-002 (RLS), ADR-006 (listes AML), ADR-030 (architecture).

POC : poc-sanctions-matcher (Kotlin pur, re-ranker + RCA, dataset OFAC SDN public).

Cette page fixe la spec d’ingénierie complète de sanctions-svc. Un développeur qui la lit doit pouvoir construire le module sans question résiduelle :

  1. Les sources de listes (publiques free + Dow Jones option) avec URL canoniques et formats
  2. Le pipeline ETL (parsing → normalisation FtM → bulk index OpenSearch + dénormalisation RCA)
  3. Le modèle d’indexation OpenSearch (mappings ICU + phonetic + n-gram + flattened RCA)
  4. L’algorithme de matching (broad search → re-ranker Kotlin → threshold par typologie)
  5. Le scoring RCA (≤ 2 sauts dénormalisés)
  6. L’audit log signé (modèle, signature Ed25519, reproductibilité)
  7. Les API REST + events Kafka
  8. Le packaging (embedded vs external cluster) + reproductibilité audit BCT

Flow nominal (~150 ms p95) :

  1. Pipeline ETL ingère les listes selon cadence (quotidien publiques, full-refresh hebdo + delta quotidien DJ).
  2. Au moment d’un screening, sanctions-svc reçoit une ScreeningQuery {name, dob?, nationality?, type, ...}.
  3. Pass 1 broad : OpenSearch retourne top-50 candidats avec multi-fields (ICU, phonetic, n-gram).
  4. Pass 2 re-rank : Kotlin recalcule un score consolidé déterministe (Jaro-Winkler + phonetic + DOB + alias).
  5. Pass 3 threshold : verdict par typologie (OFAC strict, PEP tolérant, RCA décay, adverse media manuel).
  6. Audit log signé (queryNormalisée, listVersion, topN, scores, rcaPath, signature) → Kafka sanctions.screening.completed.

SourceURLFormatCadenceVolume
OFAC SDNhttps://www.treasury.gov/ofac/downloads/sdn.xmlXMLquotidien~13 K
OFAC Consolidated (FSE, NS-PLC, etc.)https://www.treasury.gov/ofac/downloads/consolidated/consolidated.xmlXMLquotidien~7 K
UN Security Council Consolidatedhttps://scsanctions.un.org/resources/xml/en/consolidated.xmlXMLhebdomadaire~700
EU CFSPhttps://webgate.ec.europa.eu/fsd/fsfXML (token gratuit)quotidien~3 K
UK HM Treasury OFSI Consolidatedhttps://ofsistorage.blob.core.windows.net/publishlive/2022format/ConList.xmlXMLquotidien~7 K
World Bank Debarred Firmshttps://www.worldbank.org/en/projects-operations/procurement/debarred-firmsCSVmensuel~3 K
OpenSanctions agrégateurhttps://www.opensanctions.org/datasets/JSON / FtM / CSVquotidien~50 K (recouvre les autres)

Volume total après dédup : ~35 K entrées uniques. Index OpenSearch ~600 MB avec multi-fields ICU + phonetic.

Adapter optionnel activé par feature flag tenant. Détails à compléter à l’activation par le tenant (format de feed, identifiants, schéma RCA spécifique).

MétriqueValeur typique
Volume entités3,8 M (PEP, sanctioned, RCA, SI/adverse media, DI/disqualified)
Volume associations8 M edges (RCA family, business, address)
Cadencefull hebdo + delta quotidien
FormatXML feed propriétaire (parser dédié dj-feed-parser)
CategoriesSAN, PEP, RCA, SI, DI
DatasetsWatchlist consolidated, Adverse Media, State-Owned Enterprise

L’adapter DJ implémente l’interface SanctionsListAdapter (cf §3.2) au même titre que les listes publiques.


interface SanctionsListAdapter {
val sourceName: String // "OFAC_SDN", "UN_CONSOLIDATED", ...
val urlSource: String
val cadence: Cadence // DAILY, WEEKLY, MONTHLY, ON_DEMAND
val format: SourceFormat // XML, JSON, CSV, FtM_JSONL
fun fetch(lastModifiedAt: Instant?): SourcePayload // streaming si possible
fun parse(payload: SourcePayload): Sequence<RawEntity>
fun normalize(raw: RawEntity): NormalizedEntity // → schéma FtM
}
data class NormalizedEntity(
val ftmId: String, // hash canonique
val schema: FtmSchema, // Person, Company, Vessel, Aircraft, Position, ...
val primaryName: String,
val primaryNameFolded: String, // ICU folded
val akas: List<String>, // alias variants
val datesOfBirth: List<String>, // multi-DOB possibles
val citizenships: List<String>, // ISO 3166 alpha-2
val countries: List<String>, // résidence, opérations, etc.
val identifiers: Map<String, String>, // passport, NIF, DUNS
val categories: Set<Category>, // SAN, PEP, RCA, SI, DI
val sourceListsHits: List<SourceHit>,
val associatedEntityIds: List<AssociationEdge>, // edges (cible, type)
val flattenedAssociatedSanctioned: List<RcaPath>, // pré-calculé ≤ 2 sauts
val firstSeenAt: String, // ISO-8601
val lastSeenAt: String
)
data class AssociationEdge(
val targetFtmId: String,
val type: AssociationType, // FAMILY, BUSINESS, ADDRESS, EMPLOYER, OWNER
val strength: Double // 1.0 direct, 0.5 reported, 0.3 weak
)
data class RcaPath(
val targetFtmId: String,
val depth: Int, // 1 ou 2
val pathTypes: List<AssociationType>,
val pathDecay: Double // produit des strength × depth_decay
)

À chaque ETL :

val listVersion = sha256(entries.sortedBy { it.ftmId }.joinToString { "${it.ftmId}|${it.lastSeenAt}" })

L’index OpenSearch est nommé sanctions_{tenantId}_{listVersion}. Un alias sanctions_{tenantId} pointe vers la version active (rotation atomique au moment du switch). Snapshot vers MinIO conservé 10 ans WORM.

ÉtapeCadenceHeure UTC
Full reindex (toutes sources publiques)hebdomadairedimanche 02:00
Delta sync (OFAC, EU, UK quotidiennes)quotidien06:00
Force-refresh on-demandadmin via APIn/a
Dow Jones fullhebdomadairedimanche 03:00
Dow Jones deltaquotidien06:30

{
"settings": {
"analysis": {
"analyzer": {
"name_icu_folded": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": ["icu_folding", "lowercase"]
},
"name_phonetic_bm": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": ["lowercase", "beider_morse_phonetic"]
},
"name_ngram": {
"type": "custom",
"tokenizer": "icu_tokenizer",
"filter": ["lowercase", "edge_ngram_3_15"]
}
},
"filter": {
"beider_morse_phonetic": {
"type": "phonetic",
"encoder": "beider_morse",
"rule_type": "approx",
"name_type": "generic",
"languageset": ["any", "arabic", "french", "english", "russian"]
},
"edge_ngram_3_15": {
"type": "edge_ngram",
"min_gram": 3,
"max_gram": 15
}
}
}
},
"mappings": {
"properties": {
"ftm_id": { "type": "keyword" },
"schema": { "type": "keyword" },
"primary_name": {
"type": "text",
"fields": {
"raw": { "type": "keyword" },
"icu_folded": { "type": "text", "analyzer": "name_icu_folded" },
"phonetic": { "type": "text", "analyzer": "name_phonetic_bm" },
"ngram": { "type": "text", "analyzer": "name_ngram" }
}
},
"akas": {
"type": "text",
"fields": {
"icu_folded": { "type": "text", "analyzer": "name_icu_folded" },
"phonetic": { "type": "text", "analyzer": "name_phonetic_bm" }
}
},
"dates_of_birth": { "type": "keyword" },
"citizenships": { "type": "keyword" },
"countries": { "type": "keyword" },
"categories": { "type": "keyword" },
"source_lists_hits": {
"type": "nested",
"properties": {
"source": { "type": "keyword" },
"list_version": { "type": "keyword" },
"last_seen_at": { "type": "date" }
}
},
"flattened_associated_sanctioned": {
"type": "nested",
"properties": {
"target_ftm_id": { "type": "keyword" },
"depth": { "type": "integer" },
"path_decay": { "type": "float" }
}
},
"last_seen_at": { "type": "date" }
}
}
}
ProfilNœudsHeap JVMRAM hôteVolume indexProfil tenant
embedded-light14 GB8 GB≤ 2 GB (publiques)tenant on-prem light, no DJ
embedded-full116 GB32 GB≤ 30 GB (publiques + DJ)tenant on-prem full, DJ activé
external-cluster316 GB / nœud32 GB / nœud≤ 50 GBtenant tier-1 cluster existant

POST /sanctions_{tenantId}/_search
{
"size": 50,
"query": {
"bool": {
"should": [
{
"dis_max": {
"tie_breaker": 0.3,
"queries": [
{ "match": { "primaryName.icu_folded": { "query": "{queryName}", "boost": 4.0 } } },
{ "match": { "primaryName.phonetic": { "query": "{queryName}", "boost": 2.5 } } },
{ "match": { "primaryName.ngram": { "query": "{queryName}", "boost": 1.5 } } },
{ "match": { "akas.icu_folded": { "query": "{queryName}", "boost": 3.0 } } },
{ "match": { "akas.phonetic": { "query": "{queryName}", "boost": 2.0 } } }
]
}
}
],
"filter": [
{ "term": { "schema": "{Person|Company}" } }
]
}
},
"_source": ["ftmId","primaryName","akas","datesOfBirth","citizenships","categories","sourceListsHits","flattenedAssociatedSanctioned"]
}

Cible : rappel ≥ 95 % sur top-50, latence p95 ≤ 30 ms.

data class RerankerScore(
val nameScore: Double, // Jaro-Winkler max(primary, akas)
val phoneticScore: Double, // Beider-Morse equality 0/1
val dobScore: Double, // exact match 1.0, partial 0.5, none 0.0
val nationalityScore: Double, // exact match 1.0, mismatch 0.0
val aliasUnfoldScore: Double, // ICU translit + Levenshtein
val sourceTypeBoost: Double, // OFAC=1.0, PEP=0.85, etc.
val finalScore: Double // pondération
)
fun score(query: ScreeningQuery, candidate: NormalizedEntity): RerankerScore {
val nameScore = maxOf(
jaroWinkler(query.name.normalized, candidate.primaryName.normalized),
candidate.akas.maxOfOrNull { jaroWinkler(query.name.normalized, it.normalized) } ?: 0.0
)
val phoneticScore = if (
beiderMorse(query.name) intersects beiderMorse(candidate.primaryName)
) 1.0 else 0.0
val dobScore = matchDob(query.dob, candidate.datesOfBirth)
val nationalityScore = matchNationality(query.nationality, candidate.citizenships)
val aliasUnfoldScore = unfoldArabicTranslit(query.name)
.map { variant -> candidate.akas.maxOfOrNull { aka -> 1.0 - levenshtein(variant, aka) / max(variant.length, aka.length).toDouble() } ?: 0.0 }
.maxOrNull() ?: 0.0
val sourceTypeBoost = candidate.categories.maxOfOrNull { categoryBoost(it) } ?: 1.0
val final = (
0.40 * nameScore +
0.15 * phoneticScore +
0.20 * dobScore +
0.10 * nationalityScore +
0.15 * aliasUnfoldScore
) * sourceTypeBoost
return RerankerScore(nameScore, phoneticScore, dobScore, nationalityScore, aliasUnfoldScore, sourceTypeBoost, final)
}

Pondérations défaut MVP : 0.40/0.15/0.20/0.10/0.15. Configurables par tenant.

Pourquoi ces poids : nameScore domine (40 %) — le nom écrit reste l’élément le plus discriminant ; dobScore 20 % car une DOB exacte casse les faux positifs ; phonetic 15 % capture les variantes orthographiques arabes/slaves ; aliasUnfold 15 % traite les translittérations multiples (Mohammedمحمد) ; nationality 10 % est un signal faible mais utile.

TypologieSourceThresholdVerdict
Direct sanctionsOFAC SDN, UN, EU CFSP, UK OFSI≥ 0.92MATCH_DIRECT_SANCTIONS → block + case manuel
Direct PEPDJ PEP, OpenSanctions PEP≥ 0.85MATCH_PEP → enhanced due diligence
RCA 1-hopflattened depth=1≥ 0.80MATCH_RCA_DIRECT → review manuel
RCA 2-hopflattened depth=2≥ 0.70MATCH_RCA_INDIRECT → review manuel + lien path
Adverse mediaDJ SI≥ 0.75POTENTIAL_ADVERSE_MEDIA → manuel only
Below thresholdn/a< thresholdCLEAR → log mais pas de hit

Tous les seuils sont par tenant (BCT peut imposer une politique plus stricte qu’un tenant fintech). Configurables via SanctionsPolicy (DSL Kotlin réutilisant grammaire ADR-004).


Pour chaque entité X de la base, l’ETL pré-calcule :

val flattenedAssociatedSanctioned = bfs(
start = X,
edges = associationGraph,
maxDepth = 2,
edgeFilter = { it.target.categories intersects setOf(SAN, PEP) }
).map { (target, depth, edgePath) ->
RcaPath(
targetFtmId = target.ftmId,
depth = depth,
pathTypes = edgePath.map { it.type },
pathDecay = edgePath.map { it.strength }.fold(1.0, Double::times) * decayPerDepth(depth)
)
}

Fonction de décroissance par profondeur : decayPerDepth(1) = 1.0, decayPerDepth(2) = 0.6. Configurable.

Au screening, si une entité matche un candidat (finalScore ≥ 0.70) avec flattenedAssociatedSanctioned non vide, on retourne en plus du match direct la liste des paths sanctionnés atteignables :

data class ScreeningHit(
val candidate: NormalizedEntity,
val directScore: Double,
val typology: HitTypology,
val rcaHits: List<RcaHit> // les paths sanctionnés depuis ce candidat
)
data class RcaHit(
val sanctionedTargetFtmId: String,
val depth: Int,
val pathTypes: List<AssociationType>,
val rcaScore: Double // = directScore * pathDecay
)
  • 2 sauts max au MVP — couvre 90 % des cas banque (frère sanctionné, UBO indirect via une SARL). > 2 sauts = nouvel ADR avec graph DB.
  • Recompute hebdomadaire — un nouvel edge ajouté dans la liste DJ entre 2 reindex est invisible jusqu’au reindex hebdo. Acceptable pour les cas RCA (pas de SLA temps-réel).

@Serializable
data class ScreeningAuditEvent(
val eventId: String,
val tenantId: String,
val screeningId: String,
val queryNormalized: ScreeningQuery,
val listVersion: String, // sha256 de la liste indexée
val openSearchAlias: String, // "sanctions_{tenant}"
val openSearchIndex: String, // "sanctions_{tenant}_{listVersion}"
val topNCandidates: List<CandidateRecord>, // top 50 ranked
val verdict: ScreeningVerdict,
val rerankerVersion: String, // SemVer du re-ranker
val rerankerWeights: Map<String, Double>, // pondérations utilisées
val thresholds: Map<String, Double>, // thresholds par typologie
val signature: Ed25519Signature,
val previousEventHash: String,
val hash: String,
val occurredAt: String // ISO-8601
)
data class CandidateRecord(
val ftmId: String,
val primaryName: String,
val finalScore: Double,
val nameScore: Double,
val phoneticScore: Double,
val dobScore: Double,
val nationalityScore: Double,
val aliasUnfoldScore: Double,
val sourceTypeBoost: Double,
val rcaPath: List<RcaHit>?
)

Pour reproduire un screening 5 ans plus tard :

  1. Récupère l’event dans screening_audit (table append-only avec RLS, cf ADR-002).
  2. Vérifie le chaînage hash et la signature Ed25519 (clé publique tenant pinned).
  3. Récupère le snapshot OpenSearch correspondant à listVersion depuis MinIO.
  4. Réjoue le re-ranker (version rerankerVersion, pondérations rerankerWeights) sur topNCandidates.
  5. Vérifie que verdict est identique.

Garantie : si le re-ranker Kotlin est déterministe (testé en CI sur corpus golden) et la liste figée par snapshot, le verdict est strictement reproductible.

CREATE TABLE screening_audit (
event_id UUID PRIMARY KEY,
screening_id UUID NOT NULL,
tenant_id UUID NOT NULL,
query_normalized JSONB NOT NULL,
list_version VARCHAR(74) NOT NULL,
opensearch_alias VARCHAR(128) NOT NULL,
opensearch_index VARCHAR(128) NOT NULL,
top_n_candidates JSONB NOT NULL,
verdict JSONB NOT NULL,
reranker_version VARCHAR(16) NOT NULL,
reranker_weights JSONB NOT NULL,
thresholds JSONB NOT NULL,
signature TEXT NOT NULL,
previous_event_hash CHAR(74) NOT NULL,
hash CHAR(74) NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL
);
ALTER TABLE screening_audit ENABLE ROW LEVEL SECURITY;
CREATE POLICY screening_audit_tenant_isolation ON screening_audit
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
REVOKE UPDATE, DELETE ON screening_audit FROM app_role;

MéthodeEndpointDescription
POST/v1/sanctions/screenscreening unitaire (synchrone)
POST/v1/sanctions/screen/batchbatch screening (async, Temporal workflow)
GET/v1/sanctions/screenings/:idpoll batch status
GET/v1/sanctions/screenings/:id/auditaudit complet d’un screening
GET/v1/sanctions/listsliste des sources actives + listVersion + stats
POST/v1/sanctions/lists/:source/refreshforce refresh on-demand (admin)
GET/v1/sanctions/policiesrécupère SanctionsPolicy tenant
PUT/v1/sanctions/policiesmet à jour (avec dual-control + audit)
TopicÉmis quandPayload
sanctions.screening.completedscreening unitaire fini{screeningId, verdict, hits, listVersion}
sanctions.batch.progressbatch en cours{batchId, processed, total}
sanctions.list.reindexedETL finalisé{source, listVersion, entityCount}
sanctions.policy.updatedtenant change la policy{tenantId, version, signedBy}

Topics partitionnés par tenantId. Rétention 30 j broker (audit long est dans screening_audit).

Spec complète : voir /api/openapi/ section Sanctions. Contract tests Pact en CI.


  • Container Docker single-node vitakyc/sanctions-svc-embedded:1.0
  • Image inclut OpenSearch 2.x + plugins ICU + phonetic + Java 17
  • 4-8 GB RAM recommandé (pour publiques only)
  • Healthcheck /_cluster/health
  • Volumes persistants /data/opensearch + /data/snapshots
  • Backups vers MinIO via cron interne (snapshots OS S3)
  • Pas de cluster, pas de master election, pas de sharding
  • Pré-tuné pour 35 K entrées + future expansion DJ
  • Le tenant fournit l’URL d’un cluster OpenSearch existant (3+ nœuds, son ops)
  • Image VitaKYC sanctions-svc:1.0 (sans OpenSearch embarqué) qui s’y connecte
  • Index sanctions_{tenantId}_* créés par VitaKYC dans le cluster tenant
  • Tenant gère monitoring + backups + upgrades du cluster

Feature flag tenant sanctions.deployment.mode = embedded | external au niveau tenant config. Switch possible à n’importe quel moment via re-export / re-import des données + bascule de l’alias.


MetricMVP cibleV2 cible
Latence broad OpenSearch p95≤ 30 ms≤ 15 ms
Latence re-ranker p95 (top-50)≤ 50 ms≤ 25 ms
Latence pipeline complète p95≤ 200 ms≤ 100 ms
Throughput screening unitaire≥ 100 /s /tenant≥ 500 /s
Throughput batch screening≥ 1000 /min /tenant≥ 10 000 /min
Reindex full publiques (35 K)≤ 1 min≤ 30 s
Reindex full DJ (3,8 M + 8 M edges)≤ 60 min≤ 30 min
Disponibilité99,5 %99,9 %
Stockage index publiques≤ 600 MB≤ 400 MB
Stockage index DJ≤ 30 GB≤ 25 GB

  • mTLS entre sanctions-svc et opensearch (cluster ou embedded).
  • Auth admin : OIDC + MFA pour modification SanctionsPolicy (dual-control via Vault, cf ADR-006).
  • Signature Ed25519 des audit events par clé tenant pinned (rotation 12 mois).
  • RLS PostgreSQL sur screening_audit (cf ADR-002).
  • Index OpenSearch séparés par tenant — pas de partage cross-tenant.
  • PII protection : pas de hashing des noms (sinon screening impossible). Mais tous les accès logs anonymisés (queryNormalized contient le nom mais le log applicatif tronque).
  • Rate limit : 30 screenings/s/tenant, 1000/min/utilisateur API.
  • Audit append-only : signatures + chainage hash → tampering détectable.

ItemMVP (V0)V2 (S+12)
Listes publiques7 sources, full hebdo + delta quotidien+ custom tenant lists (whitelist/blacklist propriétaires)
Dow Jonesadapter optionnel+ Refinitiv WorldCheck adapter, ComplyAdvantage adapter
RCA depth≤ 2 sauts dénormalisés≤ 4 sauts via graph DB (Neo4j ou Postgres CTE)
PhoneticBeider-Morse + Soundex+ custom MENA-tuned (ANSI X3.30)
ML re-rankeraucun (Kotlin déterministe)optionnel learn-to-rank avec model card audit
Cache24 h sur (queryNormalisée, listVersion)+ Redis distribué
Reproductibilitésnapshots OpenSearch MinIO+ replay automatique pour audits aléatoires
Streamingbatch only+ transaction streaming (Kafka Streams)

  • 7 sources publiques ingérées (OFAC SDN + Consolidated, UN, EU CFSP, UK OFSI, World Bank, OpenSanctions agrégat)
  • Pipeline ETL opérationnel : full hebdo + delta quotidien
  • Mappings OpenSearch ICU + Beider-Morse + n-gram déployés
  • Re-ranker Kotlin v1.0 livré avec corpus golden 100 cas
  • Threshold par typologie configuré (OFAC 0.92 / PEP 0.85 / RCA 0.80-0.70 / SI 0.75)
  • RCA flattening ≤ 2 sauts opérationnel sur DJ (mock POC)
  • Audit log signé Ed25519 en place, table screening_audit append-only avec RLS
  • Snapshot OpenSearch → MinIO automatique au reindex (rétention 10 ans WORM)
  • Alias rotation atomique testée (zero-downtime reindex)
  • API REST + OpenAPI publiée
  • Events Kafka émis vers case-mgmt + risk-matrix
  • Container embedded-light pré-tuné déployé chez tenant pilote
  • Métriques cibles atteintes : p95 ≤ 200 ms, précision @ 0.92 ≥ 99 %
  • Runbook on-call : flow incident reindex échec, drift listVersion, OpenSearch down


Document de spec sanctions screening — version 1.0 (2026-04-27). Mises à jour bloquantes nécessitent un ADR.