Bulk Actions

Parallel processing system using Cloudflare Queues for large-scale company operations.

Architecture

mermaid
graph LR
    A[User selects companies] --> B[API creates bulk_action]
    B --> C[Split into batches]
    C --> D[Enqueue messages]
    D --> E[AI Worker processes]
    E --> F[Update progress]

Configuration

  • Queue Name: supershyft-company-jobs
  • Dead-Letter Queue: supershyft-company-jobs-dlq (after 3 retries)

Message Structure

typescript
interface CompanyJobMessage {
  bulk_action_id: string;
  action_type: 'generate_embeddings' | 'enrich_companies' | 'score_llm_adoption';
  company_ids: string[];
  batch_number: number;
  total_batches: number;
  created_by: string;  // user email
}

Batch Sizing

Batch sizes are tuned per action to stay within each provider's limits (a config-map sketch follows the table):

Action              | Batch Size | Constraint
--------------------|------------|------------------------------------
Generate Embeddings | 10         | Workers AI throughput
Enrich Companies    | 5          | Parallel.ai 1 hr timeout
LLM Scoring         | 3          | GROQ 30 req/min (10 calls/company)
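
For reference, these sizes could live in a single config map keyed by action type (a sketch; the constant name is illustrative):

typescript
// Batch sizes per action type, taken from the table above.
const BATCH_SIZES: Record<CompanyJobMessage['action_type'], number> = {
  generate_embeddings: 10, // Workers AI throughput
  enrich_companies: 5,     // Parallel.ai 1 hr timeout
  score_llm_adoption: 3,   // GROQ 30 req/min (10 calls/company)
};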

Retry Configuration

  • Max retries: 3 (queue-level)
  • Exponential backoff: Managed by Cloudflare Queues
  • Application retries: Parallel.ai only (2 retries, 500ms base backoff; sketched below)
  • Progress updates: Every 5 batches to reduce D1 write contention
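
A minimal sketch of the application-level retry wrapper, assuming the settings above (2 retries, exponential backoff from a 500ms base):

typescript
// Retries fn with exponential backoff: waits 500ms, then 1000ms, then gives up.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxRetries = 2,
  baseDelayMs = 500,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxRetries) throw err;
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
}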

Supported Actions

1. Generate Embeddings

Creates vector embeddings for semantic company search. A single-job sketch follows the configuration below.

Action Type: generate_embeddings

Flow:

  1. Fetch company data (name, industry, activity, country)
  2. Build text: "Company: {name}. Industry: {industry}. Business: {activity}. Location: {country}."
  3. Generate embedding via Workers AI @cf/baai/bge-small-en-v1.5
  4. Upsert to Vectorize with metadata
  5. Skip if embedding already exists (idempotent)

Configuration:

  • Batch: 10 companies
  • Model: BGE Small English v1.5 (384 dimensions)
  • Index: supershyft-companies
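
A sketch of a single embedding job, assuming AI and VECTORIZE bindings on Env; the Company shape and binding names are illustrative:

typescript
interface Company {
  id: string;
  name: string;
  industry: string;
  activity: string;
  country: string;
}

async function embedCompany(company: Company, env: Env): Promise<void> {
  // Idempotency: skip companies that already have a vector.
  const existing = await env.VECTORIZE.getByIds([company.id]);
  if (existing.length > 0) return;

  const text =
    `Company: ${company.name}. Industry: ${company.industry}. ` +
    `Business: ${company.activity}. Location: ${company.country}.`;

  // BGE Small English v1.5 produces 384-dimensional vectors.
  const result = await env.AI.run('@cf/baai/bge-small-en-v1.5', { text: [text] });

  await env.VECTORIZE.upsert([
    {
      id: company.id,
      values: result.data[0],
      metadata: { name: company.name, country: company.country },
    },
  ]);
}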

2. Enrich Companies

Web scraping + AI extraction via Parallel.ai. A storage-side sketch follows the configuration below.

Action Type: enrich_companies

Flow:

  1. Create Parallel.ai task run with company website
  2. Wait for scrape completion (timeout: 1hr)
  3. Extract: products, ownership, activity, contacts, outreach emails (EN + NL)
  4. Store raw scrape: R2:/companies/{id}/parallel-scrape.json
  5. Merge into profile: R2:/companies/{id}/profile.json
  6. Update D1: activity_description, ownership_type, enrichment_status

Configuration:

  • Batch: 5 companies
  • Retries: 2 (exponential backoff: 500ms base)
  • API timeout: 3600s (1 hour)
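
The Parallel.ai call itself is abstracted below as runParallelScrape, a hypothetical wrapper around its task API; the sketch shows the storage side, assuming R2 and D1 bindings named env.R2 and env.DB:

typescript
// Hypothetical shape of a completed scrape; the real extraction schema is richer.
interface ScrapeResult {
  extracted: { activity: string; ownership: string; [key: string]: unknown };
}

async function enrichCompany(companyId: string, website: string, env: Env): Promise<void> {
  // Application-level retries wrap only this external call (see withRetries above).
  const scrape = await withRetries<ScrapeResult>(() => runParallelScrape(website, env));

  // Persist the raw scrape, then merge extracted fields into the profile.
  await env.R2.put(`companies/${companyId}/parallel-scrape.json`, JSON.stringify(scrape));

  const profileKey = `companies/${companyId}/profile.json`;
  const existing = await env.R2.get(profileKey);
  const profile = existing ? await existing.json<Record<string, unknown>>() : {};
  await env.R2.put(profileKey, JSON.stringify({ ...profile, ...scrape.extracted }));

  // Reflect key fields in D1.
  await env.DB.prepare(
    'UPDATE companies SET activity_description = ?, ownership_type = ?, enrichment_status = ? WHERE id = ?',
  )
    .bind(scrape.extracted.activity, scrape.extracted.ownership, 'enriched', companyId)
    .run();
}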

3. LLM Scoring

Evaluates LLM adoption potential across 10 criteria. The score mapping is sketched after the configuration below.

Action Type: score_llm_adoption

Flow:

  1. Fetch company data (name, NACE code, activity description)
  2. Score each criterion via GROQ API (10 sequential calls)
  3. Map score levels: Low=0, Medium=1, High=3, Very High=5
  4. Calculate overall score (0-50 total)
  5. Store all 11 scores in D1 companies table

Configuration:

  • Batch: 3 companies (10 API calls × 3 = 30 requests)
  • Model: openai/gpt-oss-120b (configurable via GROQ_MODEL)
  • Temperature: 0.1, Max tokens: 500
  • Timeout: 30s per criterion
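
The level-to-points mapping and the overall score are small enough to show directly (names are illustrative):

typescript
const SCORE_LEVELS = { Low: 0, Medium: 1, High: 3, 'Very High': 5 } as const;
type ScoreLevel = keyof typeof SCORE_LEVELS;

// Ten criteria at up to 5 points each yields an overall score of 0-50.
function overallScore(levels: ScoreLevel[]): number {
  return levels.reduce((sum, level) => sum + SCORE_LEVELS[level], 0);
}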

Implementation

Create Bulk Action

apps/api-worker/src/routes/bulk-actions.ts

typescript
POST /api/bulk-actions
{
  "action_type": "generate_embeddings",
  "company_ids": ["cmp_1", "cmp_2"],
  "priority_override": 80
}

Process (a condensed sketch follows the list):

  1. Create bulk_action record in D1
  2. Split into action-specific batches (10/5/3)
  3. Send messages to queue
  4. Log activity
  5. Return bulk_action_id
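
A condensed sketch of the handler, assuming a COMPANY_JOBS queue producer binding; insertBulkActionRecord is a hypothetical D1 helper:

typescript
// Split ids into fixed-size batches.
const chunk = <T>(arr: T[], size: number): T[][] =>
  Array.from({ length: Math.ceil(arr.length / size) }, (_, i) =>
    arr.slice(i * size, (i + 1) * size),
  );

async function createBulkAction(
  body: { action_type: CompanyJobMessage['action_type']; company_ids: string[] },
  userEmail: string,
  env: Env,
) {
  const bulkActionId = `ba_${crypto.randomUUID()}`;
  const batches = chunk(body.company_ids, BATCH_SIZES[body.action_type]);

  await insertBulkActionRecord(env.DB, bulkActionId, body, batches.length); // hypothetical helper

  // One queue message per batch (sendBatch accepts up to 100 messages per call).
  await env.COMPANY_JOBS.sendBatch(
    batches.map((ids, i) => ({
      body: {
        bulk_action_id: bulkActionId,
        action_type: body.action_type,
        company_ids: ids,
        batch_number: i + 1,
        total_batches: batches.length,
        created_by: userEmail,
      } satisfies CompanyJobMessage,
    })),
  );

  return { bulk_action_id: bulkActionId };
}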

AI Worker Processing

apps/ai-worker/src/index.ts

typescript
export default {
  async queue(batch: MessageBatch<CompanyJobMessage>, env: Env) {
    for (const message of batch.messages) {
      try {
        await processCompanyJob(message.body, env);
        message.ack();
      } catch {
        // Hand the message back to Cloudflare Queues for retry
        // (up to 3 attempts, then the DLQ).
        message.retry();
      }
    }
  },
} satisfies ExportedHandler<Env>;

Actions (the router is sketched after this list):

  • Route to appropriate handler (embeddings/enrichment/scoring)
  • Process company data
  • Store results (Vectorize/R2/D1)
  • Update bulk_action progress
  • Retry on failure (3x with backoff)
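
The router itself can be a plain switch over action_type (handler names are illustrative):

typescript
async function processCompanyJob(job: CompanyJobMessage, env: Env): Promise<void> {
  switch (job.action_type) {
    case 'generate_embeddings':
      return handleEmbeddings(job, env);
    case 'enrich_companies':
      return handleEnrichment(job, env);
    case 'score_llm_adoption':
      return handleScoring(job, env);
  }
}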

Progress Tracking

typescript
PATCH /api/bulk-actions/:id
{
  "processed_companies": 67,
  "failed_companies": 2,
  "status": "running"
}

Progress is written every 5 batches, plus a final write when the last batch completes (see the sketch after the transitions). Status transitions:

  • pending → running → completed
  • pending → running → failed (if errors exceed threshold)
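
A sketch of the throttled write; it recomputes counts from a hypothetical per-company results table so that skipped batches lose nothing:

typescript
async function maybeUpdateProgress(job: CompanyJobMessage, env: Env): Promise<void> {
  const isFinal = job.batch_number === job.total_batches;
  // Only every 5th batch (and the final one) writes to D1.
  if (job.batch_number % 5 !== 0 && !isFinal) return;

  await env.DB.prepare(`
    UPDATE bulk_actions SET
      processed_companies = (SELECT COUNT(*) FROM company_job_results
                             WHERE bulk_action_id = ?1 AND status = 'ok'),
      failed_companies    = (SELECT COUNT(*) FROM company_job_results
                             WHERE bulk_action_id = ?1 AND status = 'failed'),
      status = ?2
    WHERE id = ?1
  `).bind(job.bulk_action_id, isFinal ? 'completed' : 'running').run();
}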

Error Handling

Retry Strategy:

  • Queue-level: 3 retries with exponential backoff, managed by Cloudflare Queues (see Retry Configuration)
  • Application-level: Parallel.ai calls only (2 retries, 500ms base backoff)
  • Failed batches logged in error_summary

Partial Failures (sketched after this list):

  • Track per-company failures
  • Continue processing successful items
  • Update failed_companies count
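
A sketch of per-company isolation within one batch; processCompany is a hypothetical per-company dispatcher and the error shape is illustrative:

typescript
async function processBatch(job: CompanyJobMessage, env: Env) {
  let processed = 0;
  const errors: Record<string, string> = {};

  for (const companyId of job.company_ids) {
    try {
      await processCompany(companyId, job.action_type, env); // hypothetical
      processed++;
    } catch (err) {
      // Record the failure and keep going with the rest of the batch.
      errors[companyId] = err instanceof Error ? err.message : String(err);
    }
  }

  return { processed, failed: Object.keys(errors).length, errors };
}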

Dead-Letter Queue:

  • Failed messages after 3 retries go to supershyft-company-jobs-dlq
  • Manual review and reprocessing required

Monitoring

Status Endpoint

bash
GET /api/bulk-actions/:id

Response:

json
{
  "data": {
    "id": "ba_abc123",
    "status": "running",
    "total_companies": 100,
    "processed_companies": 67,
    "failed_companies": 2,
    "created_at": "2025-01-15T10:00:00Z"
  }
}

List User's Actions

bash
GET /api/bulk-actions

Returns recent bulk actions for authenticated user.

Cost Optimization

  • Batching: Action-specific batch sizes reduce queue message count
  • Progress updates: Batched every 5 batches to reduce D1 writes
  • Retries: Exponential backoff reduces the cost of failed requests
  • Idempotency: Embedding jobs skip companies that already have vectors

Outstanding Features

  • [ ] Frontend progress UI with real-time updates
  • [ ] update_status action handler
  • [ ] export_csv action handler
  • [ ] DLQ consumer for failed message reprocessing
  • [ ] Circuit breaker for external APIs
  • [ ] Cost tracking and budgeting
  • [ ] Pause/resume capability

Built for Supershyft Capital