diff --git a/pipeline/02_warc_parse/db.go b/pipeline/02_warc_parse/db.go index fbebf88..3c2d0ec 100644 --- a/pipeline/02_warc_parse/db.go +++ b/pipeline/02_warc_parse/db.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) @@ -26,12 +27,6 @@ type ProcessResult struct { FetchErr bool // true if error was during fetch (vs parse) } -// WriteErrors tracks errors encountered during DB writes. -type WriteErrors struct { - HostUpdate int - IconInsert int -} - // fetchBatch gets the next batch of unparsed hosts after lastID. func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]Host, error) { rows, err := pool.Query(ctx, @@ -58,55 +53,78 @@ func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int return hosts, rows.Err() } -// writeResult writes parsed results back to the database. -// Returns counts of DB write errors encountered. -func writeResult(ctx context.Context, pool *pgxpool.Pool, host Host, result ProcessResult, logWriter *LogWriter) WriteErrors { - var errs WriteErrors +// WorkResult pairs a host with its parsed result for the DB writer. +type WorkResult struct { + Host Host + Result ProcessResult +} - // Update hosts table - _, err := pool.Exec(ctx, - `UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`, - nilIfEmpty(result.Title), result.IframeAllowed, host.ID) - if err != nil { - errs.HostUpdate++ - logLine := fmt.Sprintf("DB_ERROR: %s hosts_update: %v", host.Hostname, err) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, true) - } - return errs - } +// flushResults writes a batch of successful results to the database using pgx.Batch. +// Returns the number of DB errors encountered. +func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter *LogWriter) int { + batch := &pgx.Batch{} - // Insert /favicon.ico entry - faviconURL := fmt.Sprintf("%s://%s/favicon.ico", host.Protocol, host.Hostname) - _, err = pool.Exec(ctx, - `INSERT INTO icons (host_id, url, source) VALUES ($1, $2, 'favicon_ico')`, - host.ID, faviconURL) - if err != nil { - errs.IconInsert++ - logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", host.Hostname, err) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, true) + // Queue all queries + for _, wr := range results { + // Update host + batch.Queue( + `UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`, + nilIfEmpty(wr.Result.Title), wr.Result.IframeAllowed, wr.Host.ID, + ) + // Insert /favicon.ico + faviconURL := fmt.Sprintf("%s://%s/favicon.ico", wr.Host.Protocol, wr.Host.Hostname) + batch.Queue( + `INSERT INTO icons (host_id, url, source) VALUES ($1, $2, 'favicon_ico')`, + wr.Host.ID, faviconURL, + ) + // Insert link_rel icons + for _, icon := range wr.Result.Icons { + batch.Queue( + `INSERT INTO icons (host_id, url, source, rel_type, rel_sizes) VALUES ($1, $2, $3, $4, $5)`, + wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes), + ) } } - // Insert link rel="icon" entries - for _, icon := range result.Icons { - _, err = pool.Exec(ctx, - `INSERT INTO icons (host_id, url, source, rel_type, rel_sizes) VALUES ($1, $2, $3, $4, $5)`, - host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes)) - if err != nil { - errs.IconInsert++ - logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", host.Hostname, err) + // Send all queries in one round-trip + br := pool.SendBatch(ctx, batch) + + // Check results + dbErrors := 0 + for _, wr := range results { + // host update + if _, err := br.Exec(); err != nil { + dbErrors++ + logLine := fmt.Sprintf("DB_ERROR: %s hosts_update: %v", wr.Host.Hostname, err) fmt.Println(logLine) if logWriter != nil { logWriter.Write(logLine, true) } } + // favicon.ico insert + if _, err := br.Exec(); err != nil { + dbErrors++ + logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, true) + } + } + // link_rel icon inserts + for range wr.Result.Icons { + if _, err := br.Exec(); err != nil { + dbErrors++ + logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, true) + } + } + } } - return errs + br.Close() + return dbErrors } func nilIfEmpty(s string) *string { diff --git a/pipeline/02_warc_parse/main.go b/pipeline/02_warc_parse/main.go index ec4f530..63d2ed7 100644 --- a/pipeline/02_warc_parse/main.go +++ b/pipeline/02_warc_parse/main.go @@ -17,6 +17,7 @@ type Config struct { DBUrl string BatchSize int Concurrency int + WriteBatch int Limit int DryRun bool LogFile string @@ -39,8 +40,9 @@ type Stats struct { func main() { cfg := Config{} flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") - flag.IntVar(&cfg.BatchSize, "batch-size", 500, "Rows to fetch per batch") - flag.IntVar(&cfg.Concurrency, "concurrency", 100, "Number of concurrent goroutines") + flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch") + flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines") + flag.IntVar(&cfg.WriteBatch, "write-batch", 100, "Results to batch per DB write") flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB") flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") @@ -100,100 +102,124 @@ func main() { stats := &Stats{StartedAt: time.Now()} + // Three-stage pipeline: + // [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer] + // Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes. - // Worker pool - sem := make(chan struct{}, cfg.Concurrency) - var wg sync.WaitGroup + hostCh := make(chan Host, cfg.BatchSize) + resultCh := make(chan WorkResult, cfg.Concurrency) - // Process in batches - var lastID int64 - processed := 0 - - for { - if cfg.Limit > 0 && processed >= cfg.Limit { - break + // Stage 1: DB fetcher — continuously fetches pages into hostCh + go func() { + defer close(hostCh) + var lastID int64 + fetched := 0 + for { + limit := cfg.BatchSize + if cfg.Limit > 0 { + remaining := cfg.Limit - fetched + if remaining <= 0 { + break + } + if limit > remaining { + limit = remaining + } + } + hosts, err := fetchBatch(ctx, pool, lastID, limit) + if err != nil { + log.Fatalf("Failed to fetch batch: %v", err) + } + if len(hosts) == 0 { + break + } + lastID = hosts[len(hosts)-1].ID + for _, h := range hosts { + hostCh <- h + } + fetched += len(hosts) } + }() - batchLimit := cfg.BatchSize - if cfg.Limit > 0 && processed+batchLimit > cfg.Limit { - batchLimit = cfg.Limit - processed - } - - hosts, err := fetchBatch(ctx, pool, lastID, batchLimit) - if err != nil { - log.Fatalf("Failed to fetch batch: %v", err) - } - if len(hosts) == 0 { - break - } - - lastID = hosts[len(hosts)-1].ID - - for i := range hosts { - host := hosts[i] - wg.Add(1) - sem <- struct{}{} - - go func() { - defer wg.Done() - defer func() { <-sem }() - - // Recover from panics — log them, don't mark row as parsed - defer func() { - if r := recover(); r != nil { - stats.Panics.Add(1) - stats.Processed.Add(1) - logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, true) + // Stage 2: Workers — fetch WARC from S3, parse HTML, emit results + var workerWg sync.WaitGroup + for i := 0; i < cfg.Concurrency; i++ { + workerWg.Add(1) + go func() { + defer workerWg.Done() + for host := range hostCh { + func() { + defer func() { + if r := recover(); r != nil { + stats.Panics.Add(1) + stats.Processed.Add(1) + logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, true) + } } + }() + + result := processHost(host) + + // Log + logLine := formatLogLine(host, result) + fmt.Println(logLine) + if logWriter != nil { + logWriter.Write(logLine, result.Err != nil) } - }() - result := processHost(host) - - // Log line - logLine := formatLogLine(host, result) - fmt.Println(logLine) - if logWriter != nil { - logWriter.Write(logLine, result.Err != nil) - } - - // Write to DB - if !cfg.DryRun && result.Err == nil { - errs := writeResult(ctx, pool, host, result, logWriter) - stats.DBErrors.Add(int64(errs.HostUpdate + errs.IconInsert)) - } - - // Update stats - stats.Processed.Add(1) - if result.Err == nil { - if result.Title != "" { - stats.TitlesFound.Add(1) - } else { - stats.NoTitle.Add(1) - } - // +1 for the /favicon.ico entry added per host - stats.IconsFound.Add(int64(len(result.Icons) + 1)) - if !result.IframeAllowed { - stats.IframeBlocked.Add(1) - } - } - if result.Err != nil { - if result.FetchErr { + // Update stats + stats.Processed.Add(1) + if result.Err == nil { + if result.Title != "" { + stats.TitlesFound.Add(1) + } else { + stats.NoTitle.Add(1) + } + stats.IconsFound.Add(int64(len(result.Icons) + 1)) + if !result.IframeAllowed { + stats.IframeBlocked.Add(1) + } + } else if result.FetchErr { stats.FetchErrors.Add(1) } else { stats.ParseErrors.Add(1) } - } - }() - } - processed += len(hosts) + // Send successful results to writer + if result.Err == nil { + resultCh <- WorkResult{Host: host, Result: result} + } + }() + } + }() } + go func() { + workerWg.Wait() + close(resultCh) + }() - wg.Wait() + // Stage 3: DB writer — batches writes for efficiency + if !cfg.DryRun { + var buf []WorkResult + for wr := range resultCh { + buf = append(buf, wr) + if len(buf) >= cfg.WriteBatch { + dbErrs := flushResults(ctx, pool, buf, logWriter) + stats.DBErrors.Add(int64(dbErrs)) + buf = buf[:0] + } + } + if len(buf) > 0 { + dbErrs := flushResults(ctx, pool, buf, logWriter) + stats.DBErrors.Add(int64(dbErrs)) + } + } else { + for range resultCh { + // drain in dry-run mode + } + } // Print summary duration := time.Since(stats.StartedAt)