diff --git a/infra/ec2-userdata.sh b/infra/ec2-userdata.sh index 380a3cf..2b58af8 100755 --- a/infra/ec2-userdata.sh +++ b/infra/ec2-userdata.sh @@ -48,6 +48,8 @@ sudo dnf install -y \ unbound \ jq \ htop \ + iftop \ + iotop \ tmux # --- Go --- diff --git a/pipeline/05_bundle_gen/db.go b/pipeline/05_bundle_gen/db.go index 9f51ba1..78aee54 100644 --- a/pipeline/05_bundle_gen/db.go +++ b/pipeline/05_bundle_gen/db.go @@ -13,12 +13,14 @@ type HostRow struct { HtmlTitle string IframeAllowed bool BestIconHash string + RandomOrder float64 } // fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality. +// random_order is included for bundle bucket assignment (randomized bundles). func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) { rows, err := pool.Query(ctx, ` - SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, '') + SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), random_order FROM hosts WHERE html_title IS NOT NULL AND id > $1 ORDER BY id @@ -32,7 +34,7 @@ func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit var hosts []HostRow for rows.Next() { var h HostRow - if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash); err != nil { + if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.RandomOrder); err != nil { return nil, err } hosts = append(hosts, h) diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index bd2ad8b..8f5ea71 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -18,6 +18,7 @@ type Config struct { IconsDir string SiteBucket string EntriesPerBundle int + NumBuckets int Concurrency int Uploaders int DryRun bool @@ -45,6 +46,7 @@ func main() { flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", 
"Directory with downloaded icons") flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site") flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file") + flag.IntVar(&cfg.NumBuckets, "num-buckets", 10000, "Number of shuffle buckets for randomizing bundles") flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions") flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3") @@ -121,8 +123,13 @@ func main() { data []byte } + type assemblerEntry struct { + entry BundleEntry + randomOrder float64 + } + hostCh := make(chan HostRow, 50000) - entryCh := make(chan BundleEntry, 50000) + entryCh := make(chan assemblerEntry, 50000) uploadCh := make(chan bundleJob, cfg.Uploaders*2) // Stage 1: DB fetcher — continuously fetches pages into hostCh @@ -173,7 +180,7 @@ func main() { } else { stats.BundledNoIcon.Add(1) } - entryCh <- entry + entryCh <- assemblerEntry{entry: entry, randomOrder: host.RandomOrder} } }() } @@ -182,33 +189,56 @@ func main() { close(entryCh) }() - // Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders + // Stage 3: Bundle assembler — distributes entries across N buckets by random_order + // for randomized bundles, while reads stay in ID order for disk locality. 
go func() {
 		defer close(uploadCh)
-		var buf []BundleEntry
+		buckets := make([][]BundleEntry, cfg.NumBuckets) // pending entries, one slice per shuffle bucket
 		bundleIndex := 0
 
-		for entry := range entryCh {
-			buf = append(buf, entry)
-			if len(buf) >= cfg.EntriesPerBundle {
-				chunk := make([]BundleEntry, cfg.EntriesPerBundle)
-				copy(chunk, buf[:cfg.EntriesPerBundle])
-				buf = buf[cfg.EntriesPerBundle:]
-				data, err := serializeBundle(chunk)
+		for ae := range entryCh {
+			b := int(ae.randomOrder * float64(cfg.NumBuckets))
+			if b < 0 {
+				b = 0 // a random_order below 0 would index out of range and panic; use the first bucket
+			} else if b >= cfg.NumBuckets {
+				b = cfg.NumBuckets - 1 // a random_order of 1.0 or more lands past the end; use the last bucket
+			}
+			buckets[b] = append(buckets[b], ae.entry)
+			if len(buckets[b]) >= cfg.EntriesPerBundle { // bucket is full: emit it as one bundle
+				data, err := serializeBundle(buckets[b])
 				if err != nil {
 					log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
 				}
 				uploadCh <- bundleJob{index: bundleIndex, data: data}
 				bundleIndex++
+				buckets[b] = nil // drop the flushed entries so the bucket starts empty again
+			}
+		}
+
+		// Roll up remaining partial buckets into full bundles
+		var remainder []BundleEntry
+		for i := range buckets {
+			if len(buckets[i]) > 0 {
+				remainder = append(remainder, buckets[i]...)
+				buckets[i] = nil
+				for len(remainder) >= cfg.EntriesPerBundle {
+					data, err := serializeBundle(remainder[:cfg.EntriesPerBundle])
+					if err != nil {
+						log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
+					}
+					uploadCh <- bundleJob{index: bundleIndex, data: data}
+					bundleIndex++
+					remainder = remainder[cfg.EntriesPerBundle:]
+				}
 			}
 		}
 
 		// Final partial bundle
-		if len(buf) > 0 {
-			data, err := serializeBundle(buf)
+		if len(remainder) > 0 {
+			data, err := serializeBundle(remainder)
 			if err != nil {
 				log.Fatalf("Failed to serialize final bundle: %v", err)
 			}
 			uploadCh <- bundleJob{index: bundleIndex, data: data}
 		}
 	}()