Fixed OOM in bundle_gen and added randomOrder; still needs a full redesign
This commit is contained in:
parent
cf17fc42b1
commit
e6d5d5175c
2 changed files with 132 additions and 100 deletions
|
|
@ -7,57 +7,36 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type HostRow struct {
|
type HostRow struct {
|
||||||
|
ID int64
|
||||||
Hostname string
|
Hostname string
|
||||||
Protocol string
|
Protocol string
|
||||||
HtmlTitle string
|
HtmlTitle string
|
||||||
IframeAllowed bool
|
IframeAllowed bool
|
||||||
BestIconS3Key string
|
BestIconS3Key string
|
||||||
|
RandomOrder float32
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchHosts gets all hosts with titles, randomized order.
|
// fetchHostsPage gets a page of hosts with titles, ordered by random_order for shuffled bundles.
|
||||||
func fetchHosts(ctx context.Context, pool *pgxpool.Pool, limit int) ([]HostRow, error) {
|
func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastRandom float32, limit int) ([]HostRow, error) {
|
||||||
query := `
|
rows, err := pool.Query(ctx, `
|
||||||
SELECT hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_s3_key, '')
|
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_s3_key, ''), random_order
|
||||||
FROM hosts
|
FROM hosts
|
||||||
WHERE html_title IS NOT NULL
|
WHERE html_title IS NOT NULL AND random_order > $1
|
||||||
ORDER BY random()
|
ORDER BY random_order
|
||||||
`
|
LIMIT $2
|
||||||
if limit > 0 {
|
`, lastRandom, limit)
|
||||||
query += " LIMIT $1"
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer rows.Close()
|
||||||
var rows interface{ Query(context.Context, string, ...interface{}) (interface{ Close(); Next() bool; Scan(...interface{}) error; Err() error }, error) }
|
|
||||||
_ = rows // unused, using pool directly
|
|
||||||
|
|
||||||
var hosts []HostRow
|
var hosts []HostRow
|
||||||
|
for rows.Next() {
|
||||||
if limit > 0 {
|
|
||||||
pgRows, err := pool.Query(ctx, query, limit)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer pgRows.Close()
|
|
||||||
for pgRows.Next() {
|
|
||||||
var h HostRow
|
var h HostRow
|
||||||
if err := pgRows.Scan(&h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconS3Key); err != nil {
|
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconS3Key, &h.RandomOrder); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
hosts = append(hosts, h)
|
hosts = append(hosts, h)
|
||||||
}
|
}
|
||||||
return hosts, pgRows.Err()
|
return hosts, rows.Err()
|
||||||
}
|
|
||||||
|
|
||||||
pgRows, err := pool.Query(ctx, query)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer pgRows.Close()
|
|
||||||
for pgRows.Next() {
|
|
||||||
var h HostRow
|
|
||||||
if err := pgRows.Scan(&h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconS3Key); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
hosts = append(hosts, h)
|
|
||||||
}
|
|
||||||
return hosts, pgRows.Err()
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,10 +42,10 @@ func main() {
|
||||||
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
||||||
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
||||||
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
||||||
|
flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Concurrent icon conversions")
|
||||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
||||||
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
||||||
flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
|
flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
|
||||||
flag.IntVar(&cfg.Concurrency, "concurrency", 50, "Concurrent icon conversions")
|
|
||||||
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
|
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
|
||||||
flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
|
flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
@ -58,7 +58,7 @@ func main() {
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Init S3
|
// Init S3 (for uploading bundles)
|
||||||
if err := initS3(); err != nil {
|
if err := initS3(); err != nil {
|
||||||
log.Fatalf("Failed to init S3: %v", err)
|
log.Fatalf("Failed to init S3: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -82,39 +82,76 @@ func main() {
|
||||||
|
|
||||||
stats := &Stats{StartedAt: time.Now()}
|
stats := &Stats{StartedAt: time.Now()}
|
||||||
|
|
||||||
// Fetch all qualifying hosts (randomized)
|
// Count hosts
|
||||||
fmt.Println("=== Bundle Generator ===")
|
fmt.Println("=== Bundle Generator ===")
|
||||||
fmt.Println("Querying hosts...")
|
var totalHosts, hostsWithIcon int
|
||||||
|
err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL`).Scan(&totalHosts)
|
||||||
hosts, err := fetchHosts(ctx, pool, cfg.Limit)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to fetch hosts: %v", err)
|
log.Fatalf("Failed to count hosts: %v", err)
|
||||||
|
}
|
||||||
|
err = pool.QueryRow(ctx, `SELECT COUNT(*) FROM hosts WHERE html_title IS NOT NULL AND best_icon_s3_key IS NOT NULL`).Scan(&hostsWithIcon)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to count icons: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.TotalHosts = len(hosts)
|
stats.TotalHosts = totalHosts
|
||||||
for _, h := range hosts {
|
stats.HostsWithIcon = hostsWithIcon
|
||||||
if h.BestIconS3Key != "" {
|
stats.HostsNoIcon = totalHosts - hostsWithIcon
|
||||||
stats.HostsWithIcon++
|
|
||||||
} else {
|
|
||||||
stats.HostsNoIcon++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", stats.TotalHosts, stats.HostsWithIcon, stats.HostsNoIcon)
|
fmt.Printf("Total hosts: %d (with icon: %d, no icon: %d)\n", totalHosts, hostsWithIcon, totalHosts-hostsWithIcon)
|
||||||
fmt.Printf("Entries per bundle: %d\n", cfg.EntriesPerBundle)
|
fmt.Printf("Entries per bundle: %d\n", cfg.EntriesPerBundle)
|
||||||
|
fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
|
||||||
fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
|
fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
|
||||||
|
|
||||||
if cfg.DryRun {
|
if cfg.DryRun {
|
||||||
os.MkdirAll(cfg.OutputDir, 0755)
|
os.MkdirAll(cfg.OutputDir, 0755)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process hosts into bundle entries (concurrently for S3 downloads)
|
// Clean old bundles before writing new ones
|
||||||
fmt.Printf("Converting icons and building entries (concurrency: %d)...\n", cfg.Concurrency)
|
if !cfg.DryRun {
|
||||||
entries := make([]BundleEntry, len(hosts))
|
fmt.Println("Cleaning old bundles from S3...")
|
||||||
|
if err := s3DeletePrefix(cfg.SiteBucket, "tabs/"); err != nil {
|
||||||
|
log.Fatalf("Failed to clean old bundles: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream hosts from DB in pages, convert icons, write bundles incrementally
|
||||||
|
fmt.Println("Processing hosts and writing bundles...")
|
||||||
|
|
||||||
|
bundleCount := 0
|
||||||
|
var totalBytes int64
|
||||||
|
var lastRandom float32 = -1
|
||||||
|
pageSize := cfg.EntriesPerBundle * 50 // fetch 50 bundles worth at a time
|
||||||
|
var entryBuf []BundleEntry
|
||||||
|
hostsProcessed := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Fetch a page of hosts
|
||||||
|
limit := pageSize
|
||||||
|
if cfg.Limit > 0 {
|
||||||
|
remaining := cfg.Limit - hostsProcessed
|
||||||
|
if remaining <= 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if limit > remaining {
|
||||||
|
limit = remaining
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to fetch hosts: %v", err)
|
||||||
|
}
|
||||||
|
if len(hosts) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
lastRandom = hosts[len(hosts)-1].RandomOrder
|
||||||
|
hostsProcessed += len(hosts)
|
||||||
|
|
||||||
|
// Convert icons concurrently for this page
|
||||||
|
pageEntries := make([]BundleEntry, len(hosts))
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
sem := make(chan struct{}, cfg.Concurrency)
|
sem := make(chan struct{}, cfg.Concurrency)
|
||||||
var processed atomic.Int64
|
|
||||||
|
|
||||||
for i, host := range hosts {
|
for i, host := range hosts {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
@ -122,51 +159,60 @@ func main() {
|
||||||
go func(idx int, h HostRow) {
|
go func(idx int, h HostRow) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer func() { <-sem }()
|
defer func() { <-sem }()
|
||||||
entries[idx] = buildEntry(h, cfg.IconsDir, logWriter, stats)
|
pageEntries[idx] = buildEntry(h, cfg.IconsDir, logWriter, stats)
|
||||||
n := processed.Add(1)
|
|
||||||
if n%5000 == 0 {
|
|
||||||
fmt.Printf(" processed %d/%d hosts\n", n, len(hosts))
|
|
||||||
}
|
|
||||||
}(i, host)
|
}(i, host)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// Clean old bundles before writing new ones (avoids orphans if count changed)
|
entryBuf = append(entryBuf, pageEntries...)
|
||||||
if !cfg.DryRun {
|
|
||||||
fmt.Println("\nCleaning old bundles from S3...")
|
|
||||||
if err := s3DeletePrefix(cfg.SiteBucket, "tabs/"); err != nil {
|
|
||||||
log.Fatalf("Failed to clean old bundles: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Chunk into bundles and write
|
// Write complete bundles from the buffer
|
||||||
fmt.Println("Writing bundles...")
|
for len(entryBuf) >= cfg.EntriesPerBundle {
|
||||||
bundleCount := 0
|
chunk := entryBuf[:cfg.EntriesPerBundle]
|
||||||
var totalBytes int64
|
entryBuf = entryBuf[cfg.EntriesPerBundle:]
|
||||||
|
|
||||||
for i := 0; i < len(entries); i += cfg.EntriesPerBundle {
|
|
||||||
end := i + cfg.EntriesPerBundle
|
|
||||||
if end > len(entries) {
|
|
||||||
end = len(entries)
|
|
||||||
}
|
|
||||||
|
|
||||||
chunk := entries[i:end]
|
|
||||||
bundleIndex := bundleCount
|
|
||||||
data, err := serializeBundle(chunk)
|
data, err := serializeBundle(chunk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
|
log.Fatalf("Failed to serialize bundle %d: %v", bundleCount, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.DryRun {
|
if cfg.DryRun {
|
||||||
err = writeBundleLocal(cfg.OutputDir, bundleIndex, data)
|
err = writeBundleLocal(cfg.OutputDir, bundleCount, data)
|
||||||
} else {
|
} else {
|
||||||
err = writeBundleS3(cfg.SiteBucket, bundleIndex, data)
|
err = writeBundleS3(cfg.SiteBucket, bundleCount, data)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to write bundle %d: %v", bundleIndex, err)
|
log.Fatalf("Failed to write bundle %d: %v", bundleCount, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleIndex, len(chunk), len(data)/1024)
|
logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(chunk), len(data)/1024)
|
||||||
|
fmt.Println(logLine)
|
||||||
|
if logWriter != nil {
|
||||||
|
logWriter.Write(logLine, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
totalBytes += int64(len(data))
|
||||||
|
bundleCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write final partial bundle
|
||||||
|
if len(entryBuf) > 0 {
|
||||||
|
data, err := serializeBundle(entryBuf)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to serialize final bundle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.DryRun {
|
||||||
|
err = writeBundleLocal(cfg.OutputDir, bundleCount, data)
|
||||||
|
} else {
|
||||||
|
err = writeBundleS3(cfg.SiteBucket, bundleCount, data)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to write final bundle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(entryBuf), len(data)/1024)
|
||||||
fmt.Println(logLine)
|
fmt.Println(logLine)
|
||||||
if logWriter != nil {
|
if logWriter != nil {
|
||||||
logWriter.Write(logLine, false)
|
logWriter.Write(logLine, false)
|
||||||
|
|
@ -189,8 +235,15 @@ func main() {
|
||||||
fmt.Printf("Convert errors: %d\n", stats.ConvertErrors.Load())
|
fmt.Printf("Convert errors: %d\n", stats.ConvertErrors.Load())
|
||||||
fmt.Printf("Bundles created: %d\n", stats.BundlesCreated)
|
fmt.Printf("Bundles created: %d\n", stats.BundlesCreated)
|
||||||
fmt.Printf("Total size: %.1f MB\n", float64(stats.TotalBytes)/(1024*1024))
|
fmt.Printf("Total size: %.1f MB\n", float64(stats.TotalBytes)/(1024*1024))
|
||||||
fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(stats.BundlesCreated)/1024)
|
fmt.Printf("Avg bundle size: %.0f KB\n", float64(stats.TotalBytes)/float64(max(stats.BundlesCreated, 1))/1024)
|
||||||
fmt.Printf("TOTAL_BUNDLES = %d (bake this into the frontend)\n", stats.BundlesCreated)
|
fmt.Printf("TOTAL_BUNDLES = %d (bake this into the frontend)\n", stats.BundlesCreated)
|
||||||
|
|
||||||
writeStats(stats)
|
writeStats(stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func max(a, b int) int {
|
||||||
|
if a > b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue