266 lines
7 KiB
Go
266 lines
7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// Config holds the command-line options for a parser run. Each field is
// populated from the flag of the corresponding name in main.
type Config struct {
	// DBUrl is the Postgres connection string (--db, required).
	DBUrl string
	// BatchSize is the number of rows fetched from the database per batch.
	BatchSize int
	// Concurrency is the number of worker goroutines.
	Concurrency int
	// WriteBatch is the number of results buffered per database write.
	WriteBatch int
	// Limit caps the number of rows processed; 0 means no cap.
	Limit int
	// DryRun prints results without writing them to the database.
	DryRun bool
	// LogFile, when non-empty, is the path log lines are mirrored to.
	LogFile string
	// LogErrors restricts the log file to error lines only.
	LogErrors bool
}
|
|
|
|
// Stats accumulates run counters. The counters are atomic so that worker and
// writer goroutines can update them concurrently; because of that a Stats
// value must be shared by pointer and never copied.
type Stats struct {
	// Processed counts every host a worker has handled, including hosts
	// whose processing panicked.
	Processed atomic.Int64
	// TitlesFound counts successful results with a non-empty title.
	TitlesFound atomic.Int64
	// NoTitle counts successful results with an empty title.
	NoTitle atomic.Int64
	// IconsFound is the running icon total across successful results.
	IconsFound atomic.Int64
	// IframeBlocked counts successful results that do not allow iframes.
	IframeBlocked atomic.Int64
	// ParseErrors counts failed results not flagged as fetch errors.
	ParseErrors atomic.Int64
	// FetchErrors counts failed results flagged as fetch errors.
	FetchErrors atomic.Int64
	// DBErrors sums the error counts reported by database flushes.
	DBErrors atomic.Int64
	// Panics counts panics recovered while processing a host.
	Panics atomic.Int64
	// StartedAt is when the run began; the summary duration is measured
	// from it.
	StartedAt time.Time
}
|
|
|
|
func main() {
|
|
cfg := Config{}
|
|
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
|
|
flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch")
|
|
flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines")
|
|
flag.IntVar(&cfg.WriteBatch, "write-batch", 1000, "Results to batch per DB write")
|
|
flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)")
|
|
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB")
|
|
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
|
|
flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
|
|
flag.Parse()
|
|
|
|
if cfg.DBUrl == "" {
|
|
fmt.Println("Usage: warc_parse --db DATABASE_URL [OPTIONS]")
|
|
flag.PrintDefaults()
|
|
os.Exit(1)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
// Init S3 client
|
|
if err := initS3(); err != nil {
|
|
log.Fatalf("Failed to init S3: %v", err)
|
|
}
|
|
|
|
pool, err := pgxpool.New(ctx, cfg.DBUrl)
|
|
if err != nil {
|
|
log.Fatalf("Failed to connect to database: %v", err)
|
|
}
|
|
defer pool.Close()
|
|
|
|
fmt.Printf("=== WARC Parser ===\n")
|
|
fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
|
|
fmt.Printf("Batch size: %d\n", cfg.BatchSize)
|
|
fmt.Printf("Write batch: %d\n", cfg.WriteBatch)
|
|
if cfg.Limit > 0 {
|
|
fmt.Printf("Limit: %d\n", cfg.Limit)
|
|
}
|
|
fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
|
|
|
|
// Setup log file
|
|
var logWriter *LogWriter
|
|
if cfg.LogFile != "" {
|
|
logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
|
|
if err != nil {
|
|
log.Fatalf("Failed to open log file: %v", err)
|
|
}
|
|
defer logWriter.Close()
|
|
}
|
|
|
|
// Disable autovacuum during heavy writes — it competes for disk I/O
|
|
// and causes writer stalls. Re-enabled + manual VACUUM at end.
|
|
if !cfg.DryRun {
|
|
fmt.Println("Disabling autovacuum on hosts and icons tables...")
|
|
pool.Exec(ctx, "ALTER TABLE hosts SET (autovacuum_enabled = false)")
|
|
pool.Exec(ctx, "ALTER TABLE icons SET (autovacuum_enabled = false)")
|
|
}
|
|
|
|
stats := &Stats{StartedAt: time.Now()}
|
|
|
|
// Three-stage pipeline:
|
|
// [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer]
|
|
// Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes.
|
|
|
|
hostCh := make(chan Host, 20000)
|
|
resultCh := make(chan WorkResult, 1000)
|
|
|
|
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
|
go func() {
|
|
defer close(hostCh)
|
|
var lastID int64
|
|
fetched := 0
|
|
for {
|
|
limit := cfg.BatchSize
|
|
if cfg.Limit > 0 {
|
|
remaining := cfg.Limit - fetched
|
|
if remaining <= 0 {
|
|
break
|
|
}
|
|
if limit > remaining {
|
|
limit = remaining
|
|
}
|
|
}
|
|
fetchStart := time.Now()
|
|
hosts, err := fetchBatch(ctx, pool, lastID, limit)
|
|
if err != nil {
|
|
log.Fatalf("Failed to fetch batch: %v", err)
|
|
}
|
|
if len(hosts) == 0 {
|
|
break
|
|
}
|
|
lastID = hosts[len(hosts)-1].ID
|
|
fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n",
|
|
len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
|
|
for _, h := range hosts {
|
|
hostCh <- h
|
|
}
|
|
fetched += len(hosts)
|
|
}
|
|
}()
|
|
|
|
// Stage 2: Workers — fetch WARC from S3, parse HTML, emit results
|
|
var workerWg sync.WaitGroup
|
|
for i := 0; i < cfg.Concurrency; i++ {
|
|
workerWg.Add(1)
|
|
go func() {
|
|
defer workerWg.Done()
|
|
for host := range hostCh {
|
|
func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
stats.Panics.Add(1)
|
|
stats.Processed.Add(1)
|
|
logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r)
|
|
fmt.Println(logLine)
|
|
if logWriter != nil {
|
|
logWriter.Write(logLine, true)
|
|
}
|
|
}
|
|
}()
|
|
|
|
result := processHost(host)
|
|
|
|
// Log
|
|
logLine := formatLogLine(host, result)
|
|
fmt.Println(logLine)
|
|
if logWriter != nil {
|
|
logWriter.Write(logLine, result.Err != nil)
|
|
}
|
|
|
|
// Update stats
|
|
stats.Processed.Add(1)
|
|
if result.Err == nil {
|
|
if result.Title != "" {
|
|
stats.TitlesFound.Add(1)
|
|
} else {
|
|
stats.NoTitle.Add(1)
|
|
}
|
|
stats.IconsFound.Add(int64(len(result.Icons) + 1))
|
|
if !result.IframeAllowed {
|
|
stats.IframeBlocked.Add(1)
|
|
}
|
|
} else if result.FetchErr {
|
|
stats.FetchErrors.Add(1)
|
|
} else {
|
|
stats.ParseErrors.Add(1)
|
|
}
|
|
|
|
// Send successful results to writer
|
|
if result.Err == nil {
|
|
resultCh <- WorkResult{Host: host, Result: result}
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
}
|
|
go func() {
|
|
workerWg.Wait()
|
|
close(resultCh)
|
|
}()
|
|
|
|
// Stage 3: DB writers — multiple goroutines batch writes for efficiency
|
|
const numWriters = 3
|
|
var writerWg sync.WaitGroup
|
|
if !cfg.DryRun {
|
|
for w := 0; w < numWriters; w++ {
|
|
writerWg.Add(1)
|
|
go func() {
|
|
defer writerWg.Done()
|
|
var buf []WorkResult
|
|
for wr := range resultCh {
|
|
buf = append(buf, wr)
|
|
if len(buf) >= cfg.WriteBatch {
|
|
flushStart := time.Now()
|
|
dbErrs := flushResults(ctx, pool, buf, logWriter)
|
|
stats.DBErrors.Add(int64(dbErrs))
|
|
flushMs := time.Since(flushStart).Milliseconds()
|
|
if flushMs > 100 {
|
|
fmt.Printf("[writer] flushed %d results in %dms (resultCh: %d/%d)\n",
|
|
len(buf), flushMs, len(resultCh), cap(resultCh))
|
|
}
|
|
buf = buf[:0]
|
|
}
|
|
}
|
|
if len(buf) > 0 {
|
|
dbErrs := flushResults(ctx, pool, buf, logWriter)
|
|
stats.DBErrors.Add(int64(dbErrs))
|
|
}
|
|
}()
|
|
}
|
|
} else {
|
|
writerWg.Add(1)
|
|
go func() {
|
|
defer writerWg.Done()
|
|
for range resultCh {
|
|
}
|
|
}()
|
|
}
|
|
writerWg.Wait()
|
|
|
|
// Re-enable autovacuum and run manual vacuum
|
|
if !cfg.DryRun {
|
|
fmt.Println("Re-enabling autovacuum and running VACUUM ANALYZE...")
|
|
pool.Exec(ctx, "ALTER TABLE hosts SET (autovacuum_enabled = true)")
|
|
pool.Exec(ctx, "ALTER TABLE icons SET (autovacuum_enabled = true)")
|
|
pool.Exec(ctx, "VACUUM ANALYZE hosts")
|
|
pool.Exec(ctx, "VACUUM ANALYZE icons")
|
|
fmt.Println("VACUUM complete")
|
|
}
|
|
|
|
// Print summary
|
|
duration := time.Since(stats.StartedAt)
|
|
fmt.Printf("\n=== Summary ===\n")
|
|
fmt.Printf("Duration: %s\n", duration.Round(time.Second))
|
|
fmt.Printf("Processed: %d\n", stats.Processed.Load())
|
|
fmt.Printf("Titles found: %d\n", stats.TitlesFound.Load())
|
|
fmt.Printf("No title: %d\n", stats.NoTitle.Load())
|
|
fmt.Printf("Icons found: %d\n", stats.IconsFound.Load())
|
|
fmt.Printf("Iframe blocked: %d\n", stats.IframeBlocked.Load())
|
|
fmt.Printf("Fetch errors: %d\n", stats.FetchErrors.Load())
|
|
fmt.Printf("Parse errors: %d\n", stats.ParseErrors.Load())
|
|
fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
|
|
fmt.Printf("Panics: %d\n", stats.Panics.Load())
|
|
|
|
// Write stats JSON
|
|
writeStats(stats, cfg)
|
|
}
|