// Command bundle_gen reads hosts from Postgres, builds one bundle entry per
// host, groups the entries into fixed-size JSON bundles and writes each bundle
// either to S3 or, in dry-run mode, to a local directory.
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Config holds the command-line settings for one generator run.
type Config struct {
	DBUrl            string // Postgres connection string (required)
	IconsDir         string // directory with downloaded icons
	SiteBucket       string // S3 bucket for the static site
	EntriesPerBundle int    // entries written into each bundle file; must be > 0
	Concurrency      int    // number of converter goroutines; must be > 0
	Uploaders        int    // number of uploader goroutines; must be > 0
	DryRun           bool   // write bundles to OutputDir instead of S3
	OutputDir        string // local output directory used when DryRun is set
	Limit            int    // max hosts to process; values <= 0 mean "all"
	LogFile          string // optional file that log lines are mirrored to
	LogErrors        bool   // when set, only errors are written to LogFile
}

// Stats accumulates the counters printed in the end-of-run summary. The atomic
// fields are updated from worker goroutines; the plain fields are written only
// by main.
type Stats struct {
	TotalHosts      int // hosts with a non-NULL html_title
	HostsWithIcon   int // of those, hosts that also have a best_icon_hash
	HostsNoIcon     int // TotalHosts - HostsWithIcon
	BundlesCreated  int
	ConvertErrors   atomic.Int64
	BundledWithIcon atomic.Int64
	BundledNoIcon   atomic.Int64
	TotalBytes      int64
	StartedAt       time.Time
}

// validate reports an error for sizing flags that would make the pipeline
// panic (division by zero, negative make size) or silently produce nothing
// (zero workers in a stage).
func (c Config) validate() error {
	switch {
	case c.EntriesPerBundle <= 0:
		return fmt.Errorf("entries-per-bundle must be positive, got %d", c.EntriesPerBundle)
	case c.Concurrency <= 0:
		return fmt.Errorf("concurrency must be positive, got %d", c.Concurrency)
	case c.Uploaders <= 0:
		return fmt.Errorf("uploaders must be positive, got %d", c.Uploaders)
	}
	return nil
}

// main parses flags, connects to S3 and Postgres, runs the four-stage bundle
// pipeline to completion and prints a summary.
func main() {
	cfg := Config{}
	flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
	flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
	flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
	flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
	flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions")
	flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
	flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
	flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
	flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
	flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
	flag.Parse()

	if cfg.DBUrl == "" {
		fmt.Println("Usage: bundle_gen --db DATABASE_URL [OPTIONS]")
		flag.PrintDefaults()
		os.Exit(1)
	}
	// Reject unusable sizing flags before touching any external resource.
	if err := cfg.validate(); err != nil {
		log.Fatalf("Invalid flags: %v", err)
	}

	ctx := context.Background()

	// Init S3 (for uploading bundles)
	if err := initS3(); err != nil {
		log.Fatalf("Failed to init S3: %v", err)
	}

	// Init DB
	pool, err := pgxpool.New(ctx, cfg.DBUrl)
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer pool.Close()

	// Setup log file
	var logWriter *LogWriter
	if cfg.LogFile != "" {
		logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
		if err != nil {
			log.Fatalf("Failed to open log file: %v", err)
		}
		defer logWriter.Close()
	}

	// NOTE(review): every log.Fatalf below this point exits via os.Exit, so
	// the deferred pool.Close / logWriter.Close above do not run on those
	// paths.

	stats := &Stats{StartedAt: time.Now()}

	// Count hosts
	fmt.Println("=== Bundle Generator ===")
	var totalHosts, hostsWithIcon int
	err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL`).Scan(&totalHosts)
	if err != nil {
		log.Fatalf("Failed to count hosts: %v", err)
	}
	err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL AND best_icon_hash IS NOT NULL`).Scan(&hostsWithIcon)
	if err != nil {
		log.Fatalf("Failed to count icons: %v", err)
	}
	stats.TotalHosts = totalHosts
	stats.HostsWithIcon = hostsWithIcon
	stats.HostsNoIcon = totalHosts - hostsWithIcon

	fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", totalHosts, hostsWithIcon, totalHosts-hostsWithIcon)
	fmt.Printf("Entries per bundle: %d\n", cfg.EntriesPerBundle)
	fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
	fmt.Printf("Dry run: %v\n\n", cfg.DryRun)

	if cfg.DryRun {
		// Fail fast here instead of on the first bundle write.
		if err := os.MkdirAll(cfg.OutputDir, 0755); err != nil {
			log.Fatalf("Failed to create output dir %q: %v", cfg.OutputDir, err)
		}
	}

	// Progress denominator: the bundle count implied by the host count,
	// capped by --limit when one is set. Computed once; it never changes.
	expectedHosts := totalHosts
	if cfg.Limit > 0 {
		expectedHosts = min(expectedHosts, cfg.Limit)
	}
	estimatedTotal := (expectedHosts + cfg.EntriesPerBundle - 1) / cfg.EntriesPerBundle

	// Four-stage pipeline:
	// [DB fetcher] → hostCh → [N converters] → entryCh → [bundle assembler] → uploadCh → [M uploaders]
	// All stages run concurrently. Bundles are written in-place (overwriting previous run).
	fmt.Println("Processing hosts and writing bundles...")

	type bundleJob struct {
		index int
		data  []byte
	}

	hostCh := make(chan HostRow, 50000)
	entryCh := make(chan BundleEntry, 50000)
	uploadCh := make(chan bundleJob, cfg.Uploaders*2)

	// Stage 1: DB fetcher — continuously fetches pages into hostCh
	go func() {
		defer close(hostCh)

		var lastTime *time.Time
		var lastID int64
		iconPhase := true // first: hosts with icons (by download time), then: hosts without
		pageSize := 50000
		fetched := 0

		for {
			// Shrink the final page so no more than cfg.Limit hosts are fetched.
			limit := pageSize
			if cfg.Limit > 0 {
				remaining := cfg.Limit - fetched
				if remaining <= 0 {
					break
				}
				if limit > remaining {
					limit = remaining
				}
			}

			fetchStart := time.Now()
			hosts, err := fetchHostsPage(ctx, pool, lastTime, lastID, limit)
			if err != nil {
				log.Fatalf("Failed to fetch hosts: %v", err)
			}

			if len(hosts) == 0 {
				if iconPhase {
					// Switch to no-icon hosts
					// NOTE(review): after this reset the cursor is (nil, 0),
					// the same arguments as the very first call, and iconPhase
					// is not passed to fetchHostsPage. Confirm that
					// fetchHostsPage does not return the icon hosts a second
					// time here.
					iconPhase = false
					lastTime = nil
					lastID = 0
					continue
				}
				break
			}

			// Advance the cursor from the last row of the page.
			// NOTE(review): lastID is not advanced while rows carry an
			// IconDownloadedAt; verify how fetchHostsPage treats rows sharing
			// the same timestamp.
			last := hosts[len(hosts)-1]
			if last.IconDownloadedAt != nil {
				lastTime = last.IconDownloadedAt
			} else {
				lastID = last.ID
			}

			fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
			for _, h := range hosts {
				hostCh <- h
			}
			fetched += len(hosts)
		}
	}()

	// Stage 2: Converter workers — read hosts, convert icons, emit entries
	var converterWg sync.WaitGroup
	for i := 0; i < cfg.Concurrency; i++ {
		converterWg.Add(1)
		go func() {
			defer converterWg.Done()
			for host := range hostCh {
				entry := buildEntry(host, cfg.IconsDir, logWriter, stats)
				if entry.Icon != "" {
					stats.BundledWithIcon.Add(1)
				} else {
					stats.BundledNoIcon.Add(1)
				}
				entryCh <- entry
			}
		}()
	}
	// Close entryCh only after every converter has drained hostCh.
	go func() {
		converterWg.Wait()
		close(entryCh)
	}()

	// Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders
	go func() {
		defer close(uploadCh)

		var buf []BundleEntry
		bundleIndex := 0
		for entry := range entryCh {
			buf = append(buf, entry)
			if len(buf) >= cfg.EntriesPerBundle {
				// Copy the chunk out so later appends to buf cannot alias it.
				chunk := make([]BundleEntry, cfg.EntriesPerBundle)
				copy(chunk, buf[:cfg.EntriesPerBundle])
				buf = buf[cfg.EntriesPerBundle:]

				data, err := serializeBundle(chunk)
				if err != nil {
					log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
				}
				uploadCh <- bundleJob{index: bundleIndex, data: data}
				bundleIndex++
			}
		}

		// Final partial bundle
		if len(buf) > 0 {
			data, err := serializeBundle(buf)
			if err != nil {
				log.Fatalf("Failed to serialize final bundle: %v", err)
			}
			uploadCh <- bundleJob{index: bundleIndex, data: data}
		}
	}()

	// Stage 4: Upload workers — write bundles to S3 or local disk
	var uploadWg sync.WaitGroup
	var bundlesCreated atomic.Int64
	var totalBytes atomic.Int64
	for i := 0; i < cfg.Uploaders; i++ {
		uploadWg.Add(1)
		go func() {
			defer uploadWg.Done()
			for job := range uploadCh {
				var err error
				if cfg.DryRun {
					err = writeBundleLocal(cfg.OutputDir, job.index, job.data)
				} else {
					err = writeBundleS3(cfg.SiteBucket, job.index, job.data)
				}
				if err != nil {
					log.Fatalf("Failed to write bundle %d: %v", job.index, err)
				}

				n := bundlesCreated.Add(1)
				if n%100 == 0 {
					fmt.Printf(" %d/%d bundles\n", n, estimatedTotal)
				}
				logLine := fmt.Sprintf("bundle: %06d.json %dKB", job.index, len(job.data)/1024)
				if logWriter != nil {
					logWriter.Write(logLine, false)
				}
				totalBytes.Add(int64(len(job.data)))
			}
		}()
	}

	// The uploaders finish once uploadCh is closed and drained, which happens
	// only after every upstream stage has completed.
	uploadWg.Wait()
	stats.BundlesCreated = int(bundlesCreated.Load())
	stats.TotalBytes = totalBytes.Load()

	// Summary
	duration := time.Since(stats.StartedAt)
	fmt.Printf("\n=== Summary ===\n")
	fmt.Printf("Duration: %s\n", duration.Round(time.Second))
	fmt.Printf("Total hosts: %d\n", stats.TotalHosts)
	fmt.Printf("Hosts with icon: %d\n", stats.HostsWithIcon)
	fmt.Printf("Hosts without icon: %d\n", stats.HostsNoIcon)
	fmt.Printf("Bundled with icon: %d\n", stats.BundledWithIcon.Load())
	fmt.Printf("Bundled without icon: %d\n", stats.BundledNoIcon.Load())
	fmt.Printf("Convert errors: %d\n", stats.ConvertErrors.Load())
	fmt.Printf("Bundles created: %d\n", stats.BundlesCreated)
	fmt.Printf("Total size: %.1f MB\n", float64(stats.TotalBytes)/(1024*1024))
	// max(..., 1) avoids dividing by zero when no bundle was produced.
	fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(max(stats.BundlesCreated, 1))/1024)
	fmt.Printf("TOTAL_BUNDLES = %d (bake this into the frontend)\n", stats.BundlesCreated)

	writeStats(stats)
}