SQL to Directus Seed | Cododel
CODODELDEV
EN / RU
Back to Deck
[utility]

SQL to Directus Seed

ИСХОДНИК Python
ВЕРСИЯ 1.0
АВТОР Cododel

Мощная Python утилита, которая парсит INSERT INTO выражения из SQL дампов (PostgreSQL, MySQL) и конвертирует их в Directus Seed JSON формат для directus-sync. Обрабатывает сложные типы, null, приведения типов и автоматически генерирует sync IDs.

Возможности

  • Умный SQL парсинг: Обрабатывает комментарии, многострочные выражения, quoted identifiers
  • Определение типов: Автоматически конвертирует строки, числа, null, UUID, timestamps
  • PostgreSQL приведения: Обрабатывает ::uuid, ::timestamp type casts
  • Генерация Sync ID: Авто-генерация _sync_id на основе первичного ключа
  • Поддержка множества таблиц: Обрабатывает несколько таблиц с упорядочиванием зависимостей
  • Обработка ошибок: Валидация количества колонок/значений, логирование предупреждений

Применение

Идеально для миграции данных между окружениями или конвертации legacy SQL дампов в Directus-совместимые seeds:

Terminal window
# Конвертировать PostgreSQL дамп в Directus seed
python sql_to_seed.py -i production_dump.sql -o seeds/production.json
# Конвертировать несколько таблиц
python sql_to_seed.py -i multi_table_dump.sql -o seeds/all_tables.json

Пример входных данных

dump.sql:

INSERT INTO products (id, name, price, created_at)
VALUES
(1, 'Product A', 99.99, '2024-01-01'::timestamp),
(2, 'Product B', 149.50, '2024-01-02'::timestamp);
INSERT INTO categories (id, name, slug)
VALUES
('uuid-123'::uuid, 'Электроника', 'electronics'),
('uuid-456'::uuid, 'Книги', 'books');

Пример выходных данных

seed.json:

[
{
"collection": "products",
"meta": {
"insert_order": 1,
"create": true,
"update": true,
"delete": true,
"preserve_ids": true
},
"data": [
{
"id": 1,
"name": "Product A",
"price": 99.99,
"created_at": "2024-01-01",
"_sync_id": "products-1"
},
{
"id": 2,
"name": "Product B",
"price": 149.5,
"created_at": "2024-01-02",
"_sync_id": "products-2"
}
]
},
{
"collection": "categories",
"meta": {
"insert_order": 1,
"create": true,
"update": true,
"delete": true,
"preserve_ids": true
},
"data": [
{
"id": "uuid-123",
"name": "Электроника",
"slug": "electronics",
"_sync_id": "categories-uuid-123"
},
{
"id": "uuid-456",
"name": "Книги",
"slug": "books",
"_sync_id": "categories-uuid-456"
}
]
}
]

Как это работает

  1. Нормализация: Удаляет комментарии, стандартизирует пробелы
  2. Разделение выражений: Разделяет множественные INSERT выражения
  3. Парсинг: Извлекает имя таблицы, колонки и значения
  4. Конвертация типов: Преобразует SQL литералы в JSON типы
  5. Генерация seed: Создает Directus-совместимую JSON структуру
  6. Упорядочивание зависимостей: Умное упорядочивание для таблиц с отношениями

Исходный код

sql_to_seed.py (Полная реализация - 260+ строк)

import re
import json
import argparse
from typing import List, Dict, Any, Tuple, Optional
from pathlib import Path
def normalize_sql_content(sql_content: str) -> str:
"""Нормализация SQL: удаление комментариев, стандартизация пробелов, разделение выражений."""
# Удаление комментариев
sql_content = re.sub(r'/\*.*?\*/', '', sql_content, flags=re.DOTALL)
sql_content = re.sub(r'--.*$', '', sql_content, flags=re.MULTILINE)
# Сжатие и стандартизация
lines = [line.strip() for line in sql_content.splitlines() if line.strip()]
sql_content = ' '.join(lines)
sql_content = re.sub(r'\s*,\s*', ',', sql_content)
sql_content = re.sub(r'\s*\(\s*', '(', sql_content)
sql_content = re.sub(r'\s*\)\s*', ')', sql_content)
sql_content = re.sub(r'\s+', ' ', sql_content).strip()
# Разделение по точке с запятой (с учетом кавычек)
statements = []
current_statement = []
in_quotes = False
for char in sql_content:
current_statement.append(char)
if char == "'":
in_quotes = not in_quotes
elif char == ';' and not in_quotes:
statements.append("".join(current_statement).strip())
current_statement = []
if current_statement:
statements.append("".join(current_statement).strip())
return statements
def parse_value(value_str: str) -> Any:
"""Парсинг SQL значения: обработка приведений, строк, null, чисел."""
value_str = value_str.strip()
# Обработка PostgreSQL приведений: 'value'::type
cast_match = re.match(r"^'(.*?)'(?:::[\w\s]+)?$", value_str)
if cast_match:
return cast_match.group(1)
# Простая quoted строка
if value_str.startswith("'") and value_str.endswith("'"):
return value_str[1:-1]
# NULL
if value_str.lower() == 'null':
return None
# Числа
try:
return int(value_str)
except ValueError:
try:
return float(value_str)
except ValueError:
return value_str
def parse_insert_statement(statement: str) -> Optional[Tuple[str, List[str], List[List[Any]]]]:
"""Парсинг INSERT INTO выражения, возвращает (таблица, колонки, значения)."""
if not statement.lower().startswith('insert into'):
return None
# Извлечение имени таблицы
table_match = re.search(r'insert\s+into\s+("?)([\w\.]+)\1', statement, re.IGNORECASE)
if not table_match:
return None
table_name = table_match.group(2)
# Извлечение колонок
columns_match = re.search(r'\((.*?)\)\s*values', statement, re.IGNORECASE | re.DOTALL)
if not columns_match:
return None
columns = [col.strip().strip('"') for col in columns_match.group(1).split(',')]
# Извлечение value tuples
values_part = statement[columns_match.end():].strip()
value_tuples_str = re.findall(r'\((.*?)\)', values_part)
parsed_values = []
for tuple_str in value_tuples_str:
values_in_tuple = []
current_val = []
in_quotes = False
for char in tuple_str:
if char == "'":
in_quotes = not in_quotes
current_val.append(char)
elif char == ',' and not in_quotes:
values_in_tuple.append("".join(current_val).strip())
current_val = []
else:
current_val.append(char)
values_in_tuple.append("".join(current_val).strip())
if len(values_in_tuple) == len(columns):
parsed_values.append([parse_value(v) for v in values_in_tuple])
return table_name, columns, parsed_values if columns and parsed_values else None
def create_seed_format(table_name: str, columns: List[str], values_list: List[List[Any]]) -> Dict[str, Any]:
"""Создание Directus seed JSON структуры."""
pk_column = 'id' if 'id' in columns else ('uuid' if 'uuid' in columns else columns[0])
pk_index = columns.index(pk_column)
items = []
for row_values in values_list:
item = dict(zip(columns, row_values))
pk_value = row_values[pk_index]
item['_sync_id'] = f"{table_name}-{pk_value}" if pk_value else f"{table_name}-item-{len(items)}"
items.append(item)
return {
"collection": table_name,
"meta": {
"insert_order": 1,
"create": True,
"update": True,
"delete": True,
"preserve_ids": True,
"ignore_on_update": []
},
"data": items
}
def process_sql_file(input_file: Path, output_file: Path) -> None:
"""Основная функция обработки."""
with open(input_file, 'r', encoding='utf-8') as f:
sql_content = f.read()
statements = normalize_sql_content(sql_content)
all_seeds = []
for statement in statements:
parsed = parse_insert_statement(statement)
if parsed:
table_name, columns, values = parsed
print(f"Обработка {table_name}: {len(values)} строк")
all_seeds.append(create_seed_format(table_name, columns, values))
if not all_seeds:
print("Ошибка: Не найдено валидных INSERT выражений")
return
# Умное упорядочивание для переводов
if len(all_seeds) > 1:
for seed in all_seeds:
if 'translations' in seed['collection'].lower():
seed['meta']['insert_order'] = 2
output_data = all_seeds if len(all_seeds) > 1 else all_seeds[0]
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"✓ Конвертировано {input_file}{output_file}")
def main():
parser = argparse.ArgumentParser(description='Конвертация SQL INSERT в Directus seed')
parser.add_argument('-i', '--input', type=Path, required=True, help='Входной SQL файл')
parser.add_argument('-o', '--output', type=Path, required=True, help='Выходной JSON файл')
args = parser.parse_args()
if not args.input.exists():
print(f"Ошибка: {args.input} не найден")
return
args.output.parent.mkdir(parents=True, exist_ok=True)
process_sql_file(args.input, args.output)
if __name__ == "__main__":
main()

Ограничения

  • Базовый regex парсинг (не полноценный SQL парсер)
  • Могут быть проблемы с сильно вложенными скобками или сложными escaped кавычками
  • Упорядочивание зависимостей эвристическое (может потребоваться ручная настройка)
  • Тестировался преимущественно на PostgreSQL дампах
[ ▲ 0 ]