everytab/pipeline/02_warc_parse/db.go

123 lines
3.2 KiB
Go

package main
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// Host is a single row of the hosts table, carrying the columns that
// fetchBatch selects.
type Host struct {
	ID int64 // hosts.id; fetchBatch pages through the table by this key

	// Hostname and Protocol (the URL scheme) together identify the site root;
	// flushResults builds "<protocol>://<hostname>/favicon.ico" from them.
	Hostname, Protocol string

	// Where this host's record lives in its WARC file (warc_filename,
	// warc_record_offset, warc_record_length). Units of offset/length are
	// presumably bytes — confirm against the WARC reader.
	WarcFilename     string
	WarcRecordOffset int64
	WarcRecordLength int
}
// ProcessResult is the outcome of handling one host's WARC record: either
// the data extracted from it or the error that stopped processing.
type ProcessResult struct {
	// Extracted page data. flushResults stores Title in hosts.html_title
	// (empty becomes NULL), IframeAllowed in hosts.iframe_allowed, and each
	// Icon as a row of the icons table.
	Title         string
	IframeAllowed bool
	Icons         []Icon

	// Err is non-nil when processing failed. FetchErr is true if the failure
	// happened while fetching the record, false if it happened while parsing.
	Err      error
	FetchErr bool
}
// fetchBatch returns up to limit hosts with parsed = FALSE and id > lastID,
// ordered by id ascending. Passing the ID of the last host from the previous
// call as lastID walks the table by key (keyset pagination).
//
// On failure the returned slice is nil and the error is wrapped with the step
// that failed (query, row scan, or row iteration); the underlying pgx error
// remains reachable via errors.Is / errors.As.
func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]Host, error) {
	rows, err := pool.Query(ctx,
		`SELECT id, hostname, protocol, warc_filename, warc_record_offset, warc_record_length
FROM hosts
WHERE parsed = FALSE AND id > $1
ORDER BY id
LIMIT $2`,
		lastID, limit)
	if err != nil {
		return nil, fmt.Errorf("querying unparsed hosts after id %d: %w", lastID, err)
	}
	defer rows.Close()

	var hosts []Host
	for rows.Next() {
		var h Host
		if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.WarcFilename, &h.WarcRecordOffset, &h.WarcRecordLength); err != nil {
			return nil, fmt.Errorf("scanning hosts row: %w", err)
		}
		hosts = append(hosts, h)
	}
	if err := rows.Err(); err != nil {
		// Don't hand back a partial batch alongside the error: a caller that
		// advanced lastID past it would silently skip the unread hosts.
		return nil, fmt.Errorf("iterating unparsed hosts after id %d: %w", lastID, err)
	}
	return hosts, nil
}
// WorkResult is the unit handed to the DB writer: one host together with the
// outcome of processing it. flushResults consumes slices of these.
type WorkResult struct {
	Host   Host          // the hosts row that was processed
	Result ProcessResult // what was extracted from its WARC record
}
// flushResults writes a batch of successful results to the database.
//
// Host UPDATEs (html_title, iframe_allowed, parsed = TRUE) are sent as one
// pgx.Batch, since COPY cannot update. Icon rows are bulk-loaded with
// pgx.CopyFrom for throughput: a favicon.ico candidate per host plus every
// extracted link_rel icon.
//
// Both steps run in a single transaction, so a flush is all-or-nothing. If
// anything fails, nothing is persisted and the hosts stay parsed = FALSE for
// a later retry. No host is marked parsed without its icons. No icons are
// inserted for hosts that remain unparsed, which would otherwise be inserted
// again on retry.
//
// Errors are printed to stdout and, when logWriter is non-nil, also written
// to it. Returns the number of DB errors encountered. Because the flush is
// transactional, this is 0 (everything committed) or 1 (nothing committed).
func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter *LogWriter) int {
	if len(results) == 0 {
		return 0
	}

	report := func(logLine string) {
		fmt.Println(logLine)
		if logWriter != nil {
			logWriter.Write(logLine, true)
		}
	}

	tx, err := pool.Begin(ctx)
	if err != nil {
		report(fmt.Sprintf("DB_ERROR: flush begin (%d hosts): %v", len(results), err))
		return 1
	}
	// On any early return this discards the partial flush; after a successful
	// Commit it is a no-op (pgx returns ErrTxClosed, which we ignore).
	defer func() { _ = tx.Rollback(ctx) }()

	// 1. Batch UPDATE hosts (can't COPY an UPDATE).
	batch := &pgx.Batch{}
	for _, wr := range results {
		batch.Queue(
			`UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`,
			nilIfEmpty(wr.Result.Title), wr.Result.IframeAllowed, wr.Host.ID,
		)
	}
	br := tx.SendBatch(ctx, batch)
	for _, wr := range results {
		if _, err := br.Exec(); err != nil {
			// The first failure aborts the transaction, and pgx keeps returning
			// that error for the remaining results. Report it once, against the
			// host whose UPDATE actually failed.
			report(fmt.Sprintf("DB_ERROR: %s hosts_update: %v", wr.Host.Hostname, err))
			_ = br.Close() // must close before reusing the conn; error already reported
			return 1
		}
	}
	if err := br.Close(); err != nil {
		report(fmt.Sprintf("DB_ERROR: hosts_update batch close: %v", err))
		return 1
	}

	// 2. COPY icons in bulk (much less IOPS than individual INSERTs).
	iconRows := make([][]any, 0, len(results))
	for _, wr := range results {
		// Every host gets a favicon.ico candidate at its site root.
		faviconURL := fmt.Sprintf("%s://%s/favicon.ico", wr.Host.Protocol, wr.Host.Hostname)
		iconRows = append(iconRows, []any{wr.Host.ID, faviconURL, "favicon_ico", nil, nil})
		// Plus every icon found via <link rel=...>.
		for _, icon := range wr.Result.Icons {
			iconRows = append(iconRows, []any{wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes)})
		}
	}
	if _, err := tx.CopyFrom(ctx,
		pgx.Identifier{"icons"},
		[]string{"host_id", "url", "source", "rel_type", "rel_sizes"},
		pgx.CopyFromRows(iconRows),
	); err != nil {
		report(fmt.Sprintf("DB_ERROR: icon COPY failed (%d rows): %v", len(iconRows), err))
		return 1
	}

	if err := tx.Commit(ctx); err != nil {
		report(fmt.Sprintf("DB_ERROR: flush commit (%d hosts): %v", len(results), err))
		return 1
	}
	return 0
}
// nilIfEmpty converts an empty string to a nil *string, so that pgx sends
// NULL for it when used as a query or COPY value. Any other string yields a
// pointer to a fresh copy of it.
func nilIfEmpty(s string) *string {
	var ptr *string
	if s != "" {
		ptr = new(string)
		*ptr = s
	}
	return ptr
}