update bundle gen to use channels and goroutines to saturate disk and not block on db access + bundle coalescing and uploading
This commit is contained in:
parent
902928235c
commit
081866f62e
2 changed files with 137 additions and 117 deletions
|
|
@ -297,17 +297,21 @@ Uses `DISTINCT ON (host_id)` for efficient single-pass selection. See `pipeline/
|
||||||
|
|
||||||
**Input:** All hosts where `html_title IS NOT NULL` (include hosts without icons)
|
**Input:** All hosts where `html_title IS NOT NULL` (include hosts without icons)
|
||||||
|
|
||||||
**Process:**
|
**Architecture:** Four-stage pipeline with all stages running concurrently:
|
||||||
1. Stream hosts from RDS in pages (keyset pagination on `random_order` column for shuffled output)
|
|
||||||
2. For each page, concurrently convert icons (configurable concurrency, default 200):
|
```
|
||||||
- Read icon from local disk at `{icons_dir}/ab/cd/ef/{hash}`
|
[DB fetcher] → hostCh → [N converters] → entryCh → [bundle assembler] → uploadCh → [M uploaders]
|
||||||
- Decode the image via Go's `image.Decode` (handles PNG, GIF, JPEG, WebP, ICO via registered decoders)
|
```
|
||||||
- SVGs are excluded (no rasterizer) — these hosts appear without icons
|
|
||||||
|
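1. **DB fetcher** (1 goroutine): continuously fetches pages of hosts via keyset pagination on `random_order` and feeds them into `hostCh`. It does not wait for a page to be converted or uploaded before fetching the next one; it only blocks when the bounded `hostCh` buffer is full (backpressure from the converters).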
|
||||||
|
2. **Converter workers** (N goroutines, default 20): read hosts from `hostCh`, read icon from disk, decode, re-encode as PNG, base64-encode, emit `BundleEntry` to `entryCh`. CPU-bound — default tuned to ~5x core count on c5.xlarge (4 vCPUs).
|
||||||
|
- Decode via Go's `image.Decode` (handles PNG, GIF, JPEG, WebP, BMP, ICO via registered decoders)
|
||||||
|
- SVGs excluded (no rasterizer) — these hosts appear without icons
|
||||||
- Icons >128px downscaled to 32x32 (nearest-neighbor). Icons ≤128px kept as-is.
|
- Icons >128px downscaled to 32x32 (nearest-neighbor). Icons ≤128px kept as-is.
|
||||||
- Re-encode as PNG, base64-encode
|
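3. **Bundle assembler** (1 goroutine): collects entries from `entryCh`. Every 120 entries (configurable), it serializes them as JSON and sends the bundle to `uploadCh`; once `entryCh` is closed, any remaining entries are sent as a final partial bundle. Hosts without icons are included with `"icon": ""`.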
|
||||||
3. Converted entries accumulate in a buffer. Every 120 entries (configurable), serialize as JSON and upload to S3
|
4. **Upload workers** (M goroutines, default 10): write bundles to S3 (or local disk in dry-run mode). I/O-bound — keeping multiple uploads in flight hides S3 PUT latency (~50-100ms each).
|
||||||
4. Hosts without icons: included with `"icon": ""`
|
|
||||||
5. Final partial bundle written at end
|
Bundles are written in-place (overwriting previous run). No delete-first step, so the live site always has valid data even if bundle gen crashes midway. The frontend's `TOTAL_BUNDLES` constant ensures only valid bundle indices are requested.
|
||||||
|
|
||||||
**Output:**
|
**Output:**
|
||||||
- `tabs/0000.json` through `tabs/{M}.json` in S3 `everytab-site`
|
- `tabs/0000.json` through `tabs/{M}.json` in S3 `everytab-site`
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ type Config struct {
|
||||||
SiteBucket string
|
SiteBucket string
|
||||||
EntriesPerBundle int
|
EntriesPerBundle int
|
||||||
Concurrency int
|
Concurrency int
|
||||||
|
Uploaders int
|
||||||
DryRun bool
|
DryRun bool
|
||||||
OutputDir string
|
OutputDir string
|
||||||
Limit int
|
Limit int
|
||||||
|
|
@ -44,7 +45,8 @@ func main() {
|
||||||
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
||||||
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
||||||
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
||||||
flag.IntVar(&cfg.Concurrency, "concurrency", 200, "Concurrent icon conversions")
|
flag.IntVar(&cfg.Concurrency, "concurrency", 20, "Concurrent icon conversions")
|
||||||
|
flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
|
||||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
||||||
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
||||||
flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
|
flag.IntVar(&cfg.Limit, "limit", 0, "Max hosts to process (0 = all)")
|
||||||
|
|
@ -109,125 +111,139 @@ func main() {
|
||||||
os.MkdirAll(cfg.OutputDir, 0755)
|
os.MkdirAll(cfg.OutputDir, 0755)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream hosts from DB in pages, convert icons, write bundles incrementally
|
// Four-stage pipeline:
|
||||||
// Bundles are written in-place (overwriting previous run). No delete-first step,
|
// [DB fetcher] → hostCh → [N converters] → entryCh → [bundle assembler] → uploadCh → [M uploaders]
|
||||||
// so the live site always has valid data even if bundle gen crashes midway.
|
// All stages run concurrently. Bundles are written in-place (overwriting previous run).
|
||||||
fmt.Println("Processing hosts and writing bundles...")
|
fmt.Println("Processing hosts and writing bundles...")
|
||||||
|
|
||||||
bundleCount := 0
|
type bundleJob struct {
|
||||||
var totalBytes int64
|
index int
|
||||||
var lastRandom float64 = -1
|
data []byte
|
||||||
pageSize := cfg.EntriesPerBundle * 50 // fetch 50 bundles worth at a time
|
}
|
||||||
var entryBuf []BundleEntry
|
|
||||||
hostsProcessed := 0
|
|
||||||
|
|
||||||
for {
|
hostCh := make(chan HostRow, cfg.EntriesPerBundle*10)
|
||||||
// Fetch a page of hosts
|
entryCh := make(chan BundleEntry, cfg.EntriesPerBundle*10)
|
||||||
limit := pageSize
|
uploadCh := make(chan bundleJob, cfg.Uploaders*2)
|
||||||
if cfg.Limit > 0 {
|
|
||||||
remaining := cfg.Limit - hostsProcessed
|
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
||||||
if remaining <= 0 {
|
go func() {
|
||||||
|
defer close(hostCh)
|
||||||
|
var lastRandom float64 = -1
|
||||||
|
pageSize := cfg.EntriesPerBundle * 50
|
||||||
|
fetched := 0
|
||||||
|
for {
|
||||||
|
limit := pageSize
|
||||||
|
if cfg.Limit > 0 {
|
||||||
|
remaining := cfg.Limit - fetched
|
||||||
|
if remaining <= 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if limit > remaining {
|
||||||
|
limit = remaining
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to fetch hosts: %v", err)
|
||||||
|
}
|
||||||
|
if len(hosts) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if limit > remaining {
|
lastRandom = hosts[len(hosts)-1].RandomOrder
|
||||||
limit = remaining
|
for _, h := range hosts {
|
||||||
|
hostCh <- h
|
||||||
|
}
|
||||||
|
fetched += len(hosts)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Stage 2: Converter workers — read hosts, convert icons, emit entries
|
||||||
|
var converterWg sync.WaitGroup
|
||||||
|
for i := 0; i < cfg.Concurrency; i++ {
|
||||||
|
converterWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer converterWg.Done()
|
||||||
|
for host := range hostCh {
|
||||||
|
entry := buildEntry(host, cfg.IconsDir, logWriter, stats)
|
||||||
|
if entry.Icon != "" {
|
||||||
|
stats.BundledWithIcon.Add(1)
|
||||||
|
} else {
|
||||||
|
stats.BundledNoIcon.Add(1)
|
||||||
|
}
|
||||||
|
entryCh <- entry
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
converterWg.Wait()
|
||||||
|
close(entryCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Stage 3: Bundle assembler — collects entries, serializes bundles, hands off to uploaders
|
||||||
|
go func() {
|
||||||
|
defer close(uploadCh)
|
||||||
|
var buf []BundleEntry
|
||||||
|
bundleIndex := 0
|
||||||
|
for entry := range entryCh {
|
||||||
|
buf = append(buf, entry)
|
||||||
|
if len(buf) >= cfg.EntriesPerBundle {
|
||||||
|
chunk := make([]BundleEntry, cfg.EntriesPerBundle)
|
||||||
|
copy(chunk, buf[:cfg.EntriesPerBundle])
|
||||||
|
buf = buf[cfg.EntriesPerBundle:]
|
||||||
|
|
||||||
|
data, err := serializeBundle(chunk)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to serialize bundle %d: %v", bundleIndex, err)
|
||||||
|
}
|
||||||
|
uploadCh <- bundleJob{index: bundleIndex, data: data}
|
||||||
|
bundleIndex++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Final partial bundle
|
||||||
hosts, err := fetchHostsPage(ctx, pool, lastRandom, limit)
|
if len(buf) > 0 {
|
||||||
if err != nil {
|
data, err := serializeBundle(buf)
|
||||||
log.Fatalf("Failed to fetch hosts: %v", err)
|
|
||||||
}
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
lastRandom = hosts[len(hosts)-1].RandomOrder
|
|
||||||
hostsProcessed += len(hosts)
|
|
||||||
|
|
||||||
// Convert icons concurrently for this page
|
|
||||||
pageEntries := make([]BundleEntry, len(hosts))
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
sem := make(chan struct{}, cfg.Concurrency)
|
|
||||||
|
|
||||||
for i, host := range hosts {
|
|
||||||
wg.Add(1)
|
|
||||||
sem <- struct{}{}
|
|
||||||
go func(idx int, h HostRow) {
|
|
||||||
defer wg.Done()
|
|
||||||
defer func() { <-sem }()
|
|
||||||
pageEntries[idx] = buildEntry(h, cfg.IconsDir, logWriter, stats)
|
|
||||||
}(i, host)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
for _, e := range pageEntries {
|
|
||||||
if e.Icon != "" {
|
|
||||||
stats.BundledWithIcon.Add(1)
|
|
||||||
} else {
|
|
||||||
stats.BundledNoIcon.Add(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
entryBuf = append(entryBuf, pageEntries...)
|
|
||||||
|
|
||||||
// Write complete bundles from the buffer
|
|
||||||
for len(entryBuf) >= cfg.EntriesPerBundle {
|
|
||||||
chunk := entryBuf[:cfg.EntriesPerBundle]
|
|
||||||
entryBuf = entryBuf[cfg.EntriesPerBundle:]
|
|
||||||
|
|
||||||
data, err := serializeBundle(chunk)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to serialize bundle %d: %v", bundleCount, err)
|
log.Fatalf("Failed to serialize final bundle: %v", err)
|
||||||
}
|
}
|
||||||
|
uploadCh <- bundleJob{index: bundleIndex, data: data}
|
||||||
if cfg.DryRun {
|
|
||||||
err = writeBundleLocal(cfg.OutputDir, bundleCount, data)
|
|
||||||
} else {
|
|
||||||
err = writeBundleS3(cfg.SiteBucket, bundleCount, data)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to write bundle %d: %v", bundleCount, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(chunk), len(data)/1024)
|
|
||||||
fmt.Println(logLine)
|
|
||||||
if logWriter != nil {
|
|
||||||
logWriter.Write(logLine, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
totalBytes += int64(len(data))
|
|
||||||
bundleCount++
|
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Stage 4: Upload workers — write bundles to S3 or local disk
|
||||||
|
var uploadWg sync.WaitGroup
|
||||||
|
var bundlesCreated atomic.Int64
|
||||||
|
var totalBytes atomic.Int64
|
||||||
|
for i := 0; i < cfg.Uploaders; i++ {
|
||||||
|
uploadWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer uploadWg.Done()
|
||||||
|
for job := range uploadCh {
|
||||||
|
var err error
|
||||||
|
if cfg.DryRun {
|
||||||
|
err = writeBundleLocal(cfg.OutputDir, job.index, job.data)
|
||||||
|
} else {
|
||||||
|
err = writeBundleS3(cfg.SiteBucket, job.index, job.data)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to write bundle %d: %v", job.index, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logLine := fmt.Sprintf("bundle: %04d.json %dKB", job.index, len(job.data)/1024)
|
||||||
|
fmt.Println(logLine)
|
||||||
|
if logWriter != nil {
|
||||||
|
logWriter.Write(logLine, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
bundlesCreated.Add(1)
|
||||||
|
totalBytes.Add(int64(len(job.data)))
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write final partial bundle
|
uploadWg.Wait()
|
||||||
if len(entryBuf) > 0 {
|
|
||||||
data, err := serializeBundle(entryBuf)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to serialize final bundle: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.DryRun {
|
stats.BundlesCreated = int(bundlesCreated.Load())
|
||||||
err = writeBundleLocal(cfg.OutputDir, bundleCount, data)
|
stats.TotalBytes = totalBytes.Load()
|
||||||
} else {
|
|
||||||
err = writeBundleS3(cfg.SiteBucket, bundleCount, data)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to write final bundle: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logLine := fmt.Sprintf("bundle: %04d.json %d entries %dKB", bundleCount, len(entryBuf), len(data)/1024)
|
|
||||||
fmt.Println(logLine)
|
|
||||||
if logWriter != nil {
|
|
||||||
logWriter.Write(logLine, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
totalBytes += int64(len(data))
|
|
||||||
bundleCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
stats.BundlesCreated = bundleCount
|
|
||||||
stats.TotalBytes = totalBytes
|
|
||||||
|
|
||||||
// Summary
|
// Summary
|
||||||
duration := time.Since(stats.StartedAt)
|
duration := time.Since(stats.StartedAt)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue