
AML Streaming Engine — Kafka Streams + rules DSL

Module: tx-monitoring-svc (Kotlin microservice on Kafka Streams), plus the tx-normalizer pipeline (POC delivered).

ADRs: ADR-001 (Kafka), ADR-004 (DSL), ADR-025 (predicate grammar), ADR-031 (architecture).

Related pages: AML Transaction Monitoring (high-level overview), POC rules engine, Case Management.

This page is the detailed engineering spec of the streaming engine for Transaction Monitoring. A developer who reads it should be able to build tx-monitoring-svc without any open questions:

  1. The Kafka Streams runtime (topology, state stores, changelog)
  2. The rules DSL (model, serialization, evaluation)
  3. Sliding windows (hopping vs tumbling, grace period)
  4. Alert deduplication (SHA-256 hash, outbox pattern)
  5. The MVP rule typologies (structuring, velocity, threshold, pattern, peer-deviation, aggregate)
  6. Shadow mode + backtest (dual-control publication)

Nominal flow (~300-500 ms p95):

  1. A transaction arrives on the Kafka topic tx.normalized (partitioned by accountId)
  2. The Kafka Streams topology consumes it and applies the tenant's ACTIVE rules (SHADOW rules are evaluated too)
  3. For each rule, it updates the state stores (sliding windows + aggregates)
  4. If a threshold is crossed, it generates an alert candidate
  5. Deduplication by SHA-256 hash (ruleId + accountId + windowStart + sortedTxIds)
  6. Outbox writer: inserts the alert into the Postgres alert_outbox table (keyed on alertId) alongside the state update
  7. An independent sweeper pushes unpublished rows to aml.alert.published (or aml.alert.shadow, per the row's target_topic)
  8. Consumed by case-mgmt-svc (creates a case if severity ≥ MEDIUM) + notification-svc

import java.math.BigDecimal
import kotlinx.serialization.Contextual
import kotlinx.serialization.Serializable

@Serializable
data class CanonicalTx(
    val txId: String,                    // UUID issued by tx-normalizer
    val tenantId: String,
    val timestamp: String,               // ISO-8601 with milliseconds
    val accountId: String,               // primary partitioning key
    val counterpartyAccountId: String?,
    val counterpartyName: String?,
    val counterpartyCountry: String?,    // ISO 3166-1 alpha-2
    @Contextual val amount: BigDecimal,  // kotlinx.serialization has no built-in BigDecimal serializer: register one in the SerializersModule
    val currency: String,                // ISO 4217
    @Contextual val amountInRefCurrency: BigDecimal, // converted at the rate of the tx timestamp
    val refCurrency: String,             // typically TND
    val direction: TxDirection,          // CREDIT, DEBIT
    val channel: String,                 // ATM, ONLINE, BRANCH, MOBILE, WIRE, SEPA, ...
    val purpose: String?,                // free text or coded
    val reference: String?,              // bank reference
    val source: TxSource                 // origin of normalization
)

enum class TxDirection { CREDIT, DEBIT }
enum class TxSource { ISO20022, SWIFT_MT103, SWIFT_MT202, BATCH_CSV, KAFKA_NATIVE }

@Serializable
data class AmlRule(
    val ruleId: String,                   // unique within tenant
    val ruleVersion: String,              // SemVer
    val tenantId: String,
    val name: String,
    val description: String,
    val typology: RuleTypology,           // STRUCTURING, VELOCITY, ...
    val status: RuleStatus,               // DRAFT, SHADOW, ACTIVE, ARCHIVED
    val window: WindowSpec,
    val partitionBy: PartitionKey,        // ACCOUNT | CLIENT | COUNTERPARTY
    val filter: Predicate?,               // pre-filter (reuses the ADR-025 grammar)
    val aggregate: AggregateExpr,
    val threshold: ThresholdExpr,
    val severity: AlertSeverity,
    val alertTemplate: String,            // i18n key for the human-readable message
    val signedBy: List<Ed25519Signature>, // dual-control signatures
    val createdAt: String,
    val publishedAt: String?
)

enum class RuleTypology { STRUCTURING, VELOCITY, THRESHOLD, PATTERN, PEER_DEVIATION, AGGREGATE, NEW_BENEFICIARY }
enum class RuleStatus { DRAFT, SHADOW, ACTIVE, ARCHIVED }
enum class AlertSeverity { LOW, MEDIUM, HIGH, URGENT }
enum class PartitionKey { ACCOUNT, CLIENT, COUNTERPARTY }

@Serializable
data class WindowSpec(
    val kind: WindowKind,               // HOPPING, TUMBLING, SESSION
    val sizeSeconds: Long,              // e.g. 86400 = 24 h
    val stepSeconds: Long? = null,      // advance: required for HOPPING, ignored otherwise
    val gracePeriodSeconds: Long = 300  // tolerance for late events
)

enum class WindowKind { HOPPING, TUMBLING, SESSION }

@Serializable
sealed class AggregateExpr {
    @Serializable data class Count(val filter: Predicate? = null) : AggregateExpr()
    @Serializable data class Sum(val field: String, val filter: Predicate? = null) : AggregateExpr()
    @Serializable data class Avg(val field: String) : AggregateExpr()
    @Serializable data class DistinctCount(val field: String) : AggregateExpr()
    @Serializable data class Velocity(val window2Seconds: Long) : AggregateExpr() // ratio current / baseline
    // Escape hatch for stateful patterns, e.g. kind = "PASS_THROUGH" (used by RULE_PASS_THROUGH)
    @Serializable data class Custom(val kind: String, val params: Map<String, Double> = emptyMap()) : AggregateExpr()
}

@Serializable
sealed class ThresholdExpr {
    @Serializable data class Gt(val value: Double) : ThresholdExpr()
    @Serializable data class Gte(val value: Double) : ThresholdExpr()
    @Serializable data class GtPercent(val pct: Double, val baselineWindow: Long) : ThresholdExpr()
    @Serializable data class CountGt(val n: Int) : ThresholdExpr()
}

@Serializable
data class AmlAlert(
    val alertId: String,          // "alert-" + first 16 hex chars of SHA-256(ruleId|partitionKey|windowStart|sortedTxIds), see computeAlertId
    val tenantId: String,
    val ruleId: String,
    val ruleVersion: String,
    val accountId: String,        // value of the rule's partition key (accountId for ACCOUNT rules)
    val windowStart: String,      // ISO-8601
    val windowEnd: String,
    val severity: AlertSeverity,
    val typology: RuleTypology,
    val triggeringTxIds: List<String>,
    val aggregateValue: Double,   // e.g. sum or count
    val thresholdValue: Double,
    val message: String,          // resolved i18n message
    val producedAt: String,
    val signature: String         // Ed25519 signature over the other fields
)
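The topology calls evalThreshold, but the model above does not spell out its semantics. The sketch below is one plausible reading, not the engine's code. It uses a hypothetical, serialization-free `Threshold` hierarchy that mirrors `ThresholdExpr`, and it assumes that for `GtPercent` the incoming value is already the current/baseline ratio produced by `AggregateExpr.Velocity`.

```kotlin
// Hypothetical, serialization-free mirror of ThresholdExpr so the sketch runs standalone.
sealed class Threshold {
    data class Gt(val value: Double) : Threshold()
    data class Gte(val value: Double) : Threshold()
    data class GtPercent(val pct: Double, val baselineWindow: Long) : Threshold()
    data class CountGt(val n: Int) : Threshold()
}

/**
 * Assumed semantics of evalThreshold(rule.threshold, value):
 *  - Gt / Gte compare the aggregate value directly (sum, count, ...);
 *  - GtPercent expects `value` to be a ratio current / baseline, as produced by
 *    AggregateExpr.Velocity, so pct = 300.0 means "more than 3x the baseline";
 *  - CountGt is a strict comparison on a count.
 */
fun evalThreshold(t: Threshold, value: Double): Boolean = when (t) {
    is Threshold.Gt -> value > t.value
    is Threshold.Gte -> value >= t.value
    is Threshold.GtPercent -> value * 100.0 > t.pct
    is Threshold.CountGt -> value > t.n.toDouble()
}
```

Under this reading, CountGt(n) is strict, so a requirement of "≥ 10 transactions" has to be written as CountGt(9).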

import java.time.Duration
import java.time.Instant
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.kstream.Windowed
import org.apache.kafka.streams.state.WindowStore

fun buildTopology(rules: List<AmlRule>): Topology {
    val builder = StreamsBuilder()
    val txStream: KStream<String, CanonicalTx> = builder
        .stream("tx.normalized", Consumed.with(Serdes.String(), TxSerde))
    // SHADOW rules are evaluated too; their alerts go to aml.alert.shadow
    rules.filter { it.status == RuleStatus.ACTIVE || it.status == RuleStatus.SHADOW }
        .forEach { rule -> attachRuleBranch(txStream, rule) }
    return builder.build()
}

private fun attachRuleBranch(stream: KStream<String, CanonicalTx>, rule: AmlRule) {
    val keyed = stream
        .filter { _, tx -> tx.tenantId == rule.tenantId }
        .filter { _, tx -> rule.filter?.let { evalPredicate(it, tx) } ?: true }
        .selectKey { _, tx -> partitionKey(tx, rule.partitionBy) } // re-keying creates a repartition topic
    val size = Duration.ofSeconds(rule.window.sizeSeconds)
    val grace = Duration.ofSeconds(rule.window.gracePeriodSeconds)
    // HOPPING and TUMBLING differ only by the advance (tumbling: advance == size)
    val windows: TimeWindows = when (rule.window.kind) {
        WindowKind.HOPPING -> {
            val step = requireNotNull(rule.window.stepSeconds) { "HOPPING rule ${rule.ruleId} needs stepSeconds" }
            TimeWindows.ofSizeAndGrace(size, grace).advanceBy(Duration.ofSeconds(step))
        }
        WindowKind.TUMBLING -> TimeWindows.ofSizeAndGrace(size, grace)
        WindowKind.SESSION -> TODO("session windows for inactivity-bounded patterns")
    }
    val windowed: KStream<Windowed<String>, Double> = keyed
        .groupByKey(Grouped.with(Serdes.String(), TxSerde))
        .windowedBy(windows)
        .aggregate(
            ::aggInit,
            { _, tx, agg -> aggUpdate(rule.aggregate, agg, tx) },
            // Store named aml-window-{ruleId}; AggregateSerde is assumed to exist alongside TxSerde
            Materialized.`as`<String, Aggregate, WindowStore<Bytes, ByteArray>>("aml-window-${rule.ruleId}")
                .withKeySerde(Serdes.String())
                .withValueSerde(AggregateSerde)
        )
        .toStream()
        .mapValues { agg -> aggExtract(rule.aggregate, agg) }
    windowed
        .filter { _, value -> evalThreshold(rule.threshold, value) }
        .foreach { key, value -> emitAlertCandidate(rule, key, value) }
}

private fun emitAlertCandidate(rule: AmlRule, key: Windowed<String>, value: Double) {
    // tx ids kept for this window in aml-tx-window-{ruleId}
    val triggeringTxIds = stateStore.getTxIdsForWindow(rule.ruleId, key).sorted()
    val alertId = computeAlertId(rule.ruleId, key.key(), key.window().startTime(), triggeringTxIds)
    if (alertOutboxRepo.alreadyEmitted(alertId)) return // dedup fast path; the alert_id PRIMARY KEY is the final guard
    val threshold = thresholdValue(rule.threshold)
    val unsigned = AmlAlert(
        alertId = alertId,
        tenantId = rule.tenantId,
        ruleId = rule.ruleId,
        ruleVersion = rule.ruleVersion,
        accountId = key.key(),
        windowStart = key.window().startTime().toIso(),
        windowEnd = key.window().endTime().toIso(),
        severity = rule.severity,
        typology = rule.typology,
        triggeringTxIds = triggeringTxIds,
        aggregateValue = value,
        thresholdValue = threshold,
        message = i18nResolve(rule.alertTemplate, value, threshold),
        producedAt = Instant.now().toIso(),
        signature = "" // placeholder: the signature covers every other field
    )
    // An object cannot sign itself while being constructed: sign the unsigned copy, then attach
    val alert = unsigned.copy(signature = sign(unsigned))
    val targetTopic = if (rule.status == RuleStatus.SHADOW) "aml.alert.shadow" else "aml.alert.published"
    alertOutboxRepo.append(alert, targetTopic) // idempotent insert keyed on alert_id
}
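The alert's `signature` field relies on one Ed25519 key per tenant. Below is a minimal sketch using the JDK's built-in EdDSA support (JDK 15+, `Signature.getInstance("Ed25519")`). The spec does not fix a canonical byte form, so `canonicalAlertBytes` is a hypothetical choice. Key storage in Vault is out of scope.

```kotlin
import java.security.KeyPair
import java.security.KeyPairGenerator
import java.security.PrivateKey
import java.security.PublicKey
import java.security.Signature
import java.util.Base64

// Hypothetical canonical form: fixed field order, '|' separator, signature field excluded.
fun canonicalAlertBytes(
    alertId: String, tenantId: String, ruleId: String, ruleVersion: String, aggregateValue: Double
): ByteArray =
    listOf(alertId, tenantId, ruleId, ruleVersion, aggregateValue.toString())
        .joinToString("|")
        .toByteArray(Charsets.UTF_8)

// In production the tenant key would come from the KMS; generated here for the example.
fun newTenantKeyPair(): KeyPair = KeyPairGenerator.getInstance("Ed25519").generateKeyPair()

// Signs the canonical bytes and returns the 64-byte Ed25519 signature as Base64.
fun signEd25519(privateKey: PrivateKey, payload: ByteArray): String {
    val signer = Signature.getInstance("Ed25519")
    signer.initSign(privateKey)
    signer.update(payload)
    return Base64.getEncoder().encodeToString(signer.sign())
}

// Verifies a Base64 signature against the same canonical bytes.
fun verifyEd25519(publicKey: PublicKey, payload: ByteArray, signatureB64: String): Boolean {
    val verifier = Signature.getInstance("Ed25519")
    verifier.initVerify(publicKey)
    verifier.update(payload)
    return verifier.verify(Base64.getDecoder().decode(signatureB64))
}
```

Whatever canonical form is chosen must stay stable across releases. Otherwise a 5-year audit replay cannot re-verify old signatures.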

Store | Type | Content | Backed by
aml-window-{ruleId} | WindowStore<String, Aggregate> | accumulators per window key | changelog topic (compact,delete)
aml-tx-window-{ruleId} | WindowStore<String, List<String>> | tx ids per window, for reproducibility | changelog topic (compact,delete)
aml-baseline-{ruleId} | KeyValueStore<String, Aggregate> | baseline per accountId (peer-deviation) | compacted changelog topic

Persistence: local RocksDB on SSD, ~250 MB for 50K active accounts with 5 active rules.
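The RocksDB footprint grows with the number of windows each record updates. With hopping windows, a record belongs to size / advance windows. The helper below is a sketch of epoch-aligned window assignment, written to mirror our understanding of what `TimeWindows` does in Kafka Streams. `hoppingWindowStarts` is a hypothetical name, and all times are in epoch milliseconds.

```kotlin
/**
 * Start times of every epoch-aligned hopping window [start, start + size) that
 * contains `timestampMs`. Tumbling windows are the special case advance == size.
 */
fun hoppingWindowStarts(timestampMs: Long, sizeMs: Long, advanceMs: Long): List<Long> {
    require(advanceMs in 1..sizeMs) { "advance must be in 1..size" }
    val starts = mutableListOf<Long>()
    // Earliest aligned start whose window still covers the timestamp
    var start = (maxOf(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs
    while (start <= timestampMs) {
        starts += start
        start += advanceMs
    }
    return starts
}
```

For RULE_STRUCTURING_TND_9999 (7 d window, 1 h step), every qualifying debit updates 7 × 24 = 168 windows. A tumbling window (advance = size) touches exactly one.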

application.id: tx-monitoring-{tenantId}
bootstrap.servers: kafka:9092
num.stream.threads: 4 # parallelism per pod
processing.guarantee: exactly_once_v2 # transactional Kafka write
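cache.max.bytes.buffering: 134217728 # 128 MB record cache
commit.interval.ms: 5000 # commit + cache flush every 5 s; cached window updates reach the threshold check only on flush or eviction, so this bounds alert latency (p95 target: 500 ms)
state.dir: /var/lib/tx-monitoring/state
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler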
default.production.exception.handler: io.vitakyc.aml.OutboxFallbackHandler
metrics.recording.level: INFO
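The configuration above is per tenant, because application.id is templated on tenantId. Below is a hypothetical helper that builds the matching `java.util.Properties` (only a subset of the keys). It also derives the changelog topic name that Kafka Streams uses for a store: `<application.id>-<storeName>-changelog`.

```kotlin
import java.util.Properties

// Hypothetical helper: one Properties object per tenant, standard Kafka Streams keys only.
fun streamsProperties(tenantId: String, bootstrapServers: String = "kafka:9092"): Properties =
    Properties().apply {
        // One application.id per tenant => separate consumer group, internal topics and state dirs
        setProperty("application.id", "tx-monitoring-$tenantId")
        setProperty("bootstrap.servers", bootstrapServers)
        setProperty("num.stream.threads", "4")
        setProperty("processing.guarantee", "exactly_once_v2")
        setProperty("commit.interval.ms", "5000")
        setProperty("state.dir", "/var/lib/tx-monitoring/state")
        setProperty(
            "default.deserialization.exception.handler",
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"
        )
    }

// Changelog topic backing a named store, e.g. aml-window-{ruleId}, for one tenant.
fun changelogTopic(tenantId: String, storeName: String): String =
    "tx-monitoring-$tenantId-$storeName-changelog"
```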

“≥ 10 transactions below 9,999 TND over 7 days on the same account”

val structuring = amlRule {
    ruleId = "RULE_STRUCTURING_TND_9999"
    typology = STRUCTURING
    window = WindowSpec(HOPPING, sizeSeconds = 7 * 86400, stepSeconds = 3600, gracePeriodSeconds = 300)
    partitionBy = PartitionKey.ACCOUNT
    filter = predicate {
        // amountInRefCurrency (TND), not amount, which is in the original tx currency
        (field("amountInRefCurrency") gt 1000) and (field("amountInRefCurrency") lt 10000) and (field("direction") eq "DEBIT")
    }
    aggregate = AggregateExpr.Count()
    threshold = ThresholdExpr.CountGt(9) // count > 9, i.e. ≥ 10 transactions
    severity = HIGH
    alertTemplate = "aml.structuring.detected"
}
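Below is an in-memory sketch of what RULE_STRUCTURING_TND_9999 detects. It can serve as a reference for unit tests of the streaming version. It replaces the 1 h hopping windows with an exact trailing 7-day window (two pointers over sorted timestamps), so it may flag some boundary cases that the hour-aligned windows miss. `Tx` and the function names are hypothetical.

```kotlin
import java.math.BigDecimal

// Hypothetical simplified transaction, amount already in the reference currency (TND).
data class Tx(val accountId: String, val epochSeconds: Long, val amountRef: BigDecimal, val isDebit: Boolean)

private val STRUCTURING_MIN = BigDecimal("1000")
private val STRUCTURING_MAX = BigDecimal("10000")

// Pre-filter of the rule: debit with 1 000 < amount < 10 000.
fun isStructuringCandidate(tx: Tx): Boolean =
    tx.isDebit && tx.amountRef > STRUCTURING_MIN && tx.amountRef < STRUCTURING_MAX

/** Accounts with at least `minCount` candidates inside some trailing window of `windowSeconds`. */
fun structuringAccounts(txs: List<Tx>, windowSeconds: Long = 7 * 86_400L, minCount: Int = 10): Set<String> =
    txs.filter(::isStructuringCandidate)
        .groupBy { it.accountId }
        .filterValues { accountTxs ->
            val ts = accountTxs.map { it.epochSeconds }.sorted()
            var left = 0
            var hit = false
            for (right in ts.indices) {
                // Shrink until ts[left..right] spans less than the window
                while (ts[right] - ts[left] >= windowSeconds) left++
                if (right - left + 1 >= minCount) { hit = true; break }
            }
            hit
        }
        .keys
```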

“Cumulative 24 h volume exceeds 3× the 30-day average on this account”

val velocity = amlRule {
    ruleId = "RULE_VELOCITY_3X_30D"
    typology = VELOCITY
    window = WindowSpec(HOPPING, sizeSeconds = 86400, stepSeconds = 3600, gracePeriodSeconds = 300)
    partitionBy = PartitionKey.ACCOUNT
    aggregate = AggregateExpr.Velocity(window2Seconds = 30 * 86400)
    threshold = ThresholdExpr.GtPercent(pct = 300.0, baselineWindow = 30 * 86400)
    severity = MEDIUM
    alertTemplate = "aml.velocity.deviation"
}
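Below is a sketch of the velocity ratio. It assumes the 30-day baseline excludes the current 24 h and is normalised to a daily average. The spec does not say which, so treat both as assumptions. `velocityRatio` is hypothetical and works on (epochSeconds, amount) pairs for a single account.

```kotlin
/**
 * Velocity ratio = sum over (now - 24 h, now] divided by the average daily sum over
 * the 30 days before that. Returns null when there is no baseline activity.
 */
fun velocityRatio(
    amounts: List<Pair<Long, Double>>,
    nowSeconds: Long,
    currentWindow: Long = 86_400L,
    baselineWindow: Long = 30 * 86_400L
): Double? {
    val currentStart = nowSeconds - currentWindow
    val baselineStart = currentStart - baselineWindow
    val current = amounts.filter { (ts, _) -> ts > currentStart && ts <= nowSeconds }.sumOf { it.second }
    val baselineSum = amounts.filter { (ts, _) -> ts > baselineStart && ts <= currentStart }.sumOf { it.second }
    if (baselineSum == 0.0) return null // ratio undefined for accounts with no history
    val dailyBaseline = baselineSum / (baselineWindow.toDouble() / currentWindow)
    return current / dailyBaseline
}
```

Under this reading, `GtPercent(pct = 300.0)` fires when ratio × 100 > 300. An account with no baseline activity returns null rather than an infinite ratio. How to treat such accounts is a separate decision.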

“Any transaction above 50,000 EUR to a high-risk country”

val threshold = amlRule {
    ruleId = "RULE_HIGH_AMOUNT_HIGH_RISK_COUNTRY"
    typology = THRESHOLD
    window = WindowSpec(TUMBLING, sizeSeconds = 60) // immediate, single tx
    partitionBy = PartitionKey.ACCOUNT
    filter = predicate {
        // amountInRefCurrency is in the tenant refCurrency (typically TND): the 50 000 EUR limit must be expressed in that currency
        (field("amountInRefCurrency") gt 50000) and
            (field("counterpartyCountry") inAny listOf("AF","BY","KP","IR","LY","MM","SO","SS","SD","SY","YE","RU"))
    }
    aggregate = AggregateExpr.Count()
    threshold = ThresholdExpr.CountGt(0)
    severity = URGENT
    alertTemplate = "aml.threshold.high_risk_country"
}
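This rule needs no aggregation: the filter alone decides, and CountGt(0) fires on the first match. The sketch makes the currency assumption explicit. The limit is stated in EUR, while amountInRefCurrency is in the tenant's refCurrency (typically TND), so the sketch refuses to compare amounts in different currencies. The type and function names are hypothetical.

```kotlin
import java.math.BigDecimal

// Country list copied from the rule above; assumed to come from tenant configuration in practice.
val HIGH_RISK_COUNTRIES = setOf("AF", "BY", "KP", "IR", "LY", "MM", "SO", "SS", "SD", "SY", "YE", "RU")

// Hypothetical minimal view of a transaction for this rule.
data class WireTx(val amountRef: BigDecimal, val refCurrency: String, val counterpartyCountry: String?)

/** Single-transaction check; fails fast if the limit and the amount use different currencies. */
fun isHighAmountHighRisk(tx: WireTx, limit: BigDecimal, limitCurrency: String): Boolean {
    require(tx.refCurrency == limitCurrency) { "limit in $limitCurrency, amount in ${tx.refCurrency}" }
    val highRisk = tx.counterpartyCountry?.let { it.uppercase() in HIGH_RISK_COUNTRIES } == true
    return tx.amountRef > limit && highRisk
}
```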

“A credit received, followed within 48 h by a debit of ≥ 95% of its amount to a third-party account”

val pattern = amlRule {
    ruleId = "RULE_PASS_THROUGH"
    typology = PATTERN
    // SESSION windows are still TODO() in attachRuleBranch
    window = WindowSpec(SESSION, sizeSeconds = 48 * 3600, gracePeriodSeconds = 600)
    partitionBy = PartitionKey.ACCOUNT
    aggregate = AggregateExpr.Custom(
        // pseudocode: detect CREDIT then DEBIT >= 0.95 * CREDIT amount within session
        kind = "PASS_THROUGH",
        params = mapOf("debitToCreditRatioGt" to 0.95)
    )
    threshold = ThresholdExpr.CountGt(0)
    severity = HIGH
    alertTemplate = "aml.pattern.pass_through"
}
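Below is a reference implementation of the pass-through pattern over one account's movements. It does not depend on session windows, which `attachRuleBranch` does not implement yet. It pairs each credit with the first later third-party debit of at least 95% of its amount within 48 h. The quadratic scan is fine for tests, not for production. `Movement` and `passThroughPairs` are hypothetical. The sketch follows the "≥ 95%" wording, while the DSL parameter is named debitToCreditRatioGt, so the boundary case still needs settling.

```kotlin
// Hypothetical movement on a single account.
data class Movement(val epochSeconds: Long, val amount: Double, val isCredit: Boolean, val toThirdParty: Boolean = true)

/** (creditTs, debitTs) pairs where a qualifying debit follows a credit within `maxDelaySeconds`. */
fun passThroughPairs(
    movements: List<Movement>,
    ratio: Double = 0.95,
    maxDelaySeconds: Long = 48 * 3600L
): List<Pair<Long, Long>> {
    val sorted = movements.sortedBy { it.epochSeconds }
    val pairs = mutableListOf<Pair<Long, Long>>()
    for ((i, credit) in sorted.withIndex()) {
        if (!credit.isCredit) continue
        // First later debit to a third party, close enough in time and large enough
        val debit = sorted.drop(i + 1).firstOrNull { m ->
            !m.isCredit && m.toThirdParty &&
                m.epochSeconds - credit.epochSeconds <= maxDelaySeconds &&
                m.amount >= ratio * credit.amount
        }
        if (debit != null) pairs += credit.epochSeconds to debit.epochSeconds
    }
    return pairs
}
```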

“Monthly volume ≥ 5× the median of accounts with the same profession (e.g. salaried, retired)”

val peerDeviation = amlRule {
    ruleId = "RULE_PEER_DEVIATION_5X"
    typology = PEER_DEVIATION
    window = WindowSpec(TUMBLING, sizeSeconds = 30 * 86400)
    partitionBy = PartitionKey.ACCOUNT
    aggregate = AggregateExpr.Sum("amountInRefCurrency")
    // baseline = median of peers grouped by client.profession (joined from CPS)
    threshold = ThresholdExpr.GtPercent(pct = 500.0, baselineWindow = 0)
    severity = MEDIUM
    alertTemplate = "aml.peer.deviation"
}
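Below is a sketch of the peer-deviation check. It assumes monthly volumes have already been summed per account and that the profession comes from the CPS join. The median here includes the account itself. Excluding it is an equally valid choice that the spec leaves open. All names are hypothetical.

```kotlin
/** Median of a non-empty list (mean of the two middle values for even sizes). */
fun median(values: List<Double>): Double {
    require(values.isNotEmpty()) { "median of an empty peer group" }
    val s = values.sorted()
    val mid = s.size / 2
    return if (s.size % 2 == 1) s[mid] else (s[mid - 1] + s[mid]) / 2.0
}

/**
 * Accounts whose monthly volume is >= factor x the median volume of their peer group.
 * `profession` maps accountId -> peer group key; unknown professions share one group.
 */
fun peerDeviations(monthlyVolume: Map<String, Double>, profession: Map<String, String>, factor: Double = 5.0): Set<String> {
    val medians = monthlyVolume.keys
        .groupBy { profession[it] ?: "UNKNOWN" }
        .mapValues { (_, accounts) -> median(accounts.map { monthlyVolume.getValue(it) }) }
    return monthlyVolume.filter { (account, volume) ->
        val m = medians.getValue(profession[account] ?: "UNKNOWN")
        m > 0.0 && volume >= factor * m
    }.keys
}
```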

“Sum of international wires over 24 h exceeds 200,000 EUR”

val aggregate = amlRule {
    ruleId = "RULE_INT_WIRE_24H_200K"
    typology = AGGREGATE
    window = WindowSpec(HOPPING, sizeSeconds = 86400, stepSeconds = 3600, gracePeriodSeconds = 300)
    partitionBy = PartitionKey.ACCOUNT
    // tenantCountry: the tenant's home country (ISO 3166-1 alpha-2)
    filter = predicate { (field("channel") eq "WIRE") and (field("counterpartyCountry") neq tenantCountry) }
    aggregate = AggregateExpr.Sum("amountInRefCurrency")
    threshold = ThresholdExpr.Gt(200000.0) // in refCurrency: convert the EUR limit if refCurrency is TND
    severity = HIGH
    alertTemplate = "aml.aggregate.int_wires_24h"
}
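Below is a sketch of the international-wire aggregate built on hourly buckets. With a 24 h window that advances by 1 h and is aligned on the epoch, each window total is the sum of 24 consecutive hourly buckets. The sketch assumes two things: a null counterpartyCountry is excluded (the DSL predicate leaves that case open), and amounts are already in the limit's currency. `Wire` and `breachingWindows` are hypothetical.

```kotlin
import java.math.BigDecimal

// Hypothetical minimal wire record.
data class Wire(val epochSeconds: Long, val amountRef: BigDecimal, val channel: String, val counterpartyCountry: String?)

/** Start times (epoch seconds) of epoch-aligned 24 h / 1 h-step windows whose total exceeds `limit`. */
fun breachingWindows(
    wires: List<Wire>, homeCountry: String, limit: BigDecimal,
    sizeSeconds: Long = 86_400L, stepSeconds: Long = 3_600L
): List<Long> {
    // Sum international wires per step-sized bucket
    val buckets = wires
        .filter { it.channel == "WIRE" && it.counterpartyCountry != null && it.counterpartyCountry != homeCountry }
        .groupBy { it.epochSeconds / stepSeconds }
        .mapValues { (_, ws) -> ws.fold(BigDecimal.ZERO) { acc, w -> acc + w.amountRef } }
    if (buckets.isEmpty()) return emptyList()
    val bucketsPerWindow = sizeSeconds / stepSeconds
    val first = maxOf(0L, buckets.keys.minOf { it } - bucketsPerWindow + 1)
    val last = buckets.keys.maxOf { it }
    return (first..last).filter { startBucket ->
        val total = (startBucket until startBucket + bucketsPerWindow)
            .fold(BigDecimal.ZERO) { acc, b -> acc + (buckets[b] ?: BigDecimal.ZERO) }
        total > limit
    }.map { it * stepSeconds }
}
```

A large transfer keeps every window that contains it above the limit. The same breach can therefore show up in up to 24 overlapping windows. Because alertId includes windowStart, these would be distinct alerts as specified.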

import java.security.MessageDigest
import java.time.Instant

fun computeAlertId(
    ruleId: String,
    partitionKey: String,
    windowStart: Instant,
    triggeringTxIds: List<String>
): String {
    // Canonical, order-independent input: tx ids are sorted before hashing
    val canonical = "$ruleId|$partitionKey|${windowStart.epochSecond}|${triggeringTxIds.sorted().joinToString(",")}"
    val digest = MessageDigest.getInstance("SHA-256").digest(canonical.toByteArray(Charsets.UTF_8))
    // Keep the first 16 hex chars (64 bits) of the digest
    return "alert-" + digest.joinToString("") { "%02x".format(it) }.take(16)
}

Properties:

  • Deterministic: same inputs → same alertId. Reprocessing after a crash does not create a duplicate alert.
  • Reproducible for audit: a screening replayed 5 years later produces the same alertId.
  • Late-arrival detection: if a late transaction changes the set of triggeringTxIds, the alertId changes → a new, legitimate alert (the previous one remains valid for its timestamp).
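The properties above can be checked with a small usage example. The function is repeated unchanged so the snippet is self-contained, and the inputs are made up. Keeping 16 hex characters means a 64-bit id. By the birthday bound, about one million alerts give a collision probability on the order of 10⁻⁸.

```kotlin
import java.security.MessageDigest
import java.time.Instant

// Same construction as computeAlertId above.
fun computeAlertId(ruleId: String, partitionKey: String, windowStart: Instant, triggeringTxIds: List<String>): String {
    val canonical = "$ruleId|$partitionKey|${windowStart.epochSecond}|${triggeringTxIds.sorted().joinToString(",")}"
    val digest = MessageDigest.getInstance("SHA-256").digest(canonical.toByteArray(Charsets.UTF_8))
    return "alert-" + digest.joinToString("") { "%02x".format(it) }.take(16)
}

fun main() {
    val start = Instant.parse("2026-04-20T00:00:00Z")
    // Same set of tx ids in a different order => same id (deterministic, replay-safe)
    println(computeAlertId("R1", "acc-1", start, listOf("tx-2", "tx-1")) ==
        computeAlertId("R1", "acc-1", start, listOf("tx-1", "tx-2")))
    // A late tx joining the set => different id (new, legitimate alert)
    println(computeAlertId("R1", "acc-1", start, listOf("tx-1", "tx-2", "tx-3")))
}
```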

CREATE TABLE alert_outbox (
    alert_id      VARCHAR(80) PRIMARY KEY,
    tenant_id     UUID NOT NULL,
    payload       JSONB NOT NULL,
    target_topic  VARCHAR(64) NOT NULL,
    produced_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at  TIMESTAMPTZ,
    signature     TEXT NOT NULL,
    CHECK (target_topic IN ('aml.alert.published', 'aml.alert.shadow'))
);
CREATE INDEX idx_alert_outbox_unpublished ON alert_outbox (produced_at)
    WHERE published_at IS NULL;
ALTER TABLE alert_outbox ENABLE ROW LEVEL SECURITY;
-- missing_ok = true: with no tenant set, no row is visible instead of the query failing
CREATE POLICY alert_outbox_tenant_isolation ON alert_outbox
    USING (tenant_id = current_setting('app.current_tenant_id', true)::uuid);

An independent Kotlin coroutine runs every second:

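import kotlin.coroutines.coroutineContext
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.withContext
import org.apache.kafka.clients.producer.ProducerRecord

suspend fun outboxSweeper() {
    while (coroutineContext.isActive) {
        val batch = repo.fetchUnpublished(limit = 500)
        for (alert in batch) {
            val record = ProducerRecord(alert.targetTopic, alert.tenantId, alert.payload)
            // KafkaProducer.send returns a java.util.concurrent.Future: wait for the ack off the caller's dispatcher
            withContext(Dispatchers.IO) { kafka.send(record).get() }
            repo.markPublished(alert.alertId) // a crash before this line => republished on restart
        }
        delay(1.seconds)
    }
}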

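Guarantees:

  • At-least-once to Kafka: if the process crashes between kafka.send and markPublished, the alert is republished on restart. The consumer side dedupes on alertId (idempotence).
  • No double write: processing.guarantee=exactly_once_v2 makes state store (changelog) updates and offset commits atomic on the Kafka side. It does not cover the Postgres outbox insert, so a reprocessed window re-inserts the same deterministic alertId, and the alert_id PRIMARY KEY must turn that insert into a no-op (INSERT ... ON CONFLICT (alert_id) DO NOTHING).
  • Failure isolation: a sweeper crash does not affect the streams threads.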
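The three guarantees can be simulated in memory. The simulation has three parts: an idempotent outbox insert (standing in for the alert_id primary key), a sweeper that crashes between send and markPublished, and a consumer that dedupes on alertId. All classes are hypothetical stand-ins, not the Postgres or Kafka clients.

```kotlin
// Stand-in for alert_outbox: alertId -> published flag, insertion order preserved.
class InMemoryOutbox {
    private val rows = LinkedHashMap<String, Boolean>()
    /** Like INSERT ... ON CONFLICT (alert_id) DO NOTHING; false if the row already existed. */
    fun append(alertId: String): Boolean = rows.putIfAbsent(alertId, false) == null
    fun unpublished(): List<String> = rows.filterValues { !it }.keys.toList()
    fun markPublished(alertId: String) { rows[alertId] = true }
}

// Stand-in for case-mgmt-svc: ignores alertIds it has already processed.
class DedupConsumer {
    private val seen = HashSet<String>()
    val processed = mutableListOf<String>()
    fun onMessage(alertId: String) { if (seen.add(alertId)) processed += alertId }
}

/** One sweeper pass; `crashBeforeMark` simulates a crash right after the first successful send. */
fun sweep(outbox: InMemoryOutbox, topic: MutableList<String>, crashBeforeMark: Boolean = false) {
    for (id in outbox.unpublished()) {
        topic += id                 // kafka.send(...) acknowledged
        if (crashBeforeMark) return // markPublished never runs
        outbox.markPublished(id)
    }
}
```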

DRAFT → SHADOW (14 d) → 6-month backtest → dual-control → ACTIVE → ARCHIVED

Stage | Effect
DRAFT | rule created, not evaluated by Kafka Streams
SHADOW | rule evaluated, alerts emitted on aml.alert.shadow (not sent to case-mgmt)
Backtest | replay over 6 months of history, FP/FN comparison against the ACTIVE rules
Dual-control | 2 Ed25519 signatures (compliance + IT department, DSI), justification mandatory
ACTIVE | alerts sent to aml.alert.published → case-mgmt
ARCHIVED | not evaluated, retained 10 years for audit
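Below is a sketch of the lifecycle as a transition check. The allowed transitions are an assumption read from the table and from the "back to DRAFT" rule after the backtest. Backtest and dual-control are modelled as gates on SHADOW → ACTIVE rather than as states, and DRAFT → ARCHIVED is assumed to be allowed. The names are hypothetical.

```kotlin
enum class RuleStatus { DRAFT, SHADOW, ACTIVE, ARCHIVED }

// Assumed transition table; SHADOW -> DRAFT covers a failed backtest.
private val allowedTransitions: Map<RuleStatus, Set<RuleStatus>> = mapOf(
    RuleStatus.DRAFT to setOf(RuleStatus.SHADOW, RuleStatus.ARCHIVED),
    RuleStatus.SHADOW to setOf(RuleStatus.ACTIVE, RuleStatus.DRAFT, RuleStatus.ARCHIVED),
    RuleStatus.ACTIVE to setOf(RuleStatus.ARCHIVED),
    RuleStatus.ARCHIVED to emptySet()
)

/** Validates a transition and, for SHADOW -> ACTIVE, the shadow, backtest and dual-control gates. */
fun transition(
    from: RuleStatus,
    to: RuleStatus,
    shadowDays: Long = 0,
    backtestPassed: Boolean = false,
    validSignatures: Int = 0
): RuleStatus {
    require(to in allowedTransitions.getValue(from)) { "illegal transition $from -> $to" }
    if (from == RuleStatus.SHADOW && to == RuleStatus.ACTIVE) {
        require(shadowDays >= 14) { "shadow period too short: $shadowDays days" }
        require(backtestPassed) { "backtest gate not passed" }
        require(validSignatures >= 2) { "dual-control needs 2 signatures" }
    }
    return to
}
```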

class BacktestRunner(
    private val tenantId: String,
    private val ruleCandidate: AmlRule,
    private val historicalTxs: Sequence<CanonicalTx>,
    private val historicalAlerts: List<AmlAlert> // alerts confirmed as true positives by analysts
) {
    fun run(): BacktestReport {
        val mockTopology = buildSingleRuleTopology(ruleCandidate)
        // Simulate the windowed aggregation in memory. Count while iterating:
        // a Sequence may be single-use, so historicalTxs.count() afterwards is unsafe.
        var txCount = 0
        for (tx in historicalTxs) {
            mockTopology.process(tx)
            txCount++
        }
        val emittedAlerts: List<AmlAlert> = mockTopology.flushAlerts()
        val truePositives = emittedAlerts.count { it.matchesTruth(historicalAlerts) }
        val falsePositives = emittedAlerts.size - truePositives
        val falseNegatives = historicalAlerts.count { truth -> emittedAlerts.none { it.matchesTruth(listOf(truth)) } }
        return BacktestReport(
            ruleCandidate = ruleCandidate,
            period = "last_6_months",
            txCount = txCount,
            alertsEmitted = emittedAlerts.size,
            truePositives = truePositives,
            falsePositives = falsePositives,
            falseNegatives = falseNegatives,
            // Avoid 0/0 (NaN) when the rule emits nothing or there is no ground truth
            precision = if (emittedAlerts.isEmpty()) 0.0 else truePositives.toDouble() / emittedAlerts.size,
            recall = if (truePositives + falseNegatives == 0) 0.0
                     else truePositives.toDouble() / (truePositives + falseNegatives)
        )
    }
}

A rule can move to ACTIVE only if precision ≥ 0.70 AND recall ≥ 0.85 (configurable per tenant). Otherwise it goes back to DRAFT for tuning.
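Below is the activation gate written as a function. The spec defaults are parameters, so a tenant can override them. The function returns 0.0 instead of NaN when there is nothing to divide by, which makes an empty backtest fail the gate. The names are hypothetical.

```kotlin
data class GateResult(val precision: Double, val recall: Double, val passed: Boolean)

/** precision = TP / emitted, recall = TP / (TP + FN); both thresholds must be met. */
fun activationGate(
    truePositives: Int,
    alertsEmitted: Int,
    falseNegatives: Int,
    minPrecision: Double = 0.70,
    minRecall: Double = 0.85
): GateResult {
    val precision = if (alertsEmitted == 0) 0.0 else truePositives.toDouble() / alertsEmitted
    val actualPositives = truePositives + falseNegatives
    val recall = if (actualPositives == 0) 0.0 else truePositives.toDouble() / actualPositives
    return GateResult(precision, recall, precision >= minPrecision && recall >= minRecall)
}
```

Take 100 emitted alerts with 72 true positives and 10 missed cases. Precision is 0.72 and recall is 72 / 82 ≈ 0.878, so the rule passes. With 60 true positives, precision drops to 0.60 and the rule goes back to DRAFT.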


Method | Endpoint | Description
GET | /v1/aml/rules?status= | list rules with filters
POST | /v1/aml/rules | create a DRAFT rule
PUT | /v1/aml/rules/:ruleId | update a DRAFT rule
POST | /v1/aml/rules/:ruleId/publish-shadow | move to SHADOW
POST | /v1/aml/rules/:ruleId/backtest | start an async backtest (Temporal)
GET | /v1/aml/rules/:ruleId/backtest/:runId | poll status + report
POST | /v1/aml/rules/:ruleId/publish-active | requires 2 Ed25519 signatures
POST | /v1/aml/rules/:ruleId/archive | archive the rule
GET | /v1/aml/alerts?accountId=&from=&to=&severity= | search alerts
GET | /v1/aml/alerts/:alertId | signed alert detail

Method | Endpoint | Description
GET | /v1/aml/streams/health | Kafka Streams status (RUNNING, REBALANCING, ERROR)
GET | /v1/aml/streams/metrics | throughput, lag, state store size
GET | /v1/aml/streams/topology | topology dump for debugging
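publish-active requires two Ed25519 signatures. Below is a sketch of the dual-control check. It assumes the signatures have already been verified cryptographically, and that dual control means one compliance signer and one IT (DSI) signer who are different people. `RuleSignature` and `SignerRole` are hypothetical.

```kotlin
enum class SignerRole { COMPLIANCE, DSI }

// Hypothetical signature record; `valid` is the result of an Ed25519 verification done upstream.
data class RuleSignature(val signerId: String, val role: SignerRole, val valid: Boolean)

/** True if a valid compliance signature and a valid DSI signature come from two distinct signers. */
fun dualControlSatisfied(signatures: List<RuleSignature>): Boolean {
    val valid = signatures.filter { it.valid }
    val compliance = valid.filter { it.role == SignerRole.COMPLIANCE }.map { it.signerId }.toSet()
    val dsi = valid.filter { it.role == SignerRole.DSI }.map { it.signerId }.toSet()
    // One person holding both roles must not satisfy dual control alone
    return compliance.any { c -> dsi.any { d -> d != c } }
}
```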

Metric | MVP target | V2 target
p95 latency (tx → alert published) | ≤ 500 ms | ≤ 100 ms
Throughput | ≥ 1,000 tx/s/pod | ≥ 10,000 tx/s/pod
Cold start (changelog replay) | ≤ 60 s | ≤ 20 s
State store per 50K active accounts | ≤ 250 MB RocksDB | ≤ 200 MB
JVM heap | 4-8 GB | 4-8 GB
Availability | 99.5% | 99.9%
Late event drop rate | ≤ 0.1% | ≤ 0.01%

  • RocksDB encryption at rest (per-tenant KEK, Vault KMS).
  • mTLS between tx-monitoring-svc and the Kafka brokers.
  • Postgres RLS on alert_outbox, aml_rule, aml_alert_audit.
  • Ed25519 signature of alerts with the tenant key.
  • Dual-control on rule publication (compliance + DSI).
  • Append-only audit of rule changes: aml_rule_audit is immutable.
  • PII protection: processed transactions contain amounts and counterparty data; application logs mask these fields.
  • Rate limit on the admin API: 100 req/min/agent.
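The 100 req/min/agent limit can be implemented with a sliding-window log per agent. Below is a minimal single-process sketch that takes an explicit clock value for testability. With several API pods, the counters would have to live in a shared store, which is out of scope here. `AgentRateLimiter` is hypothetical.

```kotlin
/** Sliding-window log: at most `limit` accepted requests per agent in any `windowMillis` interval. */
class AgentRateLimiter(private val limit: Int = 100, private val windowMillis: Long = 60_000L) {
    private val log = HashMap<String, ArrayDeque<Long>>()

    @Synchronized
    fun tryAcquire(agentId: String, nowMillis: Long): Boolean {
        val q = log.getOrPut(agentId) { ArrayDeque() }
        // Drop timestamps that have left the window
        while (q.isNotEmpty() && nowMillis - q.first() >= windowMillis) q.removeFirst()
        if (q.size >= limit) return false
        q.addLast(nowMillis)
        return true
    }
}
```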

Item | MVP (V0) | V2 (S+12)
Typologies | 6 (structuring, velocity, threshold, pattern, peer, aggregate) | + new beneficiary, structuring across accounts, velocity gradient
Streaming ML | no | autoencoder anomaly detection with Flink
Visual rule builder | Kotlin DSL only | no-code GUI with backtest preview
Multi-region | single region | active-active multi-region with Kafka MirrorMaker
Throughput | 1K tx/s | 10K tx/s + Flink offload
Real-time peer baseline | daily recomputation | continuous

  • Kafka Streams topology operational for the 6 typologies
  • 4 standard rules shipped out of the box (structuring, velocity, high-risk-country threshold, pass-through)
  • Backtest runner over 6 months of history with a precision/recall report
  • 14-day shadow mode enforced before publication
  • Dual-control publication enforced (impossible without 2 signatures)
  • Postgres → Kafka outbox pattern operational, effectively-once delivery (at-least-once publication + consumer dedup on alertId) verified
  • Signed append-only audit log for rule changes
  • RocksDB state stores encrypted at rest
  • Prometheus metrics exposed (throughput, latency, alert dedup ratio)
  • Grafana dashboards: streams health + alert breakdown + backtest results
  • Pilot tenant TN-BANQUEX: 50,000 tx/day, 100 alerts/day, ≤ 5% FP
  • On-call runbook (long rebalance, state store corruption, late event spike)


AML streaming engine spec document, version 1.0 (2026-04-27).