added warc parser
This commit is contained in:
parent
db81015e0b
commit
f45e4a6034
8 changed files with 954 additions and 0 deletions
207
pipeline/02_warc_parse/main.go
Normal file
207
pipeline/02_warc_parse/main.go
Normal file
|
|
@ -0,0 +1,207 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Config holds the command-line options for the WARC parse stage. Every field
// is populated from a flag registered in main.
type Config struct {
	// DBUrl is the Postgres connection string (--db, required).
	DBUrl string

	// BatchSize is the number of rows fetched per batch (--batch-size).
	// Concurrency is the number of concurrent goroutines (--concurrency).
	// Limit is the maximum number of rows to process; 0 means all (--limit).
	BatchSize, Concurrency, Limit int

	// DryRun prints results without writing them to the database (--dry-run).
	DryRun bool

	// LogFile is the path log lines are mirrored to; empty disables the
	// mirror (--log-file).
	LogFile string

	// LogErrors restricts the log file to error lines only (--log-errors-only).
	LogErrors bool
}
|
||||
|
||||
// Stats accumulates run-wide counters. The counters are atomic so worker
// goroutines can update them concurrently; because of that a Stats value must
// be shared by pointer and never copied.
type Stats struct {
	// Processed counts hosts that were handled, including ones whose worker
	// panicked.
	Processed atomic.Int64
	// TitlesFound counts hosts whose result carried a non-empty title.
	TitlesFound atomic.Int64
	// IconsFound is the total number of icons across all results.
	IconsFound atomic.Int64
	// IframeBlocked counts results that were not marked iframe-allowed.
	IframeBlocked atomic.Int64
	// ParseErrors counts failed results that were not flagged as fetch errors.
	ParseErrors atomic.Int64
	// FetchErrors counts failed results that were flagged as fetch errors.
	FetchErrors atomic.Int64
	// DBErrors sums the host-update and icon-insert failures reported when
	// writing results.
	DBErrors atomic.Int64
	// Panics counts worker panics that were recovered.
	Panics atomic.Int64

	// StartedAt is when the run began; the summary duration is measured
	// from it.
	StartedAt time.Time
}
|
||||
|
||||
func main() {
|
||||
cfg := Config{}
|
||||
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
|
||||
flag.IntVar(&cfg.BatchSize, "batch-size", 500, "Rows to fetch per batch")
|
||||
flag.IntVar(&cfg.Concurrency, "concurrency", 100, "Number of concurrent goroutines")
|
||||
flag.IntVar(&cfg.Limit, "limit", 0, "Max rows to process (0 = all)")
|
||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Print results without writing to DB")
|
||||
flag.StringVar(&cfg.LogFile, "log-file", "", "Mirror log lines to this file")
|
||||
flag.BoolVar(&cfg.LogErrors, "log-errors-only", false, "Only write errors to log file")
|
||||
flag.Parse()
|
||||
|
||||
if cfg.DBUrl == "" {
|
||||
fmt.Println("Usage: warc_parse --db DATABASE_URL [OPTIONS]")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Init S3 client
|
||||
if err := initS3(); err != nil {
|
||||
log.Fatalf("Failed to init S3: %v", err)
|
||||
}
|
||||
|
||||
pool, err := pgxpool.New(ctx, cfg.DBUrl)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to database: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
// Get total count
|
||||
var total int64
|
||||
if cfg.Limit > 0 {
|
||||
total = int64(cfg.Limit)
|
||||
} else {
|
||||
err = pool.QueryRow(ctx, "SELECT COUNT(*) FROM hosts WHERE parsed = FALSE").Scan(&total)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to count unparsed hosts: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if total == 0 {
|
||||
fmt.Println("No unparsed hosts found.")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("=== WARC Parser ===\n")
|
||||
fmt.Printf("Unparsed hosts: %d\n", total)
|
||||
fmt.Printf("Concurrency: %d\n", cfg.Concurrency)
|
||||
fmt.Printf("Batch size: %d\n", cfg.BatchSize)
|
||||
fmt.Printf("Dry run: %v\n\n", cfg.DryRun)
|
||||
|
||||
// Setup log file
|
||||
var logWriter *LogWriter
|
||||
if cfg.LogFile != "" {
|
||||
logWriter, err = NewLogWriter(cfg.LogFile, cfg.LogErrors)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open log file: %v", err)
|
||||
}
|
||||
defer logWriter.Close()
|
||||
}
|
||||
|
||||
stats := &Stats{StartedAt: time.Now()}
|
||||
|
||||
|
||||
// Worker pool
|
||||
sem := make(chan struct{}, cfg.Concurrency)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Process in batches
|
||||
var lastID int64
|
||||
processed := 0
|
||||
|
||||
for {
|
||||
if cfg.Limit > 0 && processed >= cfg.Limit {
|
||||
break
|
||||
}
|
||||
|
||||
batchLimit := cfg.BatchSize
|
||||
if cfg.Limit > 0 && processed+batchLimit > cfg.Limit {
|
||||
batchLimit = cfg.Limit - processed
|
||||
}
|
||||
|
||||
hosts, err := fetchBatch(ctx, pool, lastID, batchLimit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to fetch batch: %v", err)
|
||||
}
|
||||
if len(hosts) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
lastID = hosts[len(hosts)-1].ID
|
||||
|
||||
for i := range hosts {
|
||||
host := hosts[i]
|
||||
wg.Add(1)
|
||||
sem <- struct{}{}
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
|
||||
// Recover from panics — log them, don't mark row as parsed
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
stats.Panics.Add(1)
|
||||
stats.Processed.Add(1)
|
||||
logLine := fmt.Sprintf("PANIC: %s %v", host.Hostname, r)
|
||||
fmt.Println(logLine)
|
||||
if logWriter != nil {
|
||||
logWriter.Write(logLine, true)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
result := processHost(host)
|
||||
|
||||
// Log line
|
||||
logLine := formatLogLine(host, result)
|
||||
fmt.Println(logLine)
|
||||
if logWriter != nil {
|
||||
logWriter.Write(logLine, result.Err != nil)
|
||||
}
|
||||
|
||||
// Write to DB
|
||||
if !cfg.DryRun && result.Err == nil {
|
||||
errs := writeResult(ctx, pool, host, result, logWriter)
|
||||
stats.DBErrors.Add(int64(errs.HostUpdate + errs.IconInsert))
|
||||
}
|
||||
|
||||
// Update stats
|
||||
stats.Processed.Add(1)
|
||||
if result.Title != "" {
|
||||
stats.TitlesFound.Add(1)
|
||||
}
|
||||
stats.IconsFound.Add(int64(len(result.Icons)))
|
||||
if !result.IframeAllowed {
|
||||
stats.IframeBlocked.Add(1)
|
||||
}
|
||||
if result.Err != nil {
|
||||
if result.FetchErr {
|
||||
stats.FetchErrors.Add(1)
|
||||
} else {
|
||||
stats.ParseErrors.Add(1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
processed += len(hosts)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Print summary
|
||||
duration := time.Since(stats.StartedAt)
|
||||
fmt.Printf("\n=== Summary ===\n")
|
||||
fmt.Printf("Duration: %s\n", duration.Round(time.Second))
|
||||
fmt.Printf("Processed: %d\n", stats.Processed.Load())
|
||||
fmt.Printf("Titles found: %d\n", stats.TitlesFound.Load())
|
||||
fmt.Printf("Icons found: %d\n", stats.IconsFound.Load())
|
||||
fmt.Printf("Iframe blocked: %d\n", stats.IframeBlocked.Load())
|
||||
fmt.Printf("Fetch errors: %d\n", stats.FetchErrors.Load())
|
||||
fmt.Printf("Parse errors: %d\n", stats.ParseErrors.Load())
|
||||
fmt.Printf("DB errors: %d\n", stats.DBErrors.Load())
|
||||
fmt.Printf("Panics: %d\n", stats.Panics.Load())
|
||||
|
||||
// Write stats JSON
|
||||
writeStats(stats, cfg)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue