콘텐츠로 이동

Seed Engine 설계

Cloudflare R2 forknews-datasets 버킷에 저장된 raw 파일들을 순차적으로 읽어, 각 파일 내의 도메인 정보를 추출하고, 도메인별로 병렬 Worker를 생성하여 robots.txtsitemap.xml을 수집하는 분산 Seed Engine을 구축합니다.

  • ✅ R2에서 raw 파일들을 순서대로 스캔
  • ✅ 각 raw 파일에서 도메인 목록 추출
  • ✅ 도메인별로 Cloudflare Queue에 메시지 발송
  • ✅ 각 도메인마다 독립적인 Worker가 robots.txt, sitemap.xml 수집
  • ✅ 수집 결과를 지정된 폴더(processing/)에 저장
  • ✅ Stateless Checkpointing으로 중단 지점부터 재개 가능
  • ✅ D1 데이터베이스 없이 R2만으로 상태 관리
  • Infrastructure: Cloudflare Workers, Cloudflare R2, Cloudflare Queues
  • Language: TypeScript (ES2022+)
  • Framework: Hono
  • Validation: Zod
  • Local Testing: Wrangler (—local)

┌─────────────────────────────────────────────────────────────┐
│ Orchestrator Worker │
│ (HTTP Trigger or Scheduled Cron) │
│ │
│ 1. R2 List API로 raw_NNNN.json 파일 목록 수집 │
│ 2. 순서대로 파일 경로를 SEED_QUEUE에 발송 │
│ 3. 사전 검증 없음 (Just-in-Time Check) │
└──────────────────────┬──────────────────────────────────────┘
│ Queue Messages
│ (file_path: string)
┌─────────────────────────────────────────────────────────────┐
│ Cloudflare Queues (SEED_QUEUE) │
│ - max_batch_size: 1 │
│ - max_retries: 3 │
│ - dead_letter_queue: newsfork-seed-dlq-dev │
└──────────────────────┬──────────────────────────────────────┘
│ Message: { file_path, partition_info }
┌─────────────────────────────────────────────────────────────┐
│ Domain Worker (Consumer) │
│ │
│ 1. R2.head()로 .success 파일 확인 (Early Exit) │
│ 2. raw_NNNN.json 파일을 ReadableStream으로 읽기 │
│ 3. JSON 파싱하여 도메인 목록 추출 │
│ 4. 각 도메인별로 DOMAIN_QUEUE에 메시지 발송 │
└──────────────────────┬──────────────────────────────────────┘
│ Domain Messages
│ { domain_id, domain_url, partition_info }
┌─────────────────────────────────────────────────────────────┐
│ Cloudflare Queues (DOMAIN_QUEUE) │
│ - max_batch_size: 1 │
│ - max_retries: 3 │
│ - dead_letter_queue: newsfork-domain-dlq-dev │
└──────────────────────┬──────────────────────────────────────┘
│ Message: { domain_id, domain_url, ... }
┌─────────────────────────────────────────────────────────────┐
│ Domain Collector Worker (Consumer) │
│ │
│ 1. R2.head()로 domain_metadata.json.success 확인 │
│ 2. robots.txt 수집 (https://{domain}/robots.txt) │
│ 3. sitemap.xml 수집 (https://{domain}/sitemap.xml) │
│ 4. 결과를 processing/ 폴더에 저장 │
│ 5. .success 파일 생성 (Checkpoint) │
└─────────────────────────────────────────────────────────────┘
  • 역할: R2 파티션 스캔 및 Queue 발송
  • 트리거: HTTP API 또는 Scheduled Cron
  • 입력: R2 버킷 경로 (선택적 파티션 필터)
  • 출력: SEED_QUEUE에 파일 경로 메시지 발송
  • 역할: raw 파일 읽기 및 도메인 추출
  • 트리거: SEED_QUEUE 메시지
  • 입력: { file_path: string, partition_info: {...} }
  • 출력: DOMAIN_QUEUE에 도메인별 메시지 발송
  • 역할: 개별 도메인의 robots.txt, sitemap.xml 수집
  • 트리거: DOMAIN_QUEUE 메시지
  • 입력: { domain_id: string, domain_url: string, ... }
  • 출력: processing/ 폴더에 결과 파일 저장

R2: datasets/country={cc}/category={cat}/date=YYYY-MM-DD/raw_0001.json
├─> Orchestrator가 파일 경로를 SEED_QUEUE에 발송
├─> File Processor가 raw_0001.json을 읽음
│ └─> JSON 파싱: { records: [{ normalized_domain, ... }] }
├─> 각 record의 normalized_domain을 추출
│ └─> DOMAIN_QUEUE에 발송: { domain_id, domain_url, ... }
└─> Domain Collector가 각 도메인 처리
├─> robots.txt 수집 → processing/.../robots.txt
├─> sitemap.xml 수집 → processing/.../sitemap.xml
└─> domain_metadata.json 생성
└─> domain_metadata.json.success 생성 (Checkpoint)
datasets/
└── country={cc}/
└── category={cat}/
└── date=YYYY-MM-DD/
├── raw_0001.json
├── raw_0002.json
├── ...
└── raw_metadata.json
processing/
└── country={cc}/
└── category={cat}/
└── date=YYYY-MM-DD/
├── domain_metadata.json # 도메인별 수집 결과
├── domain_metadata.json.success # Checkpoint (빈 파일)
├── robots.txt # (선택적, 증거 보관용)
└── sitemap.xml # (선택적, 증거 보관용)
datasets/
└── country={cc}/
└── category={cat}/
└── date=YYYY-MM-DD/
├── raw_0001.json
└── raw_0001.json.success # 파일 처리 완료 표시

4. 데이터 구조 및 처리 로직 상세 (Data Structure & Processing Logic)

섹션 제목: “4. 데이터 구조 및 처리 로직 상세 (Data Structure & Processing Logic)”

4.1.1 현재 Raw 파일 스키마 (EnhancedResearchDataset)

섹션 제목: “4.1.1 현재 Raw 파일 스키마 (EnhancedResearchDataset)”

현재 R2에 저장된 raw 파일은 다음 Zod 스키마를 따릅니다:

src/schemas/research.ts
export const EnhancedResearchDataset = z.object({
meta: EnhancedResearchDatasetMeta,
records: z.array(EnhancedResearchRecord)
});
export const EnhancedResearchDatasetMeta = z.object({
dataset_id: z.string().min(1), // 예: "sg-news-2026-01-28-0001"
country: DomainCountryCode, // 예: "SG"
category: z.string().min(1), // 예: "news"
discovered_at: z.string().datetime(), // ISO 8601
research_methods: z.array(ResearchMethod), // ["google_search", "crtsh"]
queries: z.array(z.string()).min(1), // ["URL discovery batch"]
engine: z.object({
name: z.string().min(1), // "research-service"
version: z.string().min(1) // "2.0.0"
}),
record_count: z.number().min(0) // records.length와 일치해야 함
});
export const EnhancedResearchRecord = z.object({
raw_url: z.string().url(), // 원본 URL
normalized_domain: z.string().min(1), // 정규화된 도메인 (예: "mom.gov.sg")
domain_id: DomainId, // "gov:sg:mom.gov.sg" 형식
source_type: Authority, // "gov" | "edu" | "org" | "com" | "other"
discovery_method: ResearchMethod, // "manual" | "google_search" | "crtsh"
confidence: z.number().min(0).max(1), // 0.0 ~ 1.0
content_hints: z.array(z.string()).default([]), // ["news", "policy"]
liveness_status: LivenessStatus.optional() // "alive" | "unstable" | "blocked" | "dead"
});
{
"meta": {
"dataset_id": "sg-news-2026-01-28-0001",
"country": "SG",
"category": "news",
"discovered_at": "2026-01-28T10:30:00Z",
"research_methods": ["google_search", "crtsh"],
"queries": ["Singapore government news"],
"engine": {
"name": "research-service",
"version": "2.0.0"
},
"record_count": 2
},
"records": [
{
"raw_url": "https://www.mom.gov.sg/newsroom",
"normalized_domain": "mom.gov.sg",
"domain_id": "gov:sg:mom.gov.sg",
"source_type": "gov",
"discovery_method": "google_search",
"confidence": 0.95,
"content_hints": ["news", "policy"],
"liveness_status": "alive"
},
{
"raw_url": "https://www.moh.gov.sg/news",
"normalized_domain": "moh.gov.sg",
"domain_id": "gov:sg:moh.gov.sg",
"source_type": "gov",
"discovery_method": "crtsh",
"confidence": 0.90,
"content_hints": ["news"],
"liveness_status": "alive"
}
]
}

현재 구조 (기존 코드):

research/datasets/country={cc}/category={cat}/{date}_{chunk}.json
예: research/datasets/country=sg/category=news/2026-01-28_0001.json

새로운 Seed Engine 구조:

datasets/country={cc}/category={cat}/date=YYYY-MM-DD/raw_NNNN.json
예: datasets/country=sg/category=news/date=2026-01-28/raw_0001.json

주요 차이점:

  1. 경로 접두사: research/datasets/datasets/ (Raw 데이터 레이어)
  2. 날짜 위치: 파일명 → 디렉토리 (date=YYYY-MM-DD/)
  3. 파일명 형식: {date}_{chunk}.jsonraw_{chunk}.json (날짜 제거)

4.2.1 File Processor에서 도메인 추출 프로세스

섹션 제목: “4.2.1 File Processor에서 도메인 추출 프로세스”
/**
* Raw 파일에서 도메인 정보 추출
* 기존 normalizeDomain 함수 활용
*/
async function extractDomainsFromRawFile(
data: EnhancedResearchDataset,
country: string
): Promise<NormalizedDomain[]> {
const domains: NormalizedDomain[] = [];
const seenDomainIds = new Set<string>();
for (const record of data.records) {
try {
// 1. 기존 record에 이미 domain_id가 있으면 활용
if (record.domain_id && validateDomainId(record.domain_id)) {
// 중복 체크
if (seenDomainIds.has(record.domain_id)) {
continue; // 이미 처리된 도메인 스킵
}
seenDomainIds.add(record.domain_id);
// 2. NormalizedDomain 객체 재구성
const normalized: NormalizedDomain = {
input_url: record.raw_url,
hostname: extractHostname(record.raw_url),
registrable_domain: record.normalized_domain,
authority: record.source_type,
country: record.domain_id.split(':')[1].toUpperCase() as CountryCode,
domain_id: record.domain_id,
domain_hash: generateDomainHash(record.domain_id)
};
domains.push(normalized);
} else {
// 3. domain_id가 없거나 유효하지 않으면 재정규화
const normalized = normalizeDomain(record.raw_url, country);
if (seenDomainIds.has(normalized.domain_id)) {
continue; // 중복 스킵
}
seenDomainIds.add(normalized.domain_id);
domains.push(normalized);
}
} catch (error) {
logger.warn("domain_extraction_failed", {
url: record.raw_url,
error: error instanceof Error ? error.message : 'Unknown error'
});
// 에러가 발생해도 다음 record 계속 처리
}
}
return domains;
}
/**
* URL에서 hostname 추출 (www 제거)
*/
function extractHostname(url: string): string {
try {
const u = new URL(url);
return u.hostname.replace(/^www\./, '').toLowerCase();
} catch {
return '';
}
}
  1. domain_id 기반 중복 제거: Set<string>을 사용하여 O(1) 조회
  2. 같은 raw 파일 내 중복: File Processor에서 처리
  3. 다른 raw 파일 간 중복: Domain Collector에서 처리 (선택적, KV 활용 가능)

4.2.3 도메인 정규화 로직 재사용

섹션 제목: “4.2.3 도메인 정규화 로직 재사용”

기존 src/lib/domain.ts의 함수들을 그대로 활용:

import {
normalizeDomain, // URL → NormalizedDomain
batchNormalizeDomains, // URL[] → NormalizedDomain[]
deduplicateDomains, // 중복 제거
extractRegistrableDomain, // hostname → registrable domain
detectAuthority, // domain → Authority
generateDomainHash // domain_id → SHA-256 hash
} from '../lib/domain';
export const DomainMetadataSchema = z.object({
domain_id: DomainId, // "gov:sg:mom.gov.sg"
registrable_domain: z.string(), // "mom.gov.sg"
country: DomainCountryCode, // "SG"
category: z.string(), // "news"
collected_at: z.string().datetime(), // 수집 시각
// robots.txt 정보
robots: z.object({
status_code: z.number(), // HTTP 상태 코드
content_length: z.number().int().min(0), // 바이트 수
exists: z.boolean(), // 파일 존재 여부
fetched_at: z.string().datetime().optional(),
sitemap_urls: z.array(z.string().url()).optional() // robots.txt에서 추출한 sitemap URL
}),
// sitemap.xml 정보
sitemap: z.object({
status_code: z.number(),
content_length: z.number().int().min(0),
exists: z.boolean(),
fetched_at: z.string().datetime().optional(),
url_count: z.number().int().min(0).optional() // sitemap 내 URL 개수 (파싱 성공 시)
}),
// 원본 raw 파일 정보 (추적용)
source: z.object({
raw_file_path: z.string(), // 원본 raw 파일 경로
record_index: z.number().int().optional() // raw 파일 내 record 인덱스
}).optional()
});
processing/
└── country={cc}/
└── category={cat}/
└── date=YYYY-MM-DD/
├── {registrable_domain}/
│ ├── domain_metadata.json
│ ├── domain_metadata.json.success
│ ├── robots.txt # (선택적, 증거 보관)
│ └── sitemap.xml # (선택적, 증거 보관)
└── {another_domain}/
└── ...

경로 빌더 함수:

function buildDomainResultPath(
partition: { country: string; category: string; date: string },
registrableDomain: string
): string {
// 도메인명을 파일 시스템 안전한 형식으로 변환
const safeDomain = registrableDomain.replace(/[^a-z0-9.-]/g, '_');
return `processing/country=${partition.country}/category=${partition.category}/date=${partition.date}/${safeDomain}`;
}

필수 파일:

  • domain_metadata.json: 수집 결과 메타데이터 (Zod 검증 필수)
  • domain_metadata.json.success: Checkpoint 파일 (빈 파일)

선택적 파일 (증거 보관용):

  • robots.txt: 원본 robots.txt 내용 (텍스트)
  • sitemap.xml: 원본 sitemap.xml 내용 (XML)

저장 순서:

  1. robots.txt 수집 및 저장 (실패해도 계속)
  2. sitemap.xml 수집 및 저장 (실패해도 계속)
  3. domain_metadata.json 생성 및 저장 (Zod 검증)
  4. domain_metadata.json.success 생성 (Checkpoint)

4.4 기존 코드와의 통합 및 수정 방향

섹션 제목: “4.4 기존 코드와의 통합 및 수정 방향”

현재 사용 중인 모듈:

  • src/services/dataset.service.ts: R2 저장 로직
  • src/lib/r2.ts: R2 경로 빌더 (buildR2DatasetPath)
  • src/lib/domain.ts: 도메인 정규화 로직
  • src/schemas/research.ts: EnhancedResearchDataset 스키마

기존 경로 빌더:

// src/lib/r2.ts (현재)
export function buildR2DatasetPath(
country: CountryCode,
category: ContentCategory,
date: string,
chunkIndex: number
): string {
const chunkPadded = String(chunkIndex).padStart(4, "0");
return `research/datasets/country=${country.toLowerCase()}/category=${category}/${date}_${chunkPadded}.json`;
}

1. 새로운 경로 빌더 추가 (기존 코드 유지):

// src/lib/r2.ts에 추가
/**
* Build R2 path for raw dataset (Seed Engine용)
* Format: datasets/country={cc}/category={cat}/date=YYYY-MM-DD/raw_NNNN.json
*/
export function buildRawDatasetPath(
country: CountryCode,
category: ContentCategory,
date: string,
chunkIndex: number
): string {
const chunkPadded = String(chunkIndex).padStart(4, "0");
return `datasets/country=${country.toLowerCase()}/category=${category}/date=${date}/raw_${chunkPadded}.json`;
}
/**
* Build R2 path for domain metadata (Seed Engine 결과)
* Format: processing/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/domain_metadata.json
*/
export function buildDomainMetadataPath(
country: CountryCode,
category: ContentCategory,
date: string,
registrableDomain: string
): string {
const safeDomain = registrableDomain.replace(/[^a-z0-9.-]/g, '_');
return `processing/country=${country.toLowerCase()}/category=${category}/date=${date}/${safeDomain}/domain_metadata.json`;
}

2. 기존 DatasetService 확장 (또는 새 서비스 생성):

// 기존 DatasetService는 research/datasets/ 경로 유지
// 새로운 SeedDatasetService는 datasets/ 경로 사용
export class SeedDatasetService {
// buildRawDatasetPath 사용
async getRawDataset(...): Promise<EnhancedResearchDataset | null> {
const path = buildRawDatasetPath(country, category, date, chunkIndex);
// ...
}
}

3. 도메인 정규화 로직 재사용:

  • src/lib/domain.ts의 모든 함수는 그대로 활용
  • 추가 수정 불필요

옵션 1: 병행 운영 (권장)

  • 기존 research/datasets/ 경로는 유지
  • 새로운 Seed Engine은 datasets/ 경로 사용
  • 두 시스템이 독립적으로 동작

옵션 2: 점진적 마이그레이션

  • 기존 파일을 새 경로로 복사하는 스크립트 작성
  • 검증 후 기존 경로 제거

POST /api/v1/seeds/orchestrate
Body: {
country?: string; // 선택적 필터
category?: string; // 선택적 필터
date?: string; // 선택적 필터 (YYYY-MM-DD)
force?: boolean; // .success 파일 무시하고 재처리
}
// 1. R2 List API로 raw_NNNN.json 파일 목록 수집
const files = await listRawFiles(bucket, partition);
// 2. 순서대로 정렬 (raw_0001.json, raw_0002.json, ...)
const sortedFiles = sortFilesBySequence(files);
// 3. 각 파일 경로를 SEED_QUEUE에 발송
for (const file of sortedFiles) {
await SEED_QUEUE.send({
file_path: file.key,
partition_info: {
country: extractCountry(file.key),
category: extractCategory(file.key),
date: extractDate(file.key)
}
});
}
  • Bulk List API 사용: bucket.list({ prefix, limit: 1000, cursor })
  • Pagination 처리: cursor를 사용하여 모든 파일 수집
  • 필터링: raw_NNNN.json 패턴만 선택 (.success 파일 제외)
export async function processSeedQueue(
message: Message<SeedQueueMessage>,
env: Bindings
): Promise<void> {
const { file_path, partition_info } = message.body;
// 1. Early Exit: .success 파일 확인
const successPath = `${file_path}.success`;
const exists = await env.DATASETS_BUCKET.head(successPath);
if (exists) {
logger.info("file_already_processed", { file_path });
return; // Early exit
}
// 2. raw 파일을 ReadableStream으로 읽기
const file = await env.DATASETS_BUCKET.get(file_path);
if (!file) {
throw new Error(`File not found: ${file_path}`);
}
// 3. Streaming JSON 파싱 (10-20MB 제한 준수)
const content = await streamToString(file.body);
const data = JSON.parse(content) as RawDataset;
// 4. 도메인 추출 및 정규화
const domains = extractDomains(data, partition_info.country);
// 5. 각 도메인을 DOMAIN_QUEUE에 발송
for (const domain of domains) {
await env.DOMAIN_QUEUE.send({
domain_id: domain.domain_id,
domain_url: domain.input_url,
registrable_domain: domain.registrable_domain,
partition_info
});
}
// 6. 처리 완료 표시 (.success 파일 생성)
await env.DATASETS_BUCKET.put(successPath, "");
}

기존 코드 활용:

  • src/lib/domain.tsnormalizeDomain() 함수 재사용
  • src/lib/domain.tsdeduplicateDomains() 함수 재사용
  • src/schemas/research.tsEnhancedResearchDataset 스키마 활용
import { EnhancedResearchDataset } from '../schemas/research';
import { NormalizedDomain } from '../schemas/domain';
import { normalizeDomain, deduplicateDomains, validateDomainId, generateDomainHash } from '../lib/domain';
import { CountryCode, Authority } from '../schemas';
/**
* Raw 파일에서 도메인 정보 추출
* 기존 record의 domain_id를 최대한 활용하여 재정규화 최소화
*/
async function extractDomainsFromRawFile(
data: EnhancedResearchDataset,
country: string
): Promise<NormalizedDomain[]> {
const domains: NormalizedDomain[] = [];
const seenDomainIds = new Set<string>(); // 중복 제거용 (O(1) 조회)
for (let i = 0; i < data.records.length; i++) {
const record = data.records[i];
try {
let normalized: NormalizedDomain;
// Case 1: record에 이미 유효한 domain_id가 있는 경우 (최적화)
if (record.domain_id && validateDomainId(record.domain_id)) {
// 중복 체크
if (seenDomainIds.has(record.domain_id)) {
continue; // 이미 처리된 도메인 스킵
}
seenDomainIds.add(record.domain_id);
// NormalizedDomain 객체 재구성 (재정규화 없이)
const urlParts = record.domain_id.split(':');
const authority = urlParts[0] as Authority;
const countryCode = urlParts[1].toUpperCase() as CountryCode;
normalized = {
input_url: record.raw_url,
hostname: extractHostname(record.raw_url),
registrable_domain: record.normalized_domain,
authority: authority,
country: countryCode,
domain_id: record.domain_id,
domain_hash: generateDomainHash(record.domain_id)
};
// Zod 검증
const validated = NormalizedDomain.safeParse(normalized);
if (!validated.success) {
logger.warn("normalized_domain_validation_failed", {
domain_id: record.domain_id,
issues: validated.error.issues
});
// 검증 실패 시 재정규화로 폴백
normalized = normalizeDomain(record.raw_url, country);
} else {
normalized = validated.data;
}
} else {
// Case 2: domain_id가 없거나 유효하지 않은 경우 재정규화
normalized = normalizeDomain(record.raw_url, country);
// 중복 체크
if (seenDomainIds.has(normalized.domain_id)) {
continue;
}
seenDomainIds.add(normalized.domain_id);
}
domains.push(normalized);
} catch (error) {
logger.warn("domain_extraction_failed", {
record_index: i,
url: record.raw_url,
error: error instanceof Error ? error.message : 'Unknown error'
});
// 에러가 발생해도 다음 record 계속 처리
}
}
// 최종 중복 제거 (안전장치)
return deduplicateDomains(domains);
}
/**
* URL에서 hostname 추출 (www 제거)
*/
function extractHostname(url: string): string {
try {
const u = new URL(url);
return u.hostname.replace(/^www\./, '').toLowerCase();
} catch {
return '';
}
}

성능 최적화 포인트:

  1. 기존 domain_id 활용: 재정규화 최소화 (CPU 절약)
  2. Set 기반 중복 제거: O(1) 조회로 성능 최적화
  3. 에러 허용: 개별 record 실패가 전체 처리 중단하지 않음

4.3 Domain Collector Worker 설계 (상세)

섹션 제목: “4.3 Domain Collector Worker 설계 (상세)”
import { DomainMetadataSchema } from '../schemas/seed-engine';
import { buildDomainMetadataPath } from '../lib/r2';
export async function processDomainQueue(
message: Message<DomainQueueMessage>,
env: Bindings
): Promise<void> {
const { domain_id, domain_url, registrable_domain, partition_info, authority, source_file_path } = message.body;
logger.info("domain_collector_start", { domain_id, registrable_domain });
// 1. Early Exit: domain_metadata.json.success 확인
const resultPath = buildDomainResultPath(partition_info, registrable_domain);
const metadataPath = `${resultPath}/domain_metadata.json`;
const successPath = `${metadataPath}.success`;
const exists = await env.DATASETS_BUCKET.head(successPath);
if (exists) {
logger.info("domain_already_processed", { domain_id });
return; // Early exit - 이미 처리 완료
}
// 2. robots.txt 수집
const robotsResult = await fetchRobotsTxt(registrable_domain);
// 3. robots.txt에서 sitemap URL 추출
const sitemapUrls = extractSitemapUrlsFromRobots(robotsResult.content);
// 4. sitemap.xml 수집 (robots.txt에서 추출한 URL 우선 시도)
const sitemapResult = await fetchSitemapXml(registrable_domain, sitemapUrls);
// 5. sitemap.xml 파싱 (URL 개수 추출)
const sitemapUrlCount = sitemapResult.content
? parseSitemapUrlCount(sitemapResult.content)
: undefined;
// 6. 결과 메타데이터 생성 (Zod 스키마 준수)
const metadata = {
domain_id,
registrable_domain,
country: partition_info.country.toUpperCase(),
category: partition_info.category,
collected_at: new Date().toISOString(),
robots: {
status_code: robotsResult.status,
content_length: robotsResult.content?.length || 0,
exists: robotsResult.status === 200,
fetched_at: robotsResult.content ? new Date().toISOString() : undefined,
sitemap_urls: sitemapUrls
},
sitemap: {
status_code: sitemapResult.status,
content_length: sitemapResult.content?.length || 0,
exists: sitemapResult.status === 200,
fetched_at: sitemapResult.content ? new Date().toISOString() : undefined,
url_count: sitemapUrlCount,
url: sitemapResult.url
},
source: source_file_path ? {
raw_file_path: source_file_path
} : undefined
};
// 7. Zod 스키마로 검증
const validated = DomainMetadataSchema.safeParse(metadata);
if (!validated.success) {
throw new ValidationError("Invalid domain metadata", {
issues: validated.error.issues,
domain_id
});
}
// 8. 결과 파일 저장 (순서 중요)
// 8-1. 증거 파일 저장 (선택적, 실패해도 계속)
if (robotsResult.content) {
try {
await env.DATASETS_BUCKET.put(
`${resultPath}/robots.txt`,
robotsResult.content,
{
httpMetadata: {
contentType: 'text/plain'
}
}
);
} catch (error) {
logger.warn("robots_file_save_failed", {
domain_id,
error: error instanceof Error ? error.message : 'Unknown error'
});
// 파일 저장 실패해도 메타데이터는 저장
}
}
if (sitemapResult.content) {
try {
await env.DATASETS_BUCKET.put(
`${resultPath}/sitemap.xml`,
sitemapResult.content,
{
httpMetadata: {
contentType: 'application/xml'
}
}
);
} catch (error) {
logger.warn("sitemap_file_save_failed", {
domain_id,
error: error instanceof Error ? error.message : 'Unknown error'
});
}
}
// 8-2. 메타데이터 파일 저장 (필수)
await env.DATASETS_BUCKET.put(
metadataPath,
JSON.stringify(validated.data, null, 2),
{
httpMetadata: {
contentType: 'application/json'
},
customMetadata: {
domain_id,
registrable_domain,
country: partition_info.country,
category: partition_info.category,
date: partition_info.date
}
}
);
// 9. Checkpoint 생성 (마지막 단계)
await env.DATASETS_BUCKET.put(successPath, "", {
httpMetadata: {
contentType: 'text/plain'
}
});
logger.info("domain_collector_complete", {
domain_id,
robots_exists: robotsResult.status === 200,
sitemap_exists: sitemapResult.status === 200
});
}
/**
* 결과 경로 빌더
*/
function buildDomainResultPath(
partition: { country: string; category: string; date: string },
registrableDomain: string
): string {
// 도메인명을 파일 시스템 안전한 형식으로 변환
const safeDomain = registrableDomain.replace(/[^a-z0-9.-]/g, '_');
return `processing/country=${partition.country}/category=${partition.category}/date=${partition.date}/${safeDomain}`;
}
interface FetchResult {
status: number;
content: string | null;
url: string;
error?: string;
responseTimeMs?: number;
}
/**
* robots.txt 수집
*/
async function fetchRobotsTxt(domain: string): Promise<FetchResult> {
const url = `https://${domain}/robots.txt`;
const startTime = Date.now();
try {
const response = await fetch(url, {
headers: {
'User-Agent': 'Newsfork-SeedEngine/1.0',
'Accept': 'text/plain, */*'
},
signal: AbortSignal.timeout(10000), // 10초 타임아웃
redirect: 'follow' // 리다이렉트 따라가기
});
const responseTimeMs = Date.now() - startTime;
let content: string | null = null;
if (response.ok) {
try {
content = await response.text();
// 크기 제한 검사 (1MB)
if (content.length > 1024 * 1024) {
logger.warn("robots_file_too_large", {
domain,
size: content.length,
url
});
content = content.substring(0, 1024 * 1024); // 처음 1MB만 저장
}
} catch (error) {
logger.warn("robots_content_read_failed", {
domain,
error: error instanceof Error ? error.message : 'Unknown error'
});
}
}
return {
status: response.status,
content,
url,
responseTimeMs
};
} catch (error) {
const responseTimeMs = Date.now() - startTime;
// 에러 타입별 처리
if (error instanceof TypeError && error.message.includes('fetch')) {
// 네트워크 에러
return {
status: 0,
content: null,
url,
error: 'Network error',
responseTimeMs
};
} else if (error instanceof DOMException && error.name === 'AbortError') {
// 타임아웃
return {
status: 0,
content: null,
url,
error: 'Timeout',
responseTimeMs
};
}
return {
status: 0,
content: null,
url,
error: error instanceof Error ? error.message : 'Unknown error',
responseTimeMs
};
}
}
/**
* robots.txt에서 sitemap URL 추출
*/
function extractSitemapUrlsFromRobots(robotsContent: string | null): string[] {
if (!robotsContent) {
return [];
}
const urls: string[] = [];
const lines = robotsContent.split('\n');
for (const line of lines) {
const trimmed = line.trim();
// 대소문자 구분 없이 "Sitemap:" 또는 "sitemap:" 찾기
const match = trimmed.match(/^sitemap:\s*(.+)$/i);
if (match && match[1]) {
const url = match[1].trim();
try {
// URL 유효성 검사
new URL(url);
urls.push(url);
} catch {
// 유효하지 않은 URL은 스킵
logger.warn("invalid_sitemap_url_in_robots", { url });
}
}
}
return [...new Set(urls)]; // 중복 제거
}
/**
* sitemap.xml 수집
* robots.txt에서 추출한 URL을 우선 시도
*/
async function fetchSitemapXml(
domain: string,
sitemapUrlsFromRobots: string[] = []
): Promise<FetchResult> {
// 우선순위: robots.txt에서 추출한 URL > 기본 경로
const urls = [
...sitemapUrlsFromRobots,
`https://${domain}/sitemap.xml`,
`https://${domain}/sitemap_index.xml`,
`https://${domain}/sitemaps.xml`
];
for (const url of urls) {
const startTime = Date.now();
try {
const response = await fetch(url, {
headers: {
'User-Agent': 'Newsfork-SeedEngine/1.0',
'Accept': 'application/xml, text/xml, */*'
},
signal: AbortSignal.timeout(15000), // sitemap은 더 긴 타임아웃
redirect: 'follow'
});
const responseTimeMs = Date.now() - startTime;
if (response.ok) {
try {
let content = await response.text();
// 크기 제한 검사 (10MB)
if (content.length > 10 * 1024 * 1024) {
logger.warn("sitemap_file_too_large", {
domain,
size: content.length,
url
});
content = content.substring(0, 10 * 1024 * 1024);
}
return {
status: response.status,
content,
url,
responseTimeMs
};
} catch (error) {
logger.warn("sitemap_content_read_failed", {
domain,
url,
error: error instanceof Error ? error.message : 'Unknown error'
});
// 다음 URL 시도
continue;
}
}
} catch (error) {
// 네트워크 에러나 타임아웃은 다음 URL 시도
if (error instanceof DOMException && error.name === 'AbortError') {
logger.warn("sitemap_fetch_timeout", { domain, url });
}
continue;
}
}
// 모든 URL 실패
return {
status: 404,
content: null,
url: urls[0] || `https://${domain}/sitemap.xml`,
error: 'Not found'
};
}
/**
* sitemap.xml에서 URL 개수 추출
* sitemap_index.xml인 경우 하위 sitemap 개수만 반환
*/
function parseSitemapUrlCount(sitemapContent: string): number | undefined {
try {
// sitemap_index.xml인 경우
const sitemapIndexMatch = sitemapContent.match(/<sitemap>/g);
if (sitemapIndexMatch) {
return sitemapIndexMatch.length; // 하위 sitemap 개수
}
// 일반 sitemap.xml인 경우
const urlMatch = sitemapContent.match(/<url>/g);
if (urlMatch) {
return urlMatch.length; // URL 개수
}
return undefined; // 파싱 실패
} catch (error) {
logger.warn("sitemap_parse_failed", {
error: error instanceof Error ? error.message : 'Unknown error'
});
return undefined;
}
}

에러 처리 전략:

  1. 네트워크 에러: 다음 URL 시도 또는 실패 반환
  2. 타임아웃: 로그 남기고 다음 URL 시도
  3. 파일 크기 초과: 처음 N 바이트만 저장하고 경고
  4. 파싱 실패: 메타데이터는 저장하되 url_count는 undefined
  • 위치: 원본 파일과 동일한 경로에 .success 확장자 추가
  • 내용: 빈 파일 (0 bytes)
  • 용도: 작업 완료 여부 표시
  1. File Processor: raw_NNNN.json.success 확인
  2. Domain Collector: domain_metadata.json.success 확인
  • 동일 메시지가 중복 전달되어도 .success 파일이 있으면 즉시 종료
  • 부분 실패 시 재시도해도 완료된 작업은 스킵

{
"queues": {
"producers": [
{
"binding": "SEED_QUEUE",
"queue": "newsfork-seed-dev"
},
{
"binding": "DOMAIN_QUEUE",
"queue": "newsfork-domain-dev"
}
],
"consumers": [
{
"queue": "newsfork-seed-dev",
"max_batch_size": 1,
"max_batch_timeout": 30,
"max_retries": 3,
"dead_letter_queue": "newsfork-seed-dlq-dev"
},
{
"queue": "newsfork-domain-dev",
"max_batch_size": 1,
"max_batch_timeout": 30,
"max_retries": 3,
"dead_letter_queue": "newsfork-domain-dlq-dev"
}
]
}
}
interface SeedQueueMessage {
file_path: string; // R2 파일 경로
partition_info: {
country: string;
category: string;
date: string; // YYYY-MM-DD
};
}
interface DomainQueueMessage {
domain_id: string; // authority:country:domain
domain_url: string; // 원본 URL
registrable_domain: string; // 정규화된 도메인
partition_info: {
country: string;
category: string;
date: string;
};
}

6. 로컬 테스트 방법 (Local Testing Guide)

섹션 제목: “6. 로컬 테스트 방법 (Local Testing Guide)”
Terminal window
# Wrangler 로컬 개발 모드 시작
pnpm dev:local
# 또는 특정 환경 변수와 함께
pnpm dev:local --env dev

Wrangler는 로컬 개발 시 .wrangler/state 디렉토리에 로컬 R2 버킷을 생성합니다.

Terminal window
# 로컬 R2 버킷 확인
wrangler r2 bucket list --local
# 로컬 R2에 테스트 파일 업로드
wrangler r2 object put \
datasets/country=sg/category=news/date=2026-01-28/raw_0001.json \
--file=./test/data/raw_0001.json \
--local

로컬 개발 시 Queue는 메모리 기반으로 동작합니다. wrangler dev --local 실행 시 자동으로 설정됩니다.

1. 테스트 데이터 준비

Terminal window
# 로컬에 테스트 raw 파일 생성
mkdir -p test/data/datasets/country=sg/category=news/date=2026-01-28
# 샘플 raw_0001.json 생성
cat > test/data/raw_0001.json << 'EOF'
{
"meta": {
"dataset_id": "sg-news-2026-01-28-0001",
"country": "SG",
"category": "news",
"record_count": 2
},
"records": [
{
"raw_url": "https://www.mom.gov.sg/newsroom",
"normalized_domain": "mom.gov.sg",
"domain_id": "gov:sg:mom.gov.sg"
},
{
"raw_url": "https://www.moh.gov.sg/news",
"normalized_domain": "moh.gov.sg",
"domain_id": "gov:sg:moh.gov.sg"
}
]
}
EOF
# 로컬 R2에 업로드
wrangler r2 object put \
datasets/country=sg/category=news/date=2026-01-28/raw_0001.json \
--file=test/data/raw_0001.json \
--local

2. Orchestrator API 호출

Terminal window
# 로컬 Worker 실행 중인 상태에서
curl -X POST http://localhost:8787/api/v1/seeds/orchestrate \
-H "Content-Type: application/json" \
-d '{
"country": "sg",
"category": "news",
"date": "2026-01-28"
}'

3. Queue 메시지 확인

// src/routes/seeds.ts에서 로그 확인
// 또는 wrangler tail --local로 실시간 로그 확인

시나리오 2: File Processor Worker 테스트

섹션 제목: “시나리오 2: File Processor Worker 테스트”

1. Queue에 직접 메시지 발송 (테스트용)

test/seed-engine.test.ts
import { env } from './test-setup';
test('file processor processes raw file', async () => {
await env.SEED_QUEUE.send({
file_path: 'datasets/country=sg/category=news/date=2026-01-28/raw_0001.json',
partition_info: {
country: 'sg',
category: 'news',
date: '2026-01-28'
}
});
// Worker가 자동으로 처리
// 결과 확인: DOMAIN_QUEUE에 메시지가 발송되었는지 확인
});

2. 로컬 Queue 모니터링

Terminal window
# Wrangler 로그 확인
wrangler tail --local
# 또는 Worker 코드에 console.log 추가하여 확인

시나리오 3: Domain Collector Worker 테스트

섹션 제목: “시나리오 3: Domain Collector Worker 테스트”

1. Mock HTTP Fetch (로컬 테스트용)

test/domain-collector.test.ts
import { env, createMockFetch } from './test-setup';
test('domain collector fetches robots and sitemap', async () => {
// Mock fetch 설정
global.fetch = createMockFetch({
'https://mom.gov.sg/robots.txt': {
status: 200,
body: 'User-agent: *\nAllow: /'
},
'https://mom.gov.sg/sitemap.xml': {
status: 200,
body: '<?xml version="1.0"?><urlset>...</urlset>'
}
});
await env.DOMAIN_QUEUE.send({
domain_id: 'gov:sg:mom.gov.sg',
domain_url: 'https://www.mom.gov.sg/newsroom',
registrable_domain: 'mom.gov.sg',
partition_info: {
country: 'sg',
category: 'news',
date: '2026-01-28'
}
});
// 결과 확인: R2에 파일이 저장되었는지 확인
const result = await env.DATASETS_BUCKET.get(
'processing/country=sg/category=news/date=2026-01-28/mom.gov.sg/domain_metadata.json'
);
expect(result).toBeTruthy();
const metadata = JSON.parse(await result.text());
expect(metadata.robots.exists).toBe(true);
expect(metadata.sitemap.exists).toBe(true);
});

2. 실제 HTTP 요청 테스트 (옵션)

// 실제 도메인에 요청 (로컬에서도 가능)
// 단, 타임아웃과 에러 핸들링 테스트 필요
Terminal window
# 1. 로컬 R2에 테스트 데이터 준비
./scripts/prepare-test-data.sh
# 2. Wrangler 로컬 개발 모드 시작
pnpm dev:local
# 3. Orchestrator API 호출
curl -X POST http://localhost:8787/api/v1/seeds/orchestrate \
-H "Content-Type: application/json" \
-d '{"country": "sg", "category": "news", "date": "2026-01-28"}'
# 4. 로그 모니터링
wrangler tail --local
# 5. 결과 확인
wrangler r2 object list \
processing/country=sg/category=news/date=2026-01-28/ \
--local
Terminal window
# 로컬 R2 버킷 목록
wrangler r2 bucket list --local
# 특정 경로의 파일 목록
wrangler r2 object list \
datasets/country=sg/category=news/date=2026-01-28/ \
--local
# 파일 내용 확인
wrangler r2 object get \
datasets/country=sg/category=news/date=2026-01-28/raw_0001.json \
--local \
--file=./output.json

로컬 개발 시 Queue는 메모리 기반이므로 Worker 로그를 통해 확인합니다.

src/apps/api/queue-handler.ts
logger.info("queue_message_received", {
queue: batch.queue,
messageId: message.id,
body: message.body
});
Terminal window
# .success 파일 존재 여부 확인
wrangler r2 object head \
datasets/country=sg/category=news/date=2026-01-28/raw_0001.json.success \
--local

6.5 테스트 데이터 생성 스크립트

섹션 제목: “6.5 테스트 데이터 생성 스크립트”
scripts/prepare-test-data.sh
#!/bin/bash
BUCKET="newsfork-datasets-dev"
COUNTRY="sg"
CATEGORY="news"
DATE="2026-01-28"
# 테스트 raw 파일 생성
cat > /tmp/raw_0001.json << 'EOF'
{
"meta": {
"dataset_id": "sg-news-2026-01-28-0001",
"country": "SG",
"category": "news",
"record_count": 2
},
"records": [
{
"raw_url": "https://www.mom.gov.sg/newsroom",
"normalized_domain": "mom.gov.sg",
"domain_id": "gov:sg:mom.gov.sg"
},
{
"raw_url": "https://www.moh.gov.sg/news",
"normalized_domain": "moh.gov.sg",
"domain_id": "gov:sg:moh.gov.sg"
}
]
}
EOF
# 로컬 R2에 업로드
wrangler r2 object put \
datasets/country=${COUNTRY}/category=${CATEGORY}/date=${DATE}/raw_0001.json \
--file=/tmp/raw_0001.json \
--local
echo "✅ Test data prepared"

7. 에러 처리 및 재시도 (Error Handling)

섹션 제목: “7. 에러 처리 및 재시도 (Error Handling)”
  • 네트워크 타임아웃: HTTP fetch 실패
  • 일시적 R2 오류: R2 API 5xx 에러
  • Queue 전송 실패: 일시적 Queue 오류
  • 파일 없음: raw_NNNN.json이 존재하지 않음
  • 잘못된 JSON: 파일 파싱 실패
  • 도메인 형식 오류: URL 파싱 실패
  • 최대 3회 재시도 후 DLQ로 이동
  • DLQ 메시지는 수동 검토 필요
  • 자동 재처리 없음 (규칙 준수)
// DLQ 메시지 확인 (수동)
const dlqMessages = await env.SEED_DLQ.receive();
for (const message of dlqMessages) {
logger.error("dlq_message", {
messageId: message.id,
body: message.body,
error: message.error
});
}

8. 성능 최적화 (Performance Optimization)

섹션 제목: “8. 성능 최적화 (Performance Optimization)”
  • File Processor: 하나의 raw 파일 → N개의 도메인 메시지
  • Domain Collector: 각 도메인마다 독립적인 Worker 실행
  • 동시성: Cloudflare가 자동으로 수백 개의 Worker 병렬 실행
  • max_batch_size: 1: Worker당 하나의 메시지만 처리
  • CPU/메모리 고립 보장
  • 실패 시 영향 범위 최소화
// 10-20MB 파일을 스트리밍으로 읽기
const file = await bucket.get(file_path);
const content = await streamToString(file.body); // ReadableStream → string
  • 큰 JSON 파일을 한 번에 파싱하지 않음
  • 필요 시 스트리밍 JSON 파서 사용 (선택적)

9. 모니터링 및 로깅 (Monitoring & Logging)

섹션 제목: “9. 모니터링 및 로깅 (Monitoring & Logging)”
logger.info("orchestrator_start", { partition_info });
logger.info("files_found", { count: files.length });
logger.info("queue_messages_sent", { count: files.length });
logger.info("file_processing_start", { file_path });
logger.info("domains_extracted", { count: domains.length });
logger.info("file_processing_complete", { file_path });
logger.info("domain_processing_start", { domain_id });
logger.info("robots_fetched", { status: robotsContent.status });
logger.info("sitemap_fetched", { status: sitemapContent.status });
logger.info("domain_processing_complete", { domain_id });
  • 처리된 파일 수
  • 처리된 도메인 수
  • 성공/실패 비율
  • 평균 처리 시간
  • Worker 실행 횟수
  • Queue 메시지 처리량
  • R2 API 호출 횟수

  • Queue: newsfork-seed-dev, newsfork-domain-dev
  • R2 Bucket: newsfork-datasets-dev
  • 테스트 데이터로 검증
  • Queue: newsfork-seed-staging, newsfork-domain-staging
  • R2 Bucket: newsfork-datasets-staging
  • 프로덕션 데이터 샘플로 검증
  • Queue: newsfork-seed-prod, newsfork-domain-prod
  • R2 Bucket: newsfork-datasets-prod
  • 전체 데이터 처리
  1. Phase 1: Orchestrator만 배포 (Queue 발송만)
  2. Phase 2: File Processor 배포 (파일 읽기 테스트)
  3. Phase 3: Domain Collector 배포 (전체 파이프라인)
  • 각 Phase마다 24시간 모니터링
  • 에러율, 처리 속도 확인
  • 문제 발생 시 즉시 롤백

11. 보안 고려사항 (Security Considerations)

섹션 제목: “11. 보안 고려사항 (Security Considerations)”
  • 모든 Queue 메시지는 Zod 스키마로 검증
  • 파일 경로는 Hive-style 파티션 규칙 준수 확인
  • 도메인 URL은 정규화 및 검증
  • HTTP fetch 시 타임아웃 설정 (10초)
  • 동일 도메인에 대한 연속 요청 제한 (선택적)
  • 내부 경로나 민감한 정보 노출 방지
  • 사용자에게는 일반적인 에러 메시지만 제공

12. 향후 개선 사항 (Future Enhancements)

섹션 제목: “12. 향후 개선 사항 (Future Enhancements)”
  • Sitemap Index 지원: 여러 sitemap 파일 처리
  • RSS Feed 수집: robots.txt에서 RSS URL 추출
  • Homepage 수집: 도메인 홈페이지 HTML 저장
  • 배치 처리: 여러 도메인을 하나의 Worker에서 처리 (선택적)
  • 캐싱: KV를 활용한 robots.txt 캐싱
  • 병렬 Fetch: robots.txt와 sitemap.xml 동시 수집
  • 대시보드: 처리 진행 상황 시각화
  • 알림: 실패율 임계값 초과 시 알림
  • 리포트: 일일/주간 처리 통계 리포트

  • Queue 생성 (newsfork-seed-dev, newsfork-domain-dev)
  • DLQ 생성 (newsfork-seed-dlq-dev, newsfork-domain-dlq-dev)
  • R2 버킷 확인 (forknews-datasets)
  • 테스트 데이터 준비
  • Orchestrator API 구현
  • File Processor Worker 구현
  • Domain Collector Worker 구현
  • Stateless Checkpointing 구현
  • 에러 처리 및 로깅
  • Zod 스키마 정의
  • 로컬 환경에서 전체 파이프라인 테스트
  • Checkpoint 동작 확인
  • 재시도 로직 확인
  • DLQ 동작 확인
  • 에러 케이스 테스트
  • Development 환경 배포
  • 모니터링 설정
  • Staging 환경 배포
  • Production 환경 배포

  • src/lib/domain.ts: 도메인 정규화 로직
  • src/lib/queue/queue-handlers.ts: 기존 Queue 핸들러
  • src/infra/cloudflare/r2/: R2 Storage Adapter


15. 새로운 스키마 정의 (New Schema Definitions)

섹션 제목: “15. 새로운 스키마 정의 (New Schema Definitions)”
// src/schemas/seed-engine.ts (새로 생성)
import { z } from 'zod';
import { DomainId, DomainCountryCode } from './domain';
export const RobotsMetadataSchema = z.object({
status_code: z.number().int().min(0).max(599),
content_length: z.number().int().min(0),
exists: z.boolean(),
fetched_at: z.string().datetime().optional(),
sitemap_urls: z.array(z.string().url()).optional()
});
export const SitemapMetadataSchema = z.object({
status_code: z.number().int().min(0).max(599),
content_length: z.number().int().min(0),
exists: z.boolean(),
fetched_at: z.string().datetime().optional(),
url_count: z.number().int().min(0).optional(),
url: z.string().url()
});
export const DomainMetadataSchema = z.object({
domain_id: DomainId,
registrable_domain: z.string().min(1),
country: DomainCountryCode,
category: z.string().min(1),
collected_at: z.string().datetime(),
robots: RobotsMetadataSchema,
sitemap: SitemapMetadataSchema,
source: z.object({
raw_file_path: z.string(),
record_index: z.number().int().optional()
}).optional()
});
export type DomainMetadata = z.infer<typeof DomainMetadataSchema>;
// src/schemas/queue.ts에 추가
export const SeedQueueMessageSchema = z.object({
file_path: z.string().min(1), // R2 파일 경로
partition_info: z.object({
country: z.string().length(2),
category: z.string().min(1),
date: z.string().regex(/^\d{4}-\d{2}-\d{2}$/)
})
});
export const DomainQueueMessageSchema = z.object({
domain_id: DomainId,
domain_url: z.string().url(),
registrable_domain: z.string().min(1),
authority: z.enum(['gov', 'edu', 'org', 'com', 'other']),
partition_info: z.object({
country: z.string().length(2),
category: z.string().min(1),
date: z.string().regex(/^\d{4}-\d{2}-\d{2}$/)
}),
source_file_path: z.string().optional() // 추적용
});

16. 기존 코드 수정 체크리스트 (Code Modification Checklist)

섹션 제목: “16. 기존 코드 수정 체크리스트 (Code Modification Checklist)”
  • src/schemas/seed-engine.ts: Domain Metadata 스키마
  • src/lib/r2-seed-engine.ts: Seed Engine용 R2 경로 빌더
  • src/services/seed-dataset.service.ts: Seed Engine용 Dataset Service
  • src/apps/api/seed-orchestrator.ts: Orchestrator API 핸들러
  • src/lib/queue/seed-queue-handlers.ts: Seed Queue 핸들러
  • src/lib/queue/domain-queue-handlers.ts: Domain Queue 핸들러
  • src/schemas/queue.ts: Queue 메시지 스키마 추가
  • wrangler.jsonc: 새로운 Queue 설정 추가
  • src/index.ts: 새로운 Queue 핸들러 등록
  • src/routes/seeds.ts: Orchestrator API 엔드포인트 추가

16.3 재사용할 기존 코드 (수정 불필요)

섹션 제목: “16.3 재사용할 기존 코드 (수정 불필요)”
  • src/lib/domain.ts: 모든 도메인 정규화 함수
  • src/schemas/research.ts: EnhancedResearchDataset 스키마
  • src/lib/logger.ts: 로깅 유틸리티
  • src/lib/errors.ts: 에러 클래스

기존 (research/datasets/):

src/lib/r2.ts
buildR2DatasetPath(country, category, date, chunkIndex)
// → research/datasets/country=sg/category=news/2026-01-28_0001.json

새로운 (datasets/):

// src/lib/r2-seed-engine.ts (새로 생성)
buildRawDatasetPath(country, category, date, chunkIndex)
// → datasets/country=sg/category=news/date=2026-01-28/raw_0001.json
buildDomainMetadataPath(country, category, date, domain)
// → processing/country=sg/category=news/date=2026-01-28/mom.gov.sg/domain_metadata.json

17. 데이터 처리 플로우 상세 (Detailed Data Flow)

섹션 제목: “17. 데이터 처리 플로우 상세 (Detailed Data Flow)”

17.1 전체 파이프라인 시퀀스 다이어그램

섹션 제목: “17.1 전체 파이프라인 시퀀스 다이어그램”
[Orchestrator]
├─> R2.list() → raw_0001.json, raw_0002.json, ...
├─> SEED_QUEUE.send({ file_path: "datasets/.../raw_0001.json" })
└─> SEED_QUEUE.send({ file_path: "datasets/.../raw_0002.json" })
[File Processor Worker #1]
├─> R2.head(".../raw_0001.json.success") → false
├─> R2.get(".../raw_0001.json") → ReadableStream
├─> JSON.parse() → EnhancedResearchDataset
│ └─> { meta: {...}, records: [{ domain_id: "gov:sg:mom.gov.sg", ... }] }
├─> extractDomainsFromRawFile()
│ └─> [NormalizedDomain, NormalizedDomain, ...]
├─> DOMAIN_QUEUE.send({ domain_id: "gov:sg:mom.gov.sg", ... })
├─> DOMAIN_QUEUE.send({ domain_id: "gov:sg:moh.gov.sg", ... })
└─> R2.put(".../raw_0001.json.success", "")
[Domain Collector Worker #1]
├─> R2.head(".../mom.gov.sg/domain_metadata.json.success") → false
├─> fetch("https://mom.gov.sg/robots.txt") → { status: 200, content: "..." }
│ └─> extractSitemapUrlsFromRobots() → ["https://mom.gov.sg/sitemap.xml"]
├─> fetch("https://mom.gov.sg/sitemap.xml") → { status: 200, content: "<?xml>..." }
│ └─> parseSitemapUrlCount() → 150
├─> DomainMetadata 생성 (Zod 검증)
├─> R2.put(".../mom.gov.sg/robots.txt", content)
├─> R2.put(".../mom.gov.sg/sitemap.xml", content)
├─> R2.put(".../mom.gov.sg/domain_metadata.json", metadata)
└─> R2.put(".../mom.gov.sg/domain_metadata.json.success", "")

Step 1: Raw File → EnhancedResearchDataset

{
"meta": { "dataset_id": "...", "country": "SG", ... },
"records": [
{
"raw_url": "https://www.mom.gov.sg/newsroom",
"domain_id": "gov:sg:mom.gov.sg",
"normalized_domain": "mom.gov.sg",
...
}
]
}

Step 2: EnhancedResearchDataset → NormalizedDomain[]

[
{
input_url: "https://www.mom.gov.sg/newsroom",
hostname: "mom.gov.sg",
registrable_domain: "mom.gov.sg",
authority: "gov",
country: "SG",
domain_id: "gov:sg:mom.gov.sg",
domain_hash: "abc123..."
}
]

Step 3: NormalizedDomain → DomainQueueMessage

{
domain_id: "gov:sg:mom.gov.sg",
domain_url: "https://www.mom.gov.sg/newsroom",
registrable_domain: "mom.gov.sg",
authority: "gov",
partition_info: { country: "sg", category: "news", date: "2026-01-28" },
source_file_path: "datasets/.../raw_0001.json"
}

Step 4: HTTP Fetch → DomainMetadata

{
"domain_id": "gov:sg:mom.gov.sg",
"registrable_domain": "mom.gov.sg",
"country": "SG",
"category": "news",
"collected_at": "2026-01-28T10:30:00Z",
"robots": {
"status_code": 200,
"content_length": 1234,
"exists": true,
"sitemap_urls": ["https://mom.gov.sg/sitemap.xml"]
},
"sitemap": {
"status_code": 200,
"content_length": 5678,
"exists": true,
"url_count": 150,
"url": "https://mom.gov.sg/sitemap.xml"
}
}

문서 버전: 1.1.0
작성일: 2026-01-28
최종 수정일: 2026-01-28
주요 업데이트: 데이터 처리 로직 상세화, 기존 코드 분석 반영, 스키마 정의 추가