Getting Started with AWS boto3


Written on 2020-01-10

 

boto3 is a library that lets you work with AWS from Python code.
The fastest way to get started is to follow boto3's QuickStart guide.

 

Boto is the Amazon Web Services (AWS) SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as EC2 and S3. Boto provides an easy to use, object-oriented API, as well as low-level access to AWS services.

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
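
As the quote says, boto3 exposes both a low-level client API and a higher-level, object-oriented resource API. A minimal sketch of the two entry points (this assumes credentials and a region are already configured, as in steps 3 and 4 below):

import boto3

# Low-level client: a thin wrapper over the AWS service APIs
athena_client = boto3.client('athena')

# Higher-level resource: object-oriented interface (only some services, e.g. S3, support it)
s3_resource = boto3.resource('s3')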

 

1. Install boto3

pip install boto3

2. Create an access key in AWS > IAM > Users > Security credentials

 

3. Fill in the credentials file

mkdir -p ~/.aws
touch ~/.aws/credentials
vim ~/.aws/credentials

### Enter the following
[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY

4. Fill in the region config file

touch ~/.aws/config
vim ~/.aws/config

### Enter the following. ap-northeast-2 is the Seoul region
[default]
region=ap-northeast-2
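
If you would rather not rely on the shared ~/.aws files, the same credentials and region can be passed in code through a boto3 Session; a minimal sketch with placeholder values:

import boto3

# Placeholder keys; in real code load them from a secrets store or the ~/.aws files above
session = boto3.session.Session(
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='ap-northeast-2')
s3 = session.resource('s3')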

5. List S3 buckets (from here on, save the code as a Python file and run it with python)

import boto3
 
# Let's use Amazon S3
s3 = boto3.resource('s3')

# Print out bucket names
for bucket in s3.buckets.all():
	print(bucket.name)
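
The same listing can be done with the low-level client, which returns plain dicts instead of resource objects; a small sketch for comparison:

import boto3

# list_buckets returns a dict with a 'Buckets' list
s3_client = boto3.client('s3')
for bucket in s3_client.list_buckets()['Buckets']:
    print(bucket['Name'])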

6. Upload a file

# Upload a new file; a context manager closes the handle automatically
with open('test.parquet', 'rb') as data:
    s3.Bucket('my-bucket').put_object(Key='test.parquet', Body=data)
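
For larger files it can be simpler to let boto3 open the file and manage multipart uploads itself via upload_file; a sketch reusing the s3 resource from step 5 and the same bucket and key:

# upload_file reads the local path itself and switches to multipart upload for big files
s3.Bucket('my-bucket').upload_file('test.parquet', 'test.parquet')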

7. Query a Parquet file with S3 Select and print the results

# bucket_name / file_name: the bucket and key of the Parquet file uploaded above
s3 = boto3.client('s3')
response = s3.select_object_content(
    Bucket=bucket_name,
    Key=file_name,
    ExpressionType='SQL',
    Expression="select * from s3object s limit 10",
    InputSerialization = {'Parquet': {}},
    OutputSerialization = {'JSON': {}},
)
 
 
for event in response['Payload']:
    if 'Records' in event:
        records = event['Records']['Payload'].decode('utf-8')
        print(records)
    elif 'Stats' in event:
        statsDetails = event['Stats']['Details']
        print("Stats details bytesScanned: ")
        print(statsDetails['BytesScanned'])
        print("Stats details bytesProcessed: ")
        print(statsDetails['BytesProcessed'])
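
Because OutputSerialization is JSON, each record in the Records payload is one JSON object per line; if you want Python dicts instead of raw text, the chunks can be collected and parsed. A sketch that would replace the print loop above:

import json

# Join the streamed chunks first, since a record can be split across chunk boundaries
chunks = []
for event in response['Payload']:
    if 'Records' in event:
        chunks.append(event['Records']['Payload'].decode('utf-8'))
rows = [json.loads(line) for line in ''.join(chunks).splitlines() if line]
print(rows[:3])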

8. Athena: create a database

ath = boto3.client('athena')
 
# create database; s3_query_path is an s3:// URI where Athena writes query results
ath.start_query_execution(
    QueryString='create database mytest',
    ResultConfiguration={'OutputLocation': s3_query_path})

9. Athena: create a table (here a DDL file is used; you can also build the CREATE query directly as a string, as in the sketch after the DDL below)

# create table
with open('./mydata.ddl') as ddl:
    ath.start_query_execution(
        QueryString=ddl.read(),
        ResultConfiguration={'OutputLocation': s3_query_path})
# mydata.ddl
CREATE EXTERNAL TABLE IF NOT EXISTS
mytest.mytable (
  col1 string,
  col2 int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://mytest/mytest/';
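
As noted above, the DDL file is optional; the same statement can be passed directly as a string. A sketch reusing the ath client and the s3_query_path placeholder (STORED AS PARQUET is shorthand for the Parquet SerDe and input/output formats):

create_sql = """
CREATE EXTERNAL TABLE IF NOT EXISTS mytest.mytable (
  col1 string,
  col2 int
)
STORED AS PARQUET
LOCATION 's3://mytest/mytest/'
"""
ath.start_query_execution(
    QueryString=create_sql,
    ResultConfiguration={'OutputLocation': s3_query_path})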

10. Run an Athena query

import time

import boto3

# Maximum number of polling attempts before giving up (value is an assumption; tune as needed)
RETRY_COUNT = 10

# Function for starting an Athena query and polling until it finishes
def run_query(query, database, s3_output):
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    query_execution_id=response['QueryExecutionId']
 
    # get execution status
    for i in range(1, 1 + RETRY_COUNT):
        # get query execution
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        query_execution_status = query_status['QueryExecution']['Status']['State']
        if query_execution_status == 'SUCCEEDED':
            print("STATUS:" + query_execution_status)
            break
        if query_execution_status == 'FAILED':
            raise Exception("STATUS:" + query_execution_status)
        else:
            print("STATUS:" + query_execution_status)
            time.sleep(i)
    else:
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        raise Exception('TIME OVER')
 
 
    # get query results
    result = client.get_query_results(QueryExecutionId=query_execution_id)
    print(result)
    return response
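
A hypothetical call to the function above; the database name and output location are placeholders that should match your own setup:

result = run_query(
    query='SELECT * FROM mytable LIMIT 10',
    database='mytest',
    s3_output='s3://mytest/athena-results/')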
