Overview

Incorta introduced log-based incremental load, enabling reliable incremental loading of inserts and updates without requiring special columns such as a LAST_UPDATED_TIMESTAMP column or a column with a monotonically increasing maximum value, while also eliminating the performance impact on source systems.

Connector version 2.2.5.0 introduces schema registry integration with Apicurio, fundamentally improving incremental data ingestion performance: schema definitions are externalized from individual Kafka messages to a schema registry service, eliminating the redundant schema metadata embedded in each message.


Prerequisites

To use log-based incremental load, do the following:

  1. Install and configure Apache Kafka and Kafka Connect.
  2. Set up the source database so that the Debezium connector can capture transaction changes.
  3. Configure the Debezium connector. Debezium is an open-source distributed platform for Change Data Capture (CDC); see the Debezium Connector for Oracle documentation in the References section for the Oracle-side configuration.
  4. Disable the initial snapshot while configuring Debezium.
  5. Ensure that the Debezium connector is configured to send data types that Incorta supports.
  6. Note that log-based incremental load supports only physical database tables with primary keys.
  7. Configure the Apicurio schema registry before enabling the feature.
  8. Configure the Debezium connector to use Avro as the message format.

Tech Stack

 

| Software | Version |
| --- | --- |
| Incorta | 2025.7.6 |
| Oracle Database | 19c Enterprise Edition Release 19.0.0.0.0 - Production |
| Kafka | 3.9.2 |
| Debezium | 3.4 (latest stable build) |
| Apicurio Registry | 2.6.13 |

 

Changes in Oracle Database

Enable Archivelog & Supplemental Logging

Log in to the CDB and verify whether ARCHIVELOG mode is enabled.
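A quick way to check (a standard Oracle query, run as a privileged user):

SELECT log_mode FROM v$database;
-- Returns ARCHIVELOG when archive logging is enabled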


If the database is not in ARCHIVELOG mode, enable it with steps similar to the following:

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now show "Database log mode: Archive Mode"
archive log list

 


Enable Minimal Supplemental Logging:

This ensures the redo logs contain enough information to identify changed rows.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
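To confirm the change (an optional check):

SELECT supplemental_log_data_min FROM v$database;
-- Returns YES once minimal supplemental logging is enabled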
 

Create the Debezium Common User

Debezium needs a user that can "see" across containers to mine the logs. Execute this from the CDB$ROOT connection.

Create the LogMiner tablespace from the CDB$ROOT connection:

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/MORAL/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

 


Create the LogMiner tablespace from the PDB connection:

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/MORAL/MORALPDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;


Create the common user from the CDB$ROOT connection. The user name must start with "c##":

CREATE USER c##dbzuser IDENTIFIED BY <<password>> DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
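To verify that the common user was created (an optional check):

SELECT username, common FROM cdb_users WHERE username = 'C##DBZUSER';
-- COMMON should be YES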


Grant the following permissions to the user:

GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; 
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; 
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; 
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; 

GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; 
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; 
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; 

GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; 
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; 

GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL; 

GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL; 
GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL;
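To review the granted privileges (an optional sanity check):

SELECT privilege FROM dba_sys_privs WHERE grantee = 'C##DBZUSER';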

 


Enable logging of all columns, if required

ALTER TABLE incorta_apps.inventory_sample ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
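To confirm the table-level log group (an optional check):

SELECT log_group_name, log_group_type FROM dba_log_groups WHERE table_name = 'INVENTORY_SAMPLE';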


Also grant the user read access to the application tables:

GRANT READ ANY TABLE TO c##dbzuser;


Kafka

Download Kafka 3.1+ from https://downloads.apache.org/kafka/3.9.2/kafka_2.13-3.9.2.tgz

Unarchive the downloaded file as follows:

tar -xvf kafka_2.13-3.9.2.tgz

Verify that the Kafka Connect binaries are available as part of the distribution.
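For example, a quick check (assuming the archive was extracted into kafka_2.13-3.9.2/ in the current directory):

ls kafka_2.13-3.9.2/bin | grep connect
# Should list connect-standalone.sh and connect-distributed.sh, among others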

Apicurio Registry

Download the following Apicurio distributions:

  • Apicurio registry server
  • Apicurio connect converter distribution package

Organize folders

Organize the downloaded folders as follows.

cdc_root/
├── kafka/
│   ├── config/
│   │   ├── zookeeper.properties
│   │   ├── server.properties
│   │   ├── connect-standalone.properties
│   │   ├── oracle-pdb.properties
│   │   └── (other configs)
│   ├── lib/
│   │   └── ojdbc11-21.15.0.0.jar
│   ├── plugins/
│   │   ├── apicurio-registry-converter/
│   │   │   ├── apicurio-*.jar
│   │   │   ├── ojdbc11-21.15.0.0.jar
│   │   │   └── (dependencies)
│   │   └── debezium-connector-oracle/
│   │       ├── debezium-*.jar
│   │       └── (dependencies)
│   └── (other kafka files)
└── apicurio-server/
    └── (apicurio files)

OJDBC Jar

The OJDBC 11 JAR is required with the Debezium Oracle connector 3.4. ojdbc11-21.15.0.0.jar is available under the package; if it is not available, download a compatible version, for example:

https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc11/23.2.0.0/ojdbc11-23.2.0.0.jar

The OJDBC JAR should be placed under the following directories:

  • kafka/lib
  • kafka/plugins/debezium-connector-oracle
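A minimal sketch of the copy step, assuming the cdc_root/ layout shown earlier and the JAR downloaded to the current directory:

cp ojdbc11-21.15.0.0.jar kafka/lib/
cp ojdbc11-21.15.0.0.jar kafka/plugins/debezium-connector-oracle/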

Configurations

  • Edit the Kafka zookeeper.properties to run ZooKeeper on port 2182 (to avoid a conflict with the Incorta ZooKeeper); see the example snippet after the connector file below.
  • Edit the Kafka server.properties to update the zookeeper.connect property to the new port.
  • Edit the connect-standalone.properties as follows:
    • plugin.path=/incorta/sbux_cdc/kafka_2.13-3.9.2/plugins
  • Create the oracle-pdb.properties file as follows:
name=oracle-pdb-connector
connector.class=io.debezium.connector.oracle.OracleConnector

# Connection - Use the CDB credentials but point to the PDB
database.hostname=DBHOST
database.port=DBPORT
database.user=c##dbzuser
database.password=password
database.dbname=MORAL
database.pdb.name=MORALPDB

# LogMiner Setup
database.connection.adapter=logminer
log.mining.strategy=online_catalog
topic.prefix=ora_standalone
tasks.max=1

# Schema History (Required for DDL tracking)
schema.history.internal.kafka.topic=schema-history.oracle
schema.history.internal.kafka.bootstrap.servers=localhost:9092
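For reference, the ZooKeeper and broker edits from the list above might look like this (the values are examples based on the 2182 port choice):

# kafka/config/zookeeper.properties
clientPort=2182

# kafka/config/server.properties
zookeeper.connect=localhost:2182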

To use the Apicurio registry, update connect-standalone.properties and oracle-pdb.properties with the following additional properties.

connect-standalone.properties

# --- Default json Converters ---
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

# --- Apicurio Avro Converters ---
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://localhost:9099/apis/registry/v2
key.converter.apicurio.registry.auto-register=true

value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://localhost:9099/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=false

# Skip corrupted/missing schema records
skip.corrupted.records=true

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

oracle-pdb.properties

schema.name.adjustment.mode=avro
decimal.handling.mode=string
binary.handling.mode=base64

Starting Services

Services should be started in the following order:

  1.  Kafka Zookeeper
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
  2. Kafka Server
    nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
  3. Apicurio Registry (if needed)
    nohup java -Dquarkus.http.port=9099 \
    -Dregistry.storage.kind=kafkasql \
    -Dregistry.kafkasql.bootstrap.servers=BrokerIP:9092 \
    -Dregistry.kafkasql.topic=apicurio-registry-storage \
    -jar apicurio-registry-storage-kafkasql-2.6.13.Final-runner.jar > registry.log 2>&1 &
  4. Kafka Connect
    nohup bin/connect-standalone.sh config/connect-standalone.properties config/oracle-pdb.properties > connect.log 2>&1 &
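Once all services are up, a quick sanity check (the ora_standalone prefix comes from topic.prefix in the connector config, and port 9099 from the registry start command):

# Debezium change topics should appear with the ora_standalone prefix
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Confirm that the Apicurio registry is up and has registered schemas
curl http://localhost:9099/apis/registry/v2/search/artifacts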

Incorta

Create a connection to Oracle DB


Create a table using this connection as follows:

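As an illustration, a minimal source query for the sample table used in the Oracle steps above (adjust the schema and columns to your environment):

SELECT id, name, quantity FROM incorta_apps.inventory_sample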

After validating the source SQL query, notice the two additional columns that Incorta adds:

  • Reserved_incorta_extraction_timestamp: a system column added to all Incorta tables
  • Reserved_incorta_is_deleted: a system column added to CDC-enabled tables

Perform a full load and verify the results. The full load uses the Oracle JDBC connection defined in the connector.


Incremental loads use the Kafka topics to read new incoming records.

INSERT INTO inventory_sample (name, quantity) VALUES ('Ipad26', 70);
COMMIT;


UPDATE inventory_sample
SET quantity = 170
WHERE ID = 28;
COMMIT;


DELETE FROM inventory_sample
WHERE id = 28;
COMMIT;


References

Debezium Connector for Oracle
https://debezium.io/documentation/reference/stable/connectors/oracle.html

Avro Serialization
https://debezium.io/documentation/reference/stable/configuration/avro.html

Incorta Documentation
https://docs.incorta.com/latest/concepts-log-based-incremental-load/
