// everytab/pipeline/05_bundle_gen/s3.go
//
// 80 lines
// 1.9 KiB
// Go

package main
import (
"bytes"
"context"
"fmt"
"io"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// s3Client is the package-wide S3 client used by the helpers in this file.
// It is nil until initS3 has completed successfully.
var s3Client *s3.Client
// initS3 loads the default AWS configuration, pinned to region us-east-1,
// and stores a new S3 client in the package-level s3Client.
//
// It returns a wrapped error if the configuration cannot be loaded; in that
// case s3Client is left untouched.
func initS3() error {
	cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"))
	if err != nil {
		return fmt.Errorf("load aws config: %w", err)
	}
	s3Client = s3.NewFromConfig(cfg)
	return nil
}
// s3Download fetches the object at bucket/key from S3 and returns its full
// contents in memory.
//
// The request is made with the package-level s3Client. Any failure, whether
// issuing the GetObject request or reading the response body, is returned
// wrapped with the object's location, and the returned slice is nil.
func s3Download(ctx context.Context, bucket, key string) ([]byte, error) {
	resp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("get s3://%s/%s: %w", bucket, key, err)
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		// Drop the partially read bytes: a truncated object is not usable.
		return nil, fmt.Errorf("read s3://%s/%s: %w", bucket, key, err)
	}
	return data, nil
}
// s3UploadBundle uploads data to bucket/key in S3 with the content type
// "application/json".
//
// The request is made with the package-level s3Client. A failed PutObject is
// returned wrapped with the destination location.
func s3UploadBundle(ctx context.Context, bucket, key string, data []byte) error {
	_, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:      aws.String(bucket),
		Key:         aws.String(key),
		Body:        bytes.NewReader(data),
		ContentType: aws.String("application/json"),
	})
	if err != nil {
		return fmt.Errorf("put s3://%s/%s: %w", bucket, key, err)
	}
	return nil
}
// s3DeletePrefix deletes all objects in bucket whose keys start with prefix.
//
// Keys are listed page by page with ListObjectsV2, and every non-empty page
// is removed with one DeleteObjects call. No MaxKeys is set, so a page holds
// at most the service default of 1000 keys, which is also the most a single
// DeleteObjects request accepts.
//
// The function stops at the first failure: a listing error, a failed
// DeleteObjects request, or a DeleteObjects response that reports per-object
// errors. Objects from earlier pages may already have been deleted when an
// error is returned.
func s3DeletePrefix(ctx context.Context, bucket, prefix string) error {
	paginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(prefix),
	})
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("list s3://%s/%s: %w", bucket, prefix, err)
		}
		if len(page.Contents) == 0 {
			continue
		}

		objects := make([]types.ObjectIdentifier, 0, len(page.Contents))
		for _, obj := range page.Contents {
			objects = append(objects, types.ObjectIdentifier{Key: obj.Key})
		}

		out, err := s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(bucket),
			Delete: &types.Delete{Objects: objects},
		})
		if err != nil {
			return fmt.Errorf("delete batch: %w", err)
		}
		// The request as a whole can succeed while individual keys fail;
		// those failures are only reported in the response body, so they
		// must be checked explicitly or they are silently lost.
		if len(out.Errors) > 0 {
			first := out.Errors[0]
			return fmt.Errorf("delete batch: %d of %d objects failed, first %q: %s: %s",
				len(out.Errors), len(objects),
				aws.ToString(first.Key), aws.ToString(first.Code), aws.ToString(first.Message))
		}
	}
	return nil
}