diff --git a/pipeline/05_bundle_gen/db.go b/pipeline/05_bundle_gen/db.go index dc3d282..49b1885 100644 --- a/pipeline/05_bundle_gen/db.go +++ b/pipeline/05_bundle_gen/db.go @@ -7,57 +7,36 @@ import ( ) type HostRow struct { + ID int64 Hostname string Protocol string HtmlTitle string IframeAllowed bool BestIconS3Key string + RandomOrder float32 } -// fetchHosts gets all hosts with titles, randomized order. -func fetchHosts(ctx context.Context, pool *pgxpool.Pool, limit int) ([]HostRow, error) { - query := ` - SELECT hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_s3_key, '') +// fetchHostsPage gets a page of hosts with titles, ordered by random_order for shuffled bundles. +func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastRandom float32, limit int) ([]HostRow, error) { + rows, err := pool.Query(ctx, ` + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_s3_key, ''), random_order FROM hosts - WHERE html_title IS NOT NULL - ORDER BY random() - ` - if limit > 0 { - query += " LIMIT $1" - } - - var rows interface{ Query(context.Context, string, ...interface{}) (interface{ Close(); Next() bool; Scan(...interface{}) error; Err() error }, error) } - _ = rows // unused, using pool directly - - var hosts []HostRow - - if limit > 0 { - pgRows, err := pool.Query(ctx, query, limit) - if err != nil { - return nil, err - } - defer pgRows.Close() - for pgRows.Next() { - var h HostRow - if err := pgRows.Scan(&h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconS3Key); err != nil { - return nil, err - } - hosts = append(hosts, h) - } - return hosts, pgRows.Err() - } - - pgRows, err := pool.Query(ctx, query) + WHERE html_title IS NOT NULL AND random_order > $1 + ORDER BY random_order + LIMIT $2 + `, lastRandom, limit) if err != nil { return nil, err } - defer pgRows.Close() - for pgRows.Next() { + defer rows.Close() + + var hosts []HostRow + for rows.Next() { var h HostRow - if err := pgRows.Scan(&h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconS3Key); err != nil { + if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconS3Key, &h.RandomOrder); err != nil { return nil, err } hosts = append(hosts, h) } - return hosts, pgRows.Err() + return hosts, rows.Err() } diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index 7e83541..db9133a 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -27,13 +27,13 @@ type Config struct { } type Stats struct { - TotalHosts int - HostsWithIcon int - HostsNoIcon int - BundlesCreated int - ConvertErrors atomic.Int64 - TotalBytes int64 - StartedAt time.Time + TotalHosts int + HostsWithIcon int + HostsNoIcon int + BundlesCreated int + ConvertErrors atomic.Int64 + TotalBytes int64 + StartedAt time.Time } func main() { @@ -42,10 +42,10 @@ func main() { flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons") flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site") flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file") + flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Concurrent icon conversions") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3") flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode") flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)") - flag.IntVar(&cfg.Concurrency, "concurrency", 50, "Concurrent icon conversions") flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file") flag.Parse() @@ -58,7 +58,7 @@ func main() { ctx := context.Background() - // Init S3 + // Init S3 (for uploading bundles) if err := initS3(); err != nil { log.Fatalf("Failed to init S3: %v", err) } @@ -82,91 +82,137 @@ func main() { stats := &Stats{StartedAt: time.Now()} - // Fetch all qualifying hosts (randomized) + // Count hosts fmt.Println("=== Bundle Generator ===") - fmt.Println("Querying hosts...") - - hosts, err := fetchHosts(ctx, pool, cfg.Limit) + var totalHosts, hostsWithIcon int + err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL`).Scan(&totalHosts) if err != nil { - log.Fatalf("Failed to fetch hosts: %v", err) + log.Fatalf("Failed to count hosts: %v", err) + } + err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL AND best_icon_s3_key IS NOT NULL`).Scan(&hostsWithIcon) + if err != nil { + log.Fatalf("Failed to count icons: %v", err) } - stats.TotalHosts = len(hosts) - for _, h := range hosts { - if h.BestIconS3Key != "" { - stats.HostsWithIcon++ - } else { - stats.HostsNoIcon++ - } - } + stats.TotalHosts = totalHosts + stats.HostsWithIcon = hostsWithIcon + stats.HostsNoIcon = totalHosts - hostsWithIcon - fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", stats.TotalHosts, stats.HostsWithIcon, stats.HostsNoIcon) + fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", totalHosts, hostsWithIcon, totalHosts-hostsWithIcon) fmt.Printf("Entries per bundle: %d\n", cfg.EntriesPerBundle) + fmt.Printf("Concurrency: %d\n", cfg.Concurrency) fmt.Printf("Dry run: %v\n\n", cfg.DryRun) if cfg.DryRun { os.MkdirAll(cfg.OutputDir, 0755) } - // Process hosts into bundle entries (concurrently for S3 downloads) - fmt.Printf("Converting icons and building entries (concurrency: %d)...\n", cfg.Concurrency) - entries := make([]BundleEntry, len(hosts)) - - var wg sync.WaitGroup - sem := make(chan struct{}, cfg.Concurrency) - var processed atomic.Int64 - - for i, host := range hosts { - wg.Add(1) - sem <- struct{}{} - go func(idx int, h HostRow) { - defer wg.Done() - defer func() { <-sem }() - entries[idx] = buildEntry(h, cfg.IconsDir, logWriter, stats) - n := processed.Add(1) - if n%5000 == 0 { - fmt.Printf(" processed %d/%d hosts\n", n, len(hosts)) - } - }(i, host) - } - wg.Wait() - - // Clean old bundles before writing new ones (avoids orphans if count changed) + // Clean old bundles before writing new ones if !cfg.DryRun { - fmt.Println("\nCleaning old bundles from S3...") + fmt.Println("Cleaning old bundles from S3...") if err := s3DeletePrefix(cfg.SiteBucket, "tabs/"); err != nil { log.Fatalf("Failed to clean old bundles: %v", err) } } - // Chunk into bundles and write - fmt.Println("Writing bundles...") + // Stream hosts from DB in pages, convert icons, write bundles incrementally + fmt.Println("Processing hosts and writing bundles...") + bundleCount := 0 var totalBytes int64 + var lastRandom float32 = -1 + pageSize := cfg.EntriesPerBundle * 50 // fetch 50 bundles worth at a time + var entryBuf []BundleEntry + hostsProcessed := 0 - for i := 0; i < len(entries); i += cfg.EntriesPerBundle { - end := i + cfg.EntriesPerBundle - if end > len(entries) { - end = len(entries) + for { + // Fetch a page of hosts + limit := pageSize + if cfg.Limit > 0 { + remaining := cfg.Limit - hostsProcessed + if remaining <= 0 { + break + } + if limit > remaining { + limit = remaining + } } - chunk := entries[i:end] - bundleIndex := bundleCount - data, err := serializeBundle(chunk) + hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit) if err != nil { - log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err) + log.Fatalf("Failed to fetch hosts: %v", err) + } + if len(hosts) == 0 { + break + } + lastRandom = hosts[len(hosts)-1].RandomOrder + hostsProcessed += len(hosts) + + // Convert icons concurrently for this page + pageEntries := make([]BundleEntry, len(hosts)) + var wg sync.WaitGroup + sem := make(chan struct{}, cfg.Concurrency) + + for i, host := range hosts { + wg.Add(1) + sem <- struct{}{} + go func(idx int, h HostRow) { + defer wg.Done() + defer func() { <-sem }() + pageEntries[idx] = buildEntry(h, cfg.IconsDir, logWriter, stats) + }(i, host) + } + wg.Wait() + + entryBuf = append(entryBuf, pageEntries...) + + // Write complete bundles from the buffer + for len(entryBuf) >= cfg.EntriesPerBundle { + chunk := entryBuf[:cfg.EntriesPerBundle] + entryBuf = entryBuf[cfg.EntriesPerBundle:] + + data, err := serializeBundle(chunk) + if err != nil { + log.Fatalf("Failed to serialize bundle %d: %v", bundleCount, err) + } + + if cfg.DryRun { + err = writeBundleLocal(cfg.OutputDir, bundleCount, data) + } else { + err = writeBundleS3(cfg.SiteBucket, bundleCount, data) + } + if err != nil { + log.Fatalf("Failed to write bundle %d: %v", bundleCount, err) + } + + logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(chunk), len(data)/1024) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, false) + } + + totalBytes += int64(len(data)) + bundleCount++ + } + } + + // Write final partial bundle + if len(entryBuf) > 0 { + data, err := serializeBundle(entryBuf) + if err != nil { + log.Fatalf("Failed to serialize final bundle: %v", err) } if cfg.DryRun { - err = writeBundleLocal(cfg.OutputDir, bundleIndex, data) + err = writeBundleLocal(cfg.OutputDir, bundleCount, data) } else { - err = writeBundleS3(cfg.SiteBucket, bundleIndex, data) + err = writeBundleS3(cfg.SiteBucket, bundleCount, data) } if err != nil { - log.Fatalf("Failed to write bundle %d: %v", bundleIndex, err) + log.Fatalf("Failed to write final bundle: %v", err) } - logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleIndex, len(chunk), len(data)/1024) + logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(entryBuf), len(data)/1024) fmt.Println(logLine) if logWriter != nil { logWriter.Write(logLine, false) @@ -189,8 +235,15 @@ func main() { fmt.Printf("Convert errors: %d\n", stats.ConvertErrors.Load()) fmt.Printf("Bundles created: %d\n", stats.BundlesCreated) fmt.Printf("Total size: %.1f MB\n", float64(stats.TotalBytes)/(1024*1024)) - fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(stats.BundlesCreated)/1024) + fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(max(stats.BundlesCreated, 1))/1024) fmt.Printf("TOTAL_BUNDLES = %d (bake this into the frontend)\n", stats.BundlesCreated) writeStats(stats) } + +func max(a, b int) int { + if a > b { + return a + } + return b +}