135 lines
3.4 KiB
Go
135 lines
3.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// Host is the in-memory form of one record in the hosts table. fetchBatch
// fills it from the columns named next to each field.
type Host struct {
	// ID is the primary key (column id); fetchBatch pages on it.
	ID int64
	// Hostname comes from column hostname.
	Hostname string
	// Protocol comes from column protocol and is used as the URL scheme when
	// the default favicon URL is built.
	Protocol string
	// WarcFilename comes from column warc_filename.
	WarcFilename string
	// WarcRecordOffset comes from column warc_record_offset.
	// NOTE(review): presumably a byte offset into WarcFilename — confirm.
	WarcRecordOffset int64
	// WarcRecordLength comes from column warc_record_length.
	// NOTE(review): presumably the record size in bytes — confirm.
	WarcRecordLength int
}
|
|
|
|
// ProcessResult holds everything extracted from one host's WARC record.
|
|
type ProcessResult struct {
|
|
Title string
|
|
IframeAllowed bool
|
|
Icons []Icon
|
|
Err error
|
|
FetchErr bool // true if error was during fetch (vs parse)
|
|
}
|
|
|
|
// fetchBatch gets the next batch of unparsed hosts after lastID.
|
|
func fetchBatch(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]Host, error) {
|
|
rows, err := pool.Query(ctx,
|
|
`SELECT id, hostname, protocol, warc_filename, warc_record_offset, warc_record_length
|
|
FROM hosts
|
|
WHERE parsed = FALSE AND id > $1
|
|
ORDER BY id
|
|
LIMIT $2`,
|
|
lastID, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var hosts []Host
|
|
for rows.Next() {
|
|
var h Host
|
|
err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.WarcFilename, &h.WarcRecordOffset, &h.WarcRecordLength)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
hosts = append(hosts, h)
|
|
}
|
|
return hosts, rows.Err()
|
|
}
|
|
|
|
// WorkResult pairs a host with its parsed result for the DB writer.
|
|
type WorkResult struct {
|
|
Host Host
|
|
Result ProcessResult
|
|
}
|
|
|
|
// flushResults writes a batch of successful results to the database using pgx.Batch.
|
|
// Returns the number of DB errors encountered.
|
|
func flushResults(ctx context.Context, pool *pgxpool.Pool, results []WorkResult, logWriter *LogWriter) int {
|
|
batch := &pgx.Batch{}
|
|
|
|
// Queue all queries
|
|
for _, wr := range results {
|
|
// Update host
|
|
batch.Queue(
|
|
`UPDATE hosts SET html_title = $1, iframe_allowed = $2, parsed = TRUE WHERE id = $3`,
|
|
nilIfEmpty(wr.Result.Title), wr.Result.IframeAllowed, wr.Host.ID,
|
|
)
|
|
// Insert /favicon.ico
|
|
faviconURL := fmt.Sprintf("%s://%s/favicon.ico", wr.Host.Protocol, wr.Host.Hostname)
|
|
batch.Queue(
|
|
`INSERT INTO icons (host_id, url, source) VALUES ($1, $2, 'favicon_ico')`,
|
|
wr.Host.ID, faviconURL,
|
|
)
|
|
// Insert link_rel icons
|
|
for _, icon := range wr.Result.Icons {
|
|
batch.Queue(
|
|
`INSERT INTO icons (host_id, url, source, rel_type, rel_sizes) VALUES ($1, $2, $3, $4, $5)`,
|
|
wr.Host.ID, icon.URL, icon.Source, nilIfEmpty(icon.RelType), nilIfEmpty(icon.RelSizes),
|
|
)
|
|
}
|
|
}
|
|
|
|
// Send all queries in one round-trip
|
|
br := pool.SendBatch(ctx, batch)
|
|
|
|
// Check results
|
|
dbErrors := 0
|
|
for _, wr := range results {
|
|
// host update
|
|
if _, err := br.Exec(); err != nil {
|
|
dbErrors++
|
|
logLine := fmt.Sprintf("DB_ERROR: %s hosts_update: %v", wr.Host.Hostname, err)
|
|
fmt.Println(logLine)
|
|
if logWriter != nil {
|
|
logWriter.Write(logLine, true)
|
|
}
|
|
}
|
|
// favicon.ico insert
|
|
if _, err := br.Exec(); err != nil {
|
|
dbErrors++
|
|
logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err)
|
|
fmt.Println(logLine)
|
|
if logWriter != nil {
|
|
logWriter.Write(logLine, true)
|
|
}
|
|
}
|
|
// link_rel icon inserts
|
|
for range wr.Result.Icons {
|
|
if _, err := br.Exec(); err != nil {
|
|
dbErrors++
|
|
logLine := fmt.Sprintf("DB_ERROR: %s icon_insert: %v", wr.Host.Hostname, err)
|
|
fmt.Println(logLine)
|
|
if logWriter != nil {
|
|
logWriter.Write(logLine, true)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
br.Close()
|
|
return dbErrors
|
|
}
|
|
|
|
// nilIfEmpty maps the empty string to a nil pointer and any other string to a
// pointer to a copy of it. flushResults uses it for optional text columns.
func nilIfEmpty(s string) *string {
	if len(s) > 0 {
		return &s
	}
	return nil
}
|