Usage

Ok as the usage nearly mimics that of boto3, I thought it best just to throw lots of examples at you instead. The moral of the story is just prefix boto3 stuff with await.

This library “should” work with Python3.3/3.4 but I havent tested it, so try yield from if you want.

Slight differences

aioboto3.resource will return a boto3 like resource object, but it will also have an awaitable .close() and also has __aenter__ and __aexit__ which allows you to use the async with syntax.

Service resources like s3.Bucket need to be created using await now, e.g. bucket = await s3_resource.Bucket('somebucket')

DynamoDB Examples

Put an item into a DynamoDB table, then query it using the nice Key().eq() abstraction.

import asyncio
import aioboto3
from boto3.dynamodb.conditions import Key


async def main():
    session = aioboto3.Session()
    async with session.resource('dynamodb', region_name='eu-central-1') as dynamo_resource:
        table = await dynamo_resource.Table('test_table')

        await table.put_item(
            Item={'pk': 'test1', 'col1': 'some_data'}
        )

        result = await table.query(
            KeyConditionExpression=Key('pk').eq('test1')
        )

        print(result['Items'])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# Outputs:
#  [{'col1': 'some_data', 'pk': 'test1'}]

Use the batch writer to take care of dynamodb writing retries etc…

import asyncio
import aioboto3
from boto3.dynamodb.conditions import Key


async def main():
    session = aioboto3.Session()
    async with session.resource('dynamodb', region_name='eu-central-1') as dynamo_resource:
        table = await dynamo_resource.Table('test_table')

        # As the default batch size is 25, all of these will be written in one batch
        async with table.batch_writer() as dynamo_writer:
            await dynamo_writer.put_item(Item={'pk': 'test1', 'col1': 'some_data'})
            await dynamo_writer.put_item(Item={'pk': 'test2', 'col1': 'some_data'})
            await dynamo_writer.put_item(Item={'pk': 'test3', 'col1': 'some_data'})
            await dynamo_writer.put_item(Item={'pk': 'test4', 'col1': 'some_data'})
            await dynamo_writer.put_item(Item={'pk': 'test5', 'col1': 'some_data'})

        result = await table.scan()

        print(result['Count'])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# Outputs:
#  5

The batch_writer() can take a keyword argument of flush_amount which will change the desired flush amount and a keyword argument of on_exit_loop_sleep. The on_exit_loop_sleep argument will add an async sleep in the flush loop when you exit the context manager.

S3 Examples

Here are some examples of uploading and streaming a file from S3, serving via aiohttp.

Upload

Here we upload from a file object and stream it from a file descriptor.

async def upload(
    suite: str,
    release: str,
    filename: str,
    staging_path: Path,
    bucket: str,
) -> str:
    blob_s3_key = f"{suite}/{release}/{filename}"

    session = aioboto3.Session()
    async with session.client("s3") as s3:
        try:
            with staging_path.open("rb") as spfp:
                LOG.info(f"Uploading {blob_s3_key} to s3")
                await s3.upload_fileobj(spfp, bucket, blob_s3_key)
                LOG.info(f"Finished Uploading {blob_s3_key} to s3")
        except Exception as e:
            LOG.error(f"Unable to s3 upload {staging_path} to {blob_s3_key}: {e} ({type(e)})")
            return ""

    return f"s3://{blob_s3_key}"

Streaming Download

Here we pull the object from S3 in chunks and serve it out to a HTTP request via aiohttp

from aiohttp import web
from multidict import MultiDict


async def serve_blob(
    suite: str,
    release: str,
    filename: str,
    bucket: str,
    request: web.Request,
    chunk_size: int = 69 * 1024
) -> web.StreamResponse:
    blob_s3_key = f"{suite}/{release}/{filename}"

    session = aioboto3.Session()
    async with session.client("s3") as s3:
        LOG.info(f"Serving {bucket} {blob_s3_key}")
        s3_ob = await s3.get_object(Bucket=bucket, Key=blob_s3_key)

        ob_info = s3_ob["ResponseMetadata"]["HTTPHeaders"]
        resp = web.StreamResponse(
            headers=MultiDict(
                {
                    "CONTENT-DISPOSITION": (
                        f"attachment; filename='{filename}'"
                    ),
                    "Content-Type": ob_info["content-type"],
                }
            )
        )
        resp.content_type = ob_info["content-type"]
        resp.content_length = ob_info["content-length"]
        await resp.prepare(request)

        stream = s3_ob["Body"]
        while file_data := await stream.read(chunk_size):
            await resp.write(file_data)

    return resp

S3 Resource Objects

The S3 Bucket object also works but its methods have been asyncified. E.g.

import aioboto3


async def main():
    session = aioboto3.Session()
    async with session.resource("s3") as s3:

        bucket = await s3.Bucket('mybucket')
        async for s3_object in bucket.objects.all():
            print(s3_object)

        async for s3_object in bucket.objects.filter(Prefix='someprefix/'):
            print(s3_object)

        await bucket.objects.all().delete()

        # or
        await bucket.objects.filter(Prefix='test/').delete()

Misc

Clients

As you can see, it also works for standard client connections too.

import asyncio
import aioboto3


async def main():
    session = aioboto3.Session()
    async with session.client('ssm', region_name='eu-central-1') as ssm_client:
        result = await ssm_client.describe_parameters()

        print(result['Parameters'])


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# Outputs:
#  []

Retries

Use AioConfig, the async extension of Config. Pass it to the client as you would with standard boto3.

The code below will eventually retrieve a list of 20 copies of the organization root response. It prints a list of the number of retries required to get each response.

ListRoots is a convenient test function because it only reads data and it has a low throttling limit (per account, 1 per second and 2 burst).

import asyncio
from aioboto3 import Session
from aiobotocore.config import AioConfig

try_hard = AioConfig(retries={"max_attempts": 100})

async def main():
    coro = Session().client("organizations", config=try_hard)
    async with coro as client:
        resp_list = await asyncio.gather(
            *[client.list_roots() for _ in range(20)]
        )
        print([r["ResponseMetadata"]["RetryAttempts"] for r in resp_list])

asyncio.run(main())

It will return a list with objects like this:

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 4, 6, 4, 4, 4, 4, 4, 5, 5]

The list is ordered by response timestamp, earliest first. This time the first 10 responses don’t retry. The next 10 responses each took between 4 and 6 retries.

The default max_retries value is 4. That’s not enough for 20 concurrent requests. If you remove the config parameter, the code will surely fail like this.

botocore.errorfactory.TooManyRequestsException: An error occurred (TooManyRequestsException) when calling the ListRoots operation (reached max retries: 4): AWS Organizations can't complete your request because another request is already in progress. Try again later.

AioHTTP Server Example

Since aioboto3 v8.0.0+, .client and .resource are now async context managers, so it breaks some normal patterns when used with long running processes like web servers.

This example creates an AsyncExitStack which essentially does async with on the context manager retuned by .resource, saves the exit coroutine so that it can be called later to clean up. If you comment out and run _app.on_shutdown.append(shutdown_tasks), you’ll receive a warning stating that an AioHTTP session was not closed.

"""
contextlib.AsyncExitStack requires python 3.7
"""
import contextlib

import aioboto3
from boto3.dynamodb.conditions import Key
from aiohttp import web

routes = web.RouteTableDef()
session = aioboto3.Session()


@routes.get('/')
async def hello(request):

    # request.app['table'] == Table object from boto3 docs
    response = await request.app['table'].query(
        KeyConditionExpression=Key('id').eq('lalalala')
    )

    return web.Response(text=str(response))


async def startup_tasks(app: web.Application) -> None:
    context_stack = contextlib.AsyncExitStack()
    app['context_stack'] = context_stack

    app['dynamo_resource'] = await context_stack.enter_async_context(
        session.resource('dynamodb', region_name='eu-west-1')
    )
    # By now, app['dynamo_resource'] will have methods like .Table() and list_tables() etc...

    # aioboto3 v8.0.0+ all service resources (aka Table(), Bucket() etc...) need to be awaited
    app['table'] = await app['dynamo_resource'].Table('somedynamodbtablename')


async def shutdown_tasks(app: web.Application) -> None:
    await app['context_stack'].aclose()
    # By now, app['dynamo_resource'] would be closed


_app = web.Application()
_app.add_routes(routes)
_app.on_startup.append(startup_tasks)
_app.on_shutdown.append(shutdown_tasks)
web.run_app(_app, port=8000)

TODO

More examples