How to Synchronize Oracle Data to Kafka

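Contents
  • Environment preparation
    • Software
    • Download links
  • Implementation
    • Oracle host (A) configuration
    • Kafka host (B) configuration
    • Configuring apache-maven
    • Configuring kafka 2.13-2.6.0
    • Configuring kafka-connect-oracle-master
    • Starting kafka-connect-oracle
    • Starting the Kafka consumer
    • Starting the database jobs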

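Environment preparation

Software

  • CentOS Linux 7.6.1810 (two hosts: host A and host B)
  • Oracle 11.2.0.4 (installed on host A)
  • kafka 2.13-2.6.0 (Kafka 2.6.0 built for Scala 2.13; installed on host B)
  • kafka-connect-oracle-master (installed on host B; an open-source program that synchronizes Oracle data to Kafka)
  • apache-maven 3.6.3 (installed on host B; the build tool for kafka-connect-oracle-master)
  • jdk-8u261-linux-x64.rpm (installed on host B)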

Download links

  • CentOS, Oracle, and the JDK: download from their official sites
  • kafka http://kafka.apache.org/downloads
  • kafka connect oracle    https://github.com/tianxiancode/kafka-connect-oracle
  • apache-maven 3.6.3 http://maven.apache.org/download.cgi

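Implementation

Oracle host (A) configuration

Oracle instance configuration items:

  • Enable archive logging
  • Enable supplemental logging
  • Create the user that kafka-connect-oracle-master connects as
  • Create the user that generates test data, plus the test tables

-- Enable archive logging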
sqlplus / as sysdba    
SQL> shutdown immediate
SQL> startup mount
SQL> alter database archivelog;
SQL> alter database open;
-- Enable supplemental logging
SQL> alter database add supplemental log data (all) columns;
-- Create the user that kafka-connect-oracle-master connects as
create role logmnr_role;
grant create session to logmnr_role;
grant  execute_catalog_role,select any transaction ,select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant  logmnr_role to kminer;
alter user kminer quota unlimited on users;
-- Create the user that generates test data, plus the test tables
create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100m autoextend on next 10m;
create user whtest identified by whtest default tablespace test_date;
grant connect,resource to whtest;
grant execute on dbms_scheduler to whtest;
grant execute on dbms_random to whtest;
grant create job to whtest;
-- Connect as whtest so that the tables and jobs below are created in the whtest schema
conn whtest/whtest
create table t1 (
id int ,
name char(10),
createtime date default sysdate
);
alter table whtest.t1  add constraint pk_id_t1 primary key (id)  using index   tablespace test_date;

create table t2 (
id int ,
name char(10),
createtime date default sysdate
);
alter table whtest.t2  add constraint pk_id_t2 primary key (id)  using index   tablespace test_date;
create table t3 (
id int ,
name char(10),
createtime date default sysdate
);
alter table whtest.t3  add constraint pk_id_t3 primary key (id)  using index   tablespace test_date;
begin
dbms_scheduler.create_job(
job_name=> 't1_job',
job_type=> 'plsql_block',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t1 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t1 every 5 sec');
end;
/ 

begin
dbms_scheduler.create_job(
job_name=> 't2_job',
job_type=> 'plsql_block',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t2 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t2 every 5 sec');
end;
/ 

begin
dbms_scheduler.create_job(
job_name=> 't3_job',
job_type=> 'plsql_block',
job_action =>'declare
v_id int;
v_name char(10);
begin
  for i in 1..10 loop
    v_id := round(dbms_random.value(1,1000000000));
    v_name :=round(dbms_random.value(1,1000000000));
    insert into whtest.t3 (id,name)values(v_id,v_name);
  end loop;
end;',
enabled=>true,
repeat_interval=>'sysdate + 5/86400',
comments=>'insert into t3 every 5 sec');
end;
/ 
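-- After creating the jobs, disable them for now; enable them once the Kafka side is configured
exec dbms_scheduler.disable('t1_job');
exec dbms_scheduler.disable('t2_job');
exec dbms_scheduler.disable('t3_job');

-- Run the following later, after the Kafka configuration is complete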
exec dbms_scheduler.enable('t1_job');
exec dbms_scheduler.enable('t2_job');
exec dbms_scheduler.enable('t3_job');
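
The jobs above use repeat_interval => 'sysdate + 5/86400'. In Oracle date arithmetic, adding 1 to a date adds one day, so 5/86400 is a fraction of a day. The following is a minimal Python sketch of that arithmetic and of the nominal insert rate it implies. The function names are illustrative and not part of any Oracle API, and the rate assumes every run inserts all 10 rows on schedule.

```python
from fractions import Fraction

SECONDS_PER_DAY = 24 * 60 * 60  # 86400


def day_fraction_to_seconds(numerator: int, denominator: int) -> Fraction:
    """Convert a fraction of a day (Oracle date arithmetic) into seconds."""
    return Fraction(numerator, denominator) * SECONDS_PER_DAY


def rows_per_minute(rows_per_run: int, interval_seconds: Fraction) -> Fraction:
    """Nominal number of rows inserted per minute by one job."""
    return rows_per_run * Fraction(60) / interval_seconds


# 'sysdate + 5/86400' means "5 seconds from now".
interval = day_fraction_to_seconds(5, 86400)
print(interval)                       # 5
# Each job loops 10 times per run, so one job inserts about 120 rows per minute.
print(rows_per_minute(10, interval))  # 120
```

With all three jobs enabled, the nominal total is three times that figure; the actual rate depends on scheduler timing.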

Kafka host (B) configuration

Upload the downloaded kafka 2.13-2.6.0, kafka-connect-oracle-master, apache-maven 3.6.3, and jdk 1.8.0 packages to the /soft directory on host B.

Add a name-resolution entry to the hosts file

[root@softdelvily ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.20.44 softdelvily localhost

Install the JDK

[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm 
warning: jdk-8u261-linux-x64.rpm: header v3 rsa/sha256 signature, key id ec551f03: nokey
preparing...                          ################################# [100%]
updating / installing...
   1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]
unpacking jar files...
        tools.jar...
        plugin.jar...
        javaws.jar...
        deploy.jar...
        rt.jar...
        jsse.jar...
        charsets.jar...
        localedata.jar...

Configuring apache-maven

Extract apache-maven-3.6.3-bin.tar.gz into /usr/local, then set the corresponding environment variables in /etc/profile.

[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
apache-maven-3.6.3/README.txt
apache-maven-3.6.3/LICENSE
.....
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root  99 sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 2 root root   6 apr 11  2018 bin
.....
[root@softdelvily local]# vi /etc/profile
.......
## Add the following environment variables
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
[root@softdelvily local]# source /etc/profile
[root@softdelvily local]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/apache-maven-3.6.3
Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"

Configuring kafka 2.13-2.6.0

Extract kafka 2.13-2.6.0 into /usr/local.

[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
kafka_2.13-2.6.0/
kafka_2.13-2.6.0/LICENSE
......
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 6 root root 89 jul 29 02:23 kafka_2.13-2.6.0
.....

Start Kafka and create the topic that will receive the synchronized database changes.

-- 1. Session 1: start ZooKeeper
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2020-09-23 10:06:49,158] info reading configuration from: ../config/zookeeper.properties 
.......
[2020-09-23 10:06:49,311] info using checkintervalms=60000 maxperminute=10000 (org.apache.zookeeper.server.containermanager)
-- 2. Session 2: start Kafka
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
-- 3. Session 3: create the topic cdczztar
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
created topic cdczztar.
[root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
cdczztar

Configuring kafka-connect-oracle-master

Extract kafka-connect-oracle-master into /soft, edit its config file, and then build the program with Maven.

-- Extract the zip archive
[root@softdelvily soft]# unzip kafka-connect-oracle-master.zip 
[root@softdelvily soft]# ll
total 201180
-rw-r--r--. 1 root root   9506321 sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
-rw-r--r--. 1 root root 127431820 sep  8 10:43 jdk-8u261-linux-x64.rpm
-rw-r--r--. 1 root root  65537909 sep 22 15:59 kafka_2.13-2.6.0.tgz
drwxr-xr-x. 5 root root       107 sep  8 15:48 kafka-connect-oracle-master
-rw-r--r--. 1 root root   3522729 sep 22 14:14 kafka-connect-oracle-master.zip
[root@softdelvily soft]# cd kafka-connect-oracle-master/config/
[root@softdelvily config]# ll
total 4
-rw-r--r--. 1 root root 1135 sep  8 15:48 OracleSourceConnector.properties
-- Edit the properties file
-- The settings to adjust are db.name.alias, topic, db.name, db.hostname, db.user, db.user.password, table.whitelist, and table.blacklist; see README.md for details
[root@softdelvily config]# vi OracleSourceConnector.properties 
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=whtest.t1,whtest.t2
table.blacklist=whtest.t3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
-- Build the program
[root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
[root@softdelvily kafka-connect-oracle-master]# mvn clean package
[info] scanning for projects...
.......
[info] building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
[info] ------------------------------------------------------------------------
[info] build success
[info] ------------------------------------------------------------------------
[info] total time:  94.03 s
[info] finished at: 2020-09-23t10:25:52+08:00
[info] ------------------------------------------------------------------------

Copy the following files into the Kafka installation directory.

  • Copy kafka-connect-oracle-1.0.68.jar and lib/ojdbc7.jar to $KAFKA_HOME/libs
  • Copy config/OracleSourceConnector.properties to $KAFKA_HOME/config
[root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
[root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/lib
[root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
[root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/
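
Before starting the connector, it can help to sanity-check the edited properties file. The following is a minimal Python sketch, not part of kafka-connect-oracle. It reads simple key=value lines only (it does not handle the full Java properties syntax), reports any of the settings this article adjusts that are missing or empty, and lists tables that appear in both table.whitelist and table.blacklist. The function names are illustrative assumptions, and the sample text reuses the values shown above, including the masked host address.

```python
# Settings this article says need adjusting (see README.md for their meaning).
KEYS_TO_ADJUST = [
    "db.name.alias", "topic", "db.name", "db.hostname",
    "db.user", "db.user.password", "table.whitelist", "table.blacklist",
]


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_or_empty(props, keys):
    """Return the keys that are absent or have an empty value."""
    return [k for k in keys if not props.get(k)]


def _tables(value):
    return {t.strip().lower() for t in value.split(",") if t.strip()}


def overlapping_tables(props):
    """Return tables listed in both the whitelist and the blacklist."""
    white = _tables(props.get("table.whitelist", ""))
    black = _tables(props.get("table.blacklist", ""))
    return sorted(white & black)


SAMPLE = """\
name=oracle-logminer-connector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
table.whitelist=whtest.t1,whtest.t2
table.blacklist=whtest.t3
start.scn=
"""

props = parse_properties(SAMPLE)
print(missing_or_empty(props, KEYS_TO_ADJUST))  # []
print(overlapping_tables(props))                # []
```

To check the real file, read its contents with open(...).read() and pass the text to parse_properties.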

Starting kafka-connect-oracle

[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
......
(com.ecer.kafka.connect.oracle.oraclesourcetask:187)
[2020-09-23 10:40:31,375] info log miner will start at new position scn : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.oraclesourcetask:188)

Starting the Kafka consumer

[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar

Starting the database jobs

[oracle@oracle01 ~]$ sqlplus / as sysdba

sql*plus: release 11.2.0.4.0 production on wed sep 23 10:45:16 2020

copyright (c) 1982, 2011, oracle.  all rights reserved.

set pagesize 999

connected to:
oracle database 11g enterprise edition release 11.2.0.4.0 - 64bit production
with the partitioning, olap, data mining and real application testing options

sql> sql> conn whtest/whtest
connected.
sql> exec dbms_scheduler.enable('t1_job');

pl/sql procedure successfully completed.

Kafka consumer output

Records like the following indicate that synchronization is working. Each record is printed as JSON key:value pairs.

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"scn"},{"type":"string","optional":false,"field":"seg_owner"},{"type":"string","optional":false,"field":"table_name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.timestamp","version":1,"field":"timestamp"},{"type":"string","optional":false,"field":"sql_redo"},{"type":"string","optional":false,"field":"operation"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.timestamp","version":1,"field":"createtime"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.timestamp","version":1,"field":"createtime"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"scn":2847668,"seg_owner":"whtest","table_name":"t1","timestamp":1600829206000,"sql_redo":"insert into \"whtest\".\"t1\"(\"id\",\"name\",\"createtime\") values (557005146,'533888119 ',timestamp ' 2020-09-23 10:46:46')","operation":"insert","data":{"id":5.57005146e8,"name":"533888119","createtime":1600829206000},"before":null}}
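
Each record is a JSON envelope with a "schema" part and a "payload" part. The following is a minimal Python sketch of pulling the useful fields out of the payload. It makes several assumptions: the field names are spelled as printed in this article (the casing in real connector output may differ), the schema is trimmed to keep the sample short, the summarize function is illustrative, and the timestamp is rendered at UTC+8, the offset seen in the build log above.

```python
import json
from datetime import datetime, timedelta, timezone

# Payload copied from the record above; the schema part is trimmed for brevity.
RECORD = r'''{"schema":{"type":"struct","optional":false,"name":"zztar.whtest.t1.row"},"payload":{"scn":2847668,"seg_owner":"whtest","table_name":"t1","timestamp":1600829206000,"sql_redo":"insert into \"whtest\".\"t1\"(\"id\",\"name\",\"createtime\") values (557005146,'533888119 ',timestamp ' 2020-09-23 10:46:46')","operation":"insert","data":{"id":5.57005146e8,"name":"533888119","createtime":1600829206000},"before":null}}'''


def summarize(record_json, utc_offset_hours=8):
    """Extract table, operation, SCN, row id, and change time from one record."""
    payload = json.loads(record_json)["payload"]
    tz = timezone(timedelta(hours=utc_offset_hours))
    # The timestamp field is in milliseconds since the Unix epoch.
    changed_at = datetime.fromtimestamp(payload["timestamp"] / 1000, tz=tz)
    return {
        "table": payload["seg_owner"] + "." + payload["table_name"],
        "operation": payload["operation"],
        "scn": payload["scn"],
        # The id arrives as a double (5.57005146e8), so convert it back to an integer.
        "id": int(payload["data"]["id"]),
        "changed_at": changed_at.strftime("%Y-%m-%d %H:%M:%S"),
    }


print(summarize(RECORD))
# {'table': 'whtest.t1', 'operation': 'insert', 'scn': 2847668, 'id': 557005146, 'changed_at': '2020-09-23 10:46:46'}
```

The rendered time matches the timestamp literal in sql_redo, which is a quick way to confirm that the record corresponds to the inserted row.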
