MySQL慢日志分析工具[可运行源码]

MySQL慢日志分析工具[可运行源码]usr bin env python3 脚本 03 生成测试数据并产生慢 SQL 参考博客 https blog csdn net zuozewei article details import mysql connector import time import random import string import sys import os def

大家好,我是讯享网,很高兴认识大家。这里提供最前沿的Ai技术和互联网信息。

#!/usr/bin/env python3

脚本 03: 生成测试数据并产生慢 SQL

参考博客: https://blog.csdn.net/zuozewei/article/details/

import mysql.connector import time import random import string import sys import os

def generate_random_string(length=100):

"""生成随机字符串""" return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) 

def create_test_database():

GPT plus 代充 只需 145"""创建测试数据库和表""" try: # 尝试使用密码连接 conn = mysql.connector.connect( host="localhost", user="root", password="inscode", autocommit=True ) except mysql.connector.Error as e: # 如果失败,尝试无密码连接 try: conn = mysql.connector.connect( host="localhost", user="root", autocommit=True ) except mysql.connector.Error as e: print(f"❌ 无法连接到 MySQL: {e}") sys.exit(1) cursor = conn.cursor() # 创建测试数据库 print("创建测试数据库 'loadtest'...") cursor.execute("CREATE DATABASE IF NOT EXISTS loadtest") cursor.execute("USE loadtest") # 创建测试表(模拟博客中的 sbtest 表) print("创建测试表 'sbtest'...") cursor.execute(""" CREATE TABLE IF NOT EXISTS sbtest ( id INT PRIMARY KEY AUTO_INCREMENT, k INT NOT NULL, c VARCHAR(500) NOT NULL, pad VARCHAR(200) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_k (k) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) # 创建额外的测试表(模拟博客中的 sbtest21, sbtest23, sbtest33) for table_num in [21, 23, 33]: table_name = f"sbtest{table_num}" print(f"创建测试表 '{table_name}'...") cursor.execute(f""" CREATE TABLE IF NOT EXISTS {table_name} ( id INT PRIMARY KEY, k INT NOT NULL, c VARCHAR(500) NOT NULL, pad VARCHAR(200) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) conn.close() print("✅ 数据库和表创建完成") 

def generate_test_data():

"""生成测试数据""" try: conn = mysql.connector.connect( host="localhost", user="root", password="inscode", database="loadtest", autocommit=True ) except mysql.connector.Error as e: try: conn = mysql.connector.connect( host="localhost", user="root", database="loadtest", autocommit=True ) except mysql.connector.Error as e: print(f"❌ 无法连接到数据库: {e}") sys.exit(1) cursor = conn.cursor() # 1. 向 sbtest 表插入大量数据 print("向 sbtest 表插入测试数据...") batch_size = 1000 total_records = 10000 for i in range(0, total_records, batch_size): values = [] for j in range(batch_size): record_id = i + j + 1 k_value = random.randint(1, ) c_value = generate_random_string(100) pad_value = generate_random_string(50) values.append(f"({record_id}, {k_value}, '{c_value}', '{pad_value}')") sql = f"INSERT INTO sbtest (id, k, c, pad) VALUES {','.join(values)}" cursor.execute(sql) if (i // batch_size) % 10 == 0: print(f" 已插入 {i + batch_size}/{total_records} 条记录") print(f"✅ sbtest 表数据插入完成,共 {total_records} 条记录") # 2. 向其他表插入数据(模拟博客示例) print("向 sbtest21, sbtest23, sbtest33 表插入数据...") for table_num in [21, 23, 33]: table_name = f"sbtest{table_num}" # 每个表插入 1000 条记录 for record_id in range(1, 1001): k_value = random.randint(1, ) c_value = generate_random_string(100) pad_value = generate_random_string(50) sql = f""" INSERT INTO {table_name} (id, k, c, pad) VALUES ({record_id}, {k_value}, '{c_value}', '{pad_value}') ON DUPLICATE KEY UPDATE k=VALUES(k), c=VALUES(c), pad=VALUES(pad) """ cursor.execute(sql) print(f"✅ {table_name} 表数据插入完成,共 1000 条记录") conn.close() 

def generate_slow_queries():

GPT plus 代充 只需 145"""生成慢查询""" try: conn = mysql.connector.connect( host="localhost", user="root", password="inscode", database="loadtest", autocommit=True ) except mysql.connector.Error as e: try: conn = mysql.connector.connect( host="localhost", user="root", database="loadtest", autocommit=True ) except mysql.connector.Error as e: print(f"❌ 无法连接到数据库: {e}") sys.exit(1) cursor = conn.cursor() print("开始生成慢查询...") # 1. 执行无索引的全表扫描查询(可能产生慢查询) print("执行全表扫描查询...") start_time = time.time() cursor.execute("SELECT COUNT(*) FROM sbtest WHERE c LIKE '%test%'") result = cursor.fetchone() elapsed = time.time() - start_time print(f" 全表扫描查询耗时: {elapsed:.2f} 秒,结果: {result[0]}") # 2. 执行复杂的 JOIN 查询 print("执行复杂 JOIN 查询...") start_time = time.time() cursor.execute(""" SELECT s1.id, s1.k, s2.c FROM sbtest s1 JOIN sbtest s2 ON s1.k = s2.k WHERE s1.id < 1000 ORDER BY s1.id DESC LIMIT 100 """) results = cursor.fetchall() elapsed = time.time() - start_time print(f" 复杂 JOIN 查询耗时: {elapsed:.2f} 秒,返回 {len(results)} 条记录") # 3. 执行大字段更新(模拟博客中的慢 UPDATE) print("执行大字段更新操作...") for i in range(1, 11): new_c_value = generate_random_string(200) # 更长的字符串 start_time = time.time() cursor.execute(f""" UPDATE sbtest23 SET c='{new_c_value}' WHERE id={i} """) elapsed = time.time() - start_time print(f" 更新操作 {i} 耗时: {elapsed:.2f} 秒") if elapsed > 0.5: # 如果耗时较长,等待一下让慢日志记录 time.sleep(0.1) # 4. 执行删除操作(模拟博客中的慢 DELETE) print("执行删除操作...") for i in range(1001, 1011): start_time = time.time() cursor.execute(f"DELETE FROM sbtest21 WHERE id={i}") elapsed = time.time() - start_time print(f" 删除操作 {i} 耗时: {elapsed:.2f} 秒") # 5. 执行插入操作(模拟博客中的慢 INSERT) print("执行插入操作...") for i in range(2001, 2011): k_value = random.randint(1, ) c_value = generate_random_string(300) # 更长的字符串 pad_value = generate_random_string(150) start_time = time.time() cursor.execute(f""" INSERT INTO sbtest33 (id, k, c, pad) VALUES ({i}, {k_value}, '{c_value}', '{pad_value}') """) elapsed = time.time() - start_time print(f" 插入操作 {i} 耗时: {elapsed:.2f} 秒") conn.close() print("✅ 慢查询生成完成") # 检查慢日志文件 slow_log_path = "/var/log/mysql/mysql-slow.log" if os.path.exists(slow_log_path): print(f 
小讯
上一篇 2026-03-19 18:05
下一篇 2026-03-19 18:03

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/245285.html