Usage¶
Ok as the usage nearly mimics that of boto3, I thought it best just to throw lots of examples at you instead. The moral of the story is just prefix boto3 stuff with await.
This library “should” work with Python3.3/3.4 but I havent tested it, so try yield from if you want.
Slight differences¶
aioboto3.resource
will return a boto3 like resource object, but it will also have an awaitable .close()
and also
has __aenter__
and __aexit__
which allows you to use the async with
syntax.
Service resources like s3.Bucket
need to be created using await now, e.g. bucket = await s3_resource.Bucket('somebucket')
DynamoDB Examples¶
Put an item into a DynamoDB table, then query it using the nice Key().eq()
abstraction.
import asyncio
import aioboto3
from boto3.dynamodb.conditions import Key
async def main():
session = aioboto3.Session()
async with session.resource('dynamodb', region_name='eu-central-1') as dynamo_resource:
table = await dynamo_resource.Table('test_table')
await table.put_item(
Item={'pk': 'test1', 'col1': 'some_data'}
)
result = await table.query(
KeyConditionExpression=Key('pk').eq('test1')
)
print(result['Items'])
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# Outputs:
# [{'col1': 'some_data', 'pk': 'test1'}]
Use the batch writer to take care of dynamodb writing retries etc…
import asyncio
import aioboto3
from boto3.dynamodb.conditions import Key
async def main():
session = aioboto3.Session()
async with session.resource('dynamodb', region_name='eu-central-1') as dynamo_resource:
table = await dynamo_resource.Table('test_table')
# As the default batch size is 25, all of these will be written in one batch
async with table.batch_writer() as dynamo_writer:
await dynamo_writer.put_item(Item={'pk': 'test1', 'col1': 'some_data'})
await dynamo_writer.put_item(Item={'pk': 'test2', 'col1': 'some_data'})
await dynamo_writer.put_item(Item={'pk': 'test3', 'col1': 'some_data'})
await dynamo_writer.put_item(Item={'pk': 'test4', 'col1': 'some_data'})
await dynamo_writer.put_item(Item={'pk': 'test5', 'col1': 'some_data'})
result = await table.scan()
print(result['Count'])
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# Outputs:
# 5
The batch_writer()
can take a keyword argument of flush_amount
which will change the desired flush amount and a keyword argument
of on_exit_loop_sleep
. The on_exit_loop_sleep
argument will add an async sleep in the flush loop when you exit the context manager.
S3 Examples¶
Here are some examples of uploading and streaming a file from S3, serving via aiohttp.
Upload¶
Here we upload from a file object and stream it from a file descriptor.
async def upload(
suite: str,
release: str,
filename: str,
staging_path: Path,
bucket: str,
) -> str:
blob_s3_key = f"{suite}/{release}/{filename}"
session = aioboto3.Session()
async with session.client("s3") as s3:
try:
with staging_path.open("rb") as spfp:
LOG.info(f"Uploading {blob_s3_key} to s3")
await s3.upload_fileobj(spfp, bucket, blob_s3_key)
LOG.info(f"Finished Uploading {blob_s3_key} to s3")
except Exception as e:
LOG.error(f"Unable to s3 upload {staging_path} to {blob_s3_key}: {e} ({type(e)})")
return ""
return f"s3://{blob_s3_key}"
Streaming Download¶
Here we pull the object from S3 in chunks and serve it out to a HTTP request via aiohttp
from aiohttp import web
from multidict import MultiDict
async def serve_blob(
suite: str,
release: str,
filename: str,
bucket: str,
request: web.Request,
chunk_size: int = 69 * 1024
) -> web.StreamResponse:
blob_s3_key = f"{suite}/{release}/{filename}"
session = aioboto3.Session()
async with session.client("s3") as s3:
LOG.info(f"Serving {bucket} {blob_s3_key}")
s3_ob = await s3.get_object(Bucket=bucket, Key=blob_s3_key)
ob_info = s3_ob["ResponseMetadata"]["HTTPHeaders"]
resp = web.StreamResponse(
headers=MultiDict(
{
"CONTENT-DISPOSITION": (
f"attachment; filename='{filename}'"
),
"Content-Type": ob_info["content-type"],
}
)
)
resp.content_type = ob_info["content-type"]
resp.content_length = ob_info["content-length"]
await resp.prepare(request)
stream = s3_ob["Body"]
while file_data := stream.read(chunk_size):
await resp.write(file_data)
return resp
S3 Resource Objects¶
The S3 Bucket object also works but its methods have been asyncified. E.g.
import aioboto3
async def main():
session = aioboto3.Session()
async with session.resource("s3") as s3:
bucket = await s3.Bucket('mybucket')
async for s3_object in bucket.objects.all():
print(s3_object)
async for s3_object in bucket.objects.filter(Prefix='someprefix/'):
print(s3_object)
await bucket.objects.all().delete()
# or
await bucket.objects.filter(Prefix='test/').delete()
Misc¶
As you can see, it also works for standard client connections too.
import asyncio
import aioboto3
async def main():
session = aioboto3.Session()
async with session.client('ssm', region_name='eu-central-1') as ssm_client:
result = await ssm_client.describe_parameters()
print(result['Parameters'])
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# Outputs:
# []
AioHTTP Server Example¶
Since aioboto3 v8.0.0+, .client
and .resource
are now async context managers, so it breaks some normal patterns when used with long
running processes like web servers.
This example creates an AsyncExitStack which essentially does async with
on the context manager retuned by .resource
, saves the exit
coroutine so that it can be called later to clean up. If you comment out and run _app.on_shutdown.append(shutdown_tasks)
, you’ll
receive a warning stating that an AioHTTP session was not closed.
"""
contextlib.AsyncExitStack requires python 3.7
"""
import contextlib
import aioboto3
from boto3.dynamodb.conditions import Key
from aiohttp import web
routes = web.RouteTableDef()
session = aioboto3.Session()
@routes.get('/')
async def hello(request):
# request.app['table'] == Table object from boto3 docs
response = await request.app['table'].query(
KeyConditionExpression=Key('id').eq('lalalala')
)
return web.Response(text=str(response))
async def startup_tasks(app: web.Application) -> None:
context_stack = contextlib.AsyncExitStack()
app['context_stack'] = context_stack
app['dynamo_resource'] = await context_stack.enter_async_context(
session.resource('dynamodb', region_name='eu-west-1')
)
# By now, app['dynamo_resource'] will have methods like .Table() and list_tables() etc...
# aioboto3 v8.0.0+ all service resources (aka Table(), Bucket() etc...) need to be awaited
app['table'] = await app['dynamo_resource'].Table('somedynamodbtablename')
async def shutdown_tasks(app: web.Application) -> None:
await app['context_stack'].aclose()
# By now, app['dynamo_resource'] would be closed
_app = web.Application()
_app.add_routes(routes)
_app.on_startup.append(startup_tasks)
_app.on_shutdown.append(shutdown_tasks)
web.run_app(_app, port=8000)
TODO¶
More examples