diff --git a/pipeline/04_best_icon/select.sql b/pipeline/04_best_icon/select.sql index ac1dda6..fd0b514 100644 --- a/pipeline/04_best_icon/select.sql +++ b/pipeline/04_best_icon/select.sql @@ -21,7 +21,7 @@ SET work_mem = '2GB'; \timing on CREATE TEMP TABLE best_icons AS -SELECT DISTINCT ON (host_id) host_id, icon_hash +SELECT DISTINCT ON (host_id) host_id, icon_hash, downloaded_at FROM icons WHERE scan_state = 'completed' AND icon_hash IS NOT NULL @@ -50,7 +50,7 @@ ORDER BY host_id, \echo 'Step 2: Updating hosts...' -UPDATE hosts h SET best_icon_hash = b.icon_hash +UPDATE hosts h SET best_icon_hash = b.icon_hash, icon_downloaded_at = b.downloaded_at FROM best_icons b WHERE h.id = b.host_id; \timing off diff --git a/pipeline/05_bundle_gen/db.go b/pipeline/05_bundle_gen/db.go index fa7ba3b..6ec9d11 100644 --- a/pipeline/05_bundle_gen/db.go +++ b/pipeline/05_bundle_gen/db.go @@ -2,30 +2,56 @@ package main import ( "context" + "time" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) type HostRow struct { - ID int64 - Hostname string - Protocol string - HtmlTitle string - IframeAllowed bool - BestIconHash string + ID int64 + Hostname string + Protocol string + HtmlTitle string + IframeAllowed bool + BestIconHash string + IconDownloadedAt *time.Time } -// fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality. -// Icons were downloaded roughly in host-ID order, so reading by ID approximates -// the physical write order on disk — improves EBS readahead cache hits. -func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) { - rows, err := pool.Query(ctx, ` - SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, '') - FROM hosts - WHERE html_title IS NOT NULL AND id > $1 - ORDER BY id - LIMIT $2 - `, lastID, limit) +// fetchHostsPage gets a page of hosts with titles, ordered by icon_downloaded_at for disk locality. +// Icons written to disk at similar times are physically adjacent on the EBS volume — +// reading in write order maximizes OS readahead cache hits. +func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastTime *time.Time, lastID int64, limit int) ([]HostRow, error) { + // Two-phase: first hosts with icons (ordered by download time), then hosts without + var rows pgx.Rows + var err error + if lastTime != nil { + rows, err = pool.Query(ctx, ` + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), icon_downloaded_at + FROM hosts + WHERE html_title IS NOT NULL AND icon_downloaded_at > $1 + ORDER BY icon_downloaded_at + LIMIT $2 + `, lastTime, limit) + } else if lastID > 0 { + // No more timestamped hosts — fetch remaining (no icon) by id + rows, err = pool.Query(ctx, ` + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), '', NULL::timestamptz + FROM hosts + WHERE html_title IS NOT NULL AND icon_downloaded_at IS NULL AND id > $1 + ORDER BY id + LIMIT $2 + `, lastID, limit) + } else { + // Start: fetch timestamped hosts first + rows, err = pool.Query(ctx, ` + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), icon_downloaded_at + FROM hosts + WHERE html_title IS NOT NULL AND icon_downloaded_at IS NOT NULL + ORDER BY icon_downloaded_at + LIMIT $1 + `, limit) + } if err != nil { return nil, err } @@ -34,7 +60,7 @@ func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit var hosts []HostRow for rows.Next() { var h HostRow - if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash); err != nil { + if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.IconDownloadedAt); err != nil { return nil, err } hosts = append(hosts, h) diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index bd2ad8b..b363fdf 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -128,7 +128,9 @@ func main() { // Stage 1: DB fetcher — continuously fetches pages into hostCh go func() { defer close(hostCh) + var lastTime *time.Time var lastID int64 + iconPhase := true // first: hosts with icons (by download time), then: hosts without pageSize := 50000 fetched := 0 for { @@ -143,14 +145,26 @@ func main() { } } fetchStart := time.Now() - hosts, err := fetchHostsPage(ctx, pool, lastID, limit) + hosts, err := fetchHostsPage(ctx, pool, lastTime, lastID, limit) if err != nil { log.Fatalf("Failed to fetch hosts: %v", err) } if len(hosts) == 0 { + if iconPhase { + // Switch to no-icon hosts + iconPhase = false + lastTime = nil + lastID = 0 + continue + } break } - lastID = hosts[len(hosts)-1].ID + last := hosts[len(hosts)-1] + if last.IconDownloadedAt != nil { + lastTime = last.IconDownloadedAt + } else { + lastID = last.ID + } fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) for _, h := range hosts {