everytab/pipeline/03_icon_download/db.go

86 lines
1.9 KiB
Go

package main
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
)
// IconRow is the minimal projection of an icons-table row that the downloader
// needs: the row's identity and the location to fetch.
type IconRow struct {
	// ID is the value of the row's id column.
	ID int64

	// URL is the value of the row's url column.
	URL string
}
// claimBatch claims up to limit icons whose scan_state is 'unscanned' by
// switching them to 'in_progress' and returning their id and url.
//
// Picking and updating happen in a single statement. Candidate rows are chosen
// in a CTE with FOR UPDATE SKIP LOCKED, so rows already locked by another
// transaction are skipped rather than waited on. The CTE (instead of an
// IN (subquery)) is deliberate: a WITH query is evaluated once per statement,
// which keeps LIMIT a hard upper bound on the number of rows claimed.
//
// The query applies no ORDER BY, so rows are claimed in whatever order the
// database yields them; nothing here shuffles or spreads a batch across hosts.
// NOTE(review): if spreading requests across hosts is required, an ordering
// must be added to the CTE — confirm index support before doing so.
//
// On any error (query, scan, or row iteration) the returned slice is nil.
// When no rows are claimed the result is a nil slice and a nil error.
func claimBatch(ctx context.Context, pool *pgxpool.Pool, limit int) ([]IconRow, error) {
	const claimSQL = `
		WITH picked AS (
			SELECT id FROM icons
			WHERE scan_state = 'unscanned'
			LIMIT $1
			FOR UPDATE SKIP LOCKED
		)
		UPDATE icons
		SET scan_state = 'in_progress'
		FROM picked
		WHERE icons.id = picked.id
		RETURNING icons.id, icons.url`

	rows, err := pool.Query(ctx, claimSQL, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var icons []IconRow
	for rows.Next() {
		var icon IconRow
		if err := rows.Scan(&icon.ID, &icon.URL); err != nil {
			return nil, err
		}
		icons = append(icons, icon)
	}
	// Iteration can stop early on a deferred query/network error; do not hand
	// back a half-read batch alongside that error.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return icons, nil
}
// DownloadResult describes what happened when one icon was downloaded: either
// the metadata of the stored object, or the reason the download failed.
type DownloadResult struct {
	// S3Key is written to the icons.s3_key column on success.
	S3Key string

	// ContentType is written to the icons.content_type column on success.
	ContentType string

	// Width and Height are written to icons.width / icons.height on success;
	// a value of zero is stored as NULL. Presumably pixels — confirm with the
	// code that fills this struct.
	Width, Height int

	// FileSize is written to icons.file_size on success. Presumably a byte
	// count — confirm with the code that fills this struct.
	FileSize int

	// Dedup is not read anywhere in this part of the file; presumably it flags
	// content that was already stored — confirm with the producer.
	Dedup bool

	// Err is the failure message. A non-empty value makes updateIcon mark the
	// icon as failed and store this text in the error column.
	Err string

	// ErrType classifies the failure.
	ErrType string // "dns", "timeout", "http", "invalid", "too_large", "other"
}
// updateIcon persists the outcome of one download on the icons row whose id is
// iconID.
//
// If result.Err is non-empty the row is marked 'failed' and the message is
// stored in the error column; no other field of result is written in that
// case. Otherwise the row is marked 'completed' and receives the storage key,
// content type, dimensions and file size, with downloaded_at set to now().
// A Width or Height of zero is stored as NULL.
//
// The return value is whatever error pool.Exec reports.
func updateIcon(ctx context.Context, pool *pgxpool.Pool, iconID int64, result DownloadResult) error {
	const failedSQL = `UPDATE icons SET scan_state = 'failed', error = $1 WHERE id = $2`
	const completedSQL = `
UPDATE icons SET
scan_state = 'completed',
s3_key = $1,
content_type = $2,
width = $3,
height = $4,
file_size = $5,
downloaded_at = now()
WHERE id = $6`

	var execErr error
	if result.Err == "" {
		// Success: zero dimensions become NULL via nilIntIf.
		widthArg := nilIntIf(result.Width, 0)
		heightArg := nilIntIf(result.Height, 0)
		_, execErr = pool.Exec(ctx, completedSQL,
			result.S3Key, result.ContentType,
			widthArg, heightArg,
			result.FileSize, iconID)
	} else {
		// Failure: only the state and the message are recorded.
		_, execErr = pool.Exec(ctx, failedSQL, result.Err, iconID)
	}
	return execErr
}
// nilIntIf returns nil when v equals zero, and otherwise a pointer to a copy
// of v. updateIcon uses it so that a sentinel value is sent to the database
// as NULL instead of as a number.
func nilIntIf(v, zero int) *int {
	if v != zero {
		kept := v
		return &kept
	}
	return nil
}