Background Tasks¶
Run periodic tasks while your server is running using the @app.task() decorator.
Interval-Based Tasks¶
Run a task at fixed intervals:
from xitzin import Xitzin
app = Xitzin()
@app.task(interval=3600) # Every hour (in seconds)
async def hourly_cleanup():
await cleanup_old_sessions()
@app.task(interval="30m") # Human-readable format
def check_feeds():
update_rss_feeds()
Supported interval formats:
- Integer seconds:
interval=3600 - Seconds:
interval="30s" - Minutes:
interval="5m" - Hours:
interval="1h" - Days:
interval="1d"
Cron-Based Tasks¶
For more complex schedules, use cron expressions:
@app.task(cron="0 2 * * *") # 2 AM daily
def nightly_backup():
backup_database()
@app.task(cron="0 * * * *") # Every hour at :00
async def hourly_stats():
await update_statistics()
@app.task(cron="0 0 * * 0") # Sunday at midnight
def weekly_report():
generate_weekly_report()
Cron support requires croniter
Install with: pip install 'xitzin[tasks]'
Common cron patterns:
| Pattern | Description |
|---|---|
* * * * * |
Every minute |
0 * * * * |
Every hour |
0 0 * * * |
Daily at midnight |
0 0 * * 0 |
Weekly (Sunday) |
0 0 1 * * |
Monthly (1st) |
Async and Sync Handlers¶
Both async and sync handlers are supported:
# Async handler - runs directly
@app.task(interval="1h")
async def async_task():
await some_async_operation()
# Sync handler - wrapped in executor automatically
@app.task(interval="1h")
def sync_task():
some_blocking_operation()
Accessing Application State¶
Tasks can access app.state via closure:
app = Xitzin()
@app.on_startup
async def setup_db():
app.state.db = await create_db_connection()
@app.task(interval="1h")
async def cleanup():
# Access app.state from the task
await app.state.db.cleanup_old_records()
Task Lifecycle¶
Tasks are automatically managed:
- Start: Tasks begin after startup handlers complete
- Wait: Tasks wait for the first interval/cron trigger before executing
- Execute: Handler runs, errors are logged but don't stop the task
- Repeat: Task continues on schedule
- Stop: Tasks are cancelled before shutdown handlers run
@app.on_startup
def startup():
print("1. Startup handlers run")
@app.task(interval="1h")
def my_task():
print("3. Task executes (after waiting 1 hour)")
@app.on_shutdown
def shutdown():
print("4. Shutdown handlers run")
# Lifecycle:
# 1. Startup handlers run
# 2. Tasks start (but wait for first interval)
# 3. Server runs, tasks execute on schedule
# 4. Ctrl+C pressed
# 5. Tasks cancelled
# 6. Shutdown handlers run
Error Handling¶
Task errors are logged but don't stop the task:
@app.task(interval="5m")
def risky_task():
# If this raises an exception:
# - Error is logged with structlog
# - Task continues running on schedule
external_api_call()
Errors are logged with:
- Task name
- Error message
- Error type
Multiple Tasks¶
Register multiple tasks with different schedules:
@app.task(interval="1m")
async def quick_check():
await check_health()
@app.task(interval="1h")
async def hourly_maintenance():
await cleanup_cache()
@app.task(cron="0 3 * * *")
def daily_backup():
backup_all_data()
Configuration Validation¶
The @app.task() decorator validates configuration at decoration time:
# Error: Neither interval nor cron provided
@app.task() # Raises TaskConfigurationError
def my_task():
pass
# Error: Both interval and cron provided
@app.task(interval="1h", cron="* * * * *") # Raises TaskConfigurationError
def my_task():
pass
# Error: Invalid interval format
@app.task(interval="invalid") # Raises ValueError
def my_task():
pass
Best Practices¶
Keep Tasks Fast¶
Long-running tasks block their own schedule:
# Bad: Task takes 10 minutes, interval is 5 minutes
@app.task(interval="5m")
def slow_task():
time.sleep(600) # Blocks for 10 minutes!
# Good: Use shorter intervals or async operations
@app.task(interval="5m")
async def fast_task():
await quick_check() # Returns quickly
Handle External Dependencies¶
Gracefully handle failures in external services:
@app.task(interval="5m")
def check_external_api():
try:
response = requests.get("https://api.example.com/status")
response.raise_for_status()
process_response(response)
except requests.RequestException as e:
# Log but don't crash - task will retry next interval
print(f"API check failed: {e}")
Use Async for I/O Operations¶
Prefer async handlers for I/O-bound tasks:
import aiohttp
@app.task(interval="1m")
async def fetch_updates():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/updates") as resp:
data = await resp.json()
await process_updates(data)
Complete Example¶
from xitzin import Xitzin
app = Xitzin(title="Task Demo")
# Application state
@app.on_startup
def setup():
app.state.request_count = 0
app.state.cache = {}
# Track requests
@app.gemini("/")
def home(request):
app.state.request_count += 1
return f"# Welcome\nTotal requests: {app.state.request_count}"
# Background tasks
@app.task(interval="1m")
def log_stats():
print(f"Requests in last minute: {app.state.request_count}")
@app.task(interval="1h")
def cleanup_cache():
old_keys = [k for k, v in app.state.cache.items() if v.is_expired()]
for key in old_keys:
del app.state.cache[key]
print(f"Cleaned {len(old_keys)} cache entries")
@app.task(cron="0 0 * * *") # Daily at midnight
def daily_report():
print(f"Daily stats: {app.state.request_count} total requests")
app.state.request_count = 0 # Reset counter
if __name__ == "__main__":
app.run()