From 081866f62eecf5a01c4f2e125a940713f3b89323 Mon Sep 17 00:00:00 2001 From: Joe Lothan Date: Wed, 20 May 2026 01:28:52 -0400 Subject: [PATCH] update bundle gen to use channels and goroutines to saturate disk and not block on db access + bundle coalesing and uploading --- ARCHITECTURE.md | 24 ++-- pipeline/05_bundle_gen/main.go | 230 ++++++++++++++++++--------------- 2 files changed, 137 insertions(+), 117 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index cdb0038..b1ee8aa 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -297,17 +297,21 @@ Uses `DISTINCT ON (host_id)` for efficient single-pass selection. See `pipeline/ **Input:** All hosts where `html_title IS NOT NULL` (include hosts without icons) -**Process:** -1. Stream hosts from RDS in pages (keyset pagination on `random_order` column for shuffled output) -2. For each page, concurrently convert icons (configurable concurrency, default 200): - - Read icon from local disk at `{icons_dir}/ab/cd/ef/{hash}` - - Decode the image via Go's `image.Decode` (handles PNG, GIF, JPEG, WebP, ICO via registered decoders) - - SVGs are excluded (no rasterizer) — these hosts appear without icons +**Architecture:** Four-stage pipeline with all stages running concurrently: + +``` +[DB fetcher] → hostCh → [N converters] → entryCh → [bundle assembler] → uploadCh → [M uploaders] +``` + +1. **DB fetcher** (1 goroutine): continuously fetches pages of hosts via keyset pagination on `random_order`. Feeds hosts into `hostCh`. Never waits for downstream stages. +2. **Converter workers** (N goroutines, default 20): read hosts from `hostCh`, read icon from disk, decode, re-encode as PNG, base64-encode, emit `BundleEntry` to `entryCh`. CPU-bound — default tuned to ~5x core count on c5.xlarge (4 vCPUs). + - Decode via Go's `image.Decode` (handles PNG, GIF, JPEG, WebP, BMP, ICO via registered decoders) + - SVGs excluded (no rasterizer) — these hosts appear without icons - Icons >128px downscaled to 32x32 (nearest-neighbor). Icons ≤128px kept as-is. - - Re-encode as PNG, base64-encode -3. Converted entries accumulate in a buffer. Every 120 entries (configurable), serialize as JSON and upload to S3 -4. Hosts without icons: included with `"icon": ""` -5. Final partial bundle written at end +3. **Bundle assembler** (1 goroutine): collects entries from `entryCh`. Every 120 entries (configurable), serializes as JSON and sends to `uploadCh`. Hosts without icons included with `"icon": ""`. +4. **Upload workers** (M goroutines, default 10): write bundles to S3 (or local disk in dry-run mode). I/O-bound — multiple uploads in flight hides S3 PUT latency (~50-100ms each). + +Bundles are written in-place (overwriting previous run). No delete-first step, so the live site always has valid data even if bundle gen crashes midway. The frontend's `TOTAL_BUNDLES` constant ensures only valid bundle indices are requested. **Output:** - `tabs/0000.json` through `tabs/{M}.json` in S3 `everytab-site` diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index 8198eb7..76a0397 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -19,6 +19,7 @@ type Config struct { SiteBucket string EntriesPerBundle int Concurrency int + Uploaders int DryRun bool OutputDir string Limit int @@ -44,7 +45,8 @@ func main() { flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons") flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site") flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file") - flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Concurrent icon conversions") + flag.IntVar(&cfg.Concurrency, "concurrency", 20, "Concurrent icon conversions") + flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3") flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode") flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)") @@ -109,125 +111,139 @@ func main() { os.MkdirAll(cfg.OutputDir, 0755) } - // Stream hosts from DB in pages, convert icons, write bundles incrementally - // Bundles are written in-place (overwriting previous run). No delete-first step, - // so the live site always has valid data even if bundle gen crashes midway. + // Four-stage pipeline: + // [DB fetcher] → hostCh → [N converters] → entryCh → [bundle assembler] → uploadCh → [M uploaders] + // All stages run concurrently. Bundles are written in-place (overwriting previous run). fmt.Println("Processing hosts and writing bundles...") - bundleCount := 0 - var totalBytes int64 - var lastRandom float64 = -1 - pageSize := cfg.EntriesPerBundle * 50 // fetch 50 bundles worth at a time - var entryBuf []BundleEntry - hostsProcessed := 0 + type bundleJob struct { + index int + data []byte + } - for { - // Fetch a page of hosts - limit := pageSize - if cfg.Limit > 0 { - remaining := cfg.Limit - hostsProcessed - if remaining <= 0 { + hostCh := make(chan HostRow, cfg.EntriesPerBundle*10) + entryCh := make(chan BundleEntry, cfg.EntriesPerBundle*10) + uploadCh := make(chan bundleJob, cfg.Uploaders*2) + + // Stage 1: DB fetcher — continuously fetches pages into hostCh + go func() { + defer close(hostCh) + var lastRandom float64 = -1 + pageSize := cfg.EntriesPerBundle * 50 + fetched := 0 + for { + limit := pageSize + if cfg.Limit > 0 { + remaining := cfg.Limit - fetched + if remaining <= 0 { + break + } + if limit > remaining { + limit = remaining + } + } + hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit) + if err != nil { + log.Fatalf("Failed to fetch hosts: %v", err) + } + if len(hosts) == 0 { break } - if limit > remaining { - limit = remaining + lastRandom = hosts[len(hosts)-1].RandomOrder + for _, h := range hosts { + hostCh <- h + } + fetched += len(hosts) + } + }() + + // Stage 2: Converter workers — read hosts, convert icons, emit entries + var converterWg sync.WaitGroup + for i := 0; i < cfg.Concurrency; i++ { + converterWg.Add(1) + go func() { + defer converterWg.Done() + for host := range hostCh { + entry := buildEntry(host, cfg.IconsDir, logWriter, stats) + if entry.Icon != "" { + stats.BundledWithIcon.Add(1) + } else { + stats.BundledNoIcon.Add(1) + } + entryCh <- entry + } + }() + } + go func() { + converterWg.Wait() + close(entryCh) + }() + + // Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders + go func() { + defer close(uploadCh) + var buf []BundleEntry + bundleIndex := 0 + for entry := range entryCh { + buf = append(buf, entry) + if len(buf) >= cfg.EntriesPerBundle { + chunk := make([]BundleEntry, cfg.EntriesPerBundle) + copy(chunk, buf[:cfg.EntriesPerBundle]) + buf = buf[cfg.EntriesPerBundle:] + + data, err := serializeBundle(chunk) + if err != nil { + log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err) + } + uploadCh <- bundleJob{index: bundleIndex, data: data} + bundleIndex++ } } - - hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit) - if err != nil { - log.Fatalf("Failed to fetch hosts: %v", err) - } - if len(hosts) == 0 { - break - } - lastRandom = hosts[len(hosts)-1].RandomOrder - hostsProcessed += len(hosts) - - // Convert icons concurrently for this page - pageEntries := make([]BundleEntry, len(hosts)) - var wg sync.WaitGroup - sem := make(chan struct{}, cfg.Concurrency) - - for i, host := range hosts { - wg.Add(1) - sem <- struct{}{} - go func(idx int, h HostRow) { - defer wg.Done() - defer func() { <-sem }() - pageEntries[idx] = buildEntry(h, cfg.IconsDir, logWriter, stats) - }(i, host) - } - wg.Wait() - - for _, e := range pageEntries { - if e.Icon != "" { - stats.BundledWithIcon.Add(1) - } else { - stats.BundledNoIcon.Add(1) - } - } - - entryBuf = append(entryBuf, pageEntries...) - - // Write complete bundles from the buffer - for len(entryBuf) >= cfg.EntriesPerBundle { - chunk := entryBuf[:cfg.EntriesPerBundle] - entryBuf = entryBuf[cfg.EntriesPerBundle:] - - data, err := serializeBundle(chunk) + // Final partial bundle + if len(buf) > 0 { + data, err := serializeBundle(buf) if err != nil { - log.Fatalf("Failed to serialize bundle %d: %v", bundleCount, err) + log.Fatalf("Failed to serialize final bundle: %v", err) } - - if cfg.DryRun { - err = writeBundleLocal(cfg.OutputDir, bundleCount, data) - } else { - err = writeBundleS3(cfg.SiteBucket, bundleCount, data) - } - if err != nil { - log.Fatalf("Failed to write bundle %d: %v", bundleCount, err) - } - - logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(chunk), len(data)/1024) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, false) - } - - totalBytes += int64(len(data)) - bundleCount++ + uploadCh <- bundleJob{index: bundleIndex, data: data} } + }() + + // Stage 4: Upload workers — write bundles to S3 or local disk + var uploadWg sync.WaitGroup + var bundlesCreated atomic.Int64 + var totalBytes atomic.Int64 + for i := 0; i < cfg.Uploaders; i++ { + uploadWg.Add(1) + go func() { + defer uploadWg.Done() + for job := range uploadCh { + var err error + if cfg.DryRun { + err = writeBundleLocal(cfg.OutputDir, job.index, job.data) + } else { + err = writeBundleS3(cfg.SiteBucket, job.index, job.data) + } + if err != nil { + log.Fatalf("Failed to write bundle %d: %v", job.index, err) + } + + logLine := fmt.Sprintf("bundle: %04d.json %dKB", job.index, len(job.data)/1024) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, false) + } + + bundlesCreated.Add(1) + totalBytes.Add(int64(len(job.data))) + } + }() } - // Write final partial bundle - if len(entryBuf) > 0 { - data, err := serializeBundle(entryBuf) - if err != nil { - log.Fatalf("Failed to serialize final bundle: %v", err) - } + uploadWg.Wait() - if cfg.DryRun { - err = writeBundleLocal(cfg.OutputDir, bundleCount, data) - } else { - err = writeBundleS3(cfg.SiteBucket, bundleCount, data) - } - if err != nil { - log.Fatalf("Failed to write final bundle: %v", err) - } - - logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(entryBuf), len(data)/1024) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, false) - } - - totalBytes += int64(len(data)) - bundleCount++ - } - - stats.BundlesCreated = bundleCount - stats.TotalBytes = totalBytes + stats.BundlesCreated = int(bundlesCreated.Load()) + stats.TotalBytes = totalBytes.Load() // Summary duration := time.Since(stats.StartedAt)