upped buffer sizes and switched to 2xlarge to increase speed
This commit is contained in:
parent
1df9a234cf
commit
8dce702e8d
5 changed files with 10 additions and 8 deletions
|
|
@ -44,7 +44,7 @@ variable "ssh_cidr" {
|
||||||
}
|
}
|
||||||
|
|
||||||
variable "ec2_instance_type" {
|
variable "ec2_instance_type" {
|
||||||
default = "c5.xlarge"
|
default = "c5.2xlarge"
|
||||||
}
|
}
|
||||||
|
|
||||||
variable "ec2_ami" {
|
variable "ec2_ami" {
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,8 @@ S3_PATH="s3://commoncrawl/cc-index/table/cc-main/warc/crawl=${CRAWL}/subset=warc
|
||||||
LOCAL_INDEX="$HOME/cc-index"
|
LOCAL_INDEX="$HOME/cc-index"
|
||||||
|
|
||||||
QUERY="
|
QUERY="
|
||||||
|
SET temp_directory = '${HOME}/duckdb_temp';
|
||||||
|
|
||||||
INSTALL postgres;
|
INSTALL postgres;
|
||||||
LOAD postgres;
|
LOAD postgres;
|
||||||
ATTACH '${DB_URL}' AS pg (TYPE POSTGRES);
|
ATTACH '${DB_URL}' AS pg (TYPE POSTGRES);
|
||||||
|
|
|
||||||
|
|
@ -106,8 +106,8 @@ func main() {
|
||||||
// [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer]
|
// [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer]
|
||||||
// Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes.
|
// Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes.
|
||||||
|
|
||||||
hostCh := make(chan Host, cfg.BatchSize)
|
hostCh := make(chan Host, 20000)
|
||||||
resultCh := make(chan WorkResult, cfg.Concurrency)
|
resultCh := make(chan WorkResult, 1000)
|
||||||
|
|
||||||
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
||||||
go func() {
|
go func() {
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ func main() {
|
||||||
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
|
flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)")
|
||||||
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory to store downloaded icons")
|
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory to store downloaded icons")
|
||||||
flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to claim per batch")
|
flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to claim per batch")
|
||||||
flag.IntVar(&cfg.Concurrency, "concurrency", 2500, "Number of concurrent goroutines")
|
flag.IntVar(&cfg.Concurrency, "concurrency", 5000, "Number of concurrent goroutines")
|
||||||
flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)")
|
flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)")
|
||||||
flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout")
|
flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout")
|
||||||
flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes")
|
flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes")
|
||||||
|
|
@ -116,7 +116,7 @@ func main() {
|
||||||
stats := &Stats{StartedAt: time.Now()}
|
stats := &Stats{StartedAt: time.Now()}
|
||||||
|
|
||||||
// Feed icons into a channel so workers never starve waiting for batch claims
|
// Feed icons into a channel so workers never starve waiting for batch claims
|
||||||
iconCh := make(chan IconRow, cfg.BatchSize)
|
iconCh := make(chan IconRow, 20000)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(iconCh)
|
defer close(iconCh)
|
||||||
claimed := 0
|
claimed := 0
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ func main() {
|
||||||
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons")
|
||||||
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site")
|
||||||
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file")
|
||||||
flag.IntVar(&cfg.Concurrency, "concurrency", 20, "Concurrent icon conversions")
|
flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions")
|
||||||
flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
|
flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads")
|
||||||
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3")
|
||||||
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode")
|
||||||
|
|
@ -121,8 +121,8 @@ func main() {
|
||||||
data []byte
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
hostCh := make(chan HostRow, cfg.EntriesPerBundle*10)
|
hostCh := make(chan HostRow, 6000)
|
||||||
entryCh := make(chan BundleEntry, cfg.EntriesPerBundle*10)
|
entryCh := make(chan BundleEntry, 6000)
|
||||||
uploadCh := make(chan bundleJob, cfg.Uploaders*2)
|
uploadCh := make(chan bundleJob, cfg.Uploaders*2)
|
||||||
|
|
||||||
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
// Stage 1: DB fetcher — continuously fetches pages into hostCh
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue