AI Worker

Background job processor for AI-powered company enrichment and analysis.

Overview

The AI Worker consumes messages from the Cloudflare Queue supershyft-company-jobs to perform:

  • Web scraping + AI extraction via Parallel.ai
  • LLM-based scoring via GROQ (10 criteria evaluation)
  • Semantic embeddings via Workers AI

Results are stored in D1, R2, and Vectorize, with progress tracked in the bulk_actions table.

Architecture

API Worker → Queue → AI Worker (parallel) → D1/R2/Vectorize

Alongside the data stores, the worker writes to two tracking tables:

  • bulk_actions - progress updated every 5 batches
  • activity_log - successful and failed operations, tagged with correlation IDs
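
The consumer entry point follows the standard Cloudflare Queues handler shape. The sketch below is illustrative only: CompanyJob and handleJob are assumed names for this document, not the worker's actual identifiers.

typescript
// Illustrative consumer skeleton (types from @cloudflare/workers-types).
// CompanyJob and handleJob are hypothetical names for this sketch.
interface CompanyJob {
  bulk_action_id: string;
  action_type: string;
  company_ids: string[];
}

interface Env {} // bindings omitted here; see the Configuration section

declare function handleJob(job: CompanyJob, env: Env): Promise<void>;

export default {
  async queue(batch: MessageBatch<CompanyJob>, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        await handleJob(message.body, env);
        message.ack();
      } catch {
        // Hand back to Queues: up to 3 retries with exponential backoff,
        // then the message lands in supershyft-company-jobs-dlq.
        message.retry();
      }
    }
  },
};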

Actions

AI Actions

Generate Embeddings

Create semantic search vectors for company discovery.

Input: Company ID list

Process:

  1. Fetch company data (name, industry, activity, country)
  2. Build text representation
  3. Generate embedding via Workers AI @cf/baai/bge-small-en-v1.5
  4. Upsert to Vectorize index with metadata
  5. Skip if exists (idempotent)

Config: Batch size 10, BGE Small v1.5 (384 dims)
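
A minimal sketch of this flow for a single company, assuming the AI and VECTORIZE bindings listed under Configuration; the exact text format and metadata fields are assumptions.

typescript
// Sketch of steps 1-5 for one company (types from @cloudflare/workers-types).
async function embedCompany(
  env: { AI: Ai; VECTORIZE: VectorizeIndex },
  company: { id: string; name: string; industry: string; activity: string; country: string },
): Promise<void> {
  // Step 5: skip companies that already have a vector (idempotent).
  const existing = await env.VECTORIZE.getByIds([company.id]);
  if (existing.length > 0) return;

  // Step 2: build the text representation (exact format is an assumption).
  const text = `${company.name}. Industry: ${company.industry}. Activity: ${company.activity}. Country: ${company.country}`;

  // Step 3: BGE Small v1.5 produces 384-dimensional embeddings.
  const result = await env.AI.run('@cf/baai/bge-small-en-v1.5', { text: [text] });

  // Step 4: upsert with metadata for filtered semantic search.
  await env.VECTORIZE.upsert([
    { id: company.id, values: result.data[0], metadata: { country: company.country } },
  ]);
}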

Enrich Companies

Web scraping + AI extraction via Parallel.ai.

Input: Company ID list

Process:

  1. Create Parallel.ai task run (1hr timeout)
  2. Extract: products, ownership, activity, contacts, outreach emails
  3. Store raw: R2:/companies/{id}/parallel-scrape.json
  4. Merge: R2:/companies/{id}/profile.json
  5. Update D1: activity_description, ownership_type, enrichment_status

Config: Batch size 5, 2 retries with 500ms backoff
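
Only the Parallel.ai call carries an application-level retry. A generic sketch of that policy follows; the doubling backoff reflects the "500ms base" wording in the Error Handling section, and the wrapped function name is hypothetical.

typescript
// Generic retry helper for the documented policy: 2 retries, 500ms base delay.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxRetries = 2,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxRetries) {
        // 500ms, then 1000ms before the final attempt.
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}

// Usage with a hypothetical Parallel.ai wrapper:
// const run = await withRetries(() => createParallelTaskRun(companyId));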

Score LLM Adoption

Evaluate LLM adoption potential (10 criteria, 0-50 points).

Input: Company ID list

Process:

  1. Fetch company (name, NACE, activity)
  2. Score 10 criteria via GROQ (sequential)
  3. Map: Low=0, Medium=1, High=3, Very High=5
  4. Store all 11 scores (the 10 criterion scores plus the total) in D1

Criteria:

  1. Knowledge-Intensive Workflows
  2. Repetitive Document Processes
  3. High Labor Cost in White-Collar Functions
  4. Large Proprietary Data Pools
  5. Customer Interaction Intensity
  6. Fragmented Market with Legacy Practices
  7. Complex Knowledge Distribution
  8. Digital Infrastructure Baseline
  9. Automation Scalability Potential
  10. Regulatory/Transparency Pressures

Config: Batch size 3, GROQ openai/gpt-oss-120b, 30s timeout per criterion
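
The rating-to-points mapping and the resulting 0-50 range are mechanical; a small sketch:

typescript
// Rating-to-points mapping from the Process steps above.
type Rating = 'Low' | 'Medium' | 'High' | 'Very High';

const POINTS: Record<Rating, number> = {
  Low: 0,
  Medium: 1,
  High: 3,
  'Very High': 5,
};

// Ten criteria at up to 5 points each yields the 0-50 total.
function totalScore(ratings: Rating[]): number {
  return ratings.reduce((sum, rating) => sum + POINTS[rating], 0);
}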

CRUD Actions

All operational as of 2025-10-20:

  • Update Status - Bulk company status updates (batch size 100)
  • Delete Companies - Full deletion with R2 + Vectorize cleanup (batch size 50)
  • Assign Thesis - Bulk thesis assignment (batch size 100)
  • Archive Companies - Soft delete (set status to 'passed', batch size 100; see the sketch after this list)
  • Add to Dealflow - Create dealflow records (batch size 50)
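
As a representative handler, Archive Companies reduces to a batched D1 update. A minimal sketch follows; the companies table and status column names are assumptions.

typescript
// Hypothetical Archive Companies handler; table and column names assumed.
async function archiveCompanies(db: D1Database, companyIds: string[]): Promise<void> {
  const stmt = db.prepare("UPDATE companies SET status = 'passed' WHERE id = ?");
  // db.batch() runs the statements in a single implicit transaction,
  // so a batch of up to 100 companies succeeds or fails together.
  await db.batch(companyIds.map((id) => stmt.bind(id)));
}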

Test Endpoints

Available in development mode only (ENVIRONMENT=development|local):

POST /api/ai/test-embeddings

json
{ "company_ids": ["cmp_abc123"] }

POST /api/ai/test-parallel

json
{ "company_ids": ["cmp_abc123"] }

POST /api/ai/test-scoring

json
{ "company_ids": ["cmp_abc123"] }

Error Handling

Queue Retries:

  • Max: 3 attempts with exponential backoff
  • DLQ: supershyft-company-jobs-dlq

Application Retries:

  • Parallel.ai calls only: 2 retries, 500ms base delay

Error Tracking:

  • Per-company failures in bulk_actions.error_summary
  • Activity log for both successful and failed operations (with _failed suffix)
  • Correlation IDs: {bulk_action_id}-b{batch_number} for tracing
  • Structured logging via aiLogger with correlation tracking
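
The correlation ID format is simple enough to show inline (the helper name is hypothetical):

typescript
// Builds IDs like "ba_123-b4" per the {bulk_action_id}-b{batch_number} format.
function correlationId(bulkActionId: string, batchNumber: number): string {
  return `${bulkActionId}-b${batchNumber}`;
}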

Configuration

Environment Variables:

  • PARALLEL_API_KEY - Parallel.ai API key
  • GROQ_API_KEY - GROQ API key
  • GROQ_MODEL - Model name (optional, default: openai/gpt-oss-120b)
  • ENVIRONMENT - Environment name (development/production)

Bindings:

  • DB - D1 database
  • R2_COMPANY_PROFILES - R2 bucket
  • VECTORIZE - Vectorize index
  • AI - Workers AI
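
Together, the variables and bindings map onto a single Env interface; a sketch using types from @cloudflare/workers-types:

typescript
// Env sketch combining the variables and bindings above.
interface Env {
  // Environment variables
  PARALLEL_API_KEY: string;
  GROQ_API_KEY: string;
  GROQ_MODEL?: string; // optional; defaults to openai/gpt-oss-120b
  ENVIRONMENT: 'development' | 'local' | 'production';
  // Bindings
  DB: D1Database;
  R2_COMPANY_PROFILES: R2Bucket;
  VECTORIZE: VectorizeIndex;
  AI: Ai;
}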

Monitoring

Logs:

bash
wrangler tail
wrangler tail --format=json | jq 'select(.level=="error")'

Progress:

sql
SELECT id, action_type, status,
  processed_companies, failed_companies, total_companies,
  (processed_companies + failed_companies) * 100.0 / total_companies AS progress_pct
FROM bulk_actions
WHERE status = 'processing'
ORDER BY created_at DESC;

Development

Run locally:

bash
pnpm dev:ai

Deploy:

bash
pnpm deploy:ai

Test scoring:

bash
curl -X POST http://127.0.0.1:8788/api/ai/test-scoring \
  -H "Content-Type: application/json" \
  -d '{"company_ids":["cmp_abc123"]}'

Recent Updates

2025-10-20 - Observability & CRUD Operations

  • ✅ Standardized timestamp handling (getUnixTimestamp(), getMillisTimestamp())
  • ✅ Added correlation IDs for end-to-end tracing
  • ✅ Failed operations now logged to activity_log
  • ✅ All CRUD bulk actions operational

Outstanding

  • [ ] export_csv handler (CSV file generation)
  • [ ] DLQ consumer (reprocess failed messages)
  • [ ] Circuit breaker for external APIs
  • [ ] LLM-based ownership classification
  • [ ] Cost tracking and budgeting
  • [ ] Performance metrics (operation duration)

Built for Supershyft Capital