Seed Engine 설계
1. 개요 (Overview)
섹션 제목: “1. 개요 (Overview)”1.1 목적
섹션 제목: “1.1 목적”Cloudflare R2 forknews-datasets 버킷에 저장된 raw 파일들을 순차적으로 읽어, 각 파일 내의 도메인 정보를 추출하고, 도메인별로 병렬 Worker를 생성하여 robots.txt와 sitemap.xml을 수집하는 분산 Seed Engine을 구축합니다.
1.2 핵심 요구사항
섹션 제목: “1.2 핵심 요구사항”- ✅ R2에서 raw 파일들을 순서대로 스캔
- ✅ 각 raw 파일에서 도메인 목록 추출
- ✅ 도메인별로 Cloudflare Queue에 메시지 발송
- ✅ 각 도메인마다 독립적인 Worker가 robots.txt, sitemap.xml 수집
- ✅ 수집 결과를 지정된 폴더(
processing/)에 저장 - ✅ Stateless Checkpointing으로 중단 지점부터 재개 가능
- ✅ D1 데이터베이스 없이 R2만으로 상태 관리
1.3 기술 스택
섹션 제목: “1.3 기술 스택”- Infrastructure: Cloudflare Workers, Cloudflare R2, Cloudflare Queues
- Language: TypeScript (ES2022+)
- Framework: Hono
- Validation: Zod
- Local Testing: Wrangler (—local)
2. 아키텍처 설계 (Architecture)
섹션 제목: “2. 아키텍처 설계 (Architecture)”2.1 전체 아키텍처 다이어그램
섹션 제목: “2.1 전체 아키텍처 다이어그램”┌─────────────────────────────────────────────────────────────┐│ Orchestrator Worker ││ (HTTP Trigger or Scheduled Cron) ││ ││ 1. R2 List API로 raw_NNNN.json 파일 목록 수집 ││ 2. 순서대로 파일 경로를 SEED_QUEUE에 발송 ││ 3. 사전 검증 없음 (Just-in-Time Check) │└──────────────────────┬──────────────────────────────────────┘ │ │ Queue Messages │ (file_path: string) ▼┌─────────────────────────────────────────────────────────────┐│ Cloudflare Queues (SEED_QUEUE) ││ - max_batch_size: 1 ││ - max_retries: 3 ││ - dead_letter_queue: newsfork-seed-dlq-dev │└──────────────────────┬──────────────────────────────────────┘ │ │ Message: { file_path, partition_info } ▼┌─────────────────────────────────────────────────────────────┐│ Domain Worker (Consumer) ││ ││ 1. R2.head()로 .success 파일 확인 (Early Exit) ││ 2. raw_NNNN.json 파일을 ReadableStream으로 읽기 ││ 3. JSON 파싱하여 도메인 목록 추출 ││ 4. 각 도메인별로 DOMAIN_QUEUE에 메시지 발송 │└──────────────────────┬──────────────────────────────────────┘ │ │ Domain Messages │ { domain_id, domain_url, partition_info } ▼┌─────────────────────────────────────────────────────────────┐│ Cloudflare Queues (DOMAIN_QUEUE) ││ - max_batch_size: 1 ││ - max_retries: 3 ││ - dead_letter_queue: newsfork-domain-dlq-dev │└──────────────────────┬──────────────────────────────────────┘ │ │ Message: { domain_id, domain_url, ... } ▼┌─────────────────────────────────────────────────────────────┐│ Domain Collector Worker (Consumer) ││ ││ 1. R2.head()로 domain_metadata.json.success 확인 ││ 2. robots.txt 수집 (https://{domain}/robots.txt) ││ 3. sitemap.xml 수집 (https://{domain}/sitemap.xml) ││ 4. 결과를 processing/ 폴더에 저장 ││ 5. .success 파일 생성 (Checkpoint) │└─────────────────────────────────────────────────────────────┘2.2 컴포넌트 분리
섹션 제목: “2.2 컴포넌트 분리”2.2.1 Orchestrator (Producer)
섹션 제목: “2.2.1 Orchestrator (Producer)”- 역할: R2 파티션 스캔 및 Queue 발송
- 트리거: HTTP API 또는 Scheduled Cron
- 입력: R2 버킷 경로 (선택적 파티션 필터)
- 출력: SEED_QUEUE에 파일 경로 메시지 발송
2.2.2 File Processor Worker (Consumer)
섹션 제목: “2.2.2 File Processor Worker (Consumer)”- 역할: raw 파일 읽기 및 도메인 추출
- 트리거: SEED_QUEUE 메시지
- 입력:
{ file_path: string, partition_info: {...} } - 출력: DOMAIN_QUEUE에 도메인별 메시지 발송
2.2.3 Domain Collector Worker (Consumer)
섹션 제목: “2.2.3 Domain Collector Worker (Consumer)”- 역할: 개별 도메인의 robots.txt, sitemap.xml 수집
- 트리거: DOMAIN_QUEUE 메시지
- 입력:
{ domain_id: string, domain_url: string, ... } - 출력:
processing/폴더에 결과 파일 저장
3. 데이터 흐름 (Data Flow)
섹션 제목: “3. 데이터 흐름 (Data Flow)”3.1 전체 파이프라인
섹션 제목: “3.1 전체 파이프라인”R2: datasets/country={cc}/category={cat}/date=YYYY-MM-DD/raw_0001.json │ ├─> Orchestrator가 파일 경로를 SEED_QUEUE에 발송 │ ├─> File Processor가 raw_0001.json을 읽음 │ └─> JSON 파싱: { records: [{ normalized_domain, ... }] } │ ├─> 각 record의 normalized_domain을 추출 │ └─> DOMAIN_QUEUE에 발송: { domain_id, domain_url, ... } │ └─> Domain Collector가 각 도메인 처리 ├─> robots.txt 수집 → processing/.../robots.txt ├─> sitemap.xml 수집 → processing/.../sitemap.xml └─> domain_metadata.json 생성 └─> domain_metadata.json.success 생성 (Checkpoint)3.2 파일 구조
섹션 제목: “3.2 파일 구조”Source (R2)
섹션 제목: “Source (R2)”datasets/└── country={cc}/ └── category={cat}/ └── date=YYYY-MM-DD/ ├── raw_0001.json ├── raw_0002.json ├── ... └── raw_metadata.jsonResult (R2)
섹션 제목: “Result (R2)”processing/└── country={cc}/ └── category={cat}/ └── date=YYYY-MM-DD/ ├── domain_metadata.json # 도메인별 수집 결과 ├── domain_metadata.json.success # Checkpoint (빈 파일) ├── robots.txt # (선택적, 증거 보관용) └── sitemap.xml # (선택적, 증거 보관용)Checkpoint (R2)
섹션 제목: “Checkpoint (R2)”datasets/└── country={cc}/ └── category={cat}/ └── date=YYYY-MM-DD/ ├── raw_0001.json └── raw_0001.json.success # 파일 처리 완료 표시4. 데이터 구조 및 처리 로직 상세 (Data Structure & Processing Logic)
섹션 제목: “4. 데이터 구조 및 처리 로직 상세 (Data Structure & Processing Logic)”4.1 Raw 파일 구조 분석
섹션 제목: “4.1 Raw 파일 구조 분석”4.1.1 현재 Raw 파일 스키마 (EnhancedResearchDataset)
섹션 제목: “4.1.1 현재 Raw 파일 스키마 (EnhancedResearchDataset)”현재 R2에 저장된 raw 파일은 다음 Zod 스키마를 따릅니다:
export const EnhancedResearchDataset = z.object({ meta: EnhancedResearchDatasetMeta, records: z.array(EnhancedResearchRecord)});
export const EnhancedResearchDatasetMeta = z.object({ dataset_id: z.string().min(1), // 예: "sg-news-2026-01-28-0001" country: DomainCountryCode, // 예: "SG" category: z.string().min(1), // 예: "news" discovered_at: z.string().datetime(), // ISO 8601 research_methods: z.array(ResearchMethod), // ["google_search", "crtsh"] queries: z.array(z.string()).min(1), // ["URL discovery batch"] engine: z.object({ name: z.string().min(1), // "research-service" version: z.string().min(1) // "2.0.0" }), record_count: z.number().min(0) // records.length와 일치해야 함});
export const EnhancedResearchRecord = z.object({ raw_url: z.string().url(), // 원본 URL normalized_domain: z.string().min(1), // 정규화된 도메인 (예: "mom.gov.sg") domain_id: DomainId, // "gov:sg:mom.gov.sg" 형식 source_type: Authority, // "gov" | "edu" | "org" | "com" | "other" discovery_method: ResearchMethod, // "manual" | "google_search" | "crtsh" confidence: z.number().min(0).max(1), // 0.0 ~ 1.0 content_hints: z.array(z.string()).default([]), // ["news", "policy"] liveness_status: LivenessStatus.optional() // "alive" | "unstable" | "blocked" | "dead"});4.1.2 Raw 파일 예시
섹션 제목: “4.1.2 Raw 파일 예시”{ "meta": { "dataset_id": "sg-news-2026-01-28-0001", "country": "SG", "category": "news", "discovered_at": "2026-01-28T10:30:00Z", "research_methods": ["google_search", "crtsh"], "queries": ["Singapore government news"], "engine": { "name": "research-service", "version": "2.0.0" }, "record_count": 2 }, "records": [ { "raw_url": "https://www.mom.gov.sg/newsroom", "normalized_domain": "mom.gov.sg", "domain_id": "gov:sg:mom.gov.sg", "source_type": "gov", "discovery_method": "google_search", "confidence": 0.95, "content_hints": ["news", "policy"], "liveness_status": "alive" }, { "raw_url": "https://www.moh.gov.sg/news", "normalized_domain": "moh.gov.sg", "domain_id": "gov:sg:moh.gov.sg", "source_type": "gov", "discovery_method": "crtsh", "confidence": 0.90, "content_hints": ["news"], "liveness_status": "alive" } ]}4.1.3 파일 경로 구조
섹션 제목: “4.1.3 파일 경로 구조”현재 구조 (기존 코드):
research/datasets/country={cc}/category={cat}/{date}_{chunk}.json예: research/datasets/country=sg/category=news/2026-01-28_0001.json새로운 Seed Engine 구조:
datasets/country={cc}/category={cat}/date=YYYY-MM-DD/raw_NNNN.json예: datasets/country=sg/category=news/date=2026-01-28/raw_0001.json주요 차이점:
- 경로 접두사:
research/datasets/→datasets/(Raw 데이터 레이어) - 날짜 위치: 파일명 → 디렉토리 (
date=YYYY-MM-DD/) - 파일명 형식:
{date}_{chunk}.json→raw_{chunk}.json(날짜 제거)
4.2 데이터 추출 로직 상세
섹션 제목: “4.2 데이터 추출 로직 상세”4.2.1 File Processor에서 도메인 추출 프로세스
섹션 제목: “4.2.1 File Processor에서 도메인 추출 프로세스”/** * Raw 파일에서 도메인 정보 추출 * 기존 normalizeDomain 함수 활용 */async function extractDomainsFromRawFile( data: EnhancedResearchDataset, country: string): Promise<NormalizedDomain[]> { const domains: NormalizedDomain[] = []; const seenDomainIds = new Set<string>();
for (const record of data.records) { try { // 1. 기존 record에 이미 domain_id가 있으면 활용 if (record.domain_id && validateDomainId(record.domain_id)) { // 중복 체크 if (seenDomainIds.has(record.domain_id)) { continue; // 이미 처리된 도메인 스킵 } seenDomainIds.add(record.domain_id);
// 2. NormalizedDomain 객체 재구성 const normalized: NormalizedDomain = { input_url: record.raw_url, hostname: extractHostname(record.raw_url), registrable_domain: record.normalized_domain, authority: record.source_type, country: record.domain_id.split(':')[1].toUpperCase() as CountryCode, domain_id: record.domain_id, domain_hash: generateDomainHash(record.domain_id) };
domains.push(normalized); } else { // 3. domain_id가 없거나 유효하지 않으면 재정규화 const normalized = normalizeDomain(record.raw_url, country);
if (seenDomainIds.has(normalized.domain_id)) { continue; // 중복 스킵 } seenDomainIds.add(normalized.domain_id);
domains.push(normalized); } } catch (error) { logger.warn("domain_extraction_failed", { url: record.raw_url, error: error instanceof Error ? error.message : 'Unknown error' }); // 에러가 발생해도 다음 record 계속 처리 } }
return domains;}
/** * URL에서 hostname 추출 (www 제거) */function extractHostname(url: string): string { try { const u = new URL(url); return u.hostname.replace(/^www\./, '').toLowerCase(); } catch { return ''; }}4.2.2 도메인 중복 제거 전략
섹션 제목: “4.2.2 도메인 중복 제거 전략”- domain_id 기반 중복 제거:
Set<string>을 사용하여 O(1) 조회 - 같은 raw 파일 내 중복: File Processor에서 처리
- 다른 raw 파일 간 중복: Domain Collector에서 처리 (선택적, KV 활용 가능)
4.2.3 도메인 정규화 로직 재사용
섹션 제목: “4.2.3 도메인 정규화 로직 재사용”기존 src/lib/domain.ts의 함수들을 그대로 활용:
import { normalizeDomain, // URL → NormalizedDomain batchNormalizeDomains, // URL[] → NormalizedDomain[] deduplicateDomains, // 중복 제거 extractRegistrableDomain, // hostname → registrable domain detectAuthority, // domain → Authority generateDomainHash // domain_id → SHA-256 hash} from '../lib/domain';4.3 결과 파일 저장 구조 상세
섹션 제목: “4.3 결과 파일 저장 구조 상세”4.3.1 Domain Metadata 스키마
섹션 제목: “4.3.1 Domain Metadata 스키마”export const DomainMetadataSchema = z.object({ domain_id: DomainId, // "gov:sg:mom.gov.sg" registrable_domain: z.string(), // "mom.gov.sg" country: DomainCountryCode, // "SG" category: z.string(), // "news" collected_at: z.string().datetime(), // 수집 시각
// robots.txt 정보 robots: z.object({ status_code: z.number(), // HTTP 상태 코드 content_length: z.number().int().min(0), // 바이트 수 exists: z.boolean(), // 파일 존재 여부 fetched_at: z.string().datetime().optional(), sitemap_urls: z.array(z.string().url()).optional() // robots.txt에서 추출한 sitemap URL }),
// sitemap.xml 정보 sitemap: z.object({ status_code: z.number(), content_length: z.number().int().min(0), exists: z.boolean(), fetched_at: z.string().datetime().optional(), url_count: z.number().int().min(0).optional() // sitemap 내 URL 개수 (파싱 성공 시) }),
// 원본 raw 파일 정보 (추적용) source: z.object({ raw_file_path: z.string(), // 원본 raw 파일 경로 record_index: z.number().int().optional() // raw 파일 내 record 인덱스 }).optional()});4.3.2 저장 경로 구조
섹션 제목: “4.3.2 저장 경로 구조”processing/└── country={cc}/ └── category={cat}/ └── date=YYYY-MM-DD/ ├── {registrable_domain}/ │ ├── domain_metadata.json │ ├── domain_metadata.json.success │ ├── robots.txt # (선택적, 증거 보관) │ └── sitemap.xml # (선택적, 증거 보관) └── {another_domain}/ └── ...경로 빌더 함수:
function buildDomainResultPath( partition: { country: string; category: string; date: string }, registrableDomain: string): string { // 도메인명을 파일 시스템 안전한 형식으로 변환 const safeDomain = registrableDomain.replace(/[^a-z0-9.-]/g, '_');
return `processing/country=${partition.country}/category=${partition.category}/date=${partition.date}/${safeDomain}`;}4.3.3 파일 저장 전략
섹션 제목: “4.3.3 파일 저장 전략”필수 파일:
domain_metadata.json: 수집 결과 메타데이터 (Zod 검증 필수)domain_metadata.json.success: Checkpoint 파일 (빈 파일)
선택적 파일 (증거 보관용):
robots.txt: 원본 robots.txt 내용 (텍스트)sitemap.xml: 원본 sitemap.xml 내용 (XML)
저장 순서:
robots.txt수집 및 저장 (실패해도 계속)sitemap.xml수집 및 저장 (실패해도 계속)domain_metadata.json생성 및 저장 (Zod 검증)domain_metadata.json.success생성 (Checkpoint)
4.4 기존 코드와의 통합 및 수정 방향
섹션 제목: “4.4 기존 코드와의 통합 및 수정 방향”4.4.1 기존 코드 분석
섹션 제목: “4.4.1 기존 코드 분석”현재 사용 중인 모듈:
src/services/dataset.service.ts: R2 저장 로직src/lib/r2.ts: R2 경로 빌더 (buildR2DatasetPath)src/lib/domain.ts: 도메인 정규화 로직src/schemas/research.ts:EnhancedResearchDataset스키마
기존 경로 빌더:
// src/lib/r2.ts (현재)export function buildR2DatasetPath( country: CountryCode, category: ContentCategory, date: string, chunkIndex: number): string { const chunkPadded = String(chunkIndex).padStart(4, "0"); return `research/datasets/country=${country.toLowerCase()}/category=${category}/${date}_${chunkPadded}.json`;}4.4.2 수정 방향
섹션 제목: “4.4.2 수정 방향”1. 새로운 경로 빌더 추가 (기존 코드 유지):
// src/lib/r2.ts에 추가/** * Build R2 path for raw dataset (Seed Engine용) * Format: datasets/country={cc}/category={cat}/date=YYYY-MM-DD/raw_NNNN.json */export function buildRawDatasetPath( country: CountryCode, category: ContentCategory, date: string, chunkIndex: number): string { const chunkPadded = String(chunkIndex).padStart(4, "0"); return `datasets/country=${country.toLowerCase()}/category=${category}/date=${date}/raw_${chunkPadded}.json`;}
/** * Build R2 path for domain metadata (Seed Engine 결과) * Format: processing/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/domain_metadata.json */export function buildDomainMetadataPath( country: CountryCode, category: ContentCategory, date: string, registrableDomain: string): string { const safeDomain = registrableDomain.replace(/[^a-z0-9.-]/g, '_'); return `processing/country=${country.toLowerCase()}/category=${category}/date=${date}/${safeDomain}/domain_metadata.json`;}2. 기존 DatasetService 확장 (또는 새 서비스 생성):
// 기존 DatasetService는 research/datasets/ 경로 유지// 새로운 SeedDatasetService는 datasets/ 경로 사용
export class SeedDatasetService { // buildRawDatasetPath 사용 async getRawDataset(...): Promise<EnhancedResearchDataset | null> { const path = buildRawDatasetPath(country, category, date, chunkIndex); // ... }}3. 도메인 정규화 로직 재사용:
src/lib/domain.ts의 모든 함수는 그대로 활용- 추가 수정 불필요
4.4.3 마이그레이션 전략
섹션 제목: “4.4.3 마이그레이션 전략”옵션 1: 병행 운영 (권장)
- 기존
research/datasets/경로는 유지 - 새로운 Seed Engine은
datasets/경로 사용 - 두 시스템이 독립적으로 동작
옵션 2: 점진적 마이그레이션
- 기존 파일을 새 경로로 복사하는 스크립트 작성
- 검증 후 기존 경로 제거
5. 상세 설계 (Detailed Design)
섹션 제목: “5. 상세 설계 (Detailed Design)”5.1 Orchestrator 설계
섹션 제목: “5.1 Orchestrator 설계”4.1.1 API 엔드포인트
섹션 제목: “4.1.1 API 엔드포인트”POST /api/v1/seeds/orchestrateBody: { country?: string; // 선택적 필터 category?: string; // 선택적 필터 date?: string; // 선택적 필터 (YYYY-MM-DD) force?: boolean; // .success 파일 무시하고 재처리}4.1.2 구현 로직
섹션 제목: “4.1.2 구현 로직”// 1. R2 List API로 raw_NNNN.json 파일 목록 수집const files = await listRawFiles(bucket, partition);
// 2. 순서대로 정렬 (raw_0001.json, raw_0002.json, ...)const sortedFiles = sortFilesBySequence(files);
// 3. 각 파일 경로를 SEED_QUEUE에 발송for (const file of sortedFiles) { await SEED_QUEUE.send({ file_path: file.key, partition_info: { country: extractCountry(file.key), category: extractCategory(file.key), date: extractDate(file.key) } });}4.1.3 파일 목록 수집 전략
섹션 제목: “4.1.3 파일 목록 수집 전략”- Bulk List API 사용:
bucket.list({ prefix, limit: 1000, cursor }) - Pagination 처리:
cursor를 사용하여 모든 파일 수집 - 필터링:
raw_NNNN.json패턴만 선택 (.success파일 제외)
4.2 File Processor Worker 설계
섹션 제목: “4.2 File Processor Worker 설계”4.2.1 Queue Handler
섹션 제목: “4.2.1 Queue Handler”export async function processSeedQueue( message: Message<SeedQueueMessage>, env: Bindings): Promise<void> { const { file_path, partition_info } = message.body;
// 1. Early Exit: .success 파일 확인 const successPath = `${file_path}.success`; const exists = await env.DATASETS_BUCKET.head(successPath); if (exists) { logger.info("file_already_processed", { file_path }); return; // Early exit }
// 2. raw 파일을 ReadableStream으로 읽기 const file = await env.DATASETS_BUCKET.get(file_path); if (!file) { throw new Error(`File not found: ${file_path}`); }
// 3. Streaming JSON 파싱 (10-20MB 제한 준수) const content = await streamToString(file.body); const data = JSON.parse(content) as RawDataset;
// 4. 도메인 추출 및 정규화 const domains = extractDomains(data, partition_info.country);
// 5. 각 도메인을 DOMAIN_QUEUE에 발송 for (const domain of domains) { await env.DOMAIN_QUEUE.send({ domain_id: domain.domain_id, domain_url: domain.input_url, registrable_domain: domain.registrable_domain, partition_info }); }
// 6. 처리 완료 표시 (.success 파일 생성) await env.DATASETS_BUCKET.put(successPath, "");}4.2.2 도메인 추출 로직 (상세)
섹션 제목: “4.2.2 도메인 추출 로직 (상세)”기존 코드 활용:
src/lib/domain.ts의normalizeDomain()함수 재사용src/lib/domain.ts의deduplicateDomains()함수 재사용src/schemas/research.ts의EnhancedResearchDataset스키마 활용
import { EnhancedResearchDataset } from '../schemas/research';import { NormalizedDomain } from '../schemas/domain';import { normalizeDomain, deduplicateDomains, validateDomainId, generateDomainHash } from '../lib/domain';import { CountryCode, Authority } from '../schemas';
/** * Raw 파일에서 도메인 정보 추출 * 기존 record의 domain_id를 최대한 활용하여 재정규화 최소화 */async function extractDomainsFromRawFile( data: EnhancedResearchDataset, country: string): Promise<NormalizedDomain[]> { const domains: NormalizedDomain[] = []; const seenDomainIds = new Set<string>(); // 중복 제거용 (O(1) 조회)
for (let i = 0; i < data.records.length; i++) { const record = data.records[i];
try { let normalized: NormalizedDomain;
// Case 1: record에 이미 유효한 domain_id가 있는 경우 (최적화) if (record.domain_id && validateDomainId(record.domain_id)) { // 중복 체크 if (seenDomainIds.has(record.domain_id)) { continue; // 이미 처리된 도메인 스킵 } seenDomainIds.add(record.domain_id);
// NormalizedDomain 객체 재구성 (재정규화 없이) const urlParts = record.domain_id.split(':'); const authority = urlParts[0] as Authority; const countryCode = urlParts[1].toUpperCase() as CountryCode;
normalized = { input_url: record.raw_url, hostname: extractHostname(record.raw_url), registrable_domain: record.normalized_domain, authority: authority, country: countryCode, domain_id: record.domain_id, domain_hash: generateDomainHash(record.domain_id) };
// Zod 검증 const validated = NormalizedDomain.safeParse(normalized); if (!validated.success) { logger.warn("normalized_domain_validation_failed", { domain_id: record.domain_id, issues: validated.error.issues }); // 검증 실패 시 재정규화로 폴백 normalized = normalizeDomain(record.raw_url, country); } else { normalized = validated.data; } } else { // Case 2: domain_id가 없거나 유효하지 않은 경우 재정규화 normalized = normalizeDomain(record.raw_url, country);
// 중복 체크 if (seenDomainIds.has(normalized.domain_id)) { continue; } seenDomainIds.add(normalized.domain_id); }
domains.push(normalized);
} catch (error) { logger.warn("domain_extraction_failed", { record_index: i, url: record.raw_url, error: error instanceof Error ? error.message : 'Unknown error' }); // 에러가 발생해도 다음 record 계속 처리 } }
// 최종 중복 제거 (안전장치) return deduplicateDomains(domains);}
/** * URL에서 hostname 추출 (www 제거) */function extractHostname(url: string): string { try { const u = new URL(url); return u.hostname.replace(/^www\./, '').toLowerCase(); } catch { return ''; }}성능 최적화 포인트:
- 기존 domain_id 활용: 재정규화 최소화 (CPU 절약)
- Set 기반 중복 제거: O(1) 조회로 성능 최적화
- 에러 허용: 개별 record 실패가 전체 처리 중단하지 않음
4.3 Domain Collector Worker 설계 (상세)
섹션 제목: “4.3 Domain Collector Worker 설계 (상세)”4.3.1 Queue Handler (상세 구현)
섹션 제목: “4.3.1 Queue Handler (상세 구현)”import { DomainMetadataSchema } from '../schemas/seed-engine';import { buildDomainMetadataPath } from '../lib/r2';
export async function processDomainQueue( message: Message<DomainQueueMessage>, env: Bindings): Promise<void> { const { domain_id, domain_url, registrable_domain, partition_info, authority, source_file_path } = message.body;
logger.info("domain_collector_start", { domain_id, registrable_domain });
// 1. Early Exit: domain_metadata.json.success 확인 const resultPath = buildDomainResultPath(partition_info, registrable_domain); const metadataPath = `${resultPath}/domain_metadata.json`; const successPath = `${metadataPath}.success`;
const exists = await env.DATASETS_BUCKET.head(successPath); if (exists) { logger.info("domain_already_processed", { domain_id }); return; // Early exit - 이미 처리 완료 }
// 2. robots.txt 수집 const robotsResult = await fetchRobotsTxt(registrable_domain);
// 3. robots.txt에서 sitemap URL 추출 const sitemapUrls = extractSitemapUrlsFromRobots(robotsResult.content);
// 4. sitemap.xml 수집 (robots.txt에서 추출한 URL 우선 시도) const sitemapResult = await fetchSitemapXml(registrable_domain, sitemapUrls);
// 5. sitemap.xml 파싱 (URL 개수 추출) const sitemapUrlCount = sitemapResult.content ? parseSitemapUrlCount(sitemapResult.content) : undefined;
// 6. 결과 메타데이터 생성 (Zod 스키마 준수) const metadata = { domain_id, registrable_domain, country: partition_info.country.toUpperCase(), category: partition_info.category, collected_at: new Date().toISOString(), robots: { status_code: robotsResult.status, content_length: robotsResult.content?.length || 0, exists: robotsResult.status === 200, fetched_at: robotsResult.content ? new Date().toISOString() : undefined, sitemap_urls: sitemapUrls }, sitemap: { status_code: sitemapResult.status, content_length: sitemapResult.content?.length || 0, exists: sitemapResult.status === 200, fetched_at: sitemapResult.content ? new Date().toISOString() : undefined, url_count: sitemapUrlCount, url: sitemapResult.url }, source: source_file_path ? { raw_file_path: source_file_path } : undefined };
// 7. Zod 스키마로 검증 const validated = DomainMetadataSchema.safeParse(metadata); if (!validated.success) { throw new ValidationError("Invalid domain metadata", { issues: validated.error.issues, domain_id }); }
// 8. 결과 파일 저장 (순서 중요) // 8-1. 증거 파일 저장 (선택적, 실패해도 계속) if (robotsResult.content) { try { await env.DATASETS_BUCKET.put( `${resultPath}/robots.txt`, robotsResult.content, { httpMetadata: { contentType: 'text/plain' } } ); } catch (error) { logger.warn("robots_file_save_failed", { domain_id, error: error instanceof Error ? error.message : 'Unknown error' }); // 파일 저장 실패해도 메타데이터는 저장 } }
if (sitemapResult.content) { try { await env.DATASETS_BUCKET.put( `${resultPath}/sitemap.xml`, sitemapResult.content, { httpMetadata: { contentType: 'application/xml' } } ); } catch (error) { logger.warn("sitemap_file_save_failed", { domain_id, error: error instanceof Error ? error.message : 'Unknown error' }); } }
// 8-2. 메타데이터 파일 저장 (필수) await env.DATASETS_BUCKET.put( metadataPath, JSON.stringify(validated.data, null, 2), { httpMetadata: { contentType: 'application/json' }, customMetadata: { domain_id, registrable_domain, country: partition_info.country, category: partition_info.category, date: partition_info.date } } );
// 9. Checkpoint 생성 (마지막 단계) await env.DATASETS_BUCKET.put(successPath, "", { httpMetadata: { contentType: 'text/plain' } });
logger.info("domain_collector_complete", { domain_id, robots_exists: robotsResult.status === 200, sitemap_exists: sitemapResult.status === 200 });}
/** * 결과 경로 빌더 */function buildDomainResultPath( partition: { country: string; category: string; date: string }, registrableDomain: string): string { // 도메인명을 파일 시스템 안전한 형식으로 변환 const safeDomain = registrableDomain.replace(/[^a-z0-9.-]/g, '_');
return `processing/country=${partition.country}/category=${partition.category}/date=${partition.date}/${safeDomain}`;}4.3.2 HTTP Fetch 로직 (상세)
섹션 제목: “4.3.2 HTTP Fetch 로직 (상세)”interface FetchResult { status: number; content: string | null; url: string; error?: string; responseTimeMs?: number;}
/** * robots.txt 수집 */async function fetchRobotsTxt(domain: string): Promise<FetchResult> { const url = `https://${domain}/robots.txt`; const startTime = Date.now();
try { const response = await fetch(url, { headers: { 'User-Agent': 'Newsfork-SeedEngine/1.0', 'Accept': 'text/plain, */*' }, signal: AbortSignal.timeout(10000), // 10초 타임아웃 redirect: 'follow' // 리다이렉트 따라가기 });
const responseTimeMs = Date.now() - startTime; let content: string | null = null;
if (response.ok) { try { content = await response.text();
// 크기 제한 검사 (1MB) if (content.length > 1024 * 1024) { logger.warn("robots_file_too_large", { domain, size: content.length, url }); content = content.substring(0, 1024 * 1024); // 처음 1MB만 저장 } } catch (error) { logger.warn("robots_content_read_failed", { domain, error: error instanceof Error ? error.message : 'Unknown error' }); } }
return { status: response.status, content, url, responseTimeMs }; } catch (error) { const responseTimeMs = Date.now() - startTime;
// 에러 타입별 처리 if (error instanceof TypeError && error.message.includes('fetch')) { // 네트워크 에러 return { status: 0, content: null, url, error: 'Network error', responseTimeMs }; } else if (error instanceof DOMException && error.name === 'AbortError') { // 타임아웃 return { status: 0, content: null, url, error: 'Timeout', responseTimeMs }; }
return { status: 0, content: null, url, error: error instanceof Error ? error.message : 'Unknown error', responseTimeMs }; }}
/** * robots.txt에서 sitemap URL 추출 */function extractSitemapUrlsFromRobots(robotsContent: string | null): string[] { if (!robotsContent) { return []; }
const urls: string[] = []; const lines = robotsContent.split('\n');
for (const line of lines) { const trimmed = line.trim(); // 대소문자 구분 없이 "Sitemap:" 또는 "sitemap:" 찾기 const match = trimmed.match(/^sitemap:\s*(.+)$/i); if (match && match[1]) { const url = match[1].trim(); try { // URL 유효성 검사 new URL(url); urls.push(url); } catch { // 유효하지 않은 URL은 스킵 logger.warn("invalid_sitemap_url_in_robots", { url }); } } }
return [...new Set(urls)]; // 중복 제거}
/** * sitemap.xml 수집 * robots.txt에서 추출한 URL을 우선 시도 */async function fetchSitemapXml( domain: string, sitemapUrlsFromRobots: string[] = []): Promise<FetchResult> { // 우선순위: robots.txt에서 추출한 URL > 기본 경로 const urls = [ ...sitemapUrlsFromRobots, `https://${domain}/sitemap.xml`, `https://${domain}/sitemap_index.xml`, `https://${domain}/sitemaps.xml` ];
for (const url of urls) { const startTime = Date.now();
try { const response = await fetch(url, { headers: { 'User-Agent': 'Newsfork-SeedEngine/1.0', 'Accept': 'application/xml, text/xml, */*' }, signal: AbortSignal.timeout(15000), // sitemap은 더 긴 타임아웃 redirect: 'follow' });
const responseTimeMs = Date.now() - startTime;
if (response.ok) { try { let content = await response.text();
// 크기 제한 검사 (10MB) if (content.length > 10 * 1024 * 1024) { logger.warn("sitemap_file_too_large", { domain, size: content.length, url }); content = content.substring(0, 10 * 1024 * 1024); }
return { status: response.status, content, url, responseTimeMs }; } catch (error) { logger.warn("sitemap_content_read_failed", { domain, url, error: error instanceof Error ? error.message : 'Unknown error' }); // 다음 URL 시도 continue; } } } catch (error) { // 네트워크 에러나 타임아웃은 다음 URL 시도 if (error instanceof DOMException && error.name === 'AbortError') { logger.warn("sitemap_fetch_timeout", { domain, url }); } continue; } }
// 모든 URL 실패 return { status: 404, content: null, url: urls[0] || `https://${domain}/sitemap.xml`, error: 'Not found' };}
/** * sitemap.xml에서 URL 개수 추출 * sitemap_index.xml인 경우 하위 sitemap 개수만 반환 */function parseSitemapUrlCount(sitemapContent: string): number | undefined { try { // sitemap_index.xml인 경우 const sitemapIndexMatch = sitemapContent.match(/<sitemap>/g); if (sitemapIndexMatch) { return sitemapIndexMatch.length; // 하위 sitemap 개수 }
// 일반 sitemap.xml인 경우 const urlMatch = sitemapContent.match(/<url>/g); if (urlMatch) { return urlMatch.length; // URL 개수 }
return undefined; // 파싱 실패 } catch (error) { logger.warn("sitemap_parse_failed", { error: error instanceof Error ? error.message : 'Unknown error' }); return undefined; }}에러 처리 전략:
- 네트워크 에러: 다음 URL 시도 또는 실패 반환
- 타임아웃: 로그 남기고 다음 URL 시도
- 파일 크기 초과: 처음 N 바이트만 저장하고 경고
- 파싱 실패: 메타데이터는 저장하되 url_count는 undefined
4.4 Stateless Checkpointing 구현
섹션 제목: “4.4 Stateless Checkpointing 구현”4.4.1 Checkpoint 파일 규칙
섹션 제목: “4.4.1 Checkpoint 파일 규칙”- 위치: 원본 파일과 동일한 경로에
.success확장자 추가 - 내용: 빈 파일 (0 bytes)
- 용도: 작업 완료 여부 표시
4.4.2 Checkpoint 확인 시점
섹션 제목: “4.4.2 Checkpoint 확인 시점”- File Processor:
raw_NNNN.json.success확인 - Domain Collector:
domain_metadata.json.success확인
4.4.3 Idempotency 보장
섹션 제목: “4.4.3 Idempotency 보장”- 동일 메시지가 중복 전달되어도
.success파일이 있으면 즉시 종료 - 부분 실패 시 재시도해도 완료된 작업은 스킵
5. Queue 설정 (Queue Configuration)
섹션 제목: “5. Queue 설정 (Queue Configuration)”5.1 wrangler.jsonc 설정
섹션 제목: “5.1 wrangler.jsonc 설정”{ "queues": { "producers": [ { "binding": "SEED_QUEUE", "queue": "newsfork-seed-dev" }, { "binding": "DOMAIN_QUEUE", "queue": "newsfork-domain-dev" } ], "consumers": [ { "queue": "newsfork-seed-dev", "max_batch_size": 1, "max_batch_timeout": 30, "max_retries": 3, "dead_letter_queue": "newsfork-seed-dlq-dev" }, { "queue": "newsfork-domain-dev", "max_batch_size": 1, "max_batch_timeout": 30, "max_retries": 3, "dead_letter_queue": "newsfork-domain-dlq-dev" } ] }}5.2 Queue 메시지 스키마
섹션 제목: “5.2 Queue 메시지 스키마”SEED_QUEUE Message
섹션 제목: “SEED_QUEUE Message”interface SeedQueueMessage { file_path: string; // R2 파일 경로 partition_info: { country: string; category: string; date: string; // YYYY-MM-DD };}DOMAIN_QUEUE Message
섹션 제목: “DOMAIN_QUEUE Message”interface DomainQueueMessage { domain_id: string; // authority:country:domain domain_url: string; // 원본 URL registrable_domain: string; // 정규화된 도메인 partition_info: { country: string; category: string; date: string; };}6. 로컬 테스트 방법 (Local Testing Guide)
섹션 제목: “6. 로컬 테스트 방법 (Local Testing Guide)”6.1 사전 준비
섹션 제목: “6.1 사전 준비”6.1.1 Wrangler 로컬 환경 설정
섹션 제목: “6.1.1 Wrangler 로컬 환경 설정”# Wrangler 로컬 개발 모드 시작pnpm dev:local
# 또는 특정 환경 변수와 함께pnpm dev:local --env dev6.1.2 로컬 R2 버킷 설정
섹션 제목: “6.1.2 로컬 R2 버킷 설정”Wrangler는 로컬 개발 시 .wrangler/state 디렉토리에 로컬 R2 버킷을 생성합니다.
# 로컬 R2 버킷 확인wrangler r2 bucket list --local
# 로컬 R2에 테스트 파일 업로드wrangler r2 object put \ datasets/country=sg/category=news/date=2026-01-28/raw_0001.json \ --file=./test/data/raw_0001.json \ --local6.1.3 로컬 Queue 설정
섹션 제목: “6.1.3 로컬 Queue 설정”로컬 개발 시 Queue는 메모리 기반으로 동작합니다. wrangler dev --local 실행 시 자동으로 설정됩니다.
6.2 테스트 시나리오
섹션 제목: “6.2 테스트 시나리오”시나리오 1: Orchestrator 테스트
섹션 제목: “시나리오 1: Orchestrator 테스트”1. 테스트 데이터 준비
# 로컬에 테스트 raw 파일 생성mkdir -p test/data/datasets/country=sg/category=news/date=2026-01-28
# 샘플 raw_0001.json 생성cat > test/data/raw_0001.json << 'EOF'{ "meta": { "dataset_id": "sg-news-2026-01-28-0001", "country": "SG", "category": "news", "record_count": 2 }, "records": [ { "raw_url": "https://www.mom.gov.sg/newsroom", "normalized_domain": "mom.gov.sg", "domain_id": "gov:sg:mom.gov.sg" }, { "raw_url": "https://www.moh.gov.sg/news", "normalized_domain": "moh.gov.sg", "domain_id": "gov:sg:moh.gov.sg" } ]}EOF
# 로컬 R2에 업로드wrangler r2 object put \ datasets/country=sg/category=news/date=2026-01-28/raw_0001.json \ --file=test/data/raw_0001.json \ --local2. Orchestrator API 호출
# 로컬 Worker 실행 중인 상태에서curl -X POST http://localhost:8787/api/v1/seeds/orchestrate \ -H "Content-Type: application/json" \ -d '{ "country": "sg", "category": "news", "date": "2026-01-28" }'3. Queue 메시지 확인
// src/routes/seeds.ts에서 로그 확인// 또는 wrangler tail --local로 실시간 로그 확인시나리오 2: File Processor Worker 테스트
섹션 제목: “시나리오 2: File Processor Worker 테스트”1. Queue에 직접 메시지 발송 (테스트용)
import { env } from './test-setup';
test('file processor processes raw file', async () => { await env.SEED_QUEUE.send({ file_path: 'datasets/country=sg/category=news/date=2026-01-28/raw_0001.json', partition_info: { country: 'sg', category: 'news', date: '2026-01-28' } });
// Worker가 자동으로 처리 // 결과 확인: DOMAIN_QUEUE에 메시지가 발송되었는지 확인});2. 로컬 Queue 모니터링
# Wrangler 로그 확인wrangler tail --local
# 또는 Worker 코드에 console.log 추가하여 확인시나리오 3: Domain Collector Worker 테스트
섹션 제목: “시나리오 3: Domain Collector Worker 테스트”1. Mock HTTP Fetch (로컬 테스트용)
import { env, createMockFetch } from './test-setup';
test('domain collector fetches robots and sitemap', async () => { // Mock fetch 설정 global.fetch = createMockFetch({ 'https://mom.gov.sg/robots.txt': { status: 200, body: 'User-agent: *\nAllow: /' }, 'https://mom.gov.sg/sitemap.xml': { status: 200, body: '<?xml version="1.0"?><urlset>...</urlset>' } });
await env.DOMAIN_QUEUE.send({ domain_id: 'gov:sg:mom.gov.sg', domain_url: 'https://www.mom.gov.sg/newsroom', registrable_domain: 'mom.gov.sg', partition_info: { country: 'sg', category: 'news', date: '2026-01-28' } });
// 결과 확인: R2에 파일이 저장되었는지 확인 const result = await env.DATASETS_BUCKET.get( 'processing/country=sg/category=news/date=2026-01-28/mom.gov.sg/domain_metadata.json' );
expect(result).toBeTruthy(); const metadata = JSON.parse(await result.text()); expect(metadata.robots.exists).toBe(true); expect(metadata.sitemap.exists).toBe(true);});2. 실제 HTTP 요청 테스트 (옵션)
// 실제 도메인에 요청 (로컬에서도 가능)// 단, 타임아웃과 에러 핸들링 테스트 필요6.3 통합 테스트 플로우
섹션 제목: “6.3 통합 테스트 플로우”전체 파이프라인 테스트
섹션 제목: “전체 파이프라인 테스트”# 1. 로컬 R2에 테스트 데이터 준비./scripts/prepare-test-data.sh
# 2. Wrangler 로컬 개발 모드 시작pnpm dev:local
# 3. Orchestrator API 호출curl -X POST http://localhost:8787/api/v1/seeds/orchestrate \ -H "Content-Type: application/json" \ -d '{"country": "sg", "category": "news", "date": "2026-01-28"}'
# 4. 로그 모니터링wrangler tail --local
# 5. 결과 확인wrangler r2 object list \ processing/country=sg/category=news/date=2026-01-28/ \ --local6.4 디버깅 팁
섹션 제목: “6.4 디버깅 팁”6.4.1 로컬 R2 파일 확인
섹션 제목: “6.4.1 로컬 R2 파일 확인”# 로컬 R2 버킷 목록wrangler r2 bucket list --local
# 특정 경로의 파일 목록wrangler r2 object list \ datasets/country=sg/category=news/date=2026-01-28/ \ --local
# 파일 내용 확인wrangler r2 object get \ datasets/country=sg/category=news/date=2026-01-28/raw_0001.json \ --local \ --file=./output.json6.4.2 Queue 메시지 확인
섹션 제목: “6.4.2 Queue 메시지 확인”로컬 개발 시 Queue는 메모리 기반이므로 Worker 로그를 통해 확인합니다.
logger.info("queue_message_received", { queue: batch.queue, messageId: message.id, body: message.body});6.4.3 Checkpoint 파일 확인
섹션 제목: “6.4.3 Checkpoint 파일 확인”# .success 파일 존재 여부 확인wrangler r2 object head \ datasets/country=sg/category=news/date=2026-01-28/raw_0001.json.success \ --local6.5 테스트 데이터 생성 스크립트
섹션 제목: “6.5 테스트 데이터 생성 스크립트”#!/bin/bashBUCKET="newsfork-datasets-dev"COUNTRY="sg"CATEGORY="news"DATE="2026-01-28"
# 테스트 raw 파일 생성cat > /tmp/raw_0001.json << 'EOF'{ "meta": { "dataset_id": "sg-news-2026-01-28-0001", "country": "SG", "category": "news", "record_count": 2 }, "records": [ { "raw_url": "https://www.mom.gov.sg/newsroom", "normalized_domain": "mom.gov.sg", "domain_id": "gov:sg:mom.gov.sg" }, { "raw_url": "https://www.moh.gov.sg/news", "normalized_domain": "moh.gov.sg", "domain_id": "gov:sg:moh.gov.sg" } ]}EOF
# 로컬 R2에 업로드wrangler r2 object put \ datasets/country=${COUNTRY}/category=${CATEGORY}/date=${DATE}/raw_0001.json \ --file=/tmp/raw_0001.json \ --local
echo "✅ Test data prepared"7. 에러 처리 및 재시도 (Error Handling)
섹션 제목: “7. 에러 처리 및 재시도 (Error Handling)”7.1 에러 분류
섹션 제목: “7.1 에러 분류”7.1.1 재시도 가능한 에러
섹션 제목: “7.1.1 재시도 가능한 에러”- 네트워크 타임아웃: HTTP fetch 실패
- 일시적 R2 오류: R2 API 5xx 에러
- Queue 전송 실패: 일시적 Queue 오류
7.1.2 재시도 불가능한 에러
섹션 제목: “7.1.2 재시도 불가능한 에러”- 파일 없음: raw_NNNN.json이 존재하지 않음
- 잘못된 JSON: 파일 파싱 실패
- 도메인 형식 오류: URL 파싱 실패
7.2 Dead Letter Queue (DLQ)
섹션 제목: “7.2 Dead Letter Queue (DLQ)”7.2.1 DLQ 처리
섹션 제목: “7.2.1 DLQ 처리”- 최대 3회 재시도 후 DLQ로 이동
- DLQ 메시지는 수동 검토 필요
- 자동 재처리 없음 (규칙 준수)
7.2.2 DLQ 모니터링
섹션 제목: “7.2.2 DLQ 모니터링”// DLQ 메시지 확인 (수동)const dlqMessages = await env.SEED_DLQ.receive();for (const message of dlqMessages) { logger.error("dlq_message", { messageId: message.id, body: message.body, error: message.error });}8. 성능 최적화 (Performance Optimization)
섹션 제목: “8. 성능 최적화 (Performance Optimization)”8.1 병렬 처리 전략
섹션 제목: “8.1 병렬 처리 전략”8.1.1 Fan-out 아키텍처
섹션 제목: “8.1.1 Fan-out 아키텍처”- File Processor: 하나의 raw 파일 → N개의 도메인 메시지
- Domain Collector: 각 도메인마다 독립적인 Worker 실행
- 동시성: Cloudflare가 자동으로 수백 개의 Worker 병렬 실행
8.1.2 배치 크기 제한
섹션 제목: “8.1.2 배치 크기 제한”max_batch_size: 1: Worker당 하나의 메시지만 처리- CPU/메모리 고립 보장
- 실패 시 영향 범위 최소화
8.2 스트리밍 처리
섹션 제목: “8.2 스트리밍 처리”8.2.1 ReadableStream 활용
섹션 제목: “8.2.1 ReadableStream 활용”// 10-20MB 파일을 스트리밍으로 읽기const file = await bucket.get(file_path);const content = await streamToString(file.body); // ReadableStream → string8.2.2 메모리 제한 준수
섹션 제목: “8.2.2 메모리 제한 준수”- 큰 JSON 파일을 한 번에 파싱하지 않음
- 필요 시 스트리밍 JSON 파서 사용 (선택적)
9. 모니터링 및 로깅 (Monitoring & Logging)
섹션 제목: “9. 모니터링 및 로깅 (Monitoring & Logging)”9.1 로그 포인트
섹션 제목: “9.1 로그 포인트”9.1.1 Orchestrator 로그
섹션 제목: “9.1.1 Orchestrator 로그”logger.info("orchestrator_start", { partition_info });logger.info("files_found", { count: files.length });logger.info("queue_messages_sent", { count: files.length });9.1.2 File Processor 로그
섹션 제목: “9.1.2 File Processor 로그”logger.info("file_processing_start", { file_path });logger.info("domains_extracted", { count: domains.length });logger.info("file_processing_complete", { file_path });9.1.3 Domain Collector 로그
섹션 제목: “9.1.3 Domain Collector 로그”logger.info("domain_processing_start", { domain_id });logger.info("robots_fetched", { status: robotsContent.status });logger.info("sitemap_fetched", { status: sitemapContent.status });logger.info("domain_processing_complete", { domain_id });9.2 메트릭 수집
섹션 제목: “9.2 메트릭 수집”9.2.1 처리 통계
섹션 제목: “9.2.1 처리 통계”- 처리된 파일 수
- 처리된 도메인 수
- 성공/실패 비율
- 평균 처리 시간
9.2.2 Cloudflare Analytics 활용
섹션 제목: “9.2.2 Cloudflare Analytics 활용”- Worker 실행 횟수
- Queue 메시지 처리량
- R2 API 호출 횟수
10. 배포 전략 (Deployment Strategy)
섹션 제목: “10. 배포 전략 (Deployment Strategy)”10.1 환경별 설정
섹션 제목: “10.1 환경별 설정”10.1.1 Development
섹션 제목: “10.1.1 Development”- Queue:
newsfork-seed-dev,newsfork-domain-dev - R2 Bucket:
newsfork-datasets-dev - 테스트 데이터로 검증
10.1.2 Staging
섹션 제목: “10.1.2 Staging”- Queue:
newsfork-seed-staging,newsfork-domain-staging - R2 Bucket:
newsfork-datasets-staging - 프로덕션 데이터 샘플로 검증
10.1.3 Production
섹션 제목: “10.1.3 Production”- Queue:
newsfork-seed-prod,newsfork-domain-prod - R2 Bucket:
newsfork-datasets-prod - 전체 데이터 처리
10.2 점진적 롤아웃
섹션 제목: “10.2 점진적 롤아웃”10.2.1 단계별 배포
섹션 제목: “10.2.1 단계별 배포”- Phase 1: Orchestrator만 배포 (Queue 발송만)
- Phase 2: File Processor 배포 (파일 읽기 테스트)
- Phase 3: Domain Collector 배포 (전체 파이프라인)
10.2.2 모니터링
섹션 제목: “10.2.2 모니터링”- 각 Phase마다 24시간 모니터링
- 에러율, 처리 속도 확인
- 문제 발생 시 즉시 롤백
11. 보안 고려사항 (Security Considerations)
섹션 제목: “11. 보안 고려사항 (Security Considerations)”11.1 입력 검증
섹션 제목: “11.1 입력 검증”- 모든 Queue 메시지는 Zod 스키마로 검증
- 파일 경로는 Hive-style 파티션 규칙 준수 확인
- 도메인 URL은 정규화 및 검증
11.2 Rate Limiting
섹션 제목: “11.2 Rate Limiting”- HTTP fetch 시 타임아웃 설정 (10초)
- 동일 도메인에 대한 연속 요청 제한 (선택적)
11.3 에러 메시지
섹션 제목: “11.3 에러 메시지”- 내부 경로나 민감한 정보 노출 방지
- 사용자에게는 일반적인 에러 메시지만 제공
12. 향후 개선 사항 (Future Enhancements)
섹션 제목: “12. 향후 개선 사항 (Future Enhancements)”12.1 기능 확장
섹션 제목: “12.1 기능 확장”- Sitemap Index 지원: 여러 sitemap 파일 처리
- RSS Feed 수집: robots.txt에서 RSS URL 추출
- Homepage 수집: 도메인 홈페이지 HTML 저장
12.2 성능 개선
섹션 제목: “12.2 성능 개선”- 배치 처리: 여러 도메인을 하나의 Worker에서 처리 (선택적)
- 캐싱: KV를 활용한 robots.txt 캐싱
- 병렬 Fetch: robots.txt와 sitemap.xml 동시 수집
12.3 모니터링 강화
섹션 제목: “12.3 모니터링 강화”- 대시보드: 처리 진행 상황 시각화
- 알림: 실패율 임계값 초과 시 알림
- 리포트: 일일/주간 처리 통계 리포트
13. 체크리스트 (Checklist)
섹션 제목: “13. 체크리스트 (Checklist)”13.1 개발 전
섹션 제목: “13.1 개발 전”- Queue 생성 (
newsfork-seed-dev,newsfork-domain-dev) - DLQ 생성 (
newsfork-seed-dlq-dev,newsfork-domain-dlq-dev) - R2 버킷 확인 (
forknews-datasets) - 테스트 데이터 준비
13.2 개발 중
섹션 제목: “13.2 개발 중”- Orchestrator API 구현
- File Processor Worker 구현
- Domain Collector Worker 구현
- Stateless Checkpointing 구현
- 에러 처리 및 로깅
- Zod 스키마 정의
13.3 테스트
섹션 제목: “13.3 테스트”- 로컬 환경에서 전체 파이프라인 테스트
- Checkpoint 동작 확인
- 재시도 로직 확인
- DLQ 동작 확인
- 에러 케이스 테스트
13.4 배포
섹션 제목: “13.4 배포”- Development 환경 배포
- 모니터링 설정
- Staging 환경 배포
- Production 환경 배포
14. 참고 자료 (References)
섹션 제목: “14. 참고 자료 (References)”14.1 프로젝트 문서
섹션 제목: “14.1 프로젝트 문서”- .cursorrules: 프로젝트 아키텍처 규칙
- ENVIRONMENT_GUIDE.md: 환경 설정 가이드
- README.md: 프로젝트 개요
14.2 Cloudflare 문서
섹션 제목: “14.2 Cloudflare 문서”14.3 관련 코드
섹션 제목: “14.3 관련 코드”src/lib/domain.ts: 도메인 정규화 로직src/lib/queue/queue-handlers.ts: 기존 Queue 핸들러src/infra/cloudflare/r2/: R2 Storage Adapter
15. 새로운 스키마 정의 (New Schema Definitions)
섹션 제목: “15. 새로운 스키마 정의 (New Schema Definitions)”15.1 Domain Metadata Schema
섹션 제목: “15.1 Domain Metadata Schema”// src/schemas/seed-engine.ts (새로 생성)
import { z } from 'zod';import { DomainId, DomainCountryCode } from './domain';
export const RobotsMetadataSchema = z.object({ status_code: z.number().int().min(0).max(599), content_length: z.number().int().min(0), exists: z.boolean(), fetched_at: z.string().datetime().optional(), sitemap_urls: z.array(z.string().url()).optional()});
export const SitemapMetadataSchema = z.object({ status_code: z.number().int().min(0).max(599), content_length: z.number().int().min(0), exists: z.boolean(), fetched_at: z.string().datetime().optional(), url_count: z.number().int().min(0).optional(), url: z.string().url()});
export const DomainMetadataSchema = z.object({ domain_id: DomainId, registrable_domain: z.string().min(1), country: DomainCountryCode, category: z.string().min(1), collected_at: z.string().datetime(), robots: RobotsMetadataSchema, sitemap: SitemapMetadataSchema, source: z.object({ raw_file_path: z.string(), record_index: z.number().int().optional() }).optional()});
export type DomainMetadata = z.infer<typeof DomainMetadataSchema>;15.2 Queue Message Schemas
섹션 제목: “15.2 Queue Message Schemas”// src/schemas/queue.ts에 추가
export const SeedQueueMessageSchema = z.object({ file_path: z.string().min(1), // R2 파일 경로 partition_info: z.object({ country: z.string().length(2), category: z.string().min(1), date: z.string().regex(/^\d{4}-\d{2}-\d{2}$/) })});
export const DomainQueueMessageSchema = z.object({ domain_id: DomainId, domain_url: z.string().url(), registrable_domain: z.string().min(1), authority: z.enum(['gov', 'edu', 'org', 'com', 'other']), partition_info: z.object({ country: z.string().length(2), category: z.string().min(1), date: z.string().regex(/^\d{4}-\d{2}-\d{2}$/) }), source_file_path: z.string().optional() // 추적용});16. 기존 코드 수정 체크리스트 (Code Modification Checklist)
섹션 제목: “16. 기존 코드 수정 체크리스트 (Code Modification Checklist)”16.1 새로 생성할 파일
섹션 제목: “16.1 새로 생성할 파일”-
src/schemas/seed-engine.ts: Domain Metadata 스키마 -
src/lib/r2-seed-engine.ts: Seed Engine용 R2 경로 빌더 -
src/services/seed-dataset.service.ts: Seed Engine용 Dataset Service -
src/apps/api/seed-orchestrator.ts: Orchestrator API 핸들러 -
src/lib/queue/seed-queue-handlers.ts: Seed Queue 핸들러 -
src/lib/queue/domain-queue-handlers.ts: Domain Queue 핸들러
16.2 수정할 기존 파일
섹션 제목: “16.2 수정할 기존 파일”-
src/schemas/queue.ts: Queue 메시지 스키마 추가 -
wrangler.jsonc: 새로운 Queue 설정 추가 -
src/index.ts: 새로운 Queue 핸들러 등록 -
src/routes/seeds.ts: Orchestrator API 엔드포인트 추가
16.3 재사용할 기존 코드 (수정 불필요)
섹션 제목: “16.3 재사용할 기존 코드 (수정 불필요)”- ✅
src/lib/domain.ts: 모든 도메인 정규화 함수 - ✅
src/schemas/research.ts:EnhancedResearchDataset스키마 - ✅
src/lib/logger.ts: 로깅 유틸리티 - ✅
src/lib/errors.ts: 에러 클래스
16.4 경로 빌더 함수 비교
섹션 제목: “16.4 경로 빌더 함수 비교”기존 (research/datasets/):
buildR2DatasetPath(country, category, date, chunkIndex)// → research/datasets/country=sg/category=news/2026-01-28_0001.json새로운 (datasets/):
// src/lib/r2-seed-engine.ts (새로 생성)buildRawDatasetPath(country, category, date, chunkIndex)// → datasets/country=sg/category=news/date=2026-01-28/raw_0001.json
buildDomainMetadataPath(country, category, date, domain)// → processing/country=sg/category=news/date=2026-01-28/mom.gov.sg/domain_metadata.json17. 데이터 처리 플로우 상세 (Detailed Data Flow)
섹션 제목: “17. 데이터 처리 플로우 상세 (Detailed Data Flow)”17.1 전체 파이프라인 시퀀스 다이어그램
섹션 제목: “17.1 전체 파이프라인 시퀀스 다이어그램”[Orchestrator] │ ├─> R2.list() → raw_0001.json, raw_0002.json, ... │ ├─> SEED_QUEUE.send({ file_path: "datasets/.../raw_0001.json" }) │ └─> SEED_QUEUE.send({ file_path: "datasets/.../raw_0002.json" }) │ ▼[File Processor Worker #1] │ ├─> R2.head(".../raw_0001.json.success") → false │ ├─> R2.get(".../raw_0001.json") → ReadableStream │ ├─> JSON.parse() → EnhancedResearchDataset │ └─> { meta: {...}, records: [{ domain_id: "gov:sg:mom.gov.sg", ... }] } │ ├─> extractDomainsFromRawFile() │ └─> [NormalizedDomain, NormalizedDomain, ...] │ ├─> DOMAIN_QUEUE.send({ domain_id: "gov:sg:mom.gov.sg", ... }) ├─> DOMAIN_QUEUE.send({ domain_id: "gov:sg:moh.gov.sg", ... }) │ └─> R2.put(".../raw_0001.json.success", "") │ ▼[Domain Collector Worker #1] │ ├─> R2.head(".../mom.gov.sg/domain_metadata.json.success") → false │ ├─> fetch("https://mom.gov.sg/robots.txt") → { status: 200, content: "..." } │ └─> extractSitemapUrlsFromRobots() → ["https://mom.gov.sg/sitemap.xml"] │ ├─> fetch("https://mom.gov.sg/sitemap.xml") → { status: 200, content: "<?xml>..." } │ └─> parseSitemapUrlCount() → 150 │ ├─> DomainMetadata 생성 (Zod 검증) │ ├─> R2.put(".../mom.gov.sg/robots.txt", content) ├─> R2.put(".../mom.gov.sg/sitemap.xml", content) ├─> R2.put(".../mom.gov.sg/domain_metadata.json", metadata) │ └─> R2.put(".../mom.gov.sg/domain_metadata.json.success", "")17.2 데이터 변환 단계
섹션 제목: “17.2 데이터 변환 단계”Step 1: Raw File → EnhancedResearchDataset
{ "meta": { "dataset_id": "...", "country": "SG", ... }, "records": [ { "raw_url": "https://www.mom.gov.sg/newsroom", "domain_id": "gov:sg:mom.gov.sg", "normalized_domain": "mom.gov.sg", ... } ]}Step 2: EnhancedResearchDataset → NormalizedDomain[]
[ { input_url: "https://www.mom.gov.sg/newsroom", hostname: "mom.gov.sg", registrable_domain: "mom.gov.sg", authority: "gov", country: "SG", domain_id: "gov:sg:mom.gov.sg", domain_hash: "abc123..." }]Step 3: NormalizedDomain → DomainQueueMessage
{ domain_id: "gov:sg:mom.gov.sg", domain_url: "https://www.mom.gov.sg/newsroom", registrable_domain: "mom.gov.sg", authority: "gov", partition_info: { country: "sg", category: "news", date: "2026-01-28" }, source_file_path: "datasets/.../raw_0001.json"}Step 4: HTTP Fetch → DomainMetadata
{ "domain_id": "gov:sg:mom.gov.sg", "registrable_domain": "mom.gov.sg", "country": "SG", "category": "news", "collected_at": "2026-01-28T10:30:00Z", "robots": { "status_code": 200, "content_length": 1234, "exists": true, "sitemap_urls": ["https://mom.gov.sg/sitemap.xml"] }, "sitemap": { "status_code": 200, "content_length": 5678, "exists": true, "url_count": 150, "url": "https://mom.gov.sg/sitemap.xml" }}문서 버전: 1.1.0
작성일: 2026-01-28
최종 수정일: 2026-01-28
주요 업데이트: 데이터 처리 로직 상세화, 기존 코드 분석 반영, 스키마 정의 추가