将 Dataframe 保存到 csv,直接保存到 s3Python

我有一个 pandas DataFrame,我想上传到一个新的 CSV 文件。问题是,在将文件传输到 s3之前,我不想在本地保存它。有没有像 to _ csv 这样的方法可以直接把数据框写到 s3?我在用 boto3。< br > 以下是我目前所掌握的信息:
import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])


# Make alterations to DataFrame


# Then export DataFrame to CSV through direct transfer to s3
232660 次浏览

如果将 None作为第一个参数传递给 to_csv(),则数据将作为字符串返回。从那里它是一个简单的步骤,上传到 S3在一去。

也可以将 StringIO对象传递给 to_csv(),但是使用字符串会更容易。

你可使用:

from io import StringIO # python3; python2: BytesIO
import boto3


bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())

我喜欢 S3fs,它允许您像使用本地文件系统一样使用 s3(几乎)。

你可以这样做:

import s3fs


bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
f.write(bytes_to_write)

s3fs只支持 rbwb模式的打开文件,这就是为什么我做这个 bytes_to_write的东西。

我从 bucket s3中读取了一个带有两列的 csv 文件,并且将文件的内容放入 Pandas dataframe 中。

例如:

Config.json

{
"credential": {
"access_key":"xxxxxx",
"secret_key":"xxxxxx"
}
,
"s3":{
"bucket":"mybucket",
"key":"csv/user.csv"
}
}

Cls _ config. json

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import json


class cls_config(object):


def __init__(self,filename):


self.filename = filename




def getConfig(self):


fileName = os.path.join(os.path.dirname(__file__), self.filename)
with open(fileName) as f:
config = json.load(f)
return config

Cls _ anda. py

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import pandas as pd
import io


class cls_pandas(object):


def __init__(self):
pass


def read(self,stream):


df = pd.read_csv(io.StringIO(stream), sep = ",")
return df

Cls _ s3. py

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import boto3
import json


class cls_s3(object):


def  __init__(self,access_key,secret_key):


self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)


def getObject(self,bucket,key):


read_file = self.s3.get_object(Bucket=bucket, Key=key)
body = read_file['Body'].read().decode('utf-8')
return body

Test.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-


from cls_config import *
from cls_s3 import *
from cls_pandas import *


class test(object):


def __init__(self):
self.conf = cls_config('config.json')


def process(self):


conf = self.conf.getConfig()


bucket = conf['s3']['bucket']
key = conf['s3']['key']


access_key = conf['credential']['access_key']
secret_key = conf['credential']['secret_key']


s3 = cls_s3(access_key,secret_key)
ob = s3.getObject(bucket,key)


pa = cls_pandas()
df = pa.read(ob)


print df


if __name__ == '__main__':
test = test()
test.process()

这是一个更新的答案:

import s3fs


s3 = s3fs.S3FileSystem(anon=False)


# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
df.to_csv(f)

StringIO 的问题在于它会消耗你的内存。使用这种方法,您可以将文件流传输到 s3,而不是将其转换为字符串,然后将其写入 s3。在内存中保存大熊猫数据框架及其字符串副本似乎效率很低。

如果您在 ec2中工作,您可以赋予它一个 IAM 角色来启用将它写到 s3,因此您不需要直接传递凭据。但是,您也可以通过向 S3FileSystem()函数传递凭据来连接到桶。见文档: https://s3fs.readthedocs.io/en/latest/

您可以直接使用 S3路径

In [1]: import pandas as pd


In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])


In [3]: df
Out[3]:
a  b  c
0  1  1  1
1  2  2  2


In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)


In [5]: pd.__version__
Out[5]: '0.24.1'


In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')


In [7]: new_df
Out[7]:
a  b  c
0  1  1  1
1  2  2  2


翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳翻译: 奇芳

S3文件处理

熊猫现在使用 s3fs 来处理 S3连接。这不会破坏任何密码。但是,由于 s3fs 不是必需的依赖项,您需要单独安装它,就像在以前版本的熊猫中的 boto 一样。GH11915.

既然你使用的是 boto3.client(),试试:

import boto3
from io import StringIO #python3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')


copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')

你亦可使用 自动气象站数据管理器:

import awswrangler as wr
    

wr.s3.to_csv(
df=df,
path="s3://...",
)

请注意,它将为您处理多部分上传,使上传更快。

我发现这可以做到使用 client也不只是 resource

from io import StringIO
import boto3
s3 = boto3.client("s3",\
region_name=region_name,\
aws_access_key_id=aws_access_key_id,\
aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')

我使用 自动气象站数据管理器,例如:

import awswrangler as wr
import pandas as pd


# read a local dataframe
df = pd.read_parquet('my_local_file.gz')


# upload to S3 bucket
wr.s3.to_parquet(df=df, path='s3://mys3bucket/file_name.gz')

这同样适用于 csv 文件。不要使用 read_parquetto_parquet,而是使用带有适当文件扩展名的 read_csvto_csv

你可以用

  • 熊猫
  • Boto3
  • S3fs (版本≤0.4)

我在路径中使用 to_csvs3://,在路径中使用 storage_options

key = "folder/file.csv"


df.to_csv(
f"s3://{YOUR_S3_BUCKET}/{key}",
index=False,
storage_options={
"key": AWS_ACCESS_KEY_ID,
"secret": AWS_SECRET_ACCESS_KEY,
"token": AWS_SESSION_TOKEN,
},

为了有效地处理大文件,你也可以使用一个开源的兼容 S3的 MinIO,它的 minio Python 客户端包,就像我的这个函数一样:

import minio
import os
import pandas as pd


minio_client = minio.Minio(..)


def write_df_to_minio(df,
minio_client,
bucket_name,
file_name="new-file.csv",
local_temp_folder="/tmp/",
content_type="application/csv",
sep=",",
save_row_index=False):


df.to_csv(os.path.join(local_temp_folder, file_name), sep=sep, index=save_row_index)
    

minio_results = minio_client.fput_object(bucket_name=bucket_name,
object_name=file_name,
file_path=os.path.join(local_temp_folder, file_name),
content_type=content_type)


assert minio_results.object_name == file_name


另一个选择是使用支持 S3的 Cloudpathlib,它还支持 Google Cloud Storage 和 Azure Blob Storage。请看下面的例子。

import pandas as pd
from cloudpathlib import CloudPath


# read data from S3
df = pd.read_csv(CloudPath("s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/states_daily.csv"))


# look at some of the data
df.head(1).T.iloc[:10]
#>                                       0
#> date                           20210307
#> state                                AK
#> positive                        56886.0
#> probableCases                       NaN
#> negative                            NaN
#> pending                             NaN
#> totalTestResultsSource  totalTestsViral
#> totalTestResults              1731628.0
#> hospitalizedCurrently              33.0
#> hospitalizedCumulative           1293.0


# writing to S3
with CloudPath("s3://bucket-you-can-write-to/data.csv").open("w") as f:
df.to_csv(f)


CloudPath("s3://bucket-you-can-write-to/data.csv").exists()
#> True

注意,您不能直接调用 df.to_csv(CloudPath("s3://drivendata-public-assets/test-asdf2.csv")),因为 Pandas 处理传递给它的路径/句柄的方式。相反,您需要打开该文件进行写入,并将该句柄直接传递给 to_csv

这在设置 特别的选择不同的认证机制或保持 持久缓存方面有一些额外的好处,所以您不必总是从 S3重新下载。

from io import StringIO
import boto3
#Creating Session With Boto3.
session = boto3.Session(
aws_access_key_id='<your_access_key_id>',
aws_secret_access_key='<your_secret_access_key>'
)
#Creating S3 Resource From the Session.
s3_res = session.resource('s3')
csv_buffer = StringIO()
df.to_csv(csv_buffer)
bucket_name = 'stackvidhya'
s3_object_name = 'df.csv'
s3_res.Object(bucket_name, s3_object_name).put(Body=csv_buffer.getvalue())
print("Dataframe is saved as CSV in S3 bucket.")