// everytab/pipeline/05_bundle_gen/main.go
//
// Listing metadata from the original source: 351 lines, 10 KiB, Go.

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// Config holds the command-line options for the bundle generator. main
// populates it from flags before building the pipeline.
type Config struct {
	// Where inputs come from (database, icon directory) and the S3 bucket
	// bundles are written to outside dry-run mode.
	DBUrl, IconsDir, SiteBucket string

	// Bundle size, number of shuffle buckets, converter workers and uploader
	// workers.
	EntriesPerBundle, NumBuckets, Concurrency, Uploaders int

	// When DryRun is set, bundles are written under OutputDir on local disk
	// instead of to SiteBucket.
	DryRun    bool
	OutputDir string

	// Limit caps how many hosts are fetched; values <= 0 mean no cap.
	Limit int

	// LogFile, when non-empty, mirrors log lines to a file; LogErrors restricts
	// that file to error lines only.
	LogFile   string
	LogErrors bool
}
// Stats accumulates the counters for one bundle-generation run and is
// printed in the final summary.
//
// The atomic counters are incremented from concurrent pipeline goroutines.
// The plain fields are assigned by main before the pipeline starts or after
// it finishes. Because Stats embeds atomic values it must not be copied after
// first use; pass *Stats.
type Stats struct {
	// Host counts taken from the initial COUNT(*) queries.
	TotalHosts, HostsWithIcon, HostsNoIcon int
	// Number of bundle files successfully written.
	BundlesCreated int

	// Concurrently-updated counters. BundledWithIcon and BundledNoIcon are
	// bumped by the converter workers. ConvertErrors is presumably bumped
	// inside buildEntry (not visible here) — TODO confirm.
	ConvertErrors, BundledWithIcon, BundledNoIcon atomic.Int64

	// Sum of the written bundle payload sizes, in bytes.
	TotalBytes int64
	// Wall-clock start of the run; the summary duration is measured from it.
	StartedAt time.Time
}
// main runs the bundle generator. It counts hosts in Postgres, streams them
// through icon conversion, groups the resulting entries into shuffled bundle
// JSON files, and writes those bundles to S3 (or under --output-dir with
// --dry-run). It finishes by printing a summary and calling writeStats.
//
// Pipeline (all four stages run concurrently):
//
//	[DB fetcher] → hostCh → [N converters] → entryCh → [bundle assembler] → uploadCh → [M uploaders]
//
// Bundles are written in place, overwriting files from a previous run that
// share the same index.
//
// NOTE(review): fatal errors inside the pipeline goroutines call log.Fatalf.
// That exits immediately without running the deferred pool.Close and
// logWriter.Close calls, so if LogWriter buffers output its tail may be lost.
func main() {
	cfg := Config{}
	flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
	flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
	flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
	flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
	flag.IntVar(&cfg.NumBuckets, "num-buckets", 10000, "Number of shuffle buckets for randomizing bundles")
	flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions")
	flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
	flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
	flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
	flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
	flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
	flag.Parse()
	if cfg.DBUrl == "" {
		fmt.Println("Usage: bundle_gen --db DATABASE_URL [OPTIONS]")
		flag.PrintDefaults()
		os.Exit(1)
	}
	// These options must be positive:
	//   - entries-per-bundle is used as a divisor below, and a zero value
	//     would make the remainder-flush loop spin forever.
	//   - num-buckets sizes the bucket slice.
	//   - a negative uploader count panics in make(chan).
	//   - zero converters or uploaders end the run without processing or
	//     writing anything.
	for _, opt := range []struct {
		name  string
		value int
	}{
		{"entries-per-bundle", cfg.EntriesPerBundle},
		{"num-buckets", cfg.NumBuckets},
		{"concurrency", cfg.Concurrency},
		{"uploaders", cfg.Uploaders},
	} {
		if opt.value <= 0 {
			log.Fatalf("--%s must be positive, got %d", opt.name, opt.value)
		}
	}
	ctx := context.Background()

	// Init S3 (for uploading bundles).
	if err := initS3(); err != nil {
		log.Fatalf("Failed to init S3: %v", err)
	}

	// Init DB.
	pool, err := pgxpool.New(ctx, cfg.DBUrl)
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}
	defer pool.Close()

	// Optional log file.
	var logWriter *LogWriter
	if cfg.LogFile != "" {
		logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
		if err != nil {
			log.Fatalf("Failed to open log file: %v", err)
		}
		defer logWriter.Close()
	}
	stats := &Stats{StartedAt: time.Now()}

	// Count hosts.
	fmt.Println("=== Bundle Generator ===")
	var totalHosts, hostsWithIcon int
	err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL`).Scan(&totalHosts)
	if err != nil {
		log.Fatalf("Failed to count hosts: %v", err)
	}
	err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL AND best_icon_hash IS NOT NULL`).Scan(&hostsWithIcon)
	if err != nil {
		log.Fatalf("Failed to count icons: %v", err)
	}
	stats.TotalHosts = totalHosts
	stats.HostsWithIcon = hostsWithIcon
	stats.HostsNoIcon = totalHosts - hostsWithIcon
	fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", totalHosts, hostsWithIcon, totalHosts-hostsWithIcon)
	fmt.Printf("Entries per bundle: %d\n", cfg.EntriesPerBundle)
	fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
	fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
	if cfg.DryRun {
		if err := os.MkdirAll(cfg.OutputDir, 0o755); err != nil {
			log.Fatalf("Failed to create output dir %q: %v", cfg.OutputDir, err)
		}
	}

	fmt.Println("Processing hosts and writing bundles...")
	type bundleJob struct {
		index int
		data  []byte
	}
	type assemblerEntry struct {
		entry       BundleEntry
		randomOrder float64
	}
	hostCh := make(chan HostRow, 50000)
	entryCh := make(chan assemblerEntry, 50000)
	uploadCh := make(chan bundleJob, cfg.Uploaders*2)

	// Stage 1: DB fetcher. It interleaves hosts with icons (ordered by download
	// time for disk locality) with hosts without icons (no disk reads needed),
	// matching the natural ratio so bundles contain a representative mix.
	go func() {
		defer close(hostCh)
		// For every iconPageSize icon-hosts, fetch noIconPageSize no-icon hosts.
		iconPageSize := 50000
		noIconRatio := float64(stats.HostsNoIcon) / float64(max(stats.HostsWithIcon, 1))
		noIconPageSize := max(int(float64(iconPageSize)*noIconRatio), 1000)
		// A --limit smaller than one round would otherwise be used up entirely
		// by the first icon page, and no no-icon hosts would be fetched. Shrink
		// both pages proportionally so limited runs keep the same mix.
		if cfg.Limit > 0 && cfg.Limit < iconPageSize+noIconPageSize {
			round := float64(iconPageSize + noIconPageSize)
			iconPageSize = max(int(float64(cfg.Limit)*float64(iconPageSize)/round), 1)
			noIconPageSize = max(cfg.Limit-iconPageSize, 1)
		}

		var lastDownloaded *time.Time
		var lastIconID int64
		var lastNoIconID int64
		iconsDone := false
		noIconsDone := false
		fetched := 0

		// pageLimit caps pageSize to the remaining --limit budget. ok is false
		// once the budget is exhausted.
		pageLimit := func(pageSize int) (limit int, ok bool) {
			if cfg.Limit <= 0 {
				return pageSize, true
			}
			remaining := cfg.Limit - fetched
			if remaining <= 0 {
				return 0, false
			}
			return min(pageSize, remaining), true
		}

		for !iconsDone || !noIconsDone {
			// Icon-hosts, keyset-paginated on (download time, id).
			if !iconsDone {
				limit, ok := pageLimit(iconPageSize)
				if !ok {
					return
				}
				fetchStart := time.Now()
				hosts, err := fetchHostsWithIcons(ctx, pool, lastDownloaded, lastIconID, limit)
				if err != nil {
					log.Fatalf("Failed to fetch icon hosts: %v", err)
				}
				if len(hosts) == 0 {
					iconsDone = true
				} else {
					last := hosts[len(hosts)-1]
					lastDownloaded = last.IconDownloadedAt
					lastIconID = last.ID
					fmt.Printf("[fetcher] %d icon hosts in %dms (hostCh: %d/%d)\n",
						len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
					for _, h := range hosts {
						hostCh <- h
					}
					fetched += len(hosts)
				}
			}
			// Proportional no-icon hosts, interleaved for balanced bundles.
			if !noIconsDone {
				limit, ok := pageLimit(noIconPageSize)
				if !ok {
					return
				}
				fetchStart := time.Now()
				hosts, err := fetchHostsWithoutIcons(ctx, pool, lastNoIconID, limit)
				if err != nil {
					log.Fatalf("Failed to fetch no-icon hosts: %v", err)
				}
				if len(hosts) == 0 {
					noIconsDone = true
				} else {
					lastNoIconID = hosts[len(hosts)-1].ID
					fmt.Printf("[fetcher] %d no-icon hosts in %dms (hostCh: %d/%d)\n",
						len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
					for _, h := range hosts {
						hostCh <- h
					}
					fetched += len(hosts)
				}
			}
		}
	}()

	// Stage 2: converter workers. They read hosts, convert icons and emit
	// entries.
	var converterWg sync.WaitGroup
	for i := 0; i < cfg.Concurrency; i++ {
		converterWg.Add(1)
		go func() {
			defer converterWg.Done()
			for host := range hostCh {
				entry := buildEntry(host, cfg.IconsDir, logWriter, stats)
				if entry.Icon != "" {
					stats.BundledWithIcon.Add(1)
				} else {
					stats.BundledNoIcon.Add(1)
				}
				entryCh <- assemblerEntry{entry: entry, randomOrder: host.RandomOrder}
			}
		}()
	}
	go func() {
		converterWg.Wait()
		close(entryCh)
	}()

	// Stage 3: bundle assembler. It distributes entries across NumBuckets
	// buckets by random_order, so bundles are randomized even though reads
	// happen in locality order.
	go func() {
		defer close(uploadCh)
		buckets := make([][]BundleEntry, cfg.NumBuckets)
		bundleIndex := 0
		for ae := range entryCh {
			// random_order is expected in [0, 1). Clamp both ends so an
			// out-of-range value (including exactly 1.0, a negative or a NaN)
			// cannot index outside buckets.
			b := int(ae.randomOrder * float64(cfg.NumBuckets))
			b = min(max(b, 0), cfg.NumBuckets-1)
			buckets[b] = append(buckets[b], ae.entry)
			if len(buckets[b]) >= cfg.EntriesPerBundle {
				data, err := serializeBundle(buckets[b])
				if err != nil {
					log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
				}
				uploadCh <- bundleJob{index: bundleIndex, data: data}
				bundleIndex++
				buckets[b] = nil
			}
		}
		// Roll up the remaining partial buckets into full bundles.
		var remainder []BundleEntry
		for i := range buckets {
			if len(buckets[i]) == 0 {
				continue
			}
			remainder = append(remainder, buckets[i]...)
			buckets[i] = nil
			for len(remainder) >= cfg.EntriesPerBundle {
				data, err := serializeBundle(remainder[:cfg.EntriesPerBundle])
				if err != nil {
					log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
				}
				uploadCh <- bundleJob{index: bundleIndex, data: data}
				bundleIndex++
				remainder = remainder[cfg.EntriesPerBundle:]
			}
		}
		// Final partial bundle.
		if len(remainder) > 0 {
			data, err := serializeBundle(remainder)
			if err != nil {
				log.Fatalf("Failed to serialize final bundle: %v", err)
			}
			uploadCh <- bundleJob{index: bundleIndex, data: data}
			bundleIndex++
		}
	}()

	// Progress denominator: ceil(hosts this run will process / entries per
	// bundle). It honors --limit and is computed once, not per bundle.
	hostsToProcess := totalHosts
	if cfg.Limit > 0 {
		hostsToProcess = min(hostsToProcess, cfg.Limit)
	}
	estimatedTotal := (hostsToProcess + cfg.EntriesPerBundle - 1) / cfg.EntriesPerBundle

	// Stage 4: upload workers. They write bundles to S3 or to local disk.
	var uploadWg sync.WaitGroup
	var bundlesCreated atomic.Int64
	var totalBytes atomic.Int64
	for i := 0; i < cfg.Uploaders; i++ {
		uploadWg.Add(1)
		go func() {
			defer uploadWg.Done()
			for job := range uploadCh {
				var err error
				if cfg.DryRun {
					err = writeBundleLocal(cfg.OutputDir, job.index, job.data)
				} else {
					err = writeBundleS3(cfg.SiteBucket, job.index, job.data)
				}
				if err != nil {
					log.Fatalf("Failed to write bundle %d: %v", job.index, err)
				}
				n := bundlesCreated.Add(1)
				if n%100 == 0 {
					fmt.Printf(" %d/%d bundles\n", n, estimatedTotal)
				}
				logLine := fmt.Sprintf("bundle: %06d.json %dKB", job.index, len(job.data)/1024)
				if logWriter != nil {
					logWriter.Write(logLine, false)
				}
				totalBytes.Add(int64(len(job.data)))
			}
		}()
	}
	uploadWg.Wait()
	stats.BundlesCreated = int(bundlesCreated.Load())
	stats.TotalBytes = totalBytes.Load()

	// Summary.
	duration := time.Since(stats.StartedAt)
	fmt.Printf("\n=== Summary ===\n")
	fmt.Printf("Duration: %s\n", duration.Round(time.Second))
	fmt.Printf("Total hosts: %d\n", stats.TotalHosts)
	fmt.Printf("Hosts with icon: %d\n", stats.HostsWithIcon)
	fmt.Printf("Hosts without icon: %d\n", stats.HostsNoIcon)
	fmt.Printf("Bundled with icon: %d\n", stats.BundledWithIcon.Load())
	fmt.Printf("Bundled without icon: %d\n", stats.BundledNoIcon.Load())
	fmt.Printf("Convert errors: %d\n", stats.ConvertErrors.Load())
	fmt.Printf("Bundles created: %d\n", stats.BundlesCreated)
	fmt.Printf("Total size: %.1f MB\n", float64(stats.TotalBytes)/(1024*1024))
	fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(max(stats.BundlesCreated, 1))/1024)
	fmt.Printf("TOTAL_BUNDLES = %d (bake this into the frontend)\n", stats.BundlesCreated)
	writeStats(stats)
}