interleave no-icon hosts and icon hosts for an even mix
This commit is contained in:
parent
a799c05f81
commit
758ab3080b
2 changed files with 116 additions and 40 deletions
|
|
@ -18,19 +18,47 @@ type HostRow struct {
|
|||
IconDownloadedAt *time.Time
|
||||
}
|
||||
|
||||
// fetchHostsPage gets a page of hosts with titles, ordered by icon_downloaded_at for disk locality.
|
||||
// Icons written to disk at similar times are physically adjacent — reading in write order
|
||||
// maximizes OS readahead cache hits. Hosts without icons come last (no disk reads needed).
|
||||
// random_order is included for bundle bucket assignment (randomized bundles).
|
||||
func fetchHostsPage(ctx context.Context, pool *pgxpool.Pool, lastDownloaded *time.Time, lastID int64, limit int) ([]HostRow, error) {
|
||||
rows, err := pool.Query(ctx, `
|
||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), COALESCE(best_icon_hash, ''), random_order, icon_downloaded_at
|
||||
FROM hosts
|
||||
WHERE html_title IS NOT NULL
|
||||
AND (icon_downloaded_at, id) > ($1, $2)
|
||||
ORDER BY icon_downloaded_at NULLS LAST, id
|
||||
LIMIT $3
|
||||
`, lastDownloaded, lastID, limit)
|
||||
// fetchHostsWithIcons gets hosts with icons, ordered by icon_downloaded_at for disk locality.
//
// Only rows with a non-NULL html_title and a non-NULL icon_downloaded_at are selected,
// ordered by (icon_downloaded_at, id) and capped at limit rows.
//
// lastDownloaded and lastID form the pagination cursor: when lastDownloaded is nil and
// lastID is 0 the first page is returned; otherwise only rows strictly after that
// (icon_downloaded_at, id) pair are returned. Callers are expected to pass the
// IconDownloadedAt and ID of the last row of the previous page.
//
// Returns the scanned rows, or a nil slice and the error if the query or a row scan fails.
|
||||
func fetchHostsWithIcons(ctx context.Context, pool *pgxpool.Pool, lastDownloaded *time.Time, lastID int64, limit int) ([]HostRow, error) {
|
||||
var query string
|
||||
var args []any
|
||||
// First page: no cursor yet, so the query has no lower bound and takes only the limit.
if lastDownloaded == nil && lastID == 0 {
|
||||
query = `SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), best_icon_hash, random_order, icon_downloaded_at
|
||||
FROM hosts WHERE html_title IS NOT NULL AND icon_downloaded_at IS NOT NULL
|
||||
ORDER BY icon_downloaded_at, id LIMIT $1`
|
||||
args = []any{limit}
|
||||
} else {
// Later pages: resume strictly after the cursor; id breaks ties between rows
// that share the same icon_downloaded_at.
// NOTE(review): a nil lastDownloaded with a non-zero lastID also lands here;
// presumably that compares against NULL and matches no rows — confirm callers never do this.
|
||||
query = `SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), best_icon_hash, random_order, icon_downloaded_at
|
||||
FROM hosts WHERE html_title IS NOT NULL AND icon_downloaded_at IS NOT NULL
|
||||
AND (icon_downloaded_at > $1 OR (icon_downloaded_at = $1 AND id > $2))
|
||||
ORDER BY icon_downloaded_at, id LIMIT $3`
|
||||
args = []any{lastDownloaded, lastID, limit}
|
||||
}
|
||||
rows, err := pool.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
|
||||
var hosts []HostRow
|
||||
for rows.Next() {
|
||||
var h HostRow
|
||||
// Scan targets follow the SELECT column order.
// NOTE(review): best_icon_hash is selected without COALESCE here; if a row can have
// icon_downloaded_at set while best_icon_hash is NULL, this Scan may fail depending on
// the type of HostRow.BestIconHash — confirm against the schema.
if err := rows.Scan(&h.ID, &h.Hostname, &h.Protocol, &h.HtmlTitle, &h.IframeAllowed, &h.BestIconHash, &h.RandomOrder, &h.IconDownloadedAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hosts = append(hosts, h)
|
||||
}
|
||||
// Return the collected rows together with whatever rows.Err() reports after iteration.
return hosts, rows.Err()
|
||||
}
|
||||
|
||||
// fetchHostsWithoutIcons gets hosts without icons, ordered by id.
|
||||
func fetchHostsWithoutIcons(ctx context.Context, pool *pgxpool.Pool, lastID int64, limit int) ([]HostRow, error) {
|
||||
rows, err := pool.Query(ctx, `
|
||||
SELECT id, hostname, protocol, html_title, COALESCE(iframe_allowed, true), '', random_order, NULL::timestamptz
|
||||
FROM hosts
|
||||
WHERE html_title IS NOT NULL AND icon_downloaded_at IS NULL AND id > $1
|
||||
ORDER BY id LIMIT $2
|
||||
`, lastID, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -132,41 +132,89 @@ func main() {
|
|||
entryCh := make(chan assemblerEntry, 50000)
|
||||
uploadCh := make(chan bundleJob, cfg.Uploaders*2)
|
||||
|
||||
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
||||
// Stage 1: DB fetcher — interleaves hosts with icons (ordered by download time for
|
||||
// disk locality) and hosts without icons (no disk reads needed), matching the natural
|
||||
// ratio so bundles contain a representative mix.
|
||||
go func() {
|
||||
defer close(hostCh)
|
||||
|
||||
// Calculate the ratio: for every iconPageSize icon-hosts, fetch noIconPageSize no-icon hosts
|
||||
iconPageSize := 50000
|
||||
noIconRatio := float64(stats.HostsNoIcon) / float64(max(stats.HostsWithIcon, 1))
|
||||
noIconPageSize := int(float64(iconPageSize) * noIconRatio)
|
||||
if noIconPageSize < 1000 {
|
||||
noIconPageSize = 1000
|
||||
}
|
||||
|
||||
var lastDownloaded *time.Time
|
||||
var lastID int64
|
||||
pageSize := 50000
|
||||
var lastIconID int64
|
||||
var lastNoIconID int64
|
||||
iconsDone := false
|
||||
noIconsDone := false
|
||||
fetched := 0
|
||||
for {
|
||||
limit := pageSize
|
||||
if cfg.Limit > 0 {
|
||||
remaining := cfg.Limit - fetched
|
||||
if remaining <= 0 {
|
||||
break
|
||||
|
||||
for !iconsDone || !noIconsDone {
|
||||
// Fetch icon-hosts (ordered by download time for disk locality)
|
||||
if !iconsDone {
|
||||
limit := iconPageSize
|
||||
if cfg.Limit > 0 {
|
||||
remaining := cfg.Limit - fetched
|
||||
if remaining <= 0 {
|
||||
break
|
||||
}
|
||||
if limit > remaining {
|
||||
limit = remaining
|
||||
}
|
||||
}
|
||||
if limit > remaining {
|
||||
limit = remaining
|
||||
fetchStart := time.Now()
|
||||
hosts, err := fetchHostsWithIcons(ctx, pool, lastDownloaded, lastIconID, limit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to fetch icon hosts: %v", err)
|
||||
}
|
||||
if len(hosts) == 0 {
|
||||
iconsDone = true
|
||||
} else {
|
||||
last := hosts[len(hosts)-1]
|
||||
lastDownloaded = last.IconDownloadedAt
|
||||
lastIconID = last.ID
|
||||
fmt.Printf("[fetcher] %d icon hosts in %dms (hostCh: %d/%d)\n",
|
||||
len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
|
||||
for _, h := range hosts {
|
||||
hostCh <- h
|
||||
}
|
||||
fetched += len(hosts)
|
||||
}
|
||||
}
|
||||
fetchStart := time.Now()
|
||||
hosts, err := fetchHostsPage(ctx, pool, lastDownloaded, lastID, limit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to fetch hosts: %v", err)
|
||||
|
||||
// Fetch proportional no-icon hosts (interleaved for balanced bundles)
|
||||
if !noIconsDone {
|
||||
limit := noIconPageSize
|
||||
if cfg.Limit > 0 {
|
||||
remaining := cfg.Limit - fetched
|
||||
if remaining <= 0 {
|
||||
break
|
||||
}
|
||||
if limit > remaining {
|
||||
limit = remaining
|
||||
}
|
||||
}
|
||||
fetchStart := time.Now()
|
||||
hosts, err := fetchHostsWithoutIcons(ctx, pool, lastNoIconID, limit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to fetch no-icon hosts: %v", err)
|
||||
}
|
||||
if len(hosts) == 0 {
|
||||
noIconsDone = true
|
||||
} else {
|
||||
lastNoIconID = hosts[len(hosts)-1].ID
|
||||
fmt.Printf("[fetcher] %d no-icon hosts in %dms (hostCh: %d/%d)\n",
|
||||
len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
|
||||
for _, h := range hosts {
|
||||
hostCh <- h
|
||||
}
|
||||
fetched += len(hosts)
|
||||
}
|
||||
}
|
||||
if len(hosts) == 0 {
|
||||
break
|
||||
}
|
||||
last := hosts[len(hosts)-1]
|
||||
lastDownloaded = last.IconDownloadedAt
|
||||
lastID = last.ID
|
||||
fmt.Printf("[fetcher] %d hosts in %dms (hostCh: %d/%d)\n",
|
||||
len(hosts), time.Since(fetchStart).Milliseconds(), len(hostCh), cap(hostCh))
|
||||
for _, h := range hosts {
|
||||
hostCh <- h
|
||||
}
|
||||
fetched += len(hosts)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue