From 758ab3080bfd1caa7d610d9ee24386a7e6cfd5cc Mon Sep 17 00:00:00 2001 From: Joe Lothan Date: Tue, 26 May 2026 23:50:02 -0400 Subject: [PATCH] interleave no-icon hosts and icon hosts for an even mix --- pipeline/05_bundle_gen/db.go | 54 ++++++++++++----- pipeline/05_bundle_gen/main.go | 102 ++++++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 40 deletions(-) diff --git a/pipeline/05_bundle_gen/db.go b/pipeline/05_bundle_gen/db.go index ec07a85..f1aaf04 100644 --- a/pipeline/05_bundle_gen/db.go +++ b/pipeline/05_bundle_gen/db.go @@ -18,19 +18,47 @@ type HostRow struct { IconDownloadedAt *time.Time } -// fetchHostsPage gets a page of hosts with titles, ordered by icon_downloaded_at for disk locality. -// Icons written to disk at similar times are physically adjacent — reading in write order -// maximizes OS readahead cache hits. Hosts without icons come last (no disk reads needed). -// random_order is included for bundle bucket assignment (randomized bundles). -func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastDownloaded *time.Time, lastID int64, limit int) ([]HostRow, error) { - rows, err := pool.Query(ctx, ` - SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), random_order, icon_downloaded_at - FROM hosts - WHERE html_title IS NOT NULL - AND (icon_downloaded_at, id) > ($1, $2) - ORDER BY icon_downloaded_at NULLS LAST, id - LIMIT $3 - `, lastDownloaded, lastID, limit) +// fetchHostsWithIcons gets hosts with icons, ordered by icon_downloaded_at for disk locality. 
+func fetchHostsWithIcons(ctx context.Context, pool *pgxpool.Pool, lastDownloaded *time.Time, lastID int64, limit int) ([]HostRow, error) { + var query string + var args []any + if lastDownloaded == nil && lastID == 0 { + query = `SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), best_icon_hash, random_order, icon_downloaded_at + FROM hosts WHERE html_title IS NOT NULL AND icon_downloaded_at IS NOT NULL + ORDER BY icon_downloaded_at, id LIMIT $1` + args = []any{limit} + } else { + query = `SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), best_icon_hash, random_order, icon_downloaded_at + FROM hosts WHERE html_title IS NOT NULL AND icon_downloaded_at IS NOT NULL + AND (icon_downloaded_at > $1 OR (icon_downloaded_at = $1 AND id > $2)) + ORDER BY icon_downloaded_at, id LIMIT $3` + args = []any{lastDownloaded, lastID, limit} + } + rows, err := pool.Query(ctx, query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var hosts []HostRow + for rows.Next() { + var h HostRow + if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.RandomOrder, &h.IconDownloadedAt); err != nil { + return nil, err + } + hosts = append(hosts, h) + } + return hosts, rows.Err() +} + +// fetchHostsWithoutIcons gets hosts without icons, ordered by id. 
+func fetchHostsWithoutIcons(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) { + rows, err := pool.Query(ctx, ` + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), '', random_order, NULL::timestamptz + FROM hosts + WHERE html_title IS NOT NULL AND icon_downloaded_at IS NULL AND id > $1 + ORDER BY id LIMIT $2 + `, lastID, limit) if err != nil { return nil, err } diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index b7cfffb..6a73c4e 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -132,41 +132,89 @@ func main() { entryCh := make(chan assemblerEntry, 50000) uploadCh := make(chan bundleJob, cfg.Uploaders*2) - // Stage 1: DB fetcher — continuously fetches pages into hostCh + // Stage 1: DB fetcher — interleaves hosts with icons (ordered by download time for + // disk locality) and hosts without icons (no disk reads needed), matching the natural + // ratio so bundles contain a representative mix. 
go func() { defer close(hostCh) + + // Calculate the ratio: for every iconPageSize icon-hosts, fetch noIconPageSize no-icon hosts + iconPageSize := 50000 + noIconRatio := float64(stats.HostsNoIcon) / float64(max(stats.HostsWithIcon, 1)) + noIconPageSize := int(float64(iconPageSize) * noIconRatio) + if noIconPageSize < 1000 { + noIconPageSize = 1000 + } + var lastDownloaded *time.Time - var lastID int64 - pageSize := 50000 + var lastIconID int64 + var lastNoIconID int64 + iconsDone := false + noIconsDone := false fetched := 0 - for { - limit := pageSize - if cfg.Limit > 0 { - remaining := cfg.Limit - fetched - if remaining <= 0 { - break + + for !iconsDone || !noIconsDone { + // Fetch icon-hosts (ordered by download time for disk locality) + if !iconsDone { + limit := iconPageSize + if cfg.Limit > 0 { + remaining := cfg.Limit - fetched + if remaining <= 0 { + break + } + if limit > remaining { + limit = remaining + } } - if limit > remaining { - limit = remaining + fetchStart := time.Now() + hosts, err := fetchHostsWithIcons(ctx, pool, lastDownloaded, lastIconID, limit) + if err != nil { + log.Fatalf("Failed to fetch icon hosts: %v", err) + } + if len(hosts) == 0 { + iconsDone = true + } else { + last := hosts[len(hosts)-1] + lastDownloaded = last.IconDownloadedAt + lastIconID = last.ID + fmt.Printf("[fetcher] %d icon hosts in %dms (hostCh: %d/%d)\n", + len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) + for _, h := range hosts { + hostCh <- h + } + fetched += len(hosts) } } - fetchStart := time.Now() - hosts, err := fetchHostsPage(ctx, pool, lastDownloaded, lastID, limit) - if err != nil { - log.Fatalf("Failed to fetch hosts: %v", err) + + // Fetch proportional no-icon hosts (interleaved for balanced bundles) + if !noIconsDone { + limit := noIconPageSize + if cfg.Limit > 0 { + remaining := cfg.Limit - fetched + if remaining <= 0 { + break + } + if limit > remaining { + limit = remaining + } + } + fetchStart := time.Now() + hosts, err := 
fetchHostsWithoutIcons(ctx, pool, lastNoIconID, limit) + if err != nil { + log.Fatalf("Failed to fetch no-icon hosts: %v", err) + } + if len(hosts) == 0 { + noIconsDone = true + } else { + lastNoIconID = hosts[len(hosts)-1].ID + fmt.Printf("[fetcher] %d no-icon hosts in %dms (hostCh: %d/%d)\n", + len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) + for _, h := range hosts { + hostCh <- h + } + fetched += len(hosts) + } } - if len(hosts) == 0 { - break - } - last := hosts[len(hosts)-1] - lastDownloaded = last.IconDownloadedAt - lastID = last.ID - fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", - len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) - for _, h := range hosts { - hostCh <- h - } - fetched += len(hosts) } }()