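Complete scan of DynamoDB with boto3

My table is around 220 MB with 250k records in it. I'm trying to pull all of this data into Python. I realize this needs to be a chunked batch process that is looped through, but I'm not sure how to set the batches to start where the previous one left off.

Is there some way to filter my scan? From what I've read, filtering occurs after loading, and the loading stops at 1 MB, so I wouldn't actually be able to scan in new objects.

Any assistance would be appreciated.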

import boto3
dynamodb = boto3.resource('dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region
)


table = dynamodb.Table('widgetsTableName')


data = table.scan()

Turns out that Boto3 captures the "LastEvaluatedKey" as part of the returned response. This can be used as the start point for a scan:

data = table.scan(
    ExclusiveStartKey=data['LastEvaluatedKey']
)

I plan on building a loop around this that keeps scanning until the response no longer contains a LastEvaluatedKey.

boto3 offers paginators that handle all the pagination details for you. Here is the doc page for the scan paginator. Basically, you would use it like so:

import boto3


client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')


for page in paginator.paginate(TableName='widgetsTableName'):
    # do something with page['Items']
    pass
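
For a table the size of the one in the question, it can help to control the page size while collecting items. Here is a minimal sketch, assuming the paginator's `PaginationConfig` argument with its `PageSize` key. The helper name `collect_scan_items` and the `FakePaginator` stand-in used to exercise it are hypothetical and not part of boto3:

```python
def collect_scan_items(paginator, table_name, page_size=None):
    """Collect the 'Items' of every page produced by a scan paginator.

    `paginator` is expected to behave like client.get_paginator('scan').
    """
    kwargs = {'TableName': table_name}
    if page_size is not None:
        # PageSize caps how many items DynamoDB evaluates per request
        kwargs['PaginationConfig'] = {'PageSize': page_size}
    items = []
    for page in paginator.paginate(**kwargs):
        items.extend(page.get('Items', []))
    return items


class FakePaginator:
    """Hypothetical stand-in so the sketch runs without AWS access."""

    def __init__(self, rows):
        self.rows = rows

    def paginate(self, **kwargs):
        size = kwargs.get('PaginationConfig', {}).get('PageSize') or len(self.rows) or 1
        for start in range(0, len(self.rows), size):
            yield {'Items': self.rows[start:start + size]}


rows = [{'id': {'N': str(i)}} for i in range(5)]
all_items = collect_scan_items(FakePaginator(rows), 'widgetsTableName', page_size=2)
```

With a real client you would pass `client.get_paginator('scan')` instead of the stand-in. Note that this still holds every item in memory at once, which may or may not be acceptable for 250k records.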

Code for stripping the DynamoDB type descriptors (such as {'S': ...}) from the results, as @kungphu mentioned.

import boto3


from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector


client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
service_model = client._service_model.operation_model('Query')
trans = TransformationInjector(deserializer = TypeDeserializer())
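# paginate() needs the usual Query arguments (TableName, KeyConditionExpression, ...)
for page in paginator.paginate():
    # Deserializes the typed attribute values in place
    trans.inject_attribute_value_output(page, service_model)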
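
To make concrete what stripping the type descriptors means: the low-level client returns every attribute wrapped in a type tag such as `{'S': ...}` or `{'N': ...}`. The sketch below is a simplified, hand-written stand-in for illustration only. It is not boto3's implementation, it covers only a handful of types, and the names `plain_value` and `plain_item` are made up here; in real code, boto3's `TypeDeserializer` does this job.

```python
from decimal import Decimal


def plain_value(tagged):
    """Convert one DynamoDB-typed value, e.g. {'S': 'foo'}, to a plain Python value."""
    (type_tag, value), = tagged.items()
    if type_tag == 'S':
        return value
    if type_tag == 'N':
        return Decimal(value)  # numbers travel as strings on the wire
    if type_tag == 'BOOL':
        return value
    if type_tag == 'NULL':
        return None
    if type_tag == 'L':
        return [plain_value(v) for v in value]
    if type_tag == 'M':
        return {k: plain_value(v) for k, v in value.items()}
    raise ValueError(f'type tag not handled in this sketch: {type_tag}')


def plain_item(item):
    """Strip the type tags from every attribute of one item."""
    return {name: plain_value(tagged) for name, tagged in item.items()}


raw = {'name': {'S': 'widget'}, 'price': {'N': '9.5'}, 'tags': {'L': [{'S': 'a'}, {'S': 'b'}]}}
print(plain_item(raw))
```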

I think the Amazon DynamoDB documentation regarding table scanning answers your question.

In short, you'll need to check for LastEvaluatedKey in the response. Here is an example using your code:

import boto3
dynamodb = boto3.resource('dynamodb',
    aws_session_token=aws_session_token,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region
)


table = dynamodb.Table('widgetsTableName')


response = table.scan()
data = response['Items']


# Keep scanning from where the previous page stopped until no key is returned
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    data.extend(response['Items'])

Riffing off of Jordon Phillips's answer, here's how you'd pass a FilterExpression in with the pagination:

import boto3


client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_parameters = {
    'TableName': 'foo',
    'FilterExpression': 'bar > :x AND bar < :y',
    'ExpressionAttributeValues': {
        ':x': {'S': '2017-01-31T01:35'},
        ':y': {'S': '2017-01-31T02:08'},
    }
}


page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
    # do something with page['Items']
    pass
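
On the original question about filtering: a FilterExpression is applied after each page (up to 1 MB) has been read, so a page can come back with few or no Items and still carry a LastEvaluatedKey. The loop therefore has to key off LastEvaluatedKey, not off an empty page. Here is a minimal sketch of that behaviour using a hypothetical in-memory `FakeTable` in place of a real table. The class, its `page_size`, and the integer-based key are invented for illustration, and the filter is a plain Python predicate rather than a real FilterExpression:

```python
class FakeTable:
    """Hypothetical stand-in that mimics how scan() pages first and filters second."""

    def __init__(self, rows, page_size):
        self.rows = rows
        self.page_size = page_size

    def scan(self, keep, ExclusiveStartKey=None):
        start = 0 if ExclusiveStartKey is None else ExclusiveStartKey['index']
        end = start + self.page_size
        page = self.rows[start:end]  # the page is read first...
        response = {'Items': [r for r in page if keep(r)]}  # ...then filtered
        if end < len(self.rows):
            response['LastEvaluatedKey'] = {'index': end}
        return response


def scan_all(table, keep):
    """Follow LastEvaluatedKey until it disappears, even across empty pages."""
    response = table.scan(keep)
    items = list(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(keep, ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    return items


table = FakeTable(rows=list(range(10)), page_size=3)
matches = scan_all(table, keep=lambda r: r >= 8)
```

The first two pages in this sketch contain no matching rows at all, yet the scan still reaches the matches on the later pages. The paginator shown above follows the key for you in the same way.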

The two approaches suggested above both have drawbacks: either you write lengthy, repetitive code that handles paging explicitly in a loop, or you use Boto paginators with low-level clients and forgo the advantages of the higher-level Boto objects.

A solution using Python functional code to provide a high-level abstraction allows higher-level Boto methods to be used, while hiding the complexity of AWS paging:

import itertools
import typing


def iterate_result_pages(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Generator:
    """A wrapper for functions using AWS paging that returns a generator which yields a sequence of items for
    every response.

    Args:
        function_returning_response: A function (or callable) that returns an AWS response with 'Items' and
            optionally 'LastEvaluatedKey'. This could be a bound method of an object.

    Returns:
        A generator which yields the 'Items' field of the result for every response
    """
    response = function_returning_response(*args, **kwargs)
    yield response["Items"]
    while "LastEvaluatedKey" in response:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = function_returning_response(*args, **kwargs)
        yield response["Items"]


def iterate_paged_results(function_returning_response: typing.Callable, *args, **kwargs) -> typing.Iterator:
    """A wrapper for functions using AWS paging that returns an iterator of all the items in the responses.
    Items are yielded to the caller as soon as they are received.

    Args:
        function_returning_response: A function (or callable) that returns an AWS response with 'Items' and
            optionally 'LastEvaluatedKey'. This could be a bound method of an object.

    Returns:
        An iterator which yields one response item at a time
    """
    return itertools.chain.from_iterable(iterate_result_pages(function_returning_response, *args, **kwargs))


# Example, assuming 'table' is a Boto DynamoDB table object:
all_items = list(iterate_paged_results(table.scan, ProjectionExpression='my_field'))

I had some problems with Vincent's answer related to the transformation being applied to the LastEvaluatedKey and messing up the pagination. Solved as follows:

import boto3


from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.transform import TransformationInjector


client = boto3.client('dynamodb')
paginator = client.get_paginator('scan')
operation_model = client._service_model.operation_model('Scan')
trans = TransformationInjector(deserializer = TypeDeserializer())
operation_parameters = {
    'TableName': 'tablename',
}
items = []


for page in paginator.paginate(**operation_parameters):
    # Save the raw key so the paginator can still use it for the next request
    has_last_key = 'LastEvaluatedKey' in page
    if has_last_key:
        last_key = page['LastEvaluatedKey'].copy()
    trans.inject_attribute_value_output(page, operation_model)
    if has_last_key:
        page['LastEvaluatedKey'] = last_key
    items.extend(page['Items'])

DynamoDB limits the scan method to 1 MB of data per request.

Documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan

Here is an example loop to get all the data from a DynamoDB table using LastEvaluatedKey:

import boto3
client = boto3.client('dynamodb')


def dump_table(table_name):
    results = []
    last_evaluated_key = None
    while True:
        if last_evaluated_key:
            response = client.scan(
                TableName=table_name,
                ExclusiveStartKey=last_evaluated_key
            )
        else:
            response = client.scan(TableName=table_name)
        last_evaluated_key = response.get('LastEvaluatedKey')

        results.extend(response['Items'])

        if not last_evaluated_key:
            break
    return results


# Usage
data = dump_table('your-table-name')


# do something with data


I can't work out why Boto3 provides high-level resource abstraction but doesn't provide pagination. When it does provide pagination, it's hard to use!

The other answers to this question were good but I wanted a super simple way to wrap the boto3 methods and provide memory-efficient paging using generators:

import typing
import boto3
import boto3.dynamodb.conditions




def paginate_dynamodb_response(dynamodb_action: typing.Callable, **kwargs) -> typing.Generator[dict, None, None]:

    # Using the syntax from https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/python/example_code/dynamodb/GettingStarted/MoviesScan.py
    keywords = kwargs

    done = False
    start_key = None

    while not done:
        if start_key:
            keywords['ExclusiveStartKey'] = start_key

        response = dynamodb_action(**keywords)

        start_key = response.get('LastEvaluatedKey', None)
        done = start_key is None

        for item in response.get("Items", []):
            yield item




## Usage ##
dynamodb_res = boto3.resource('dynamodb')
dynamodb_table = dynamodb_res.Table('my-table')


query = paginate_dynamodb_response(
    dynamodb_table.query,  # The boto3 method, e.g. query or scan
    KeyConditionExpression=boto3.dynamodb.conditions.Key('id').eq('1234')  # Regular Query or Scan parameters
)


for x in query:
    print(x)
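
Because the wrapper is a generator, pages are requested only as the caller consumes items, so stopping early avoids further requests. Here is a minimal sketch of that behaviour with `itertools.islice`. The paging generator is restated in compact form as `paginate` so the block runs on its own, and `fake_scan` is a hypothetical stand-in for a boto3 method (a real LastEvaluatedKey is a dict of key attributes, not an integer):

```python
import itertools


def paginate(action, **kwargs):
    """Yield items one at a time, following LastEvaluatedKey between calls."""
    while True:
        response = action(**kwargs)
        yield from response.get('Items', [])
        start_key = response.get('LastEvaluatedKey')
        if start_key is None:
            return
        kwargs['ExclusiveStartKey'] = start_key


calls = []  # records each simulated request


def fake_scan(ExclusiveStartKey=0):
    """Hypothetical action: serves the numbers 0..9 in pages of 4."""
    calls.append(ExclusiveStartKey)
    end = ExclusiveStartKey + 4
    response = {'Items': list(range(ExclusiveStartKey, min(end, 10)))}
    if end < 10:
        response['LastEvaluatedKey'] = end
    return response


# Take only the first five items; the third page is never requested
first_five = list(itertools.islice(paginate(fake_scan), 5))
```

After this runs, `calls` holds only two entries, one per page actually fetched.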

If you are landing here looking for a paginated scan with some filtering expression(s):

def scan(table, **kwargs):
    response = table.scan(**kwargs)
    yield from response['Items']
    while response.get('LastEvaluatedKey'):
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']

Example usage:

import boto3
from boto3.dynamodb.conditions import Attr

table = boto3.Session(...).resource('dynamodb').Table('widgetsTableName')


items = list(scan(table, FilterExpression=Attr('name').contains('foo')))