added icon downloader
This commit is contained in:
parent
8b5693b5c6
commit
5a2e37ae06
10 changed files with 829 additions and 68 deletions
89
pipeline/03_icon_download/db.go
Normal file
89
pipeline/03_icon_download/db.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// IconRow is one record claimed from the icons table: the row's primary key
// together with the URL the icon should be fetched from.
type IconRow struct {
	ID  int64  // icons.id
	URL string // icons.url
}
|
||||
|
||||
// claimBatch atomically claims a batch of unscanned icons for processing.
|
||||
// Uses md5 shuffle to spread requests across different hosts.
|
||||
func claimBatch(ctx context.Context, pool *pgxpool.Pool, limit int) ([]IconRow, error) {
|
||||
rows, err := pool.Query(ctx, `
|
||||
UPDATE icons SET scan_state = 'in_progress'
|
||||
WHERE id IN (
|
||||
SELECT id FROM icons
|
||||
WHERE scan_state = 'unscanned'
|
||||
AND (source = 'favicon_ico'
|
||||
OR rel_sizes IS NULL
|
||||
OR rel_sizes IN ('16x16','32x32','48x48','64x64'))
|
||||
ORDER BY md5(id::text)
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, url
|
||||
`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var icons []IconRow
|
||||
for rows.Next() {
|
||||
var icon IconRow
|
||||
if err := rows.Scan(&icon.ID, &icon.URL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
icons = append(icons, icon)
|
||||
}
|
||||
return icons, rows.Err()
|
||||
}
|
||||
|
||||
// DownloadResult describes what happened to a single icon. A non-empty Err
// marks the attempt as failed; otherwise the remaining fields describe the
// stored object.
type DownloadResult struct {
	// Populated on success.
	S3Key       string // hex-encoded SHA-256 of the downloaded bytes
	ContentType string // media type recorded for the object
	Width       int    // pixels; 0 when the size could not be determined
	Height      int    // pixels; 0 when the size could not be determined
	FileSize    int    // number of bytes downloaded
	Dedup       bool   // true when the object was already in S3 and the upload was skipped

	// Populated on failure.
	Err     string // human-readable error message
	ErrType string // "dns", "timeout", "http", "invalid", "too_large", "other"
}
|
||||
|
||||
// updateIcon writes the download result back to the icons table.
|
||||
func updateIcon(ctx context.Context, pool *pgxpool.Pool, iconID int64, result DownloadResult) error {
|
||||
if result.Err != "" {
|
||||
_, err := pool.Exec(ctx,
|
||||
`UPDATE icons SET scan_state = 'failed', error = $1 WHERE id = $2`,
|
||||
result.Err, iconID)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := pool.Exec(ctx, `
|
||||
UPDATE icons SET
|
||||
scan_state = 'completed',
|
||||
s3_key = $1,
|
||||
content_type = $2,
|
||||
width = $3,
|
||||
height = $4,
|
||||
file_size = $5
|
||||
WHERE id = $6`,
|
||||
result.S3Key, result.ContentType,
|
||||
nilIntIf(result.Width, 0), nilIntIf(result.Height, 0),
|
||||
result.FileSize, iconID)
|
||||
return err
|
||||
}
|
||||
|
||||
// nilIntIf maps the sentinel value zero to a nil pointer; any other v is
// returned as a pointer to its own copy.
func nilIntIf(v int, zero int) *int {
	if v != zero {
		kept := v
		return &kept
	}
	return nil
}
|
||||
157
pipeline/03_icon_download/download.go
Normal file
157
pipeline/03_icon_download/download.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package main
|
||||
|
||||
import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"
)
|
||||
|
||||
// processIcon downloads, validates, and uploads a single icon.
|
||||
func processIcon(ctx context.Context, icon IconRow, cfg Config) DownloadResult {
|
||||
// Download
|
||||
data, contentType, err := downloadIcon(icon.URL, cfg.Timeout, cfg.MaxSize)
|
||||
if err != nil {
|
||||
errType := classifyError(err)
|
||||
return DownloadResult{Err: err.Error(), ErrType: errType}
|
||||
}
|
||||
|
||||
// Validate it's an image
|
||||
detectedType := detectImageType(data)
|
||||
if detectedType == "" {
|
||||
return DownloadResult{Err: "not a valid image", ErrType: "invalid"}
|
||||
}
|
||||
|
||||
// Use detected type over HTTP Content-Type (more reliable)
|
||||
if contentType == "" || contentType == "application/octet-stream" {
|
||||
contentType = detectedType
|
||||
}
|
||||
|
||||
// Get dimensions
|
||||
width, height := getImageDimensions(data, detectedType)
|
||||
|
||||
// Compute SHA-256 for content-addressed storage
|
||||
hash := sha256.Sum256(data)
|
||||
s3Key := hex.EncodeToString(hash[:])
|
||||
|
||||
// Upload to S3 (skip if already exists — dedup)
|
||||
dedup := false
|
||||
if !cfg.DryRun {
|
||||
exists, err := s3Exists(ctx, s3Key)
|
||||
if err == nil && exists {
|
||||
dedup = true
|
||||
} else {
|
||||
if err := s3Upload(ctx, s3Key, data, contentType); err != nil {
|
||||
return DownloadResult{Err: fmt.Sprintf("s3 upload: %v", err), ErrType: "other"}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return DownloadResult{
|
||||
S3Key: s3Key,
|
||||
ContentType: contentType,
|
||||
Width: width,
|
||||
Height: height,
|
||||
FileSize: len(data),
|
||||
Dedup: dedup,
|
||||
}
|
||||
}
|
||||
|
||||
// httpTransport is the single transport behind every download so that idle
// connections are pooled across goroutines. Keep-alives stay enabled (the
// zero value of DisableKeepAlives); at most 2 idle connections are kept per
// host and 1000 overall, each for up to 30 seconds. Dialing and the TLS
// handshake are each capped at 5 seconds.
var httpTransport = func() *http.Transport {
	dialer := &net.Dialer{
		Timeout:   5 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	return &http.Transport{
		DialContext:         dialer.DialContext,
		TLSHandshakeTimeout: 5 * time.Second,
		IdleConnTimeout:     30 * time.Second,
		MaxIdleConnsPerHost: 2,
		MaxIdleConns:        1000,
	}
}()
|
||||
|
||||
// downloadIcon fetches an icon URL with timeouts and size limits.
|
||||
func downloadIcon(url string, timeout time.Duration, maxSize int64) ([]byte, string, error) {
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
Transport: httpTransport,
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
if len(via) >= 3 {
|
||||
return fmt.Errorf("too many redirects")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("bad url: %w", err)
|
||||
}
|
||||
req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; EveryTabBot/1.0; +https://everytab.site/bot)")
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, "", fmt.Errorf("http %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// Read with size limit
|
||||
limited := io.LimitReader(resp.Body, maxSize+1)
|
||||
data, err := io.ReadAll(limited)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("read: %w", err)
|
||||
}
|
||||
if int64(len(data)) > maxSize {
|
||||
return nil, "", fmt.Errorf("exceeds %dKB", maxSize/1024)
|
||||
}
|
||||
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
// Strip charset suffix
|
||||
if idx := strings.Index(contentType, ";"); idx != -1 {
|
||||
contentType = strings.TrimSpace(contentType[:idx])
|
||||
}
|
||||
|
||||
return data, contentType, nil
|
||||
}
|
||||
|
||||
// classifyError buckets a download error into one of the ErrType labels used
// for stats: "dns", "timeout", "http", "too_large" or "other".
//
// Typed checks use errors.As so they still match when the error is wrapped
// (http.Client returns *url.Error around the underlying network error). The
// substring fallbacks run on the message with the *url.Error prefix removed:
// that prefix embeds the request URL, and matching against it would label a
// request to e.g. "cdns.example.com" or ".../timeout.png" by its URL rather
// than by what actually went wrong.
func classifyError(err error) string {
	// Message used for substring matching, without the `Get "<url>": ` prefix.
	msg := err.Error()
	var urlErr *url.Error
	if errors.As(err, &urlErr) && urlErr.Err != nil {
		msg = urlErr.Err.Error()
	}

	// DNS errors
	var dnsErr *net.DNSError
	if errors.As(err, &dnsErr) {
		return "dns"
	}
	if strings.Contains(msg, "no such host") {
		return "dns"
	}

	// Timeouts
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return "timeout"
	}
	if strings.Contains(msg, "timeout") || strings.Contains(msg, "deadline") {
		return "timeout"
	}

	// Non-200 responses, reported by downloadIcon as "http <status>".
	if strings.Contains(msg, "http ") {
		return "http"
	}

	// Bodies over the size limit, reported by downloadIcon as "exceeds <n>KB".
	if strings.Contains(msg, "exceeds") {
		return "too_large"
	}

	return "other"
}
|
||||
139
pipeline/03_icon_download/image.go
Normal file
139
pipeline/03_icon_download/image.go
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"image"
|
||||
_ "image/gif"
|
||||
_ "image/jpeg"
|
||||
_ "image/png"
|
||||
|
||||
_ "golang.org/x/image/webp"
|
||||
)
|
||||
|
||||
// detectImageType sniffs the leading bytes of data and returns the media type
// of the image format they identify, or "" when none is recognized. Inputs
// shorter than four bytes are never recognized.
//
// Recognized formats, checked in this order: PNG, GIF, JPEG, ICO, BMP, WebP
// and SVG.
func detectImageType(data []byte) string {
	if len(data) < 4 {
		return ""
	}

	switch {
	case bytes.HasPrefix(data, []byte{0x89, 'P', 'N', 'G'}):
		return "image/png"
	case bytes.HasPrefix(data, []byte("GIF")): // GIF87a or GIF89a
		return "image/gif"
	case bytes.HasPrefix(data, []byte{0xFF, 0xD8, 0xFF}):
		return "image/jpeg"
	case bytes.HasPrefix(data, []byte{0x00, 0x00, 0x01, 0x00}):
		return "image/x-icon"
	case bytes.HasPrefix(data, []byte("BM")):
		return "image/bmp"
	case len(data) >= 12 && string(data[:4]) == "RIFF" && string(data[8:12]) == "WEBP":
		return "image/webp"
	case looksLikeSVG(data):
		return "image/svg+xml"
	}

	return ""
}

// looksLikeSVG reports whether data appears to be an SVG document.
//
// An "<svg" tag in the first 256 bytes is sufficient. An "<?xml" prolog alone
// is not: RSS feeds, XHTML pages and other XML start the same way. When a
// prolog is present the search for "<svg" is widened to the first 4096 bytes,
// because a prolog, DOCTYPE and generator comment can push the root element
// past the 256-byte mark.
func looksLikeSVG(data []byte) bool {
	if len(data) <= 5 {
		return false
	}

	// prefix returns at most the first n bytes of data.
	prefix := func(n int) []byte {
		if len(data) < n {
			return data
		}
		return data[:n]
	}
	svgTag := []byte("<svg")

	head := prefix(256)
	if bytes.Contains(head, svgTag) {
		return true
	}
	if !bytes.Contains(head, []byte("<?xml")) {
		return false
	}
	return bytes.Contains(prefix(4096), svgTag)
}
|
||||
|
||||
// getImageDimensions reads image dimensions from the data.
|
||||
// Returns (0, 0) for SVG or if dimensions can't be determined.
|
||||
func getImageDimensions(data []byte, contentType string) (int, int) {
|
||||
switch contentType {
|
||||
case "image/svg+xml":
|
||||
return 0, 0
|
||||
case "image/x-icon":
|
||||
return getICODimensions(data)
|
||||
default:
|
||||
// Use Go's image.DecodeConfig for standard formats
|
||||
cfg, _, err := image.DecodeConfig(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return 0, 0
|
||||
}
|
||||
return cfg.Width, cfg.Height
|
||||
}
|
||||
}
|
||||
|
||||
// getICODimensions walks the ICO directory (a 6-byte header followed by one
// 16-byte entry per image) and picks a representative size.
//
// The entry with the largest area among those no bigger than 64x64 wins; when
// every entry exceeds 64 pixels on some side, the largest entry overall is
// reported instead. Ties go to the earlier entry. (0, 0) is returned when the
// header is truncated, declares no images, or the directory runs past the end
// of data.
func getICODimensions(data []byte) (int, int) {
	const (
		headerLen = 6
		entryLen  = 16
		maxSide   = 64
	)

	if len(data) < headerLen {
		return 0, 0
	}
	count := int(binary.LittleEndian.Uint16(data[4:6]))
	dirEnd := headerLen + count*entryLen
	if count == 0 || len(data) < dirEnd {
		return 0, 0
	}

	// A stored side length of 0 means 256 pixels.
	side := func(b byte) int {
		if b == 0 {
			return 256
		}
		return int(b)
	}

	// Track both candidates in one pass: the best small entry and the best
	// entry of any size.
	var smallW, smallH, anyW, anyH int
	for off := headerLen; off < dirEnd; off += entryLen {
		w, h := side(data[off]), side(data[off+1])
		area := w * h
		if w <= maxSide && h <= maxSide && area > smallW*smallH {
			smallW, smallH = w, h
		}
		if area > anyW*anyH {
			anyW, anyH = w, h
		}
	}

	if smallW != 0 {
		return smallW, smallH
	}
	return anyW, anyH
}
|
||||
|
||||
// min returns the smaller of two ints.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
|
||||
92
pipeline/03_icon_download/log.go
Normal file
92
pipeline/03_icon_download/log.go
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LogWriter appends log lines to a file. Writes are serialized with a mutex,
// and the writer can be told to keep error lines only.
type LogWriter struct {
	file       *os.File   // destination, opened in append mode
	mu         sync.Mutex // guards writes to file
	errorsOnly bool       // when true, non-error lines are dropped
}

// NewLogWriter opens path for appending, creating it with mode 0644 if it does
// not exist, and returns a LogWriter on top of it. With errorsOnly set, Write
// discards every line that is not flagged as an error.
func NewLogWriter(path string, errorsOnly bool) (*LogWriter, error) {
	const openFlags = os.O_CREATE | os.O_WRONLY | os.O_APPEND

	handle, err := os.OpenFile(path, openFlags, 0644)
	if err != nil {
		return nil, err
	}

	lw := &LogWriter{errorsOnly: errorsOnly}
	lw.file = handle
	return lw, nil
}

// Write appends line plus a trailing newline to the log file. Lines that are
// not errors are skipped when the writer is in errors-only mode. A failed
// write is ignored.
func (lw *LogWriter) Write(line string, isError bool) {
	if !isError && lw.errorsOnly {
		return
	}

	lw.mu.Lock()
	defer lw.mu.Unlock()
	lw.file.WriteString(line + "\n")
}

// Close closes the underlying file and returns the error from doing so.
func (lw *LogWriter) Close() error {
	closeErr := lw.file.Close()
	return closeErr
}
|
||||
|
||||
func formatLogLine(icon IconRow, result DownloadResult) string {
|
||||
if result.Err != "" {
|
||||
return fmt.Sprintf("icon: %s err:%s %s", icon.URL, result.ErrType, result.Err)
|
||||
}
|
||||
|
||||
dedup := ""
|
||||
if result.Dedup {
|
||||
dedup = " dedup"
|
||||
}
|
||||
|
||||
dims := ""
|
||||
if result.Width > 0 && result.Height > 0 {
|
||||
dims = fmt.Sprintf(" %dx%d", result.Width, result.Height)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("icon: %s %s%s %.1fKB%s ok",
|
||||
icon.URL, result.ContentType, dims,
|
||||
float64(result.FileSize)/1024, dedup)
|
||||
}
|
||||
|
||||
func writeStats(stats *Stats) {
|
||||
finishedAt := time.Now()
|
||||
duration := finishedAt.Sub(stats.StartedAt)
|
||||
|
||||
data := map[string]interface{}{
|
||||
"started_at": stats.StartedAt.Format(time.RFC3339),
|
||||
"finished_at": finishedAt.Format(time.RFC3339),
|
||||
"duration_seconds": int(duration.Seconds()),
|
||||
"processed": stats.Processed.Load(),
|
||||
"completed": stats.Completed.Load(),
|
||||
"failed": stats.Failed.Load(),
|
||||
"failed_dns": stats.DNSErrors.Load(),
|
||||
"failed_timeout": stats.Timeouts.Load(),
|
||||
"failed_http": stats.HTTPErrors.Load(),
|
||||
"failed_invalid": stats.InvalidImg.Load(),
|
||||
"failed_too_large": stats.TooLarge.Load(),
|
||||
"dedup_hits": stats.DedupHits.Load(),
|
||||
"db_errors": stats.DBErrors.Load(),
|
||||
"panics": stats.Panics.Load(),
|
||||
"bytes_downloaded": stats.BytesDown.Load(),
|
||||
}
|
||||
|
||||
os.MkdirAll("stats", 0755)
|
||||
f, err := os.Create("stats/03_icon_download.json")
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to write stats: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
enc := json.NewEncoder(f)
|
||||
enc.SetIndent("", " ")
|
||||
enc.Encode(data)
|
||||
fmt.Println("Stats written to stats/03_icon_download.json")
|
||||
}
|
||||
241
pipeline/03_icon_download/main.go
Normal file
241
pipeline/03_icon_download/main.go
Normal file
|
|
@ -0,0 +1,241 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Config carries the command-line settings for one downloader run.
type Config struct {
	DBUrl       string        // Postgres connection string (required)
	S3Bucket    string        // S3 bucket for icons
	BatchSize   int           // rows to claim per batch
	Concurrency int           // number of concurrent worker goroutines
	Limit       int           // max icons to process; 0 means all
	Timeout     time.Duration // HTTP request timeout
	MaxSize     int64         // max icon download size in bytes
	DryRun      bool          // download only: no S3 upload, no DB result update
	LogFile     string        // mirror log lines to this file; "" disables
	LogErrors   bool          // write only error lines to the log file
}
|
||||
|
||||
// Stats holds the run-wide counters. Each counter is an atomic so workers can
// bump it without additional locking.
type Stats struct {
	// Overall progress.
	Processed atomic.Int64 // icons handled, whether they succeeded, failed or panicked
	Completed atomic.Int64 // icons that finished without an error
	Failed    atomic.Int64 // icons that finished with an error
	DedupHits atomic.Int64 // completed icons whose upload was skipped as a duplicate

	// Breakdown of Failed by error type.
	DNSErrors  atomic.Int64
	Timeouts   atomic.Int64
	HTTPErrors atomic.Int64
	InvalidImg atomic.Int64
	TooLarge   atomic.Int64

	// Problems outside the download itself.
	DBErrors atomic.Int64 // failed attempts to write a result back to the database
	Panics   atomic.Int64 // panics recovered while processing an icon

	BytesDown atomic.Int64 // total bytes of successfully downloaded icons
	StartedAt time.Time    // when the run began
}
|
||||
|
||||
func main() {
|
||||
cfg := Config{}
|
||||
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
|
||||
flag.StringVar(&cfg.S3Bucket, "s3-bucket", "everytab-icons", "S3 bucket for icons")
|
||||
flag.IntVar(&cfg.BatchSize, "batch-size", 200, "Rows to claim per batch")
|
||||
flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Number of concurrent goroutines")
|
||||
flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)")
|
||||
flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout")
|
||||
flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes")
|
||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Download but don't upload to S3 or update DB")
|
||||
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
|
||||
flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
|
||||
flag.Parse()
|
||||
|
||||
if cfg.DBUrl == "" {
|
||||
fmt.Println("Usage: icon_download --db DATABASE_URL [OPTIONS]")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Init S3
|
||||
if err := initS3(cfg.S3Bucket); err != nil {
|
||||
log.Fatalf("Failed to init S3: %v", err)
|
||||
}
|
||||
|
||||
// Init DB pool
|
||||
pool, err := pgxpool.New(ctx, cfg.DBUrl)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to database: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
// Count eligible icons
|
||||
var total int64
|
||||
err = pool.QueryRow(ctx, `
|
||||
SELECT COUNT(*) FROM icons
|
||||
WHERE scan_state = 'unscanned'
|
||||
AND (source = 'favicon_ico'
|
||||
OR rel_sizes IS NULL
|
||||
OR rel_sizes IN ('16x16','32x32','48x48','64x64'))
|
||||
`).Scan(&total)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to count icons: %v", err)
|
||||
}
|
||||
|
||||
if cfg.Limit > 0 && int64(cfg.Limit) < total {
|
||||
total = int64(cfg.Limit)
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
fmt.Println("No eligible unscanned icons found.")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("=== Icon Downloader ===\n")
|
||||
fmt.Printf("Eligible icons: %d\n", total)
|
||||
fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
|
||||
fmt.Printf("Timeout: %s\n", cfg.Timeout)
|
||||
fmt.Printf("Max size: %dKB\n", cfg.MaxSize/1024)
|
||||
fmt.Printf("S3 bucket: %s\n", cfg.S3Bucket)
|
||||
fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
|
||||
|
||||
// Setup log file
|
||||
var logWriter *LogWriter
|
||||
if cfg.LogFile != "" {
|
||||
logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open log file: %v", err)
|
||||
}
|
||||
defer logWriter.Close()
|
||||
}
|
||||
|
||||
stats := &Stats{StartedAt: time.Now()}
|
||||
|
||||
// Feed icons into a channel so workers never starve waiting for batch claims
|
||||
iconCh := make(chan IconRow, cfg.Concurrency*2)
|
||||
go func() {
|
||||
defer close(iconCh)
|
||||
claimed := 0
|
||||
for {
|
||||
if cfg.Limit > 0 && claimed >= cfg.Limit {
|
||||
break
|
||||
}
|
||||
|
||||
batchLimit := cfg.BatchSize
|
||||
if cfg.Limit > 0 && claimed+batchLimit > cfg.Limit {
|
||||
batchLimit = cfg.Limit - claimed
|
||||
}
|
||||
|
||||
icons, err := claimBatch(ctx, pool, batchLimit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to claim batch: %v", err)
|
||||
}
|
||||
if len(icons) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, icon := range icons {
|
||||
iconCh <- icon
|
||||
}
|
||||
claimed += len(icons)
|
||||
}
|
||||
}()
|
||||
|
||||
// Worker pool consumes from channel
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < cfg.Concurrency; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for icon := range iconCh {
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
stats.Panics.Add(1)
|
||||
stats.Processed.Add(1)
|
||||
logLine := fmt.Sprintf("PANIC: id=%d %s %v", icon.ID, icon.URL, r)
|
||||
fmt.Println(logLine)
|
||||
if logWriter != nil {
|
||||
logWriter.Write(logLine, true)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
result := processIcon(ctx, icon, cfg)
|
||||
|
||||
// Log line
|
||||
logLine := formatLogLine(icon, result)
|
||||
fmt.Println(logLine)
|
||||
if logWriter != nil {
|
||||
logWriter.Write(logLine, result.Err != "")
|
||||
}
|
||||
|
||||
// Update DB
|
||||
if !cfg.DryRun {
|
||||
if err := updateIcon(ctx, pool, icon.ID, result); err != nil {
|
||||
stats.DBErrors.Add(1)
|
||||
errLine := fmt.Sprintf("DB_ERROR: id=%d %v", icon.ID, err)
|
||||
fmt.Println(errLine)
|
||||
if logWriter != nil {
|
||||
logWriter.Write(errLine, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update stats
|
||||
stats.Processed.Add(1)
|
||||
if result.Err == "" {
|
||||
stats.Completed.Add(1)
|
||||
stats.BytesDown.Add(int64(result.FileSize))
|
||||
if result.Dedup {
|
||||
stats.DedupHits.Add(1)
|
||||
}
|
||||
} else {
|
||||
stats.Failed.Add(1)
|
||||
switch result.ErrType {
|
||||
case "dns":
|
||||
stats.DNSErrors.Add(1)
|
||||
case "timeout":
|
||||
stats.Timeouts.Add(1)
|
||||
case "http":
|
||||
stats.HTTPErrors.Add(1)
|
||||
case "invalid":
|
||||
stats.InvalidImg.Add(1)
|
||||
case "too_large":
|
||||
stats.TooLarge.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Summary
|
||||
duration := time.Since(stats.StartedAt)
|
||||
rate := float64(stats.Processed.Load()) / duration.Seconds()
|
||||
fmt.Printf("\n=== Summary ===\n")
|
||||
fmt.Printf("Duration: %s\n", duration.Round(time.Second))
|
||||
fmt.Printf("Processed: %d (%.0f/s)\n", stats.Processed.Load(), rate)
|
||||
fmt.Printf("Completed: %d\n", stats.Completed.Load())
|
||||
fmt.Printf("Failed: %d\n", stats.Failed.Load())
|
||||
fmt.Printf(" DNS errors: %d\n", stats.DNSErrors.Load())
|
||||
fmt.Printf(" Timeouts: %d\n", stats.Timeouts.Load())
|
||||
fmt.Printf(" HTTP errors: %d\n", stats.HTTPErrors.Load())
|
||||
fmt.Printf(" Invalid image: %d\n", stats.InvalidImg.Load())
|
||||
fmt.Printf(" Too large: %d\n", stats.TooLarge.Load())
|
||||
fmt.Printf("Dedup hits: %d\n", stats.DedupHits.Load())
|
||||
fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
|
||||
fmt.Printf("Panics: %d\n", stats.Panics.Load())
|
||||
fmt.Printf("Downloaded: %.1f MB\n", float64(stats.BytesDown.Load())/(1024*1024))
|
||||
|
||||
writeStats(stats)
|
||||
}
|
||||
64
pipeline/03_icon_download/s3.go
Normal file
64
pipeline/03_icon_download/s3.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
)
|
||||
|
||||
var (
|
||||
s3Client *s3.Client
|
||||
bucket string
|
||||
)
|
||||
|
||||
func initS3(bucketName string) error {
|
||||
cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s3Client = s3.NewFromConfig(cfg)
|
||||
bucket = bucketName
|
||||
return nil
|
||||
}
|
||||
|
||||
// s3Exists checks if an object already exists in S3 (for dedup).
|
||||
func s3Exists(ctx context.Context, key string) (bool, error) {
|
||||
_, err := s3Client.HeadObject(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(key),
|
||||
})
|
||||
if err != nil {
|
||||
var notFound *types.NotFound
|
||||
if errors.As(err, ¬Found) {
|
||||
return false, nil
|
||||
}
|
||||
// NoSuchKey error type
|
||||
var nsk *types.NoSuchKey
|
||||
if errors.As(err, &nsk) {
|
||||
return false, nil
|
||||
}
|
||||
// Some S3 errors return 404 as a generic error
|
||||
if ctx.Err() != nil {
|
||||
return false, ctx.Err()
|
||||
}
|
||||
// Treat other errors as "not found" to avoid blocking uploads
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// s3Upload uploads icon data to S3 with the given key.
|
||||
func s3Upload(ctx context.Context, key string, data []byte, contentType string) error {
|
||||
_, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(key),
|
||||
Body: bytes.NewReader(data),
|
||||
ContentType: aws.String(contentType),
|
||||
})
|
||||
return err
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue