shuffle bundle generation using a bucket approach in the assembler
This commit is contained in:
parent
4b09a3219a
commit
df2eaa251c
2 changed files with 47 additions and 15 deletions
|
|
@ -13,12 +13,14 @@ type HostRow struct {
|
|||
HtmlTitle string
|
||||
IframeAllowed bool
|
||||
BestIconHash string
|
||||
RandomOrder float64
|
||||
}
|
||||
|
||||
// fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality.
|
||||
// random_order is included for bundle bucket assignment (randomized bundles).
|
||||
func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) {
|
||||
rows, err := pool.Query(ctx, `
|
||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, '')
|
||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), random_order
|
||||
FROM hosts
|
||||
WHERE html_title IS NOT NULL AND id > $1
|
||||
ORDER BY id
|
||||
|
|
@ -32,7 +34,7 @@ func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit
|
|||
var hosts []HostRow
|
||||
for rows.Next() {
|
||||
var h HostRow
|
||||
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash); err != nil {
|
||||
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.RandomOrder); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hosts = append(hosts, h)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ type Config struct {
|
|||
IconsDir string
|
||||
SiteBucket string
|
||||
EntriesPerBundle int
|
||||
NumBuckets int
|
||||
Concurrency int
|
||||
Uploaders int
|
||||
DryRun bool
|
||||
|
|
@ -45,6 +46,7 @@ func main() {
|
|||
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
||||
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
||||
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
||||
flag.IntVar(&cfg.NumBuckets, "num-buckets", 10000, "Number of shuffle buckets for randomizing bundles")
|
||||
flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions")
|
||||
flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
|
||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
||||
|
|
@ -121,8 +123,13 @@ func main() {
|
|||
data []byte
|
||||
}
|
||||
|
||||
type assemblerEntry struct {
|
||||
entry BundleEntry
|
||||
randomOrder float64
|
||||
}
|
||||
|
||||
hostCh := make(chan HostRow, 50000)
|
||||
entryCh := make(chan BundleEntry, 50000)
|
||||
entryCh := make(chan assemblerEntry, 50000)
|
||||
uploadCh := make(chan bundleJob, cfg.Uploaders*2)
|
||||
|
||||
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
||||
|
|
@ -173,7 +180,7 @@ func main() {
|
|||
} else {
|
||||
stats.BundledNoIcon.Add(1)
|
||||
}
|
||||
entryCh <- entry
|
||||
entryCh <- assemblerEntry{entry: entry, randomOrder: host.RandomOrder}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
@ -182,33 +189,56 @@ func main() {
|
|||
close(entryCh)
|
||||
}()
|
||||
|
||||
// Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders
|
||||
// Stage 3: Bundle assembler — distributes entries across N buckets by random_order
|
||||
// for randomized bundles, while reads stay in ID order for disk locality.
|
||||
go func() {
|
||||
defer close(uploadCh)
|
||||
var buf []BundleEntry
|
||||
buckets := make([][]BundleEntry, cfg.NumBuckets)
|
||||
bundleIndex := 0
|
||||
for entry := range entryCh {
|
||||
buf = append(buf, entry)
|
||||
if len(buf) >= cfg.EntriesPerBundle {
|
||||
chunk := make([]BundleEntry, cfg.EntriesPerBundle)
|
||||
copy(chunk, buf[:cfg.EntriesPerBundle])
|
||||
buf = buf[cfg.EntriesPerBundle:]
|
||||
|
||||
data, err := serializeBundle(chunk)
|
||||
for ae := range entryCh {
|
||||
b := int(ae.randomOrder * float64(cfg.NumBuckets))
|
||||
if b >= cfg.NumBuckets {
|
||||
b = cfg.NumBuckets - 1
|
||||
}
|
||||
buckets[b] = append(buckets[b], ae.entry)
|
||||
|
||||
if len(buckets[b]) >= cfg.EntriesPerBundle {
|
||||
data, err := serializeBundle(buckets[b])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
|
||||
}
|
||||
uploadCh <- bundleJob{index: bundleIndex, data: data}
|
||||
bundleIndex++
|
||||
buckets[b] = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Roll up remaining partial buckets into full bundles
|
||||
var remainder []BundleEntry
|
||||
for i := range buckets {
|
||||
if len(buckets[i]) > 0 {
|
||||
remainder = append(remainder, buckets[i]...)
|
||||
buckets[i] = nil
|
||||
for len(remainder) >= cfg.EntriesPerBundle {
|
||||
data, err := serializeBundle(remainder[:cfg.EntriesPerBundle])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
|
||||
}
|
||||
uploadCh <- bundleJob{index: bundleIndex, data: data}
|
||||
bundleIndex++
|
||||
remainder = remainder[cfg.EntriesPerBundle:]
|
||||
}
|
||||
}
|
||||
}
|
||||
// Final partial bundle
|
||||
if len(buf) > 0 {
|
||||
data, err := serializeBundle(buf)
|
||||
if len(remainder) > 0 {
|
||||
data, err := serializeBundle(remainder)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to serialize final bundle: %v", err)
|
||||
}
|
||||
uploadCh <- bundleJob{index: bundleIndex, data: data}
|
||||
bundleIndex++
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue