Revert to using host ID ordering for bundle generation
This commit is contained in:
parent 9266c1417d
commit 1343df1a41
3 changed files with 20 additions and 62 deletions
|
|
@ -21,7 +21,7 @@ SET work_mem = '2GB';
|
||||||
\timing on
|
\timing on
|
||||||
|
|
||||||
CREATE TEMP TABLE best_icons AS
|
CREATE TEMP TABLE best_icons AS
|
||||||
SELECT DISTINCT ON (host_id) host_id, icon_hash, downloaded_at
|
SELECT DISTINCT ON (host_id) host_id, icon_hash
|
||||||
FROM icons
|
FROM icons
|
||||||
WHERE scan_state = 'completed'
|
WHERE scan_state = 'completed'
|
||||||
AND icon_hash IS NOT NULL
|
AND icon_hash IS NOT NULL
|
||||||
|
|
@ -50,7 +50,7 @@ ORDER BY host_id,
|
||||||
|
|
||||||
\echo 'Step 2: Updating hosts...'
|
\echo 'Step 2: Updating hosts...'
|
||||||
|
|
||||||
UPDATE hosts h SET best_icon_hash = b.icon_hash, icon_downloaded_at = b.downloaded_at
|
UPDATE hosts h SET best_icon_hash = b.icon_hash
|
||||||
FROM best_icons b WHERE h.id = b.host_id;
|
FROM best_icons b WHERE h.id = b.host_id;
|
||||||
|
|
||||||
\timing off
|
\timing off
|
||||||
|
|
|
||||||
|
|
@ -2,56 +2,28 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
type HostRow struct {
|
type HostRow struct {
|
||||||
ID int64
|
ID int64
|
||||||
Hostname string
|
Hostname string
|
||||||
Protocol string
|
Protocol string
|
||||||
HtmlTitle string
|
HtmlTitle string
|
||||||
IframeAllowed bool
|
IframeAllowed bool
|
||||||
BestIconHash string
|
BestIconHash string
|
||||||
IconDownloadedAt *time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchHostsPage gets a page of hosts with titles, ordered by icon_downloaded_at for disk locality.
|
// fetchHostsPage gets a page of hosts with titles, ordered by id for disk locality.
|
||||||
// Icons written to disk at similar times are physically adjacent on the EBS volume —
|
func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) {
|
||||||
// reading in write order maximizes OS readahead cache hits.
|
rows, err := pool.Query(ctx, `
|
||||||
func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastTime *time.Time, lastID int64, limit int) ([]HostRow, error) {
|
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, '')
|
||||||
// Two-phase: first hosts with icons (ordered by download time), then hosts without
|
FROM hosts
|
||||||
var rows pgx.Rows
|
WHERE html_title IS NOT NULL AND id > $1
|
||||||
var err error
|
ORDER BY id
|
||||||
if lastTime != nil {
|
LIMIT $2
|
||||||
rows, err = pool.Query(ctx, `
|
`, lastID, limit)
|
||||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), icon_downloaded_at
|
|
||||||
FROM hosts
|
|
||||||
WHERE html_title IS NOT NULL AND icon_downloaded_at > $1
|
|
||||||
ORDER BY icon_downloaded_at
|
|
||||||
LIMIT $2
|
|
||||||
`, lastTime, limit)
|
|
||||||
} else if lastID > 0 {
|
|
||||||
// No more timestamped hosts — fetch remaining (no icon) by id
|
|
||||||
rows, err = pool.Query(ctx, `
|
|
||||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), '', NULL::timestamptz
|
|
||||||
FROM hosts
|
|
||||||
WHERE html_title IS NOT NULL AND icon_downloaded_at IS NULL AND id > $1
|
|
||||||
ORDER BY id
|
|
||||||
LIMIT $2
|
|
||||||
`, lastID, limit)
|
|
||||||
} else {
|
|
||||||
// Start: fetch timestamped hosts first
|
|
||||||
rows, err = pool.Query(ctx, `
|
|
||||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), icon_downloaded_at
|
|
||||||
FROM hosts
|
|
||||||
WHERE html_title IS NOT NULL AND icon_downloaded_at IS NOT NULL
|
|
||||||
ORDER BY icon_downloaded_at
|
|
||||||
LIMIT $1
|
|
||||||
`, limit)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -60,7 +32,7 @@ func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastTime *time.Time
|
||||||
var hosts []HostRow
|
var hosts []HostRow
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var h HostRow
|
var h HostRow
|
||||||
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.IconDownloadedAt); err != nil {
|
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
hosts = append(hosts, h)
|
hosts = append(hosts, h)
|
||||||
|
|
|
||||||
|
|
@ -128,9 +128,7 @@ func main() {
|
||||||
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
||||||
go func() {
|
go func() {
|
||||||
defer close(hostCh)
|
defer close(hostCh)
|
||||||
var lastTime *time.Time
|
|
||||||
var lastID int64
|
var lastID int64
|
||||||
iconPhase := true // first: hosts with icons (by download time), then: hosts without
|
|
||||||
pageSize := 50000
|
pageSize := 50000
|
||||||
fetched := 0
|
fetched := 0
|
||||||
for {
|
for {
|
||||||
|
|
@ -145,26 +143,14 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fetchStart := time.Now()
|
fetchStart := time.Now()
|
||||||
hosts, err := fetchHostsPage(ctx, pool, lastTime, lastID, limit)
|
hosts, err := fetchHostsPage(ctx, pool, lastID, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to fetch hosts: %v", err)
|
log.Fatalf("Failed to fetch hosts: %v", err)
|
||||||
}
|
}
|
||||||
if len(hosts) == 0 {
|
if len(hosts) == 0 {
|
||||||
if iconPhase {
|
|
||||||
// Switch to no-icon hosts
|
|
||||||
iconPhase = false
|
|
||||||
lastTime = nil
|
|
||||||
lastID = 0
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
last := hosts[len(hosts)-1]
|
lastID = hosts[len(hosts)-1].ID
|
||||||
if last.IconDownloadedAt != nil {
|
|
||||||
lastTime = last.IconDownloadedAt
|
|
||||||
} else {
|
|
||||||
lastID = last.ID
|
|
||||||
}
|
|
||||||
fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n",
|
fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n",
|
||||||
len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
|
len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
|
||||||
for _, h := range hosts {
|
for _, h := range hosts {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue