# Bulk Actions
Parallel processing system using Cloudflare Queues for large-scale company operations.
## Architecture

```mermaid
graph LR
    A[User selects companies] --> B[API creates bulk_action]
    B --> C[Split into batches]
    C --> D[Enqueue messages]
    D --> E[AI Worker processes]
    E --> F[Update progress]
```

## Configuration

- **Queue Name:** `supershyft-company-jobs`
- **Dead-Letter Queue:** `supershyft-company-jobs-dlq` (after 3 retries)
### Message Structure

```typescript
interface CompanyJobMessage {
  bulk_action_id: string;
  action_type: 'generate_embeddings' | 'enrich_companies' | 'score_llm_adoption';
  company_ids: string[];
  batch_number: number;
  total_batches: number;
  created_by: string; // user email
}
```

## Batch Sizing
Action-specific batch sizes for optimal performance:
| Action | Batch Size | Rate Limit |
|---|---|---|
| Generate Embeddings | 10 | Workers AI throughput |
| Enrich Companies | 5 | Parallel.ai 1hr timeout |
| LLM Scoring | 3 | GROQ 30 req/min (10 calls/company) |
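The table above can be expressed as a simple lookup. This is an illustrative sketch, not the actual implementation; the `BATCH_SIZES` constant and `batchSizeFor` helper are assumed names.

```typescript
// Sketch: resolving an action-specific batch size.
// Constant and function names are illustrative, not from the codebase.
type ActionType = 'generate_embeddings' | 'enrich_companies' | 'score_llm_adoption';

const BATCH_SIZES: Record<ActionType, number> = {
  generate_embeddings: 10, // bounded by Workers AI throughput
  enrich_companies: 5,     // bounded by Parallel.ai's 1 hr timeout
  score_llm_adoption: 3,   // 10 GROQ calls/company under a 30 req/min limit
};

function batchSizeFor(action: ActionType): number {
  return BATCH_SIZES[action];
}
```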
## Retry Configuration
- Max retries: 3 (queue-level)
- Exponential backoff: Managed by Cloudflare Queues
- Application retries: Parallel.ai only (2 retries, 500ms base backoff)
- Progress updates: Every 5 batches to reduce D1 write contention
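The "every 5 batches" throttle might look like the following. This is a minimal sketch; `shouldFlushProgress` is an assumed helper name, and flushing on the final batch is an assumption so the last progress write is never skipped.

```typescript
// Sketch: deciding when to write progress to D1. Flushing only on every
// 5th batch (or the final batch) reduces D1 write contention.
// Helper name is illustrative.
function shouldFlushProgress(batchNumber: number, totalBatches: number): boolean {
  return batchNumber % 5 === 0 || batchNumber === totalBatches;
}
```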
## Supported Actions

### 1. Generate Embeddings

Creates vector embeddings for semantic company search.

**Action Type:** `generate_embeddings`

**Flow:**

- Fetch company data (name, industry, activity, country)
- Build text: `"Company: {name}. Industry: {industry}. Business: {activity}. Location: {country}."`
- Generate embedding via Workers AI (`@cf/baai/bge-small-en-v1.5`)
- Upsert to Vectorize with metadata
- Skip if an embedding already exists (idempotent)

**Configuration:**

- Batch: 10 companies
- Model: BGE Small English v1.5 (384 dimensions)
- Index: `supershyft-companies`
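The text-building step can be sketched as a pure function. The template follows the format shown in the flow above; the `CompanyFields` type and `buildEmbeddingText` helper are assumed names.

```typescript
// Sketch: building the embedding input text from company fields.
// Type and function names are illustrative, not from the codebase.
interface CompanyFields {
  name: string;
  industry: string;
  activity: string;
  country: string;
}

function buildEmbeddingText(c: CompanyFields): string {
  return `Company: ${c.name}. Industry: ${c.industry}. Business: ${c.activity}. Location: ${c.country}.`;
}
```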
### 2. Enrich Companies

Web scraping + AI extraction via Parallel.ai.

**Action Type:** `enrich_companies`

**Flow:**

- Create a Parallel.ai task run with the company website
- Wait for scrape completion (timeout: 1 hour)
- Extract: products, ownership, activity, contacts, outreach emails (EN + NL)
- Store raw scrape: `R2:/companies/{id}/parallel-scrape.json`
- Merge into profile: `R2:/companies/{id}/profile.json`
- Update D1: `activity_description`, `ownership_type`, `enrichment_status`

**Configuration:**

- Batch: 5 companies
- Retries: 2 (exponential backoff, 500 ms base)
- API timeout: 3600 s (1 hour)
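The application-level retry policy (2 retries, 500 ms base, exponential backoff) could be implemented with a generic wrapper like the one below. This is a sketch under assumed names; it is not the project's actual retry code.

```typescript
// Sketch: application-level retry with exponential backoff, as described
// for Parallel.ai calls. Delays double per attempt: 500 ms, 1000 ms, ...
// Function name and signature are illustrative.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxRetries = 2,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxRetries) {
        await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}
```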
### 3. LLM Scoring

Evaluates LLM adoption potential across 10 criteria.

**Action Type:** `score_llm_adoption`

**Flow:**

- Fetch company data (name, NACE code, activity description)
- Score each criterion via the GROQ API (10 sequential calls)
- Map score levels: Low = 0, Medium = 1, High = 3, Very High = 5
- Calculate the overall score (0–50 total)
- Store all 11 scores in the D1 `companies` table

**Configuration:**

- Batch: 3 companies (10 API calls × 3 = 30 requests)
- Model: `openai/gpt-oss-120b` (configurable via `GROQ_MODEL`)
- Temperature: 0.1, Max tokens: 500
- Timeout: 30 s per criterion
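The level-to-points mapping and the overall total (10 criteria × max 5 points = 0–50) can be sketched as follows; the `SCORE_POINTS` constant and `overallScore` helper are assumed names.

```typescript
// Sketch: mapping per-criterion score levels to points and summing the
// overall score. With 10 criteria the total ranges from 0 to 50.
// Names are illustrative, not from the codebase.
type ScoreLevel = 'Low' | 'Medium' | 'High' | 'Very High';

const SCORE_POINTS: Record<ScoreLevel, number> = {
  Low: 0,
  Medium: 1,
  High: 3,
  'Very High': 5,
};

function overallScore(levels: ScoreLevel[]): number {
  return levels.reduce((sum, level) => sum + SCORE_POINTS[level], 0);
}
```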
## Implementation

### Create Bulk Action

`apps/api-worker/src/routes/bulk-actions.ts`

`POST /api/bulk-actions`

```json
{
  "action_type": "generate_embeddings",
  "company_ids": ["cmp_1", "cmp_2"],
  "priority_override": 80
}
```

**Process:**

- Create a `bulk_action` record in D1
- Split into action-specific batches (10/5/3)
- Send messages to the queue
- Log activity
- Return `bulk_action_id`
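The batch-splitting step above amounts to chunking the selected company IDs. A minimal sketch, assuming a `splitIntoBatches` helper name:

```typescript
// Sketch: splitting selected company IDs into queue batches of the
// action-specific size (10/5/3). Helper name is illustrative.
function splitIntoBatches(companyIds: string[], batchSize: number): string[][] {
  const batches: string[][] = [];
  for (let i = 0; i < companyIds.length; i += batchSize) {
    batches.push(companyIds.slice(i, i + batchSize));
  }
  return batches;
}
```

Each resulting chunk would become one queue message, carrying its `batch_number` and the shared `total_batches`.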
### AI Worker Processing

`apps/ai-worker/src/index.ts`

```typescript
export default {
  async queue(batch: MessageBatch<CompanyJobMessage>, env: Env) {
    for (const message of batch.messages) {
      await processCompanyJob(message.body, env);
      message.ack();
    }
  }
}
```

**Actions:**

- Route to the appropriate handler (embeddings/enrichment/scoring)
- Process company data
- Store results (Vectorize/R2/D1)
- Update `bulk_action` progress
- Retry on failure (3× with backoff)
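The routing step could be a plain dispatch table keyed on `action_type`. This is a sketch with stubbed handlers; the names and the shape of `processCompanyJob` are assumptions, not the actual implementation.

```typescript
// Sketch: routing a queue message body to its handler by action_type.
// Handler bodies are illustrative stubs; real handlers would store
// results (Vectorize/R2/D1) and update bulk_action progress.
type Handler = (companyIds: string[]) => Promise<void>;

const handlers: Record<string, Handler> = {
  generate_embeddings: async (_ids) => { /* embed + upsert to Vectorize */ },
  enrich_companies: async (_ids) => { /* Parallel.ai scrape + R2/D1 writes */ },
  score_llm_adoption: async (_ids) => { /* 10 GROQ calls per company */ },
};

async function processCompanyJob(body: { action_type: string; company_ids: string[] }): Promise<void> {
  const handler = handlers[body.action_type];
  if (!handler) throw new Error(`Unknown action_type: ${body.action_type}`);
  await handler(body.company_ids);
}
```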
### Progress Tracking

`PATCH /api/bulk-actions/:id`

```json
{
  "processed_companies": 67,
  "failed_companies": 2,
  "status": "running"
}
```

Progress is updated as batches complete (flushed to D1 every 5 batches to reduce write contention). Status transitions:

- `pending` → `running` → `completed`
- `pending` → `running` → `failed` (if errors exceed threshold)
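A transition guard keeps statuses moving only along the paths above. A minimal sketch, assuming the `canTransition` helper name:

```typescript
// Sketch: guarding bulk-action status transitions. Only the paths
// listed above are allowed; terminal states have no outgoing edges.
// Names are illustrative.
type Status = 'pending' | 'running' | 'completed' | 'failed';

const ALLOWED: Record<Status, Status[]> = {
  pending: ['running'],
  running: ['completed', 'failed'],
  completed: [],
  failed: [],
};

function canTransition(from: Status, to: Status): boolean {
  return ALLOWED[from].includes(to);
}
```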
## Error Handling

**Retry Strategy:**

- Max retries: 3 (queue-level)
- Backoff: Exponential (managed by Cloudflare Queues)
- Application retries: Parallel.ai only (2 retries, 500 ms base backoff)
- Failed batches logged in `error_summary`

**Partial Failures:**

- Track per-company failures
- Continue processing successful items
- Update the `failed_companies` count

**Dead-Letter Queue:**

- Messages that still fail after 3 retries go to `supershyft-company-jobs-dlq`
- Manual review and reprocessing required
## Monitoring

### Status Endpoint

`GET /api/bulk-actions/:id`

**Response:**

```json
{
  "data": {
    "id": "ba_abc123",
    "status": "running",
    "total_companies": 100,
    "processed_companies": 67,
    "failed_companies": 2,
    "created_at": "2025-01-15T10:00:00Z"
  }
}
```

### List User's Actions

`GET /api/bulk-actions`

Returns recent bulk actions for the authenticated user.
## Cost Optimization

- **Batching:** Action-specific batch sizes reduce queue message count
- **Progress Updates:** Batched every 5 iterations to reduce D1 writes
- **Retries:** Exponential backoff reduces the cost of failed requests
- **Idempotency:** Embedding generation skips existing vectors
## Outstanding Features

- [ ] Frontend progress UI with real-time updates
- [ ] `update_status` action handler
- [ ] `export_csv` action handler
- [ ] DLQ consumer for failed message reprocessing
- [ ] Circuit breaker for external APIs
- [ ] Cost tracking and budgeting
- [ ] Pause/resume capability