package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// Config holds the command-line options for one parser run.
type Config struct {
	DBUrl       string // Postgres connection string (--db, required)
	BatchSize   int    // rows requested per fetchBatch call (--batch-size)
	Concurrency int    // maximum number of hosts processed at once (--concurrency)
	Limit       int    // maximum rows to process; values <= 0 mean no limit (--limit)
	DryRun      bool   // when true, results are logged but writeResult is skipped (--dry-run)
	LogFile     string // optional path that log lines are mirrored to (--log-file)
	LogErrors   bool   // passed to NewLogWriter; set by --log-errors-only ("Only write errors to log file")
}

// Stats accumulates run-wide counters. The counters are atomics because every
// worker goroutine updates them concurrently.
type Stats struct {
	Processed     atomic.Int64 // hosts handled, including ones whose worker panicked
	TitlesFound   atomic.Int64 // results with a non-empty Title
	IconsFound    atomic.Int64 // total number of icons across all results
	IframeBlocked atomic.Int64 // results with IframeAllowed == false
	ParseErrors   atomic.Int64 // results with an error not flagged as a fetch error
	FetchErrors   atomic.Int64 // results with an error flagged as a fetch error
	DBErrors      atomic.Int64 // host-update plus icon-insert failures reported by writeResult
	Panics        atomic.Int64 // workers that recovered from a panic
	StartedAt     time.Time    // start time; the summary reports time.Since(StartedAt)
}

// main parses and validates the command-line flags, then runs the parser.
// The process exits with status 1 on invalid usage or on any run failure.
func main() {
	cfg := Config{}
	flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
	flag.IntVar(&cfg.BatchSize, "batch-size", 500, "Rows to fetch per batch")
	flag.IntVar(&cfg.Concurrency, "concurrency", 100, "Number of concurrent goroutines")
	flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB")
	flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
	flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
	flag.Parse()

	if cfg.DBUrl == "" {
		fmt.Println("Usage: warc_parse --db DATABASE_URL [OPTIONS]")
		flag.PrintDefaults()
		os.Exit(1)
	}

	// Reject values that break the run.
	//   - Concurrency 0 gives an unbuffered semaphore. The first send then blocks
	//     forever, because its only receiver is the worker goroutine that has not
	//     been started yet.
	//   - A negative capacity panics in make.
	//   - A non-positive batch size asks fetchBatch for zero (or negative) rows.
	if cfg.Concurrency < 1 || cfg.BatchSize < 1 {
		fmt.Println("Invalid options: --concurrency and --batch-size must be at least 1")
		flag.PrintDefaults()
		os.Exit(1)
	}

	os.Exit(runParser(cfg))
}

// runParser performs one pass over the unparsed hosts selected by cfg and
// returns the process exit code (0 on success, 1 on failure).
//
// It is separate from main so that its deferred cleanup (log file and pool
// Close) runs on every return path. log.Fatalf/os.Exit would skip those defers.
// In particular, a batch-fetch failure mid-run no longer kills in-flight
// workers: dispatch stops, workers drain, and the summary and stats are still
// emitted before the non-zero exit.
func runParser(cfg Config) int {
	ctx := context.Background()

	// Init S3 client
	if err := initS3(); err != nil {
		log.Printf("Failed to init S3: %v", err)
		return 1
	}

	pool, err := pgxpool.New(ctx, cfg.DBUrl)
	if err != nil {
		log.Printf("Failed to connect to database: %v", err)
		return 1
	}
	defer pool.Close()

	// pgxpool.New may not open a connection up front. Ping so that an
	// unreachable database is reported as a connection failure instead of
	// surfacing later as a count or fetch error.
	if err := pool.Ping(ctx); err != nil {
		log.Printf("Failed to connect to database: %v", err)
		return 1
	}

	// Get total count. With --limit the COUNT(*) query is skipped and the limit
	// itself is reported as the total (an upper bound, not an exact count).
	var total int64
	if cfg.Limit > 0 {
		total = int64(cfg.Limit)
	} else {
		err = pool.QueryRow(ctx, "SELECT COUNT(*) FROM hosts WHERE parsed = FALSE").Scan(&total)
		if err != nil {
			log.Printf("Failed to count unparsed hosts: %v", err)
			return 1
		}
	}
	if total == 0 {
		fmt.Println("No unparsed hosts found.")
		return 0
	}

	fmt.Printf("=== WARC Parser ===\n")
	fmt.Printf("Unparsed hosts: %d\n", total)
	fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
	fmt.Printf("Batch size: %d\n", cfg.BatchSize)
	fmt.Printf("Dry run: %v\n\n", cfg.DryRun)

	// Setup log file
	var logWriter *LogWriter
	if cfg.LogFile != "" {
		logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
		if err != nil {
			log.Printf("Failed to open log file: %v", err)
			return 1
		}
		defer logWriter.Close()
	}

	stats := &Stats{StartedAt: time.Now()}

	// sem bounds the number of in-flight workers. Acquiring it in the dispatch
	// loop also keeps the next batch from being fetched until workers free up.
	sem := make(chan struct{}, cfg.Concurrency)
	var wg sync.WaitGroup

	// Process in batches
	exitCode := 0
	var lastID int64
	processed := 0
	for {
		if cfg.Limit > 0 && processed >= cfg.Limit {
			break
		}
		batchLimit := cfg.BatchSize
		if cfg.Limit > 0 && processed+batchLimit > cfg.Limit {
			batchLimit = cfg.Limit - processed
		}

		hosts, err := fetchBatch(ctx, pool, lastID, batchLimit)
		if err != nil {
			// Stop dispatching, but let in-flight workers finish their DB
			// writes. The summary is still printed and the exit code is 1.
			log.Printf("Failed to fetch batch: %v", err)
			exitCode = 1
			break
		}
		if len(hosts) == 0 {
			break
		}
		// Keyset pagination cursor. NOTE(review): assumes fetchBatch returns
		// rows with ID > lastID in ascending ID order — confirm.
		lastID = hosts[len(hosts)-1].ID

		for i := range hosts {
			host := hosts[i] // per-iteration copy captured by the goroutine (required before Go 1.22)
			wg.Add(1)
			sem <- struct{}{}
			go func() {
				defer wg.Done()
				defer func() { <-sem }()
				// Recover from panics so one bad host cannot crash the run.
				// A panic in processHost skips writeResult for this host.
				defer func() {
					if r := recover(); r != nil {
						stats.Panics.Add(1)
						stats.Processed.Add(1)
						logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r)
						fmt.Println(logLine)
						if logWriter != nil {
							logWriter.Write(logLine, true)
						}
					}
				}()

				result := processHost(host)

				// Log line
				logLine := formatLogLine(host, result)
				fmt.Println(logLine)
				if logWriter != nil {
					logWriter.Write(logLine, result.Err != nil)
				}

				// Write to DB (skipped in dry-run mode and for failed results)
				if !cfg.DryRun && result.Err == nil {
					errs := writeResult(ctx, pool, host, result, logWriter)
					stats.DBErrors.Add(int64(errs.HostUpdate + errs.IconInsert))
				}

				// Update stats
				stats.Processed.Add(1)
				if result.Title != "" {
					stats.TitlesFound.Add(1)
				}
				stats.IconsFound.Add(int64(len(result.Icons)))
				if !result.IframeAllowed {
					stats.IframeBlocked.Add(1)
				}
				if result.Err != nil {
					if result.FetchErr {
						stats.FetchErrors.Add(1)
					} else {
						stats.ParseErrors.Add(1)
					}
				}
			}()
		}
		processed += len(hosts)
	}
	wg.Wait()

	// Print summary
	duration := time.Since(stats.StartedAt)
	fmt.Printf("\n=== Summary ===\n")
	fmt.Printf("Duration: %s\n", duration.Round(time.Second))
	fmt.Printf("Processed: %d\n", stats.Processed.Load())
	fmt.Printf("Titles found: %d\n", stats.TitlesFound.Load())
	fmt.Printf("Icons found: %d\n", stats.IconsFound.Load())
	fmt.Printf("Iframe blocked: %d\n", stats.IframeBlocked.Load())
	fmt.Printf("Fetch errors: %d\n", stats.FetchErrors.Load())
	fmt.Printf("Parse errors: %d\n", stats.ParseErrors.Load())
	fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
	fmt.Printf("Panics: %d\n", stats.Panics.Load())

	// Write stats JSON
	writeStats(stats, cfg)

	return exitCode
}