From 8ceb31bcbb485b7dc1dcbb887fd2210a1234b6f7 Mon Sep 17 00:00:00 2001 From: Joe Lothan Date: Mon, 25 May 2026 16:21:49 -0400 Subject: [PATCH] increase bundlegen producer host amount to ensure workers aren't starved --- pipeline/05_bundle_gen/main.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index 21820ac..0f39643 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -121,15 +121,15 @@ func main() { data []byte } - hostCh := make(chan HostRow, 6000) - entryCh := make(chan BundleEntry, 6000) + hostCh := make(chan HostRow, 50000) + entryCh := make(chan BundleEntry, 50000) uploadCh := make(chan bundleJob, cfg.Uploaders*2) // Stage 1: DB fetcher — continuously fetches pages into hostCh go func() { defer close(hostCh) var lastRandom float64 = -1 - pageSize := cfg.EntriesPerBundle * 50 + pageSize := 50000 fetched := 0 for { limit := pageSize @@ -142,6 +142,7 @@ func main() { limit = remaining } } + fetchStart := time.Now() hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit) if err != nil { log.Fatalf("Failed to fetch hosts: %v", err) @@ -150,6 +151,8 @@ func main() { break } lastRandom = hosts[len(hosts)-1].RandomOrder + fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", + len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) for _, h := range hosts { hostCh <- h }