67 lines
1.6 KiB
Python
67 lines
1.6 KiB
Python
from typing import Optional
|
|
from pydantic import Field
|
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
|
|
from redis.asyncio import Redis
|
|
from servers.base import BaseService
|
|
|
|
class RedisSettings(BaseSettings):
    """Connection settings for the Redis service.

    Field values are resolved by pydantic-settings using the ``redis_``
    prefix; name matching is case-insensitive and unknown extra values are
    ignored.
    """

    model_config = SettingsConfigDict(env_prefix="redis_", case_sensitive=False, extra="ignore")

    # Full connection URL.
    url: Optional[str] = Field(default=None, description="Redis连接URL")
    # Server host name or address.
    host: Optional[str] = Field(default=None, description="Redis主机地址")
    # Server TCP port.
    port: Optional[int] = Field(default=6379, description="Redis端口号")
    # Logical database index.
    db: Optional[int] = Field(default=0, description="Redis数据库索引")
    # Authentication password, if any.
    password: Optional[str] = Field(default=None, description="Redis密码")
|
|
|
|
class RedisService(BaseService[Redis]):
    """Redis service: creates, closes and health-checks an async Redis client."""

    service_name: str = "redis"

    async def _initialize(self) -> Redis:
        """Create the async Redis client from ``RedisSettings``.

        A configured URL takes precedence; otherwise the client is built
        from host, port, db and password.

        Returns:
            The constructed ``Redis`` client.

        Raises:
            ValueError: If neither a URL nor a host is configured.
        """
        setting = RedisSettings()
        # Either a URL or a host is sufficient on its own, so fail only when
        # both are missing (requiring both would make the fallback below dead).
        if not setting.url and not setting.host:
            raise ValueError("Redis连接URL或主机地址不能为空")

        if setting.url:
            return Redis.from_url(setting.url)
        return Redis(
            host=setting.host,
            port=setting.port,
            db=setting.db,
            password=setting.password,
        )

    async def _shutdown(self) -> None:
        """Close the Redis client; do nothing when no instance is set."""
        # NOTE(review): `_instance` is presumably managed by BaseService — confirm.
        if not self._instance:
            return
        await self._instance.aclose()

    async def _check_health(self) -> bool:
        """Report whether the Redis client responds to a ping.

        Returns:
            ``False`` when no instance is set, ``True`` once ``ping()`` has
            completed. Any exception raised by ``ping()`` is not caught here
            and propagates to the caller.
        """
        if not self._instance:
            return False
        await self._instance.ping()
        return True
|
|
|
|
# Module-level service instance, created at import time.
redis_service = RedisService()

# Only the service instance is exported from this module.
__all__ = ["redis_service"]
|