everytab/pipeline/02_warc_parse/main.go

240 lines
5.8 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// Config holds the command-line options for a single warc_parse run.
// Field order is unchanged from the original declaration.
type Config struct {
	DBUrl string // Postgres connection string (--db, required)

	// Tuning knobs: rows fetched per page (--batch-size), worker goroutines
	// (--concurrency), results per DB write (--write-batch), and the maximum
	// number of rows to process, where 0 means all (--limit).
	BatchSize, Concurrency, WriteBatch, Limit int

	DryRun    bool   // print results without writing them to the DB (--dry-run)
	LogFile   string // file that mirrors log lines; empty disables it (--log-file)
	LogErrors bool   // write only error lines to LogFile (--log-errors-only)
}
// Stats accumulates run-wide counters for the summary and the stats file.
// The counters are atomic.Int64 so worker goroutines can update them
// concurrently. Like any struct holding atomics, a Stats must not be copied
// after first use, so share it as *Stats.
type Stats struct {
	// Outcome counters updated by the workers.
	Processed, TitlesFound, NoTitle, IconsFound, IframeBlocked atomic.Int64
	// Failure counters, one per failure kind.
	ParseErrors, FetchErrors, DBErrors, Panics atomic.Int64

	StartedAt time.Time // wall-clock start of the run
}
// main parses and validates the command-line flags, then runs the WARC parse
// pipeline and exits with the status code it returns.
//
// The pipeline lives in runWARCParse so that its deferred cleanup (closing
// the log file and the database pool) has already run by the time os.Exit is
// called. os.Exit does not run deferred functions.
func main() {
	cfg := Config{}
	flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
	flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch")
	flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines")
	flag.IntVar(&cfg.WriteBatch, "write-batch", 100, "Results to batch per DB write")
	flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB")
	flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
	flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
	flag.Parse()

	if cfg.DBUrl == "" {
		fmt.Println("Usage: warc_parse --db DATABASE_URL [OPTIONS]")
		flag.PrintDefaults()
		os.Exit(1)
	}
	// With zero workers nothing ever drains hostCh. With a non-positive page
	// size, fetchBatch would be called with a non-positive limit. Either way
	// the run would not do useful work, so reject these values up front.
	if cfg.Concurrency < 1 || cfg.BatchSize < 1 {
		fmt.Println("--concurrency and --batch-size must be at least 1")
		flag.PrintDefaults()
		os.Exit(1)
	}

	os.Exit(runWARCParse(cfg))
}

// runWARCParse runs the three-stage parse pipeline described by cfg. It
// returns the process exit code: 0 on success, and 1 if setup fails or the
// host fetcher stops on a database error.
//
// A fetch error does not kill the process from inside a goroutine. Hosts that
// were already fetched are still processed, their results are still flushed
// to the DB, and the summary and stats file are still written. Only after
// that is the non-zero exit code returned.
func runWARCParse(cfg Config) int {
	ctx := context.Background()

	// Init S3 client
	if err := initS3(); err != nil {
		log.Printf("Failed to init S3: %v", err)
		return 1
	}
	pool, err := pgxpool.New(ctx, cfg.DBUrl)
	if err != nil {
		log.Printf("Failed to connect to database: %v", err)
		return 1
	}
	defer pool.Close()

	// When --limit is set it stands in for the total, and the COUNT(*) query
	// is skipped.
	var total int64
	if cfg.Limit > 0 {
		total = int64(cfg.Limit)
	} else {
		err = pool.QueryRow(ctx, "SELECT COUNT(*) FROM hosts WHERE parsed = FALSE").Scan(&total)
		if err != nil {
			log.Printf("Failed to count unparsed hosts: %v", err)
			return 1
		}
	}
	if total == 0 {
		fmt.Println("No unparsed hosts found.")
		return 0
	}
	fmt.Printf("=== WARC Parser ===\n")
	fmt.Printf("Unparsed hosts: %d\n", total)
	fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
	fmt.Printf("Batch size: %d\n", cfg.BatchSize)
	fmt.Printf("Dry run: %v\n\n", cfg.DryRun)

	// Setup log file
	var logWriter *LogWriter
	if cfg.LogFile != "" {
		logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
		if err != nil {
			log.Printf("Failed to open log file: %v", err)
			return 1
		}
		defer logWriter.Close()
	}
	stats := &Stats{StartedAt: time.Now()}

	// Three-stage pipeline:
	// [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer]
	// Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes.
	hostCh := make(chan Host, 20000)
	resultCh := make(chan WorkResult, 1000)

	// Stage 1: DB fetcher. It pages through hosts, passing the last seen ID to
	// fetchBatch, and feeds them into hostCh. It sends exactly one value on
	// fetchErrCh (nil, or the error that stopped it) and only then closes
	// hostCh, so the value is buffered before any worker can observe the close.
	fetchErrCh := make(chan error, 1)
	go func() {
		defer close(hostCh)
		var lastID int64
		fetched := 0
		for {
			limit := cfg.BatchSize
			if cfg.Limit > 0 {
				remaining := cfg.Limit - fetched
				if remaining <= 0 {
					break
				}
				if limit > remaining {
					limit = remaining
				}
			}
			// := keeps err local to this goroutine; assigning the outer err
			// from here would be a data race.
			hosts, err := fetchBatch(ctx, pool, lastID, limit)
			if err != nil {
				fetchErrCh <- fmt.Errorf("after host id %d: %w", lastID, err)
				return
			}
			if len(hosts) == 0 {
				break
			}
			lastID = hosts[len(hosts)-1].ID
			for _, h := range hosts {
				hostCh <- h
			}
			fetched += len(hosts)
		}
		fetchErrCh <- nil
	}()

	// Stage 2: Workers — fetch WARC from S3, parse HTML, emit results
	var workerWg sync.WaitGroup
	for i := 0; i < cfg.Concurrency; i++ {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			for host := range hostCh {
				// The per-host closure lets a panic in one host be recovered
				// and counted without stopping this worker.
				func() {
					defer func() {
						if r := recover(); r != nil {
							stats.Panics.Add(1)
							stats.Processed.Add(1)
							logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r)
							fmt.Println(logLine)
							if logWriter != nil {
								logWriter.Write(logLine, true)
							}
						}
					}()
					result := processHost(host)
					// Log
					logLine := formatLogLine(host, result)
					fmt.Println(logLine)
					if logWriter != nil {
						logWriter.Write(logLine, result.Err != nil)
					}
					// Update stats
					stats.Processed.Add(1)
					if result.Err == nil {
						if result.Title != "" {
							stats.TitlesFound.Add(1)
						} else {
							stats.NoTitle.Add(1)
						}
						// NOTE(review): counts len(Icons)+1 per successful host;
						// presumably one implicit icon not present in Icons —
						// confirm against processHost.
						stats.IconsFound.Add(int64(len(result.Icons) + 1))
						if !result.IframeAllowed {
							stats.IframeBlocked.Add(1)
						}
					} else if result.FetchErr {
						stats.FetchErrors.Add(1)
					} else {
						stats.ParseErrors.Add(1)
					}
					// Send successful results to writer
					if result.Err == nil {
						resultCh <- WorkResult{Host: host, Result: result}
					}
				}()
			}
		}()
	}
	go func() {
		workerWg.Wait()
		close(resultCh)
	}()

	// Stage 3: DB writer — batches writes for efficiency
	if !cfg.DryRun {
		var buf []WorkResult
		for wr := range resultCh {
			buf = append(buf, wr)
			if len(buf) >= cfg.WriteBatch {
				dbErrs := flushResults(ctx, pool, buf, logWriter)
				stats.DBErrors.Add(int64(dbErrs))
				buf = buf[:0]
			}
		}
		if len(buf) > 0 {
			dbErrs := flushResults(ctx, pool, buf, logWriter)
			stats.DBErrors.Add(int64(dbErrs))
		}
	} else {
		for range resultCh {
			// drain in dry-run mode
		}
	}

	// resultCh is closed only after every worker saw hostCh closed, and the
	// fetcher sent on fetchErrCh before closing hostCh. So this receive does
	// not block.
	fetchErr := <-fetchErrCh

	// Print summary
	duration := time.Since(stats.StartedAt)
	fmt.Printf("\n=== Summary ===\n")
	fmt.Printf("Duration: %s\n", duration.Round(time.Second))
	fmt.Printf("Processed: %d\n", stats.Processed.Load())
	fmt.Printf("Titles found: %d\n", stats.TitlesFound.Load())
	fmt.Printf("No title: %d\n", stats.NoTitle.Load())
	fmt.Printf("Icons found: %d\n", stats.IconsFound.Load())
	fmt.Printf("Iframe blocked: %d\n", stats.IframeBlocked.Load())
	fmt.Printf("Fetch errors: %d\n", stats.FetchErrors.Load())
	fmt.Printf("Parse errors: %d\n", stats.ParseErrors.Load())
	fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
	fmt.Printf("Panics: %d\n", stats.Panics.Load())
	// Write stats JSON
	writeStats(stats, cfg)

	if fetchErr != nil {
		log.Printf("Failed to fetch batch: %v (run stopped early; remaining hosts were not processed)", fetchErr)
		return 1
	}
	return 0
}