From 4fa40c7b47c271267f03f1a8474341870848c215 Mon Sep 17 00:00:00 2001 From: Joe Lothan Date: Wed, 20 May 2026 22:38:23 -0400 Subject: [PATCH] improved write efficiency, though we are still bottlenecking on RDS - will switch to local postgres for future runs --- PLAN.md | 7 +++ pipeline/02_warc_parse/db.go | 74 +++++++++++--------------- pipeline/02_warc_parse/main.go | 94 ++++++++++++++++++++++------------ 3 files changed, 98 insertions(+), 77 deletions(-) diff --git a/PLAN.md b/PLAN.md index 625cc27..3ad9776 100644 --- a/PLAN.md +++ b/PLAN.md @@ -643,6 +643,13 @@ Full internet scan. Upgraded to c5.2xlarge + db.m5.large after 300K run revealed - **Channel buffer sizing matters.** Small buffers (5K) caused micro-stalls every time the DB fetcher ran a query. 20K buffers give the fetcher enough runway to query without starving workers. - **DuckDB temp_directory is critical at scale.** Without it, DuckDB spills to tmpfs (RAM-backed), which then swaps to disk via the OS — double indirection. Pointing temp_directory at EBS lets DuckDB manage spill efficiently with large sequential I/O. - **COUNT(*) on large partial indexes is expensive.** The startup query `SELECT COUNT(*) FROM hosts WHERE parsed = FALSE` on 26M rows took minutes. Unnecessary — just start processing and discover completion naturally. +- **Autovacuum competes with heavy writes for disk I/O.** Millions of UPDATEs create dead row versions. Autovacuum kicks in to clean them, saturating disk I/O and stalling writers. Fix: disable autovacuum on hosts/icons during the pipeline run, re-enable + manual `VACUUM ANALYZE` at the end. Now automated in the WARC parser code. +- **RDS storage must be sized for the full run.** 20GB (set during 100K dev) filled up at 16GB during the 30M run. Icons table at 80M rows with indexes needs ~25-30GB. Default bumped to 50GB. RDS storage can only be increased, never decreased. 
+- **Multiple DB writer goroutines help throughput.** A single writer couldn't drain resultCh fast enough — 8 writers with independent buffers keep up to 8 batches in flight to RDS simultaneously. (The writer count was later reduced to 3 once gp3 IOPS became the limit — see the gp3 note below.) +- **RDS storage optimization causes temporary I/O degradation.** After expanding storage, RDS runs background optimization that competes with writes. This can last up to an hour. Plan storage right from the start to avoid mid-run resizes. +- **gp3 IOPS baseline (3000) is a hard limit at scale.** 8 DB writers with batch size 500 exhausted EBS I/O burst credits (EBSIOBalance% hit 0%), causing 38ms write latency (normal <5ms) and pipeline stalls. Fix: reduce to 3 writers with batch size 1000 — fewer, larger flushes stay under 3000 IOPS. Custom provisioned IOPS on gp3 requires 400GB+ storage (not worth it for a temp DB). +- **Consider running Postgres locally on EC2 for future runs.** RDS gp3 IOPS (3000 baseline) is the main bottleneck for WARC parsing writes. Running Postgres directly on the EC2 instance's 1TB EBS volume eliminates the network hop to RDS and the separate IOPS budget. Also saves the RDS cost ($12-15/run). Tradeoff: must install and configure Postgres yourself (or add to ec2-userdata.sh). +- **Reconsider downsizing to c5.xlarge for future runs.** Upgraded to c5.2xlarge assuming CPU was the bottleneck, but RDS IOPS turned out to be the real constraint for WARC parsing, and icon download is internet-bound. If the extra cores don't meaningfully improve throughput (check CPU utilization during the full run), c5.xlarge at half the cost ($0.17/hr vs $0.34/hr) may be sufficient. The only stage that clearly benefits from 8 cores is bundle gen (~2hrs saved). 
## Phase 9: Frontend Polish diff --git a/pipeline/02_warc_parse/db.go b/pipeline/02_warc_parse/db.go index 3c2d0ec..63d9d2a 100644 --- a/pipeline/02_warc_parse/db.go +++ b/pipeline/02_warc_parse/db.go @@ -59,40 +59,22 @@ type WorkResult struct { Result ProcessResult } -// flushResults writes a batch of successful results to the database using pgx.Batch. +// flushResults writes a batch of successful results to the database. +// Host UPDATEs use pgx.Batch. Icon INSERTs use pgx.CopyFrom for bulk throughput. // Returns the number of DB errors encountered. func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter *LogWriter) int { - batch := &pgx.Batch{} + dbErrors := 0 - // Queue all queries + // 1. Batch UPDATE hosts (can't COPY an UPDATE) + batch := &pgx.Batch{} for _, wr := range results { - // Update host batch.Queue( `UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`, nilIfEmpty(wr.Result.Title), wr.Result.IframeAllowed, wr.Host.ID, ) - // Insert /favicon.ico - faviconURL := fmt.Sprintf("%s://%s/favicon.ico", wr.Host.Protocol, wr.Host.Hostname) - batch.Queue( - `INSERT INTO icons (host_id, url, source) VALUES ($1, $2, 'favicon_ico')`, - wr.Host.ID, faviconURL, - ) - // Insert link_rel icons - for _, icon := range wr.Result.Icons { - batch.Queue( - `INSERT INTO icons (host_id, url, source, rel_type, rel_sizes) VALUES ($1, $2, $3, $4, $5)`, - wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes), - ) - } } - - // Send all queries in one round-trip br := pool.SendBatch(ctx, batch) - - // Check results - dbErrors := 0 for _, wr := range results { - // host update if _, err := br.Exec(); err != nil { dbErrors++ logLine := fmt.Sprintf("DB_ERROR: %s hosts_update: %v", wr.Host.Hostname, err) @@ -101,29 +83,35 @@ func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter.Write(logLine, true) } } - // favicon.ico insert - if _, err := 
br.Exec(); err != nil { - dbErrors++ - logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, true) - } - } - // link_rel icon inserts - for range wr.Result.Icons { - if _, err := br.Exec(); err != nil { - dbErrors++ - logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, true) - } - } + } + br.Close() + + // 2. COPY icons in bulk (much less IOPS than individual INSERTs) + var iconRows [][]any + for _, wr := range results { + // favicon.ico entry + faviconURL := fmt.Sprintf("%s://%s/favicon.ico", wr.Host.Protocol, wr.Host.Hostname) + iconRows = append(iconRows, []any{wr.Host.ID, faviconURL, "favicon_ico", nil, nil}) + // link_rel entries + for _, icon := range wr.Result.Icons { + iconRows = append(iconRows, []any{wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes)}) + } + } + + _, err := pool.CopyFrom(ctx, + pgx.Identifier{"icons"}, + []string{"host_id", "url", "source", "rel_type", "rel_sizes"}, + pgx.CopyFromRows(iconRows), + ) + if err != nil { + dbErrors++ + logLine := fmt.Sprintf("DB_ERROR: icon COPY failed (%d rows): %v", len(iconRows), err) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, true) } } - br.Close() return dbErrors } diff --git a/pipeline/02_warc_parse/main.go b/pipeline/02_warc_parse/main.go index 5e3fff6..c7f7906 100644 --- a/pipeline/02_warc_parse/main.go +++ b/pipeline/02_warc_parse/main.go @@ -42,7 +42,7 @@ func main() { flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch") flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines") - flag.IntVar(&cfg.WriteBatch, "write-batch", 100, "Results to batch per DB write") + flag.IntVar(&cfg.WriteBatch, "write-batch", 1000, 
"Results to batch per DB write") flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB") flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") @@ -68,26 +68,13 @@ func main() { } defer pool.Close() - // Get total count - var total int64 - if cfg.Limit > 0 { - total = int64(cfg.Limit) - } else { - err = pool.QueryRow(ctx, "SELECT COUNT(*) FROM hosts WHERE parsed = FALSE").Scan(&total) - if err != nil { - log.Fatalf("Failed to count unparsed hosts: %v", err) - } - } - - if total == 0 { - fmt.Println("No unparsed hosts found.") - return - } - fmt.Printf("=== WARC Parser ===\n") - fmt.Printf("Unparsed hosts: %d\n", total) fmt.Printf("Concurrency: %d\n", cfg.Concurrency) fmt.Printf("Batch size: %d\n", cfg.BatchSize) + fmt.Printf("Write batch: %d\n", cfg.WriteBatch) + if cfg.Limit > 0 { + fmt.Printf("Limit: %d\n", cfg.Limit) + } fmt.Printf("Dry run: %v\n\n", cfg.DryRun) // Setup log file @@ -100,6 +87,14 @@ func main() { defer logWriter.Close() } + // Disable autovacuum during heavy writes — it competes for disk I/O + // and causes writer stalls. Re-enabled + manual VACUUM at end. 
+ if !cfg.DryRun { + fmt.Println("Disabling autovacuum on hosts and icons tables...") + pool.Exec(ctx, "ALTER TABLE hosts SET (autovacuum_enabled = false)") + pool.Exec(ctx, "ALTER TABLE icons SET (autovacuum_enabled = false)") + } + stats := &Stats{StartedAt: time.Now()} // Three-stage pipeline: @@ -125,6 +120,7 @@ func main() { limit = remaining } } + fetchStart := time.Now() hosts, err := fetchBatch(ctx, pool, lastID, limit) if err != nil { log.Fatalf("Failed to fetch batch: %v", err) @@ -133,6 +129,8 @@ func main() { break } lastID = hosts[len(hosts)-1].ID + fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", + len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) for _, h := range hosts { hostCh <- h } @@ -200,25 +198,53 @@ func main() { close(resultCh) }() - // Stage 3: DB writer — batches writes for efficiency + // Stage 3: DB writers — multiple goroutines batch writes for efficiency + const numWriters = 3 + var writerWg sync.WaitGroup if !cfg.DryRun { - var buf []WorkResult - for wr := range resultCh { - buf = append(buf, wr) - if len(buf) >= cfg.WriteBatch { - dbErrs := flushResults(ctx, pool, buf, logWriter) - stats.DBErrors.Add(int64(dbErrs)) - buf = buf[:0] - } - } - if len(buf) > 0 { - dbErrs := flushResults(ctx, pool, buf, logWriter) - stats.DBErrors.Add(int64(dbErrs)) + for w := 0; w < numWriters; w++ { + writerWg.Add(1) + go func() { + defer writerWg.Done() + var buf []WorkResult + for wr := range resultCh { + buf = append(buf, wr) + if len(buf) >= cfg.WriteBatch { + flushStart := time.Now() + dbErrs := flushResults(ctx, pool, buf, logWriter) + stats.DBErrors.Add(int64(dbErrs)) + flushMs := time.Since(flushStart).Milliseconds() + if flushMs > 100 { + fmt.Printf("[writer] flushed %d results in %dms (resultCh: %d/%d)\n", + len(buf), flushMs, len(resultCh), cap(resultCh)) + } + buf = buf[:0] + } + } + if len(buf) > 0 { + dbErrs := flushResults(ctx, pool, buf, logWriter) + stats.DBErrors.Add(int64(dbErrs)) + } + }() } } 
else { - for range resultCh { - // drain in dry-run mode - } + writerWg.Add(1) + go func() { + defer writerWg.Done() + for range resultCh { + } + }() + } + writerWg.Wait() + + // Re-enable autovacuum and run manual vacuum + if !cfg.DryRun { + fmt.Println("Re-enabling autovacuum and running VACUUM ANALYZE...") + pool.Exec(ctx, "ALTER TABLE hosts SET (autovacuum_enabled = true)") + pool.Exec(ctx, "ALTER TABLE icons SET (autovacuum_enabled = true)") + pool.Exec(ctx, "VACUUM ANALYZE hosts") + pool.Exec(ctx, "VACUUM ANALYZE icons") + fmt.Println("VACUUM complete") } // Print summary