Async Validation
Overview
Valid8r supports asynchronous validation for I/O-bound checks such as:
Database checks: Verify email uniqueness, username availability
External API calls: Validate API keys, check payment methods
Geolocation services: Validate IP addresses against geographic constraints
Remote file access: Check file existence on remote systems
Any async operation: Custom async validators for your use case
The async validation feature provides:
Concurrent execution: Async validators for different fields run concurrently for better performance
Mixed validators: Combine sync and async validators seamlessly
Timeout support: Configure timeouts to prevent hanging on slow operations
Error accumulation: Collect all validation errors across all fields
Type safety: Full type annotations with mypy support
When to Use Async Validation
Use async validation when your validation logic requires I/O operations:
Good use cases:
Checking if an email is already registered (database query)
Validating an API key with an external service (HTTP request)
Verifying an IP address is from a specific country (geolocation API)
Checking file permissions on a remote server (network I/O)
Not recommended:
Simple range checks (use sync validators like minimum() and maximum())
Regular expression matching (use sync parsers)
Pure computational validation (use sync validators)
Quick Start
Basic Async Validator
An async validator is any async function that takes a value and returns Maybe[T]:
import asyncio
from valid8r.core import parsers, schema
from valid8r.core.maybe import Maybe
# Define an async validator
async def check_email_unique(email: str) -> Maybe[str]:
"""Check if email is unique in database."""
# Simulate database query
await asyncio.sleep(0.1) # Network delay
# In real code, query your database
existing_emails = {'admin@example.com', 'user@example.com'}
if email in existing_emails:
return Maybe.failure('Email already registered')
return Maybe.success(email)
# Use in a schema
user_schema = schema.Schema(fields={
'email': schema.Field(
parser=parsers.parse_email,
validators=[check_email_unique], # Add async validator
required=True
),
})
# Validate asynchronously
async def main():
result = await user_schema.validate_async({
'email': 'new@example.com'
})
match result:
case schema.Success(data):
print(f"Valid: {data['email']}")
case schema.Failure(errors):
for error in errors:
print(f"Error: {error.message}")
asyncio.run(main())
Schema Async Validation
The Schema.validate_async() method validates data with support for async validators.
Method Signature
async def validate_async(
self,
data: dict[str, Any],
path: str = '',
*,
timeout: float | None = None,
) -> Maybe[dict[str, Any]]:
"""Validate data asynchronously with async validators.
Args:
data: Input data to validate (must be dict-like)
path: Current field path for nested validation
timeout: Optional timeout in seconds for async operations
Returns:
Success[dict]: Validated data if all fields pass
Failure[list[ValidationError]]: List of all validation errors
Raises:
asyncio.TimeoutError: If validation exceeds the timeout
"""
Basic Usage
from valid8r.core import parsers, schema, validators
# Define schema with async validators
api_config_schema = schema.Schema(fields={
'api_key': schema.Field(
parser=parsers.parse_str,
validators=[
validators.min_length(10), # Sync validator
verify_api_key_with_service, # Async validator
],
required=True
),
'endpoint': schema.Field(
parser=parsers.parse_url,
required=True
),
})
# Validate asynchronously
result = await api_config_schema.validate_async({
'api_key': 'my-secret-key-123',
'endpoint': 'https://api.example.com/v1'
})
Multiple Async Validators
You can chain multiple async validators on a single field:
async def check_password_not_compromised(password: str) -> Maybe[str]:
"""Check password against breach database."""
await asyncio.sleep(0.1)
# Check haveibeenpwned.com API
if password in ['password123', '12345678']:
return Maybe.failure('Password found in breach database')
return Maybe.success(password)
async def check_password_strength(password: str) -> Maybe[str]:
"""Check password meets strength requirements."""
await asyncio.sleep(0.05)
if len(password) < 8:
return Maybe.failure('Password must be at least 8 characters')
return Maybe.success(password)
password_schema = schema.Schema(fields={
'password': schema.Field(
parser=parsers.parse_str,
validators=[
validators.min_length(1), # Sync: fail fast on empty
check_password_strength, # Async: check strength
check_password_not_compromised, # Async: check breaches
],
required=True
),
})
Execution Order
Valid8r optimizes validation execution for performance:
Sync validators run first (fail-fast)
Sync validators are executed sequentially
If any sync validator fails, async validators are skipped
This prevents unnecessary async operations for already-invalid data
Async validators run second (concurrently across fields)
Async validators on the same field run sequentially, in the order listed
Async validators for different fields run concurrently
This maximizes throughput while maintaining deterministic order
# Example: 3 fields, each with sync + async validators
schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[
min_length(1), # Sync: runs first
check_email_unique, # Async: runs if sync passes
]
),
'username': Field(
parser=parse_str,
validators=[
matches_pattern(r'^[a-z0-9_]+$'), # Sync: runs first
check_username_available, # Async: runs if sync passes
]
),
'age': Field(
parser=parse_int,
validators=[
minimum(13), # Sync: runs first
verify_age_with_service, # Async: runs if sync passes
]
),
})
# Execution:
# 1. All sync validators run (min_length, matches_pattern, minimum)
# 2. If all sync pass, async validators run concurrently across fields
# (check_email_unique, check_username_available, verify_age_with_service)
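The ordering described above can be sketched with plain asyncio. This is a standalone illustration, not Valid8r's implementation: results are represented as `(ok, payload)` tuples instead of `Maybe`, the names `validate_field` and `validate_all` are hypothetical, and the fail-fast rule is applied per field, which is an assumption.

```python
import asyncio
import inspect

calls = []  # records which validators actually ran, for illustration


def non_empty(value):
    calls.append("non_empty")
    return (True, value) if value else (False, "must not be empty")


async def not_taken(value):
    calls.append("not_taken")
    await asyncio.sleep(0.01)  # stands in for a database query
    return (False, "already taken") if value == "admin" else (True, value)


async def validate_field(value, validators):
    # Phase 1: sync validators, in order; stop at the first failure.
    for v in validators:
        if not inspect.iscoroutinefunction(v):
            ok, payload = v(value)
            if not ok:
                return (False, payload)
    # Phase 2: async validators, in order, only if phase 1 passed.
    for v in validators:
        if inspect.iscoroutinefunction(v):
            ok, payload = await v(value)
            if not ok:
                return (False, payload)
    return (True, value)


async def validate_all(data, rules):
    # Fields are validated concurrently; gather preserves input order.
    names = list(rules)
    results = await asyncio.gather(
        *(validate_field(data.get(n, ""), rules[n]) for n in names)
    )
    return dict(zip(names, results))


rules = {"username": [non_empty, not_taken], "nickname": [non_empty, not_taken]}
outcome = asyncio.run(validate_all({"username": "admin", "nickname": ""}, rules))
print(outcome)
```

The empty `nickname` fails its sync check, so its async lookup never runs; only `username` pays for the simulated query.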
Timeout Support
Configure timeouts to prevent hanging on slow validators.
Setting Timeouts
# Set a 5-second timeout for all async operations
result = await user_schema.validate_async(
data={'email': 'user@example.com'},
timeout=5.0
)
Handling Timeouts
When the timeout elapses, validate_async() raises asyncio.TimeoutError:
import asyncio
try:
result = await schema.validate_async(data, timeout=1.0)
except asyncio.TimeoutError:
print("Validation timed out after 1 second")
# Handle timeout: retry, use cached result, fail gracefully
Timeout Best Practices
Set reasonable timeouts
Database queries: 1-3 seconds
HTTP APIs: 5-10 seconds
External services: 10-30 seconds
Handle timeouts gracefully
async def validate_with_fallback(data):
    try:
        return await schema.validate_async(data, timeout=5.0)
    except asyncio.TimeoutError:
        # Fall back to cached validation or default
        return Maybe.success(data)  # Or appropriate fallback
Test timeout behavior
async def slow_validator(value: str) -> Maybe[str]:
    await asyncio.sleep(10.0)  # Intentionally slow
    return Maybe.success(value)

# This will time out
try:
    await schema.validate_async(data, timeout=1.0)
except asyncio.TimeoutError:
    pass  # Expected
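To see what a timeout does to work that is still in flight, here is a minimal standalone sketch built on `asyncio.wait_for`. It does not use Valid8r; `slow_check` and `validate_with_deadline` are hypothetical names, and whether `validate_async()` is implemented with `wait_for` is an assumption.

```python
import asyncio

cancelled = []  # values whose check was cancelled by the timeout


async def slow_check(value):
    try:
        await asyncio.sleep(5.0)  # far longer than the timeout below
        return value
    except asyncio.CancelledError:
        cancelled.append(value)  # wait_for cancels the pending work
        raise  # always re-raise so cancellation completes


async def validate_with_deadline(value, timeout):
    try:
        return ("ok", await asyncio.wait_for(slow_check(value), timeout))
    except asyncio.TimeoutError:
        return ("timeout", None)


result = asyncio.run(validate_with_deadline("user@example.com", 0.05))
print(result, cancelled)
```

Because the pending coroutine is cancelled rather than left running, a validator that holds a connection or lock should release it in a `finally` block or context manager.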
Error Handling
Async validation accumulates all errors across all fields.
Error Structure
Failed validation returns Failure[list[ValidationError]]:
from valid8r.core.errors import ValidationError
from valid8r.core.maybe import Failure
result = await schema.validate_async(data)
match result:
case Failure(errors):
for error in errors:
print(f"Field: {error.path}")
print(f"Code: {error.code}")
print(f"Message: {error.message}")
print(f"Context: {error.context}")
Example Output
# Invalid data
result = await user_schema.validate_async({
'email': 'existing@example.com',
'username': 'taken',
'age': '10'
})
# Errors:
# Field: .email, Message: Email already registered
# Field: .username, Message: Username not available
# Field: .age, Message: Must be at least 13
Handling Exceptions in Validators
If an async validator raises an exception, it’s converted to a validation error:
async def buggy_validator(value: str) -> Maybe[str]:
raise ValueError("Oops!") # Unhandled exception
# Converts to ValidationError with code='VALIDATION_ERROR'
# Message: "Unexpected error in validator: Oops!"
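The conversion can be pictured as a thin wrapper around the validator. The sketch below is standalone and illustrative only: `guard` is a hypothetical helper, failures are `(False, message)` tuples rather than `ValidationError` objects, and the message format is copied from the comment above.

```python
import asyncio


def guard(validator):
    """Wrap an async validator so raised exceptions become failures."""

    async def wrapped(value):
        try:
            return await validator(value)
        except Exception as exc:
            # asyncio.CancelledError derives from BaseException (Python 3.8+),
            # so cancellation and timeouts still propagate.
            return (False, f"Unexpected error in validator: {exc}")

    return wrapped


async def buggy(value):
    raise ValueError("Oops!")


result = asyncio.run(guard(buggy)("x"))
print(result)
```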
Best Practice: Always return Maybe[T] from validators:
async def safe_validator(value: str) -> Maybe[str]:
try:
# Call external API
result = await api_call(value)
return Maybe.success(result)
except APIError as e:
return Maybe.failure(f"API error: {e}")
except Exception as e:
return Maybe.failure(f"Unexpected error: {e}")
Common Patterns
Database Uniqueness Check
import asyncpg # PostgreSQL async driver
async def check_email_unique(
email: str,
db_pool: asyncpg.Pool
) -> Maybe[str]:
"""Check if email is unique in database."""
async with db_pool.acquire() as conn:
count = await conn.fetchval(
'SELECT COUNT(*) FROM users WHERE email = $1',
email
)
if count > 0:
return Maybe.failure('Email already registered')
return Maybe.success(email)
# Use with dependency injection
from functools import partial
db_pool = await asyncpg.create_pool('postgresql://...')
user_schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[
partial(check_email_unique, db_pool=db_pool)
],
required=True
),
})
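One detail worth knowing about the `partial` approach: since Python 3.8, `inspect.iscoroutinefunction` looks through `functools.partial`, so a bound async validator is still recognizable as async, whereas a plain lambda wrapper is not. The standalone sketch below demonstrates this with a hypothetical `check_unique`; whether Valid8r detects async validators this way is an assumption.

```python
import asyncio
import inspect
from functools import partial


async def check_unique(email, existing):
    await asyncio.sleep(0)  # stands in for a database round trip
    return email not in existing


existing = {"admin@example.com"}

# partial keeps the coroutine-function identity (Python 3.8+).
bound = partial(check_unique, existing=existing)

# A lambda returns a coroutine when called, but is not itself
# a coroutine function, so introspection cannot tell it is async.
wrapped = lambda email: check_unique(email, existing=existing)

print(inspect.iscoroutinefunction(bound))
print(inspect.iscoroutinefunction(wrapped))
print(asyncio.run(bound("new@example.com")))
```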
External API Validation
import httpx # Async HTTP client
async def verify_api_key(
api_key: str,
client: httpx.AsyncClient
) -> Maybe[str]:
"""Verify API key with external service."""
try:
response = await client.get(
'https://api.example.com/verify',
headers={'Authorization': f'Bearer {api_key}'},
timeout=5.0
)
if response.status_code == 200:
return Maybe.success(api_key)
return Maybe.failure('Invalid API key')
except httpx.TimeoutException:
return Maybe.failure('API verification timed out')
except httpx.HTTPError as e:
return Maybe.failure(f'API error: {e}')
# Use in schema
async with httpx.AsyncClient() as client:
config_schema = Schema(fields={
'api_key': Field(
parser=parse_str,
validators=[
partial(verify_api_key, client=client)
],
required=True
),
})
result = await config_schema.validate_async(data)
Geolocation Validation
async def check_ip_location(
ip: str,
allowed_countries: set[str],
client: httpx.AsyncClient
) -> Maybe[str]:
"""Verify IP address is from allowed country."""
try:
response = await client.get(
f'https://ipapi.co/{ip}/country',
timeout=3.0
)
country = response.text.strip()
if country in allowed_countries:
return Maybe.success(ip)
return Maybe.failure(
f'IP address from {country}, must be from {allowed_countries}'
)
except Exception as e:
return Maybe.failure(f'Geolocation check failed: {e}')
# Use in schema
allowed_countries = {'US', 'CA', 'GB'}
security_schema = Schema(fields={
'client_ip': Field(
parser=parse_ip,
validators=[
partial(
check_ip_location,
allowed_countries=allowed_countries,
client=http_client
)
],
required=True
),
})
Mixing Sync and Async Validators
You can freely mix sync and async validators on the same field.
Example
from valid8r.core import validators
user_schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[
# Sync validators (fast fail)
validators.min_length(1),
validators.matches_pattern(r'^[^@]+@[^@]+\.[^@]+$'),
# Async validators (I/O operations)
check_email_unique,
check_email_not_disposable,
],
required=True
),
})
Performance Considerations
Sync validators run first: Fast failure before expensive async operations
Async validators run after: Only if sync validators pass
Order matters: Place cheapest validators first
validators=[
min_length(1), # Fastest: simple check
matches_pattern(r'...'), # Fast: regex check
check_local_cache, # Medium: async cache lookup
check_external_api, # Slowest: HTTP request
]
Performance Optimization
Concurrent Field Validation
Valid8r automatically runs async validators for different fields concurrently:
# These async validators run concurrently (not sequentially)
schema = Schema(fields={
'email': Field(validators=[check_email_unique]), # Database
'api_key': Field(validators=[verify_api_key]), # HTTP API
'ip': Field(validators=[check_ip_location]), # Geolocation API
})
# Total time ≈ max(check_email, verify_api, check_ip)
# Not check_email + verify_api + check_ip (the sequential total)
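The max-versus-sum claim is easy to check with simulated delays. This standalone sketch uses `asyncio.sleep` in place of real I/O; the function names and delay values are made up for illustration.

```python
import asyncio
import time


async def check(delay):
    await asyncio.sleep(delay)  # stands in for a database or HTTP call
    return delay


async def run_concurrently(delays):
    start = time.perf_counter()
    await asyncio.gather(*(check(d) for d in delays))
    return time.perf_counter() - start


async def run_sequentially(delays):
    start = time.perf_counter()
    for d in delays:
        await check(d)
    return time.perf_counter() - start


delays = [0.05, 0.10, 0.15]
concurrent = asyncio.run(run_concurrently(delays))
sequential = asyncio.run(run_sequentially(delays))
print(f"concurrent ~{concurrent:.2f}s, sequential ~{sequential:.2f}s")
```

The concurrent run should take roughly as long as the slowest check, while the sequential run takes roughly the total of all three.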
Caching Results
Cache expensive validation results to avoid redundant operations:
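# functools.lru_cache does not work on async functions: it would cache the
# coroutine object, which can only be awaited once. Cache the result instead.
_email_cache: dict[str, Maybe[str]] = {}  # Unbounded; add eviction in real code
async def check_email_unique_cached(email: str) -> Maybe[str]:
    """Cached email uniqueness check."""
    if email in _email_cache:
        # Cache hit: return immediately
        return _email_cache[email]
    # Cache miss: query database
    result = await check_email_unique(email)
    _email_cache[email] = result
    return result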
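Cached answers to questions like "is this email unique?" go stale, so a time-to-live is usually worth adding. Below is a minimal standalone sketch of a TTL cache for async functions; `ttl_cache` and `is_unique` are hypothetical names, not part of Valid8r, and the cache stores awaited results rather than coroutine objects.

```python
import asyncio
import time


def ttl_cache(ttl_seconds):
    """Cache awaited results per argument for ttl_seconds."""

    def decorator(func):
        store = {}  # value -> (expires_at, result)

        async def wrapper(value):
            now = time.monotonic()
            hit = store.get(value)
            if hit is not None and hit[0] > now:
                return hit[1]  # fresh entry: skip the expensive call
            result = await func(value)
            store[value] = (now + ttl_seconds, result)
            return result

        return wrapper

    return decorator


lookups = []  # records every call that reached the "database"


@ttl_cache(ttl_seconds=0.05)
async def is_unique(email):
    lookups.append(email)
    await asyncio.sleep(0)  # stands in for a database query
    return email != "admin@example.com"


async def demo():
    first = await is_unique("new@example.com")
    second = await is_unique("new@example.com")  # served from cache
    await asyncio.sleep(0.1)  # let the entry expire
    third = await is_unique("new@example.com")  # queried again
    return first, second, third


results = asyncio.run(demo())
print(results, len(lookups))
```

Concurrent callers that miss at the same moment will each query the backend; deduplicating in-flight requests would need an extra layer that this sketch omits.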
Rate Limiting with RateLimitedValidator
The RateLimitedValidator wrapper protects external APIs from excessive requests
using a token bucket algorithm. This is essential when validating against external
services that have rate limits.
from valid8r.async_validators import RateLimitedValidator
# Create an async validator that calls an external API
async def verify_api_key(key: str) -> Maybe[str]:
"""Verify API key with external service."""
# http_client is an httpx.AsyncClient instance
response = await http_client.get(
'https://api.example.com/verify',
headers={'Authorization': f'Bearer {key}'}
)
if response.status_code == 200:
return Maybe.success(key)
return Maybe.failure('Invalid API key')
# Wrap with rate limiting: 10 calls/second, burst up to 5
rate_limited_validator = RateLimitedValidator(
verify_api_key,
rate=10, # 10 calls per second sustained rate
burst=5 # Allow up to 5 immediate calls before rate limiting
)
# Use the rate-limited validator
result = await rate_limited_validator('my-api-key')
Token Bucket Algorithm:
rate: Maximum calls per second (sustained throughput)
burst: Maximum immediate calls before throttling begins (defaults to rate)
The algorithm allows bursts of traffic up to the burst limit, then throttles
excess calls to the sustained rate. This handles bursty traffic patterns
while protecting backend services.
Example with burst behavior:
# Rate: 2 calls/second, Burst: 5
rate_limited = RateLimitedValidator(my_validator, rate=2, burst=5)
# First 5 calls complete immediately (burst capacity)
for i in range(5):
await rate_limited(f'value_{i}') # No delay
# Subsequent calls are rate-limited to 2/second
await rate_limited('value_5') # Delays ~0.5s to maintain 2/sec rate
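For readers unfamiliar with the algorithm, here is a minimal standalone token bucket in plain asyncio. It illustrates the general technique under simple assumptions (one token per call, callers serialized by a lock) and is not the code behind `RateLimitedValidator`; the `TokenBucket` class is hypothetical.

```python
import asyncio
import time


class TokenBucket:
    """Illustrative token bucket: `rate` tokens/second, capacity `burst`."""

    def __init__(self, rate, burst):
        self.rate = float(rate)
        self.capacity = float(burst)
        self.tokens = float(burst)  # start full, so a burst is allowed
        self.updated = time.monotonic()
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity.
            elapsed = now - self.updated
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.updated = now
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            # Not enough tokens: sleep until one whole token has accrued,
            # then spend it immediately.
            await asyncio.sleep((1.0 - self.tokens) / self.rate)
            self.tokens = 0.0
            self.updated = time.monotonic()


async def demo():
    bucket = TokenBucket(rate=20, burst=3)
    start = time.monotonic()
    stamps = []
    for _ in range(5):
        await bucket.acquire()
        stamps.append(time.monotonic() - start)
    return stamps


stamps = asyncio.run(demo())
print([round(s, 2) for s in stamps])
```

The first three calls pass immediately on the burst allowance; the fourth and fifth each wait about 1/20 of a second for a token to accrue.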
Concurrent validation with rate limiting:
from valid8r.async_validators import RateLimitedValidator, parallel_validate
# Wrap validator with rate limiting
rate_limited = RateLimitedValidator(
external_api_validator,
rate=10,
burst=5
)
# Validate many values - rate limiting prevents API overload
values = ['value_1', 'value_2', 'value_3'] # ... more values
results = await parallel_validate(rate_limited, values)
Custom Rate Limiter (Advanced):
For more complex rate limiting scenarios, you can build your own:
import asyncio
from datetime import datetime, timedelta
class RateLimiter:
    def __init__(self, max_calls: int, period: timedelta):
        self.max_calls = max_calls
        self.period = period
        self.calls: list[datetime] = []
        self.lock = asyncio.Lock()
    async def acquire(self):
        async with self.lock:
            now = datetime.now()
            # Remove calls that have aged out of the window
            self.calls = [
                call for call in self.calls
                if now - call < self.period
            ]
            if len(self.calls) >= self.max_calls:
                # Wait until the oldest call leaves the window
                oldest_call = min(self.calls)
                wait_time = (oldest_call + self.period - now).total_seconds()
                await asyncio.sleep(max(0, wait_time))
            # Record when the call actually proceeds, not the pre-sleep time
            self.calls.append(datetime.now())
# Use rate limiter
rate_limiter = RateLimiter(max_calls=10, period=timedelta(seconds=1))
async def rate_limited_validator(value: str) -> Maybe[str]:
await rate_limiter.acquire()
return await expensive_api_call(value)
Retry Logic with Exponential Backoff
Use RetryingValidator to automatically retry async validators on transient failures.
This is essential for handling network hiccups, temporary service unavailability, and
rate-limited APIs.
from valid8r.async_validators import RetryingValidator
from valid8r.core.maybe import Maybe
# Define an async validator that might fail transiently
async def validate_with_api(value: str) -> Maybe[str]:
"""Validate value against external API."""
try:
response = await httpx_client.post(
'https://api.example.com/validate',
json={'value': value}
)
if response.status_code == 200:
return Maybe.success(value)
if response.status_code >= 500:
# Server error - transient failure
return Maybe.failure('Transient: server error')
return Maybe.failure('Validation failed')
except httpx.ConnectError:
return Maybe.failure('Transient: connection error')
# Wrap with retry logic
robust_validator = RetryingValidator(
validate_with_api,
max_retries=3, # Retry up to 3 times
base_delay=1.0, # Start with 1 second delay
max_delay=60.0, # Never wait more than 60 seconds
exponential_base=2.0, # Double delay each retry
jitter=True, # Add randomness to prevent thundering herd
)
# Use the wrapped validator
result = await robust_validator('my-value')
RetryingValidator Parameters
validator: The async validator function to wrap. Must return Maybe[T].
max_retries (default: 3): Maximum number of retry attempts after the initial call fails. Total attempts = initial + max_retries = 4 by default.
base_delay (default: 1.0): Base delay in seconds for exponential backoff. Delay formula: base_delay * exponential_base^attempt
max_delay (default: 60.0): Maximum delay cap in seconds. Prevents excessively long waits.
exponential_base (default: 2.0): Base for exponential backoff calculation. With 2.0, delays double each retry: 1s -> 2s -> 4s -> 8s…
jitter (default: True): Add random jitter to delays (0 to calculated_delay). Prevents thundering herd when multiple validators retry simultaneously.
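The delay formula can be worked through in a few lines. This standalone sketch assumes `attempt` starts at 0 for the first retry and that the cap is applied before jitter; `backoff_delays` is a hypothetical helper, not a Valid8r function.

```python
import random


def backoff_delays(max_retries=3, base_delay=1.0, max_delay=60.0,
                   exponential_base=2.0, jitter=False):
    """Return the delay before each retry, in order."""
    delays = []
    for attempt in range(max_retries):
        # Exponential growth, capped at max_delay.
        delay = min(base_delay * exponential_base ** attempt, max_delay)
        if jitter:
            # Pick a random point between 0 and the calculated delay.
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays())               # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=8))  # last two entries are capped at 60.0
```

With the defaults, the worst case adds 1 + 2 + 4 = 7 seconds of waiting on top of the four attempts themselves, which is worth keeping in mind when choosing an overall timeout.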
Retry Behavior
All failures are retried: Any Failure result triggers a retry
Exceptions are caught: Exceptions during validation are caught and retried
Last error preserved: Final failure includes the last error message
State tracking: retry_count and retry_delays attributes available after call
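The behavior listed above amounts to a small loop. The following standalone sketch shows one way such a wrapper could work; `SimpleRetry` is a hypothetical class written for illustration, results are `(ok, payload)` tuples instead of `Maybe`, and jitter is left out to keep the output deterministic.

```python
import asyncio


class SimpleRetry:
    """Illustrative retry wrapper with exponential backoff (no jitter)."""

    def __init__(self, validator, max_retries=3, base_delay=0.01,
                 exponential_base=2.0, max_delay=1.0):
        self.validator = validator
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.exponential_base = exponential_base
        self.max_delay = max_delay
        self.retry_count = 0
        self.retry_delays = []

    async def __call__(self, value):
        self.retry_count = 0
        self.retry_delays = []
        last = (False, "no attempts made")
        for attempt in range(self.max_retries + 1):  # initial call + retries
            try:
                last = await self.validator(value)
            except Exception as exc:
                last = (False, str(exc))  # exceptions count as failures
            if last[0]:
                return last
            if attempt < self.max_retries:
                delay = min(self.base_delay * self.exponential_base ** attempt,
                            self.max_delay)
                self.retry_delays.append(delay)
                self.retry_count += 1
                await asyncio.sleep(delay)
        return last  # the last failure is preserved


attempts = 0


async def flaky(value):
    """Fails twice with an exception, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("temporary outage")
    return (True, value)


retry = SimpleRetry(flaky, max_retries=5, base_delay=0.01)
result = asyncio.run(retry("test-value"))
print(result, retry.retry_count, retry.retry_delays)
```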
Example: Tracking Retry Metrics
validator = RetryingValidator(
flaky_api_validator,
max_retries=5,
base_delay=0.5,
jitter=False, # Disable for deterministic testing
)
result = await validator('test-value')
# Check retry metrics
print(f"Retries performed: {validator.retry_count}")
print(f"Delays used: {validator.retry_delays}")
# Output might be:
# Retries performed: 2
# Delays used: [0.5, 1.0] # Two retries with exponential backoff
Best Practices for Retry Logic
Set reasonable max_retries
Database operations: 2-3 retries
HTTP APIs: 3-5 retries
Critical operations: Consider circuit breaker pattern instead
Use jitter in production
# Good: jitter prevents thundering herd
RetryingValidator(validator, jitter=True)
# Only for testing: disable jitter for deterministic tests
RetryingValidator(validator, jitter=False)
Distinguish transient vs permanent failures
async def smart_validator(value: str) -> Maybe[str]:
    try:
        response = await api_call(value)
        if response.status_code == 200:
            return Maybe.success(value)
        if response.status_code >= 500:
            # Transient: will be retried
            return Maybe.failure('Transient: server error')
        # Permanent: 4xx errors, will be retried but likely to fail
        return Maybe.failure('Validation failed')
    except ConnectionError:
        # Transient: network issues
        return Maybe.failure('Transient: network error')
Combine with timeouts
import asyncio
async def validate_with_timeout(value):
    validator = RetryingValidator(
        api_validator,
        max_retries=3,
        base_delay=1.0,
    )
    try:
        return await asyncio.wait_for(
            validator(value),
            timeout=30.0  # Overall timeout for all retries
        )
    except asyncio.TimeoutError:
        return Maybe.failure('Validation timed out')
Testing Async Validators
Unit Testing
Test async validators using pytest-asyncio:
import pytest
from valid8r.testing import assert_maybe_success, assert_maybe_failure
@pytest.mark.asyncio
async def test_check_email_unique_success():
"""Unique email passes validation."""
result = await check_email_unique('new@example.com')
assert assert_maybe_success(result, 'new@example.com')
@pytest.mark.asyncio
async def test_check_email_unique_failure():
"""Existing email fails validation."""
result = await check_email_unique('existing@example.com')
assert assert_maybe_failure(result, 'already registered')
Mocking Async Dependencies
Mock database and API calls for testing:
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_verify_api_key_success():
"""Valid API key passes validation."""
mock_client = AsyncMock()
mock_client.get.return_value.status_code = 200
result = await verify_api_key('valid-key', mock_client)
assert assert_maybe_success(result, 'valid-key')
@pytest.mark.asyncio
async def test_verify_api_key_failure():
"""Invalid API key fails validation."""
mock_client = AsyncMock()
mock_client.get.return_value.status_code = 401
result = await verify_api_key('invalid-key', mock_client)
assert assert_maybe_failure(result, 'Invalid API key')
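The mocking pattern can also be tried without pytest or Valid8r installed. In this standalone sketch, `verify_key` is a simplified stand-in that returns a bool rather than a `Maybe`; it also shows `assert_awaited_once()`, which confirms the mocked call was actually awaited.

```python
import asyncio
from unittest.mock import AsyncMock


async def verify_key(api_key, client):
    """Simplified stand-in validator: True when the service returns 200."""
    response = await client.get(
        "https://api.example.com/verify",
        headers={"Authorization": f"Bearer {api_key}"},
    )
    return response.status_code == 200


async def run_checks():
    client = AsyncMock()
    # Awaiting client.get(...) yields client.get.return_value.
    client.get.return_value.status_code = 200
    accepted = await verify_key("valid-key", client)
    client.get.assert_awaited_once()  # the request was really awaited

    client.get.return_value.status_code = 401
    rejected = await verify_key("invalid-key", client)
    return accepted, rejected


accepted, rejected = asyncio.run(run_checks())
print(accepted, rejected)
```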
Integration Testing
Test with real async operations (database, API):
@pytest.mark.integration
@pytest.mark.asyncio
async def test_schema_validation_with_database(test_db_pool):
"""Full schema validation with real database."""
schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[
partial(check_email_unique, db_pool=test_db_pool)
],
required=True
),
})
# Test with test database
result = await schema.validate_async({
'email': 'test@example.com'
})
assert result.is_success()
Migration from Sync to Async
If you have existing sync validators, migrate gradually:
Step 1: Keep Sync Validators
# Existing sync validation
schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[check_format], # Sync validator
required=True
),
})
# Still works with validate()
result = schema.validate(data)
Step 2: Add Async Validators
# Add async validators alongside sync
schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[
check_format, # Sync: format check
check_email_unique, # Async: database check
],
required=True
),
})
# Use validate_async() for async support
result = await schema.validate_async(data)
Step 3: Gradually Replace
# Eventually replace sync with async where appropriate
schema = Schema(fields={
'email': Field(
parser=parse_email,
validators=[
check_email_unique, # Async only
],
required=True
),
})
Backward Compatibility
validate() method still works (skips async validators)
validate_async() method supports both sync and async validators
No breaking changes to existing code
See Also
Schema Validation - Schema validation basics
Validators - Built-in validator functions
Understanding the Maybe Monad - Understanding Maybe monad
Error Handling - Error handling patterns