60 lines
1.4 KiB
Python
60 lines
1.4 KiB
Python
from typing import Optional
|
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
from pydantic import Field
|
|
|
|
from servers.base import BaseService
|
|
from sqlmodel import create_engine
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
from sqlalchemy import Engine
|
|
|
|
class DatabaseSettings(BaseSettings):
    """Configuration for the database service.

    Settings are resolved by ``BaseSettings`` using the ``database_``
    prefix, matched case-insensitively; unrecognised extra values are
    ignored rather than rejected.
    """

    # Connection URL for the database; stays None when nothing is supplied.
    url: Optional[str] = Field(default=None, description="数据库连接URL")

    model_config = SettingsConfigDict(
        extra="ignore",
        case_sensitive=False,
        env_prefix="database_",
    )
|
|
|
|
class DatabaseService(BaseService[Engine]):
    """Database service: owns the engine and manages its lifecycle.

    The engine is created with the synchronous ``create_engine``, so
    every operation here (dispose, health probe) uses the synchronous
    engine API even though the hooks themselves are coroutines.
    """

    service_name: str = "database"

    async def _initialize(self) -> Engine:
        """Create the database engine from ``DatabaseSettings``.

        Returns:
            The engine built from the configured connection URL.

        Raises:
            ValueError: If no connection URL is configured.
        """
        setting = DatabaseSettings()
        if not setting.url:
            raise ValueError("数据库连接URL不能为空")
        return create_engine(setting.url)

    async def _shutdown(self) -> None:
        """Dispose of the engine; do nothing when no engine is held."""
        # NOTE(review): `_instance` is presumably populated by BaseService
        # with the value returned from `_initialize` -- confirm in the base.
        if not self._instance:
            return
        self._instance.dispose()

    async def _check_health(self) -> bool:
        """Probe the database with a trivial ``SELECT 1`` query.

        Returns:
            ``False`` when no engine is held, ``True`` once the probe
            query has executed successfully.

        Raises:
            Any error raised while connecting or executing the probe is
            propagated to the caller unchanged.
        """
        # Imported locally so this block does not depend on edits to the
        # module's import section.
        from sqlalchemy import text

        if not self._instance:
            return False
        # The held engine is synchronous: AsyncSession only accepts an
        # AsyncEngine, and raw SQL strings must be wrapped in text() to be
        # executable. Use the engine's own connection for the probe.
        with self._instance.connect() as connection:
            connection.execute(text("SELECT 1"))
        return True
|
|
|
|
# Module-level service instance shared by importers of this module.
database_service = DatabaseService()

# Public API of this module.
__all__ = ["database_service"]
|