Compare commits

..

No commits in common. "df2eaa251c0cc8bd3d8f7cfb88dc8de0a4e9e461" and "79881fce4b40d2bdfe8ea0a29adf52e32a008b88" have entirely different histories.

3 changed files with 15 additions and 49 deletions

View file

@@ -48,8 +48,6 @@ sudo dnf install -y \
unbound \ unbound \
jq \ jq \
htop \ htop \
iftop \
iotop \
tmux tmux
# --- Go --- # --- Go ---

View file

@@ -13,14 +13,12 @@ type HostRow struct {
HtmlTitle string HtmlTitle string
IframeAllowed bool IframeAllowed bool
BestIconHash string BestIconHash string
RandomOrder float64
} }
// fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality. // fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality.
// random_order is included for bundle bucket assignment (randomized bundles).
func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) { func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) {
rows, err := pool.Query(ctx, ` rows, err := pool.Query(ctx, `
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), random_order SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, '')
FROM hosts FROM hosts
WHERE html_title IS NOT NULL AND id > $1 WHERE html_title IS NOT NULL AND id > $1
ORDER BY id ORDER BY id
@@ -34,7 +32,7 @@ func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit
var hosts []HostRow var hosts []HostRow
for rows.Next() { for rows.Next() {
var h HostRow var h HostRow
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.RandomOrder); err != nil { if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash); err != nil {
return nil, err return nil, err
} }
hosts = append(hosts, h) hosts = append(hosts, h)

View file

@@ -18,7 +18,6 @@ type Config struct {
IconsDir string IconsDir string
SiteBucket string SiteBucket string
EntriesPerBundle int EntriesPerBundle int
NumBuckets int
Concurrency int Concurrency int
Uploaders int Uploaders int
DryRun bool DryRun bool
@@ -46,7 +45,6 @@ func main() {
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons") flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site") flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file") flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
flag.IntVar(&cfg.NumBuckets, "num-buckets", 10000, "Number of shuffle buckets for randomizing bundles")
flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions") flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions")
flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads") flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
@@ -123,13 +121,8 @@ func main() {
data []byte data []byte
} }
type assemblerEntry struct {
entry BundleEntry
randomOrder float64
}
hostCh := make(chan HostRow, 50000) hostCh := make(chan HostRow, 50000)
entryCh := make(chan assemblerEntry, 50000) entryCh := make(chan BundleEntry, 50000)
uploadCh := make(chan bundleJob, cfg.Uploaders*2) uploadCh := make(chan bundleJob, cfg.Uploaders*2)
// Stage 1: DB fetcher — continuously fetches pages into hostCh // Stage 1: DB fetcher — continuously fetches pages into hostCh
@@ -180,7 +173,7 @@ func main() {
} else { } else {
stats.BundledNoIcon.Add(1) stats.BundledNoIcon.Add(1)
} }
entryCh <- assemblerEntry{entry: entry, randomOrder: host.RandomOrder} entryCh <- entry
} }
}() }()
} }
@@ -189,56 +182,33 @@ func main() {
close(entryCh) close(entryCh)
}() }()
// Stage 3: Bundle assembler — distributes entries across N buckets by random_order // Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders
// for randomized bundles, while reads stay in ID order for disk locality.
go func() { go func() {
defer close(uploadCh) defer close(uploadCh)
buckets := make([][]BundleEntry, cfg.NumBuckets) var buf []BundleEntry
bundleIndex := 0 bundleIndex := 0
for entry := range entryCh {
buf = append(buf, entry)
if len(buf) >= cfg.EntriesPerBundle {
chunk := make([]BundleEntry, cfg.EntriesPerBundle)
copy(chunk, buf[:cfg.EntriesPerBundle])
buf = buf[cfg.EntriesPerBundle:]
for ae := range entryCh { data, err := serializeBundle(chunk)
b := int(ae.randomOrder * float64(cfg.NumBuckets))
if b >= cfg.NumBuckets {
b = cfg.NumBuckets - 1
}
buckets[b] = append(buckets[b], ae.entry)
if len(buckets[b]) >= cfg.EntriesPerBundle {
data, err := serializeBundle(buckets[b])
if err != nil { if err != nil {
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err) log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
} }
uploadCh <- bundleJob{index: bundleIndex, data: data} uploadCh <- bundleJob{index: bundleIndex, data: data}
bundleIndex++ bundleIndex++
buckets[b] = nil
}
}
// Roll up remaining partial buckets into full bundles
var remainder []BundleEntry
for i := range buckets {
if len(buckets[i]) > 0 {
remainder = append(remainder, buckets[i]...)
buckets[i] = nil
for len(remainder) >= cfg.EntriesPerBundle {
data, err := serializeBundle(remainder[:cfg.EntriesPerBundle])
if err != nil {
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
}
uploadCh <- bundleJob{index: bundleIndex, data: data}
bundleIndex++
remainder = remainder[cfg.EntriesPerBundle:]
}
} }
} }
// Final partial bundle // Final partial bundle
if len(remainder) > 0 { if len(buf) > 0 {
data, err := serializeBundle(remainder) data, err := serializeBundle(buf)
if err != nil { if err != nil {
log.Fatalf("Failed to serialize final bundle: %v", err) log.Fatalf("Failed to serialize final bundle: %v", err)
} }
uploadCh <- bundleJob{index: bundleIndex, data: data} uploadCh <- bundleJob{index: bundleIndex, data: data}
bundleIndex++
} }
}() }()