update warc parsing with new 3 stage producer, worker, consumer model, increasing speed and saturating cores

This commit is contained in:
Joe Lothan 2026-05-20 10:18:15 -04:00
parent 0efec72e45
commit 6d8ba61102
2 changed files with 169 additions and 125 deletions

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
) )
@ -26,12 +27,6 @@ type ProcessResult struct {
FetchErr bool // true if error was during fetch (vs parse) FetchErr bool // true if error was during fetch (vs parse)
} }
// WriteErrors tracks errors encountered during DB writes.
type WriteErrors struct {
HostUpdate int
IconInsert int
}
// fetchBatch gets the next batch of unparsed hosts after lastID. // fetchBatch gets the next batch of unparsed hosts after lastID.
func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]Host, error) { func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]Host, error) {
rows, err := pool.Query(ctx, rows, err := pool.Query(ctx,
@ -58,55 +53,78 @@ func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int
return hosts, rows.Err() return hosts, rows.Err()
} }
// writeResult writes parsed results back to the database. // WorkResult pairs a host with its parsed result for the DB writer.
// Returns counts of DB write errors encountered. type WorkResult struct {
func writeResult(ctx context.Context, pool *pgxpool.Pool, host Host, result ProcessResult, logWriter *LogWriter) WriteErrors { Host Host
var errs WriteErrors Result ProcessResult
}
// Update hosts table // flushResults writes a batch of successful results to the database using pgx.Batch.
_, err := pool.Exec(ctx, // Returns the number of DB errors encountered.
func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter *LogWriter) int {
batch := &pgx.Batch{}
// Queue all queries
for _, wr := range results {
// Update host
batch.Queue(
`UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`, `UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`,
nilIfEmpty(result.Title), result.IframeAllowed, host.ID) nilIfEmpty(wr.Result.Title), wr.Result.IframeAllowed, wr.Host.ID,
if err != nil { )
errs.HostUpdate++ // Insert /favicon.ico
logLine := fmt.Sprintf("DB_ERROR: %s hosts_update: %v", host.Hostname, err) faviconURL := fmt.Sprintf("%s://%s/favicon.ico", wr.Host.Protocol, wr.Host.Hostname)
fmt.Println(logLine) batch.Queue(
if logWriter != nil {
logWriter.Write(logLine, true)
}
return errs
}
// Insert /favicon.ico entry
faviconURL := fmt.Sprintf("%s://%s/favicon.ico", host.Protocol, host.Hostname)
_, err = pool.Exec(ctx,
`INSERT INTO icons (host_id, url, source) VALUES ($1, $2, 'favicon_ico')`, `INSERT INTO icons (host_id, url, source) VALUES ($1, $2, 'favicon_ico')`,
host.ID, faviconURL) wr.Host.ID, faviconURL,
if err != nil { )
errs.IconInsert++ // Insert link_rel icons
logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", host.Hostname, err) for _, icon := range wr.Result.Icons {
fmt.Println(logLine) batch.Queue(
if logWriter != nil {
logWriter.Write(logLine, true)
}
}
// Insert link rel="icon" entries
for _, icon := range result.Icons {
_, err = pool.Exec(ctx,
`INSERT INTO icons (host_id, url, source, rel_type, rel_sizes) VALUES ($1, $2, $3, $4, $5)`, `INSERT INTO icons (host_id, url, source, rel_type, rel_sizes) VALUES ($1, $2, $3, $4, $5)`,
host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes)) wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes),
if err != nil { )
errs.IconInsert++ }
logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", host.Hostname, err) }
// Send all queries in one round-trip
br := pool.SendBatch(ctx, batch)
// Check results
dbErrors := 0
for _, wr := range results {
// host update
if _, err := br.Exec(); err != nil {
dbErrors++
logLine := fmt.Sprintf("DB_ERROR: %s hosts_update: %v", wr.Host.Hostname, err)
fmt.Println(logLine)
if logWriter != nil {
logWriter.Write(logLine, true)
}
}
// favicon.ico insert
if _, err := br.Exec(); err != nil {
dbErrors++
logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err)
fmt.Println(logLine)
if logWriter != nil {
logWriter.Write(logLine, true)
}
}
// link_rel icon inserts
for range wr.Result.Icons {
if _, err := br.Exec(); err != nil {
dbErrors++
logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err)
fmt.Println(logLine) fmt.Println(logLine)
if logWriter != nil { if logWriter != nil {
logWriter.Write(logLine, true) logWriter.Write(logLine, true)
} }
} }
} }
}
return errs br.Close()
return dbErrors
} }
func nilIfEmpty(s string) *string { func nilIfEmpty(s string) *string {

View file

@ -17,6 +17,7 @@ type Config struct {
DBUrl string DBUrl string
BatchSize int BatchSize int
Concurrency int Concurrency int
WriteBatch int
Limit int Limit int
DryRun bool DryRun bool
LogFile string LogFile string
@ -39,8 +40,9 @@ type Stats struct {
func main() { func main() {
cfg := Config{} cfg := Config{}
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
flag.IntVar(&cfg.BatchSize, "batch-size", 500, "Rows to fetch per batch") flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch")
flag.IntVar(&cfg.Concurrency, "concurrency", 100, "Number of concurrent goroutines") flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines")
flag.IntVar(&cfg.WriteBatch, "write-batch", 100, "Results to batch per DB write")
flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)") flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)")
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB")
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
@ -100,45 +102,52 @@ func main() {
stats := &Stats{StartedAt: time.Now()} stats := &Stats{StartedAt: time.Now()}
// Three-stage pipeline:
// [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer]
// Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes.
// Worker pool hostCh := make(chan Host, cfg.BatchSize)
sem := make(chan struct{}, cfg.Concurrency) resultCh := make(chan WorkResult, cfg.Concurrency)
var wg sync.WaitGroup
// Process in batches // Stage 1: DB fetcher — continuously fetches pages into hostCh
go func() {
defer close(hostCh)
var lastID int64 var lastID int64
processed := 0 fetched := 0
for { for {
if cfg.Limit > 0 && processed >= cfg.Limit { limit := cfg.BatchSize
if cfg.Limit > 0 {
remaining := cfg.Limit - fetched
if remaining <= 0 {
break break
} }
if limit > remaining {
batchLimit := cfg.BatchSize limit = remaining
if cfg.Limit > 0 && processed+batchLimit > cfg.Limit {
batchLimit = cfg.Limit - processed
} }
}
hosts, err := fetchBatch(ctx, pool, lastID, batchLimit) hosts, err := fetchBatch(ctx, pool, lastID, limit)
if err != nil { if err != nil {
log.Fatalf("Failed to fetch batch: %v", err) log.Fatalf("Failed to fetch batch: %v", err)
} }
if len(hosts) == 0 { if len(hosts) == 0 {
break break
} }
lastID = hosts[len(hosts)-1].ID lastID = hosts[len(hosts)-1].ID
for _, h := range hosts {
hostCh <- h
}
fetched += len(hosts)
}
}()
for i := range hosts { // Stage 2: Workers — fetch WARC from S3, parse HTML, emit results
host := hosts[i] var workerWg sync.WaitGroup
wg.Add(1) for i := 0; i < cfg.Concurrency; i++ {
sem <- struct{}{} workerWg.Add(1)
go func() { go func() {
defer wg.Done() defer workerWg.Done()
defer func() { <-sem }() for host := range hostCh {
func() {
// Recover from panics — log them, don't mark row as parsed
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
stats.Panics.Add(1) stats.Panics.Add(1)
@ -153,19 +162,13 @@ func main() {
result := processHost(host) result := processHost(host)
// Log line // Log
logLine := formatLogLine(host, result) logLine := formatLogLine(host, result)
fmt.Println(logLine) fmt.Println(logLine)
if logWriter != nil { if logWriter != nil {
logWriter.Write(logLine, result.Err != nil) logWriter.Write(logLine, result.Err != nil)
} }
// Write to DB
if !cfg.DryRun && result.Err == nil {
errs := writeResult(ctx, pool, host, result, logWriter)
stats.DBErrors.Add(int64(errs.HostUpdate + errs.IconInsert))
}
// Update stats // Update stats
stats.Processed.Add(1) stats.Processed.Add(1)
if result.Err == nil { if result.Err == nil {
@ -174,26 +177,49 @@ func main() {
} else { } else {
stats.NoTitle.Add(1) stats.NoTitle.Add(1)
} }
// +1 for the /favicon.ico entry added per host
stats.IconsFound.Add(int64(len(result.Icons) + 1)) stats.IconsFound.Add(int64(len(result.Icons) + 1))
if !result.IframeAllowed { if !result.IframeAllowed {
stats.IframeBlocked.Add(1) stats.IframeBlocked.Add(1)
} }
} } else if result.FetchErr {
if result.Err != nil {
if result.FetchErr {
stats.FetchErrors.Add(1) stats.FetchErrors.Add(1)
} else { } else {
stats.ParseErrors.Add(1) stats.ParseErrors.Add(1)
} }
// Send successful results to writer
if result.Err == nil {
resultCh <- WorkResult{Host: host, Result: result}
} }
}() }()
} }
}()
processed += len(hosts)
} }
go func() {
workerWg.Wait()
close(resultCh)
}()
wg.Wait() // Stage 3: DB writer — batches writes for efficiency
if !cfg.DryRun {
var buf []WorkResult
for wr := range resultCh {
buf = append(buf, wr)
if len(buf) >= cfg.WriteBatch {
dbErrs := flushResults(ctx, pool, buf, logWriter)
stats.DBErrors.Add(int64(dbErrs))
buf = buf[:0]
}
}
if len(buf) > 0 {
dbErrs := flushResults(ctx, pool, buf, logWriter)
stats.DBErrors.Add(int64(dbErrs))
}
} else {
for range resultCh {
// drain in dry-run mode
}
}
// Print summary // Print summary
duration := time.Since(stats.StartedAt) duration := time.Since(stats.StartedAt)