#!/usr/bin/env python3
"""Script 03: generate test data and produce slow SQL queries.

Reference blog: https://blog.csdn.net/zuozewei/article/details/
"""
import os
import random
import string
import sys
import time

import mysql.connector
def generate_random_string(length=100):
    """Return a random string of ASCII letters and digits.

    Args:
        length: number of characters to generate (default 100).

    Returns:
        A string of exactly ``length`` alphanumeric characters.
    """
    pool = string.ascii_letters + string.digits
    return ''.join(random.choices(pool, k=length))
def create_test_database():
    """Create the 'loadtest' database and its test tables.

    Connects to a local MySQL server as root — first with the known
    password, then without one as a fallback — then creates the main
    'sbtest' table and the auxiliary sbtest21/23/33 tables used by the
    slow-query scenarios. Exits the process if no connection works.
    """
    try:
        # Try the expected password first.
        conn = mysql.connector.connect(
            host="localhost",
            user="root",
            password="inscode",
            autocommit=True,
        )
    except mysql.connector.Error:
        # Fall back to a password-less root account.
        try:
            conn = mysql.connector.connect(
                host="localhost",
                user="root",
                autocommit=True,
            )
        except mysql.connector.Error as e:
            print(f"❌ 无法连接到 MySQL: {e}")
            sys.exit(1)

    cursor = conn.cursor()

    # Create the test database (idempotent).
    print("创建测试数据库 'loadtest'...")
    cursor.execute("CREATE DATABASE IF NOT EXISTS loadtest")
    cursor.execute("USE loadtest")

    # Main test table (mirrors the blog's sbtest layout).
    print("创建测试表 'sbtest'...")
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sbtest (
            id INT PRIMARY KEY AUTO_INCREMENT,
            k INT NOT NULL,
            c VARCHAR(500) NOT NULL,
            pad VARCHAR(200) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_k (k)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    """)

    # Extra tables referenced by the blog's slow-query examples.
    for table_num in [21, 23, 33]:
        table_name = f"sbtest{table_num}"
        print(f"创建测试表 '{table_name}'...")
        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INT PRIMARY KEY,
                k INT NOT NULL,
                c VARCHAR(500) NOT NULL,
                pad VARCHAR(200) NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """)

    # Close the cursor before the connection (original leaked the cursor).
    cursor.close()
    conn.close()
    print("✅ 数据库和表创建完成")
def generate_test_data():
    """Populate the loadtest tables with random rows.

    Inserts 10,000 rows into 'sbtest' in batches of 1,000, then upserts
    1,000 rows into each of sbtest21/23/33. Exits the process if no
    database connection can be established.
    """
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user="root",
            password="inscode",
            database="loadtest",
            autocommit=True,
        )
    except mysql.connector.Error:
        # Fall back to a password-less root account.
        try:
            conn = mysql.connector.connect(
                host="localhost",
                user="root",
                database="loadtest",
                autocommit=True,
            )
        except mysql.connector.Error as e:
            print(f"❌ 无法连接到数据库: {e}")
            sys.exit(1)

    cursor = conn.cursor()

    # 1. Bulk-insert rows into sbtest, one batch per statement.
    print("向 sbtest 表插入测试数据...")
    batch_size = 1000
    total_records = 10000
    for i in range(0, total_records, batch_size):
        rows = []
        for j in range(batch_size):
            record_id = i + j + 1
            # NOTE(review): the randint upper bound was truncated in the
            # original source; 10000 (== total_records) is assumed —
            # confirm against the reference blog.
            k_value = random.randint(1, 10000)
            rows.append((
                record_id,
                k_value,
                generate_random_string(100),
                generate_random_string(50),
            ))
        # Parameterized batch insert instead of string-built SQL.
        cursor.executemany(
            "INSERT INTO sbtest (id, k, c, pad) VALUES (%s, %s, %s, %s)",
            rows,
        )
        if (i // batch_size) % 10 == 0:
            print(f" 已插入 {i + batch_size}/{total_records} 条记录")
    print(f"✅ sbtest 表数据插入完成,共 {total_records} 条记录")

    # 2. Upsert 1,000 rows into each auxiliary table (idempotent reruns).
    print("向 sbtest21, sbtest23, sbtest33 表插入数据...")
    for table_num in [21, 23, 33]:
        table_name = f"sbtest{table_num}"
        for record_id in range(1, 1001):
            k_value = random.randint(1, 10000)
            cursor.execute(
                f"""
                INSERT INTO {table_name} (id, k, c, pad)
                VALUES (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE k=VALUES(k), c=VALUES(c), pad=VALUES(pad)
                """,
                (
                    record_id,
                    k_value,
                    generate_random_string(100),
                    generate_random_string(50),
                ),
            )
        print(f"✅ {table_name} 表数据插入完成,共 1000 条记录")

    cursor.close()
    conn.close()
def generate_slow_queries():
    """Run a mix of deliberately slow statements against 'loadtest'.

    Executes a full-table scan, a self-JOIN, wide-column UPDATEs,
    DELETEs, and INSERTs so that MySQL's slow-query log captures
    representative entries for each statement type, then reports on the
    slow-log file. Exits the process if no connection can be made.
    """
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user="root",
            password="inscode",
            database="loadtest",
            autocommit=True,
        )
    except mysql.connector.Error:
        # Fall back to a password-less root account.
        try:
            conn = mysql.connector.connect(
                host="localhost",
                user="root",
                database="loadtest",
                autocommit=True,
            )
        except mysql.connector.Error as e:
            print(f"❌ 无法连接到数据库: {e}")
            sys.exit(1)

    cursor = conn.cursor()
    print("开始生成慢查询...")

    # 1. Full-table scan: a leading-wildcard LIKE cannot use an index.
    print("执行全表扫描查询...")
    start_time = time.time()
    cursor.execute("SELECT COUNT(*) FROM sbtest WHERE c LIKE '%test%'")
    result = cursor.fetchone()
    elapsed = time.time() - start_time
    print(f" 全表扫描查询耗时: {elapsed:.2f} 秒,结果: {result[0]}")

    # 2. Self-JOIN on the non-unique k column.
    print("执行复杂 JOIN 查询...")
    start_time = time.time()
    cursor.execute("""
        SELECT s1.id, s1.k, s2.c
        FROM sbtest s1
        JOIN sbtest s2 ON s1.k = s2.k
        WHERE s1.id < 1000
        ORDER BY s1.id DESC
        LIMIT 100
    """)
    results = cursor.fetchall()
    elapsed = time.time() - start_time
    print(f" 复杂 JOIN 查询耗时: {elapsed:.2f} 秒,返回 {len(results)} 条记录")

    # 3. Wide-column UPDATEs (the blog's slow-UPDATE scenario).
    print("执行大字段更新操作...")
    for i in range(1, 11):
        new_c_value = generate_random_string(200)  # longer payload
        start_time = time.time()
        cursor.execute(
            "UPDATE sbtest23 SET c=%s WHERE id=%s",
            (new_c_value, i),
        )
        elapsed = time.time() - start_time
        print(f" 更新操作 {i} 耗时: {elapsed:.2f} 秒")
        if elapsed > 0.5:
            # Give the slow log a moment to record the entry.
            time.sleep(0.1)

    # 4. DELETEs (the blog's slow-DELETE scenario).
    print("执行删除操作...")
    for i in range(1001, 1011):
        start_time = time.time()
        cursor.execute("DELETE FROM sbtest21 WHERE id=%s", (i,))
        elapsed = time.time() - start_time
        print(f" 删除操作 {i} 耗时: {elapsed:.2f} 秒")

    # 5. INSERTs with large payloads (the blog's slow-INSERT scenario).
    print("执行插入操作...")
    for i in range(2001, 2011):
        # NOTE(review): the randint upper bound was truncated in the
        # original source; 10000 is assumed — confirm against the blog.
        k_value = random.randint(1, 10000)
        c_value = generate_random_string(300)
        pad_value = generate_random_string(150)
        start_time = time.time()
        cursor.execute(
            "INSERT INTO sbtest33 (id, k, c, pad) VALUES (%s, %s, %s, %s)",
            (i, k_value, c_value, pad_value),
        )
        elapsed = time.time() - start_time
        print(f" 插入操作 {i} 耗时: {elapsed:.2f} 秒")

    cursor.close()
    conn.close()
    print("✅ 慢查询生成完成")

    # NOTE(review): the original source was truncated mid-statement here;
    # this slow-log report is a reconstruction of the apparent intent.
    slow_log_path = "/var/log/mysql/mysql-slow.log"
    if os.path.exists(slow_log_path):
        size = os.path.getsize(slow_log_path)
        print(f"慢日志文件: {slow_log_path} ({size} 字节)")
    else:
        print(f"⚠️ 未找到慢日志文件: {slow_log_path}")
# 版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
# 如需转载请保留出处:https://51itzy.com/kjqy/245285.html