// Command icon_download claims eligible unscanned icon rows from Postgres in
// batches, downloads each icon with a bounded pool of worker goroutines,
// writes each outcome back to the database (unless -dry-run) and prints a
// summary when done.
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Config holds the command-line options for one download run.
type Config struct {
	DBUrl       string        // Postgres connection string (required)
	IconsDir    string        // directory passed to initStorage for downloaded icons
	BatchSize   int           // rows requested per claimBatch call; must be >= 1
	Concurrency int           // number of worker goroutines; must be >= 1
	Limit       int           // max icons to claim; 0 (or negative) means no limit
	Timeout     time.Duration // HTTP request timeout (per the -timeout flag help)
	MaxSize     int64         // max icon download size in bytes (per the -max-size flag help)
	DryRun      bool          // skip DB updates; per flag help, disk writes are skipped too
	LogFile     string        // optional file that mirrors the printed log lines
	LogErrors   bool          // passed to NewLogWriter: only error lines go to LogFile
}

// Stats aggregates per-icon outcomes for the final summary. The counters are
// atomics because every worker goroutine updates them concurrently.
type Stats struct {
	Processed  atomic.Int64 // icons handled, including failures and panics
	Completed  atomic.Int64 // icons whose result had an empty Err
	Failed     atomic.Int64 // icons whose result had a non-empty Err
	DedupHits  atomic.Int64 // completed icons whose result reported Dedup
	DNSErrors  atomic.Int64 // failures with ErrType "dns"
	Timeouts   atomic.Int64 // failures with ErrType "timeout"
	HTTPErrors atomic.Int64 // failures with ErrType "http"
	InvalidImg atomic.Int64 // failures with ErrType "invalid"
	TooLarge   atomic.Int64 // failures with ErrType "too_large"
	DBErrors   atomic.Int64 // updateIcon failures (the icon is still counted by its download result)
	Panics     atomic.Int64 // icons whose processing panicked (counted in neither Completed nor Failed)
	BytesDown  atomic.Int64 // sum of FileSize over completed icons
	StartedAt  time.Time    // wall-clock start, used for duration and rate
}

func main() {
	cfg := Config{}
	flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
	flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory to store downloaded icons")
	flag.IntVar(&cfg.BatchSize, "batch-size", 200, "Rows to claim per batch")
	flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Number of concurrent goroutines")
	flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)")
	flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout")
	flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "Download but don't write to disk or update DB")
	flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
	flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
	flag.Parse()

	if cfg.DBUrl == "" {
		fmt.Println("Usage: icon_download --db DATABASE_URL [OPTIONS]")
		flag.PrintDefaults()
		os.Exit(1)
	}
	// With zero workers the feeder would claim a batch that nobody consumes
	// (and a negative value panics in make); a negative batch size is an
	// invalid LIMIT. Reject both before touching the database.
	if cfg.Concurrency < 1 || cfg.BatchSize < 1 {
		fmt.Println("--concurrency and --batch-size must both be at least 1")
		flag.PrintDefaults()
		os.Exit(1)
	}

	// All resources are released inside runIconDownload (via defer) before it
	// returns, so exiting here cannot drop buffered log output.
	if err := runIconDownload(cfg); err != nil {
		log.Fatal(err)
	}
}

// runIconDownload performs one complete download run for cfg.
//
// A feeder goroutine claims rows in batches and pushes them onto a channel
// consumed by cfg.Concurrency workers. Every row that has been claimed is
// processed and written back even if claiming later fails or the process
// receives SIGINT/SIGTERM; those conditions only stop further claims.
// It returns a non-nil error for setup failures and for a claimBatch failure.
// In the claimBatch case the summary is still printed and writeStats is still
// called first.
func runIconDownload(cfg Config) error {
	// DB work uses an uncancelled context: once claimBatch has returned rows,
	// we want their outcome written back even while shutting down.
	dbCtx := context.Background()

	// stopCtx is cancelled by the first SIGINT/SIGTERM and only gates claiming.
	stopCtx, stop := signal.NotifyContext(dbCtx, os.Interrupt, syscall.SIGTERM)
	defer stop()
	go func() {
		// After the first signal, restore default handling so a second signal
		// terminates immediately. The deferred stop() above also ends this
		// goroutine on a normal return.
		<-stopCtx.Done()
		stop()
	}()

	if err := initStorage(cfg.IconsDir); err != nil {
		return fmt.Errorf("init storage: %w", err)
	}

	pool, err := pgxpool.New(dbCtx, cfg.DBUrl)
	if err != nil {
		return fmt.Errorf("connect to database: %w", err)
	}
	defer pool.Close()

	// The count only feeds the banner below; the feeder claims rows itself.
	var total int64
	err = pool.QueryRow(dbCtx, `
		SELECT COUNT(*) FROM icons
		WHERE scan_state = 'unscanned'
		  AND (source = 'favicon_ico' OR rel_sizes IS NULL
		       OR rel_sizes IN ('16x16','32x32','48x48','64x64'))
	`).Scan(&total)
	if err != nil {
		return fmt.Errorf("count icons: %w", err)
	}
	if cfg.Limit > 0 && int64(cfg.Limit) < total {
		total = int64(cfg.Limit)
	}
	if total == 0 {
		fmt.Println("No eligible unscanned icons found.")
		return nil
	}

	fmt.Printf("=== Icon Downloader ===\n")
	fmt.Printf("Eligible icons: %d\n", total)
	fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
	fmt.Printf("Timeout: %s\n", cfg.Timeout)
	fmt.Printf("Max size: %dKB\n", cfg.MaxSize/1024)
	fmt.Printf("Icons dir: %s\n", cfg.IconsDir)
	fmt.Printf("Dry run: %v\n\n", cfg.DryRun)

	var logWriter *LogWriter
	if cfg.LogFile != "" {
		logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
		if err != nil {
			return fmt.Errorf("open log file: %w", err)
		}
		defer logWriter.Close()
	}

	// emit prints line to stdout and mirrors it to the log file, if any.
	emit := func(line string, isErr bool) {
		fmt.Println(line)
		if logWriter != nil {
			logWriter.Write(line, isErr)
		}
	}

	stats := &Stats{StartedAt: time.Now()}

	// Buffered so workers rarely starve while the feeder claims the next batch.
	iconCh := make(chan IconRow, cfg.Concurrency*2)
	// Carries the feeder's single result (nil or a claim error). It is
	// buffered so the send never blocks. It is read only after all workers
	// finish, which implies the feeder has already sent and closed iconCh.
	feedErrCh := make(chan error, 1)

	go func() {
		defer close(iconCh)
		claimed := 0
		for cfg.Limit <= 0 || claimed < cfg.Limit {
			if stopCtx.Err() != nil {
				fmt.Println("Interrupted: no longer claiming icons; finishing in-flight work...")
				break
			}
			batchLimit := cfg.BatchSize
			if cfg.Limit > 0 && claimed+batchLimit > cfg.Limit {
				batchLimit = cfg.Limit - claimed
			}
			icons, err := claimBatch(dbCtx, pool, batchLimit)
			if err != nil {
				// Stop claiming, but let workers finish what is already queued.
				feedErrCh <- fmt.Errorf("claim batch: %w", err)
				return
			}
			if len(icons) == 0 {
				break
			}
			for _, icon := range icons {
				iconCh <- icon
			}
			claimed += len(icons)
		}
		feedErrCh <- nil
	}()

	// handleIcon downloads one icon, logs the outcome, persists it (unless
	// dry-run) and updates stats. A panic is confined to this icon so the
	// worker keeps consuming. NOTE(review): a panicking icon is not written
	// back via updateIcon; confirm what state such rows are left in.
	handleIcon := func(icon IconRow) {
		defer func() {
			if r := recover(); r != nil {
				stats.Panics.Add(1)
				stats.Processed.Add(1)
				emit(fmt.Sprintf("PANIC: id=%d %s %v", icon.ID, icon.URL, r), true)
			}
		}()

		result := processIcon(icon, cfg)
		emit(formatLogLine(icon, result), result.Err != "")

		if !cfg.DryRun {
			if err := updateIcon(dbCtx, pool, icon.ID, result); err != nil {
				stats.DBErrors.Add(1)
				emit(fmt.Sprintf("DB_ERROR: id=%d %v", icon.ID, err), true)
			}
		}

		stats.Processed.Add(1)
		if result.Err == "" {
			stats.Completed.Add(1)
			stats.BytesDown.Add(int64(result.FileSize))
			if result.Dedup {
				stats.DedupHits.Add(1)
			}
			return
		}
		stats.Failed.Add(1)
		switch result.ErrType {
		case "dns":
			stats.DNSErrors.Add(1)
		case "timeout":
			stats.Timeouts.Add(1)
		case "http":
			stats.HTTPErrors.Add(1)
		case "invalid":
			stats.InvalidImg.Add(1)
		case "too_large":
			stats.TooLarge.Add(1)
		}
	}

	var wg sync.WaitGroup
	for i := 0; i < cfg.Concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for icon := range iconCh {
				handleIcon(icon)
			}
		}()
	}
	wg.Wait()

	feedErr := <-feedErrCh
	if feedErr != nil && logWriter != nil {
		// The returned error reaches stderr via main; also record it in the
		// log file so the file explains why the run ended early.
		logWriter.Write("CLAIM_ERROR: "+feedErr.Error(), true)
	}

	printRunSummary(stats)
	writeStats(stats)
	return feedErr
}

// printRunSummary prints the end-of-run totals and throughput for stats.
func printRunSummary(stats *Stats) {
	duration := time.Since(stats.StartedAt)
	rate := float64(stats.Processed.Load()) / duration.Seconds()

	fmt.Printf("\n=== Summary ===\n")
	fmt.Printf("Duration: %s\n", duration.Round(time.Second))
	fmt.Printf("Processed: %d (%.0f/s)\n", stats.Processed.Load(), rate)
	fmt.Printf("Completed: %d\n", stats.Completed.Load())
	fmt.Printf("Failed: %d\n", stats.Failed.Load())
	fmt.Printf(" DNS errors: %d\n", stats.DNSErrors.Load())
	fmt.Printf(" Timeouts: %d\n", stats.Timeouts.Load())
	fmt.Printf(" HTTP errors: %d\n", stats.HTTPErrors.Load())
	fmt.Printf(" Invalid image: %d\n", stats.InvalidImg.Load())
	fmt.Printf(" Too large: %d\n", stats.TooLarge.Load())
	fmt.Printf("Dedup hits: %d\n", stats.DedupHits.Load())
	fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
	fmt.Printf("Panics: %d\n", stats.Panics.Load())
	fmt.Printf("Downloaded: %.1f MB\n", float64(stats.BytesDown.Load())/(1024*1024))
}