Using Flink CDC to Synchronize Data from an Oracle Database
Flink CDC is a stream-based data integration tool that aims to give users a more comprehensive set of programming interfaces (APIs). It lets users describe database synchronization jobs as YAML configuration files, and it also provides the Flink CDC Source Connector API. Flink CDC optimizes the job submission process and adds a number of advanced features, such as automatic schema evolution, data transformation, full-database synchronization, and exactly-once semantics.
This article uses flink-connector-oracle-cdc to synchronize data from an Oracle database.
1. Enable Archive Logging
1) In a terminal on the database server, connect to the database with the SYSDBA role:
    sqlplus / as sysdba
    -- or:
    sqlplus /nolog
    CONNECT sys/password AS SYSDBA;
2) Check whether archive logging is enabled:
    archive log list;
If the output shows "Database log mode: No Archive Mode", archive logging is disabled.
If it shows "Database log mode: Archive Mode", archive logging is already enabled.
3) Enable archive logging:
    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;
Note:
Enabling archive logging requires a database restart.
Archive logs consume a large amount of disk space, so expired log files should be purged regularly.
4) Once the database is back up, run archive log list; again to confirm that archive logging is now enabled.
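With archiving on, the output typically looks similar to the following (illustrative only; sequence numbers will vary, and the destination reflects the db_recovery_file_dest set above):

    Database log mode              Archive Mode
    Automatic archival             Enabled
    Archive destination            USE_DB_RECOVERY_FILE_DEST
    Oldest online log sequence     ...
    Next log sequence to archive   ...
    Current log sequence           ...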
2. Create a Dedicated Flink CDC User
2.1 For a non-CDB Oracle database, execute the following SQL (this assumes a tablespace named LOGMINER_TBS already exists; create one first if it does not):
    CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE TO flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;
    GRANT ANALYZE ANY TO flinkuser;
    GRANT CREATE TABLE TO flinkuser;
    -- need not to execute if set scan.incremental.snapshot.enabled=true (default)
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
2.2 For a CDB Oracle database, execute the following SQL. Note that in a CDB, common users must normally be named with the C## prefix (unless common_user_prefix has been changed); the Java example in section 4.2 connects as such a common user, c##flink_user.
    CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
    GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
    GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$DATABASE TO flinkuser CONTAINER=ALL;
    GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
    GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
    GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
    GRANT LOGMINING TO flinkuser CONTAINER=ALL;
    GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
    -- need not to execute if set scan.incremental.snapshot.enabled=true (default)
    GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
    GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
3. Enable Supplemental Logging at the Table or Database Level
    -- Enable supplemental logging for a specific table:
    ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable it for all tables in the database:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable minimal supplemental logging at the database level:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
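With sections 1-3 in place, a quick smoke test can confirm that the new user can log in and that archiving and supplemental logging are active before you build the Flink job. Below is a minimal sketch using plain JDBC; the URL, user, and password follow the examples in this article, the class name is ours, and the Oracle JDBC driver must be on the classpath:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class OracleCdcPreflightCheck {
        public static void main(String[] args) throws Exception {
            // Same connection parameters as the Flink job in section 4.2; adjust as needed.
            String url = "jdbc:oracle:thin:@localhost:1521:orcl";
            try (Connection conn = DriverManager.getConnection(url, "flinkuser", "flinkpw");
                 Statement stmt = conn.createStatement();
                 // flinkuser was granted SELECT ON V_$DATABASE above, so this query should succeed.
                 ResultSet rs = stmt.executeQuery(
                         "SELECT log_mode, supplemental_log_data_min, supplemental_log_data_all FROM v$database")) {
                if (rs.next()) {
                    System.out.println("log_mode = " + rs.getString(1));                  // expect ARCHIVELOG
                    System.out.println("supplemental_log_data_min = " + rs.getString(2)); // expect YES
                    System.out.println("supplemental_log_data_all = " + rs.getString(3)); // YES if enabled database-wide
                }
            }
        }
    }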
4. Implement Database Synchronization with flink-connector-oracle-cdc
4.1 Add the Maven dependency. In addition to the connector below, the project needs the usual Flink streaming dependencies (e.g. flink-streaming-java and flink-clients) on the classpath.
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-oracle-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
4.2 Main Java code
    package test.datastream.cdc.oracle;

    import com.ververica.cdc.connectors.oracle.OracleSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.types.Row;
    import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
    import test.datastream.cdc.oracle.function.CdcString2RowMap;
    import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

    import java.util.Properties;

    public class OracleCdcExample {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            // Deliver numeric column values as strings.
            properties.setProperty("decimal.handling.mode", "string");

            SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                    // .startupOptions(StartupOptions.latest()) // start from the latest offset
                    .url("jdbc:oracle:thin:@localhost:1521:orcl")
                    .port(1521)
                    .database("ORCL")                 // monitor the ORCL database
                    .schemaList("c##flink_user")      // monitor the c##flink_user schema
                    .tableList("c##flink_user.TEST2") // monitor the TEST2 table
                    .username("c##flink_user")
                    .password("flinkpw")
                    .debeziumProperties(properties)
                    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Use parallelism 1 for the source to keep message ordering.
            DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);
            SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());
            // Collect rows into 5-second tumbling windows for batched writes.
            SingleOutputStreamOperator<Row[]> winStream = mapStream
                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .process(new CacheDataAllWindowFunction());
            winStream.addSink(new DbCdcSinkFunction(null)); // batched synchronization

            env.execute();
        }
    }

Note: the job logs in as the common user c##flink_user, so substitute whichever account you created in section 2. In production you would also enable checkpointing (env.enableCheckpointing(...)) so the source can recover from failures without losing its position.
4.3 Converting the JSON to a Row
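CdcString2RowMap parses the JSON change events emitted by JsonDebeziumDeserializationSchema, which follow Debezium's standard envelope (with the default constructor, the schema part is omitted). For orientation, an abridged insert event looks roughly like this; the field values are illustrative, and with decimal.handling.mode=string numeric columns arrive as strings:

    {
      "before": null,
      "after": {
        "ID": "1",
        "NAME": "tom",
        "CREATE_TIME": 1690000000000000
      },
      "source": { "schema": "C##FLINK_USER", "table": "TEST2" },
      "op": "c",
      "ts_ms": 1690000000123
    }

The op field is "c" for inserts, "u" for updates, "d" for deletes, and "r" for rows read during the initial snapshot; before and after carry the row images. The implementation: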
    package test.datastream.cdc.oracle.function;

    import cn.com.victorysoft.common.configuration.VsConfiguration;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Collector;
    import test.datastream.cdc.CdcConstants;

    import java.sql.Timestamp;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    /**
     * Parses CDC JSON change events and converts them to Rows.
     */
    public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {

        private final Map<String, Integer> columnMap = new HashMap<>();

        @Override
        public void open(Configuration parameters) throws Exception {
            // Fixed column order of the monitored table TEST2.
            columnMap.put("ID", 0);
            columnMap.put("NAME", 1);
            columnMap.put("DESCRIPTION", 2);
            columnMap.put("AGE", 3);
            columnMap.put("CREATE_TIME", 4);
            columnMap.put("SCORE", 5);
            columnMap.put("C_1", 6);
            columnMap.put("B_1", 7);
        }

        @Override
        public void flatMap(String s, Collector<Row> collector) throws Exception {
            System.out.println("receive: " + s); // debug trace of the raw change event
            VsConfiguration conf = VsConfiguration.from(s);
            String op = conf.getString(CdcConstants.K_OP);
            VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
            VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
            Row row;
            if (CdcConstants.OP_C.equals(op)) {
                // Insert: use the after image.
                row = convertToRow(after);
                row.setKind(RowKind.INSERT);
            } else if (CdcConstants.OP_U.equals(op)) {
                // Update: use the after image.
                row = convertToRow(after);
                row.setKind(RowKind.UPDATE_AFTER);
            } else if (CdcConstants.OP_D.equals(op)) {
                // Delete: use the before image.
                row = convertToRow(before);
                row.setKind(RowKind.DELETE);
            } else {
                // "r" (snapshot read): treat as an insert, use the after image.
                row = convertToRow(after);
                row.setKind(RowKind.INSERT);
            }
            collector.collect(row);
        }

        private Row convertToRow(VsConfiguration data) {
            Set<String> keys = data.getKeys();
            Row row = new Row(8);
            for (String key : keys) {
                Integer index = this.columnMap.get(key);
                if (index == null) {
                    continue; // skip columns that are not mapped
                }
                Object value = data.get(key);
                if (value != null && key.equals("CREATE_TIME")) {
                    // Debezium encodes TIMESTAMP columns as epoch microseconds; convert to java.sql.Timestamp.
                    value = long2Timestamp((Long) value);
                }
                row.setField(index, value);
            }
            return row;
        }

        private static Timestamp long2Timestamp(Long time) {
            // Microseconds -> milliseconds.
            return new Timestamp(time / 1000);
        }
    }
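4.4 Helper classes referenced by the example

The job above also references three helper classes that this article does not list: CdcConstants, CacheDataAllWindowFunction, and DbCdcSinkFunction. The sketches below are hypothetical reconstructions, provided only to make the example self-contained: the constants mirror Debezium's standard field names and op codes, the window function simply buffers each window into a Row[], and the sink is a stub where a real implementation would apply the batch to the target database over JDBC (its constructor parameter is assumed here to be a JDBC URL; the example passes null).

    // --- CdcConstants.java ---
    package test.datastream.cdc;

    /** Debezium change-event field names and operation codes. */
    public final class CdcConstants {
        public static final String K_OP = "op";
        public static final String K_BEFORE = "before";
        public static final String K_AFTER = "after";
        public static final String OP_C = "c"; // insert
        public static final String OP_U = "u"; // update
        public static final String OP_D = "d"; // delete
        public static final String OP_R = "r"; // snapshot read

        private CdcConstants() {
        }
    }

    // --- CacheDataAllWindowFunction.java ---
    package test.datastream.cdc.oracle.function;

    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    /** Buffers all rows of one tumbling window into a single Row[] for batched writing. */
    public class CacheDataAllWindowFunction extends ProcessAllWindowFunction<Row, Row[], TimeWindow> {
        @Override
        public void process(Context context, Iterable<Row> elements, Collector<Row[]> out) {
            List<Row> buffer = new ArrayList<>();
            for (Row row : elements) {
                buffer.add(row);
            }
            if (!buffer.isEmpty()) {
                out.collect(buffer.toArray(new Row[0]));
            }
        }
    }

    // --- DbCdcSinkFunction.java ---
    package test.datastream.cdc.oracle.function;

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;

    /** Stub sink: a real implementation would write each batch to the target database via JDBC. */
    public class DbCdcSinkFunction extends RichSinkFunction<Row[]> {
        private final String jdbcUrl; // assumed constructor parameter; the example passes null

        public DbCdcSinkFunction(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
        }

        @Override
        public void invoke(Row[] batch, Context context) {
            for (Row row : batch) {
                // The RowKind (INSERT / UPDATE_AFTER / DELETE) tells the sink which statement to issue.
                System.out.println(row.getKind() + " -> " + row);
            }
        }
    }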