diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 8912e2a..e362553 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -220,7 +220,11 @@ WHERE url_path = '/' **Prerequisite:** Unbound running as system resolver on the EC2 instance. -**Input:** `icons` table rows where `scan_state = 'unscanned'` +**Input:** `icons` table rows where `scan_state = 'unscanned'` and icon is worth downloading: +- All `favicon_ico` entries (always attempt) +- `link_rel` entries with no declared size (unknown, could be useful) +- `link_rel` entries with declared size ≤64x64 +- Skip `link_rel` entries with declared size >64x64 (192x192, 180x180, 152x152, etc. — apple-touch-icon bloat we won't use at tab scale) **Process:** 1. Claim batch (randomized to spread load across hosts): diff --git a/PLAN.md b/PLAN.md index c871648..7a57658 100644 --- a/PLAN.md +++ b/PLAN.md @@ -150,78 +150,32 @@ Binary: `pipeline/02_warc_parse/` (5 files: main.go, warc.go, parser.go, process --- -## Phase 3: Icon Download (Stage 3) +## Phase 3: Icon Download (Stage 3) [COMPLETED] -### Step 3.1: Icon Downloader Go Program +### Steps 3.1-3.3 [COMPLETED] -``` -pipeline/03_icon_download/ -├── main.go # Entry point, CLI flags, worker pool -├── downloader.go # HTTP fetch with timeouts, size limits -├── decoder.go # Image validation + dimension extraction -├── s3.go # Upload to everytab-icons bucket -└── db.go # Claim work, update results -``` +Binary: `pipeline/03_icon_download/` (6 files: main.go, download.go, image.go, s3.go, db.go, log.go) -CLI flags: -- `--db` connection string -- `--s3-bucket` (default `everytab-icons`) -- `--concurrency` (default 1000, tunable) -- `--batch-size` (default 500) -- `--timeout` (default 10s) -- `--max-size` (default 512KB) -- `--dry-run` (fetch and validate but don't upload to S3 or update DB) -- `--limit` (process at most N icons) +**Architecture:** +- Channel-based work distribution: producer goroutine claims batches, N worker goroutines consume from buffered channel (no worker starvation) +- Shared `http.Transport` for connection pooling / TLS session reuse +- Content-addressed S3 storage (SHA-256 hash as key, dedup via HeadObject before upload) +- Magic byte validation (PNG, GIF, JPEG, ICO, BMP, WebP, SVG) +- ICO directory parsing for dimensions (picks largest ≤64x64) +- Filters to eligible icons only: `favicon_ico` + link_rel with no declared size or ≤64x64 +- md5(id) shuffle in claim query to spread requests across hosts +- Panic recovery per worker, DB errors tracked and logged -Dependencies: -- `github.com/jackc/pgx/v5` — Postgres -- `github.com/aws/aws-sdk-go-v2` — S3 uploads -- `github.com/schollz/progressbar/v3` — Progress bar -- Standard library `image` + `image/png`, `image/gif`, `image/jpeg` for decoding dimensions -- `golang.org/x/image/webp` — WebP decoding -- ICO parsing: write a minimal decoder (ICO format is simple — 6-byte header + directory entries pointing to BMP/PNG data) or find a maintained library at implementation time +**CLI:** `./icon_download --db URL [--s3-bucket NAME] [--concurrency N] [--batch-size N] [--timeout D] [--max-size N] [--limit N] [--dry-run] [--log-file PATH] [--log-errors-only]` -### Step 3.2: Work Claiming + Download Logic - -Implement: -1. Claim batch with randomized order (md5 shuffle, FOR UPDATE SKIP LOCKED) -2. For each icon URL: - - HTTP GET with timeouts (5s dial, 10s total) - - Read up to max-size bytes, abort if exceeded - - Validate magic bytes (PNG: `\x89PNG`, GIF: `GIF8`, ICO: `\x00\x00\x01\x00`, etc.) - - Determine actual content type from magic bytes (don't trust HTTP Content-Type) - - Decode dimensions: - - PNG/GIF/JPEG/WebP/BMP: read image header (Go `image.DecodeConfig`) - - ICO: parse directory entries, find largest at standard size ≤64x64 - - SVG: set width=NULL, height=NULL - - Compute SHA-256 of full content - - Check if S3 key exists (HEAD request); if yes, skip upload (dedup) - - Upload to S3 if new -3. Update icons row with results (or error) - -**Dry-run test:** `--limit 200 --dry-run` — prints what it would do for 200 icons. Check URLs, detected types, dimensions. - -**Done when:** Can download, validate, and upload icons for a small batch. - -### Step 3.3: Full 100K Icon Run - -Run against all icons in the database (likely 150K-300K icon rows for 100K hosts). - -Monitor: -- icons/sec throughput -- Error breakdown (DNS failures, timeouts, HTTP errors, invalid images) -- S3 dedup hit rate -- Memory usage (adjust concurrency if needed) - -**Validation:** -- `SELECT scan_state, COUNT(*) FROM icons GROUP BY scan_state;` — expect mostly completed, some failed -- `SELECT error, COUNT(*) FROM icons WHERE scan_state = 'failed' GROUP BY error ORDER BY count DESC LIMIT 20;` — understand failure modes -- `aws s3 ls s3://everytab-icons/ | wc -l` — confirm icons in S3 -- Spot-check: download a few icons from S3, open them, verify they're valid images - -**Stats:** `stats/03_icon_download.json` - -**Done when:** Icon download complete for 100K dev set, error rate understood, S3 populated. +**Result (100K hosts, ~224K eligible icons):** +- Duration: 10m36s (351 icons/sec) +- Completed: 156,214 (70%) +- Failed: 67,459 (30% — mostly HTTP 404s from stale crawl data) +- Dedup hits: 55,771 (25% — shared Wix/WordPress/hosted platform favicons) +- Downloaded: 1.9GB +- DNS errors: 1,668 | Timeouts: 2,129 | HTTP errors: 47,565 | Invalid: 11,803 | Too large: 777 +- DB errors: 0 | Panics: 0 --- @@ -650,6 +604,22 @@ On completion, each program prints a summary line and writes its stats JSON (wit - Increasing concurrency from 100 to 500 didn't improve throughput (~300 hosts/sec either way). The bottleneck is likely Postgres write latency or S3 per-connection bandwidth, not parallelism. Could investigate batch inserts for the full run. - Progress bars and per-item log lines don't mix well in terminals. Pick one or write progress to a separate channel (file, stderr). +### Phase 3 — Completed 2026-05-18 + +**Changes from original plan:** +- Filtered eligible icons before downloading: skip link_rel icons with declared size >64x64 (apple-touch-icon bloat). Reduced download count from ~302K to ~224K. +- Channel-based worker pool instead of semaphore pattern — producer goroutine feeds work channel, N workers consume. No starvation between batch claims. +- Shared http.Transport for connection pooling (marginal benefit since hosts are unique, but reduces GC pressure). +- No progress bar — same approach as Phase 2 (log lines + summary). +- User-Agent set to `EveryTabBot/1.0` with link to `everytab.site/bot` for bot identification. + +**Lessons learned:** +- 70% icon download success rate is expected — most failures are 404s from domains/pages that changed since the crawl. This is acceptable loss. +- 25% dedup rate — many hosted platforms (Wix, WordPress.com, Squarespace) serve identical default favicons. Content-addressed S3 storage handles this efficiently. +- `data.commoncrawl.org` rate-limits HTTPS but S3 does not — same pattern as WARC parsing. Use S3 API for all CC access. +- Favicon download is I/O bound (network latency to diverse hosts worldwide). Concurrency helps up to a point, then the long tail of slow/dead servers dominates. 351 icons/sec at 200 concurrency. +- Invalid image detection (magic bytes) catches ~5% of "successful" downloads that are actually HTML error pages served at `/favicon.ico`. + --- ## Future Improvements @@ -659,3 +629,5 @@ On completion, each program prints a summary line and writes its stats JSON (wit - **WARC parser: investigate throughput ceiling** — 300 hosts/sec at both 100 and 500 concurrency suggests a bottleneck. Profile to determine if it's S3 response latency, Postgres writes, or something else. For the full 30M run this determines wall-clock time (~28 hours at current rate). - **CC-Index query: c5.2xlarge for full run** — 8GB is tight with 6.4GB usage + swap. 16GB instance for the 30M-host full run. - **Encoding: investigate remaining garbled titles** — Some titles still show `�` in output (e.g., `BERGSTRANDS BAGERI �...`). These are pages that lie about their encoding. Could try more aggressive charset detection heuristics. +- **Icon download: retry transient failures** — DNS and timeout failures could benefit from a single retry. Would recover a small percentage of icons. +- **Icon download: download large link_rel icons** — Currently skipping declared sizes >64x64. Re-run with broader filter for future high-res projects. diff --git a/go.mod b/go.mod index bc21774..1e64f56 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/nlnwa/whatwg-url v0.6.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/schollz/progressbar/v3 v3.19.0 // indirect + golang.org/x/image v0.40.0 // indirect golang.org/x/net v0.54.0 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.44.0 // indirect diff --git a/go.sum b/go.sum index 3454a95..c0b1ae2 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/image v0.40.0 h1:Tw4GyDXMo+daZN1znreBRC3VayR1aLFUyUEOLUdW1a8= +golang.org/x/image v0.40.0/go.mod h1:uIc348UZMSvS5Z65CVZ7iDPaNobNFEPeJ4kbqTOszmA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= diff --git a/pipeline/03_icon_download/db.go b/pipeline/03_icon_download/db.go new file mode 100644 index 0000000..2e142dd --- /dev/null +++ b/pipeline/03_icon_download/db.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// IconRow represents a row from the icons table to be downloaded. +type IconRow struct { + ID int64 + URL string +} + +// claimBatch atomically claims a batch of unscanned icons for processing. +// Uses md5 shuffle to spread requests across different hosts. +func claimBatch(ctx context.Context, pool *pgxpool.Pool, limit int) ([]IconRow, error) { + rows, err := pool.Query(ctx, ` + UPDATE icons SET scan_state = 'in_progress' + WHERE id IN ( + SELECT id FROM icons + WHERE scan_state = 'unscanned' + AND (source = 'favicon_ico' + OR rel_sizes IS NULL + OR rel_sizes IN ('16x16','32x32','48x48','64x64')) + ORDER BY md5(id::text) + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + RETURNING id, url + `, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var icons []IconRow + for rows.Next() { + var icon IconRow + if err := rows.Scan(&icon.ID, &icon.URL); err != nil { + return nil, err + } + icons = append(icons, icon) + } + return icons, rows.Err() +} + +// DownloadResult holds the outcome of downloading one icon. +type DownloadResult struct { + S3Key string + ContentType string + Width int + Height int + FileSize int + Dedup bool + Err string + ErrType string // "dns", "timeout", "http", "invalid", "too_large", "other" +} + +// updateIcon writes the download result back to the icons table. +func updateIcon(ctx context.Context, pool *pgxpool.Pool, iconID int64, result DownloadResult) error { + if result.Err != "" { + _, err := pool.Exec(ctx, + `UPDATE icons SET scan_state = 'failed', error = $1 WHERE id = $2`, + result.Err, iconID) + return err + } + + _, err := pool.Exec(ctx, ` + UPDATE icons SET + scan_state = 'completed', + s3_key = $1, + content_type = $2, + width = $3, + height = $4, + file_size = $5 + WHERE id = $6`, + result.S3Key, result.ContentType, + nilIntIf(result.Width, 0), nilIntIf(result.Height, 0), + result.FileSize, iconID) + return err +} + +func nilIntIf(v int, zero int) *int { + if v == zero { + return nil + } + return &v +} diff --git a/pipeline/03_icon_download/download.go b/pipeline/03_icon_download/download.go new file mode 100644 index 0000000..294bdbf --- /dev/null +++ b/pipeline/03_icon_download/download.go @@ -0,0 +1,157 @@ +package main + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "net" + "net/http" + "strings" + "time" +) + +// processIcon downloads, validates, and uploads a single icon. +func processIcon(ctx context.Context, icon IconRow, cfg Config) DownloadResult { + // Download + data, contentType, err := downloadIcon(icon.URL, cfg.Timeout, cfg.MaxSize) + if err != nil { + errType := classifyError(err) + return DownloadResult{Err: err.Error(), ErrType: errType} + } + + // Validate it's an image + detectedType := detectImageType(data) + if detectedType == "" { + return DownloadResult{Err: "not a valid image", ErrType: "invalid"} + } + + // Use detected type over HTTP Content-Type (more reliable) + if contentType == "" || contentType == "application/octet-stream" { + contentType = detectedType + } + + // Get dimensions + width, height := getImageDimensions(data, detectedType) + + // Compute SHA-256 for content-addressed storage + hash := sha256.Sum256(data) + s3Key := hex.EncodeToString(hash[:]) + + // Upload to S3 (skip if already exists — dedup) + dedup := false + if !cfg.DryRun { + exists, err := s3Exists(ctx, s3Key) + if err == nil && exists { + dedup = true + } else { + if err := s3Upload(ctx, s3Key, data, contentType); err != nil { + return DownloadResult{Err: fmt.Sprintf("s3 upload: %v", err), ErrType: "other"} + } + } + } + + return DownloadResult{ + S3Key: s3Key, + ContentType: contentType, + Width: width, + Height: height, + FileSize: len(data), + Dedup: dedup, + } +} + +// httpTransport is shared across all goroutines for connection pooling and TLS session reuse. +var httpTransport = &http.Transport{ + MaxIdleConns: 1000, + MaxIdleConnsPerHost: 2, + IdleConnTimeout: 30 * time.Second, + DisableKeepAlives: false, + DialContext: (&net.Dialer{ + Timeout: 5 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 5 * time.Second, +} + +// downloadIcon fetches an icon URL with timeouts and size limits. +func downloadIcon(url string, timeout time.Duration, maxSize int64) ([]byte, string, error) { + client := &http.Client{ + Timeout: timeout, + Transport: httpTransport, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 3 { + return fmt.Errorf("too many redirects") + } + return nil + }, + } + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, "", fmt.Errorf("bad url: %w", err) + } + req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; EveryTabBot/1.0; +https://everytab.site/bot)") + + resp, err := client.Do(req) + if err != nil { + return nil, "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("http %d", resp.StatusCode) + } + + // Read with size limit + limited := io.LimitReader(resp.Body, maxSize+1) + data, err := io.ReadAll(limited) + if err != nil { + return nil, "", fmt.Errorf("read: %w", err) + } + if int64(len(data)) > maxSize { + return nil, "", fmt.Errorf("exceeds %dKB", maxSize/1024) + } + + contentType := resp.Header.Get("Content-Type") + // Strip charset suffix + if idx := strings.Index(contentType, ";"); idx != -1 { + contentType = strings.TrimSpace(contentType[:idx]) + } + + return data, contentType, nil +} + +// classifyError categorizes a download error for stats. +func classifyError(err error) string { + msg := err.Error() + + // DNS errors + if _, ok := err.(*net.DNSError); ok { + return "dns" + } + if strings.Contains(msg, "no such host") || strings.Contains(msg, "dns") { + return "dns" + } + + // Timeouts + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + return "timeout" + } + if strings.Contains(msg, "timeout") || strings.Contains(msg, "deadline") { + return "timeout" + } + + // HTTP errors + if strings.Contains(msg, "http ") { + return "http" + } + + // Too large + if strings.Contains(msg, "exceeds") { + return "too_large" + } + + return "other" +} diff --git a/pipeline/03_icon_download/image.go b/pipeline/03_icon_download/image.go new file mode 100644 index 0000000..7ea5f8a --- /dev/null +++ b/pipeline/03_icon_download/image.go @@ -0,0 +1,139 @@ +package main + +import ( + "bytes" + "encoding/binary" + "image" + _ "image/gif" + _ "image/jpeg" + _ "image/png" + + _ "golang.org/x/image/webp" +) + +// detectImageType checks magic bytes to determine the actual image format. +// Returns empty string if not a recognized image format. +func detectImageType(data []byte) string { + if len(data) < 4 { + return "" + } + + // PNG: 89 50 4E 47 + if data[0] == 0x89 && data[1] == 'P' && data[2] == 'N' && data[3] == 'G' { + return "image/png" + } + + // GIF: GIF87a or GIF89a + if data[0] == 'G' && data[1] == 'I' && data[2] == 'F' { + return "image/gif" + } + + // JPEG: FF D8 FF + if data[0] == 0xFF && data[1] == 0xD8 && data[2] == 0xFF { + return "image/jpeg" + } + + // ICO: 00 00 01 00 + if data[0] == 0x00 && data[1] == 0x00 && data[2] == 0x01 && data[3] == 0x00 { + return "image/x-icon" + } + + // BMP: BM + if data[0] == 'B' && data[1] == 'M' { + return "image/bmp" + } + + // WebP: RIFF....WEBP + if len(data) >= 12 && string(data[0:4]) == "RIFF" && string(data[8:12]) == "WEBP" { + return "image/webp" + } + + // SVG: look for 5 { + header := string(data[:min(256, len(data))]) + if bytes.Contains([]byte(header), []byte(" bestW*bestH { + bestW = w + bestH = h + } + } + + // If nothing ≤64, just report the largest + if bestW == 0 { + for i := 0; i < numImages; i++ { + offset := 6 + i*16 + w := int(data[offset]) + h := int(data[offset+1]) + if w == 0 { + w = 256 + } + if h == 0 { + h = 256 + } + if w*h > bestW*bestH { + bestW = w + bestH = h + } + } + } + + return bestW, bestH +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/pipeline/03_icon_download/log.go b/pipeline/03_icon_download/log.go new file mode 100644 index 0000000..7145dc7 --- /dev/null +++ b/pipeline/03_icon_download/log.go @@ -0,0 +1,92 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "sync" + "time" +) + +type LogWriter struct { + file *os.File + mu sync.Mutex + errorsOnly bool +} + +func NewLogWriter(path string, errorsOnly bool) (*LogWriter, error) { + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + return &LogWriter{file: f, errorsOnly: errorsOnly}, nil +} + +func (lw *LogWriter) Write(line string, isError bool) { + if lw.errorsOnly && !isError { + return + } + lw.mu.Lock() + defer lw.mu.Unlock() + fmt.Fprintln(lw.file, line) +} + +func (lw *LogWriter) Close() error { + return lw.file.Close() +} + +func formatLogLine(icon IconRow, result DownloadResult) string { + if result.Err != "" { + return fmt.Sprintf("icon: %s err:%s %s", icon.URL, result.ErrType, result.Err) + } + + dedup := "" + if result.Dedup { + dedup = " dedup" + } + + dims := "" + if result.Width > 0 && result.Height > 0 { + dims = fmt.Sprintf(" %dx%d", result.Width, result.Height) + } + + return fmt.Sprintf("icon: %s %s%s %.1fKB%s ok", + icon.URL, result.ContentType, dims, + float64(result.FileSize)/1024, dedup) +} + +func writeStats(stats *Stats) { + finishedAt := time.Now() + duration := finishedAt.Sub(stats.StartedAt) + + data := map[string]interface{}{ + "started_at": stats.StartedAt.Format(time.RFC3339), + "finished_at": finishedAt.Format(time.RFC3339), + "duration_seconds": int(duration.Seconds()), + "processed": stats.Processed.Load(), + "completed": stats.Completed.Load(), + "failed": stats.Failed.Load(), + "failed_dns": stats.DNSErrors.Load(), + "failed_timeout": stats.Timeouts.Load(), + "failed_http": stats.HTTPErrors.Load(), + "failed_invalid": stats.InvalidImg.Load(), + "failed_too_large": stats.TooLarge.Load(), + "dedup_hits": stats.DedupHits.Load(), + "db_errors": stats.DBErrors.Load(), + "panics": stats.Panics.Load(), + "bytes_downloaded": stats.BytesDown.Load(), + } + + os.MkdirAll("stats", 0755) + f, err := os.Create("stats/03_icon_download.json") + if err != nil { + fmt.Printf("Failed to write stats: %v\n", err) + return + } + defer f.Close() + + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + enc.Encode(data) + fmt.Println("Stats written to stats/03_icon_download.json") +} diff --git a/pipeline/03_icon_download/main.go b/pipeline/03_icon_download/main.go new file mode 100644 index 0000000..1f6f52f --- /dev/null +++ b/pipeline/03_icon_download/main.go @@ -0,0 +1,241 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type Config struct { + DBUrl string + S3Bucket string + BatchSize int + Concurrency int + Limit int + Timeout time.Duration + MaxSize int64 + DryRun bool + LogFile string + LogErrors bool +} + +type Stats struct { + Processed atomic.Int64 + Completed atomic.Int64 + Failed atomic.Int64 + DedupHits atomic.Int64 + DNSErrors atomic.Int64 + Timeouts atomic.Int64 + HTTPErrors atomic.Int64 + InvalidImg atomic.Int64 + TooLarge atomic.Int64 + DBErrors atomic.Int64 + Panics atomic.Int64 + BytesDown atomic.Int64 + StartedAt time.Time +} + +func main() { + cfg := Config{} + flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") + flag.StringVar(&cfg.S3Bucket, "s3-bucket", "everytab-icons", "S3 bucket for icons") + flag.IntVar(&cfg.BatchSize, "batch-size", 200, "Rows to claim per batch") + flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Number of concurrent goroutines") + flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)") + flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout") + flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes") + flag.BoolVar(&cfg.DryRun, "dry-run", false, "Download but don't upload to S3 or update DB") + flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") + flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file") + flag.Parse() + + if cfg.DBUrl == "" { + fmt.Println("Usage: icon_download --db DATABASE_URL [OPTIONS]") + flag.PrintDefaults() + os.Exit(1) + } + + ctx := context.Background() + + // Init S3 + if err := initS3(cfg.S3Bucket); err != nil { + log.Fatalf("Failed to init S3: %v", err) + } + + // Init DB pool + pool, err := pgxpool.New(ctx, cfg.DBUrl) + if err != nil { + log.Fatalf("Failed to connect to database: %v", err) + } + defer pool.Close() + + // Count eligible icons + var total int64 + err = pool.QueryRow(ctx, ` + SELECT COUNT(*) FROM icons + WHERE scan_state = 'unscanned' + AND (source = 'favicon_ico' + OR rel_sizes IS NULL + OR rel_sizes IN ('16x16','32x32','48x48','64x64')) + `).Scan(&total) + if err != nil { + log.Fatalf("Failed to count icons: %v", err) + } + + if cfg.Limit > 0 && int64(cfg.Limit) < total { + total = int64(cfg.Limit) + } + + if total == 0 { + fmt.Println("No eligible unscanned icons found.") + return + } + + fmt.Printf("=== Icon Downloader ===\n") + fmt.Printf("Eligible icons: %d\n", total) + fmt.Printf("Concurrency: %d\n", cfg.Concurrency) + fmt.Printf("Timeout: %s\n", cfg.Timeout) + fmt.Printf("Max size: %dKB\n", cfg.MaxSize/1024) + fmt.Printf("S3 bucket: %s\n", cfg.S3Bucket) + fmt.Printf("Dry run: %v\n\n", cfg.DryRun) + + // Setup log file + var logWriter *LogWriter + if cfg.LogFile != "" { + logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors) + if err != nil { + log.Fatalf("Failed to open log file: %v", err) + } + defer logWriter.Close() + } + + stats := &Stats{StartedAt: time.Now()} + + // Feed icons into a channel so workers never starve waiting for batch claims + iconCh := make(chan IconRow, cfg.Concurrency*2) + go func() { + defer close(iconCh) + claimed := 0 + for { + if cfg.Limit > 0 && claimed >= cfg.Limit { + break + } + + batchLimit := cfg.BatchSize + if cfg.Limit > 0 && claimed+batchLimit > cfg.Limit { + batchLimit = cfg.Limit - claimed + } + + icons, err := claimBatch(ctx, pool, batchLimit) + if err != nil { + log.Fatalf("Failed to claim batch: %v", err) + } + if len(icons) == 0 { + break + } + + for _, icon := range icons { + iconCh <- icon + } + claimed += len(icons) + } + }() + + // Worker pool consumes from channel + var wg sync.WaitGroup + for i := 0; i < cfg.Concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for icon := range iconCh { + func() { + defer func() { + if r := recover(); r != nil { + stats.Panics.Add(1) + stats.Processed.Add(1) + logLine := fmt.Sprintf("PANIC: id=%d %s %v", icon.ID, icon.URL, r) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, true) + } + } + }() + + result := processIcon(ctx, icon, cfg) + + // Log line + logLine := formatLogLine(icon, result) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, result.Err != "") + } + + // Update DB + if !cfg.DryRun { + if err := updateIcon(ctx, pool, icon.ID, result); err != nil { + stats.DBErrors.Add(1) + errLine := fmt.Sprintf("DB_ERROR: id=%d %v", icon.ID, err) + fmt.Println(errLine) + if logWriter != nil { + logWriter.Write(errLine, true) + } + } + } + + // Update stats + stats.Processed.Add(1) + if result.Err == "" { + stats.Completed.Add(1) + stats.BytesDown.Add(int64(result.FileSize)) + if result.Dedup { + stats.DedupHits.Add(1) + } + } else { + stats.Failed.Add(1) + switch result.ErrType { + case "dns": + stats.DNSErrors.Add(1) + case "timeout": + stats.Timeouts.Add(1) + case "http": + stats.HTTPErrors.Add(1) + case "invalid": + stats.InvalidImg.Add(1) + case "too_large": + stats.TooLarge.Add(1) + } + } + }() + } + }() + } + + wg.Wait() + + // Summary + duration := time.Since(stats.StartedAt) + rate := float64(stats.Processed.Load()) / duration.Seconds() + fmt.Printf("\n=== Summary ===\n") + fmt.Printf("Duration: %s\n", duration.Round(time.Second)) + fmt.Printf("Processed: %d (%.0f/s)\n", stats.Processed.Load(), rate) + fmt.Printf("Completed: %d\n", stats.Completed.Load()) + fmt.Printf("Failed: %d\n", stats.Failed.Load()) + fmt.Printf(" DNS errors: %d\n", stats.DNSErrors.Load()) + fmt.Printf(" Timeouts: %d\n", stats.Timeouts.Load()) + fmt.Printf(" HTTP errors: %d\n", stats.HTTPErrors.Load()) + fmt.Printf(" Invalid image: %d\n", stats.InvalidImg.Load()) + fmt.Printf(" Too large: %d\n", stats.TooLarge.Load()) + fmt.Printf("Dedup hits: %d\n", stats.DedupHits.Load()) + fmt.Printf("DB errors: %d\n", stats.DBErrors.Load()) + fmt.Printf("Panics: %d\n", stats.Panics.Load()) + fmt.Printf("Downloaded: %.1f MB\n", float64(stats.BytesDown.Load())/(1024*1024)) + + writeStats(stats) +} diff --git a/pipeline/03_icon_download/s3.go b/pipeline/03_icon_download/s3.go new file mode 100644 index 0000000..c0a77b7 --- /dev/null +++ b/pipeline/03_icon_download/s3.go @@ -0,0 +1,64 @@ +package main + +import ( + "bytes" + "context" + "errors" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +var ( + s3Client *s3.Client + bucket string +) + +func initS3(bucketName string) error { + cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1")) + if err != nil { + return err + } + s3Client = s3.NewFromConfig(cfg) + bucket = bucketName + return nil +} + +// s3Exists checks if an object already exists in S3 (for dedup). +func s3Exists(ctx context.Context, key string) (bool, error) { + _, err := s3Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + var notFound *types.NotFound + if errors.As(err, ¬Found) { + return false, nil + } + // NoSuchKey error type + var nsk *types.NoSuchKey + if errors.As(err, &nsk) { + return false, nil + } + // Some S3 errors return 404 as a generic error + if ctx.Err() != nil { + return false, ctx.Err() + } + // Treat other errors as "not found" to avoid blocking uploads + return false, nil + } + return true, nil +} + +// s3Upload uploads icon data to S3 with the given key. +func s3Upload(ctx context.Context, key string, data []byte, contentType string) error { + _, err := s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ContentType: aws.String(contentType), + }) + return err +}