Oracle GoldenGate(OGG)- 超级详细

Oracle GoldenGate(OGG)- 超级详细1 OGG 简介 OGG 是一种基于日志的结构化数据复制软件 它通过解析源数据库在线日志或归档日志获得数据的增删改变化 数据量只有日志的四分之一左右 OGG 能够实现大量交易数据的实时捕捉 变换和投递 实现源数据库与目标数据库的数据同步 保持最少 10ms 的数据延迟 2 应用场景 高可用容灾

大家好,我是讯享网,很高兴认识大家。

1. OGG简介

  • OGG 是一种基于日志的结构化数据复制软件,它通过解析源数据库在线日志或归档日志获得数据的增删改变化(数据量只有日志的四分之一左右)
  • OGG 能够实现大量交易数据的实时捕捉,变换和投递,实现源数据库与目标数据库的数据同步,保持最少10ms的数据延迟。
    在这里插入图片描述
    讯享网

2. 应用场景

  • 高可用容灾
  • 数据库迁移、升级(支持跨版本、异构数据库、零宕机时间、亚秒级恢复)
  • 实时数据集成(支持异构数据库、多源数据库)
    在这里插入图片描述

3. 基本原理

基于日志捕获技术的实时增量数据集成
在这里插入图片描述
Oracle GoldenGate 数据复制过程如下:

  • 利用抽取进程(Extract Process)在源端数据库中读取Online Redo Log或者Archive Log,然后进行解析,只提取其中数据的变化信息,比如DML操作——增、删、改操作
  • 将抽取的信息转换为GoldenGate自定义的中间格式存放在队列文件(trail file)中
  • 再利用传输进程将队列文件(trail file)通过TCP/IP传送到目标系统。
  • 目标端有一个进程叫Server Collector,这个进程接受了从源端传输过来的数据变化信息
  • 把信息缓存到GoldenGate 队列文件(trail file)当中,等待目标端的复制进程读取数据。
  • GoldenGate 复制进程(replicat process)从队列文件(trail file)中读取数据变化信息,并创建对应的SQL语句,通过数据库的本地接口执行,提交到目标端数据库,提交成功后更新自己的检查点,记录已经完成复制的位置,数据的复制过程最终完成。

4. 基本架构

Oracle GoldenGate主要由如下组件组成

组件 说明
Manager 不管是源端还是目标端必须并且只能有一个Manager进程,可以启动、关闭、监控其他进程的健康状态,报告错误事件、分配数据存储空间,发布阀值报告等,其作用:
1:监控与启动 GoldenGate 的其它进程
2:管理 trail 文件及 Reporting
Extract Extract 进程运行在数据库源端上,它是Golden Gate的捕获机制,可以配置Extract 进程来做如下工作:
1:初始数据装载:对于初始数据装载,Extract 进程直接从源对象中提取数据
2:同步变化捕获:保持源数据与其它数据集的同步。初始数据同步完成后,Extract 进程捕获源数据的变化;如DML变化、 DDL变化等
Replicat Replicat 进程是运行在目标端系统的一个进程,负责读取 Extract 进程提取到的数据(变更的事务或 DDL 变化)并应用到目标数据库,就像 Extract 进程一样,也可以配置 Replicat 进程来完成如下工作:
1:初始化数据装载:对于初始化数据装载,Replicat 进程应用数据到目标对象或者路由它们到一个高速的 Bulk-load 工具上;
2:数据同步,将 Extract 进程捕获到的提交了的事务应用到目标数据库中;
Collector Collector 是运行在目标端的一个后台进程,接收从 TCP/IP 网络传输过来的数据库变化,并写到 Trail 文件里
Trails 为了持续地提取与复制数据库变化,GoldenGate 将捕获到的数据变化临时存放在磁盘上的一系列文件中,这些文件就叫做 Trail 文件
Data Pumps Data Pump 是一个配置在源端的辅助的 Extract 机制,Data Pump 是一个可选组件,如果不配置 Data Pump,那么由 Extract 主进程将数据发送到目标端的 Remote Trail 文件中;如果配置了 Data Pump,会由 Data Pump将Extract 主进程写好的本地 Trail 文件通过网络发送到目标端的 Remote Trail 文件中

5. 常用的拓扑结构

在这里插入图片描述

  • 单向复制:由一个源数据库复制到一个目的数据库,一般用于高可用性和容灾,为生产机保持一个活动的备份数据库,从而在发生灾难的时候迅速切换,减少数据丢失和系统宕机时间;
  • 双向复制:利用GoldenGate TDM可以实现两个数据库之间数据的双向复制,任何一方的数据变化都会被传递到另一端,可以利用此模式开展双业务中心;
  • 广播复制:由一个数据库向多个数据库复制,利用GoldenGate TDM的数据过滤功能可以实现数据的有选择分发;
  • 集中复制:由多个数据库向一个数据库复制,可以将分布的、跨平台或异构的多个数据库集中到一个数据库。此种模式广泛应用于n+1模式的容灾,通过将多个系统数据库集中到一起,可以充分利用备份中心的设施,大幅减少投资;另外也用于跨平台多系统的数据集成,为这些提供系统提供一个统一视图便于查询和统计数据。
  • 多层复制:由A数据库向B复制,同时又由B向C复制,可以在以上几种模式基础上无限制扩展。

由此可见,GoldenGate TDM的复制模式非常灵活,用户可以根据自己的需求选择特定的复制方式,并根据系统扩展对复制进行扩展。

6. 支持的环境

源和目标的操作系统和数据库可以进行任意的组合
在这里插入图片描述

7. OGG安装部署

注:在Docker环境下,整合Oracle,

主机名 IP OGG
node1 192.168.88.10 源端
node2 192.168.88.20 目标端

7.1 配置Oracle11gR2数据库

7.1.1 Oracle11gR2打开归档模式

需要切换到oracle用户操作:

su - oracle 

讯享网

因为配置数据库需要在sqlplus中执行,所以使用sysdba用户登录:

讯享网sqlplus / as sysdba 
  • 验证数据库是否开启自动归档
    执行归档查询命令:
archive log list 

在这里插入图片描述

Automatic archival是Disabled状态,因为Oracle默认是不开启自动归档的

  • 开启自动归档
    以DBA的身份连接数据库,执行命令:
讯享网conn /as sysdba 

在这里插入图片描述
关闭数据库,执行命令:

shutdown immediate 

在这里插入图片描述

启动并装载数据库,但没有打开数据文件,该命令常用来修改数据库运行模式或恢复数据库。执行命令:

讯享网startup mount 

在这里插入图片描述
执行开启归档命令:

alter database archivelog; 

在这里插入图片描述

执行打开数据库命令:

讯享网alter database open; 

在这里插入图片描述

执行自动归档命令:

alter system archive log start; 

在这里插入图片描述

  • 验证是否开启自动归档成功
    执行归档查询命令:
讯享网archive log list 

在这里插入图片描述

Automatic archival变成了Enabled状态,表示已经开启自动归档成功

7.1.2 Oracle开启辅助日志和补充日志

  • 验证数据库是否开启辅助日志和补充日志
    执行SQL语句验证:
select force_logging,supplemental_log_data_min from v$database; 

在这里插入图片描述
当显示NO的时候表示没有开启,需要调整

  • 开启数据库的辅助日志和补充日志
    开启强制日志后数据库会记录除临时表空间或临时回滚段外所有的操作,命令:
讯享网alter database force logging; 

在这里插入图片描述
开启辅助日志命令:

alter database add supplemental log data; 

在这里插入图片描述
开启主键附加日志命令:

讯享网alter database add supplemental log data (primary key) columns; 

开启全列附加日志命令:

alter database add supplemental log data (all) columns; 
  • 检查数据库是否成功开启辅助日志和补充日志
    执行SQL语句验证:
讯享网select force_logging,supplemental_log_data_min from v$database; 

在这里插入图片描述
当显示为YES的时候表示开启成功。

7.2 安装OGG源端

7.2.1 解压和安装OGG源端软件包

  1. 创建OGG源端的目录,使用root用户创建:
mkdir /u01/app/ogg/src 
  1. 添加OGG源端的目录到oracle用户的环境变量中(需要切换到oracle用户操作)
讯享网su - oracle vim ~/.bash_profile export OGG_SRC_HOME=/u01/app/ogg/src export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib source ~/.bash_profile # 退出oracle用户shell命令: exit 

在这里插入图片描述

  1. 解压OGG源端软件
    OGG源端的软件包是V34339-01.zip,存放在/export/softwares/oracle/ogg目录下。需要使用root用户解压
cd /export/softwares/oracle/ogg 

创建src文件夹是用来存放解压后的OGG源端软件

讯享网mkdir /export/softwares/oracle/ogg/src/ 

解压OGG源端软件到src文件夹下

unzip /export/softwares/oracle/ogg/V34339-01.zip -d /export/softwares/oracle/ogg/src/ 

在这里插入图片描述

讯享网cd /export/softwares/oracle/ogg/src/ 

在这里插入图片描述
fbo_ggs_Linux_x64_ora11g_64bit.tar文件才是OGG源端的软件包,解压该文件到/u01/app/ogg/src目录下,执行命令:

tar -xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /u01/app/ogg/src 
  1. 配置/u01/app/ogg/src目录及其所有文件的权限
    使用root用户执行授权命令:
讯享网chown -R oracle:oinstall /u01/app/ogg/src 

在这里插入图片描述

可以看到/u01/app/ogg/目录下的src属于oracle用户和oinstall组
在这里插入图片描述

可以看到/u01/app/ogg/src目录下的所有文件都属于oracle用户和oinstall组

7.2.2 在Oracle中创建OGG相关的用户和表空间

创建表空间在磁盘中的物理路径(需要到root用户操作)

mkdir -p /u01/app/oracle/oggdata/orcl/ chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl 
讯享网su - oracle 
create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs.dbf' size 500M autoextend on; 

在这里插入图片描述
创建ogg用户(用户名和密码都是ogg)

讯享网create user ogg identified by ogg default tablespace oggtbs; 

在这里插入图片描述

赋予ogg用户dba权限

grant dba to ogg; 

在这里插入图片描述

7.2.3 OGG源端初始化

使用oracle用户登录源端OGG的命令行中

讯享网su – oracle cd $OGG_SRC_HOME ./ggsci 

在这里插入图片描述
初始化源端OGG目录
注意:如果不在OGG_SRC_HOME下,初始化OGG目录时会报错

create subdirs 

在这里插入图片描述
退出OGG命令行客户端:exit
在这里插入图片描述
检查源端OGG初始化后的目录
初始化完成后,可以查询在$OGG_SRC_HOME下是否存在dirchk、dirdat、dirdef、dirjar、dirout、dirpcs、dirprm、dirrpt、dirsql、dirtmp共11个目录。
在这里插入图片描述

7.3 配置OGG源端

7.3.1 Oracle创建测试表

讯享网#切换到oracle用户: su – oracle #登录sqlplus: sqlplus "/as sysdba" 
#在oracle中创建test_ogg用户: create user test_ogg identified by test_ogg default tablespace users; #为test_ogg用户授权: grant dba to test_ogg; #使用test_ogg用户登录: conn test_ogg/test_ogg; #创建test_ogg表: create table test_ogg(id int ,name varchar(20),primary key(id)); 

在这里插入图片描述

7.3.2 配置OGG的全局变量

  • 使用oracle用户进入OGG_SRC_HOME目录下
讯享网#切换到oracle用户下: su – oracle #打印源端OGG_SRC_HOME: echo $OGG_SRC_HOME #进入OGG_SRC_HOME: cd $OGG_SRC_HOME 

在这里插入图片描述

  • 进入源端OGG命令行
./ggsci 

在这里插入图片描述

  • 使用oracle中的ogg用户登录
讯享网dblogin userid ogg password ogg 

在这里插入图片描述

  • 配置全局变量
edit param ./globals 

在这里插入图片描述

oggschema ogg

然后跟使用vi一样,在新窗口中添加oggschema ogg后保存退出编辑

7.3.3 配置管理器MGR进程

  • 进入源端OGG命令行
讯享网./ggsci 

创建mgr进程:

edit param mgr 

PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

在这里插入图片描述

参数名称 参数说明
PORT mgr的默认监听端口
DYNAMICPORTLIST 当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 重启EXTRACT进程的参数,最多5次,每次间隔3分钟
PURGEOLDEXTRACTS TRAIL文件的定期清理

7.3.4 添加复制表

  • 进入源端OGG命令行
讯享网./ggsci add trandata test_ogg.test_ogg info trandata test_ogg.test_ogg 

在这里插入图片描述

7.3.5 配置extract进程

配置Extract进程:

edit param extkafka 

新增内容:

讯享网extract extkafka 

dynamicresolution
SETENV (ORACLE_SID = “orcl”)
SETENV (NLS_LANG = “american_america.AL32UTF8”)
userid ogg,password ogg
exttrail /u01/app/ogg/src/dirdat/to
table test_ogg.test_ogg;

在这里插入图片描述
在这里插入图片描述

参数名称 参数说明
extract extkafka 定义extract进程名称
dynamicresolution 启用动态解析
SETENV (ORACLE_SID = “orcl”) 设置Oracle数据库
SETENV (NLS_LANG = “american_america.AL32UTF8”) 设置字符集
userid ogg,password ogg OGG连接Oracle数据库的帐号密码
exttrail /u01/app/ogg/src/dirdat/to 定义trail文件的保存位置以及文件名,文件字母最多2个,否则会报错
table test_ogg.test_ogg; 复制表的表名,支持*通配,必须以;结尾

添加Extract进程:

add extract extkafka,tranlog,begin now 

在这里插入图片描述

将trail文件配置与extract进程绑定:

讯享网add exttrail /u01/app/ogg/src/dirdat/to,extract extkafka 

在这里插入图片描述

7.3.6 配置pump进程

配置Pump进程:

edit param pukafka 

新增内容:

extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost localhost mgrport 7809
rmttrail /u01/app/ogg/tgr/dirdat/to
table test_ogg.test_ogg;

在这里插入图片描述
 extract进程名称;passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;rmttrail即目标端trail文件存储位置以及名称。

参数名称 参数说明
extract pukafka 定义pump进程名称
passthru 因使用了pump逻辑传输,所以禁止OGG与Oracle交互
dynamicresolution 配置动态解析
userid ogg,password ogg OGG连接Oracle数据库的帐号密码
rmthost localhost mgrport 7809 目标端OGG的mgr服务的地址以及监听端口
rmttrail /u01/app/ogg/tgr/dirdat/to 目标端OGG的trail文件存储位置以及名称
table test_ogg.test_ogg; 要采集的表,必须使用;结尾

将源端trail文件绑定到Extract进程:

讯享网add extract pukafka,exttrailsource /u01/app/ogg/src/dirdat/to 

在这里插入图片描述
将目标端trail文件绑定到Extract进程:

add rmttrail /u01/app/ogg/tgr/dirdat/to,extract pukafka 

在这里插入图片描述

7.3.7 配置define文件

 注意:该文件用来在异构数据源之间传输时,需明确知道表之间的映射关系,比如:Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,在OGG命令行执行:

配置define文件:

讯享网edit param test_ogg 

defsfile /u01/app/ogg/src/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

在这里插入图片描述
生成表schema文件:(在OGG_SRC_HOME目录下执行(oracle用户))

./defgen paramfile dirprm/test_ogg.prm 

在这里插入图片描述
将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

讯享网scp -r /u01/app/ogg/src/dirdef/test_ogg.test_ogg /u01/app/ogg/tgr/dirdef/ 

因为目标端目录还没有创建,因此发送文件可能会失败,所以执行完目标端配置后发送即可

7.4 配置OGG目标端

7.4.1 解压和安装OGG目标端软件包

  • 创建OGG目标端的目录
    使用root用户创建:
mkdir /u01/app/ogg/tgr 
  • 添加OGG目标端的目录到oracle用户的环境变量中
    从root用户切换到oracle用户:
讯享网su oracle 

注意【非常重要】:在这里,需要调整oracle用户的.bash_profile,主要是更新LD_LIBRARY_PATH的值,来确保源端和目标端的ggsci命令都可以正常使用。如果不更新的话,目标端ggsci命令可以使用,但源端的ggsci命令无法使用,会报错./ggsci: error while loading shared libraries: libclntsh.so.11.1: cannot open shared object file: No such file or directory

在这里插入图片描述

vim ~/.bash_profile export OGG_TGR_HOME=/u01/app/ogg/tgr export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$ORACLE_HOME/lib:/usr/lib source ~/.bash_profile #退出oracle用户shell命令: exit 

在这里插入图片描述

  • 解压OGG目标端软件
    OGG源端的软件包是V-01.zip,存放在/export/softwares/oracle/ogg目录下。需要使用root用户解压
讯享网cd /export/softwares/oracle/ogg 

创建tgr文件夹是用来存放解压后的OGG目标端软件

mkdir -p /export/softwares/oracle/ogg/tgr/ 

解压OGG目标端软件到tgr文件夹下

讯享网unzip /export/softwares/oracle/ogg/V-01.zip -d /export/softwares/oracle/ogg/tgr/ 

在这里插入图片描述

cd /export/softwares/oracle/ogg/tgr/ 

在这里插入图片描述
ggs_Adapters_Linux_x64.tar文件是真正的OGG目标端软件包,解压该文件到/u01/app/ogg/tgr目录下,执行命令:

讯享网tar -xf ggs_Adapters_Linux_x64.tar -C /u01/app/ogg/tgr/ 
  • 配置/u01/app/ogg/tgr目录及其所有文件的权限
    解压后的默认用户和组
    在这里插入图片描述
    使用root用户执行授权命令:
chown -R oracle:oinstall /u01/app/ogg/tgr 

在这里插入图片描述
可以看到/u01/app/ogg/目录下的tgr属于oracle用户和oinstall组。

讯享网ll /u01/app/ogg/tgr 

在这里插入图片描述
可以看到/u01/app/ogg/tgr目录下的所有文件都属于oracle用户和oinstall组。

7.4.2 OGG目标端初始化

  • 使用oracle用户登录目标端OGG的命令行中
    可以看到/u01/app/ogg/目录下的tgr属于oracle用户和oinstall组。
su oracle 

切换oracle用户时需要重新加载环境变量:

讯享网source ~/.bash_profile cd $OGG_TGR_HOME ./ggsci 

在这里插入图片描述

  • 初始化目标端OGG目录
    注意:如果不在OGG_TGR_HOME下,初始化目标端OGG目录时会报错
create subdirs 

在这里插入图片描述
退出OGG命令行客户端:exit

  • 检查目标端OGG初始化后的目录
    初始化完成后,可以查询在$OGG_TGR_HOME下是否存在dirchk、dircrd、dirdat、dirdef、dirdmp、diretc、dirout、dirpcs、dirprm、dirrpt、dirsql、dirtmp、dirwlt、dirwww共14个目录。

7.4.3 拷贝源端的define文件到目标端

将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

讯享网scp -r $OGG_SRC_HOME/dirdef/test_ogg.test_ogg $OGG_TGR_HOME/dirdef/ 

7.4.4 安装zookeeper和Kafka

  • 安装ZooKeeper(使用root用户操作)
    解压:
tar -zxf /export/softwares/zookeeper-3.4.14.tar.gz -C /export/services/ 

创建软连接:

讯享网ln -s /export/services/zookeeper-3.4.14 /export/services/zookeeper 

创建zoo.cfg:

cp /export/services/zookeeper/conf/zoo_sample.cfg /export/services/zookeeper/conf/zoo.cfg 

配置zoo.cfg:

讯享网vim /export/services/zookeeper/conf/zoo.cfg 

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/export/datas/zookeeper/data
dataLogDir=/export/datas/zookeeper/log
clientPort=2181

创建ZooKeeper的数据路径:

mkdir -p /export/datas/zookeeper/data mkdir -p /export/datas/zookeeper/log 

添加到环境变量:

讯享网vim /etc/profile 
source /etc/profile 

启动ZooKeeper:

讯享网zkServer.sh start zkServer.sh status 
  • 安装kafka(使用root用户操作)
    解压:
tar -zxf /export/softwares/kafka_2.11-2.2.0.tgz -C /export/services/ 

创建软连接:

讯享网ln -s /export/services/kafka_2.11-2.2.0 /export/services/kafka 

配置server.prperties:

vim /export/services/kafka/config/server.properties 

listeners=PLAINTEXT://server01:9092
broker.id=0
zookeeper.connect=server01:2181

添加环境变量:

讯享网vim /etc/profile 
export KAFKA_HOME=/export/services/kafka export PATH=.:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH 
讯享网source /etc/profile 

启动Kafka:

kafka-server-start.sh -daemon /export/services/kafka/config/server.properties 

创建主题:

讯享网kafka-topics.sh --create --zookeeper server01:2181 --replication-factor 1 --partitions 1 --topic test_ogg 

查看主题:

kafka-topics.sh --list --zookeeper server01:2181 

7.4.5 配置管理器MRG进程

讯享网su – oracle 

打印目标端OGG_TGR_HOME:

echo $OGG_TGR_HOME 

进入OGG_TGR_HOME:

讯享网cd $OGG_TGR_HOME 

启动ggsci:

./ggsci 
讯享网edit param mgr 

新增内容:

PORT 7810
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

在这里插入图片描述

7.4.6 配置checkpoint

edit param ./GLOBALS 

新增内容:

CHECKPOINTTABLE test_ogg.checkpoint

在这里插入图片描述

7.4.7 配置Replicate进程

讯享网edit param rekafka 

REPLICAT rekafka
sourcedefs /u01/app/ogg/tgr/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

在这里插入图片描述

7.4.8 添加trail文件到replicate进程

添加trail文件到Replicate进程

add replicat rekafka exttrail /u01/app/ogg/tgr/dirdat/to,checkpointtable test_ogg.checkpoint 

在这里插入图片描述

7.4.9 配置kafka.props

  • 配置kafka.props
讯享网cd $OGG_TGR_HOME vim dirprm/kafka.props 

新增内容:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/export/services/kafka/libs/*:/u01/app/ogg/tgr/:/u01/app/ogg/tgr/lib/*

在这里插入图片描述

  • 配置custom_kafka_producer.properties
cd $OGG_TGR_HOME vim dirprm/custom_kafka_producer.properties 

新增内容:

讯享网bootstrap.servers=server01:9092 acks=1 compression.type=gzip reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size= linger.ms=10000 

在这里插入图片描述

7.4.10 最后确认所有的进程

在目标端,主要做了4个操作,共包括2个进程,分别是MANAGER和REPLICAT。
在这里插入图片描述

7.5 OGG测试

7.5.1 启动OGG的源端和目标端

  • 第一步:启动源端mgr进程
  • 第二步:启动目标端mgr进程
  • 第三步:启动源端extract进程
  • 第四步:启动源端pump进程
  • 第五步:启动目标端replicate进程
  1. 启动源端mgr进程
cd $OGG_SRC_HOME ./ggsci #查看所有进程状态: info all #启动MANAGER进程: start mgr #检查所有进程状态: info all 

在这里插入图片描述

  1. 启动目标端mgr进程
讯享网cd $OGG_TGR_HOME ./ggsci #启动MANAGER进程: start mgr #查看所有进程状态: info all 

在这里插入图片描述
3. 启动源端extract进程

cd $OGG_SRC_HOME ./ggsci #启动EXTRACT进程: start extkafka #查看所有进程状态: info all 

在这里插入图片描述

  1. 启动源端pump进程
    启动pump进程:
讯享网start pukafka 

查看所有进程状态:

info all 

在这里插入图片描述

  1. 启动目标端replicat进程
讯享网cd $OGG_TGR_HOME ./ggsci 

启动replicat进程:

start rekafka 

查看所有进程状态:

讯享网info all 

在这里插入图片描述

  1. 确认源端和目标端进程运行情况
    源端:
    在这里插入图片描述
    目标端:
    在这里插入图片描述

7.5.2 测试OGG的数据采集

  • 使用oracle用户登录到oracle11g数据库(源端)
su – oracle sqlplus "/as sysdba" conn test_ogg/test_ogg 

在这里插入图片描述

  • 对表进行DML操作(每一条SQL都需要手动执行commit提交)
    登录到test_ogg用户下:
讯享网conn test_ogg/test_ogg 

在这里插入图片描述
查看该用户拥有的表:

select table_name from user_tables; 

在这里插入图片描述
查看TEST_OGG表的字段信息:

讯享网select column_name,data_type from user_tab_columns where table_name = upper('TEST_OGG'); 

在这里插入图片描述
插入数据:

insert into test_ogg values(1,'beijing'); 

在这里插入图片描述
执行Commit:

讯享网commit; 

在这里插入图片描述
查看kafka是否多出了一个叫做test_ogg的主题:

kafka-topics.sh --list --zookeeper server01:2181 

在这里插入图片描述
然后启动kafka的消费者来消费test_ogg主题的数据:

讯享网kafka-console-consumer.sh --bootstrap-server server01:9092 --topic test_ogg --from-beginning 

再查看kafka的test_ogg主题下是否有了数据:
在这里插入图片描述
执行修改数据(修改id=2的name为china-shanghai):

update test_ogg set name='china-shanghai' where id=2; 

在这里插入图片描述
查看kafka中是否有了id为2的china-shanghai的这条记录:
在这里插入图片描述
删除id为2的数据:

讯享网delete test_ogg where id=2; 

在这里插入图片描述
检查kafka是否多个一条被标记为删除的数据:
在这里插入图片描述

:{ 
   "table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-05-28 09:22:18.000129", { 
   "table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-05-28 09:25:17.000140","current_ts":"2020-05-28T09:25:22.085000","pos":"00000000000000001227","before":{ 
   },"after":{ 
   "ID":1,"NAME":"china-shanghai"}} 
  • 关于数据文件的检查
    如果数据库中发生了事务(都被commit后)操作,会在源端和目标端的dirdat下生成trail数据文件。数据文件名称只能使用2个字母,多了会报错。
    当源端发生事务后,检查源端的trail文件:
讯享网ll /u01/app/ogg/src/dirdat/ 

在这里插入图片描述
目标端会接收到源端pump进程传过来的数据文件:

ll /u01/app/ogg/tgr/dirdat/ 

在这里插入图片描述

7.5.3 注意事项

  • 必须严格遵守OGG的启动顺序;
  • 必须严格遵守OGG的关闭顺序;
  • 如果需要修改OGG的进程,必须先停止,修改成功后,再重启;

7.5.4 错误日志位置

源端错误日志路径

/u01/app/ogg/src/ggserr.log

目标端错误日志路径

/u01/app/ogg/tgr/ggserr.log

8. 初始化业务数据

8.1 安装OGG源端

8.1.1 在Oracle中创建OGG相关的用户和表空间

  1. 创建表空间在磁盘中的物理路径(需要到root用户操作)
讯享网mkdir -p /u01/app/oracle/oggdata/orcl/ chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl 
  1. 进入sqlplus
    切换到oracle用户:su - oracle
    登录sqlplus:
sqlplus "/as sysdba" 
  1. 创建oggtbs表空间
讯享网CREATE TABLESPACE "TBS_LOGISTICS" DATAFILE '/u01/app/oracle/oradata/orcl/tbs_logistics.dat' SIZE 500M AUTOEXTEND ON NEXT 32M MAXSIZE UNLIMITED EXTENT MANAGEMENT LOCAL; 

在这里插入图片描述

  1. 创建erainm用户(用户名和密码都是erainm)
CREATE USER erainm IDENTIFIED BY erainm DEFAULT TABLESPACE TBS_LOGISTICS; 

在这里插入图片描述

  1. 赋予erainm用户dba权限
讯享网GRANT connect,resource,dba to erainm; 

在这里插入图片描述

8.1.2 OGG源端初始化

  1. 使用oracle用户登录源端OGG的命令行中
su – oracle cd $OGG_SRC_HOME ./ggsci 

在这里插入图片描述

讯享网create subdirs 

在这里插入图片描述
退出OGG命令行客户端:exit

  1. 检查源端OGG初始化后的目录
    初始化完成后,可以查询在$OGG_SRC_HOME下是否存在dirchk、dirdat、dirdef、dirjar、dirout、dirpcs、dirprm、dirrpt、dirsql、dirtmp共11个目录。
    在这里插入图片描述

8.2 配置OGG源端

8.2.1 Oracle创建项目相关表

属性名 属性值
主机地址 192.168.88.10
端口号 1521
数据库实例名称 ORCL
数据库实例类型 Service Name
用户名 erainm
角色 Normal
密码 erainm
JDBC访问URL jdbc:oracle:thin:@//192.168.88.10:1521:ORCL
JDBC驱动名称 使用OracleDataSource连接池,无需配置Driver
-- 创建所有的序列 CREATE SEQUENCE tbl_emp_info_map_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_driver_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_emp_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_tt_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_charge_standard_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_company_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_company_dot_map_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_company_route_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_company_warehouse_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_courier_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_deliver_region_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_delivery_record_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_department_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_fixed_area_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_goods_rack_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_job_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_out_warehouse_dtl_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_pkg_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_postal_standard_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_push_warehouse_dtl_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_service_evaluation_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_store_grid_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_vehicle_monitor_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_rack_map_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_receipt_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_waybill_line_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_waybill_record_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_work_time_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_test_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_areas_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_deliver_package_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_customer_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_codes_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_consumer_address_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_receipt_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_send_vehicle_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_vehicle_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_dot_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_transport_tool_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_dot_transport_tool_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_address_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_route_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_push_warehouse_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_out_warehouse_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_warehouse_emp_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_express_package_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_express_bill_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_consumer_sender_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_collect_package_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; CREATE SEQUENCE tbl_waybill_id_seq INCREMENT BY 1 START WITH 1 MINVALUE 1 MAXVALUE 9999; -- 创建所有的业务表 CREATE TABLE "tbl_emp_info_map" ( "id" NUMBER(19,0) NOT NULL ENABLE, "company_id" NUMBER(19,0), "dot_id" NUMBER(19,0), "emp_id" NUMBER(19,0), "job_id" NUMBER(19,0), "dep_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_EMP_INFO_MAP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_driver" ( "id" NUMBER(19,0) NOT NULL ENABLE, "job_number" NVARCHAR2(50), "name" NVARCHAR2(50), "gender" NVARCHAR2(100), "birathday" DATE, "state" NUMBER(19,0), "driver_license_number" NVARCHAR2(100), "driver_license_type" NUMBER(19,0), "get_driver_license_dt" DATE, "car_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DRIVER" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_emp" ( "id" NUMBER(19,0) NOT NULL ENABLE, "emp_number" NVARCHAR2(50), "emp_name" NVARCHAR2(50), "emp_gender" NUMBER(10,0), "emp_birathday" DATE, "state" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_EMP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_transport_tool" ( "id" NUMBER(19,0) NOT NULL ENABLE, "warehouse_id" NUMBER(19,0), "transport_tool_id" NUMBER(19,0), "allocate_dt" DATE, "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_WAREHOUSE_TRANSPORT_TOOL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_charge_standard" ( "id" NUMBER(19,0) NOT NULL ENABLE, "start_area_id" NUMBER(19,0), "stop_area_id" NUMBER(19,0), "first_weight_charge" NUMBER(19,0), "follow_up_weight_charge" NUMBER(19,0), "prescription" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_CHARGE_STANDARD" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_company" ( "id" NUMBER(19,0) NOT NULL ENABLE, "company_name" NVARCHAR2(50), "city_id" NUMBER(19,0), "company_number" NVARCHAR2(50), "company_addr" NVARCHAR2(100), "company_addr_gis" NVARCHAR2(100), "company_tel" NVARCHAR2(20), "is_sub_company" NUMBER(19,0), "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COMPANY" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_company_dot_map" ( "id" NUMBER(19,0) NOT NULL ENABLE, "company_id" NUMBER(19,0), "dot_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COMPANY_DOT_MAP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_company_transport_route_ma" ( "id" NUMBER(19,0) NOT NULL ENABLE, "company_id" NUMBER(19,0), "transport_route_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COMPANY_TRANSPORT_ROUTE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_company_warehouse_map" ( "id" NUMBER(19,0) NOT NULL ENABLE, "company_id" NUMBER(19,0), "warehouse_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COMPANY_WAREHOUSE_MAP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_courier" ( "id" NUMBER(19,0) NOT NULL ENABLE, "job_num" NVARCHAR2(50), "name" NVARCHAR2(50), "birathday" DATE, "tel" NVARCHAR2(20), "pda_num" NVARCHAR2(50), "car_id" NUMBER(19,0), "postal_standard_id" NUMBER(19,0), "work_time_id" NUMBER(19,0), "dot_id" NUMBER(19,0), "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COURIER" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_deliver_region" ( "id" NUMBER(19,0) NOT NULL ENABLE, "search_keyword" NVARCHAR2(100), "search_assist_keyword" NVARCHAR2(100), "area_id" NUMBER(19,0), "fixed_area_id" NUMBER(19,0), "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DELIVER_REGION" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_delivery_record" ( "id" NUMBER(19,0) NOT NULL ENABLE, "cur_warehouse_id" NVARCHAR2(50), "vehicle_id" NUMBER(19,0), "start_vehicle_dt" DATE, "next_warehouse_id" NUMBER(19,0), "predict_arrivals_dt" DATE, "actua_arrivals_dt" DATE, "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DELIVERY_RECORD" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_department" ( "id" NUMBER(19,0) NOT NULL ENABLE, "dep_name" NVARCHAR2(50), "dep_level" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DEPARTMENT" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_fixed_area" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "emp_id" NUMBER(19,0), "operator_dt" DATE, "operator_id" NUMBER(19,0), "gis_fence" NVARCHAR2(200), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_FIXED_AREA" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_goods_rack" ( "id" NUMBER(19,0) NOT NULL ENABLE, "warehouse_name" NVARCHAR2(50), "warehouse_addr" NVARCHAR2(100), "warehouse_addr_gis" NVARCHAR2(50), "company_id" NUMBER(19,0), "employee_id" NVARCHAR2(200), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_GOODS_RACK" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_job" ( "id" NUMBER(19,0) NOT NULL ENABLE, "job_name" NVARCHAR2(50), "job_level" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_JOB" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_out_warehouse_detail" ( "id" NUMBER(19,0) NOT NULL ENABLE, "push_warehouse_id" NUMBER(19,0), "push_warehouse_bill" NVARCHAR2(100), "warehouse_id" NUMBER(19,0), "waybill_id" NUMBER(19,0), "pkg_id" NUMBER(19,0), "pkg_desc" NVARCHAR2(100), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_OUT_WAREHOUSE_DETAIL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_pkg" ( "id" NUMBER(19,0) NOT NULL ENABLE, "pw_bill" NVARCHAR2(50), "pw_dot_id" NUMBER(19,0), "warehouse_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_PKG" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_postal_standard" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "min_weight" NVARCHAR2(50), "min_length" NVARCHAR2(50), "max_length" NVARCHAR2(50), "trajectory" NVARCHAR2(50), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_POSTAL_STANDARD" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_push_warehouse_detail" ( "id" NUMBER(19,0) NOT NULL ENABLE, "push_warehouse_id" NUMBER(19,0), "push_warehouse_bill" NVARCHAR2(50), "warehouse_id" NUMBER(19,0), "pw_start_dt" NVARCHAR2(50), "pw_end_dt" NVARCHAR2(50), "pack_id" NUMBER(19,0), "pack_desc" NVARCHAR2(50), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_PUSH_WAREHOUSE_DETAIL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_service_evaluation" ( "id" NUMBER(19,0) NOT NULL ENABLE, "express_bill_id" NVARCHAR2(100), "express_bill" NUMBER(19,0), "pack_score" NUMBER(10,0), "delivery_time_score" NUMBER(10,0), "courier_score" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_SERVICE_EVALUATION" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_store_grid" ( "id" NUMBER(19,0) NOT NULL ENABLE, "warehouse_name" NVARCHAR2(50), "warehouse_addr" NVARCHAR2(100), "warehouse_addr_gis" NVARCHAR2(50), "company_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_STORE_GRID" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_vehicle_monitor" ( "id" NUMBER(19,0) NOT NULL ENABLE, "delivery_record" NUMBER(19,0), "empId" NUMBER(19,0), "express_bill__id" NVARCHAR2(200), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_VEHICLE_MONITOR" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_rack_map" ( "id" NUMBER(19,0) NOT NULL ENABLE, "warehouse_name" NVARCHAR2(50), "warehouse_addr" NVARCHAR2(100), "warehouse_addr_gis" NVARCHAR2(50), "company_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAREHOUSE_RACK_MAP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_receipt_detail" ( "id" NUMBER(19,0) NOT NULL ENABLE, "waybill_id" NUMBER(19,0), "pkg_id" NUMBER(19,0), "receipt_bill_id" NUMBER(19,0), "receipt_bill" NVARCHAR2(100), "operator_id" NUMBER(19,0), "state" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAREHOUSE_RECEIPT_DETAI" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_waybill_line" ( "id" NUMBER(19,0) NOT NULL ENABLE, "waybill_number" NVARCHAR2(100), "route_id" NUMBER(19,0), "serial_number" NVARCHAR2(100), "transport_tool" NUMBER(19,0), "delivery_record_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAYBILL_LINE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_waybill_state_record" ( "id" NUMBER(19,0) NOT NULL ENABLE, "waybill_id" NUMBER(19,0), "waybill_number" NVARCHAR2(100), "employee_id" NVARCHAR2(100), "consignee_id" NUMBER(19,0), "cur_warehouse_id" NUMBER(10,0), "next_warehouse_id" NUMBER(10,0), "deliverer_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAYBILL_STATE_RECORD" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_work_time" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "start_dt" NVARCHAR2(100), "stop_dt" NVARCHAR2(100), "saturday_start_dt" NVARCHAR2(100), "saturday_stop_dt" NVARCHAR2(100), "sunday_start_dt" NVARCHAR2(100), "sunday_stop_dt" NVARCHAR2(100), "state" NUMBER(10,0), "company_id" NUMBER(10,0), "operator_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WORK_TIME" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_test" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), CONSTRAINT "PK_TBL_TEST" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_areas" ( "id" NUMBER(11,0) NOT NULL ENABLE, "name" NVARCHAR2(40), "pid" NUMBER(11,0), "sname" NVARCHAR2(40), "level" NVARCHAR2(11), "citycode" NVARCHAR2(20), "yzcode" NVARCHAR2(20), "mername" NVARCHAR2(100), "lng" NUMBER(11,4), "lat" NUMBER(11,4), "pinyin" NVARCHAR2(100), CONSTRAINT "PK_TBL_AREAS" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_deliver_package" ( "id" NUMBER(19,0) NOT NULL ENABLE, "emp_id" NUMBER(19,0), "waybill_id" NUMBER(19,0), "waybill_number" NVARCHAR2(100), "express_bill_id" NUMBER(19,0), "express_bill_number" NVARCHAR2(100), "package_id" NUMBER(19,0), "collect_package_dt" DATE, "rece_type" NUMBER(19,0), "rece_dt" DATE, "state" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DELIVER_PACKAGE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_customer" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "tel" NVARCHAR2(20), "mobile" NVARCHAR2(20), "email" NVARCHAR2(50), "type" NUMBER(10,0), "is_own_reg" NUMBER(10,0), "reg_dt" DATE, "reg_channel_id" NUMBER(10,0), "state" NUMBER(19,0), "cdt" DATE, "udt" DATE, "last_login_dt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_CUSTOMER" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_codes" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "type" NUMBER(19,0), "code" NVARCHAR2(50), "code_desc" NVARCHAR2(100), "code_type" NVARCHAR2(50), "state" NUMBER(19,0), "cdt" DATE, "udt" DATE, CONSTRAINT "PK_TBL_CODES" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "addr" NVARCHAR2(19), "addr_gis" NVARCHAR2(50), "company_id" NUMBER(19,0), "employee_id" NUMBER(19,0), "type" NUMBER(10,0), "area" NVARCHAR2(50), "is_lease" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAREHOUSE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_consumer_address_map" ( "id" NUMBER(19,0) NOT NULL ENABLE, "consumer_id" NUMBER(19,0), "address_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_CUSTOMER_SENDER_MAP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_receipt" ( "id" NUMBER(19,0) NOT NULL ENABLE, "bill" NVARCHAR2(100), "type" NUMBER(19,0), "warehouse_id" NUMBER(19,0), "operator_id" NUMBER(19,0), "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAREHOUSE_RECEIPT" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_send_vehicle" ( "id" NUMBER(19,0) NOT NULL ENABLE, "out_warehouse_id" NUMBER(19,0), "out_warehouse_waybill_id" NUMBER(19,0), "out_warehouse_waybill_number" NVARCHAR2(100), "vehicle_id" NUMBER(19,0), "driver1_id" NUMBER(19,0), "driver2_id" NUMBER(19,0), "start_vehicle_dt" DATE, "next_warehouse_id" NUMBER(19,0), "predict_arrivals_dt" DATE, "actual_arrivals_dt" DATE, "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAREHOUSE_SEND_VEHICLE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_vehicle_map" ( "id" NUMBER(19,0) NOT NULL ENABLE, "warehouse_id" NUMBER(19,0), "vehicle_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COMPANY_VEHICLE_MAP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_dot" ( "id" NUMBER(19,0) NOT NULL ENABLE, "dot_number" NVARCHAR2(50), "dot_name" NVARCHAR2(50), "dot_addr" NVARCHAR2(100), "dot_gis_addr" NVARCHAR2(100), "dot_tel" NVARCHAR2(20), "company_id" NUMBER(19,0), "manage_area_id" NUMBER(19,0), "manage_area_gis" NVARCHAR2(100), "state" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DOT" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_transport_tool" ( "id" NUMBER(19,0) NOT NULL ENABLE, "brand" NVARCHAR2(100), "model" NVARCHAR2(100), "type" NUMBER(19,0), "given_load" NVARCHAR2(100), "load_cn_unit" NVARCHAR2(100), "load_en_unit" NVARCHAR2(100), "buy_dt" DATE, "license_plate" NVARCHAR2(100), "state" NVARCHAR2(100), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_TRANSPORT_TOOL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_dot_transport_tool" ( "id" NUMBER(19,0) NOT NULL ENABLE, "dot_id" NUMBER(19,0), "transport_tool_id" NUMBER(19,0), "allocate_dt" DATE, "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_DOT_TRANSPORT_TOOL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_address" ( "id" NUMBER(19,0) NOT NULL ENABLE, "name" NVARCHAR2(50), "tel" NVARCHAR2(20), "mobile" NVARCHAR2(20), "detail_addr" NVARCHAR2(100), "area_id" NUMBER(19,0), "gis_addr" NVARCHAR2(20), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_CUSTOMER_ADDRESS" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_route" ( "id" NUMBER(19,0) NOT NULL ENABLE, "start_station" NVARCHAR2(50), "start_station_area_id" NUMBER(19,0), "start_warehouse_id" NUMBER(19,0), "end_station" NVARCHAR2(50), "end_station_area_id" NUMBER(19,0), "end_warehouse_id" NUMBER(19,0), "mileage_m" NUMBER(10,0), "time_consumer_minute" NUMBER(10,0), "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_ROUTE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_push_warehouse" ( "id" NUMBER(19,0) NOT NULL ENABLE, "pw_waybill_id" NUMBER(19,0), "pw_waybill_number" NVARCHAR2(50), "pw_dot_id" NUMBER(19,0), "warehouse_id" NUMBER(19,0), "emp_id" NUMBER(19,0), "pw_start_dt" DATE, "pw_end_dt" DATE, "pw_position" NVARCHAR2(50), "pw_reg_emp_id" NUMBER(19,0), "ow_reg_emp_scan_gun_id" NUMBER(19,0), "pw_confirm_emp_id" NUMBER(19,0), "ow_confirm_emp_scan_gun_id" NUMBER(19,0), "pw_box_emp_id" NUMBER(19,0), "pw_box_scan_gun_id" NUMBER(19,0), "pw_after_seal_img" NVARCHAR2(100), "pw_receipt_number" NVARCHAR2(100), "pw_receipt_dt" DATE, "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_PUSH_WAREHOUSE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_out_warehouse" ( "id" NUMBER(19,0) NOT NULL ENABLE, "pw_waybill_id" NUMBER(19,0), "pw_waybill_number" NVARCHAR2(100), "ow_dot_id" NUMBER(19,0), "warehouse_id" NUMBER(19,0), "ow_vehicle_id" NUMBER(19,0), "ow_driver_emp_id" NUMBER(19,0), "ow_follow1_emp_id" NUMBER(19,0), "ow_follow2_emp_id" NUMBER(19,0), "ow_start_dt" DATE, "ow_end_dt" DATE, "ow_position" NVARCHAR2(50), "ow_reg_emp_id" NUMBER(19,0), "ow_reg_scan_gun_id" NUMBER(19,0), "ow_confirm_emp_id" NUMBER(19,0), "ow_confirm_scan_gun_id" NUMBER(19,0), "ow_pre_seal_img" NVARCHAR2(100), "ow_receipt_number" NVARCHAR2(100), "ow_receipt_dt" DATE, "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_OUT_WAREHOUSE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_warehouse_emp" ( "id" NUMBER(19,0) NOT NULL ENABLE, "job_num" NVARCHAR2(50), "name" NVARCHAR2(50), "birathday" DATE, "tel" NVARCHAR2(20), "type" NUMBER(10,0), "warehouse_id" NUMBER(19,0), "state" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAREHOUSE_EMP" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_express_package" ( "id" NUMBER(19,0) NOT NULL ENABLE, "scan_gun_id" NVARCHAR2(19), "name" NVARCHAR2(50), "cid" NUMBER(10,2), "weight" NUMBER(10,2), "amount" NUMBER(10,2), "coupon_id" NUMBER(19,0), "coupon_amount" NUMBER(10,2), "actual_amount" NUMBER(10,2), "insured_price" NUMBER(10,2), "is_fragile" NVARCHAR2(20), "send_address_id" NUMBER(19,0), "recv_address_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_EXPRESS_PACKAGE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_express_bill" ( "id" NUMBER(19,0) NOT NULL ENABLE, "express_number" NVARCHAR2(50), "cid" NUMBER(19,0), "eid" NUMBER(19,0), "order_channel_id" NUMBER(19,0), "order_dt" DATE, "order_terminal_type" NUMBER(10,0), "order_terminal_os_type" NUMBER(10,0), "reserve_dt" DATE, "is_collect_package_timeout" NUMBER(10,0), "timeout_dt" DATE, "type" NUMBER(10,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_EXPRESS_BILL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_consumer_sender_info" ( "id" NUMBER(19,0) NOT NULL ENABLE, "ciid" NUMBER(19,0), "pkg_id" NUMBER(19,0), "express_bill_id" NUMBER(19,0), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_CUSTOMER_SENDER_INFO" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_collect_package" ( "id" NUMBER(19,0) NOT NULL ENABLE, "cid" NUMBER(19,0), "eid" NUMBER(19,0), "pkg_id" NUMBER(19,0), "express_bill_id" NUMBER(19,0), "express_bill_number" NVARCHAR2(100), "state" NUMBER(10,0), "collect_package_dt" DATE, "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_COLLECT_PACKAGE" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; CREATE TABLE "tbl_waybill" ( "id" NUMBER(19,0) NOT NULL ENABLE, "express_bill_number" NVARCHAR2(100), "waybill_number" NVARCHAR2(100), "cid" NUMBER(19,0), "eid" NUMBER(19,0), "order_channel_id" NUMBER(19,0), "order_dt" DATE, "order_terminal_type" NUMBER(10,0), "order_terminal_os_type" NUMBER(10,0), "reserve_dt" DATE, "is_collect_package_timeout" NUMBER(10,0), "pkg_id" NUMBER(19,0), "pkg_number" NVARCHAR2(100), "timeout_dt" NVARCHAR2(100), "transform_type" NUMBER(10,0), "delivery_customer_name" NVARCHAR2(100), "delivery_addr" NVARCHAR2(100), "delivery_mobile" NVARCHAR2(100), "delivery_tel" NVARCHAR2(100), "receive_customer_name" NVARCHAR2(100), "receive_addr" NVARCHAR2(100), "receive_mobile" NVARCHAR2(100), "receive_tel" NVARCHAR2(100), "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_WAYBILL" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; -- 运输记录表 CREATE TABLE "tbl_transport_record" ( "id" NUMBER(19,0) NOT NULL ENABLE, "pw_id" NUMBER(19,0), "pw_waybill_id" NUMBER(19,0), "pw_waybill_number" NVARCHAR2(100), "ow_id" NUMBER(19,0), "ow_waybill_id" NUMBER(19,0), "ow_waybill_number" NVARCHAR2(100), "sw_id" NUMBER(19,0), "ew_id" NUMBER(19,0), "transport_tool_id" NUMBER(19,0), "pw_driver1_id" NUMBER(19,0), "pw_driver2_id" NUMBER(19,0), "pw_driver3_id" NUMBER(19,0), "ow_driver1_id" NUMBER(19,0), "ow_driver2_id" NUMBER(19,0), "ow_driver3_id" NUMBER(19,0), "route_id" NUMBER(19,0), "distance" NUMBER(10,0), "duration" NUMBER(10,0), "state" NUMBER(10,0), "start_vehicle_dt" DATE, "predict_arrivals_dt" DATE, "actual_arrivals_dt" DATE, "cdt" DATE, "udt" DATE, "remark" NVARCHAR2(100), CONSTRAINT "PK_TBL_TRANSPORT_RECORD" PRIMARY KEY ("id") ) TABLESPACE "TBS_LOGISTICS"; 

8.2.2 配置管理器MGR进程

  • 进入源端OGG命令行
讯享网./ggsci #创建mgr进程: edit param mgr 

PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

在这里插入图片描述

8.2.3 配置extract进程

配置Extract进程:

edit param extkafka # 新增内容: extract extkafka 

GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
dynamicresolution
SETENV (ORACLE_SID = “orcl”)
SETENV (NLS_LANG = “american_america.AL32UTF8”)
userid erainm,password erainm
exttrail /u01/app/ogg/src/dirdat/to
table erainm.*;

在这里插入图片描述

参数名称 参数说明
extract extkafka 定义extract进程名称
dynamicresolution 启用动态解析
SETENV (ORACLE_SID = “orcl”) 设置Oracle数据库
SETENV (NLS_LANG = “american_america.AL32UTF8”) 设置字符集
userid erainm,password erainm erainm用户连接Oracle数据库的帐号密码
exttrail /u01/app/ogg/src/dirdat/to 定义trail文件的保存位置以及文件名,文件字母最多2个,否则会报错
table erainm.*; 复制表的表名,支持*通配,必须以;结尾

8.2.4 配置pump进程

配置Pump进程:

讯享网edit param pukafka 

新增内容:

extract pukafka
passthru
dynamicresolution
userid erainm,password erainm
rmthost localhost mgrport 7809
rmttrail /u01/app/ogg/tgr/dirdat/to
table erainm.*;

 extract进程名称;passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;rmttrail即目标端trail文件存储位置以及名称。

参数名称 参数说明
extract pukafka 定义pump进程名称
passthru 因使用了pump逻辑传输,所以禁止OGG与Oracle交互
dynamicresolution 配置动态解析
userid erainm,password erainm OGG连接Oracle数据库的帐号密码
rmthost localhost mgrport 7809 目标端OGG的mgr服务的地址以及监听端口
rmttrail /u01/app/ogg/tgr/dirdat/to 目标端OGG的trail文件存储位置以及名称
table erainm.*; 要采集的表,必须使用;结尾

8.2.5 配置define文件

配置define文件:

edit param test_ogg defsfile /u01/app/ogg/src/dirdef/test_ogg.test_ogg userid erainm,password erainm table erainm.*; 

在这里插入图片描述
生成表schema文件:(在OGG_SRC_HOME目录下执行(oracle用户))

讯享网./defgen paramfile dirprm/test_ogg.prm 

将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

scp -r /u01/app/ogg/src/dirdef/test_ogg.test_ogg /u01/app/ogg/tgr/dirdef/ 

因为目标端目录还没有创建,因此发送文件可能会失败,所以执行完目标端配置后发送即可

8.3 配置OGG目标端

8.3.1 拷贝源端的define文件到目标端

将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

讯享网scp -r $OGG_SRC_HOME/dirdef/test_ogg.test_ogg $OGG_TGR_HOME/dirdef/ 

8.3.2 配置管理器MRG进程

  1. 使用oracle用户进入OGG_SRC_HOME目录下
# 切换到oracle用户下: su – oracle #打印目标端OGG_TGR_HOME: echo $OGG_TGR_HOME #进入OGG_TGR_HOME: cd $OGG_TGR_HOME #启动ggsci: ./ggsci 
  1. 配置目标端MRG进程
    配置MGR进程:
讯享网edit param mgr 

新增内容:

PORT 7810
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

在这里插入图片描述

8.3.3 配置Replicate进程

edit param rekafka 

REPLICAT rekafka
sourcedefs /u01/app/ogg/tgr/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP erainm.*, TARGET erainm.*;

8.3.4 配置kafka.props

  • 配置kafka.props
讯享网cd $OGG_TGR_HOME vim dirprm/kafka.props 

新增内容:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=logistics
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/export/services/kafka/libs/*:/u01/app/ogg/tgr/:/u01/app/ogg/tgr/lib/*

在这里插入图片描述

  • 配置custom_kafka_producer.properties
cd $OGG_TGR_HOME vim dirprm/custom_kafka_producer.properties 

新增内容:

bootstrap.servers=node2:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=
linger.ms=10000

8.3.5 最后确认所有的进程

在目标端,主要做了4个操作,共包括2个进程,分别是MANAGER和REPLICAT。
在这里插入图片描述

8.4 OGG测试

8.4.1 启动kafka消费者

  1. 启动node2服务器(大数据服务器)
  2. 启动kafka消费者
讯享网kafka-console-consumer --bootstrap-server node2:9092 --topic logistics 
  1. 启动oracle客户端
    插入一条数据
INSERT INTO ERAINM."tbl_company"("id", "company_name", "city_id", "company_number", "company_addr", "company_addr_gis", "company_tel", "is_sub_company", "state", "cdt", "udt", "remark") VALUES(11, '深圳超级速递邮箱公司', , NULL, '深圳南山', '117.918_31.1399', NULL, 1, 1, TO_DATE('2020-06-13 15:24:51','yyyy-mm-dd hh24:mi:ss'), TO_DATE('2020-06-13 15:24:51','yyyy-mm-dd hh24:mi:ss'), NULL); 
  1. 查看kafka消费者是否可以打印出来日志
    修改一条数据
讯享网UPDATE ERAINM."tbl_company" SET "company_name"='深圳超级速递有限公司-1' WHERE "id"=11; 

8.4.2 archivelog日志路径

在这里插入图片描述




超级详细,59692 字数,建议收藏

小讯
上一篇 2025-01-10 12:22
下一篇 2025-01-23 23:24

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/66809.html