everytab/pipeline/01_cc_index/query.sh

156 lines
4.1 KiB
Bash
Executable file

#!/usr/bin/env bash
# Load the hosts table from the Common Crawl columnar index.
#
# DuckDB reads the index parquet files directly from S3 (credentials resolved
# through the AWS credential chain) and, via its postgres extension, writes the
# selected rows into RDS.
set -o errexit -o nounset -o pipefail
# Print the help text and exit.
#   $1 - optional exit status (default 0). A non-zero status means the help is
#        being shown because of an error, so it is written to stderr instead.
usage() {
  local status=${1:-0}
  if [ "$status" -ne 0 ]; then
    # The function exits right below, so redirecting the whole shell is safe.
    exec >&2
  fi
  cat <<'EOF'
Usage: ./query.sh --db-url DATABASE_URL [OPTIONS]
Required:
--db-url URL Postgres connection string
Optional:
--crawl ID Common Crawl crawl ID (default: latest)
--limit N Max rows to insert (default: 100000, use 0 for no limit)
--dry-run Print the query without executing
--help Show this help message
Example:
./query.sh --db-url "$DATABASE_URL"
./query.sh --db-url "$DATABASE_URL" --limit 0 # full 30M scan
./query.sh --db-url "$DATABASE_URL" --dry-run
EOF
  exit "$status"
}
# Defaults
DB_URL=""
CRAWL=""
LIMIT=100000
DRY_RUN=false

# Report a command-line error on stderr and exit with status 2 (the
# conventional "bad usage" status) rather than exiting successfully.
usage_error() {
  printf 'ERROR: %s\n' "$1" >&2
  printf "Run '%s --help' for usage.\n" "$0" >&2
  exit 2
}

# Called with the remaining arguments ("$@") for an option that takes a value.
# Fails cleanly when the option is the last argument, instead of letting
# `set -u` abort with an opaque "unbound variable" error on $2.
require_value() {
  [ $# -ge 2 ] || usage_error "$1 requires a value"
}

# Parse args. Invoking the script with no arguments at all shows the help.
if [ $# -eq 0 ]; then usage; fi
while [ $# -gt 0 ]; do
  case "$1" in
    --help) usage ;;
    --db-url) require_value "$@"; DB_URL="$2"; shift 2 ;;
    --crawl) require_value "$@"; CRAWL="$2"; shift 2 ;;
    --limit) require_value "$@"; LIMIT="$2"; shift 2 ;;
    --dry-run) DRY_RUN=true; shift ;;
    *) usage_error "Unknown option: $1" ;;
  esac
done

if [ -z "$DB_URL" ]; then
  echo "ERROR: --db-url is required" >&2
  exit 1
fi

# --limit must be a plain non-negative integer. It is later compared with
# `-gt` and spliced into SQL, and an unparsable value must not silently turn
# into "no limit" (a full ~30M-host scan). At most 18 digits keeps it within
# 64-bit shell arithmetic.
if ! [[ "$LIMIT" =~ ^[0-9]{1,18}$ ]]; then
  usage_error "--limit must be a non-negative integer (got: $LIMIT)"
fi
LIMIT=$((10#$LIMIT))  # normalise leading zeros, e.g. "007" -> 7
# Auto-detect the latest crawl if none was given. The first entry of
# collinfo.json is taken as the latest (assumes the list is ordered
# newest-first). Its id carries an "-index" suffix, which is stripped.
if [ -z "$CRAWL" ]; then
  echo "Fetching latest crawl ID..."
  # -f: fail on HTTP errors instead of handing an error page to jq.
  # -S: still report errors despite -s.
  # `// empty` makes a missing id produce no output rather than "null".
  CRAWL=$(curl -fsS https://index.commoncrawl.org/collinfo.json | jq -r '.[0].id // empty')
  CRAWL=${CRAWL%-index}
  if [ -z "$CRAWL" ]; then
    echo "ERROR: could not determine the latest crawl ID from collinfo.json" >&2
    exit 1
  fi
  echo "Using latest crawl: $CRAWL"
fi

# The crawl ID is interpolated into an S3 path and into SQL string literals,
# so restrict it to characters that crawl IDs (e.g. CC-MAIN-2024-10) use.
if ! [[ "$CRAWL" =~ ^[A-Za-z0-9._-]+$ ]]; then
  echo "ERROR: invalid crawl ID: $CRAWL" >&2
  exit 1
fi
# Build the LIMIT clause; --limit 0 means "no limit". A value that is not a
# plain non-negative integer is rejected here rather than being silently
# treated as "no limit" (which would mean a full scan).
if ! [[ "$LIMIT" =~ ^[0-9]{1,18}$ ]]; then
  echo "ERROR: --limit must be a non-negative integer (got: $LIMIT)" >&2
  exit 1
fi
LIMIT_CLAUSE=""
if (( 10#$LIMIT > 0 )); then
  LIMIT_CLAUSE="LIMIT $((10#$LIMIT))"
fi

S3_PATH="s3://commoncrawl/cc-index/table/cc-main/warc/crawl=${CRAWL}/subset=warc/*.parquet"

# These values are embedded in single-quoted SQL literals. Double any embedded
# single quote so that a value such as a password containing ' cannot end the
# literal early, which would break (or inject into) the statement.
sq="'"
DB_URL_SQL=${DB_URL//$sq/$sq$sq}
CRAWL_SQL=${CRAWL//$sq/$sq$sq}
S3_PATH_SQL=${S3_PATH//$sq/$sq$sq}

# For each host, all selected columns must come from the SAME index row.
# Otherwise warc_filename, warc_record_offset and warc_record_length could
# describe different records. Ordering only by protocol rank leaves ties, and
# separate first() aggregates are not guaranteed to pick the same row among
# ties. So every first() shares one total ordering: https before http, then
# (warc_filename, warc_record_offset) as a deterministic tie-breaker. That
# pair should identify a single WARC record.
PICK_ORDER="CASE WHEN url_protocol = 'https' THEN 0 ELSE 1 END, warc_filename, warc_record_offset"

# Without ORDER BY on the outer query, LIMIT keeps an arbitrary subset of hosts.
QUERY="
INSTALL aws;
LOAD aws;
INSTALL httpfs;
LOAD httpfs;
CREATE SECRET (TYPE S3, PROVIDER CREDENTIAL_CHAIN);
INSTALL postgres;
LOAD postgres;
ATTACH '${DB_URL_SQL}' AS pg (TYPE POSTGRES);
INSERT INTO pg.public.hosts (hostname, protocol, crawl_id, warc_filename, warc_record_offset, warc_record_length)
SELECT
  url_host_name AS hostname,
  first(url_protocol ORDER BY ${PICK_ORDER}) AS protocol,
  '${CRAWL_SQL}' AS crawl_id,
  first(warc_filename ORDER BY ${PICK_ORDER}) AS warc_filename,
  first(warc_record_offset ORDER BY ${PICK_ORDER}) AS warc_record_offset,
  first(warc_record_length ORDER BY ${PICK_ORDER}) AS warc_record_length
FROM read_parquet('${S3_PATH_SQL}')
WHERE url_path = '/'
  AND content_mime_type = 'text/html'
  AND fetch_status = 200
  AND url_query IS NULL
  AND url_protocol IN ('http', 'https')
  AND url_port IS NULL
  AND url_host_name IS NOT NULL
  AND url_host_name != ''
GROUP BY url_host_name
${LIMIT_CLAUSE};
"
# Dry run: print the resolved settings and the SQL that would run, then stop
# before DuckDB is started. The ATTACH target is the database URL, which may
# contain a password, so it is masked before printing. This keeps credentials
# out of terminals and CI logs.
if [ "$DRY_RUN" = true ]; then
  echo "=== DRY RUN ==="
  echo "Crawl: $CRAWL"
  echo "S3 path: $S3_PATH"
  echo "Limit: ${LIMIT} (0 = no limit)"
  echo ""
  echo "Query:"
  printf '%s\n' "$QUERY" | sed -E "s/^([[:space:]]*ATTACH ').*(' AS pg )/\1<redacted>\2/"
  exit 0
fi
echo "=== CC-Index Query ==="
echo "Crawl: $CRAWL"
echo "S3 path: $S3_PATH"
echo "Limit: ${LIMIT} (0 = no limit)"
echo ""
echo "Starting query..."
echo ""
START_TIME=$(date +%s)
# Feed the SQL on stdin rather than via `-c`. The query embeds the database
# URL, and command-line arguments are visible to other users (e.g. via `ps`)
# for the whole, potentially long, run. `-bail` stops at the first failing
# statement, and pipefail/errexit then abort the script.
printf '%s\n' "$QUERY" | duckdb -bail
END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))
echo ""
echo "Query completed in ${DURATION}s"
echo ""
# --- Stats ---
echo "--- Validation ---"
# Count hosts once. The same snapshot is both displayed and written to the
# stats file, so the two cannot disagree. -X skips ~/.psqlrc, so user settings
# cannot change the -t -A (tuples-only, unaligned) output parsed below.
# NOTE(review): these counts cover the whole hosts table, not only the rows
# inserted for this crawl by this run.
STATS=$(psql -X "$DB_URL" -t -A -c "
SELECT json_build_object(
  'total_hosts', (SELECT COUNT(*) FROM hosts),
  'https_count', (SELECT COUNT(*) FROM hosts WHERE protocol = 'https'),
  'http_count', (SELECT COUNT(*) FROM hosts WHERE protocol = 'http')
);
")
echo "$STATS" | jq .

STATS_FILE="stats/01_cc_index.json"
mkdir -p stats
# Build the run metadata with jq instead of splicing shell values into SQL
# string literals. Values such as the crawl ID are then JSON-encoded safely.
# Key order matches the previous output: run metadata first, then the counts.
jq -n \
  --arg started_at "$(date -d "@${START_TIME}" -Iseconds)" \
  --arg finished_at "$(date -d "@${END_TIME}" -Iseconds)" \
  --argjson duration_seconds "$DURATION" \
  --arg crawl_id "$CRAWL" \
  --argjson limit_applied "$LIMIT" \
  --argjson counts "$STATS" \
  '{started_at: $started_at, finished_at: $finished_at,
    duration_seconds: $duration_seconds, crawl_id: $crawl_id,
    limit_applied: $limit_applied} + $counts' > "$STATS_FILE"
echo "Stats written to $STATS_FILE"