竹笋

首页 » 问答 » 环境 » 事件驱动架构GoldenGate创建
TUhjnbcbe - 2025/1/1 19:28:00
北京治疗白癜风的中医院 https://wapjbk.39.net/yiyuanfengcai/ys_bjzkbdfyy/790/

前文:「事件驱动架构」GoldenGate创建从Oracle到Kafka的CDC事件流(1)

步骤7/12:安装并运行ApacheKafka

从VM的桌面环境中打开Firefox并下载ApacheKafka(我使用的是kafka_2.11-2.1.1.tgz)。

现在,打开一个Linuxshell并重置CLASSPATH环境变量(在BigDataLite-4.11虚拟机中设置的当前值会在Kafka中产生冲突):

declare-xCLASSPATH=

从同一个Linuxshell中,解压缩压缩包,启动ZooKeeper和Kafka:

cdtarzxvfDownloads/kafka_2.11-2.1.1.tgzcdkafka_2.11-2.1.1./bin/zookeeper-server-start.sh-daemonconfig/zookeeper.properties./bin/kafka-server-start.sh-daemonconfig/server.properties

你可以通过启动“echostats

nclocalhost”来检查ZooKeeper是否正常:

[oracle

bigdatalite~]$echostats

nclocalhostZookeeperversion:3.4.5-cdh5.13.1--1,builton11/09/:28GMTClients:/.0.0.1:[1](queued=0,recved=,sent=)/0:0:0:0:0:0:0:1:[0](queued=0,recved=1,sent=0)Latencymin/avg/max:0/0/25Received:Sent:Connections:2Outstanding:0Zxid:0x3fMode:standaloneNodecount:25

您可以检查Kafka是否与“echodump

nclocalhost

grep代理”(一个字符串/brokers/ids/0应该出现)

[oracle

bigdatalite~]$echodump

nclocalhost

grepbrokers/brokers/ids/0

用于PoC的BigDataLite-4.11虚拟机已经在启动虚拟机时启动了一个较老的ZooKeeper实例。因此,请确保禁用了步骤1中描述的所有服务。此外,当您打开一个新的Linuxshell时,请注意在启动ZooKeeper和Kafka之前总是要重置CLASSPATH环境变量,这一点在步骤开始时已经解释过了。

步骤8/12:为大数据安装GoldenGate

同样,从这个页面下载OracleGoldenGateforBigData12c只需要使用VM中安装的Firefox浏览器(我在Linuxx86-64上使用OracleGoldenGateforBigData12.3.2.1.1)。请注意,您需要一个(免费)Oracle帐户来获得它。

安装很容易,只是爆炸压缩包内的下载:

cd~/DownloadsunzipOGG_BigData_Linux_x64_12.3.2.1.1.zipcd..mkdirogg-bd-poccdogg-bd-poctarxvf../Downloads/OGG_BigData_Linux_x64_12.3.2.1.1.tar

就这样,GoldenGateforBigData12c被安装在/home/oracle/ogg-bd-poc文件夹中。

同样,BigDataLite-4.11虚拟机已经在/u01/ogg-bd文件夹中安装了用于大数据的GoldenGate。但它是一个较旧的版本,连接Kafka的选项较少。

步骤9/12:启动GoldenGateforBigDataManager

打开大数据大门

cd~/ogg-bd-poc./ggsci

需要更改管理器端口,否则之前启动的与GoldenGate(classic)管理器的冲突将被引发。

因此,从大数据的GoldenGate来看,CLI运行:

createsubdirseditparamsmgr

一个vi实例将开始,只是写这个内容:

PORT

然后保存内容,退出vi,返回CLI,我们终于可以启动GoldenGateforBigDatamanager监听端口:

步骤10/12:创建数据泵(DataPump)

现在,我们需要创建在GoldenGate世界中被称为数据泵的东西。数据泵是一个提取过程,它监视一个跟踪日志,并(实时地)将任何更改推到另一个由不同的(通常是远程的)GoldenGate实例管理的跟踪日志。

对于这个PoC,由GoldenGate(classic)管理的traillogaa将被泵送至GoldenGate管理的traillogbb进行大数据处理。

因此,如果您关闭它,请回到来自Linuxshell的GoldenGate(经典)CLI:

cd/u01/ogg./ggsci

来自GoldenGate(经典)CLI:

editparamspmpeshop

并在vi中加入以下内容:

EXTRACTpmpeshopUSERIDALIASggadminSETENV(ORACLE_SID=orcl)--GoldenGateforBigDataaddress/port:RMTHOSTlocalhost,MGRPORTRMTTRAIL./dirdat/bbPASSTHRU--ThetokenspartitisusefulforwritingintheKafkamessages--theTransactionIDandthedatabaseChangeSerialNumberTABLEorcl.eshop.*,tokens(txid=

GETENV(TRANSACTION,XID),csn=

GETENV(TRANSACTION,CSN));

保存内容并退出vi。

正如已经解释的提取器,保存的内容将存储在/u01/ogg/dirprm/pmpeshop中。人口、难民和移民事务局文件。

现在我们要注册并启动数据泵,从GoldenGateCLI:

dbloginuseridaliasggadminaddextractpmpeshop,exttrailsource./dirdat/aabeginnowaddrmttrail./dirdat/bbextractpmpeshopstartpmpeshop

通过从CLI运行以下命令之一来检查数据泵的状态:

infopmpeshopviewreportpmpeshop

你甚至可以在金门大数据的dirdat文件夹中查看traillogbb是否已经创建:

[oracle

bigdatalitedirdat]$ls-l~/ogg-bd-poc/dirdattotal0-rw-r-----.1oracleoinstall0May:22bb[oracle

bigdatalitedirdat]$

那检查泵送过程呢?来自Linuxshell:

sqlpluseshop/eshop

ORCL

执行这个SQL脚本创建一个新的模拟客户订单:

INSERTINTOCUSTOMER_ORDER(ID,CODE,CREATED,STATUS,UPDATE_TIME)VALUES(CUSTOMER_ORDER_SEQ.NEXTVAL,AAAA02,SYSDATE,SHIPPING,SYSTIMESTAMP);INSERTINTOCUSTOMER_ORDER_ITEM(ID,ID_CUSTOMER_ORDER,DESCRIPTION,QUANTITY)VALUES(CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL,CUSTOMER_ORDER_SEQ.CURRVAL,InsideOut,1);COMMIT;

现在从GoldenGate(经典)CLI运行:

statspmpeshop

用于检查插入操作是否正确计数(在输出的一部分下面):

GGSCI(bigdatalite.localdomainasggadmin

cdb/CDB$ROOT)11statspmpeshopSendingSTATSrequesttoEXTRACTPMPESHOP...StartofStatisticsat-05-:49:00.Outputto./dirdat/bb:ExtractingfromORCL.ESHOP.CUSTOMER_ORDERtoORCL.ESHOP.CUSTOMER_ORDER:***Totalstatisticssince-05-:01:56***Totalinserts1.00Totalupdates0.00Totaldeletes0.00Totaldiscards0.00Totaloperations1.00

此外,您还可以验证GoldenGate中存储的用于测试泵过程的大数据的跟踪日志的时间戳。事务提交后,从Linuxshell运行:“ln-l~/og-bd-poc/dirdat”,并检查最后一个以“bb”作为前缀的文件的时间戳。

步骤11/12:将事务发布到Kafka

最后,我们将在GoldenGate中为BigData创建一个副本流程,以便在Kafka主题中发布泵出的业务事务。replicat将从trail日志bb读取事务中的插入、更新和删除操作,并将它们转换为JSON编码的Kafka消息。

因此,创建一个名为eshop_kafkaconnect的文件。文件夹/home/oracle/ogg-bd-pocd/dirprm中的属性包含以下内容:

#File:/home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties#-----------------------------------------------------------#address/portoftheKafkabrokerbootstrap.servers=localhost:acks=1#JSONConverterSettingskey.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=falsevalue.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=false#Adjustforperformancebuffer.memory=batch.size=linger.ms=0#Thispropertyfixastart-uperrorasexplainedbyOracleSupporthere:#

1
查看完整版本: 事件驱动架构GoldenGate创建