#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 根据 database 模块下的 md 文档和 _raw_schema.txt 生成表/字段注释的 ALTER SQL。 用法: python gen_table_comments.py [kucoin|copytrade|all] 默认 all:同时生成 kucoin 和 copytrade 两个库的 SQL """ import re import os import sys DB_DIR = os.path.dirname(os.path.abspath(__file__)) SCHEMA_FILE = os.path.join(DB_DIR, '_raw_schema.txt') COPYTRADE_SCHEMA_FILE = os.path.join(DB_DIR, '_copytrade_dump.sql') COPYTRADE_TABLES_FILE = os.path.join(DB_DIR, 'copytrade_tables.txt') SKIP_TABLES = {'activity_copy1', 'country1', 'country2', 'cz'} def parse_mysqldump_schema(dump_path): """解析 mysqldump 导出的 schema,返回与 parse_schema 相同结构""" with open(dump_path, 'r', encoding='utf-8') as f: content = f.read() tables = {} for m in re.finditer(r"CREATE TABLE `([a-z0-9_]+)`\s*\((.*?)\)\s*ENGINE=[^;]+;", content, re.DOTALL): table_name, block = m.group(1), m.group(2) # 从 ) ENGINE=... 到 ; 之间提取 COMMENT tail = content[m.start():m.end()] tc = re.search(r"COMMENT='([^']*)'", tail) table_comment = tc.group(1) if tc else None cols = {} for line in block.split('\n'): sline = line.strip() if sline.startswith(('PRIMARY', 'KEY ', 'UNIQUE ', 'CONSTRAINT ', 'FOREIGN ')): break col_m = re.match(r'\s*`(\w+)`\s+(.+)', line) if not col_m: continue col_name, col_def = col_m.group(1), col_m.group(2).strip().rstrip(',') if col_def.strip().startswith('('): continue col_comment = None cm = re.search(r"\s*COMMENT\s+'([^']*)'", col_def) if cm: col_comment = cm.group(1).strip() col_def_clean = re.sub(r"\s*COMMENT\s+'[^']*'", '', col_def).strip() cols[col_name] = {'def': col_def_clean, 'comment': col_comment} tables[table_name] = {'columns': cols, 'table_comment': table_comment} return tables def load_copytrade_tables(): """加载 copytrade 库的表清单(仅生成这些表的 SQL)""" if os.path.exists(COPYTRADE_TABLES_FILE): tables = set() with open(COPYTRADE_TABLES_FILE, 'r', encoding='utf-8') as f: for line in f: line = line.strip().split('#')[0].strip() if line and re.match(r'^[a-z_][a-z0-9_]*$', line): tables.add(line) if tables: return tables # 默认清单:跟单模块核心表及依赖 return { 'admin', 'admin_access_log', 'admin_permission', 'admin_role', 'admin_role_permission', 'coin', 'contract_coin', 'contract_order_entrust', 'contract_reward_record', 'department', 'follow_lock_config', 'follow_sub_wallet', 'follow_wallet', 'follow_wallet_item', 'funding_rate_history', 'member', 'member_contract_position', 'member_contract_wallet', 'member_level', 'member_transaction', 'member_wallet', 'wallet_trans_record', } def parse_schema(schema_path): """解析 _raw_schema.txt""" with open(schema_path, 'r', encoding='utf-8') as f: content = f.read() tables = {} for m in re.finditer(r'=== TABLE: ([a-z0-9_]+) \(rows: [^)]+\) ===\s*\n(.*?)(?=\n=== TABLE:|\Z)', content, re.DOTALL): table_name, block = m.group(1), m.group(2) m2 = re.search(r"\)\s*ENGINE=\S+[^;]*COMMENT='([^']*)'", block) table_comment = m2.group(1) if m2 else None cols = {} # 按行解析,每行格式: `col_name` type_definition, for line in block.split('\n'): # 跳过 KEY、PRIMARY、CONSTRAINT 等索引/约束行 sline = line.strip() if sline.startswith(('PRIMARY', 'KEY ', 'UNIQUE ', 'CONSTRAINT ', 'FOREIGN ')): break m = re.match(r'\s*`(\w+)`\s+(.+)', line) if not m: continue col_name, col_def = m.group(1), m.group(2).strip().rstrip(',') # 跳过索引定义如 KEY `member_id` (`member_id`) if col_def.strip().startswith('('): continue # 提取列定义中的 COMMENT 作为补充来源(md 优先) col_comment = None cm = re.search(r"\s*COMMENT\s+'([^']*)'", col_def) if cm: col_comment = cm.group(1).strip() col_def_clean = re.sub(r"\s*COMMENT\s+'[^']*'", '', col_def).strip() cols[col_name] = {'def': col_def_clean, 'comment': col_comment} tables[table_name] = {'columns': cols, 'table_comment': table_comment} return tables def parse_md_files(db_dir): """解析 md 文档,提取表名和字段注释""" result = {} for fn in sorted(os.listdir(db_dir)): if not fn.endswith('.md') or fn == 'README.md': continue path = os.path.join(db_dir, fn) with open(path, 'r', encoding='utf-8') as f: content = f.read() # 1. 解析模块概述表格: | 表名 | 说明 | 获取表注释 in_overview = False for line in content.split('\n'): line = line.strip() if not line.startswith('|'): in_overview = False continue cells = [c.strip().replace('`', '') for c in line.split('|') if c.strip()] if len(cells) < 2: continue first = cells[0].lower() if first == '表名': in_overview = True continue if in_overview and re.match(r'^-+$', first): continue if in_overview and re.match(r'^[a-z_][a-z0-9_]*$', cells[0], re.I): tbl, desc = cells[0], cells[1] if tbl not in result: result[tbl] = {'table_comment': None, 'columns': {}} # 仅当尚无表注释时写入,避免「关联扩展表」等覆盖主模块说明 if desc and len(desc) < 150 and result[tbl]['table_comment'] is None: result[tbl]['table_comment'] = desc # 2. 匹配表章节: ## 1. table_name — / ### 1. table_name() for m in re.finditer(r'^#{2,3}\s*(?:\d+\.\s+)?([a-z_][a-z0-9_]*)\s*(?:[—\-—]\s*([^#\n]+?)(?:\s*$)|[((]([^)\)]+)[))])\s*$', content, re.MULTILINE): table_name = m.group(1) # 从标题提取表注释:— 后面的文字 或 ()内的文字(overview 说明优先,不覆盖) title_comment = (m.group(2) or m.group(3) or '').strip().lstrip('— -') if table_name not in result: result[table_name] = {'table_comment': None, 'columns': {}} if title_comment and result[table_name]['table_comment'] is None: result[table_name]['table_comment'] = title_comment start = m.end() # 截取到下一个表章节:## N. 或 ### N. (避免混入其他表) end = re.search(r'\n^#{2,3}\s*\d+\.\s+', content[start:], re.MULTILINE) section = content[start:start + end.start()] if end else content[start:start+15000] if table_name not in result: result[table_name] = {'table_comment': None, 'columns': {}} # 表注释: > 表注释:xxx(overview 说明优先,表注释仅在没有 overview 时补充) tc = re.search(r'表注释[::]\s*[`]?([^`\n]+)', section) if tc: val = tc.group(1).strip() if val and '无(' not in val and '未设置' not in val: # 仅当尚无 overview 说明时使用 表注释 if result[table_name]['table_comment'] is None: result[table_name]['table_comment'] = val # 字段表格: 只解析「字段说明」表(跳过 索引说明、枚举值 等表) in_table = False in_field_table = False # 仅在 字段名/列名 开头的表内才采集 for line in section.split('\n'): line = line.strip() if not line.startswith('|'): in_table = False in_field_table = False continue cells = [c.strip() for c in line.split('|') if c.strip()] if len(cells) < 2: continue first = cells[0].replace('`', '').strip().lower() if re.match(r'^-+$', first): in_table = True continue if first in ('字段', '字段名', 'field', '列名'): in_table = True in_field_table = True continue if first in ('索引名', '索引', '枚举值', '数据库值'): in_field_table = False in_table = True continue if not in_table and len(cells) >= 3: in_table = True if not in_field_table: continue col_name = cells[0].replace('`', '').strip() if not re.match(r'^[a-z_][a-z0-9_]*$', col_name, re.I): continue comment = cells[-1].strip() if len(cells) >= 2 else '' if comment and comment not in ('说明', '') and len(comment) < 200: result[table_name]['columns'][col_name] = comment return result def escape_sql(s): """MySQL 字符串转义:单引号用 '' 转义""" return s.replace("\\", "\\\\").replace("'", "''") # 常见列名的推断注释(无文档时的兜底) INFER_COMMENTS = { 'id': '主键ID', 'create_time': '创建时间', 'update_time': '更新时间', 'create_date': '创建日期', 'update_date': '更新日期', 'status': '状态', 'type': '类型', 'amount': '数量', 'balance': '余额', 'frozen_balance': '冻结余额', 'remark': '备注', 'sort': '排序', 'enable': '是否启用', 'is_show': '是否显示', 'is_default': '是否默认', 'is_top': '是否置顶', 'name': '名称', 'title': '标题', 'content': '内容', 'description': '描述', 'symbol': '交易对/符号', 'price': '价格', 'fee': '手续费', 'total': '合计', 'count': '数量', 'number': '数量', 'username': '用户名', 'password': '密码', 'phone': '手机号', 'email': '邮箱', 'address': '地址', 'url': 'URL', 'image_url': '图片URL', 'link': '链接', 'code': '编码/代码', 'key': '键', 'value': '值', 'start_time': '开始时间', 'end_time': '结束时间', 'deal_time': '成交时间', 'order_id': '订单ID', 'coin_id': '币种ID', 'member_id': '用户ID', 'admin_id': '管理员ID', 'parent_id': '父级ID', 'source_member_id': '来源用户ID', 'activity_id': '活动ID', 'role_id': '角色ID', 'contract_id': '合约ID', 'position_id': '持仓ID', 'unit': '单位', } def infer_comment(col_name): """根据列名推断简短注释(兜底用)""" lower = col_name.lower() if lower in INFER_COMMENTS: return INFER_COMMENTS[lower] if lower.endswith('_id') and lower != 'id': prefix = lower[:-3].replace('_', ' ') return f'关联{prefix}的ID' if lower.endswith('_time'): return lower[:-5].replace('_', ' ') + '时间' if lower.endswith('_amount'): return lower[:-7].replace('_', ' ') + '数量' return None def generate_sql(output_path, db_name): """生成指定库的表/字段注释 SQL""" table_filter = None if db_name == 'copytrade' and os.path.exists(COPYTRADE_SCHEMA_FILE): # copytrade 使用从实际库导出的 schema schema = parse_mysqldump_schema(COPYTRADE_SCHEMA_FILE) else: schema = parse_schema(SCHEMA_FILE) if db_name == 'copytrade': table_filter = load_copytrade_tables() md_data = parse_md_files(DB_DIR) lines = [ f'-- 根据 database 模块文档生成的表/字段注释 SQL(库: {db_name})', '-- 连接: mysql -h rm-3nsx177900f04a7gvwo.mysql.rds.aliyuncs.com -u root -p', '-- 执行前请备份', f'USE {db_name};', '', ] for table_name in sorted(schema.keys()): if table_filter is not None and table_name not in table_filter: continue if table_name in SKIP_TABLES or 'bak' in table_name.lower(): continue s = schema[table_name] md = md_data.get(table_name, {}) tc = md.get('table_comment') or s.get('table_comment') if tc and ('无(' in tc or '未设置' in tc): tc = None if not tc: tc = table_name.replace('_', ' ') # 无文档时用表名兜底 lines.append(f"ALTER TABLE `{table_name}` COMMENT '{escape_sql(tc)}';") for col_name, col_info in s.get('columns', {}).items(): if isinstance(col_info, str): col_def, schema_comment = col_info, None else: col_def, schema_comment = col_info.get('def', ''), col_info.get('comment') comment = md.get('columns', {}).get(col_name) or schema_comment or infer_comment(col_name) if not comment: comment = col_name.replace('_', ' ') full_def = re.sub(r"\s*COMMENT\s+'[^']*'", '', col_def) if col_def else '' lines.append(f"ALTER TABLE `{table_name}` MODIFY COLUMN `{col_name}` {full_def} COMMENT '{escape_sql(comment)}';") if tc or s.get('columns'): lines.append('') with open(output_path, 'w', encoding='utf-8') as f: f.write('\n'.join(lines)) print(f'Generated {output_path}') def main(): dbs = ['kucoin', 'copytrade'] arg = (sys.argv[1] if len(sys.argv) > 1 else 'all').lower() if arg == 'all': for db in dbs: out = os.path.join(DB_DIR, f'alter_table_comments_{db}.sql') generate_sql(out, db) elif arg in dbs: out = os.path.join(DB_DIR, f'alter_table_comments_{arg}.sql') generate_sql(out, arg) # 同时更新 alter_table_comments.sql 为主输出 default_out = os.path.join(DB_DIR, 'alter_table_comments.sql') generate_sql(default_out, arg) else: print(f'用法: python gen_table_comments.py [{"|".join(dbs)}|all]') sys.exit(1) if __name__ == '__main__': main()