package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// Host represents a row from the hosts table.
type Host struct {
	ID               int64
	Hostname         string
	Protocol         string
	WarcFilename     string
	WarcRecordOffset int64
	WarcRecordLength int
}

// ProcessResult holds everything extracted from one host's WARC record.
type ProcessResult struct {
	Title         string
	IframeAllowed bool
	Icons         []Icon
	Err           error
	FetchErr      bool // true if error was during fetch (vs parse)
}

// fetchBatch gets the next batch of unparsed hosts after lastID.
//
// It selects rows with parsed = FALSE and id > lastID, ordered by id and
// capped at limit. The returned slice is nil when no rows match. Any query,
// scan, or iteration error is returned wrapped (use errors.Is / errors.As to
// inspect the cause), and no hosts are returned alongside an error.
func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]Host, error) {
	rows, err := pool.Query(ctx,
		`SELECT id, hostname, protocol, warc_filename, warc_record_offset, warc_record_length FROM hosts WHERE parsed = FALSE AND id > $1 ORDER BY id LIMIT $2`,
		lastID, limit)
	if err != nil {
		return nil, fmt.Errorf("querying unparsed hosts after id %d: %w", lastID, err)
	}
	defer rows.Close()

	var hosts []Host
	for rows.Next() {
		var h Host
		if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.WarcFilename, &h.WarcRecordOffset, &h.WarcRecordLength); err != nil {
			return nil, fmt.Errorf("scanning host row: %w", err)
		}
		hosts = append(hosts, h)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating host rows: %w", err)
	}
	return hosts, nil
}

// WorkResult pairs a host with its parsed result for the DB writer.
type WorkResult struct {
	Host   Host
	Result ProcessResult
}

// flushResults writes a batch of successful results to the database.
// Host UPDATEs use pgx.Batch. Icon INSERTs use pgx.CopyFrom for bulk throughput.
// Returns the number of DB errors encountered.
//
// Every DB error is printed to stdout and, when logWriter is non-nil, also
// passed to logWriter.Write. An empty results slice returns 0 without
// touching the database.
func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter *LogWriter) int {
	if len(results) == 0 {
		return 0
	}

	dbErrors := 0
	// report counts one DB error and emits it to stdout and the optional log
	// writer. The meaning of Write's second argument is defined by LogWriter,
	// which is declared outside this file section.
	report := func(logLine string) {
		dbErrors++
		fmt.Println(logLine)
		if logWriter != nil {
			logWriter.Write(logLine, true)
		}
	}

	// 1. Batch UPDATE hosts (can't COPY an UPDATE)
	batch := &pgx.Batch{}
	iconCount := len(results) // one favicon.ico row per host
	for i := range results {
		wr := &results[i]
		batch.Queue(
			`UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`,
			nilIfEmpty(wr.Result.Title), wr.Result.IframeAllowed, wr.Host.ID,
		)
		iconCount += len(wr.Result.Icons)
	}

	br := pool.SendBatch(ctx, batch)
	updateFailed := false
	// Results come back in queue order, so the i-th Exec belongs to results[i].
	for i := range results {
		if _, err := br.Exec(); err != nil {
			updateFailed = true
			report(fmt.Sprintf("DB_ERROR: %s hosts_update: %v", results[i].Host.Hostname, err))
		}
	}
	// Close may repeat an error that Exec already returned above, so it is
	// only reported when no per-host failure has been logged.
	if err := br.Close(); err != nil && !updateFailed {
		report(fmt.Sprintf("DB_ERROR: hosts_update batch close: %v", err))
	}

	// 2. COPY icons in bulk (much less IOPS than individual INSERTs)
	iconRows := make([][]any, 0, iconCount)
	for i := range results {
		wr := &results[i]

		// favicon.ico entry
		faviconURL := wr.Host.Protocol + "://" + wr.Host.Hostname + "/favicon.ico"
		iconRows = append(iconRows, []any{wr.Host.ID, faviconURL, "favicon_ico", nil, nil})

		// link_rel entries
		for _, icon := range wr.Result.Icons {
			iconRows = append(iconRows, []any{wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes)})
		}
	}

	_, err := pool.CopyFrom(ctx,
		pgx.Identifier{"icons"},
		[]string{"host_id", "url", "source", "rel_type", "rel_sizes"},
		pgx.CopyFromRows(iconRows),
	)
	if err != nil {
		report(fmt.Sprintf("DB_ERROR: icon COPY failed (%d rows): %v", len(iconRows), err))
	}

	return dbErrors
}

// nilIfEmpty returns nil for the empty string and a pointer to a copy of s
// otherwise, so that empty values are passed to the database as nil.
func nilIfEmpty(s string) *string {
	if s == "" {
		return nil
	}
	return &s
}