# AI Worker
Background job processor for AI-powered company enrichment and analysis.
## Overview
The AI Worker consumes messages from the Cloudflare Queue `supershyft-company-jobs` to perform:
- Web scraping + AI extraction via Parallel.ai
- LLM-based scoring via GROQ (10 criteria evaluation)
- Semantic embeddings via Workers AI
Results are stored in D1, R2, and Vectorize, with progress tracked in the `bulk_actions` table.
## Architecture
```
API Worker → Queue → AI Worker (parallel) → D1/R2/Vectorize
                          ↓
             bulk_actions (progress every 5 batches)
                          ↓
             activity_log (successful + failed operations with correlation IDs)
```
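A minimal sketch of the consumer side of this flow, using the standard Cloudflare Queues handler. The `CompanyJob` message shape, the `handleAction` dispatcher, and the `Env` bindings interface (sketched under Configuration) are assumptions for illustration, not the actual implementation:

```ts
// Illustrative message shape; the real payload fields may differ.
interface CompanyJob {
  bulk_action_id: string;
  batch_number: number;
  action_type: string; // e.g. "generate_embeddings", "enrich_companies", "score_llm_adoption"
  company_ids: string[];
}

// Assumed dispatcher into the AI/CRUD handlers described below.
declare function handleAction(job: CompanyJob, env: Env, correlationId: string): Promise<void>;

export default {
  // Cloudflare Queues consumer for supershyft-company-jobs.
  async queue(batch: MessageBatch<CompanyJob>, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const job = msg.body;
      const correlationId = `${job.bulk_action_id}-b${job.batch_number}`;
      try {
        await handleAction(job, env, correlationId);
        msg.ack();
      } catch (err) {
        console.error(JSON.stringify({ correlationId, error: String(err) }));
        msg.retry(); // after the max retries (3) the message is routed to the DLQ
      }
    }
  },
};
```

The progress writes to `bulk_actions` (every 5 batches) and the `activity_log` entries happen inside the real handlers and are omitted here.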
## Actions

### AI Actions
#### Generate Embeddings
Create semantic search vectors for company discovery.
**Input:** Company ID list

**Process:**
- Fetch company data (name, industry, activity, country)
- Build text representation
- Generate embedding via Workers AI (`@cf/baai/bge-small-en-v1.5`)
- Upsert to Vectorize index with metadata
- Skip if exists (idempotent)

**Config:** Batch size 10, BGE Small v1.5 (384 dims); see the sketch below.
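A condensed sketch of a single embedding pass, assuming the `AI` and `VECTORIZE` bindings listed under Configuration; the company row shape, the text format, and the metadata fields are illustrative:

```ts
async function embedCompany(
  env: Env,
  company: { id: string; name: string; industry: string; activity: string; country: string },
): Promise<void> {
  // Idempotency: skip companies that already have a vector in the index.
  const existing = await env.VECTORIZE.getByIds([company.id]);
  if (existing.length > 0) return;

  // Text representation fed to the embedding model.
  const text = `${company.name} | ${company.industry} | ${company.activity} | ${company.country}`;

  // BGE Small v1.5 produces 384-dimensional vectors.
  const result = await env.AI.run("@cf/baai/bge-small-en-v1.5", { text: [text] });
  const values = result.data[0];

  // Upsert with metadata so vector hits can be filtered at query time.
  await env.VECTORIZE.upsert([
    { id: company.id, values, metadata: { industry: company.industry, country: company.country } },
  ]);
}
```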
#### Enrich Companies
Web scraping + AI extraction via Parallel.ai.
**Input:** Company ID list

**Process:**
- Create Parallel.ai task run (1 hr timeout)
- Extract: products, ownership, activity, contacts, outreach emails
- Store raw: `R2:/companies/{id}/parallel-scrape.json`
- Merge: `R2:/companies/{id}/profile.json`
- Update D1: `activity_description`, `ownership_type`, `enrichment_status`

**Config:** Batch size 5, 2 retries with 500ms backoff; see the sketch below.
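The Parallel.ai client itself is not shown here; this is a sketch of the documented retry policy and the raw-result write, where `withRetries` and `storeRawScrape` are illustrative helper names:

```ts
// Retry policy from the config above: 2 retries with a 500ms base backoff.
async function withRetries<T>(fn: () => Promise<T>, retries = 2, baseMs = 500): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < retries) {
        // Backoff: 500ms, then 1000ms.
        await new Promise((resolve) => setTimeout(resolve, baseMs * 2 ** attempt));
      }
    }
  }
  throw lastError;
}

// Persist the raw Parallel.ai output at the documented R2 path.
async function storeRawScrape(env: Env, companyId: string, raw: unknown): Promise<void> {
  await env.R2_COMPANY_PROFILES.put(
    `companies/${companyId}/parallel-scrape.json`,
    JSON.stringify(raw),
    { httpMetadata: { contentType: "application/json" } },
  );
}
```

Usage would look like `const raw = await withRetries(() => createParallelTaskRun(company))` followed by `storeRawScrape(env, company.id, raw)`, with the profile merge and D1 update applied afterwards (`createParallelTaskRun` being the hypothetical Parallel.ai wrapper).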
#### Score LLM Adoption
Evaluate LLM adoption potential (10 criteria, 0-50 points).
**Input:** Company ID list

**Process:**
- Fetch company (name, NACE, activity)
- Score 10 criteria via GROQ (sequential)
- Map: Low=0, Medium=1, High=3, Very High=5
- Store all 11 scores in D1
**Criteria:**
- Knowledge-Intensive Workflows
- Repetitive Document Processes
- High Labor Cost in White-Collar Functions
- Large Proprietary Data Pools
- Customer Interaction Intensity
- Fragmented Market with Legacy Practices
- Complex Knowledge Distribution
- Digital Infrastructure Baseline
- Automation Scalability Potential
- Regulatory/Transparency Pressures
**Config:** Batch size 3, GROQ `openai/gpt-oss-120b`, 30s timeout per criterion; see the sketch below.
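A sketch of the per-criterion loop and the documented label-to-points mapping. The prompt wording, the `askGroq` helper, and the response handling are assumptions; GROQ's OpenAI-compatible chat completions endpoint is used for illustration, and the 30s per-criterion timeout is omitted for brevity:

```ts
// Documented mapping: Low=0, Medium=1, High=3, Very High=5 (10 criteria, 0-50 total).
const SCORE_MAP: Record<string, number> = { Low: 0, Medium: 1, High: 3, "Very High": 5 };

// Ask GROQ to rate one criterion; prompt and parsing are illustrative.
async function askGroq(
  env: Env,
  company: { name: string; nace: string; activity: string },
  criterion: string,
): Promise<string> {
  const res = await fetch("https://api.groq.com/openai/v1/chat/completions", {
    method: "POST",
    headers: { Authorization: `Bearer ${env.GROQ_API_KEY}`, "Content-Type": "application/json" },
    body: JSON.stringify({
      model: env.GROQ_MODEL ?? "openai/gpt-oss-120b",
      messages: [{
        role: "user",
        content:
          `Company: ${company.name} (NACE ${company.nace}). Activity: ${company.activity}. ` +
          `Rate "${criterion}" for LLM adoption potential. ` +
          `Answer with exactly one of: Low, Medium, High, Very High.`,
      }],
    }),
  });
  const data = (await res.json()) as { choices: { message: { content: string } }[] };
  return data.choices[0].message.content.trim();
}

// Sequentially score all 10 criteria and accumulate the 0-50 total.
async function scoreCompany(
  env: Env,
  company: { name: string; nace: string; activity: string },
  criteria: string[],
): Promise<{ scores: Record<string, number>; total: number }> {
  const scores: Record<string, number> = {};
  let total = 0;
  for (const criterion of criteria) {
    const label = await askGroq(env, company, criterion);
    const points = SCORE_MAP[label] ?? 0; // unknown labels default to 0
    scores[criterion] = points;
    total += points;
  }
  return { scores, total }; // criterion scores plus the total are what get written to D1
}
```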
### CRUD Actions
All operational as of 2025-10-20 (a bulk-update sketch follows the list):
- Update Status - Bulk company status updates (batch size 100)
- Delete Companies - Full deletion with R2 + Vectorize cleanup (batch size 50)
- Assign Thesis - Bulk thesis assignment (batch size 100)
- Archive Companies - Soft delete (set status to 'passed', batch size 100)
- Add to Dealflow - Create dealflow records (batch size 50)
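A sketch of one of these, Update Status, chunking IDs at the documented batch size of 100; the `companies` table and `status` column names are assumptions:

```ts
// Bulk status update against D1 in chunks of 100 IDs.
async function updateStatus(env: Env, companyIds: string[], status: string): Promise<void> {
  const BATCH_SIZE = 100;
  for (let i = 0; i < companyIds.length; i += BATCH_SIZE) {
    const chunk = companyIds.slice(i, i + BATCH_SIZE);
    const placeholders = chunk.map(() => "?").join(", ");
    await env.DB
      .prepare(`UPDATE companies SET status = ? WHERE id IN (${placeholders})`) // table/column assumed
      .bind(status, ...chunk)
      .run();
  }
}
```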
## Test Endpoints
Available in development mode only (`ENVIRONMENT=development|local`):
```
POST /api/ai/test-embeddings
{ "company_ids": ["cmp_abc123"] }

POST /api/ai/test-parallel
{ "company_ids": ["cmp_abc123"] }

POST /api/ai/test-scoring
{ "company_ids": ["cmp_abc123"] }
```

## Error Handling
**Queue Retries:**
- Max: 3 with exponential backoff
- DLQ: `supershyft-company-jobs-dlq`
**Application Retries:**
- Parallel.ai only: 2 retries, 500ms base
**Error Tracking:**
- Per-company failures in `bulk_actions.error_summary`
- Activity log for both successful and failed operations (with `_failed` suffix)
- Correlation IDs: `{bulk_action_id}-b{batch_number}` for tracing (see the sketch below)
- Structured logging via `aiLogger` with correlation tracking
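The correlation ID format and the `_failed` convention can be sketched as follows; the real `aiLogger` API is not shown, this only illustrates the structured shape:

```ts
// Correlation ID format documented above: {bulk_action_id}-b{batch_number}.
function correlationId(bulkActionId: string, batchNumber: number): string {
  return `${bulkActionId}-b${batchNumber}`;
}

// Minimal structured error entry in the spirit of aiLogger (illustrative fields).
function logFailure(cid: string, companyId: string, action: string, err: unknown): void {
  console.error(
    JSON.stringify({
      level: "error",
      correlation_id: cid,
      company_id: companyId,
      action: `${action}_failed`, // matches the _failed suffix used in activity_log
      error: String(err),
    }),
  );
}
```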
## Configuration
**Environment Variables:**
- `PARALLEL_API_KEY` - Parallel.ai API key
- `GROQ_API_KEY` - GROQ API key
- `GROQ_MODEL` - Model name (optional, default: `openai/gpt-oss-120b`)
- `ENVIRONMENT` - Environment name (development/production)
**Bindings** (typed sketch below):
- `DB` - D1 database
- `R2_COMPANY_PROFILES` - R2 bucket
- `VECTORIZE` - Vectorize index
- `AI` - Workers AI
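A typed sketch of these bindings and variables as a Workers `Env` interface (types from `@cloudflare/workers-types`; the interface itself is illustrative, not copied from the codebase):

```ts
interface Env {
  // Bindings
  DB: D1Database;                // D1 database
  R2_COMPANY_PROFILES: R2Bucket; // R2 bucket for company profiles
  VECTORIZE: VectorizeIndex;     // Vectorize index for embeddings
  AI: Ai;                        // Workers AI

  // Environment variables
  PARALLEL_API_KEY: string;
  GROQ_API_KEY: string;
  GROQ_MODEL?: string;           // optional; defaults to openai/gpt-oss-120b
  ENVIRONMENT: "development" | "local" | "production";
}
```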
## Monitoring
**Logs:**

```sh
wrangler tail
wrangler tail --format=json | jq 'select(.level=="error")'
```

**Progress:**

```sql
SELECT id, action_type, status,
       processed_companies, failed_companies, total_companies,
       (processed_companies + failed_companies) * 100.0 / total_companies AS progress_pct
FROM bulk_actions
WHERE status = 'processing'
ORDER BY created_at DESC;
```

## Development
**Run locally:**

```sh
pnpm dev:ai
```

**Deploy:**

```sh
pnpm deploy:ai
```

**Test scoring:**

```sh
curl -X POST http://127.0.0.1:8788/api/ai/test-scoring \
  -H "Content-Type: application/json" \
  -d '{"company_ids":["cmp_abc123"]}'
```

## Recent Updates
### 2025-10-20 - Observability & CRUD Operations
- ✅ Standardized timestamp handling (`getUnixTimestamp()`, `getMillisTimestamp()`)
- ✅ Added correlation IDs for end-to-end tracing
- ✅ Failed operations now logged to activity_log
- ✅ All CRUD bulk actions operational
## Outstanding
- [ ] `export_csv` handler (CSV file generation)
- [ ] DLQ consumer (reprocess failed messages)
- [ ] Circuit breaker for external APIs
- [ ] LLM-based ownership classification
- [ ] Cost tracking and budgeting
- [ ] Performance metrics (operation duration)