everytab/pipeline/03_icon_download/main.go
2026-05-17 22:09:03 -04:00

241 lines
6.1 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// Config holds the command-line options for a single icon download run.
// Each field is populated in main from the flag named next to it below.
type Config struct {
	// DBUrl (-db) is the Postgres connection string; S3Bucket (-s3-bucket)
	// names the bucket icons go to.
	DBUrl, S3Bucket string

	// BatchSize (-batch-size) is the number of rows claimed per batch,
	// Concurrency (-concurrency) the number of worker goroutines, and
	// Limit (-limit) the cap on icons processed, where 0 means no cap.
	BatchSize, Concurrency, Limit int

	// Timeout (-timeout) is described by its flag help as the HTTP request
	// timeout.
	Timeout time.Duration

	// MaxSize (-max-size) is described by its flag help as the maximum icon
	// download size in bytes.
	MaxSize int64

	// DryRun (-dry-run) makes main skip the DB update; its flag help says the
	// S3 upload is skipped as well.
	DryRun bool

	// LogFile (-log-file), when non-empty, mirrors log lines to that file;
	// LogErrors (-log-errors-only) restricts the file to error lines.
	LogFile   string
	LogErrors bool
}
// Stats accumulates the per-run counters shared by every worker. The counters
// are atomic so workers can bump them concurrently; atomic values must not be
// copied, so Stats is always handled through a pointer.
type Stats struct {
	// Processed counts every icon a worker finished with, including ones that
	// panicked. Completed and Failed split the non-panicking outcomes on
	// whether the result carried an error. DedupHits counts completed icons
	// whose result was flagged as a dedup.
	Processed, Completed, Failed, DedupHits atomic.Int64

	// Failure breakdown, keyed on the result's error type
	// ("dns", "timeout", "http", "invalid", "too_large").
	DNSErrors, Timeouts, HTTPErrors, InvalidImg, TooLarge atomic.Int64

	// DBErrors counts failed DB updates; Panics counts recovered panics.
	DBErrors, Panics atomic.Int64

	// BytesDown is the summed file size of completed icons.
	BytesDown atomic.Int64

	// StartedAt marks the start of the run; the summary's duration and rate
	// are measured from it.
	StartedAt time.Time
}
// main parses and validates the command-line flags, then hands off to
// runIconDownload. Any error from the run is reported with log.Fatal only
// after runIconDownload has returned, so its deferred cleanup (closing the
// log file and the DB pool) has already run.
func main() {
	cfg := Config{}
	flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
	flag.StringVar(&cfg.S3Bucket, "s3-bucket", "everytab-icons", "S3 bucket for icons")
	flag.IntVar(&cfg.BatchSize, "batch-size", 200, "Rows to claim per batch")
	flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Number of concurrent goroutines")
	flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)")
	flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout")
	flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "Download but don't upload to S3 or update DB")
	flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
	flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
	flag.Parse()

	if cfg.DBUrl == "" {
		fmt.Println("Usage: icon_download --db DATABASE_URL [OPTIONS]")
		flag.PrintDefaults()
		os.Exit(1)
	}
	// With no workers the feeder blocks forever on its first channel send and
	// the program exits with that batch claimed but never processed. A
	// non-positive batch size is never a meaningful request to claimBatch.
	if cfg.Concurrency < 1 {
		log.Fatalf("--concurrency must be at least 1 (got %d)", cfg.Concurrency)
	}
	if cfg.BatchSize < 1 {
		log.Fatalf("--batch-size must be at least 1 (got %d)", cfg.BatchSize)
	}

	if err := runIconDownload(cfg); err != nil {
		log.Fatal(err)
	}
}

// runIconDownload performs one download pass. It initialises S3 and the DB
// pool, prints a banner, streams claimed icons to cfg.Concurrency workers, and
// prints a summary. If claiming a batch fails, no further batches are claimed,
// but the workers still finish every icon already handed to them. The claim
// error is returned after the summary has been printed.
func runIconDownload(cfg Config) error {
	ctx := context.Background()

	if err := initS3(cfg.S3Bucket); err != nil {
		return fmt.Errorf("init S3: %w", err)
	}

	pool, err := pgxpool.New(ctx, cfg.DBUrl)
	if err != nil {
		return fmt.Errorf("connect to database: %w", err)
	}
	defer pool.Close()

	// Count eligible icons for the banner and to skip the run when there is
	// nothing to do.
	var total int64
	err = pool.QueryRow(ctx, `
		SELECT COUNT(*) FROM icons
		WHERE scan_state = 'unscanned'
		  AND (source = 'favicon_ico'
		       OR rel_sizes IS NULL
		       OR rel_sizes IN ('16x16','32x32','48x48','64x64'))
	`).Scan(&total)
	if err != nil {
		return fmt.Errorf("count icons: %w", err)
	}
	if cfg.Limit > 0 && int64(cfg.Limit) < total {
		total = int64(cfg.Limit)
	}
	if total == 0 {
		fmt.Println("No eligible unscanned icons found.")
		return nil
	}

	fmt.Println("=== Icon Downloader ===")
	fmt.Printf("Eligible icons: %d\n", total)
	fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
	fmt.Printf("Timeout: %s\n", cfg.Timeout)
	fmt.Printf("Max size: %dKB\n", cfg.MaxSize/1024)
	fmt.Printf("S3 bucket: %s\n", cfg.S3Bucket)
	fmt.Printf("Dry run: %v\n\n", cfg.DryRun)

	var logWriter *LogWriter
	if cfg.LogFile != "" {
		logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
		if err != nil {
			return fmt.Errorf("open log file: %w", err)
		}
		defer logWriter.Close()
	}

	stats := &Stats{StartedAt: time.Now()}

	// Feed icons into a buffered channel so workers never starve waiting for
	// batch claims. The feeder reports its terminal error on feedErr, which
	// is buffered so the send never blocks, and then closes iconCh, which
	// ends the workers' range loops.
	iconCh := make(chan IconRow, cfg.Concurrency*2)
	feedErr := make(chan error, 1)
	go func() {
		defer close(iconCh)
		feedErr <- feedClaimedIcons(ctx, pool, cfg, iconCh)
	}()

	var wg sync.WaitGroup
	for i := 0; i < cfg.Concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for icon := range iconCh {
				handleClaimedIcon(ctx, pool, cfg, icon, stats, logWriter)
			}
		}()
	}
	wg.Wait()

	printRunSummary(stats)
	writeStats(stats)

	// Workers only exit once iconCh is closed, and the feeder sends its result
	// before closing it, so this receive never blocks.
	return <-feedErr
}

// feedClaimedIcons claims icons via claimBatch in batches of at most
// cfg.BatchSize and sends each one on out. It stops when claimBatch returns an
// empty batch or, when cfg.Limit > 0, once cfg.Limit icons have been claimed.
// A claim error is returned immediately. Icons from earlier batches have
// already been sent and will still be processed.
//
// NOTE(review): rows are claimed even when cfg.DryRun is set, yet dry runs
// never call updateIcon. Confirm claimBatch does not leave them stranded.
func feedClaimedIcons(ctx context.Context, pool *pgxpool.Pool, cfg Config, out chan<- IconRow) error {
	claimed := 0
	for cfg.Limit <= 0 || claimed < cfg.Limit {
		batchLimit := cfg.BatchSize
		if cfg.Limit > 0 && claimed+batchLimit > cfg.Limit {
			batchLimit = cfg.Limit - claimed
		}
		icons, err := claimBatch(ctx, pool, batchLimit)
		if err != nil {
			return fmt.Errorf("claim batch: %w", err)
		}
		if len(icons) == 0 {
			return nil
		}
		for _, icon := range icons {
			out <- icon
		}
		claimed += len(icons)
	}
	return nil
}

// handleClaimedIcon processes one claimed icon. It runs processIcon, logs the
// formatted result, writes the result back with updateIcon (skipped in dry
// runs), and updates stats. A panic anywhere in that sequence is recovered,
// counted in stats.Panics and stats.Processed, and logged, so one bad icon
// cannot take down its worker goroutine.
func handleClaimedIcon(ctx context.Context, pool *pgxpool.Pool, cfg Config, icon IconRow, stats *Stats, logWriter *LogWriter) {
	defer func() {
		if r := recover(); r != nil {
			stats.Panics.Add(1)
			stats.Processed.Add(1)
			// NOTE(review): updateIcon is never called for a panicked icon, so
			// its row keeps whatever state claimBatch left it in. Confirm
			// whether it should be recorded as failed here.
			echoLogLine(logWriter, fmt.Sprintf("PANIC: id=%d %s %v", icon.ID, icon.URL, r), true)
		}
	}()

	result := processIcon(ctx, icon, cfg)
	echoLogLine(logWriter, formatLogLine(icon, result), result.Err != "")

	if !cfg.DryRun {
		if err := updateIcon(ctx, pool, icon.ID, result); err != nil {
			stats.DBErrors.Add(1)
			echoLogLine(logWriter, fmt.Sprintf("DB_ERROR: id=%d %v", icon.ID, err), true)
		}
	}

	stats.Processed.Add(1)
	if result.Err == "" {
		stats.Completed.Add(1)
		stats.BytesDown.Add(int64(result.FileSize))
		if result.Dedup {
			stats.DedupHits.Add(1)
		}
		return
	}

	stats.Failed.Add(1)
	switch result.ErrType {
	case "dns":
		stats.DNSErrors.Add(1)
	case "timeout":
		stats.Timeouts.Add(1)
	case "http":
		stats.HTTPErrors.Add(1)
	case "invalid":
		stats.InvalidImg.Add(1)
	case "too_large":
		stats.TooLarge.Add(1)
	}
}

// echoLogLine prints line to stdout. When logWriter is non-nil, it also
// mirrors the line to the log file with the given error flag.
func echoLogLine(logWriter *LogWriter, line string, isErr bool) {
	fmt.Println(line)
	if logWriter != nil {
		logWriter.Write(line, isErr)
	}
}

// printRunSummary prints the end-of-run counters from stats, together with
// the time elapsed since stats.StartedAt and the overall processing rate.
func printRunSummary(stats *Stats) {
	duration := time.Since(stats.StartedAt)
	processed := stats.Processed.Load()
	rate := float64(processed) / duration.Seconds()
	fmt.Print("\n=== Summary ===\n")
	fmt.Printf("Duration: %s\n", duration.Round(time.Second))
	fmt.Printf("Processed: %d (%.0f/s)\n", processed, rate)
	fmt.Printf("Completed: %d\n", stats.Completed.Load())
	fmt.Printf("Failed: %d\n", stats.Failed.Load())
	fmt.Printf(" DNS errors: %d\n", stats.DNSErrors.Load())
	fmt.Printf(" Timeouts: %d\n", stats.Timeouts.Load())
	fmt.Printf(" HTTP errors: %d\n", stats.HTTPErrors.Load())
	fmt.Printf(" Invalid image: %d\n", stats.InvalidImg.Load())
	fmt.Printf(" Too large: %d\n", stats.TooLarge.Load())
	fmt.Printf("Dedup hits: %d\n", stats.DedupHits.Load())
	fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
	fmt.Printf("Panics: %d\n", stats.Panics.Load())
	fmt.Printf("Downloaded: %.1f MB\n", float64(stats.BytesDown.Load())/(1024*1024))
}