Documentation développeur Odoo Odoo2Kafka
November 26, 2025 at 2:48 AMJob extraction de données odoo2kafka
Debezium
La base de données de odoo est une base postgresql. Pour odoo 11, il est possible d’utiliser la version 9.6 de la base. Or, a partir de cette version, des outils internes permettent d’utiliser le projet debezium.io pour extraire des données de table vers kafka.
Connecteur
Debzium fonctionne en principe sur kafka-connect. Kafka-connect est un service distribué dans lequel se deploie (via une api rest) des ‘connecteurs’. Les connecteurs peuvent se connecter à divers type de sauvegarde de données pour soit les extraire pour les produire dans kafka (connecteur SOURCE), soit pour y enregistrer des données provenant de kafka (connecteur SINK). Pour notre besoin, un connecteur jdbc pourrait être suffisant pour extraire les données de postgres, mais il fonctionne sur un systeme de polling des tables, en repose sur le caractère strictment croissant d’une colonne (typiquement une date de modification). N’étant pas responsable du schéma de la base de données, et fonctionnant exclusivement sur le polling, nous nous sommes tourné sur le connecteur postgres de debezium.
Connecteur debezium
Le connecteur postgres de debezium repose sur des mecanismes de streaming interne a postgres, ce qui est moins contraigant que le connecteur jdbc. Pour qu’une table puisse être extraite il est necessaire qu’elle ait une clé primaire. Toutefois il est necessaire que un plugin développé par debezium soit installé dans la base de données (ou plus simplement utiliser le docker debezium basé sur le docker officiel postgres hub).
note : il est possible d’utiliser une base debezium/postgres:9.6 à la place de postgres:9.6. Si l’on n’installe pas une base vide, deux etapes sont necessaire pour que le plugin soit pris en compte :
- sh /docker-entrypoint-initdb.d/init-permissions.sh
- cp /usr/share/postgresql/postgresql.conf.sample /var/lib/postgresql/data/pgdata/postgresql.conf
qui sont normalement faites à l’init de la base
Il est possible ensuite de déployer un connecteur postgres dans une instance de kafka-connect (si la lib java y est intégré, cf docker debezium/connect basé sur le docker officiel de kafka/connect).
Dans notre architecture en microservice, le principe de kafka-connect n’est pas recommendable car le suivi des connecteur au sein de kafka-connect, est séparé de l’instance. Cela impose un mode de suivi via les api rest pour connaitre l’état des connecteurs.
De plus, debezium propose un mode embedded, qui permet de faire fonctionner ses connecteurs hors de kafka-connect.
Mode embedded
Pour profiter de la puissance du connecteur debezium et s’affranchir du mode de déploiement de kafka-connect, nous avons développé un job java pgdbz-exportjob basé sur debezium-embedded, qui par défaut lit dans son fichier properties ExportJob.properties les parametres du connecteur postgres.
Lors d’un déploiement kafka-connect, les properties du connecteur sont par exemple :
name=connector-name
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.KafkaOffsetBackingStore
offset.storage.topic=odoo.debezium.offset
offset.storage.partitions=1
offset.storage.replication.factor=1
offset.flush.interval.ms=60000
tasks.max=1
database.hostname=172.17.0.1
database.port=5432
database.user=odoo
database.password=***
database.server.id=184055
database.server.name=odoo2kafka
decimal.handling.mode=double
time.precision.mode=connect
database.dbname=odoo
schema.whilelist=public
table.whitelist=public.product_.*
Ces paramètres sont préfixé de debezium. dans le fichier properties. D’autres paramètres existent (par exemple blacklist pour les tables) qu’il est possible d’ajouter pour modifier le comportement du connecteur. (Il est aussi possible d’utiliser la sauvegarde des offset dans un fichier plutot qu’un topic, simplement en modifiant ces parametres).
Ce job d’export est compilé dans une image docker docker-registry.isi/pgdbz-exportjob
Docker pgdbz-exportjob
Properties
Le job et donc le docker pgdbz-exportjob sont générique pour l’export de données postgres (+debezium) vers kafka … il n’est pas spécialisé pour cineges ni pour odoo.
Pour le déploiement particulier, il faut changer les propriétées du job au lancement, ce qui est possible de faire via les variables l’environnement. Les variables de la forme OVERRIDE_* changent ou ajoute des propriétés du fichier .properties lors du lancement du job.
Dans le cas de odoo voici l’extait .yaml de son déploiement :
spec:
containers:
- args:
- ExportJob
env:
- name: TZ
value: Pacific/Noumea
- name: REPLICATION_FACTOR
value: "1"
- name: TASK_OPTS
value: -Xmx512M -XX:MaxPermSize=256M -XX:MaxMetaspaceSize=256M
- name: JAVA_OPTS
value: -Xmx16M -XX:MaxPermSize=256M -XX:MaxMetaspaceSize=256M
- name: OVERRIDE_DEBEZIUM_OFFSET_STORAGE_TOPIC
value: odoo2kafka.offset
- name: OVERRIDE_DEBEZIUM_DATABASE_HOSTNAME
value: postgres
- name: OVERRIDE_DEBEZIUM_DATABASE_PORT
value: "5432"
- name: OVERRIDE_DEBEZIUM_DATABASE_USER
value: odoo
- name: OVERRIDE_DEBEZIUM_DATABASE_PASSWORD
value: P@ssw0rd
- name: OVERRIDE_DEBEZIUM_DATABASE_SERVER_NAME
value: odoo2kafka
- name: OVERRIDE_DEBEZIUM_DATABASE_DBNAME
value: odoo
- name: OVERRIDE_DEBEZIUM_SCHEMA_WHITELIST
value: public
- name: OVERRIDE_DEBEZIUM_TABLE_WHITELIST
value: public.product_.*
image: docker-registry.isi/hickson/pgdbz-exportjob:e1e7418
Ainsi le nom du job, les informations de connexion à la base (OVERRIDE_DEBEZIUM_DATABASE_HOSTNAME, OVERRIDE_… _PORT, …_USER, …_PASSWORD, …_DBNAME) sont renseignable. ainsi que les parametre pour signifier les tables à exporter (OVERRIDE_DEBEZIUM_TABLE_WHITELIST_ , qui est une liste de tables (prefixé du schéma), ou liste de regex pour signifier quelle tables sont exporter … dans l’exemple ci-dessus uniquement les tables commencant par product_ ).
Pour changer les tables à exporter, il suffie de modifier ces variables et de relancer le job.
Autres propriétés importantes
| property | description |
| — | — |
| database.server.name | nom unique à l’origine entre tous connecteur déployé dans kafka connect |
| schema.whitelist | liste separée par virgules de regex pour selectionner les schémas exportées |
| schema.blacklist | liste separée par virgules de regex pour ne pas selectionner les schémas |
| table.whitelist | liste separée par virgules de regex pour selectionner les tables à exporter |
| table.blacklist | liste separée par virgules de regex pour de pas selectionner les tables à exporter |
| column.blacklist | liste separée par virgules de regex pour de pas exporte des colonnes des tables (de la forme schemé.table.colonne ) |
| database.sslmode | avec les autres paramètres ssl, permet une connexion à la base en ssl |
Certain paramètres sont choisi par default par le job exportJob (et différent du défaut choisis de debezium) et sont :
- time.precision.mode=connect : tous les champs date/time/timestamp sont en second après epoch. Dans les autres modes, la représentaion dépend du type de donnée et peut être de la nanoseconde, microseconde, ou second après epoch, mais sans en connaitre précisement quel représentation quand on n’a pas le schéma de données (cf ci-après)
- decimal.handling.mode=double : les type numerique sont repésenté en double plutot qu’en bitarray+num de precision. Il est recommandé de ne pas changer ces propriétés.
note Les connecteurs déployés dans kafka-connect béneficient d’une fontionnalité qui sont les ‘transforms’. Il est possible de définir des transformations des données de sortie avant d’être produit dans son topic kafka. Malheuresement, debezium-embedded n’embarque pas cette fonctionnalité. Pour export Job, nous avons choisit d’utiliser une transformation mais en dur.
Creation des topics de sortie
Les topics de sorties sont crées s’ils n’existent pas encore. Il est donc important de présenter dans le fichier properties (ou via les variables d’environnements) les propriétés suivantes
topic.create.partitions=1
topic.create.replication.factor=1
Toute autre configuration de topic peut être décrite dans le fichier de properties prefixé de ‘topic.create.’ exemple :
topic.create.cleanup.policy=compact
Format de sortie
Debezieum-embedded ne fournit comme fonctionnalité que la lecture de la base de données (en fonction des parametres de connexion, des offsets, etc.) et delègue la manipulation des messages dans le reste du code, ie exportJob.
C’est bien notre code qui fait le choix de l’écriture dans kafka, et du format de sortie (en fonction du format du message fourni).
La version d’exportjob documentée ici ne permet que l’ecriture au **format json. ** La clé du message est celle générée par le connecteur ie la clé primaire de la table ; la valeur du message est uniquement les nouvelles valeurs des colonnes de la table exporté (le message par défaut peut proposer l’ancienne valeur ainsi que la nouvelle valeur / lors d’un update … Quand les tables sont exportées pour la première fois (snapshot) les précédentes valeurs ne sont de toutes façons pas disponibles). Le schéma de données, produit par défautl par kafka-connect, et par debezium-embedded est volontairement retiré.
exemple :
{
"id": 34,
"name": "Gold Membership",
"sequence": 1,
"type": "service",
"rental": false,
"categ_id": 2,
"list_price": 180,
"volume": 0,
"weight": 0,
"sale_ok": true,
"purchase_ok": true,
"uom_id": 1,
"uom_po_id": 1,
"company_id": 1,
"active": true,
"create_date": 1516379219863,
"write_date": 1516379682657,
"service_type": "manual",
"invoice_policy": "order",
"responsible_id": 1,
"sale_delay": 0,
"tracking": "none",
"location_id": null,
"available_in_pos": false,
"purchase_method": "receive",
}
Voyez les dates qui sont en seconde, et les type numeric qui sont bien ecrit en double.
Si le format de donnée n’est pas satisfaisant, il faut faire évoluer le job pour offir de nouvelles fonctonnalités.