package main import ( "context" "github.com/jackc/pgx/v5/pgxpool" ) // IconRow represents a row from the icons table to be downloaded. type IconRow struct { ID int64 URL string } // claimBatch atomically claims a batch of unscanned icons for processing. // Uses md5 shuffle to spread requests across different hosts. func claimBatch(ctx context.Context, pool *pgxpool.Pool, limit int) ([]IconRow, error) { rows, err := pool.Query(ctx, ` UPDATE icons SET scan_state = 'in_progress' WHERE id IN ( SELECT id FROM icons WHERE scan_state = 'unscanned' LIMIT $1 FOR UPDATE SKIP LOCKED ) RETURNING id, url `, limit) if err != nil { return nil, err } defer rows.Close() var icons []IconRow for rows.Next() { var icon IconRow if err := rows.Scan(&icon.ID, &icon.URL); err != nil { return nil, err } icons = append(icons, icon) } return icons, rows.Err() } // DownloadResult holds the outcome of downloading one icon. type DownloadResult struct { IconHash string ContentType string Width int Height int FileSize int Dedup bool Err string ErrType string // "dns", "timeout", "http", "invalid", "too_large", "other" } // updateIcon writes the download result back to the icons table. func updateIcon(ctx context.Context, pool *pgxpool.Pool, iconID int64, result DownloadResult) error { if result.Err != "" { _, err := pool.Exec(ctx, `UPDATE icons SET scan_state = 'failed', error = $1 WHERE id = $2`, result.Err, iconID) return err } _, err := pool.Exec(ctx, ` UPDATE icons SET scan_state = 'completed', icon_hash = $1, content_type = $2, width = $3, height = $4, file_size = $5, downloaded_at = now() WHERE id = $6`, result.IconHash, result.ContentType, nilIntIf(result.Width, 0), nilIntIf(result.Height, 0), result.FileSize, iconID) return err } func nilIntIf(v int, zero int) *int { if v == zero { return nil } return &v }