added bundle generation
This commit is contained in:
parent
ca06a91dc6
commit
f89883e745
8 changed files with 536 additions and 0 deletions
188
pipeline/05_bundle_gen/main.go
Normal file
188
pipeline/05_bundle_gen/main.go
Normal file
|
|
@ -0,0 +1,188 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Config holds the command-line options of the bundle generator. Every field
// is populated from a flag registered in main.
type Config struct {
	// DBUrl is the Postgres connection string (-db, required).
	DBUrl string

	// IconsBucket is the S3 bucket with downloaded icons (-icons-bucket).
	IconsBucket string
	// SiteBucket is the S3 bucket for the static site (-site-bucket).
	SiteBucket string

	// EntriesPerBundle is the number of tabs per bundle JSON file
	// (-entries-per-bundle).
	EntriesPerBundle int
	// Concurrency is the number of concurrent icon conversions (-concurrency).
	Concurrency int

	// DryRun writes bundles to local disk instead of S3 (-dry-run).
	DryRun bool
	// OutputDir is the local output directory used in dry-run mode
	// (-output-dir).
	OutputDir string

	// Limit is the maximum number of hosts to process; 0 means all (-limit).
	Limit int

	// LogFile, when non-empty, is a file that log lines are mirrored to
	// (-log-file).
	LogFile string
	// LogErrors restricts the log file to errors only (-log-errors-only).
	LogErrors bool
}
|
||||
|
||||
// Stats accumulates the counters that main reports in the run summary.
//
// Stats contains an atomic counter, so it should be shared by pointer rather
// than copied.
type Stats struct {
	// TotalHosts is the number of hosts returned by the host query.
	TotalHosts int
	// HostsWithIcon counts hosts whose best-icon S3 key is non-empty.
	HostsWithIcon int
	// HostsNoIcon counts hosts whose best-icon S3 key is empty.
	HostsNoIcon int

	// BundlesCreated is the number of bundle files written.
	BundlesCreated int

	// ConvertErrors is atomic because the Stats pointer is handed to the
	// concurrent entry builders; presumably they increment it on icon
	// conversion failures (the builder is not defined in this file).
	ConvertErrors atomic.Int64

	// TotalBytes is the combined size of all serialized bundles, in bytes.
	TotalBytes int64

	// StartedAt is the time the run began; it is used to compute the duration.
	StartedAt time.Time
}
|
||||
|
||||
func main() {
|
||||
cfg := Config{}
|
||||
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
|
||||
flag.StringVar(&cfg.IconsBucket, "icons-bucket", "everytab-icons", "S3 bucket with downloaded icons")
|
||||
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
||||
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
||||
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
||||
flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
|
||||
flag.IntVar(&cfg.Concurrency, "concurrency", 50, "Concurrent icon conversions")
|
||||
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
|
||||
flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
|
||||
flag.Parse()
|
||||
|
||||
if cfg.DBUrl == "" {
|
||||
fmt.Println("Usage: bundle_gen --db DATABASE_URL [OPTIONS]")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Init S3
|
||||
if err := initS3(); err != nil {
|
||||
log.Fatalf("Failed to init S3: %v", err)
|
||||
}
|
||||
|
||||
// Init DB
|
||||
pool, err := pgxpool.New(ctx, cfg.DBUrl)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to database: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
// Setup log file
|
||||
var logWriter *LogWriter
|
||||
if cfg.LogFile != "" {
|
||||
logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open log file: %v", err)
|
||||
}
|
||||
defer logWriter.Close()
|
||||
}
|
||||
|
||||
stats := &Stats{StartedAt: time.Now()}
|
||||
|
||||
// Fetch all qualifying hosts (randomized)
|
||||
fmt.Println("=== Bundle Generator ===")
|
||||
fmt.Println("Querying hosts...")
|
||||
|
||||
hosts, err := fetchHosts(ctx, pool, cfg.Limit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to fetch hosts: %v", err)
|
||||
}
|
||||
|
||||
stats.TotalHosts = len(hosts)
|
||||
for _, h := range hosts {
|
||||
if h.BestIconS3Key != "" {
|
||||
stats.HostsWithIcon++
|
||||
} else {
|
||||
stats.HostsNoIcon++
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", stats.TotalHosts, stats.HostsWithIcon, stats.HostsNoIcon)
|
||||
fmt.Printf("Entries per bundle: %d\n", cfg.EntriesPerBundle)
|
||||
fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
|
||||
|
||||
if cfg.DryRun {
|
||||
os.MkdirAll(cfg.OutputDir, 0755)
|
||||
}
|
||||
|
||||
// Process hosts into bundle entries (concurrently for S3 downloads)
|
||||
fmt.Printf("Converting icons and building entries (concurrency: %d)...\n", cfg.Concurrency)
|
||||
entries := make([]BundleEntry, len(hosts))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
sem := make(chan struct{}, cfg.Concurrency)
|
||||
var processed atomic.Int64
|
||||
|
||||
for i, host := range hosts {
|
||||
wg.Add(1)
|
||||
sem <- struct{}{}
|
||||
go func(idx int, h HostRow) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
entries[idx] = buildEntry(ctx, h, cfg.IconsBucket, logWriter, stats)
|
||||
n := processed.Add(1)
|
||||
if n%5000 == 0 {
|
||||
fmt.Printf(" processed %d/%d hosts\n", n, len(hosts))
|
||||
}
|
||||
}(i, host)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Chunk into bundles and write
|
||||
fmt.Println("\nWriting bundles...")
|
||||
bundleCount := 0
|
||||
var totalBytes int64
|
||||
|
||||
for i := 0; i < len(entries); i += cfg.EntriesPerBundle {
|
||||
end := i + cfg.EntriesPerBundle
|
||||
if end > len(entries) {
|
||||
end = len(entries)
|
||||
}
|
||||
|
||||
chunk := entries[i:end]
|
||||
bundleIndex := bundleCount
|
||||
data, err := serializeBundle(chunk)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
|
||||
}
|
||||
|
||||
if cfg.DryRun {
|
||||
err = writeBundleLocal(cfg.OutputDir, bundleIndex, data)
|
||||
} else {
|
||||
err = writeBundleS3(ctx, cfg.SiteBucket, bundleIndex, data)
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to write bundle %d: %v", bundleIndex, err)
|
||||
}
|
||||
|
||||
logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleIndex, len(chunk), len(data)/1024)
|
||||
fmt.Println(logLine)
|
||||
if logWriter != nil {
|
||||
logWriter.Write(logLine, false)
|
||||
}
|
||||
|
||||
totalBytes += int64(len(data))
|
||||
bundleCount++
|
||||
}
|
||||
|
||||
stats.BundlesCreated = bundleCount
|
||||
stats.TotalBytes = totalBytes
|
||||
|
||||
// Summary
|
||||
duration := time.Since(stats.StartedAt)
|
||||
fmt.Printf("\n=== Summary ===\n")
|
||||
fmt.Printf("Duration: %s\n", duration.Round(time.Second))
|
||||
fmt.Printf("Total hosts: %d\n", stats.TotalHosts)
|
||||
fmt.Printf("Hosts with icon: %d\n", stats.HostsWithIcon)
|
||||
fmt.Printf("Hosts without icon: %d\n", stats.HostsNoIcon)
|
||||
fmt.Printf("Convert errors: %d\n", stats.ConvertErrors.Load())
|
||||
fmt.Printf("Bundles created: %d\n", stats.BundlesCreated)
|
||||
fmt.Printf("Total size: %.1f MB\n", float64(stats.TotalBytes)/(1024*1024))
|
||||
fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(stats.BundlesCreated)/1024)
|
||||
fmt.Printf("TOTAL_BUNDLES = %d (bake this into the frontend)\n", stats.BundlesCreated)
|
||||
|
||||
writeStats(stats)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue