Async Validators Guide

This guide covers the async validator library in valid8r, which provides non-blocking validators for I/O-bound validation operations.

Overview

Async validators enable efficient validation against external systems (databases, APIs, DNS) without blocking the event loop. They follow the same Maybe monad pattern as synchronous validators, making them composable and easy to integrate into existing validation pipelines.

Key Features

  • Non-blocking: All validators use async/await for efficient I/O

  • Maybe monad: Returns Success[T] or Failure[T] for composable error handling

  • Database validation: Check uniqueness and foreign key constraints

  • Type-safe: Full type annotations and mypy compliance

Installation

The async validators module is included with valid8r:

pip install valid8r

For database validation, you’ll also need an async database library:

# PostgreSQL
pip install asyncpg

# MySQL
pip install aiomysql

# SQLite
pip install aiosqlite

Database Validators (MVP)

unique_in_db - Uniqueness Validation

Validates that a value is unique in a database table. Use this for checking email addresses, usernames, or any field that must be unique.

Example:

import asyncio
import asyncpg
from valid8r.async_validators import unique_in_db
from valid8r.core.maybe import Failure, Success

async def register_user(email: str):
    # Connect to database
    conn = await asyncpg.connect('postgresql://localhost/mydb')

    # Create validator
    email_validator = await unique_in_db(
        field='email',
        table='users',
        connection=conn
    )

    # Validate email uniqueness
    result = await email_validator(email)

    match result:
        case Success(value):
            print(f"Email {value} is available!")
            # Proceed with user registration
        case Failure(error):
            print(f"Email is taken: {error}")

    await conn.close()

asyncio.run(register_user('new@example.com'))

Parameters:

  • field (str): Database column to check (e.g., ‘email’, ‘username’)

  • table (str): Database table to query (e.g., ‘users’, ‘accounts’)

  • connection (Any): Async database connection with an execute() method

Returns:

  • Success(value) if the value is unique

  • Failure(error_msg) if the value already exists or a database error occurs

exists_in_db - Foreign Key Validation

Validates that a value exists in a database table. Use this for validating foreign keys or ensuring referenced entities exist.

Example:

import asyncio
import asyncpg
from valid8r.async_validators import exists_in_db
from valid8r.core.maybe import Failure, Success

async def create_product(category_id: str):
    # Connect to database
    conn = await asyncpg.connect('postgresql://localhost/mydb')

    # Create validator
    category_validator = await exists_in_db(
        field='id',
        table='categories',
        connection=conn
    )

    # Validate that category exists
    result = await category_validator(category_id)

    match result:
        case Success(value):
            print(f"Category {value} exists")
            # Proceed with product creation
        case Failure(error):
            print(f"Invalid category: {error}")

    await conn.close()

asyncio.run(create_product('electronics'))

Parameters:

  • field (str): Database column to check (e.g., ‘id’, ‘category_id’)

  • table (str): Database table to query (e.g., ‘categories’, ‘products’)

  • connection (Any): Async database connection with an execute() method

Returns:

  • Success(value) if the value exists

  • Failure(error_msg) if the value doesn’t exist or a database error occurs

Usage Patterns

Concurrent Validation

Validate multiple values in parallel for maximum efficiency:

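import asyncio
import asyncpg
from valid8r.async_validators import unique_in_db

async def validate_batch(emails: list[str]):
    conn = await asyncpg.connect('postgresql://localhost/mydb')
    validator = await unique_in_db(field='email', table='users', connection=conn)

    # Run validations concurrently.
    # Note: a single asyncpg connection cannot run queries concurrently, so
    # consider a connection pool here (see Performance Considerations).
    results = await asyncio.gather(*[validator(email) for email in emails])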

    # Process results
    for email, result in zip(emails, results, strict=False):
        if result.is_success():
            print(f"{email}: Available")
        else:
            print(f"{email}: {result.error_or('')}")

    await conn.close()

Integration with Async Frameworks

Async validators work seamlessly with FastAPI, aiohttp, and other async frameworks:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncpg
from valid8r.async_validators import unique_in_db

app = FastAPI()

class UserRegistration(BaseModel):
    email: str
    password: str

@app.post("/register")
async def register(user: UserRegistration):
    conn = await asyncpg.connect('postgresql://localhost/mydb')
    try:
        # Validate email uniqueness
        validator = await unique_in_db(field='email', table='users', connection=conn)
        result = await validator(user.email)

        if result.is_failure():
            raise HTTPException(status_code=400, detail=result.error_or(''))

        # Create user...
        return {"message": "User registered successfully"}
    finally:
        # Always release the connection, including on the error path
        await conn.close()

Error Handling

All async validators catch database errors and return them as Failure results:

# Assume bad_conn is a connection whose queries fail (e.g., the database is unreachable)
validator = await unique_in_db(field='email', table='users', connection=bad_conn)
result = await validator('test@example.com')

if result.is_failure():
    error = result.error_or('')
    if 'Database error' in error:
        # Handle database connection issues
        print("Database unavailable, try again later")

Database Compatibility

The async validators are compatible with any async database library that provides:

  • An execute() method that accepts a query string and parameters

  • A result object with a scalar() method

Tested with:

  • asyncpg (PostgreSQL)

  • aiomysql (MySQL)

  • aiosqlite (SQLite)
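
For tests or local experiments, a small in-memory stand-in can satisfy the interface described above. The sketch below is only an illustration: FakeConnection and FakeResult are hypothetical names, and because this guide does not show the query text or parameter layout that valid8r sends, the mock assumes the first parameter is the value being checked and that the caller reads a match count from scalar().

```python
import asyncio
from typing import Any


class FakeResult:
    """Stand-in for a driver result object exposing scalar()."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def scalar(self) -> Any:
        return self._value


class FakeConnection:
    """In-memory stand-in for an async connection (hypothetical helper)."""

    def __init__(self, existing: set[str]) -> None:
        self._existing = existing
        self.queries: list[str] = []

    async def execute(self, query: str, *params: Any) -> FakeResult:
        # Record the query for inspection; a real driver would send it to the server.
        self.queries.append(query)
        # Assumption: the first parameter is the value being checked,
        # and the caller expects a match count from scalar().
        count = 1 if params and params[0] in self._existing else 0
        return FakeResult(count)


async def demo() -> tuple[int, int]:
    conn = FakeConnection({"taken@example.com"})
    query = "SELECT COUNT(*) FROM users WHERE email = $1"
    taken = await conn.execute(query, "taken@example.com")
    free = await conn.execute(query, "new@example.com")
    return taken.scalar(), free.scalar()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A mock like this keeps validator tests fast and independent of a running database. Whether it matches the exact call shape used by unique_in_db and exists_in_db should be checked against the library source.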

Performance Considerations

  1. Connection Pooling: Use connection pools for production applications

  2. Query Optimization: Index the columns used in uniqueness and existence checks

  3. Concurrency Limits: Use asyncio.Semaphore to limit concurrent database queries

  4. Timeout Handling: Wrap validators with asyncio.wait_for() for timeout control

Example with timeout:

try:
    result = await asyncio.wait_for(
        validator('test@example.com'),
        timeout=5.0  # 5 second timeout
    )
except asyncio.TimeoutError:
    print("Validation timed out")

Validator Composition

Valid8r provides powerful composition functions to combine multiple async validators into complex validation pipelines.

all_of - Parallel AND Composition

Runs all validators in parallel. All must succeed for validation to pass.

import asyncio
from valid8r.async_validators import all_of
from valid8r.core.maybe import Maybe

async def check_length(value: str) -> Maybe[str]:
    if len(value) >= 3:
        return Maybe.success(value)
    return Maybe.failure('Too short')

async def check_alpha(value: str) -> Maybe[str]:
    if value.isalpha():
        return Maybe.success(value)
    return Maybe.failure('Must be alphabetic')

async def check_not_reserved(value: str) -> Maybe[str]:
    reserved = ['admin', 'root', 'system']
    if value.lower() not in reserved:
        return Maybe.success(value)
    return Maybe.failure('Reserved word')

async def validate_username(username: str):
    # All three validators run in parallel
    validator = all_of(check_length, check_alpha, check_not_reserved)
    result = await validator(username)

    if result.is_success():
        print(f"Username '{result.value_or('')}' is valid!")
    else:
        print(f"Validation failed: {result.error_or('')}")

asyncio.run(validate_username('JohnDoe'))

Parameters:

  • *validators: Variable number of async validators

  • fail_fast (bool): If True (default), returns first error. If False, collects all errors.

With error aggregation:

# Collect all errors when fail_fast=False
validator = all_of(check_length, check_alpha, fail_fast=False)
result = await validator('x1')
# Error: "Too short; Must be alphabetic"
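
To make the fail_fast behavior concrete, here is a minimal sketch of how a parallel AND combinator could be built on asyncio.gather(). It is not valid8r's implementation: all_of_sketch, Ok, and Err are hypothetical stand-ins for all_of, Success, and Failure, and the "; " separator is taken from the error message shown above.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass(frozen=True)
class Ok:  # stand-in for Success
    value: str


@dataclass(frozen=True)
class Err:  # stand-in for Failure
    error: str


Result = Union[Ok, Err]
Validator = Callable[[str], Awaitable[Result]]


def all_of_sketch(*validators: Validator, fail_fast: bool = True) -> Validator:
    async def run(value: str) -> Result:
        # gather() runs every validator concurrently and preserves argument order.
        results = await asyncio.gather(*(v(value) for v in validators))
        errors = [r.error for r in results if isinstance(r, Err)]
        if not errors:
            return Ok(value)
        # fail_fast reports only the first error; otherwise all errors are joined.
        return Err(errors[0] if fail_fast else "; ".join(errors))

    return run


async def check_length(value: str) -> Result:
    return Ok(value) if len(value) >= 3 else Err("Too short")


async def check_alpha(value: str) -> Result:
    return Ok(value) if value.isalpha() else Err("Must be alphabetic")


if __name__ == "__main__":
    validator = all_of_sketch(check_length, check_alpha, fail_fast=False)
    print(asyncio.run(validator("x1")))
```

Note that this sketch waits for every validator even when fail_fast=True. A production implementation could instead cancel the remaining tasks as soon as one fails.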

any_of - Parallel OR Composition

Runs all validators in parallel. At least one must succeed.

import asyncio
from valid8r.async_validators import any_of
from valid8r.core.maybe import Maybe

async def check_email_format(value: str) -> Maybe[str]:
    if '@' in value and '.' in value.split('@')[-1]:
        return Maybe.success(value)
    return Maybe.failure('Not a valid email')

async def check_phone_format(value: str) -> Maybe[str]:
    digits = ''.join(c for c in value if c.isdigit())
    if len(digits) >= 10:
        return Maybe.success(value)
    return Maybe.failure('Not a valid phone number')

async def validate_contact(contact: str):
    # Either email OR phone format is acceptable
    validator = any_of(check_email_format, check_phone_format)
    result = await validator(contact)

    if result.is_success():
        print(f"Contact '{result.value_or('')}' is valid!")
    else:
        print(f"Validation failed: {result.error_or('')}")

asyncio.run(validate_contact('user@example.com'))  # Valid as email
asyncio.run(validate_contact('555-123-4567'))      # Valid as phone

Notes:

  • Empty validator list returns Failure (nothing can succeed)

  • Returns the first successful result’s value
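
The two notes above can be illustrated with a small sketch of a parallel OR combinator. This is an assumption about the general technique, not valid8r's code: any_of_sketch, Ok, and Err are hypothetical stand-ins, "first" here means first in argument order, and both error messages produced by the combinator itself are invented for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass(frozen=True)
class Ok:  # stand-in for Success
    value: str


@dataclass(frozen=True)
class Err:  # stand-in for Failure
    error: str


Result = Union[Ok, Err]
Validator = Callable[[str], Awaitable[Result]]


def any_of_sketch(*validators: Validator) -> Validator:
    async def run(value: str) -> Result:
        if not validators:
            # With nothing to try, nothing can succeed.
            return Err("No validators provided")
        results = await asyncio.gather(*(v(value) for v in validators))
        for result in results:
            if isinstance(result, Ok):
                return result  # first success in argument order
        return Err("; ".join(r.error for r in results))

    return run


async def check_email_format(value: str) -> Result:
    if "@" in value and "." in value.split("@")[-1]:
        return Ok(value)
    return Err("Not a valid email")


async def check_phone_format(value: str) -> Result:
    digits = "".join(c for c in value if c.isdigit())
    return Ok(value) if len(digits) >= 10 else Err("Not a valid phone number")


if __name__ == "__main__":
    validator = any_of_sketch(check_email_format, check_phone_format)
    print(asyncio.run(validator("555-123-4567")))
```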

sequence - Sequential Composition

Runs validators one after another, passing each result to the next. Stops on first failure.

import asyncio
from valid8r.async_validators import sequence
from valid8r.core.maybe import Maybe

async def trim_whitespace(value: str) -> Maybe[str]:
    return Maybe.success(value.strip())

async def to_lowercase(value: str) -> Maybe[str]:
    return Maybe.success(value.lower())

async def validate_not_empty(value: str) -> Maybe[str]:
    if value:
        return Maybe.success(value)
    return Maybe.failure('Value cannot be empty')

async def check_unique_in_db(value: str) -> Maybe[str]:
    # Simulating database check
    existing = ['admin', 'user', 'test']
    if value not in existing:
        return Maybe.success(value)
    return Maybe.failure(f'Username "{value}" already exists')

async def process_username(raw_input: str):
    # Each validator transforms and passes to the next
    validator = sequence(
        trim_whitespace,
        to_lowercase,
        validate_not_empty,
        check_unique_in_db
    )

    result = await validator(raw_input)

    if result.is_success():
        print(f"Processed username: {result.value_or('')}")
    else:
        print(f"Processing failed: {result.error_or('')}")

asyncio.run(process_username('  JohnDoe  '))  # -> 'johndoe'

Key difference from all_of:

  • Runs sequentially, not in parallel

  • Each validator receives the output of the previous one

  • Use when validators have dependencies or transform data
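
The chaining described above can be sketched in a few lines. Again, this is an illustration under assumptions rather than the library's implementation: sequence_sketch, Ok, and Err are hypothetical stand-ins for sequence, Success, and Failure.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass(frozen=True)
class Ok:  # stand-in for Success
    value: str


@dataclass(frozen=True)
class Err:  # stand-in for Failure
    error: str


Result = Union[Ok, Err]
Validator = Callable[[str], Awaitable[Result]]


def sequence_sketch(*validators: Validator) -> Validator:
    async def run(value: str) -> Result:
        current: Result = Ok(value)
        for validator in validators:
            # Feed the previous validator's output into the next one.
            current = await validator(current.value)
            if isinstance(current, Err):
                return current  # short-circuit: later validators never run
        return current

    return run


async def trim(value: str) -> Result:
    return Ok(value.strip())


async def lower(value: str) -> Result:
    return Ok(value.lower())


async def not_empty(value: str) -> Result:
    return Ok(value) if value else Err("Value cannot be empty")


if __name__ == "__main__":
    validator = sequence_sketch(trim, lower, not_empty)
    print(asyncio.run(validator("  JohnDoe  ")))
```

Because each step awaits the previous one, total latency is the sum of the individual validators, which is the trade-off for being able to transform the value along the way.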

Mixed Composition

Combine composition types for complex validation logic:

import asyncio
from valid8r.async_validators import all_of, any_of, sequence
from valid8r.core.maybe import Maybe

# Define individual validators
async def check_min_length(value: str) -> Maybe[str]:
    if len(value) >= 8:
        return Maybe.success(value)
    return Maybe.failure('Must be at least 8 characters')

async def check_has_digit(value: str) -> Maybe[str]:
    if any(c.isdigit() for c in value):
        return Maybe.success(value)
    return Maybe.failure('Must contain a digit')

async def check_has_uppercase(value: str) -> Maybe[str]:
    if any(c.isupper() for c in value):
        return Maybe.success(value)
    return Maybe.failure('Must contain uppercase')

async def check_has_special(value: str) -> Maybe[str]:
    special = '!@#$%^&*'
    if any(c in special for c in value):
        return Maybe.success(value)
    return Maybe.failure('Must contain special character')

async def hash_password(value: str) -> Maybe[str]:
    # Simulated hashing
    return Maybe.success(f'hashed_{value}')

async def validate_password(password: str):
    # Complex validation: all requirements + hashing
    validator = sequence(
        # First: validate all password requirements (parallel)
        all_of(
            check_min_length,
            check_has_digit,
            check_has_uppercase,
            check_has_special,
            fail_fast=False  # Collect all missing requirements
        ),
        # Then: hash the valid password
        hash_password
    )

    result = await validator(password)

    if result.is_success():
        print(f"Password valid and hashed: {result.value_or('')}")
    else:
        print(f"Password requirements not met: {result.error_or('')}")

asyncio.run(validate_password('MyP@ss123'))  # Valid
asyncio.run(validate_password('weak'))       # All errors listed

Performance Comparison

Function    Execution    Use Case
all_of      Parallel     Independent checks, all must pass
any_of      Parallel     Alternative formats, one must pass
sequence    Sequential   Dependent validators, data transformation

Timing example with 3 validators, each taking 0.1s:

  • all_of: ~0.1s (parallel)

  • any_of: ~0.1s (parallel)

  • sequence: ~0.3s (sequential)
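
The difference can be reproduced with the standard library alone. The sketch below does not use valid8r: slow_check is a hypothetical validator that only sleeps, and asyncio.gather() and a plain loop stand in for the parallel and sequential combinators.

```python
import asyncio
import time

DELAY = 0.1  # seconds per simulated validator


async def slow_check(value: str) -> str:
    await asyncio.sleep(DELAY)  # simulate an I/O-bound check
    return value


async def time_parallel(n: int = 3) -> float:
    start = time.perf_counter()
    # All n checks wait at the same time, so the total is about one DELAY.
    await asyncio.gather(*(slow_check("x") for _ in range(n)))
    return time.perf_counter() - start


async def time_sequential(n: int = 3) -> float:
    start = time.perf_counter()
    # Each check starts only after the previous one finishes: about n * DELAY.
    for _ in range(n):
        await slow_check("x")
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    return await time_parallel(), await time_sequential()


if __name__ == "__main__":
    parallel, sequential = asyncio.run(main())
    print(f"parallel: {parallel:.2f}s, sequential: {sequential:.2f}s")
```

Exact timings depend on the machine and event loop load, so treat the figures above as approximate.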

Validator Wrappers

TimeoutValidator

Wraps any async validator with a timeout to ensure slow validations fail gracefully instead of hanging indefinitely.

import asyncio
from valid8r.async_validators import TimeoutValidator
from valid8r.core.maybe import Maybe

async def slow_api_validator(value: str) -> Maybe[str]:
    """Simulate a slow external API call."""
    await asyncio.sleep(10)  # Takes 10 seconds
    return Maybe.success(value)

async def main():
    # Wrap with 2 second timeout
    validator = TimeoutValidator(slow_api_validator, timeout=2.0)

    result = await validator('test-value')

    if result.is_failure():
        print(result.error_or(''))  # "Validation timeout after 2.0s"

asyncio.run(main())

Parameters:

  • validator: The async validator function to wrap

  • timeout: Maximum time in seconds to wait

Behavior:

  • Uses asyncio.wait_for() internally

  • Returns Failure with descriptive message on timeout

  • Original validator’s results pass through unchanged
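
As a rough picture of the behavior listed above, a timeout wrapper can be written around asyncio.wait_for() as follows. This is a sketch, not the TimeoutValidator source: with_timeout, Ok, and Err are hypothetical names, and the message format is copied from the example output earlier in this section.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass(frozen=True)
class Ok:  # stand-in for Success
    value: str


@dataclass(frozen=True)
class Err:  # stand-in for Failure
    error: str


Result = Union[Ok, Err]
Validator = Callable[[str], Awaitable[Result]]


def with_timeout(validator: Validator, timeout: float) -> Validator:
    async def run(value: str) -> Result:
        try:
            # wait_for() cancels the wrapped coroutine once the timeout elapses.
            return await asyncio.wait_for(validator(value), timeout=timeout)
        except asyncio.TimeoutError:
            # Convert the exception into a failure result instead of raising.
            return Err(f"Validation timeout after {timeout}s")

    return run


async def slow_validator(value: str) -> Result:
    await asyncio.sleep(1.0)
    return Ok(value)


async def fast_validator(value: str) -> Result:
    return Ok(value)


if __name__ == "__main__":
    print(asyncio.run(with_timeout(slow_validator, 0.05)("test-value")))
```

Returning a failure rather than raising keeps the wrapper composable with the other combinators, since callers only ever handle result values.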

CachedValidator

Wraps an async validator with TTL-based result caching to avoid redundant external calls.

import asyncio
from valid8r.async_validators import CachedValidator
from valid8r.core.maybe import Maybe

async def expensive_api_validator(value: str) -> Maybe[str]:
    """Validate against external API (expensive)."""
    print(f'API call for {value}')
    await asyncio.sleep(1.0)
    return Maybe.success(value)

async def main():
    # Cache results for 5 minutes
    validator = CachedValidator(expensive_api_validator, ttl=300.0)

    # First call - hits the API
    await validator('test')  # Prints: "API call for test"

    # Second call - uses cache (no print)
    await validator('test')

    print(f'API calls: {validator.call_count}')  # "API calls: 1"

asyncio.run(main())

Parameters:

  • validator: The async validator function to wrap

  • ttl (default: 300.0): Time-to-live in seconds for cached results

  • key_func (default: str): Function to generate cache keys from values

Key behaviors:

  • Only successful validations are cached

  • Failed validations are NOT cached (allows immediate retry)

  • Uses time.monotonic() for accurate timing

Methods:

  • invalidate(value): Remove a specific value from cache

  • clear(): Remove all cached entries

Property:

  • call_count: Number of times the wrapped validator was called (excluding cache hits)
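
The caching rules above (cache successes only, expire by TTL, measure time with time.monotonic()) can be sketched as follows. CachedSketch, Ok, and Err are hypothetical stand-ins, and the internal layout is an assumption made for illustration, not the CachedValidator source.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Hashable, Union


@dataclass(frozen=True)
class Ok:  # stand-in for Success
    value: str


@dataclass(frozen=True)
class Err:  # stand-in for Failure
    error: str


Result = Union[Ok, Err]


class CachedSketch:
    """Hypothetical TTL cache around an async validator."""

    def __init__(
        self,
        validator: Callable[[str], Awaitable[Result]],
        ttl: float = 300.0,
        key_func: Callable[[str], Hashable] = str,
    ) -> None:
        self._validator = validator
        self._ttl = ttl
        self._key_func = key_func
        self._cache: dict[Hashable, tuple[float, Result]] = {}
        self.call_count = 0  # calls that reached the wrapped validator

    async def __call__(self, value: str) -> Result:
        key = self._key_func(value)
        entry = self._cache.get(key)
        # monotonic() never goes backwards, so expiry ignores wall-clock changes.
        if entry is not None and time.monotonic() - entry[0] < self._ttl:
            return entry[1]
        self.call_count += 1
        result = await self._validator(value)
        if isinstance(result, Ok):  # failures are not cached, so retries run again
            self._cache[key] = (time.monotonic(), result)
        return result

    def invalidate(self, value: str) -> None:
        self._cache.pop(self._key_func(value), None)

    def clear(self) -> None:
        self._cache.clear()


async def demo() -> int:
    async def validator(value: str) -> Result:
        return Err("rejected") if value == "bad" else Ok(value)

    cached = CachedSketch(validator, ttl=60.0)
    await cached("test")
    await cached("test")  # served from the cache
    await cached("bad")
    await cached("bad")   # failure was not cached, so the validator runs again
    return cached.call_count


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One limitation of this sketch: two concurrent calls for the same uncached value both reach the wrapped validator. Deduplicating in-flight calls would need an extra per-key lock or future.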

Batch Validation with max_concurrency

The parallel_validate function accepts a max_concurrency argument that caps how many validations run at once:

import asyncio
from valid8r.async_validators import parallel_validate
from valid8r.core.maybe import Maybe

async def validate_email(email: str) -> Maybe[str]:
    """Validate email against rate-limited API."""
    await asyncio.sleep(0.1)  # Simulate API call
    return Maybe.success(email)

async def main():
    emails = [f'user{i}@example.com' for i in range(100)]

    # Limit to 10 concurrent validations
    results = await parallel_validate(
        validate_email,
        emails,
        max_concurrency=10
    )

    successes = [r for r in results if r.is_success()]
    print(f'Valid: {len(successes)}')

asyncio.run(main())

Parameters:

  • validator: The async validator function

  • values: Sequence of values to validate

  • max_concurrency (default: None): Maximum concurrent validations. Uses asyncio.Semaphore internally.

When to use:

  • Rate-limited external APIs

  • Limited database connection pools

  • Services unstable under high load
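
For readers curious how a semaphore caps concurrency, the sketch below shows the general pattern with the standard library. parallel_validate_sketch is a hypothetical function written for this illustration, not the library's parallel_validate, and the demo validator simply records how many calls overlap.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def parallel_validate_sketch(
    validator: Callable[[T], Awaitable[R]],
    values: Iterable[T],
    max_concurrency: Optional[int] = None,
) -> list[R]:
    if max_concurrency is None:
        # No limit: start everything at once.
        return list(await asyncio.gather(*(validator(v) for v in values)))

    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(value: T) -> R:
        # At most max_concurrency coroutines can be inside this block at a time.
        async with semaphore:
            return await validator(value)

    return list(await asyncio.gather(*(guarded(v) for v in values)))


async def demo() -> tuple[int, int]:
    active = 0
    peak = 0

    async def validate(value: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # track the highest number of overlapping calls
        await asyncio.sleep(0.01)
        active -= 1
        return value

    results = await parallel_validate_sketch(validate, range(20), max_concurrency=5)
    return len(results), peak


if __name__ == "__main__":
    count, peak = asyncio.run(demo())
    print(f"validated {count} values, peak concurrency {peak}")
```

Results come back in input order because asyncio.gather() preserves the order of its arguments, regardless of which validation finishes first.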

Future Features

This module continues to evolve. Upcoming features include:

  • More validators: Additional API validators and service integrations

  • Circuit breaker: Prevent cascade failures in external services

Examples

Complete working examples are available in examples/async-validation/:

  • database_example.py: Database validation examples with mock connection

Troubleshooting

“Database error: connection refused”

Ensure your database is running and the connection string is correct.

“Query takes too long”

Check that your database columns are indexed, especially for uniqueness checks.

“AttributeError: ‘Connection’ object has no attribute ‘execute’”

Verify that you’re using an async database library rather than a synchronous one. For PostgreSQL, use asyncpg, not psycopg2.

Support

For questions or issues: