package main import ( "context" "flag" "fmt" "log" "os" "sync" "sync/atomic" "time" "github.com/jackc/pgx/v5/pgxpool" ) type Config struct { DBUrl string BatchSize int Concurrency int WriteBatch int Limit int DryRun bool LogFile string LogErrors bool } type Stats struct { Processed atomic.Int64 TitlesFound atomic.Int64 NoTitle atomic.Int64 IconsFound atomic.Int64 IframeBlocked atomic.Int64 ParseErrors atomic.Int64 FetchErrors atomic.Int64 DBErrors atomic.Int64 Panics atomic.Int64 StartedAt time.Time } func main() { cfg := Config{} flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to fetch per batch") flag.IntVar(&cfg.Concurrency, "concurrency", 500, "Number of concurrent goroutines") flag.IntVar(&cfg.WriteBatch, "write-batch", 100, "Results to batch per DB write") flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB") flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file") flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file") flag.Parse() if cfg.DBUrl == "" { fmt.Println("Usage: warc_parse --db DATABASE_URL [OPTIONS]") flag.PrintDefaults() os.Exit(1) } ctx := context.Background() // Init S3 client if err := initS3(); err != nil { log.Fatalf("Failed to init S3: %v", err) } pool, err := pgxpool.New(ctx, cfg.DBUrl) if err != nil { log.Fatalf("Failed to connect to database: %v", err) } defer pool.Close() // Get total count var total int64 if cfg.Limit > 0 { total = int64(cfg.Limit) } else { err = pool.QueryRow(ctx, "SELECT COUNT(*) FROM hosts WHERE parsed = FALSE").Scan(&total) if err != nil { log.Fatalf("Failed to count unparsed hosts: %v", err) } } if total == 0 { fmt.Println("No unparsed hosts found.") return } fmt.Printf("=== WARC Parser ===\n") fmt.Printf("Unparsed hosts: %d\n", total) fmt.Printf("Concurrency: %d\n", cfg.Concurrency) fmt.Printf("Batch size: %d\n", cfg.BatchSize) fmt.Printf("Dry run: %v\n\n", cfg.DryRun) // Setup log file var logWriter *LogWriter if cfg.LogFile != "" { logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors) if err != nil { log.Fatalf("Failed to open log file: %v", err) } defer logWriter.Close() } stats := &Stats{StartedAt: time.Now()} // Three-stage pipeline: // [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer] // Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes. hostCh := make(chan Host, cfg.BatchSize) resultCh := make(chan WorkResult, cfg.Concurrency) // Stage 1: DB fetcher — continuously fetches pages into hostCh go func() { defer close(hostCh) var lastID int64 fetched := 0 for { limit := cfg.BatchSize if cfg.Limit > 0 { remaining := cfg.Limit - fetched if remaining <= 0 { break } if limit > remaining { limit = remaining } } hosts, err := fetchBatch(ctx, pool, lastID, limit) if err != nil { log.Fatalf("Failed to fetch batch: %v", err) } if len(hosts) == 0 { break } lastID = hosts[len(hosts)-1].ID for _, h := range hosts { hostCh <- h } fetched += len(hosts) } }() // Stage 2: Workers — fetch WARC from S3, parse HTML, emit results var workerWg sync.WaitGroup for i := 0; i < cfg.Concurrency; i++ { workerWg.Add(1) go func() { defer workerWg.Done() for host := range hostCh { func() { defer func() { if r := recover(); r != nil { stats.Panics.Add(1) stats.Processed.Add(1) logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r) fmt.Println(logLine) if logWriter != nil { logWriter.Write(logLine, true) } } }() result := processHost(host) // Log logLine := formatLogLine(host, result) fmt.Println(logLine) if logWriter != nil { logWriter.Write(logLine, result.Err != nil) } // Update stats stats.Processed.Add(1) if result.Err == nil { if result.Title != "" { stats.TitlesFound.Add(1) } else { stats.NoTitle.Add(1) } stats.IconsFound.Add(int64(len(result.Icons) + 1)) if !result.IframeAllowed { stats.IframeBlocked.Add(1) } } else if result.FetchErr { stats.FetchErrors.Add(1) } else { stats.ParseErrors.Add(1) } // Send successful results to writer if result.Err == nil { resultCh <- WorkResult{Host: host, Result: result} } }() } }() } go func() { workerWg.Wait() close(resultCh) }() // Stage 3: DB writer — batches writes for efficiency if !cfg.DryRun { var buf []WorkResult for wr := range resultCh { buf = append(buf, wr) if len(buf) >= cfg.WriteBatch { dbErrs := flushResults(ctx, pool, buf, logWriter) stats.DBErrors.Add(int64(dbErrs)) buf = buf[:0] } } if len(buf) > 0 { dbErrs := flushResults(ctx, pool, buf, logWriter) stats.DBErrors.Add(int64(dbErrs)) } } else { for range resultCh { // drain in dry-run mode } } // Print summary duration := time.Since(stats.StartedAt) fmt.Printf("\n=== Summary ===\n") fmt.Printf("Duration: %s\n", duration.Round(time.Second)) fmt.Printf("Processed: %d\n", stats.Processed.Load()) fmt.Printf("Titles found: %d\n", stats.TitlesFound.Load()) fmt.Printf("No title: %d\n", stats.NoTitle.Load()) fmt.Printf("Icons found: %d\n", stats.IconsFound.Load()) fmt.Printf("Iframe blocked: %d\n", stats.IframeBlocked.Load()) fmt.Printf("Fetch errors: %d\n", stats.FetchErrors.Load()) fmt.Printf("Parse errors: %d\n", stats.ParseErrors.Load()) fmt.Printf("DB errors: %d\n", stats.DBErrors.Load()) fmt.Printf("Panics: %d\n", stats.Panics.Load()) // Write stats JSON writeStats(stats, cfg) }