diff --git a/infra/main.tf b/infra/main.tf index dfc9427..ac543d4 100644 --- a/infra/main.tf +++ b/infra/main.tf @@ -44,7 +44,7 @@ variable "ssh_cidr" { } variable "ec2_instance_type" { - default = "c5.xlarge" + default = "c5.2xlarge" } variable "ec2_ami" { diff --git a/pipeline/01_cc_index/query.sh b/pipeline/01_cc_index/query.sh index 931d20f..05634bf 100755 --- a/pipeline/01_cc_index/query.sh +++ b/pipeline/01_cc_index/query.sh @@ -68,6 +68,8 @@ S3_PATH="s3://commoncrawl/cc-index/table/cc-main/warc/crawl=${CRAWL}/subset=warc LOCAL_INDEX="$HOME/cc-index" QUERY=" +SET temp_directory = '${HOME}/duckdb_temp'; + INSTALL postgres; LOAD postgres; ATTACH '${DB_URL}' AS pg (TYPE POSTGRES); diff --git a/pipeline/02_warc_parse/main.go b/pipeline/02_warc_parse/main.go index 63d2ed7..5e3fff6 100644 --- a/pipeline/02_warc_parse/main.go +++ b/pipeline/02_warc_parse/main.go @@ -106,8 +106,8 @@ func main() { // [DB fetcher] → hostCh → [N workers] → resultCh → [DB writer] // Workers do S3 fetch + parsing (I/O-bound). Writer batches DB writes. - hostCh := make(chan Host, cfg.BatchSize) - resultCh := make(chan WorkResult, cfg.Concurrency) + hostCh := make(chan Host, 20000) + resultCh := make(chan WorkResult, 1000) // Stage 1: DB fetcher — continuously fetches pages into hostCh go func() { diff --git a/pipeline/03_icon_download/main.go b/pipeline/03_icon_download/main.go index 72eab4e..063418a 100644 --- a/pipeline/03_icon_download/main.go +++ b/pipeline/03_icon_download/main.go @@ -48,7 +48,7 @@ func main() { flag.StringVar(&cfg.DBUrl, "db", "", "Postgres connection string (required)") flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory to store downloaded icons") flag.IntVar(&cfg.BatchSize, "batch-size", 5000, "Rows to claim per batch") - flag.IntVar(&cfg.Concurrency, "concurrency", 2500, "Number of concurrent goroutines") + flag.IntVar(&cfg.Concurrency, "concurrency", 5000, "Number of concurrent goroutines") flag.IntVar(&cfg.Limit, "limit", 0, "Max icons to process (0 = all)") flag.DurationVar(&cfg.Timeout, "timeout", 10*time.Second, "HTTP request timeout") flag.Int64Var(&cfg.MaxSize, "max-size", 512*1024, "Max icon download size in bytes") @@ -116,7 +116,7 @@ func main() { stats := &Stats{StartedAt: time.Now()} // Feed icons into a channel so workers never starve waiting for batch claims - iconCh := make(chan IconRow, cfg.BatchSize) + iconCh := make(chan IconRow, 20000) go func() { defer close(iconCh) claimed := 0 diff --git a/pipeline/05_bundle_gen/main.go b/pipeline/05_bundle_gen/main.go index 613f00f..21820ac 100644 --- a/pipeline/05_bundle_gen/main.go +++ b/pipeline/05_bundle_gen/main.go @@ -45,7 +45,7 @@ func main() { flag.StringVar(&cfg.IconsDir, "icons-dir", "icons", "Directory with downloaded icons") flag.StringVar(&cfg.SiteBucket, "site-bucket", "everytab-site", "S3 bucket for the static site") flag.IntVar(&cfg.EntriesPerBundle, "entries-per-bundle", 120, "Tabs per bundle JSON file") - flag.IntVar(&cfg.Concurrency, "concurrency", 20, "Concurrent icon conversions") + flag.IntVar(&cfg.Concurrency, "concurrency", 40, "Concurrent icon conversions") flag.IntVar(&cfg.Uploaders, "uploaders", 10, "Concurrent S3 bundle uploads") flag.BoolVar(&cfg.DryRun, "dry-run", false, "Write bundles to local disk instead of S3") flag.StringVar(&cfg.OutputDir, "output-dir", "bundles", "Local output dir for dry-run mode") @@ -121,8 +121,8 @@ func main() { data []byte } - hostCh := make(chan HostRow, cfg.EntriesPerBundle*10) - entryCh := make(chan BundleEntry, cfg.EntriesPerBundle*10) + hostCh := make(chan HostRow, 6000) + entryCh := make(chan BundleEntry, 6000) uploadCh := make(chan bundleJob, cfg.Uploaders*2) // Stage 1: DB fetcher — continuously fetches pages into hostCh