#!/usr/bin/env python3
-- coding: utf-8 --
""" Oracle到MySQL自动分页迁移工具 基于CSDN博客:https://blog.csdn.net/wallace2018/article/details/ """
import cx_Oracle import pymysql import logging from datetime import datetime import time
配置日志
logging.basicConfig(
level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('migration.log', encoding='utf-8'), logging.StreamHandler() ]
) logger = logging.getLogger(name)
class OracleToMySQLMigration:
def __init__(self, oracle_config, mysql_config, batch_size=1000): """ 初始化迁移工具 Args: oracle_config: Oracle连接配置 mysql_config: MySQL连接配置 batch_size: 每批次处理的数据量 """ self.oracle_config = oracle_config self.mysql_config = mysql_config self.batch_size = batch_size self.current_page = 1 self.total_pages = 0 self.total_records = 0 # 建立数据库连接 self.oracle_conn = None self.mysql_conn = None self.connect_databases() def connect_databases(self): """建立数据库连接""" try: # Oracle连接 self.oracle_conn = cx_Oracle.connect( f"{self.oracle_config['user']}/{self.oracle_config['password']}@{self.oracle_config['host']}:{self.oracle_config['port']}/{self.oracle_config['service_name']}" ) logger.info("Oracle数据库连接成功") # MySQL连接 self.mysql_conn = pymysql.connect( host=self.mysql_config['host'], port=self.mysql_config['port'], user=self.mysql_config['user'], password=self.mysql_config['password'], database=self.mysql_config['database'], charset='utf8mb4' ) logger.info("MySQL数据库连接成功") except Exception as e: logger.error(f"数据库连接失败: {str(e)}") raise def get_total_records(self): """获取总记录数""" try: cursor = self.oracle_conn.cursor() sql = f"SELECT COUNT(1) as total_count FROM {self.oracle_config['source_table']}" cursor.execute(sql) result = cursor.fetchone() self.total_records = result[0] if result else 0 # 计算总页数 self.total_pages = (self.total_records + self.batch_size - 1) // self.batch_size logger.info(f"总记录数: {self.total_records}, 总页数: {self.total_pages}") return self.total_records except Exception as e: logger.error(f"获取总记录数失败: {str(e)}")
raise
def get_page_data(self, page_num): """获取指定页的数据""" try: cursor = self.oracle_conn.cursor() # 分页查询SQL (Oracle) start_row = (page_num - 1) * self.batch_size + 1 end_row = page_num * self.batch_size sql = f""" SELECT * FROM ( SELECT t.*, ROWNUM rn FROM {self.oracle_config['source_table']} t WHERE ROWNUM <= {end_row} ) WHERE rn >= {start_row} """ cursor.execute(sql) columns = [desc[0] for desc in cursor.description] data = cursor.fetchall() logger.info(f"第{page_num}页数据获取完成,共{len(data)}条记录") return columns, data except Exception as e: logger.error(f"获取第{page_num}页数据失败: {str(e)}") return None, None def insert_data_to_mysql(self, columns, data): """将数据插入MySQL""" try: cursor = self.mysql_conn.cursor() # 构建插入SQL column_str = ', '.join(columns) placeholder_str = ', '.join(['%s'] * len(columns)) insert_sql = f""" INSERT INTO ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(columns))}) """ # 批量插入 batch_data = [] for row in data: # 处理数据类型转换 processed_row = [] for value in row: if isinstance(value, cx_Oracle.LOB): # Oracle LOB转字符串 processed_row.append(value.read() if value else None) else: processed_row.append(value) batch_data.append(processed_row) # 执行批量插入 cursor.executemany(insert_sql, batch_data) self.mysql_conn.commit() logger.info(f"成功插入{len(batch_data)}条记录到MySQL") return len(batch_data) except Exception as e: logger.error(f"插入MySQL失败: {str(e)}") self.mysql_conn.rollback() return 0 def migrate_page(self, page_num): """迁移单页数据""" logger.info(f"开始迁移第{page_num}页数据...") # 获取数据 columns, data = self.get_page_data(page_num) if data is None: return False if len(data) == 0: logger.info(f"第{page_num}页无数据") return True # 插入数据 inserted_count = self.insert_data_to_mysql(columns, data) logger.info(f"第{page_num}页迁移完成,插入{inserted_count}条记录") return inserted_count > 0 def run_migration(self): """执行完整迁移""" logger.info("开始Oracle到MySQL分页迁移...") start_time = time.time() try: # 获取总记录数 self.get_total_records() if self.total_records == 0: logger.warning("源表无数据,迁移结束") return # 逐页迁移 success_count = 0 for page in range(1, self.total_pages + 1): logger.info(f"进度: {page}/{self.total_pages} ({page/self.total_pages*100:.1f}%)") if self.migrate_page(page): success_count += 1 # 页码递增 self.current_page = page # 统计结果 end_time = time.time() duration = end_time - start_time logger.info("=" * 50) logger.info("迁移完成统计:") logger.info(f"总记录数: {self.total_records}") logger.info(f"总页数: {self.total_pages}") logger.info(f"成功页数: {success_count}") logger.info(f"失败页数: {self.total_pages - success_count}") logger.info(f"总耗时: {duration:.2f}秒") logger.info(f"平均速度: {self.total_records/duration*60:.0f}条/分钟") logger.info("=" * 50) except Exception as e: logger.error(f"迁移过程出错: {str(e)}") raise def close_connections(self): """关闭数据库连接""" try: if self.oracle_conn: self.oracle_conn.close() logger.info("Oracle连接已关闭") if self.mysql_conn: self.mysql_conn.close() logger.info("MySQL连接已关闭") except Exception as e: logger.error(f"
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/257408.html