on 04-13-2026 05:43 AM
Incorta introduced log-based incremental load, which enables reliable incremental loading of inserts and updates without requiring a dedicated column such as LAST_UPDATED_TIMESTAMP or a column with a monotonically increasing maximum value, while also eliminating the performance impact on source systems.
Connector version 2.2.5.0 introduces schema registry integration with Apicurio, which substantially improves incremental ingestion performance by externalizing schema definitions from individual Kafka messages to a schema registry service, eliminating the redundant schema metadata otherwise embedded in every message.
To use the log-based incremental load, follow the steps below. The setup in this post uses the following software versions:

| Software | Version |
| --- | --- |
| Incorta | 2025.7.6 |
| Oracle Database | 19c Enterprise Edition Release 19.0.0.0.0 - Production |
| Kafka | 3.9.2 |
| Debezium | 3.4 (latest stable build) |
| Apicurio Registry | 2.6.13 |
Log in to the CDB and verify that archive logging is enabled. If the database is not in ARCHIVELOG mode, enable it with steps similar to the following:
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- The output should now show "Database log mode: Archive Mode"
archive log list
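Alternatively, archive mode can be confirmed from any SQL client (no SYSDBA session needed, as long as the user can read V$DATABASE):

```sql
-- Should return ARCHIVELOG once archive logging is enabled
SELECT log_mode FROM v$database;
```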
Debezium needs a user that can "see" across containers to mine the redo logs. Execute the following from the CDB$ROOT connection.
Create the LogMiner tablespace from the CDB$ROOT connection:
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/MORAL/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
Create the LogMiner tablespace from the PDB connection:
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/MORAL/MORALPDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
Create the common user from the CDB$ROOT connection. The user name must start with “c##”:
CREATE USER c##dbzuser IDENTIFIED BY <<password>> DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
Grant the following permissions to the user:
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL;
GRANT READ ANY TABLE TO c##dbzuser;
Enable supplemental logging for each table to be captured, for example:
ALTER TABLE incorta_apps.inventory_sample ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
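Note that the Debezium Oracle connector documentation also requires minimal supplemental logging at the database level, in addition to the per-table ALL-columns supplemental logging shown above:

```sql
-- Run once at the database (CDB) level; required by Debezium's LogMiner adapter
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
```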
Organize the downloaded folders as follows.
cdc_root/
├── kafka/
│ ├── config/
│ │ ├── zookeeper.properties
│ │ ├── server.properties
│ │ ├── connect-standalone.properties
│ │ ├── oracle-pdb.properties
│ │ └── (other configs)
│ ├── lib/
│ │ └── ojdbc11-21.15.0.0.jar
│ ├── plugins/
│ │ ├── apicurio-registry-converter/
│ │ │ ├── apicurio-*.jar
│ │ │ ├── ojdbc11-21.15.0.0.jar
│ │ │ └── (dependencies)
│ │ └── debezium-connector-oracle/
│ │ ├── debezium-*.jar
│ │ └── (dependencies)
│ └── (other kafka files)
└── apicurio-server/
└── (apicurio files)
The OJDBC 11 jar is required with the Debezium Oracle connector 3.4. “ojdbc11-21.15.0.0.jar” is available under the package. If it is not available, an appropriate version can be downloaded from Maven Central, for example:
https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc11/23.2.0.0/ojdbc11-23.2.0.0.jar
The OJDBC jar should be placed in the locations shown in the directory layout above (kafka/lib/ and the apicurio-registry-converter plugin directory).

Create config/oracle-pdb.properties with the Debezium connector configuration:
name=oracle-pdb-connector
connector.class=io.debezium.connector.oracle.OracleConnector
# Connection - Use the CDB credentials but point to the PDB
database.hostname=DBHOST
database.port=DBPORT
database.user=c##dbzuser
database.password=password
database.dbname=MORAL
database.pdb.name=MORALPDB
# LogMiner Setup
database.connection.adapter=logminer
log.mining.strategy=online_catalog
topic.prefix=ora_standalone
tasks.max=1
# Schema History (Required for DDL tracking)
schema.history.internal.kafka.topic=schema-history.oracle
schema.history.internal.kafka.bootstrap.servers=localhost:9092
To use the Apicurio registry, update connect-standalone.properties and oracle-pdb.properties with the following additional properties.
connect-standalone.properties
# --- Default json Converters ---
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
# --- Apicurio Avro Converters ---
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://localhost:9099/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://localhost:9099/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=false
# Skip corrupted/missing schema records
skip.corrupted.records=true
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
oracle-pdb.properties (these settings make schema and field names Avro-compatible and serialize decimal and binary values as strings):
schema.name.adjustment.mode=avro
decimal.handling.mode=string
binary.handling.mode=base64
Services should be started in the following order:

# 1. ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &

# 2. Kafka broker
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

# 3. Apicurio Registry
nohup java -Dquarkus.http.port=9099 \
  -Dregistry.storage.kind=kafkasql \
  -Dregistry.kafkasql.bootstrap.servers=BrokerIP:9092 \
  -Dregistry.kafkasql.topic=apicurio-registry-storage \
  -jar apicurio-registry-storage-kafkasql-2.6.13.Final-runner.jar > registry.log 2>&1 &

# 4. Kafka Connect (standalone)
nohup bin/connect-standalone.sh config/connect-standalone.properties config/oracle-pdb.properties > connect.log 2>&1 &

Create a connection to Oracle DB.
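The examples in this post assume a source table incorta_apps.inventory_sample already exists in the PDB. Its exact definition is not shown here; a minimal sketch consistent with the DML used later (the column names id, name, and quantity come from those statements, while the types and constraints are assumptions) could look like:

```sql
-- Hypothetical DDL, inferred from the INSERT/UPDATE/DELETE examples below.
-- Adjust types and constraints to match your actual table.
CREATE TABLE incorta_apps.inventory_sample (
    id       NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    name     VARCHAR2(100) NOT NULL,
    quantity NUMBER
);
```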
Create a table in Incorta using this connection. After validating the source SQL query, you will notice two additional columns added by Incorta.
Perform a full load and verify the results. The full load uses the Oracle JDBC connection defined in the connector, while incremental loads read new incoming records from the Kafka topics.
To test the incremental load, apply some changes to the source table:
INSERT INTO inventory_sample (name, quantity) VALUES ('Ipad26', 70);
COMMIT;
UPDATE inventory_sample
SET quantity = 170
WHERE ID = 28;
DELETE FROM inventory_sample
WHERE id = 28;
COMMIT;
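To confirm that change events are flowing, you can consume the table's topic directly. By Debezium convention the topic name is <topic.prefix>.<SCHEMA>.<TABLE>, so with the configuration above it would be ora_standalone.INCORTA_APPS.INVENTORY_SAMPLE; verify the exact name by listing the topics first. These commands assume the broker runs on localhost:9092.

```shell
# List topics to find the exact CDC topic name
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Consume change events from the table topic (name follows Debezium's
# <topic.prefix>.<schema>.<table> convention; adjust if yours differs)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic ora_standalone.INCORTA_APPS.INVENTORY_SAMPLE --from-beginning
```

Because the setup above serializes messages with the Apicurio Avro converter, the raw console output will be binary Avro rather than readable JSON; seeing records arrive after each commit is enough to confirm the pipeline works.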
Debezium Connector for Oracle
https://debezium.io/documentation/reference/stable/connectors/oracle.html
Avro Serialization
https://debezium.io/documentation/reference/stable/configuration/avro.html
Incorta Documentation
https://docs.incorta.com/latest/concepts-log-based-incremental-load/