package main import ( "context" "flag" "fmt" "log" "os" "sync" "sync/atomic" "time" "github.com/jackc/pgx/v5/pgxpool" ) type Config struct { DBUrl string BatchSize int Concurrency int WriteBatch int Limit int DryRun bool LogFile string LogErrors bool } type Stats struct { Processed atomic.Int64 TitlesFound atomic.Int64 NoTitle atomic.Int64 IconsFound atomic.Int64 IframeBlocked atomic.Int64 ParseErrors atomic.Int64 FetchErrors atomic.Int64 DBErrors atomic.Int64 Panics atomic.Int64 StartedAt time.Time } func main() { cfg := Config{} flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch") flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines") flag.IntVar(&cfg.WriteBatch, "write-batch", 1000, "Results to batch per DB write") flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB") flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file") flag.Parse() if cfg.DBUrl == "" { fmt.Println("Usage: warc_parse --db DATABASE_URL [OPTIONS]") flag.PrintDefaults() os.Exit(1) } ctx := context.Background() // Init S3 client if err := initS3(); err != nil { log.Fatalf("Failed to init S3: %v", err) } pool, err := pgxpool.New(ctx, cfg.DBUrl) if err != nil { log.Fatalf("Failed to connect to database: %v", err) } defer pool.Close() fmt.Printf("=== WARC Parser ===\n") fmt.Printf("Concurrency: %d\n", cfg.Concurrency) fmt.Printf("Batch size: %d\n", cfg.BatchSize) fmt.Printf("Write batch: %d\n", cfg.WriteBatch) if cfg.Limit > 0 { fmt.Printf("Limit: %d\n", cfg.Limit) } fmt.Printf("Dry run: %v\n\n", cfg.DryRun) // Setup log file var logWriter *LogWriter if cfg.LogFile != "" { logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors) if err != nil { log.Fatalf("Failed to open log file: %v", err) } defer logWriter.Close() } // Disable autovacuum during heavy writes — it competes for disk I/O // and causes writer stalls. Re-enabled + manual VACUUM at end. if !cfg.DryRun { fmt.Println("Disabling autovacuum on hosts and icons tables...") pool.Exec(ctx, "ALTER TABLE hosts SET (autovacuum_enabled = false)") pool.Exec(ctx, "ALTER TABLE icons SET (autovacuum_enabled = false)") } stats := &Stats{StartedAt: time.Now()} // Three-stage pipeline: // [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer] // Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes. hostCh := make(chan Host, 20000) resultCh := make(chan WorkResult, 1000) // Stage 1: DB fetcher — continuously fetches pages into hostCh go func() { defer close(hostCh) var lastID int64 fetched := 0 for { limit := cfg.BatchSize if cfg.Limit > 0 { remaining := cfg.Limit - fetched if remaining <= 0 { break } if limit > remaining { limit = remaining } } fetchStart := time.Now() hosts, err := fetchBatch(ctx, pool, lastID, limit) if err != nil { log.Fatalf("Failed to fetch batch: %v", err) } if len(hosts) == 0 { break } lastID = hosts[len(hosts)-1].ID fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n", len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh)) for _, h := range hosts { hostCh <- h } fetched += len(hosts) } }() // Stage 2: Workers — fetch WARC from S3, parse HTML, emit results var workerWg sync.WaitGroup for i := 0; i < cfg.Concurrency; i++ { workerWg.Add(1) go func() { defer workerWg.Done() for host := range hostCh { func() { defer func() { if r := recover(); r != nil { stats.Panics.Add(1) stats.Processed.Add(1) logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r) fmt.Println(logLine) if logWriter != nil { logWriter.Write(logLine, true) } } }() result := processHost(host) // Log logLine := formatLogLine(host, result) fmt.Println(logLine) if logWriter != nil { logWriter.Write(logLine, result.Err != nil) } // Update stats stats.Processed.Add(1) if result.Err == nil { if result.Title != "" { stats.TitlesFound.Add(1) } else { stats.NoTitle.Add(1) } stats.IconsFound.Add(int64(len(result.Icons) + 1)) if !result.IframeAllowed { stats.IframeBlocked.Add(1) } } else if result.FetchErr { stats.FetchErrors.Add(1) } else { stats.ParseErrors.Add(1) } // Send successful results to writer if result.Err == nil { resultCh <- WorkResult{Host: host, Result: result} } }() } }() } go func() { workerWg.Wait() close(resultCh) }() // Stage 3: DB writers — multiple goroutines batch writes for efficiency const numWriters = 3 var writerWg sync.WaitGroup if !cfg.DryRun { for w := 0; w < numWriters; w++ { writerWg.Add(1) go func() { defer writerWg.Done() var buf []WorkResult for wr := range resultCh { buf = append(buf, wr) if len(buf) >= cfg.WriteBatch { flushStart := time.Now() dbErrs := flushResults(ctx, pool, buf, logWriter) stats.DBErrors.Add(int64(dbErrs)) flushMs := time.Since(flushStart).Milliseconds() if flushMs > 100 { fmt.Printf("[writer] flushed %d results in %dms (resultCh: %d/%d)\n", len(buf), flushMs, len(resultCh), cap(resultCh)) } buf = buf[:0] } } if len(buf) > 0 { dbErrs := flushResults(ctx, pool, buf, logWriter) stats.DBErrors.Add(int64(dbErrs)) } }() } } else { writerWg.Add(1) go func() { defer writerWg.Done() for range resultCh { } }() } writerWg.Wait() // Re-enable autovacuum and run manual vacuum if !cfg.DryRun { fmt.Println("Re-enabling autovacuum and running VACUUM ANALYZE...") pool.Exec(ctx, "ALTER TABLE hosts SET (autovacuum_enabled = true)") pool.Exec(ctx, "ALTER TABLE icons SET (autovacuum_enabled = true)") pool.Exec(ctx, "VACUUM ANALYZE hosts") pool.Exec(ctx, "VACUUM ANALYZE icons") fmt.Println("VACUUM complete") } // Print summary duration := time.Since(stats.StartedAt) fmt.Printf("\n=== Summary ===\n") fmt.Printf("Duration: %s\n", duration.Round(time.Second)) fmt.Printf("Processed: %d\n", stats.Processed.Load()) fmt.Printf("Titles found: %d\n", stats.TitlesFound.Load()) fmt.Printf("No title: %d\n", stats.NoTitle.Load()) fmt.Printf("Icons found: %d\n", stats.IconsFound.Load()) fmt.Printf("Iframe blocked: %d\n", stats.IframeBlocked.Load()) fmt.Printf("Fetch errors: %d\n", stats.FetchErrors.Load()) fmt.Printf("Parse errors: %d\n", stats.ParseErrors.Load()) fmt.Printf("DB errors: %d\n", stats.DBErrors.Load()) fmt.Printf("Panics: %d\n", stats.Panics.Load()) // Write stats JSON writeStats(stats, cfg) }