AML Streaming Engine — Kafka Streams + rules DSL
Module: `tx-monitoring-svc` (Kotlin microservice + Kafka Streams), plus the `tx-normalizer` pipeline (POC delivered). ADRs: ADR-001 (Kafka), ADR-004 (DSL), ADR-025 (predicate grammar), ADR-031 (architecture).
Related pages: AML Transaction Monitoring (high-level overview), POC rules engine, Case Management.
This page sets out the detailed engineering spec of the streaming engine for Transaction Monitoring. A developer who reads it should be able to build `tx-monitoring-svc` with no open questions:
- The Kafka Streams runtime (topology, state stores, changelog)
- The rule DSL (model, serialization, evaluation)
- Sliding windows (hopping vs tumbling, grace period)
- Alert deduplication (SHA-256 signature, outbox pattern)
- MVP rule typologies (structuring, velocity, threshold, pattern, peer-deviation, aggregate)
- Shadow mode + backtest (dual-control publication)
1. Overview
Nominal flow (~300-500 ms p95):
- A transaction arrives on Kafka topic `tx.normalized` (partitioned by `accountId`)
- The Kafka Streams topology consumes it and applies the tenant's `ACTIVE` and `SHADOW` rules
- For each rule, it updates the state stores (sliding windows + aggregates)
- If a threshold is crossed → it generates an alert candidate
- Dedup by SHA-256 signature (ruleId + accountId + windowStart + sortedTxIds)
- Outbox writer: insert into Postgres `alert_outbox` (idempotent on `alert_id`) + state update
- An independent sweeper pushes unpublished rows to their target topic: `aml.alert.published` (or `aml.alert.shadow` for SHADOW rules)
- Consumed by `case-mgmt-svc` (creates a case if severity ≥ MEDIUM) + `notification-svc`
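The last steps of this flow involve two routing decisions: the target topic depends on the rule status, and a case is opened only from severity MEDIUM upward. A minimal sketch of both as one pure function, assuming severity ordering follows the declaration order of `AlertSeverity` (section 2.2). `routeAlert` and `RoutingDecision` are hypothetical names; in the real system the case decision is taken by `case-mgmt-svc`, not by this service.

```kotlin
enum class RuleStatus { DRAFT, SHADOW, ACTIVE, ARCHIVED }
enum class AlertSeverity { LOW, MEDIUM, HIGH, URGENT }

// Hypothetical summary of where an alert ends up.
data class RoutingDecision(val topic: String, val opensCase: Boolean)

fun routeAlert(status: RuleStatus, severity: AlertSeverity): RoutingDecision {
    val topic = when (status) {
        RuleStatus.SHADOW -> "aml.alert.shadow"
        RuleStatus.ACTIVE -> "aml.alert.published"
        RuleStatus.DRAFT, RuleStatus.ARCHIVED -> error("$status rules are not evaluated")
    }
    // Kotlin enums compare by declaration order, so this reads "severity >= MEDIUM".
    // Only aml.alert.published is consumed by case-mgmt-svc: shadow alerts never open a case.
    val opensCase = topic == "aml.alert.published" && severity >= AlertSeverity.MEDIUM
    return RoutingDecision(topic, opensCase)
}
```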
2. Data model
2.1 Normalized transaction (input)

    import java.math.BigDecimal
    import kotlinx.serialization.Contextual
    import kotlinx.serialization.Serializable

    @Serializable
    data class CanonicalTx(
        val txId: String,                       // UUID issued by tx-normalizer
        val tenantId: String,
        val timestamp: String,                  // ISO-8601 with milliseconds
        val accountId: String,                  // primary partitioning key
        val counterpartyAccountId: String?,
        val counterpartyName: String?,
        val counterpartyCountry: String?,       // ISO 3166-1 alpha-2
        // @Contextual: kotlinx.serialization has no built-in BigDecimal serializer (register one in the SerializersModule)
        @Contextual val amount: BigDecimal,
        val currency: String,                   // ISO 4217
        @Contextual val amountInRefCurrency: BigDecimal,   // converted at the rate of the tx timestamp
        val refCurrency: String,                // typically TND
        val direction: TxDirection,             // CREDIT, DEBIT
        val channel: String,                    // ATM, ONLINE, BRANCH, MOBILE, WIRE, SEPA, ...
        val purpose: String?,                   // free text or coded
        val reference: String?,                 // bank reference
        val source: TxSource                    // origin of normalization
    )

    enum class TxDirection { CREDIT, DEBIT }
    enum class TxSource { ISO20022, SWIFT_MT103, SWIFT_MT202, BATCH_CSV, KAFKA_NATIVE }

2.2 Rule (internal model)

    @Serializable
    data class AmlRule(
        val ruleId: String,                     // unique within tenant
        val ruleVersion: String,                // SemVer
        val tenantId: String,
        val name: String,
        val description: String,
        val typology: RuleTypology,             // STRUCTURING, VELOCITY, ...
        val status: RuleStatus,                 // DRAFT, SHADOW, ACTIVE, ARCHIVED
        val window: WindowSpec,
        val partitionBy: PartitionKey,          // ACCOUNT | CLIENT | COUNTERPARTY
        val filter: Predicate?,                 // pre-filter (reuses the ADR-025 grammar)
        val aggregate: AggregateExpr,
        val threshold: ThresholdExpr,
        val severity: AlertSeverity,
        val alertTemplate: String,              // i18n key for the human-readable message
        val signedBy: List<Ed25519Signature>,   // dual-control
        val createdAt: String,
        val publishedAt: String?
    )

    enum class RuleTypology { STRUCTURING, VELOCITY, THRESHOLD, PATTERN, PEER_DEVIATION, AGGREGATE, NEW_BENEFICIARY }
    enum class RuleStatus { DRAFT, SHADOW, ACTIVE, ARCHIVED }
    enum class AlertSeverity { LOW, MEDIUM, HIGH, URGENT }   // declaration order = severity order
    enum class PartitionKey { ACCOUNT, CLIENT, COUNTERPARTY }

    @Serializable
    data class WindowSpec(
        val kind: WindowKind,                   // HOPPING, TUMBLING, SESSION
        val sizeSeconds: Long,                  // e.g. 86400 = 24 h
        val stepSeconds: Long? = null,          // required for HOPPING; null for TUMBLING
        val gracePeriodSeconds: Long = 300      // tolerance for late events
    )

    enum class WindowKind { HOPPING, TUMBLING, SESSION }

    @Serializable
    sealed class AggregateExpr {
        @Serializable data class Count(val filter: Predicate? = null) : AggregateExpr()
        @Serializable data class Sum(val field: String, val filter: Predicate? = null) : AggregateExpr()
        @Serializable data class Avg(val field: String) : AggregateExpr()
        @Serializable data class DistinctCount(val field: String) : AggregateExpr()
        @Serializable data class Velocity(val window2Seconds: Long) : AggregateExpr()   // ratio current/baseline
        // Typology-specific detector, e.g. PASS_THROUGH (section 4.4)
        @Serializable data class Custom(val kind: String, val params: Map<String, Double> = emptyMap()) : AggregateExpr()
    }

    @Serializable
    sealed class ThresholdExpr {
        @Serializable data class Gt(val value: Double) : ThresholdExpr()
        @Serializable data class Gte(val value: Double) : ThresholdExpr()
        @Serializable data class GtPercent(val pct: Double, val baselineWindow: Long) : ThresholdExpr()
        @Serializable data class CountGt(val n: Int) : ThresholdExpr()
    }

2.3 Alert (output)

    @Serializable
    data class AmlAlert(
        val alertId: String,             // "alert-" + first 16 hex chars of SHA-256(ruleId|partitionKey|windowStart|sortedTxIds), see section 5
        val tenantId: String,
        val ruleId: String,
        val ruleVersion: String,
        val accountId: String,
        val windowStart: String,         // ISO-8601
        val windowEnd: String,
        val severity: AlertSeverity,
        val typology: RuleTypology,
        val triggeringTxIds: List<String>,
        val aggregateValue: Double,      // e.g. sum or count
        val thresholdValue: Double,
        val message: String,             // resolved i18n message
        val producedAt: String,
        val signature: String            // Ed25519 signature
    )

3. Kafka Streams topology
3.1 Topology pseudo-code

    import java.time.Duration
    import java.time.Instant
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.Topology
    import org.apache.kafka.streams.kstream.*
    import org.apache.kafka.streams.state.WindowStore

    // TxSerde, AggregateSerde, Aggregate and the agg*/eval*/partitionKey helpers live elsewhere in the service.
    fun buildTopology(rules: List<AmlRule>): Topology {
        val builder = StreamsBuilder()

        val txStream: KStream<String, CanonicalTx> = builder
            .stream("tx.normalized", Consumed.with(Serdes.String(), TxSerde))

        // SHADOW rules are evaluated like ACTIVE ones; only their output topic differs.
        rules.filter { it.status in setOf(RuleStatus.ACTIVE, RuleStatus.SHADOW) }
            .forEach { rule -> attachRuleBranch(txStream, rule) }

        return builder.build()
    }

    private fun attachRuleBranch(stream: KStream<String, CanonicalTx>, rule: AmlRule) {
        val keyed = stream
            .filter { _, tx -> tx.tenantId == rule.tenantId }
            .filter { _, tx -> rule.filter?.let { evalPredicate(it, tx) } ?: true }
            .selectKey { _, tx -> partitionKey(tx, rule.partitionBy) }   // re-keying creates a repartition topic

        val size = Duration.ofSeconds(rule.window.sizeSeconds)
        val grace = Duration.ofSeconds(rule.window.gracePeriodSeconds)
        val windows: TimeWindows = when (rule.window.kind) {
            WindowKind.HOPPING -> TimeWindows.ofSizeAndGrace(size, grace)
                .advanceBy(Duration.ofSeconds(requireNotNull(rule.window.stepSeconds) { "HOPPING requires stepSeconds" }))
            WindowKind.TUMBLING -> TimeWindows.ofSizeAndGrace(size, grace)
            WindowKind.SESSION -> TODO("session windows for inactivity-bounded patterns")
        }

        val windowed: KStream<Windowed<String>, Double> = keyed
            .groupByKey(Grouped.with(Serdes.String(), TxSerde))
            .windowedBy(windows)
            .aggregate(
                ::aggInit,
                { _, tx, agg -> aggUpdate(rule.aggregate, agg, tx) },
                // Named store so that it matches aml-window-{ruleId} in section 3.2
                Materialized.`as`<String, Aggregate, WindowStore<Bytes, ByteArray>>("aml-window-${rule.ruleId}")
                    .withValueSerde(AggregateSerde)
            )
            .toStream()
            .mapValues { agg -> aggExtract(rule.aggregate, agg) }

        windowed
            .filter { _, value -> evalThreshold(rule.threshold, value) }
            .foreach { key, value -> emitAlertCandidate(rule, key, value) }
    }

    private fun emitAlertCandidate(rule: AmlRule, key: Windowed<String>, value: Double) {
        // Pseudo-code: reading aml-tx-window-{ruleId} here requires a Processor with access to that store
        val triggeringTxIds = stateStore.getTxIdsForWindow(rule.ruleId, key)
        val alertId = computeAlertId(rule.ruleId, key.key(), key.window().startTime(), triggeringTxIds)
        if (alertOutboxRepo.alreadyEmitted(alertId)) return   // dedup

        val threshold = thresholdValue(rule.threshold)
        val unsigned = AmlAlert(
            alertId = alertId,
            tenantId = rule.tenantId,
            ruleId = rule.ruleId,
            ruleVersion = rule.ruleVersion,
            accountId = key.key(),
            windowStart = key.window().startTime().toIso(),
            windowEnd = key.window().endTime().toIso(),
            severity = rule.severity,
            typology = rule.typology,
            triggeringTxIds = triggeringTxIds,
            aggregateValue = value,
            thresholdValue = threshold,
            message = i18nResolve(rule.alertTemplate, value, threshold),
            producedAt = Instant.now().toIso(),
            signature = ""   // placeholder: the signature covers all the other fields
        )
        val alert = unsigned.copy(signature = sign(unsigned))

        val targetTopic = if (rule.status == RuleStatus.SHADOW) "aml.alert.shadow" else "aml.alert.published"
        alertOutboxRepo.append(alert, targetTopic)   // insert keyed by alert_id (primary key)
    }

3.2 State stores
| Store | Type | Content | Backed by |
|---|---|---|---|
| aml-window-{ruleId} | WindowStore<String, Aggregate> | accumulators per window key | changelog topic (compact,delete) |
| aml-tx-window-{ruleId} | WindowStore<String, List<String>> | tx_ids per window, for reproducibility | changelog topic (compact,delete) |
| aml-baseline-{ruleId} | KeyValueStore<String, Aggregate> | baseline per accountId (peer-deviation) | changelog topic (compacted) |
Persistence: local RocksDB on SSD, ~250 MB for 50K active accounts with 5 active rules.
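The `aml-window-{ruleId}` store holds one accumulator per (window, key), and that accumulator drives the size estimate above. The sketch below shows one possible shape for the `aggInit` / `aggUpdate` / `aggExtract` helpers named in the topology pseudo-code, covering Count, Sum, Avg and a distinct-counterparty count. `Tx`, `AggSpec` and `Agg` are hypothetical, simplified stand-ins (the real helpers also take the rule's `AggregateExpr`), and Velocity/Custom are left out.

```kotlin
import java.math.BigDecimal

// Minimal stand-in for CanonicalTx: only the fields this sketch reads.
data class Tx(val txId: String, val amountInRefCurrency: BigDecimal, val counterpartyAccountId: String?)

// Aggregations covered by this sketch (a subset of AggregateExpr).
sealed class AggSpec {
    object Count : AggSpec()
    object Sum : AggSpec()
    object Avg : AggSpec()
    object DistinctCounterparties : AggSpec()
}

// One accumulator per (window, key). Immutable: each update returns a new value,
// which is what a Kafka Streams Aggregator writes back to the store.
data class Agg(
    val count: Long = 0,
    val sum: BigDecimal = BigDecimal.ZERO,
    val distinct: Set<String> = emptySet()   // grows with the window: bound it in production
)

fun aggInit(): Agg = Agg()

fun aggUpdate(agg: Agg, tx: Tx): Agg = agg.copy(
    count = agg.count + 1,
    sum = agg.sum + tx.amountInRefCurrency,
    distinct = tx.counterpartyAccountId?.let { agg.distinct + it } ?: agg.distinct
)

// Turns the accumulator into the Double that evalThreshold compares.
fun aggExtract(spec: AggSpec, agg: Agg): Double = when (spec) {
    AggSpec.Count -> agg.count.toDouble()
    AggSpec.Sum -> agg.sum.toDouble()
    AggSpec.Avg -> if (agg.count == 0L) 0.0 else agg.sum.toDouble() / agg.count
    AggSpec.DistinctCounterparties -> agg.distinct.size.toDouble()
}
```

Avg is derived from sum and count rather than stored, so the accumulator stays mergeable and small.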
3.3 Kafka Streams configuration

    application.id: tx-monitoring-{tenantId}
    bootstrap.servers: kafka:9092
    num.stream.threads: 4                      # parallelism per pod
    processing.guarantee: exactly_once_v2      # transactional Kafka writes (changelogs, offsets, output topics)
    cache.max.bytes.buffering: 134217728       # 128 MB cache (deprecated since Kafka 3.4 in favor of statestore.cache.max.bytes)
    commit.interval.ms: 5000                   # commit every 5 s (default is 100 ms under exactly_once_v2)
    state.dir: /var/lib/tx-monitoring/state
    default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.production.exception.handler: io.vitakyc.aml.OutboxFallbackHandler
    metrics.recording.level: INFO

4. MVP rule typologies
4.1 Structuring (smurfing)
“≥ 10 transactions above 1,000 TND and below 10,000 TND over 7 days on the same account”

    val structuring = amlRule {
        ruleId = "RULE_STRUCTURING_TND_9999"
        typology = STRUCTURING
        window = WindowSpec(HOPPING, sizeSeconds = 7 * 86400, stepSeconds = 3600, gracePeriodSeconds = 300)
        partitionBy = PartitionKey.ACCOUNT
        filter = predicate {
            // amountInRefCurrency (TND) rather than amount, which is in the tx currency
            (field("amountInRefCurrency") gt 1000) and (field("amountInRefCurrency") lt 10000) and
                (field("direction") eq "DEBIT")
        }
        aggregate = AggregateExpr.Count()
        threshold = ThresholdExpr.CountGt(9)   // count > 9, i.e. at least 10 transactions
        severity = HIGH
        alertTemplate = "aml.structuring.detected"
    }

4.2 Velocity
“Cumulative 24 h volume exceeds 3× the 30-day average on this account”
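The velocity check compares the current 24 h volume with a 30-day baseline and expresses the result as a percentage for `ThresholdExpr.GtPercent`. A worked sketch, assuming the baseline is the 30-day total divided by 30 and that `GtPercent` is a strict comparison; `velocityRatio` and `gtPercent` are hypothetical helper names. With 90,000 TND over 30 days (3,000 TND/day on average), a 24 h volume of 12,000 TND gives a ratio of 4.0, i.e. 400 %, which crosses the 300 % threshold.

```kotlin
// Ratio between the current 24 h volume and the average daily volume of the baseline
// window. Assumption: "30-day average" = 30-day total / 30 days.
fun velocityRatio(volume24h: Double, baselineTotal: Double, baselineDays: Int): Double? {
    require(baselineDays > 0) { "baselineDays must be positive" }
    val avgDaily = baselineTotal / baselineDays
    // No history (e.g. a newly opened account): the ratio is undefined rather than infinite.
    return if (avgDaily <= 0.0) null else volume24h / avgDaily
}

// GtPercent(pct) read as: the ratio, expressed in percent, is strictly above pct.
fun gtPercent(ratio: Double?, pct: Double): Boolean = ratio != null && ratio * 100.0 > pct
```

Returning `null` for an account with no history is one design choice; whether such accounts should instead raise a separate "no baseline" signal is a policy question for compliance.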

    val velocity = amlRule {
        ruleId = "RULE_VELOCITY_3X_30D"
        typology = VELOCITY
        window = WindowSpec(HOPPING, sizeSeconds = 86400, stepSeconds = 3600, gracePeriodSeconds = 300)
        partitionBy = PartitionKey.ACCOUNT
        aggregate = AggregateExpr.Velocity(window2Seconds = 30 * 86400)                  // ratio 24 h volume / 30-day baseline
        threshold = ThresholdExpr.GtPercent(pct = 300.0, baselineWindow = 30 * 86400)   // above 300 % of the baseline, i.e. 3×
        severity = MEDIUM
        alertTemplate = "aml.velocity.deviation"
    }

4.3 Threshold (simple)
“Any transaction > 50,000 EUR to a high-risk country”
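
    // Not named `threshold`: a local val with that name would shadow the builder's `threshold` property
    val highAmountHighRiskCountry = amlRule {
        ruleId = "RULE_HIGH_AMOUNT_HIGH_RISK_COUNTRY"
        typology = THRESHOLD
        window = WindowSpec(TUMBLING, sizeSeconds = 60)   // near-immediate, typically a single tx
        partitionBy = PartitionKey.ACCOUNT
        filter = predicate {
            // amountInRefCurrency is expressed in refCurrency (typically TND):
            // the 50,000 EUR limit must be converted to the ref currency here
            (field("amountInRefCurrency") gt 50000) and
                (field("counterpartyCountry") inAny listOf("AF", "BY", "KP", "IR", "LY", "MM", "SO", "SS", "SD", "SY", "YE", "RU"))
        }
        aggregate = AggregateExpr.Count()
        threshold = ThresholdExpr.CountGt(0)   // any matching tx triggers
        severity = URGENT
        alertTemplate = "aml.threshold.high_risk_country"
    }

4.4 Pattern (sequence)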
“Credit received followed by a debit of ≥ 95 % of that amount to a third-party account within 48 h”
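The pass-through typology is the only MVP rule that depends on event order rather than on a single aggregate. A minimal in-memory sketch of the detection over one account's transactions, independent of how session windows end up being implemented. `PtTx`, `Dir` and `findPassThrough` are hypothetical; the sketch does not check the "third-party account" condition (it assumes own-account transfers were filtered out upstream) and uses `≥ 0.95 ×` as in the rule description.

```kotlin
import java.time.Duration
import java.time.Instant

enum class Dir { CREDIT, DEBIT }

// Minimal stand-in for CanonicalTx on a single account.
data class PtTx(val txId: String, val ts: Instant, val dir: Dir, val amount: Double)

// Returns (creditTxId, debitTxId) pairs where a DEBIT of at least `ratio` x the CREDIT
// amount follows that CREDIT within `maxDelay` (deadline inclusive).
fun findPassThrough(
    txs: List<PtTx>,
    ratio: Double = 0.95,
    maxDelay: Duration = Duration.ofHours(48)
): List<Pair<String, String>> {
    val sorted = txs.sortedBy { it.ts }
    val hits = mutableListOf<Pair<String, String>>()
    for ((i, credit) in sorted.withIndex()) {
        if (credit.dir != Dir.CREDIT) continue
        val deadline = credit.ts.plus(maxDelay)
        // Only look at later transactions that are still inside the delay window.
        val debit = sorted.drop(i + 1)
            .takeWhile { !it.ts.isAfter(deadline) }
            .firstOrNull { it.dir == Dir.DEBIT && it.amount >= ratio * credit.amount }
        if (debit != null) hits += credit.txId to debit.txId
    }
    return hits
}
```

The scan is quadratic in the worst case, which is acceptable for a per-account session but not for a global batch.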

    val pattern = amlRule {
        ruleId = "RULE_PASS_THROUGH"
        typology = PATTERN
        window = WindowSpec(SESSION, sizeSeconds = 48 * 3600, gracePeriodSeconds = 600)   // SESSION branch is still TODO() in section 3.1
        partitionBy = PartitionKey.ACCOUNT
        // Pseudo-code: detect a CREDIT followed, within the session, by a DEBIT >= 0.95 × the CREDIT amount
        aggregate = AggregateExpr.Custom(
            kind = "PASS_THROUGH",
            params = mapOf("debitToCreditRatioGt" to 0.95)
        )
        threshold = ThresholdExpr.CountGt(0)
        severity = HIGH
        alertTemplate = "aml.pattern.pass_through"
    }

4.5 Peer-deviation
“Monthly volume ≥ 5× the median of accounts with the same profession (e.g. `salarie`, `retraite`)”
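The peer baseline is a median per profession group, recomputed daily in the MVP (section 11). A batch sketch of the check with hypothetical names (`PeerRow`, `peerDeviations`) and a hypothetical `minPeers` cut-off so that tiny groups are not used as a baseline. The median here includes the account itself, and the sketch uses `≥` as in the rule description whereas `GtPercent` reads as strict; that boundary convention should be settled in the DSL.

```kotlin
// Monthly volume per account, with the holder's profession (joined from CPS).
data class PeerRow(val accountId: String, val profession: String, val monthlyVolume: Double)

fun median(values: List<Double>): Double {
    require(values.isNotEmpty())
    val s = values.sorted()
    val mid = s.size / 2
    return if (s.size % 2 == 1) s[mid] else (s[mid - 1] + s[mid]) / 2.0
}

// Account IDs whose monthly volume is at least `factor` x the median of their
// profession group. Groups smaller than `minPeers` are skipped.
fun peerDeviations(rows: List<PeerRow>, factor: Double = 5.0, minPeers: Int = 3): List<String> =
    rows.groupBy { it.profession }
        .filterValues { it.size >= minPeers }
        .flatMap { (_, group) ->
            val baseline = median(group.map { it.monthlyVolume })
            group.filter { baseline > 0.0 && it.monthlyVolume >= factor * baseline }
                .map { it.accountId }
        }
```

In this example the `salarie` median is (1,000 + 1,200) / 2 = 1,100, so the 5× bar is 5,500 and only the 9,000 account is flagged.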

    val peerDeviation = amlRule {
        ruleId = "RULE_PEER_DEVIATION_5X"
        typology = PEER_DEVIATION
        window = WindowSpec(TUMBLING, sizeSeconds = 30 * 86400)
        partitionBy = PartitionKey.ACCOUNT
        aggregate = AggregateExpr.Sum("amountInRefCurrency")
        // baseline = median of peers grouped by client.profession (joined from CPS)
        threshold = ThresholdExpr.GtPercent(pct = 500.0, baselineWindow = 0)
        severity = MEDIUM
        alertTemplate = "aml.peer.deviation"
    }

4.6 Aggregate (rolling sum)
“Sum of international wires over 24 h exceeds 200,000 EUR”

    // Not named `aggregate`: a local val with that name would shadow the builder's `aggregate` property
    val intlWires24h = amlRule {
        ruleId = "RULE_INT_WIRE_24H_200K"
        typology = AGGREGATE
        window = WindowSpec(HOPPING, sizeSeconds = 86400, stepSeconds = 3600, gracePeriodSeconds = 300)
        partitionBy = PartitionKey.ACCOUNT
        filter = predicate {
            // tenantCountry: the tenant's home country
            (field("channel") eq "WIRE") and (field("counterpartyCountry") neq tenantCountry)
        }
        aggregate = AggregateExpr.Sum("amountInRefCurrency")   // in refCurrency: convert the 200,000 EUR limit accordingly
        threshold = ThresholdExpr.Gt(200000.0)
        severity = HIGH
        alertTemplate = "aml.aggregate.int_wires_24h"
    }

5. Alert deduplication (SHA-256 signature)

    import java.security.MessageDigest
    import java.time.Instant

    fun computeAlertId(
        ruleId: String,
        partitionKey: String,
        windowStart: Instant,
        triggeringTxIds: List<String>
    ): String {
        // Canonical form: fields joined with '|'; tx ids are sorted so their order never changes the id
        val canonical = "$ruleId|$partitionKey|${windowStart.epochSecond}|${triggeringTxIds.sorted().joinToString(",")}"
        val digest = MessageDigest.getInstance("SHA-256").digest(canonical.toByteArray(Charsets.UTF_8))
        // Keep the first 16 hex chars (64 bits) of the digest
        return "alert-" + digest.joinToString("") { "%02x".format(it) }.take(16)
    }

Properties:
- Deterministic: same inputs → same `alertId`. Reprocessing after a crash does not produce a duplicate alert.
- Reproducible for audit: a screening replayed 5 years later produces the same `alertId`.
- Late-arrival detection: if a tx arrives late and changes the set of `triggeringTxIds`, the `alertId` changes → a new, legitimate alert (the previous one remains valid for its timestamp).
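Late arrivals interact with the window geometry. Kafka Streams aligns `TimeWindows` to the epoch (window starts are multiples of the advance), bounds each window as [start, start + size), and stops accepting a record for a window once stream time reaches window end + grace. The sketch below computes which windows a record falls into and which of them a late record can still update; `windowStarts` and `acceptedWindowStarts` are hypothetical helpers that mirror those semantics, not Kafka Streams API.

```kotlin
// Start timestamps (ms) of the epoch-aligned windows [start, start + size) that contain
// a record at tsMs. Hopping windows: stepMs < sizeMs. Tumbling windows: stepMs == sizeMs.
fun windowStarts(tsMs: Long, sizeMs: Long, stepMs: Long): List<Long> {
    require(sizeMs > 0 && stepMs in 1..sizeMs)
    val last = tsMs - Math.floorMod(tsMs, stepMs)   // latest aligned start <= ts
    val first = maxOf(0L, tsMs - sizeMs + 1)        // earliest start whose window still covers ts
    return generateSequence(last) { it - stepMs }
        .takeWhile { it >= first }
        .toList()
        .reversed()
}

// A record still updates a window only while streamTime < windowEnd + grace.
fun acceptedWindowStarts(tsMs: Long, streamTimeMs: Long, sizeMs: Long, stepMs: Long, graceMs: Long): List<Long> =
    windowStarts(tsMs, sizeMs, stepMs).filter { start -> streamTimeMs < start + sizeMs + graceMs }
```

With the structuring rule (7-day size, 1 h advance), each transaction updates 168 overlapping windows. Since `windowStart` is part of the hash, one structuring pattern can therefore cross the threshold in several overlapping windows, each with its own `alertId`.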
6. Outbox pattern Postgres → Kafka
6.1 SQL schema

    CREATE TABLE alert_outbox (
        alert_id      VARCHAR(80) PRIMARY KEY,
        tenant_id     UUID NOT NULL,
        payload       JSONB NOT NULL,
        target_topic  VARCHAR(64) NOT NULL,
        produced_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        published_at  TIMESTAMPTZ,
        signature     TEXT NOT NULL,
        CHECK (target_topic IN ('aml.alert.published', 'aml.alert.shadow'))
    );

    -- Partial index: the sweeper only scans unpublished rows
    CREATE INDEX idx_alert_outbox_unpublished ON alert_outbox (produced_at)
    WHERE published_at IS NULL;

    ALTER TABLE alert_outbox ENABLE ROW LEVEL SECURITY;
    CREATE POLICY alert_outbox_tenant_isolation ON alert_outbox
        USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

6.2 Sweeper
An independent Kotlin coroutine runs every second:
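
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.withContext
    import org.apache.kafka.clients.producer.ProducerRecord
    import kotlin.time.Duration.Companion.seconds

    suspend fun outboxSweeper() {
        while (isRunning) {
            val batch = repo.fetchUnpublished(limit = 500)   // oldest first, via idx_alert_outbox_unpublished
            for (alert in batch) {
                val record = ProducerRecord(alert.targetTopic, alert.tenantId, alert.payload)
                // KafkaProducer.send returns a java.util.concurrent.Future: block on the IO dispatcher
                withContext(Dispatchers.IO) { kafka.send(record).get() }
                repo.markPublished(alert.alertId)   // only after the broker has acknowledged the write
            }
            delay(1.seconds)
        }
    }

Guarantees: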
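- At-least-once delivery to Kafka: if the process crashes between `kafka.send` and `markPublished`, the row is republished on restart. Consumers deduplicate by `alertId` (idempotence).
- No duplicate outbox rows: the insert is keyed by `alert_id` (primary key), so replaying a Kafka Streams task after a crash re-inserts the same alert as a no-op. `processing.guarantee=exactly_once_v2` only covers Kafka writes (changelogs, offsets); the Postgres insert is outside that transaction.
- Failure isolation: a sweeper crash does not affect the stream threads.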
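On the consumer side, deduplication by `alertId` can be as simple as remembering recently seen ids. A hypothetical sketch for a consumer of `aml.alert.published`, using an access-ordered `LinkedHashMap` as a bounded LRU cache. It only absorbs short-range redeliveries such as a sweeper republishing after a crash; the durable guarantee would come from a unique constraint on `alert_id` in the consumer's own database. `AlertDeduplicator` and `capacity` are illustrative names.

```kotlin
class AlertDeduplicator(private val capacity: Int) {
    init { require(capacity > 0) }

    // Access-ordered LinkedHashMap = LRU: the least recently seen alertId is evicted first.
    private val seen = object : LinkedHashMap<String, Boolean>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<String, Boolean>?): Boolean =
            size > capacity
    }

    /** True the first time an alertId is seen (process it), false for a redelivery (skip it). */
    @Synchronized
    fun firstDelivery(alertId: String): Boolean = seen.put(alertId, true) == null
}
```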
7. Shadow mode + backtest
7.1 Rule lifecycle
DRAFT → SHADOW (14 days) → 6-month backtest → dual-control → ACTIVE → ARCHIVED

| Step | Effect |
|---|---|
| DRAFT | rule created, not evaluated by Kafka Streams |
| SHADOW | rule evaluated, alerts emitted to aml.alert.shadow (not to case-mgmt) |
| Backtest | replay over 6 months of history, FP/FN comparison against ACTIVE rules |
| Dual-control | 2 Ed25519 signatures (compliance + DSI, the IT department), justification required |
| ACTIVE | alerts to aml.alert.published → case-mgmt |
| ARCHIVED | not evaluated, retained 10 years for audit |
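The lifecycle table implies a small state machine. A sketch of the allowed transitions, assuming only the transitions named in this section (including the return to DRAFT after a failed backtest) are legal; `PromotionEvidence`, `canTransition` and the role names `COMPLIANCE` / `DSI` are hypothetical. Signature verification is a separate step: `signerRoles` is assumed to contain only roles whose signatures have already been verified.

```kotlin
import java.time.Duration
import java.time.Instant

enum class RuleStatus { DRAFT, SHADOW, ACTIVE, ARCHIVED }

// Hypothetical evidence attached to a promotion request.
data class PromotionEvidence(
    val shadowSince: Instant,      // when the rule entered SHADOW
    val backtestPassed: Boolean,   // precision/recall gate met
    val signerRoles: Set<String>   // roles of the verified Ed25519 signers
)

fun canTransition(current: RuleStatus, target: RuleStatus, now: Instant, ev: PromotionEvidence? = null): Boolean =
    when (current to target) {
        RuleStatus.DRAFT to RuleStatus.SHADOW -> true
        RuleStatus.SHADOW to RuleStatus.DRAFT -> true        // backtest failed: back to tuning
        RuleStatus.SHADOW to RuleStatus.ACTIVE -> ev != null &&
            Duration.between(ev.shadowSince, now) >= Duration.ofDays(14) &&   // 14-day shadow period
            ev.backtestPassed &&
            ev.signerRoles.containsAll(setOf("COMPLIANCE", "DSI"))             // dual control
        RuleStatus.ACTIVE to RuleStatus.ARCHIVED -> true
        else -> false
    }
```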
7.2 Backtest

    class BacktestRunner(
        private val tenantId: String,
        private val ruleCandidate: AmlRule,
        private val historicalTxs: Sequence<CanonicalTx>,
        private val historicalAlerts: List<AmlAlert>   // alerts confirmed as true positives by agents
    ) {
        fun run(): BacktestReport {
            val mockTopology = buildSingleRuleTopology(ruleCandidate)   // simulates windowed aggregation in memory
            var txCount = 0
            // Single pass: a Sequence may only be iterable once
            for (tx in historicalTxs) {
                mockTopology.process(tx)
                txCount++
            }
            val emittedAlerts: List<AmlAlert> = mockTopology.flushAlerts()

            val truePositives = emittedAlerts.count { it.matchesTruth(historicalAlerts) }
            val falsePositives = emittedAlerts.size - truePositives
            val falseNegatives = historicalAlerts.count { truth -> emittedAlerts.none { it.matchesTruth(listOf(truth)) } }

            return BacktestReport(
                ruleCandidate = ruleCandidate,
                period = "last_6_months",
                txCount = txCount,
                alertsEmitted = emittedAlerts.size,
                truePositives = truePositives,
                falsePositives = falsePositives,
                falseNegatives = falseNegatives,
                // Guard against division by zero when the rule emits nothing or there is no ground truth
                precision = if (emittedAlerts.isEmpty()) 0.0 else truePositives.toDouble() / emittedAlerts.size,
                recall = if (truePositives + falseNegatives == 0) 0.0 else truePositives.toDouble() / (truePositives + falseNegatives)
            )
        }
    }

A rule can only move to ACTIVE if precision ≥ 0.70 AND recall ≥ 0.85 (configurable per tenant). Otherwise it goes back to DRAFT for tuning.
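The promotion gate itself is a two-condition check with per-tenant thresholds. A hypothetical sketch (`PromotionThresholds`, `promotionGate` and `GateOutcome` are illustrative names) that also makes the NaN case explicit: any comparison with NaN is false in Kotlin, so a report whose precision or recall was computed as 0/0 falls back to DRAFT instead of being promoted. Passing the gate leads to dual-control, not directly to ACTIVE.

```kotlin
// Per-tenant promotion thresholds; defaults are the values from this spec.
data class PromotionThresholds(val minPrecision: Double = 0.70, val minRecall: Double = 0.85) {
    init { require(minPrecision in 0.0..1.0 && minRecall in 0.0..1.0) }
}

enum class GateOutcome { ELIGIBLE_FOR_DUAL_CONTROL, BACK_TO_DRAFT }

// Both conditions must hold. NaN fails both comparisons, so undefined metrics never promote a rule.
fun promotionGate(precision: Double, recall: Double, t: PromotionThresholds = PromotionThresholds()): GateOutcome =
    if (precision >= t.minPrecision && recall >= t.minRecall) GateOutcome.ELIGIBLE_FOR_DUAL_CONTROL
    else GateOutcome.BACK_TO_DRAFT
```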
8. REST API
8.1 Admin
Section intitulée « 8.1 Admin »| Méthode | Endpoint | Description |
|---|---|---|
GET | /v1/aml/rules?status= | liste rules avec filtres |
POST | /v1/aml/rules | crée rule DRAFT |
PUT | /v1/aml/rules/:ruleId | met à jour DRAFT |
POST | /v1/aml/rules/:ruleId/publish-shadow | passe à SHADOW |
POST | /v1/aml/rules/:ruleId/backtest | lance backtest async (Temporal) |
GET | /v1/aml/rules/:ruleId/backtest/:runId | poll status + report |
POST | /v1/aml/rules/:ruleId/publish-active | exige 2 sig Ed25519 |
POST | /v1/aml/rules/:ruleId/archive | archive |
GET | /v1/aml/alerts?accountId=&from=&to=&severity= | recherche alertes |
GET | /v1/aml/alerts/:alertId | détail signed |
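`publish-active` requires two Ed25519 signatures. The JDK has supported Ed25519 in `java.security.Signature` since Java 15, so the check can be sketched with the standard library alone. Assumptions: the signed payload is a canonical serialization of the rule (not shown), `RuleSignature` and `dualControlSatisfied` are hypothetical names, and in production each public key and its role would come from a key registry bound to the signer's identity, never from the request body (otherwise one person could submit two keys).

```kotlin
import java.security.KeyPairGenerator
import java.security.PrivateKey
import java.security.PublicKey
import java.security.Signature

// One signature over the canonical rule payload, tagged with the role the key is registered for.
class RuleSignature(val role: String, val publicKey: PublicKey, val signature: ByteArray)

fun signEd25519(privateKey: PrivateKey, payload: ByteArray): ByteArray =
    Signature.getInstance("Ed25519").run {
        initSign(privateKey)
        update(payload)
        sign()
    }

// Malformed signature bytes may throw instead of returning false: treat that as invalid.
fun verifyEd25519(publicKey: PublicKey, payload: ByteArray, sig: ByteArray): Boolean =
    runCatching {
        Signature.getInstance("Ed25519").run {
            initVerify(publicKey)
            update(payload)
            verify(sig)
        }
    }.getOrDefault(false)

// Dual control: at least one valid signature for each required role.
fun dualControlSatisfied(
    payload: ByteArray,
    sigs: List<RuleSignature>,
    requiredRoles: Set<String> = setOf("COMPLIANCE", "DSI")
): Boolean =
    sigs.filter { verifyEd25519(it.publicKey, payload, it.signature) }
        .map { it.role }
        .toSet()
        .containsAll(requiredRoles)
```

A key pair for testing can be generated with `KeyPairGenerator.getInstance("Ed25519").generateKeyPair()`.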
8.2 Health & ops
| Method | Endpoint | Description |
|---|---|---|
| GET | /v1/aml/streams/health | Kafka Streams status (RUNNING, REBALANCING, ERROR) |
| GET | /v1/aml/streams/metrics | throughput, lag, state store size |
| GET | /v1/aml/streams/topology | topology dump for debugging |
9. Performance and capacity
| Metric | MVP target | V2 target |
|---|---|---|
| p95 latency (tx → alert published) | ≤ 500 ms | ≤ 100 ms |
| Throughput | ≥ 1,000 tx/s/pod | ≥ 10,000 tx/s/pod |
| Cold start (changelog replay) | ≤ 60 s | ≤ 20 s |
| State store per 50K active accounts | ≤ 250 MB RocksDB | ≤ 200 MB |
| JVM heap | 4-8 GB | 4-8 GB |
| Availability | 99.5 % | 99.9 % |
| Late event drop rate | ≤ 0.1 % | ≤ 0.01 % |
10. Security
- RocksDB encryption at rest (per-tenant KEK, Vault KMS).
- mTLS between `tx-monitoring-svc` and the Kafka brokers.
- Postgres RLS on `alert_outbox`, `aml_rule`, `aml_alert_audit`.
- Ed25519 signature of alerts with the tenant key.
- Dual-control on rule publication (compliance + DSI).
- Append-only audit of rule changes: `aml_rule_audit` is immutable.
- PII protection: processed transactions contain amounts and counterparty data; application logs mask these fields.
- Rate limit on the admin API: 100 req/min per agent.
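For the PII rule on logs, one low-risk approach is to make sensitive fields impossible to print by accident. A hypothetical sketch on a subset of the `CanonicalTx` fields: the class overrides `toString()`, so string templates and logger calls only ever see masked values. The masked fields follow this section (amounts and counterparty data); keeping `txId` and `accountId` visible for troubleshooting is an assumption to validate with compliance.

```kotlin
import java.math.BigDecimal

// Subset of CanonicalTx fields, for illustration.
data class LoggableTx(
    val txId: String,
    val accountId: String,
    val counterpartyAccountId: String?,
    val counterpartyName: String?,
    val amount: BigDecimal,
    val currency: String
) {
    // Overriding toString() masks sensitive fields even when the object is logged by accident,
    // e.g. log.info("processing $tx"). Null stays visible because it helps debugging.
    override fun toString(): String {
        fun mask(v: Any?): String = if (v == null) "null" else "***"
        return "LoggableTx(txId=$txId, accountId=$accountId, " +
            "counterpartyAccountId=${mask(counterpartyAccountId)}, counterpartyName=${mask(counterpartyName)}, " +
            "amount=${mask(amount)}, currency=$currency)"
    }
}
```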
11. MVP → V2 migration plan
| Item | MVP (V0) | V2 (S+12) |
|---|---|---|
| Typologies | 6 (structuring, velocity, threshold, pattern, peer, aggregate) | + new beneficiary, structuring across accounts, velocity gradient |
| Streaming ML | no | autoencoder anomaly detection with Flink |
| Visual rule builder | Kotlin DSL only | no-code GUI with backtest preview |
| Multi-region | single region | active-active multi-region with Kafka MirrorMaker |
| Throughput | 1K tx/s | 10K tx/s + Flink offload |
| Real-time peer baseline | daily recomputation | continuous |
12. MVP go-live checklist
- Kafka Streams topology operational for the 6 typologies
- 4 standard rules shipped preconfigured (structuring, velocity, high-risk-country threshold, pass-through)
- Backtest runner over 6 months of history, with precision/recall report
- 14-day shadow mode enforced before publication
- Dual-control publication enforced (impossible without 2 signatures)
- Postgres → Kafka outbox pattern operational, exactly-once verified
- Signed append-only audit log for rule changes
- RocksDB state stores encrypted at rest
- Prometheus metrics exposed (throughput, latency, alert dedup ratio)
- Grafana dashboards: streams health + alert breakdown + backtest results
- Pilot tenant TN-BANQUEX: 50,000 tx/day, 100 alerts/day, ≤ 5 % FP
- On-call runbook (long rebalance, state store corruption, late event spike)
13. References
- ADR-001, ADR-004, ADR-025, ADR-029, ADR-030, ADR-031
- AML Transaction Monitoring (overview)
- POC rules engine
- POC tx-normalizer
- Case Management
- BCT Circular 2017-08 §IV (real-time atypical transactions), 2018-07 (4-eyes)
- FATF Recommendation 20 (STR), Wolfsberg Group transaction monitoring guidance
- Kafka Streams documentation: Confluent docs
AML streaming engine spec document, version 1.0 (2026-04-27).