diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index 21820ac..0f39643 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -121,15 +121,15 @@ func main() { data []byte } - hostCh := make(chan HostRow, 6000) - entryCh := make(chan BundleEntry, 6000) + hostCh := make(chan HostRow, 50000) + entryCh := make(chan BundleEntry, 50000) uploadCh := make(chan bundleJob, cfg.Uploaders*2) // Stage 1: DB fetcher — continuously fetches pages into hostCh go func() { defer close(hostCh) var lastRandom float64 = -1 - pageSize := cfg.EntriesPerBundle * 50 + pageSize := 50000 fetched := 0 for { limit := pageSize @@ -142,6 +142,7 @@ func main() { limit = remaining } } + fetchStart := time.Now() hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit) if err != nil { log.Fatalf("Failed to fetch hosts: %v", err) @@ -150,6 +151,8 @@ func main() { break } lastRandom = hosts[len(hosts)-1].RandomOrder + fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", + len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) for _, h := range hosts { hostCh <- h }