| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 根据 database 模块下的 md 文档和 _raw_schema.txt 生成表/字段注释的 ALTER SQL。
- 用法: python gen_table_comments.py [kucoin|copytrade|all]
- 默认 all:同时生成 kucoin 和 copytrade 两个库的 SQL
- """
- import re
- import os
- import sys
# Directory containing this script; the paths below are resolved against it.
DB_DIR = os.path.dirname(os.path.abspath(__file__))

# Schema inputs and the copytrade table list file.
SCHEMA_FILE = os.path.join(DB_DIR, '_raw_schema.txt')
COPYTRADE_SCHEMA_FILE = os.path.join(DB_DIR, '_copytrade_dump.sql')
COPYTRADE_TABLES_FILE = os.path.join(DB_DIR, 'copytrade_tables.txt')

# Table names that are skipped when generating SQL.
SKIP_TABLES = {
    'activity_copy1',
    'country1',
    'country2',
    'cz',
}
def parse_mysqldump_schema(dump_path):
    """Parse a mysqldump schema export into the structure parse_schema returns.

    Args:
        dump_path: Path of a UTF-8 text file containing ``CREATE TABLE``
            statements terminated by ``;``.

    Returns:
        A dict mapping each table name to
        ``{'columns': {name: {'def': ..., 'comment': ...}}, 'table_comment': ...}``.
        ``def`` is the column definition with its COMMENT clause removed;
        a comment is ``None`` when the statement does not carry one.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Body of a single-quoted MySQL literal. A quote inside the text is written
    # as '' and a backslash starts an escape sequence, so a plain [^']* would
    # stop at the first embedded quote and leave garbage in the definition.
    literal = r"(?:[^'\\]|\\.|'')*"
    escapes = {'0': '\0', 'n': '\n', 'r': '\r', 't': '\t', 'b': '\b', 'Z': '\x1a'}

    def unquote(raw):
        """Decode '' and backslash escapes of a quoted literal body."""
        return re.sub(
            r"''|\\(.)",
            lambda e: "'" if e.group(1) is None else escapes.get(e.group(1), e.group(1)),
            raw,
            flags=re.DOTALL,
        )

    # The table options are matched quote-aware so that a ';' inside the table
    # comment does not end the statement early.
    create_re = re.compile(
        r"CREATE TABLE `([a-z0-9_]+)`\s*\((.*?)\)\s*"
        r"(ENGINE=(?:'" + literal + r"'|[^;'])+);",
        re.DOTALL,
    )
    table_comment_re = re.compile(r"COMMENT='(" + literal + r")'", re.DOTALL)
    col_comment_re = re.compile(r"\s*COMMENT\s+'(" + literal + r")'")
    col_line_re = re.compile(r'\s*`(\w+)`\s+(.+)')
    index_prefixes = ('PRIMARY', 'KEY ', 'UNIQUE ', 'CONSTRAINT ', 'FOREIGN ')

    with open(dump_path, 'r', encoding='utf-8') as f:
        content = f.read()

    tables = {}
    for stmt in create_re.finditer(content):
        table_name, block, options = stmt.groups()
        # Look for the table comment only in the options after ") ENGINE=...",
        # never in the column list.
        tc = table_comment_re.search(options)
        table_comment = unquote(tc.group(1)) if tc else None

        cols = {}
        for line in block.split('\n'):
            # Index / constraint lines follow the columns; stop at the first one.
            if line.strip().startswith(index_prefixes):
                break
            col_m = col_line_re.match(line)
            if not col_m:
                continue
            col_name = col_m.group(1)
            col_def = col_m.group(2).strip().rstrip(',')
            # A backquoted name followed by "(" is an index column list.
            if col_def.startswith('('):
                continue
            cm = col_comment_re.search(col_def)
            col_comment = unquote(cm.group(1)).strip() if cm else None
            col_def_clean = col_comment_re.sub('', col_def).strip()
            cols[col_name] = {'def': col_def_clean, 'comment': col_comment}
        tables[table_name] = {'columns': cols, 'table_comment': table_comment}
    return tables
def load_copytrade_tables():
    """Return the set of copytrade table names to generate SQL for.

    When COPYTRADE_TABLES_FILE exists, each line is trimmed, anything after a
    '#' is dropped, and the remainder is kept if it is a lower-case identifier.
    If the file is missing or yields no names, the built-in default set is
    returned instead.
    """
    if os.path.exists(COPYTRADE_TABLES_FILE):
        with open(COPYTRADE_TABLES_FILE, 'r', encoding='utf-8') as handle:
            stripped = (raw.strip().split('#')[0].strip() for raw in handle)
            listed = {
                name for name in stripped
                if name and re.match(r'^[a-z_][a-z0-9_]*$', name)
            }
        if listed:
            return listed

    # Default list: core copy-trading tables and the tables they depend on.
    return {
        'admin', 'admin_access_log', 'admin_permission', 'admin_role', 'admin_role_permission',
        'coin', 'contract_coin', 'contract_order_entrust', 'contract_reward_record',
        'department', 'follow_lock_config', 'follow_sub_wallet', 'follow_wallet',
        'follow_wallet_item', 'funding_rate_history', 'member', 'member_contract_position',
        'member_contract_wallet', 'member_level', 'member_transaction', 'member_wallet',
        'wallet_trans_record',
    }
def parse_schema(schema_path):
    """Parse the ``_raw_schema.txt`` style schema listing.

    The file is a sequence of sections, each introduced by a header line of the
    form ``=== TABLE: <name> (rows: <n>) ===`` and followed by the table's
    ``CREATE TABLE`` text.

    Args:
        schema_path: Path of the UTF-8 schema listing.

    Returns:
        A dict mapping each table name to
        ``{'columns': {name: {'def': ..., 'comment': ...}}, 'table_comment': ...}``.
        ``def`` is the column definition with its COMMENT clause removed;
        a comment is ``None`` when none is present.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Body of a single-quoted MySQL literal. A quote inside the text is written
    # as '' and a backslash starts an escape sequence, so a plain [^']* would
    # stop at the first embedded quote and leave garbage in the definition.
    literal = r"(?:[^'\\]|\\.|'')*"
    escapes = {'0': '\0', 'n': '\n', 'r': '\r', 't': '\t', 'b': '\b', 'Z': '\x1a'}

    def unquote(raw):
        """Decode '' and backslash escapes of a quoted literal body."""
        return re.sub(
            r"''|\\(.)",
            lambda e: "'" if e.group(1) is None else escapes.get(e.group(1), e.group(1)),
            raw,
            flags=re.DOTALL,
        )

    section_re = re.compile(
        r'=== TABLE: ([a-z0-9_]+) \(rows: [^)]+\) ===\s*\n(.*?)(?=\n=== TABLE:|\Z)',
        re.DOTALL,
    )
    table_comment_re = re.compile(
        r"\)\s*ENGINE=\S+[^;]*COMMENT='(" + literal + r")'", re.DOTALL
    )
    col_comment_re = re.compile(r"\s*COMMENT\s+'(" + literal + r")'")
    col_line_re = re.compile(r'\s*`(\w+)`\s+(.+)')
    index_prefixes = ('PRIMARY', 'KEY ', 'UNIQUE ', 'CONSTRAINT ', 'FOREIGN ')

    with open(schema_path, 'r', encoding='utf-8') as f:
        content = f.read()

    tables = {}
    for section in section_re.finditer(content):
        table_name, block = section.group(1), section.group(2)
        tc = table_comment_re.search(block)
        table_comment = unquote(tc.group(1)) if tc else None

        cols = {}
        # Line-based parse; each column line looks like: `col_name` type_definition,
        for line in block.split('\n'):
            # Index / constraint lines follow the columns; stop at the first one.
            if line.strip().startswith(index_prefixes):
                break
            col_m = col_line_re.match(line)
            if not col_m:
                continue
            col_name = col_m.group(1)
            col_def = col_m.group(2).strip().rstrip(',')
            # Skip index definitions such as KEY `member_id` (`member_id`).
            if col_def.startswith('('):
                continue
            # The inline COMMENT is a secondary source (the md docs take priority
            # in generate_sql).
            cm = col_comment_re.search(col_def)
            col_comment = unquote(cm.group(1)).strip() if cm else None
            col_def_clean = col_comment_re.sub('', col_def).strip()
            cols[col_name] = {'def': col_def_clean, 'comment': col_comment}
        tables[table_name] = {'columns': cols, 'table_comment': table_comment}
    return tables
def parse_md_files(db_dir):
    """Parse the markdown docs in db_dir and extract table and column comments.

    Every ``*.md`` file except ``README.md`` is read in sorted file-name order.
    Two sources are scanned per file: overview tables whose header row starts
    with ``表名``, and table chapters introduced by a level 2/3 heading.

    Args:
        db_dir: Directory containing the markdown files.

    Returns:
        A dict mapping table name to
        ``{'table_comment': str or None, 'columns': {column_name: comment}}``.
        A table comment is only set when none was recorded earlier, so the
        first source that provides one wins.
    """
    result = {}
    for fn in sorted(os.listdir(db_dir)):
        if not fn.endswith('.md') or fn == 'README.md':
            continue
        path = os.path.join(db_dir, fn)
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
        # 1. Parse the module overview table (| 表名 | 说明 |) to obtain table comments.
        in_overview = False
        for line in content.split('\n'):
            line = line.strip()
            # Any non-table line ends the current overview table.
            if not line.startswith('|'):
                in_overview = False
                continue
            # Empty cells are dropped, so cells[1] is the second non-empty cell.
            cells = [c.strip().replace('`', '') for c in line.split('|') if c.strip()]
            if len(cells) < 2:
                continue
            first = cells[0].lower()
            if first == '表名':
                in_overview = True
                continue
            # Skip the |---|---| separator row.
            if in_overview and re.match(r'^-+$', first):
                continue
            if in_overview and re.match(r'^[a-z_][a-z0-9_]*$', cells[0], re.I):
                tbl, desc = cells[0], cells[1]
                if tbl not in result:
                    result[tbl] = {'table_comment': None, 'columns': {}}
                # Write only when no table comment exists yet, so that later
                # lists (e.g. "related extension tables") do not override the
                # main module's description.
                if desc and len(desc) < 150 and result[tbl]['table_comment'] is None:
                    result[tbl]['table_comment'] = desc
        # 2. Match table chapters: "## 1. table_name — ..." / "### 1. table_name(...)"
        for m in re.finditer(r'^#{2,3}\s*(?:\d+\.\s+)?([a-z_][a-z0-9_]*)\s*(?:[—\-—]\s*([^#\n]+?)(?:\s*$)|[((]([^)\)]+)[))])\s*$', content, re.MULTILINE):
            table_name = m.group(1)
            # Table comment from the heading: the text after the dash or inside
            # the parentheses (an overview description wins and is not overwritten).
            title_comment = (m.group(2) or m.group(3) or '').strip().lstrip('— -')
            if table_name not in result:
                result[table_name] = {'table_comment': None, 'columns': {}}
            if title_comment and result[table_name]['table_comment'] is None:
                result[table_name]['table_comment'] = title_comment
            start = m.end()
            # Cut the section at the next numbered chapter heading ("## N." or
            # "### N.") to avoid mixing in other tables; without one, at most
            # 15000 characters are taken.
            end = re.search(r'\n^#{2,3}\s*\d+\.\s+', content[start:], re.MULTILINE)
            section = content[start:start + end.start()] if end else content[start:start+15000]
            # NOTE(review): this repeats the check made a few lines above and
            # cannot be true at this point.
            if table_name not in result:
                result[table_name] = {'table_comment': None, 'columns': {}}
            # Explicit "表注释:xxx" line (the overview description takes
            # precedence; this is only a fallback when none exists).
            tc = re.search(r'表注释[::]\s*[`]?([^`\n]+)', section)
            if tc:
                val = tc.group(1).strip()
                if val and '无(' not in val and '未设置' not in val:
                    # Use the 表注释 value only when no comment was recorded yet.
                    if result[table_name]['table_comment'] is None:
                        result[table_name]['table_comment'] = val
            # Field tables: only the field-description table is parsed
            # (index tables, enum-value tables, etc. are skipped).
            # NOTE(review): in_table is assigned below but never read for a
            # decision other than setting itself.
            in_table = False
            in_field_table = False  # collect rows only inside a table whose header starts with a field/column name label
            for line in section.split('\n'):
                line = line.strip()
                if not line.startswith('|'):
                    in_table = False
                    in_field_table = False
                    continue
                cells = [c.strip() for c in line.split('|') if c.strip()]
                if len(cells) < 2:
                    continue
                first = cells[0].replace('`', '').strip().lower()
                # Separator row of a markdown table.
                if re.match(r'^-+$', first):
                    in_table = True
                    continue
                # Header row of a field table: start collecting.
                if first in ('字段', '字段名', 'field', '列名'):
                    in_table = True
                    in_field_table = True
                    continue
                # Header row of an index / enum table: stop collecting.
                if first in ('索引名', '索引', '枚举值', '数据库值'):
                    in_field_table = False
                    in_table = True
                    continue
                if not in_table and len(cells) >= 3:
                    in_table = True
                if not in_field_table:
                    continue
                col_name = cells[0].replace('`', '').strip()
                if not re.match(r'^[a-z_][a-z0-9_]*$', col_name, re.I):
                    continue
                # The last non-empty cell of the row is taken as the comment.
                comment = cells[-1].strip() if len(cells) >= 2 else ''
                if comment and comment not in ('说明', '') and len(comment) < 200:
                    result[table_name]['columns'][col_name] = comment
    return result
def escape_sql(s):
    """Escape text for embedding in a single-quoted MySQL string literal.

    Every backslash is doubled and every single quote is written as ''.
    """
    replacements = {ord("\\"): "\\\\", ord("'"): "''"}
    return s.translate(replacements)
# Inferred comments for common column names (fallback when no documentation
# or schema comment is available); keys are lower-case column names.
INFER_COMMENTS = {
    'id': '主键ID',
    'create_time': '创建时间',
    'update_time': '更新时间',
    'create_date': '创建日期',
    'update_date': '更新日期',
    'status': '状态',
    'type': '类型',
    'amount': '数量',
    'balance': '余额',
    'frozen_balance': '冻结余额',
    'remark': '备注',
    'sort': '排序',
    'enable': '是否启用',
    'is_show': '是否显示',
    'is_default': '是否默认',
    'is_top': '是否置顶',
    'name': '名称',
    'title': '标题',
    'content': '内容',
    'description': '描述',
    'symbol': '交易对/符号',
    'price': '价格',
    'fee': '手续费',
    'total': '合计',
    'count': '数量',
    'number': '数量',
    'username': '用户名',
    'password': '密码',
    'phone': '手机号',
    'email': '邮箱',
    'address': '地址',
    'url': 'URL',
    'image_url': '图片URL',
    'link': '链接',
    'code': '编码/代码',
    'key': '键',
    'value': '值',
    'start_time': '开始时间',
    'end_time': '结束时间',
    'deal_time': '成交时间',
    'order_id': '订单ID',
    'coin_id': '币种ID',
    'member_id': '用户ID',
    'admin_id': '管理员ID',
    'parent_id': '父级ID',
    'source_member_id': '来源用户ID',
    'activity_id': '活动ID',
    'role_id': '角色ID',
    'contract_id': '合约ID',
    'position_id': '持仓ID',
    'unit': '单位',
}
def infer_comment(col_name):
    """Guess a short comment from a column name (last-resort fallback).

    The lower-cased name is first looked up in INFER_COMMENTS. Otherwise a
    comment is derived from an ``_id``, ``_time`` or ``_amount`` suffix, with
    underscores in the remaining stem turned into spaces.

    Returns:
        The inferred comment, or ``None`` when no rule applies.
    """
    key = col_name.lower()
    try:
        return INFER_COMMENTS[key]
    except KeyError:
        pass

    if key != 'id' and key.endswith('_id'):
        stem = key[:-3].replace('_', ' ')
        return f'关联{stem}的ID'

    # Remaining suffix rules: strip the suffix and append a fixed label.
    for suffix, label in (('_time', '时间'), ('_amount', '数量')):
        if key.endswith(suffix):
            return key[:-len(suffix)].replace('_', ' ') + label
    return None
def generate_sql(output_path, db_name):
    """Write the table/column comment ALTER statements for one database.

    The schema comes from COPYTRADE_SCHEMA_FILE when db_name is 'copytrade'
    and that file exists, otherwise from SCHEMA_FILE. For 'copytrade' the
    tables are additionally restricted to load_copytrade_tables(). Comment
    priority per table and column is: markdown docs, then the schema's own
    comment, then (columns only) infer_comment, then the name with
    underscores replaced by spaces.

    Args:
        output_path: File to write; it is overwritten.
        db_name: Database name, used in the header and the USE statement.

    NOTE(review): the emitted header embeds a fixed database host name.
    """
    is_copytrade = db_name == 'copytrade'
    if is_copytrade and os.path.exists(COPYTRADE_SCHEMA_FILE):
        # copytrade uses the schema exported from the actual database.
        schema = parse_mysqldump_schema(COPYTRADE_SCHEMA_FILE)
    else:
        schema = parse_schema(SCHEMA_FILE)
    table_filter = load_copytrade_tables() if is_copytrade else None
    md_data = parse_md_files(DB_DIR)

    comment_clause = r"\s*COMMENT\s+'[^']*'"
    out = [
        f'-- 根据 database 模块文档生成的表/字段注释 SQL(库: {db_name})',
        '-- 连接: mysql -h rm-3nsx177900f04a7gvwo.mysql.rds.aliyuncs.com -u root -p',
        '-- 执行前请备份',
        f'USE {db_name};',
        '',
    ]

    for table_name in sorted(schema):
        not_listed = table_filter is not None and table_name not in table_filter
        if not_listed or table_name in SKIP_TABLES or 'bak' in table_name.lower():
            continue

        table = schema[table_name]
        doc = md_data.get(table_name, {})

        table_comment = doc.get('table_comment') or table.get('table_comment')
        # Placeholder texts meaning "no comment" are treated as missing.
        if table_comment and ('无(' in table_comment or '未设置' in table_comment):
            table_comment = None
        if not table_comment:
            # No documentation: fall back to the table name.
            table_comment = table_name.replace('_', ' ')
        out.append(f"ALTER TABLE `{table_name}` COMMENT '{escape_sql(table_comment)}';")

        doc_columns = doc.get('columns', {})
        columns = table.get('columns', {})
        for col_name, col_info in columns.items():
            # A column entry may be a bare definition string or a dict.
            if isinstance(col_info, str):
                col_def, schema_comment = col_info, None
            else:
                col_def = col_info.get('def', '')
                schema_comment = col_info.get('comment')
            comment = (
                doc_columns.get(col_name)
                or schema_comment
                or infer_comment(col_name)
                or col_name.replace('_', ' ')
            )
            full_def = re.sub(comment_clause, '', col_def) if col_def else ''
            out.append(
                f"ALTER TABLE `{table_name}` MODIFY COLUMN `{col_name}` {full_def} "
                f"COMMENT '{escape_sql(comment)}';"
            )

        # Blank line between tables.
        if table_comment or columns:
            out.append('')

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(out))
    print(f'Generated {output_path}')
def main():
    """Command-line entry point: generate SQL for one database or for all.

    The first argument (case-insensitive, default 'all') selects the database.
    For 'all', one ``alter_table_comments_<db>.sql`` is written per database.
    For a single database, its file and ``alter_table_comments.sql`` are both
    written. Any other value prints the usage line and exits with status 1.
    """
    dbs = ['kucoin', 'copytrade']
    arg = sys.argv[1].lower() if len(sys.argv) > 1 else 'all'

    def target(suffix):
        """Build an output path inside DB_DIR for the given name suffix."""
        return os.path.join(DB_DIR, f'alter_table_comments{suffix}.sql')

    if arg == 'all':
        for db in dbs:
            generate_sql(target(f'_{db}'), db)
        return

    if arg not in dbs:
        print(f'用法: python gen_table_comments.py [{"|".join(dbs)}|all]')
        sys.exit(1)

    generate_sql(target(f'_{arg}'), arg)
    # Also refresh alter_table_comments.sql as the main output.
    generate_sql(target(''), arg)
# Run the generator only when this file is executed as a script.
if __name__ == '__main__':
    main()
|