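Contents
- Environment preparation
- Software
- Download links
- Implementation
- Oracle host (A) configuration
- Kafka host (B) configuration
- Configuring Apache Maven
- Configuring Kafka 2.13-2.6.0
- Configuring kafka-connect-oracle-master
- Starting kafka-connect-oracle
- Starting the Kafka consumer
- Starting the database jobs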
Environment preparation
Software
- CentOS Linux 7.6.1810 (two hosts: host A and host B)
- Oracle 11.2.0.4 (installed on host A)
- Kafka 2.13-2.6.0 (installed on host B)
- kafka-connect-oracle-master (installed on host B; an open-source connector that synchronizes Oracle data to Kafka)
- Apache Maven 3.6.3 (installed on host B; the build tool used to package kafka-connect-oracle-master)
- jdk-8u261-linux-x64.rpm (installed on host B)
Download links
- CentOS, Oracle, and the JDK: download from their official websites
- Kafka: http://kafka.apache.org/downloads
- kafka-connect-oracle: https://github.com/tianxiancode/kafka-connect-oracle
- Apache Maven 3.6.3: http://maven.apache.org/download.cgi
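Implementation
Oracle host (A) configuration
Oracle instance configuration items:
- Enable archive log mode
- Enable supplemental logging
- Create the user that kafka-connect-oracle-master connects as
- Create the user that generates test data, plus the test tables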
-- Enable archive log mode
sqlplus / as sysdba
SQL> shutdown immediate
SQL> startup mount
SQL> alter database archivelog;
SQL> alter database open;

-- Enable supplemental logging
SQL> alter database add supplemental log data (all) columns;

-- Create the user that kafka-connect-oracle-master connects as
create role logmnr_role;
grant create session to logmnr_role;
grant execute_catalog_role, select any transaction, select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant logmnr_role to kminer;
alter user kminer quota unlimited on users;

-- Create the test-data user and the test tables
create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100m autoextend on next 10m;
create user whtest identified by whtest default tablespace test_date;
grant connect, resource to whtest;
grant execute on dbms_scheduler to whtest;
grant execute on dbms_random to whtest;
grant create job to whtest;

-- Assumption: the remaining statements are run as whtest, so that the tables and jobs
-- are created in the whtest schema (the jobs are later enabled as whtest).
conn whtest/whtest

create table t1 (
  id int,
  name char(10),
  createtime date default sysdate
);
alter table whtest.t1 add constraint pk_id_t1 primary key (id) using index tablespace test_date;

create table t2 (
  id int,
  name char(10),
  createtime date default sysdate
);
alter table whtest.t2 add constraint pk_id_t2 primary key (id) using index tablespace test_date;

create table t3 (
  id int,
  name char(10),
  createtime date default sysdate
);
alter table whtest.t3 add constraint pk_id_t3 primary key (id) using index tablespace test_date;

-- One scheduler job per table; each run inserts 10 random rows and repeats every 5 seconds
begin
  dbms_scheduler.create_job(
    job_name        => 't1_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
                          v_id int;
                          v_name char(10);
                        begin
                          for i in 1..10 loop
                            v_id := round(dbms_random.value(1,1000000000));
                            v_name := round(dbms_random.value(1,1000000000));
                            insert into whtest.t1 (id,name) values (v_id,v_name);
                          end loop;
                        end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t1 every 5 sec');
end;
/

begin
  dbms_scheduler.create_job(
    job_name        => 't2_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
                          v_id int;
                          v_name char(10);
                        begin
                          for i in 1..10 loop
                            v_id := round(dbms_random.value(1,1000000000));
                            v_name := round(dbms_random.value(1,1000000000));
                            insert into whtest.t2 (id,name) values (v_id,v_name);
                          end loop;
                        end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t2 every 5 sec');
end;
/

begin
  dbms_scheduler.create_job(
    job_name        => 't3_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
                          v_id int;
                          v_name char(10);
                        begin
                          for i in 1..10 loop
                            v_id := round(dbms_random.value(1,1000000000));
                            v_name := round(dbms_random.value(1,1000000000));
                            insert into whtest.t3 (id,name) values (v_id,v_name);
                          end loop;
                        end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t3 every 5 sec');
end;
/

-- After creating the jobs, disable them for now
exec dbms_scheduler.disable('t1_job');
exec dbms_scheduler.disable('t2_job');
exec dbms_scheduler.disable('t3_job');

-- Run these later, once the Kafka side is fully configured
exec dbms_scheduler.enable('t1_job');
exec dbms_scheduler.enable('t2_job');
exec dbms_scheduler.enable('t3_job');
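The repeat_interval expression sysdate + 5/86400 relies on Oracle date arithmetic, where adding 1 to a date adds one day, so 5/86400 of a day is five seconds. The short Python sketch below only illustrates that arithmetic and the resulting insert rate per table; the function names are made up for this example and are not part of Oracle or of the connector.

```python
from fractions import Fraction

SECONDS_PER_DAY = 24 * 60 * 60  # 86400, the denominator used in repeat_interval


def day_fraction(seconds: int) -> Fraction:
    """Return `seconds` expressed as a fraction of one day (the Oracle DATE unit)."""
    return Fraction(seconds, SECONDS_PER_DAY)


def rows_per_minute(rows_per_run: int, interval_seconds: int) -> float:
    """Approximate insert rate for a job that adds rows_per_run rows per interval."""
    return rows_per_run * 60 / interval_seconds


if __name__ == "__main__":
    # The jobs above use 5/86400 and loop 10 times per run.
    print("interval as a fraction of a day:", day_fraction(5))
    print("approximate rows per minute per table:", rows_per_minute(10, 5))
```

The rate is only approximate, since the scheduler does not guarantee that every run starts exactly on the interval.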
Kafka host (B) configuration
Upload the downloaded Kafka 2.13-2.6.0, kafka-connect-oracle-master, Apache Maven 3.6.3, and JDK 1.8.0 packages to the /soft directory on host B.
Add a name-resolution entry for the host to the hosts file
[root@softdelvily ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.20.44 softdelvily localhost
Install the JDK
[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm
warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
Preparing... ################################# [100%]
Updating / installing...
1:jdk1.8-2000:1.8.0_261-fcs ################################# [100%]
Unpacking JAR files...
tools.jar...
plugin.jar...
javaws.jar...
deploy.jar...
rt.jar...
jsse.jar...
charsets.jar...
localedata.jar...
Configuring Apache Maven
Extract apache-maven-3.6.3-bin.tar.gz into /usr/local and set the corresponding environment variables in /etc/profile.
[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
apache-maven-3.6.3/README.txt
apache-maven-3.6.3/LICENSE
.....
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 2 root root 6 Apr 11 2018 bin
.....
[root@softdelvily local]# vi /etc/profile
.......
## Add the following environment variables
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
[root@softdelvily local]# source /etc/profile
[root@softdelvily local]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/apache-maven-3.6.3
Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
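If mvn -v fails after editing /etc/profile, the usual cause is that the Maven bin directory never made it onto PATH in the current shell. The following is a small, optional Python sketch for checking that; the helper names are invented for this example, and it assumes the variables are spelled MAVEN_HOME and PATH in uppercase, as shell environment variables are case-sensitive.

```python
import os
import shutil
from typing import Mapping, Optional


def maven_bin_dir(env: Mapping[str, str]) -> Optional[str]:
    """Return $MAVEN_HOME/bin if MAVEN_HOME is set, otherwise None."""
    home = env.get("MAVEN_HOME")
    return os.path.join(home, "bin") if home else None


def maven_on_path(env: Mapping[str, str]) -> bool:
    """Return True when $MAVEN_HOME/bin appears as an entry of PATH."""
    bin_dir = maven_bin_dir(env)
    if bin_dir is None:
        return False
    entries = [entry.rstrip("/") for entry in env.get("PATH", "").split(os.pathsep)]
    return bin_dir.rstrip("/") in entries


if __name__ == "__main__":
    print("MAVEN_HOME/bin on PATH:", maven_on_path(os.environ))
    # shutil.which reports which mvn executable the shell would actually run.
    print("mvn resolves to:", shutil.which("mvn"))
```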
Configuring Kafka 2.13-2.6.0
Extract Kafka 2.13-2.6.0 into /usr/local.
[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
kafka_2.13-2.6.0/
kafka_2.13-2.6.0/LICENSE
......
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
.....
Start Kafka and create the topic that will receive the synchronized database changes
-- 1. Session 1: start ZooKeeper
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties
.......
[2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
-- 2. Session 2: start Kafka
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
-- 3. Session 3: create the cdczztar topic
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
Created topic cdczztar.
[root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
cdczztar
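Because ZooKeeper and the broker run in separate sessions, it can help to confirm that both are accepting connections before moving on. Below is a minimal Python sketch that does this with plain TCP connects. It assumes ZooKeeper listens on localhost:2181 (the address used in the kafka-topics.sh commands above) and the broker on localhost:9092 (the address used by the console consumer later in this walkthrough); the port_open helper is hypothetical and only tests reachability, not service health.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    # Assumed ports: adjust if zookeeper.properties or server.properties were changed.
    for name, port in (("ZooKeeper", 2181), ("Kafka broker", 9092)):
        state = "listening" if port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```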
Configuring kafka-connect-oracle-master
Extract kafka-connect-oracle-master into /soft, edit its config file, and then build the project with Maven.
-- Extract the zip archive
[root@softdelvily soft]# unzip kafka-connect-oracle-master.zip
[root@softdelvily soft]# ll
total 201180
-rw-r--r--. 1 root root 9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
-rw-r--r--. 1 root root 127431820 Sep 8 10:43 jdk-8u261-linux-x64.rpm
-rw-r--r--. 1 root root 65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
drwxr-xr-x. 5 root root 107 Sep 8 15:48 kafka-connect-oracle-master
-rw-r--r--. 1 root root 3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
[root@softdelvily soft]# cd kafka-connect-oracle-master/config/
[root@softdelvily config]# ll
total 4
-rw-r--r--. 1 root root 1135 Sep 8 15:48 OracleSourceConnector.properties
-- Adjust the properties file
-- Items to change: db.name.alias, topic, db.name, db.hostname, db.user, db.user.password, table.whitelist, table.blacklist; see README.md for details
[root@softdelvily config]# vi OracleSourceConnector.properties
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=whtest.t1,whtest.t2
table.blacklist=whtest.t3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
-- Build the project
[root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
[root@softdelvily kafka-connect-oracle-master]# mvn clean package
[INFO] Scanning for projects...
.......
[INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 94.03 s
[INFO] Finished at: 2020-09-23T10:25:52+08:00
[INFO] ------------------------------------------------------------------------
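A missing or empty connection setting in the properties file typically only surfaces once the connector starts, so a quick pre-check can save a restart cycle. The sketch below is a deliberately simplified Python reader for key=value lines (it does not implement the full Java properties format) that reports which of the settings listed above are absent or blank. The REQUIRED_KEYS list is an assumption based on the items this walkthrough says to adjust; table.blacklist is left out on the assumption that it may legitimately be empty.

```python
from typing import Dict, List

# Assumed required keys, taken from the settings this walkthrough says to adjust.
REQUIRED_KEYS = (
    "db.name.alias", "topic", "db.name", "db.hostname",
    "db.user", "db.user.password", "table.whitelist",
)


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!" or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def missing_or_empty(props: Dict[str, str]) -> List[str]:
    """Return the required keys that are absent or have an empty value."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]


if __name__ == "__main__":
    import sys
    # Usage: python3 check_props.py <path to the connector properties file>
    with open(sys.argv[1], encoding="utf-8") as fh:
        problems = missing_or_empty(parse_properties(fh.read()))
    print("missing or empty:", ", ".join(problems) if problems else "none")
```

The script name check_props.py in the usage comment is just a placeholder.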
Copy the following files into the Kafka installation directory.
- Copy kafka-connect-oracle-1.0.68.jar and lib/ojdbc7.jar to $KAFKA_HOME/libs
- Copy config/OracleSourceConnector.properties to $KAFKA_HOME/config
[root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
[root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily target]# cd /soft/kafka-connect-oracle-master/lib
[root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
[root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/
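Before starting the connector, it is worth confirming that all three files landed where Kafka Connect will look for them. Here is a minimal Python sketch of such a check. It assumes the jar version 1.0.68 reported by the Maven build, the Kafka directory /usr/local/kafka_2.13-2.6.0 used throughout this walkthrough, and the properties file name in the upstream repository's casing (OracleSourceConnector.properties); adjust these if your build or file names differ.

```python
import os
from typing import List

# Assumed relative paths that the copy step should produce under the Kafka directory.
EXPECTED_FILES = (
    os.path.join("libs", "kafka-connect-oracle-1.0.68.jar"),
    os.path.join("libs", "ojdbc7.jar"),
    os.path.join("config", "OracleSourceConnector.properties"),
)


def missing_files(kafka_home: str) -> List[str]:
    """Return the expected files that do not exist under kafka_home."""
    return [
        rel for rel in EXPECTED_FILES
        if not os.path.isfile(os.path.join(kafka_home, rel))
    ]


if __name__ == "__main__":
    # KAFKA_HOME is optional here; the default is the install path used above.
    home = os.environ.get("KAFKA_HOME", "/usr/local/kafka_2.13-2.6.0")
    missing = missing_files(home)
    print("all files in place" if not missing else f"missing: {missing}")
```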
Starting kafka-connect-oracle
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
...... (com.ecer.kafka.connect.oracle.OracleSourceTask:187)
[2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)
Starting the Kafka consumer
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar
Starting the database jobs
[oracle@oracle01 ~]$ sqlplus / as sysdba
SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020
Copyright (c) 1982, 2011, Oracle. All rights reserved.
set pagesize 999
Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options
SQL>
SQL> conn whtest/whtest
Connected.
SQL> exec dbms_scheduler.enable('t1_job');
PL/SQL procedure successfully completed.
Kafka consumer output
Records like the following indicate that synchronization is working; each record is printed as JSON key:value pairs.
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"scn"},{"type":"string","optional":false,"field":"seg_owner"},{"type":"string","optional":false,"field":"table_name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.timestamp","version":1,"field":"timestamp"},{"type":"string","optional":false,"field":"sql_redo"},{"type":"string","optional":false,"field":"operation"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.timestamp","version":1,"field":"createtime"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.timestamp","version":1,"field":"createtime"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"scn":2847668,"seg_owner":"whtest","table_name":"t1","timestamp":1600829206000,"sql_redo":"insert into \"whtest\".\"t1\"(\"id\",\"name\",\"createtime\") values (557005146,'533888119 ',timestamp ' 2020-09-23 10:46:46')","operation":"insert","data":{"id":5.57005146e8,"name":"533888119","createtime":1600829206000},"before":null}}
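Each record carries a schema block and a payload block, and for most checks only a handful of payload fields matter. The following Python sketch pulls those fields out of one line of consumer output. It assumes the payload field names shown in the sample record (scn, seg_owner, table_name, timestamp, operation, data, before) and that timestamp is epoch milliseconds, which matches the sample; keys are lowercased before lookup in case the connector emits them in a different case. The summarize_record function and the summarize.py script name are made up for this example.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def summarize_record(line: str) -> Dict[str, Any]:
    """Extract the main change fields from one JSON record printed by the consumer."""
    record = json.loads(line)
    # Lowercase the keys so the lookup works whatever casing the connector emits.
    payload = {key.lower(): value for key, value in record["payload"].items()}
    return {
        "table": f'{payload["seg_owner"]}.{payload["table_name"]}',
        "operation": payload["operation"],
        "scn": payload["scn"],
        # Assumed to be epoch milliseconds; converted to a timezone-aware UTC datetime.
        "time_utc": datetime.fromtimestamp(payload["timestamp"] / 1000, tz=timezone.utc),
        "data": payload.get("data"),
        "before": payload.get("before"),
    }


if __name__ == "__main__":
    import sys
    # Example: ./kafka-console-consumer.sh ... --topic cdczztar | python3 summarize.py
    for raw in sys.stdin:
        if raw.strip():
            print(summarize_record(raw))
```

For an insert, before is null and data holds the new row; comparing the two is a quick way to eyeball updates once they start flowing.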