A template app for async machine learning inference, with built-in user management, service access policies, monitoring, and a set of dev tools.
- API base model for machine learning with FastAPI
- FastAPI async web application (separated request and response endpoints)
- User authentication and management using FastAPI-Users
- Async task processing with Celery
- PostgreSQL DB with async SQLAlchemy ORM
- Redis for response caching and as Celery message broker
- Prometheus and Grafana for monitoring (WIP)
- Nginx as a reverse proxy (WIP)
This project aims to follow a feature-driven architecture:
- Under `project/`, each folder corresponds to a separate router and feature. Each of these subfolders has its own set of models, tasks, views (API endpoints), ...
- `fu_core/` contains the set of routers specific to fastapi-users (User DB, security)
- `inference/` contains the router with the machine learning endpoints and service access management
- New features can be added quickly this way, like a `dashboard/` router for example (see the sketch below)
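As an illustration, a minimal sketch of such a feature folder, assuming a hypothetical `dashboard/` feature (the module name and endpoint are made up for this example):

```python
# project/dashboard/views.py  (hypothetical example, not part of the template)
from fastapi import APIRouter

dashboard_router = APIRouter(prefix="/dashboard", tags=["dashboard"])


@dashboard_router.get("/health")
async def dashboard_health():
    # Minimal endpoint; a real feature would bring its own models, tasks and schemas
    return {"status": "ok"}
```

The new router would then be included by the FastAPI app generator in `project/__init__.py`, next to the existing `fu_core` and `inference` routers.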
├── run.sh # The app's starting point: ./run.sh help
├── compose/ # Docker and build related content
├── project/
│ ├── fu_core/ # FastAPI-Users functions (users, security)
│ ├── inference/ # Machine learning inference code
│ ├── __init__.py # FastAPI app generator
│ ├── celery_utils.py
│ ├── config.py # Application config
│ ├── database.py
│ ├── logging.py
│ └── redis_utils.py
├── prometheus-grafana/ # Separate docker compose and config files
├── tests/
├── .env.example # CHANGE ME TO .env
├── docker-compose.template.yml # Template for the Docker Compose file generated in the build
├── requirements-worker.txt # Libs for the workhorse container (e.g. PyTorch)
└── requirements.txt # Every other required Python lib goes here
- Docker and Docker Compose
- Poetry for Python package management
- Not essential, as this project is fully containerized, but strongly advised for code completion and database migrations with `alembic`
poetry install
poetry shell
cp .env.example .env
Modify this config file if needed
See here what happens under the hood
chmod +x ./run.sh
./run.sh build-all
See here what happens under the hood
./run.sh up-dev
# Stop
./run.sh down
# or
docker compose down
# Start
./run.sh up
# or
docker compose up
./run.sh monitoring-up
# Stop the service with:
./run.sh monitoring-down
See here what happens under the hood
./run.sh init-alembic
./run.sh get-revision-postgres
./run.sh generate-servers-json
Caution: use this only in development. The generated file contains sensitive DB credentials and must not be shared publicly.
- Go to `localhost/docs` if Nginx is enabled, else `localhost:28010/docs`
- Click on the `auth/register` route and follow the instructions to add a new user
- Go to the PGAdmin service at `localhost:5052` and enter the credentials (see `/compose/pgadmin/servers.json`)
- In the left pane, navigate through `.../Databases/Schemas/Tables/user` and `edit/view data`
- Change the field `is_superuser` to `true` for the new user
- Commit your changes!
- Log in the superuser: click on any lock button on `localhost/docs` and type in your credentials
- Use the route `/inference/pair_user_model`
  - Your user id is a UUID like `"c5aec529-57cf-4494-82e9-57c5ab02b265"`
  - As a superuser, you can pair any model with any user
  - Default `access_policy` is `1`
  - Default `inference_model` is `2`: a dummy temperature predictor using geo coordinates and time
- Go to the `inference/predict-temp/{model_id}` route
- Enter `model_id`: `2` and any date / coordinates
- Your ticket is ready! Copy the `task_id` from the response
Example of request logs, showing database update and caching:
celery_worker-1 | [2024-09-11 06:09:57,074: INFO/MainProcess] Task project.inference.tasks.run_model[1076c231-7dd0-4906-86c1-0e8208aa1724] received
celery_worker-1 | [2024-09-11 06:09:57,076: INFO] [/app/project/celery_utils.py:66] Starting task run_model with args: (<@task: project.inference.tasks.run_model of default at 0x743c11a45890>, 2, {'latitude': 40, 'longitude': 120, 'month': 11, 'hour': 4}), kwargs: {}
web-1 | INFO: 172.25.0.1:45932 - "POST /api/v1/inference/predict-temp/2 HTTP/1.1" 200
celery_worker-1 | [2024-09-11 06:09:57,076: INFO] [/app/project/inference/tasks.py:50] Running model with id 2
celery_worker-1 | [2024-09-11 06:09:57,643: INFO] [/app/project/inference/tasks.py:60] Generated cache key: model_2_result_-9114292356772756584
celery_worker-1 | [2024-09-11 06:09:57,644: INFO] [/app/project/inference/tasks.py:71] Model 2 executed successfully with result: temperature=8.560720654765046
celery_worker-1 | [2024-09-11 06:09:57,644: INFO] [/app/project/inference/tasks.py:75] Cached result for model 2 with key model_2_result_-9114292356772756584
celery_worker-1 | [2024-09-11 06:09:57,644: INFO] [/app/project/celery_utils.py:72] Completed task run_model with result: {'temperature': 8.560720654765046}
celery_worker-1 | [2024-09-11 06:09:57,646: INFO] [/app/project/inference/crud.py:124] Fetching service call with task ID: 1076c231-7dd0-4906-86c1-0e8208aa1724
celery_worker-1 | [2024-09-11 06:09:57,684: INFO] [/app/project/inference/crud.py:130] Service call found for task ID: 1076c231-7dd0-4906-86c1-0e8208aa1724, updating time_completed
celery_worker-1 | [2024-09-11 06:09:57,687: INFO] [/app/project/inference/crud.py:134] Service call with task ID: 1076c231-7dd0-4906-86c1-0e8208aa1724 updated successfully
celery_worker-1 | [2024-09-11 06:09:57,687: INFO/ForkPoolWorker-16] Task project.inference.tasks.run_model[1076c231-7dd0-4906-86c1-0e8208aa1724] succeeded in 0.5697411990004184s: {'temperature': 8.560720654765046}
- Go to the `inference/task-status/{task_id}` route
- Paste the `task_id` from above
- Voilà, the results are in the response JSON! (see the client sketch below)
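A rough client-side sketch of the whole request / poll flow. The `/api/v1` prefix is taken from the logs above; the `/auth/jwt/login` route, form fields and polling logic are assumptions based on FastAPI-Users defaults and may need adjusting:

```python
# Hypothetical client sketch; adjust routes, credentials and the polling logic
# to the actual API (the response shape of task-status is not enforced).
import time

import httpx

BASE = "http://localhost:28010/api/v1"  # or http://localhost/api/v1 behind Nginx

with httpx.Client(base_url=BASE) as client:
    # 1. Log in (OAuth2 password form) and grab the bearer token
    login = client.post(
        "/auth/jwt/login",
        data={"username": "user@example.com", "password": "secret"},
    )
    headers = {"Authorization": f"Bearer {login.json()['access_token']}"}

    # 2. Request a prediction: the response only contains the Celery task id
    ticket = client.post(
        "/inference/predict-temp/2",
        json={"latitude": 40, "longitude": 120, "month": 11, "hour": 4},
        headers=headers,
    ).json()

    # 3. Poll the task-status endpoint until the worker result shows up
    for _ in range(20):
        status = client.get(f"/inference/task-status/{ticket['task_id']}", headers=headers)
        print(status.json())
        time.sleep(0.5)
```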
- Notice: each distinct input schema must have its own custom request endpoint
- However, to keep Celery task management modular, no response schema is enforced on the Celery side. The `task_status` route just passes along whatever output was retrieved from the worker, as sketched below.
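A rough idea of what such a pass-through route can look like, using Celery's `AsyncResult` (illustrative only; the actual route lives in `project/inference/views.py` and may differ):

```python
from celery.result import AsyncResult
from fastapi.responses import JSONResponse


@inference_router.get("/task-status/{task_id}")
async def task_status(task_id: str):
    # Look the task up in the result backend (Redis here)
    result = AsyncResult(task_id)
    payload = {"state": result.state}
    if result.ready():
        # No response schema: whatever the worker returned is passed through as-is
        payload["result"] = result.result
    return JSONResponse(payload)
```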
- Any library used by the ML model must be registered in `/requirements-worker.txt`
- Each model file should contain a class with a `predict` method
- Necessary imports should be placed INSIDE the `__init__` method, not outside the class! This way, heavy library imports only happen in the `celery worker`, and the libraries won't have to be installed in any other container (see the sketch below)
- Move the file containing the model into `project/inference/ml_models`
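A minimal sketch of such a model file, assuming a hypothetical `TemperatureModel` (the class name, library and formula are made up for this example):

```python
# project/inference/ml_models/temperature_model.py  (hypothetical example)
class TemperatureModel:
    def __init__(self):
        # Heavy imports live inside __init__, so only the Celery worker
        # (which installs requirements-worker.txt) ever pays for them
        import numpy as np  # stand-in for PyTorch, scikit-learn, etc.

        self.np = np

    def predict(self, latitude: int, longitude: int, month: int, hour: int) -> dict:
        # Dummy computation; a real model would run its inference here
        temperature = 20 - 0.3 * abs(latitude) + 2 * self.np.sin(month) + 0.1 * hour
        return {"temperature": float(temperature)}
```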
This part allows you to register a model in the database, map it to users and access policies, etc.
- In `inference/model_registry`, copy-paste a new `@register_model` function
- Fill in some info about the model in the decorator (classification / regression, version, service access policy, etc.)
- Return an instance of your model in the function (see the sketch below)
- Note: packaged models (e.g. `VaderSentimentAnalyzer`) don't require a separate file; instantiate them directly in a `@register_model` function
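A hypothetical registration sketch; the decorator's real signature and the metadata it expects are defined in `inference/model_registry`, so the keyword arguments below are placeholders:

```python
# Placeholder metadata only — check inference/model_registry for the real decorator signature
from project.inference.ml_models.temperature_model import TemperatureModel  # hypothetical path


@register_model(
    name="temperature-predictor",
    problem="regression",
    version="0.1.0",
    access_policy_id=1,
)
def create_temperature_model():
    # The registry stores whatever the factory returns, i.e. a ready-to-use model instance
    return TemperatureModel()
```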
A Pydantic class for the model's input must be added to `/project/inference/schemas.py`.
Example:
from pydantic import BaseModel


class TemperatureModelInput(BaseModel):
    latitude: int
    longitude: int
    month: int
    hour: int
- Each new input schema needs a new POST request endpoint
- Add a new request function in `/project/inference/views.py`
Example:
@inference_router.post("/predict-temp/{model_id}")
async def predict_temperature(
    model_id: int,
    input_data: TemperatureModelInput,  ######### CHANGE THIS ############
    current_user: models.User = Depends(current_active_user),
    session: AsyncSession = Depends(get_async_session),
):
    user_id: UUID = current_user.id
    if model_id not in model_registry:
        raise HTTPException(status_code=404, detail=f"Model with id {model_id} not found")

    # Check if the user has access to the model and update their access record
    has_access, message = await crud.check_user_access_and_update(
        session, user_id, model_id
    )
    if not has_access:
        raise HTTPException(status_code=403, detail=message)

    # Create a service call record
    service_call = await crud.create_service_call(session, model_id, user_id)
    task = tasks.run_model.delay(model_id, input_data.dict())
    service_call.celery_task_id = task.task_id
    await session.commit()
    return JSONResponse({"task_id": task.task_id})
./run.sh lint
- Uses `pre-commit run` to perform thorough linting / formatting fixes
./run.sh run-tests
- Runs tests and generates a coverage report
- Pure unit tests using factories and monkeypatching (see the sketch below)
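A rough sketch of the monkeypatching style; the fixture below is illustrative, and the actual fixtures and factories live under `tests/`:

```python
# Illustrative only: replace the Celery task's .delay so no broker or worker is needed
import pytest


class FakeAsyncResult:
    """Stand-in for the object returned by Celery's .delay()."""
    task_id = "00000000-0000-0000-0000-000000000000"


@pytest.fixture
def stub_run_model(monkeypatch):
    from project.inference import tasks  # assumed module path, as in the view example above

    calls = []

    def fake_delay(model_id, payload):
        calls.append((model_id, payload))
        return FakeAsyncResult()

    monkeypatch.setattr(tasks.run_model, "delay", fake_delay)
    return calls
```

A test can then call the endpoint through a test client, with the DB session and current-user dependencies overridden, and assert that `calls` recorded the expected model id and payload.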
./run.sh monitoring-up
./run.sh monitoring-down
- Access at `http://localhost:9090`
- Stores the metrics from FastAPI (backend), Nginx (proxy) and cAdvisor (system)
- Access at `http://localhost:3000`, default credentials: `admin` / `admin`
- Renders the metrics into dashboards
- Add on-push GitHub Actions
- How to handle login data storage when logging in via a web page?
  - Cookies
- How to handle security issues with backend / frontend communication?
  - CORS middleware against CSRF attacks
- How to handle async request / response endpoints when rendering a web page?