| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- """
- Alembic environment configuration for async SQLAlchemy.
- """
- import asyncio
- from logging.config import fileConfig
- from sqlalchemy import pool
- from sqlalchemy.engine import Connection
- from sqlalchemy.ext.asyncio import async_engine_from_config
- from alembic import context
- # Import Base and all models so Alembic can detect them
- from app.core.database import Base
- from app.models import * # noqa: F403, F401
- from app.config import settings
# Alembic Config object
config = context.config

# Override sqlalchemy.url with the one from settings.
# The value is stored through ConfigParser, which treats "%" as the start of
# an interpolation token. A literal percent sign (e.g. from a URL-encoded
# password) must therefore be doubled, otherwise set_main_option() raises
# ValueError. Reading the option back yields the original single "%".
config.set_main_option(
    "sqlalchemy.url",
    settings.DATABASE_URL.replace("%", "%%"),
)

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Target metadata for 'autogenerate' support
target_metadata = Base.metadata
def run_migrations_offline() -> None:
    """
    Run migrations in 'offline' mode.

    The context is configured from the ``sqlalchemy.url`` option only, so no
    Engine is created. Calls to context.execute() here emit the given string
    to the script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        # Render bound parameters inline in the emitted statements.
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Configure the Alembic context on *connection* and run the migrations."""
    context.configure(
        target_metadata=target_metadata,
        connection=connection,
    )

    transaction = context.begin_transaction()
    with transaction:
        context.run_migrations()
async def run_async_migrations() -> None:
    """
    Run migrations in 'online' mode using an async engine.

    Builds an async engine from the ``sqlalchemy.``-prefixed options of the
    Alembic config section, opens a connection, and runs the synchronous
    ``do_run_migrations`` routine on it through ``run_sync``.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose the engine even when connecting or a migration raises, so
        # the engine is always cleaned up before the error propagates.
        await connectable.dispose()
def run_migrations_online() -> None:
    """
    Run migrations in 'online' mode.

    Creates the async migration coroutine and runs it to completion with
    asyncio.run().
    """
    migration_coro = run_async_migrations()
    asyncio.run(migration_coro)
# Dispatch on the mode Alembic reports: online unless offline mode is active.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()
|