代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 import uuidfrom pydantic import BaseModelfrom sqlmodel import Field, SQLModel, String, TypeDecorator class BaseEntity (SQLModel, BaseModel): id : str = Field(primary_key=True , default=uuid.uuid4().__str__()) class Config : arbitrary_types_allowed = True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from datetime import datetimefrom sqlalchemy.engine import Dialectfrom sqlmodel import String, TypeDecoratorfrom util.time import unix_begin_dateclass DatetimeToStrType (TypeDecorator[datetime]): impl = String cache_ok = True def process_bind_param (self, value: datetime | None , dialect: Dialect ) -> str : if value is not None : return value.strftime("%Y-%m-%d_%H:%M:%S%z" ) else : return unix_begin_date def process_result_value (self, value: str , dialect: Dialect ) -> datetime | None : return datetime.strptime(value, "%Y-%m-%d_%H:%M:%S%z" )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from datetime import datetimefrom sqlmodel import Column, Field from model.dto.bilibili import DynamicType, LiveStatusTypefrom model.entity.base import BaseEntityfrom model.entity.type import DatetimeToStrTypefrom util.time import unix_begin_dateclass BilibiliSubscribeEntity (BaseEntity, table=True ): __tablename__ = "bilibili_subscribe" uid: str uname: str last_dynamic_time: datetime = Field( default=unix_begin_date, sa_column=Column(DatetimeToStrType) ) last_dynamic_type: DynamicType = Field(default=DynamicType.TEXT_ONLY) last_live_status: LiveStatusType = Field(default=LiveStatusType.NoLiveStream)
这里 sa_column=Column(DatetimeToStrType)
是关键,没有就会抛异常。类型注解依然可以使用 datetime
,而 sa_column
应该就是指定数据库的数据类型的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import asynciofrom datetime import datetimefrom sqlalchemy.ext.asyncio import create_async_enginefrom sqlmodel import SQLModelfrom sqlmodel.ext.asyncio.session import AsyncSessionfrom model.entity.bilibili import BilibiliSubscribeEntityasync def main (): b = BilibiliSubscribeEntity( uid="123" , uname="adwjo" , last_dynamic_time=datetime.now() ) engine = create_async_engine("sqlite+aiosqlite:///test.db" ) async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.drop_all) await conn.run_sync(SQLModel.metadata.create_all) async with AsyncSession(engine) as session: session.add(b) await session.commit() asyncio.run(main())