diff --git a/infra/ec2-userdata.sh b/infra/ec2-userdata.sh index 2b58af8..380a3cf 100755 --- a/infra/ec2-userdata.sh +++ b/infra/ec2-userdata.sh @@ -48,8 +48,6 @@ sudo dnf install -y \ unbound \ jq \ htop \ - iftop \ - iotop \ tmux # --- Go --- diff --git a/pipeline/05_bundle_gen/db.go b/pipeline/05_bundle_gen/db.go index 78aee54..9f51ba1 100644 --- a/pipeline/05_bundle_gen/db.go +++ b/pipeline/05_bundle_gen/db.go @@ -13,14 +13,12 @@ type HostRow struct { HtmlTitle string IframeAllowed bool BestIconHash string - RandomOrder float64 } // fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality. -// random_order is included for bundle bucket assignment (randomized bundles). func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) { rows, err := pool.Query(ctx, ` - SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), random_order + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, '') FROM hosts WHERE html_title IS NOT NULL AND id > $1 ORDER BY id @@ -34,7 +32,7 @@ func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit var hosts []HostRow for rows.Next() { var h HostRow - if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.RandomOrder); err != nil { + if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash); err != nil { return nil, err } hosts = append(hosts, h) diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index 8f5ea71..bd2ad8b 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -18,7 +18,6 @@ type Config struct { IconsDir string SiteBucket string EntriesPerBundle int - NumBuckets int Concurrency int Uploaders int DryRun bool @@ -46,7 +45,6 @@ func main() { flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons") flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site") flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file") - flag.IntVar(&cfg.NumBuckets, "num-buckets", 10000, "Number of shuffle buckets for randomizing bundles") flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions") flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3") @@ -123,13 +121,8 @@ func main() { data []byte } - type assemblerEntry struct { - entry BundleEntry - randomOrder float64 - } - hostCh := make(chan HostRow, 50000) - entryCh := make(chan assemblerEntry, 50000) + entryCh := make(chan BundleEntry, 50000) uploadCh := make(chan bundleJob, cfg.Uploaders*2) // Stage 1: DB fetcher — continuously fetches pages into hostCh @@ -180,7 +173,7 @@ func main() { } else { stats.BundledNoIcon.Add(1) } - entryCh <- assemblerEntry{entry: entry, randomOrder: host.RandomOrder} + entryCh <- entry } }() } @@ -189,56 +182,33 @@ func main() { close(entryCh) }() - // Stage 3: Bundle assembler — distributes entries across N buckets by random_order - // for randomized bundles, while reads stay in ID order for disk locality. + // Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders go func() { defer close(uploadCh) - buckets := make([][]BundleEntry, cfg.NumBuckets) + var buf []BundleEntry bundleIndex := 0 + for entry := range entryCh { + buf = append(buf, entry) + if len(buf) >= cfg.EntriesPerBundle { + chunk := make([]BundleEntry, cfg.EntriesPerBundle) + copy(chunk, buf[:cfg.EntriesPerBundle]) + buf = buf[cfg.EntriesPerBundle:] - for ae := range entryCh { - b := int(ae.randomOrder * float64(cfg.NumBuckets)) - if b >= cfg.NumBuckets { - b = cfg.NumBuckets - 1 - } - buckets[b] = append(buckets[b], ae.entry) - - if len(buckets[b]) >= cfg.EntriesPerBundle { - data, err := serializeBundle(buckets[b]) + data, err := serializeBundle(chunk) if err != nil { log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err) } uploadCh <- bundleJob{index: bundleIndex, data: data} bundleIndex++ - buckets[b] = nil - } - } - - // Roll up remaining partial buckets into full bundles - var remainder []BundleEntry - for i := range buckets { - if len(buckets[i]) > 0 { - remainder = append(remainder, buckets[i]...) - buckets[i] = nil - for len(remainder) >= cfg.EntriesPerBundle { - data, err := serializeBundle(remainder[:cfg.EntriesPerBundle]) - if err != nil { - log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err) - } - uploadCh <- bundleJob{index: bundleIndex, data: data} - bundleIndex++ - remainder = remainder[cfg.EntriesPerBundle:] - } } } // Final partial bundle - if len(remainder) > 0 { - data, err := serializeBundle(remainder) + if len(buf) > 0 { + data, err := serializeBundle(buf) if err != nil { log.Fatalf("Failed to serialize final bundle: %v", err) } uploadCh <- bundleJob{index: bundleIndex, data: data} - bundleIndex++ } }()