Analyser les changements avec Debezium et Kafka Streams

Le Change Data Capture (CDC) est un excellent moyen d'introduire l'analyse en continu dans votre base de données existante, et l'utilisation de Debezium vous permet d'envoyer vos données de changement via Apache Kafka®.
Bien que la plupart des systèmes CDC vous donnent deux versions d'un enregistrement, comme c'était le cas avant et après le changement, il peut être difficile de travailler avec, si vous maintenez une réplique ailleurs. Le connecteur CDC Debezium MongoDB vous donne juste les changements, enregistrement par enregistrement, qui vous permettent de faire exactement ce que vous désirez, surtout si le delta de changement lui-même a une valeur analytique. Cet article de blog examine comment combiner les flux et les tables Kafka pour maintenir une réplique dans Kafka et comment adapter l'enregistrement de sortie d'un flux.

Imaginez que vous ayez une application qui collecte les prix des articles que vous avez ajoutés à une liste de souhaits. Vous fixez un prix cible que vous êtes prêt à payer et vous ajoutez des liens vers quelques magasins différents qui proposent l'article. Vous stockez ensuite chaque article comme un document dans une collection MongoDB, comme celle-ci :

{
  "item": "Software Engineering at Google",
  "target_prize": 20,
  "store": "amazon.co.uk",
  "price": 31.99,
  "last_check": "2020-05-24T16:05:43Z",
  "url":  "https://www.amazon.co.uk/dp/1492082791/"
}

L'application peut vérifier si un nouveau prix dépasse le prix indicatif, mais vous vous rendez compte que vous n'avez pas suivi les changements au fil du temps. Heureusement, en arrière-plan, MongoDB garde une trace de chaque changement dans le cadre de sa capacité de réplication. Vous pouvez même aller jusqu'à considérer qu'il s'agit d'un flux continu de changements.

Modifier la saisie de données avec Debezium
Debezium est une collection de connecteurs Kafka Connect pour différentes bases de données. En tant que plugin Kafka Connect, Debezium nécessite le téléchargement du connecteur CDC Debezium MongoDB et son ajout au plugin.path de Connect. Une fois que le plugin est présent, vous pouvez configurer un connecteur pour la collection MongoDB :

echo '{
  "name": "product-source",
  "config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "MONGO_REPLICA_SET/MONGO_HOST:MONGO_PORT",
"mongodb.name": "mongocdc",
"mongodb.ssl.enabled": false,
"mongodb.user": "MONGO_USERNAME",
"mongodb.password": "MONGO_PASSWORD",
"collection.whitelist": "wishlist.product",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}' | curl -X POST -d @- http://KAFKA_CONNECT_HOST:8083/connectors --header "Content-Type:application/json"

Cette configuration permet de choisir la collection de produits dans la base de données des listes de souhaits, mais vous pouvez également utiliser des jokers pour choisir plusieurs collections. Le connecteur se mettra au travail et commencera à envoyer des documents à un sujet qu'il a créé, appelé mongocdc.wishlist.product. Si vous regardez le nouveau sujet ci-dessous, vous pouvez voir le document à l'intérieur de l'enveloppe de transport de Debezium :

{
   "Schema":{
      "type": "struct",
      "fields": [
         {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "after"
         },
         ...
         {
            "type": "struct",
            "fields": [...],
            "optional": true,
            "field": "transaction"
         }
      ],
      "optional": false,
      "name": "mongocdc.wishlist.product.Envelope"
   },
   "payload": {
      "after": "{"_id": {"$oid": "5ee39875c94bffb83b95288e"},"item": "Software Engineering at Google","target_prize": 20,"store": "amazon.co.uk","price": 31.99,"last_check": "2020-05-24T16:05:43Z","url": "https://www.amazon.co.uk/dp/1492082791/"}",
      "patch": null,
      "filter": null,
      "source": {
         "version": "1.1.2.Final",
         "connector": "mongodb",
         "name": "mongocdc",
         "ts_ms": 1591974225000,
         "snapshot": "last",
         "db": "wishlist",
         "rs": "mongo-replica-set",
         "collection": "product",
         "ord": 1,
         "h": 0,
         "tord": null
      },
      "op": "r",
      "ts_ms": 1591974232360,
      "transaction": null
   }
}

L'enveloppe se compose d'un schéma (en abrégé ici) et d'une charge utile, dans laquelle on peut voir une propriété postérieure qui contient le document complet sous forme de chaîne. Vous êtes maintenant opérationnel au moins jusqu'à ce que vous rencontriez votre premier changement.

{
   "schema": {...},
   "payload": {
      "after": null,
      "patch": "{"$v": 1,"$set": {"last_check": {"$date": 1591974366044},"price": 24.9}}",
      "filter": "{"_id": {"$oid": "5ee39875c94bffb83b95288e"}}",
      "source": {
         "version": "1.1.2.Final",
         "connector": "mongodb",
         "name": "mongocdc",
         "ts_ms": 1591974366000,
         "snapshot": "false",
         "db": "wishlist",
         "rs": "mongo-replica-set",
         "collection": "product",
         "ord": 1,
         "h": 0,
         "tord": null
      },
      "op": "u",
      "ts_ms": 1591974365467,
      "transaction": null
   }
}

Voici le défi. Contrairement aux connecteurs Debezium CDC pour les autres bases de données, vous n'obtenez qu'un patch. Vous pourriez écrire à vos consommateurs pour qu'ils s'en occupent, mais cela aurait deux conséquences négatives. Premièrement, chaque consommateur devrait lire l'ensemble du sujet afin d'obtenir la version initiale complète du document et tous les changements qui ont eu lieu jusqu'à présent. Deuxièmement, vous auriez une double logique pour gérer la fusion de la séquence des patchs.

Kafka Connect a la capacité de spécifier des transformateurs, dont certains sont inclus dans Debezium ; cependant, vous avez besoin de plus qu'une transformation - vous avez besoin d'un moyen de maintenir l'état entre les changements pour chaque document de la collection. Heureusement, vous pouvez résoudre ces deux problèmes en utilisant Kafka Streams. Avant d'examiner la solution de plus près, vous trouverez ci-dessous un bref aperçu de certains concepts clés de Kafka Streams.

Modernisez vos plateformes Data

Les flux et les tables avec Kafka Streams
Kafka Streams propose deux résumés différents en plus des sujets, flux et tableaux habituels de Kafka. La meilleure façon de les distinguer est d'identifier l'objectif qu'ils servent pour les données qui y sont contenues. Un flux est une séquence d'événements qui doit généralement être traitée dans un certain ordre, représentant une évolution élément par élément de l'ensemble des données. Un tableau, en revanche, est un instantané de l'ensemble des données à un moment donné.

Il existe une interaction intéressante entre les flux et les tableaux : Le traitement de l'un produit l'autre. Par exemple, si vous accumulez tous les événements jusqu'à un moment donné, vous produisez un tableau. Si vous émettez ensuite chaque modification de ce tableau, vous produisez un nouveau flux. C'est ce jeu qui résout les deux défis - d'une pierre deux coups, comme on dit.

Avec Debezium, les modifications apportées à la table MongoDB sont émises dans un sujet représentant un flux de modifications. Avec Kafka Streams, vous les accumulez dans une table en appliquant chaque patch au fur et à mesure qu'ils arrivent, et lorsque la table change, elle émet l'enregistrement complet sous la forme d'un nouveau flux. Cela signifie que les nouveaux consommateurs peuvent commencer à lire le flux fusionné à tout moment car il contiendra toujours des enregistrements complets, et les consommateurs n'ont pas besoin de maintenir leur propre état de document ou leur propre logique de fusion.

Mettre en œuvre les flux Kafka pour Debezium
Il est assez facile d'écrire une application Kafka Streams par rapport aux connecteurs et aux consommateurs. Une certaine configuration est fournie, comme par exemple l'emplacement de Kafka, et vous pouvez utiliser un StreamsBuilder pour créer le flux (KStream) et la table (KTable). En utilisant une syntaxe de type SQL, vous pouvez manipuler les données pour atteindre votre objectif et enfin commencer l'exécution.

public static void main(String[] args) {
   String source = "mongocdc.wishlist.product";
   String dest = "product";
   String kafka = "localhost:9092";
   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, source + "-cdc");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
             Serdes.Bytes().getClass()
   );
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
             Serdes.Bytes().getClass()
   );
   StreamsBuilder builder = new StreamsBuilder();
   KStream<Bytes, Bytes> cdc = builder.stream(source);
   KTable<Bytes, Bytes> table = builder.table(dest);
   cdc.leftJoin(table, (left, right) -> merge(right, left))
      .groupByKey()
      .reduce((agg, bytes) -> bytes, Materialized.as(dest + ".table"))
      .toStream().to(dest);
   KafkaStreams streams = new KafkaStreams(builder.build(), props);
   streams.start();
}

La plus grande partie du code est une configuration assez simple ; cependant, les quatre lignes de syntaxe de type SQL sont l'endroit où tout le travail se passe et méritent quelques explications supplémentaires.

  cdc.leftJoin(table, (left, right) -> merge(right, left))
      .groupByKey()
      .reduce((agg, bytes) -> bytes, Materialized.as(dest + ".table"))
      .toStream().to(dest);
     

Vous pouvez prendre le KStream, cdc ; effectuer une jointure à gauche avec la KTable, table ; et appliquer votre propre fonction de merge (expliquée ci-dessous) pour chaque ligne jointe. Une jointure à gauche est nécessaire car vous recevrez de nouveaux documents qui n'existent pas encore dans le tableau, qui est le côté droit de votre jointure. Pour qu'un enregistrement dans le flux corresponde à une ligne du tableau, la clé du sujet sous-jacent est utilisée.

Le résultat de la jointure et du merge (fusion) est le document complet avec le patch appliqué, mais celui-ci n'existe actuellement que dans votre application. Afin d'enregistrer vos modifications, regroupez les données par clé à des fins de partitionnement et réduisez le groupe afin de matérialiser le flux sous forme de tableau. Vous pouvez ensuite envoyer les résultats dans un nouveau flux.

La fonction de merge (fusion) est spécifique à votre source de données, le changelog de MongoDB traité par Debezium. Comme l'ont montré les événements de Debezium plus tôt, vous recevrez un document JSON avec les données brutes de MongoDB contenues dans une enveloppe riche en métadonnées. L'élément clé est la payload (charge utile), car elle contient à la fois les données réelles de MongoDB et l'opération (op) qui s'est produite dans MongoDB pour provoquer cet événement de données.

static Bytes merge(Bytes oldValue, Bytes newValue, UpdateStrategy strategy) {
   try {
       if (newValue == null) {
           return null;
       }
       ObjectMapper mapper = new ObjectMapper();
       JsonNode newjson = mapper.readTree(
               new String(newValue.get(), StandardCharsets.UTF_8)
       );
       JsonNode payload = newjson.get("payload");
       String data = null;
       switch (payload.get("op").asText()) {
           case "c":
           case "r":
               data = payload.get("after").asText();
               break;
           case "u":
               data = update(
                       new String(oldValue.get(), StandardCharsets.UTF_8),
                       payload.get("patch").asText()
               );
               break;
           case "d":
               data = null;
               break;
       }
       if (data != null) {
           return new Bytes(data.getBytes(StandardCharsets.UTF_8));
       } else {
           return null;
       }
   } catch (IOException e) {
       e.printStackTrace();
       return null;
   }
}

L'opération, op, prend l'une des quatre valeurs suivantes. Les opérations c (créer) et r (lire) sont toutes deux traitées de la même manière, c'est-à-dire comme un nouveau document complet contenu dans l'élément after. d (supprimer) est également facile. Il suffit de renvoyer null pour supprimer le document. u (update) nécessite plus de travail pour appliquer le correctif, comme le montre la fonction update (mise à jour) ci-dessous.

static String update(String oldjson, String patch) throws IOException {
   ObjectMapper mapper = new ObjectMapper();
   ObjectNode after = (ObjectNode) mapper.readTree(oldjson);
   JsonNode json = mapper.readTree(patch);
   if (json.has("$set")) {
       JsonNode set = json.get("$set");
       for (Iterator<String> it = set.fieldNames(); it.hasNext(); ) {
           String key = it.next();
           after.put(key, tidyValue(set.get(key)));
       }
   }
   if (json.has("$unset")) {
       JsonNode unset = json.get("$unset");
       for (Iterator<String> it = unset.fieldNames(); it.hasNext(); ) {
           String key = it.next();
           after.remove(key);
       }
   }
   return after.toString();
}

Modernisez vos plateformes Data

La valeur du patch provient de MongoDB lui-même et varie en fonction de la version utilisée, en l'occurrence, 4.2. Il peut contenir deux listes de paires clé/valeur, l'une appelée $set et l'autre $unset. Avec $set, assurez-vous simplement qu'il y a une clé dans votre document qui est définie à la valeur contenue dans le patch. Pour $unset, il vous suffit de supprimer la clé si elle est toujours présente dans votre document.

Une dernière chose à régler est la façon dont un horodatage de MongoDB arrive dans le patch. Vous avez peut-être remarqué que dans le document initial, il y avait un format date/heure ISO8601, mais le patch contient une heure d'époque codée par $date. La méthode tidyValue supervise la conversion en une date/heure ISO8601 afin que vos consommateurs reçoivent un format cohérent.

static String tidyValue(JsonNode value) {
   if (value.has("$date")) {
       Date date = new Date(Long.parseLong(value.get("$date").asText()));
       return sdf.format(date);
   } else return value.asText();
}

Debezium connecté à Kafka Streams
Pour faire fonctionner l'application Kafka Streams, vous devez faire deux choses. Premièrement, il faut créer un thème ayant une valeur de produit. Vous pouvez ensuite utiliser Apache Maven pour compiler et exécuter votre application.

mvn compile exec:java \
  -Dexec.mainClass=com.github.gh_mlfowler.mongocdcdemo.MongoCDCKStream \
  -Dexec.cleanupDaemonThreads=false

Les consommateurs de votre sujet de destination recevront désormais le document complet sans enveloppe Debezium :

{
   "item":" Software Engineering at Google”,
   "price": 30.99,
   "_id": {
      "$oid": "5ecd8edd9b47ccc632c1aed8"
   },
   "store": "amazon.co.uk",
   "Last_check": “2020-05-24T17:05:43”,
   "target_prize": 20,
   "url":"https://www.amazon.co.uk/dp/1492082791/"
}

En apportant quelques modifications à votre logique de fusion, vous pouvez en fait émettre les versions avant et après (parfois aussi appelées anciennes et nouvelles) plus typiques des autres systèmes CDC. Vous devez apporter une petite modification à l'énoncé du commutateur op pour placer le document initial dans un élément after dans le document résultant :

case "r":
   JsonNode after = mapper.readTree(payload.get("after").asText());
   ObjectNode out = mapper.createObjectNode();
   out.set("after", after);
   data = out.toString();
   break;

Pour chaque nouvelle mise à jour, prenez le précédent after et mettez-le dans un élément before. Appliquez ensuite les $set et $unset comme auparavant à ce qui sera maintenant stocké dans l'élément after.

static String update(String oldjson, String patch) throws IOException {
   ObjectMapper mapper = new ObjectMapper();
   ObjectNode out = (ObjectNode) mapper.readTree(oldjson);
   out.set("before", out.get("after"));
   ObjectNode after = out.get("after").deepCopy();
   JsonNode json = mapper.readTree(patch);
   if (json.has("$set")) {...}
   if (json.has("$unset")) {...}
   out.set("after", after);
   return out.toString();
}

Une fois que vous aurez commencé à traiter les mises à jour d'un document initial, vos consommateurs verront les documents :

{
   "before":{
      "item":"Software Engineering at Google",
      "price":31.99,
      "_id":{
         "$oid":"5ecd8edd9b47ccc632c1aed8"
      },
      "store":"amazon.co.uk",
      "last_check":"2020-05-24T16:05:43",
      "target_prize":20,
      "url":"https://www.amazon.co.uk/dp/1492082791/"
    },
   "after":{
      "item":"Software Engineering at Google",
      "price":30.99,
      "_id":{
         "$oid":"5ecd8edd9b47ccc632c1aed8"
      },
      "store":"amazon.co.uk",
      "last_check":"2020-05-24T17:05:43",
      "target_prize":20,
      "url":"https://www.amazon.co.uk/dp/1492082791/"
   }
}

Mais pourquoi s'arrêter là ? Pourquoi ne pas introduire une troisième version, le delta ? Bien que le code ne soit pas montré ici, vous pouvez être très créatif dans ce que vous générez en traitant les éléments $set. Par exemple, vous pourriez utiliser le type d'informations fournies par l'enveloppe Debezium pour mettre en œuvre le traitement par type.

{
   "before":{
      ...
    },
   "delta":{
      "price":  -1,
      "last_check":"PT1H",
   }
   "after":{
      ...
   }
}

En résumé
Si vous souhaitez poursuivre vos expériences, consultez l' environnement de démonstration utilisant Vagrant, disponible sur GitHub. Chacune des trois stratégies de mise à jour est mise en œuvre, ce qui vous permet de jouer avec les résultats. Un simple script modifie un document dans MongoDB toutes les minutes, ce qui permet d'observer de nombreux changements.

En combinant Debezium et Kafka Streams, vous pouvez enrichir les données de MongoDB avec l'état historique du document pour produire des documents complets pour une consommation ultérieure. En utilisant les métadonnées d'enveloppe de Debezium, vous pouvez accéder aux versions avant et après typiques que d'autres systèmes CDC génèrent ainsi qu'à la version delta, qui dans certains scénarios peut être tout ce dont vous avez besoin.

Pour découvrir d'autres connecteurs Kafka, n'hésitez pas à visiter le Hub Confluent.

Cet article a été initialelemnt publié sur le Hub Confluent. Retrouvez-le sur ce lien.

Lire aussi : Automatisation Azure HDInsight et EventGrid

Modernisez vos plateformes Data