AGI Strategy - Day 5

Day 5 Detailed Plan: End-to-End Pipeline Test

Context from prior days: You have a generate_completion function from Day 3 and an evaluate_toxicity function from Day 4, each tested independently. Today you combine them into a single pipeline, run it on 50-100 prompts, and produce your first real baseline metrics. This is also the end of Week 1, so the goal is to leave the day with full confidence that the pipeline is ready to run at scale on Day 6.


Session Structure (2-3 hours)


Block 1: Integrate the two components (30-40 min)

The core task is writing a pipeline function that calls generate_completion and feeds the result directly into evaluate_toxicity, then packages everything into a structured record.

import json
import time
from pathlib import Path
from generation import generate_completion, MODEL_NAME
from evaluation import evaluate_toxicity
from config import TOXICITY_THRESHOLD, TEMPERATURE, MAX_TOKENS, TOP_P

def run_pipeline_single(
    prompt_record: dict,
    system_prompt: str = "",
) -> dict:
    """
    Run generation + evaluation for a single prompt record.
    Returns a result dict ready for serialisation.
    """
    prompt_text = prompt_record["prompt"]  # adjust key to match your Day 2 schema

    completion = generate_completion(
        prompt=prompt_text,
        system_prompt=system_prompt,
        temperature=TEMPERATURE,
        max_tokens=MAX_TOKENS,
        top_p=TOP_P,
    )

    toxicity_scores = evaluate_toxicity(completion) if completion else None

    return {
        "prompt_id": prompt_record.get("id"),
        "prompt": prompt_text,
        "demographic_group": prompt_record.get("demographic_group"),  # from Day 2
        "system_prompt": system_prompt if system_prompt else None,
        "completion": completion,
        "toxicity_scores": toxicity_scores,
        "above_threshold": (
            toxicity_scores["toxicity"] > TOXICITY_THRESHOLD
            if toxicity_scores else None
        ),
        "model": MODEL_NAME,
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
        "top_p": TOP_P,  # recorded alongside the other sampling parameters
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),  # local time, no timezone
    }

A few points on the schema:

  • Including demographic_group in every record now avoids a painful join later when you do per-group breakdowns in Week 3.
  • Recording system_prompt in every record — even as None for the baseline — means your baseline and mitigation results files share an identical schema, which simplifies comparative analysis on Days 10 and 11.
  • timestamp is cheap to include and useful if you need to correlate results with logs later.
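
Because the baseline and mitigation files are meant to share one schema, it can help to check each record against the expected keys before writing any analysis code. The following is a minimal sketch, not part of the plan itself: EXPECTED_KEYS is copied from the record built in run_pipeline_single, and schema_problems is a hypothetical helper name.

```python
# Keys copied from the dict returned by run_pipeline_single (adjust if your schema differs).
EXPECTED_KEYS = {
    "prompt_id", "prompt", "demographic_group", "system_prompt",
    "completion", "toxicity_scores", "above_threshold",
    "model", "temperature", "max_tokens", "timestamp",
}


def schema_problems(record: dict) -> list[str]:
    """Return human-readable schema problems; an empty list means none were found."""
    problems = []

    # Only missing keys are reported, so adding extra fields later does not trip the check.
    missing = EXPECTED_KEYS - set(record)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")

    # Scores and the threshold flag should be present or absent together.
    has_scores = record.get("toxicity_scores") is not None
    has_flag = record.get("above_threshold") is not None
    if has_scores != has_flag:
        problems.append("toxicity_scores and above_threshold disagree about being None")

    return problems
```

Running this over every line of a results file and printing only the non-empty results gives a quick confirmation that two files are comparable.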

Block 2: Write the batch runner (20-30 min)

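def run_pipeline_batch(
    prompts: list[dict],
    output_path: str,
    system_prompt: str = "",
    resume: bool = True,
) -> None:
    """
    Run pipeline over a list of prompt records, saving results incrementally.
    If resume=True, skips prompts whose IDs are already in the output file.
    If resume=False, any existing output file is overwritten.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Load already-completed IDs if resuming
    completed_ids = set()
    last_line_complete = True
    if resume and output_path.exists():
        with open(output_path, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                last_line_complete = line.endswith("\n")
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    # e.g. a line cut short when an earlier run was killed
                    print(f"Skipping unreadable line {line_no} in {output_path}")
                    continue
                completed_ids.add(record.get("prompt_id"))
        completed_ids.discard(None)  # records without an ID cannot be matched, so rerun them
        print(f"Resuming: {len(completed_ids)} records already complete")

    # Append when resuming; otherwise start a fresh file
    with open(output_path, "a" if resume else "w", encoding="utf-8") as f:
        if not last_line_complete:
            f.write("\n")  # keep new records from merging into a truncated last line

        for i, prompt_record in enumerate(prompts):
            if prompt_record.get("id") in completed_ids:
                continue

            result = run_pipeline_single(prompt_record, system_prompt=system_prompt)
            f.write(json.dumps(result) + "\n")
            f.flush()  # ensure writes are not buffered during long runs

            if (i + 1) % 10 == 0:
                print(f"[{i + 1}/{len(prompts)}] completed")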

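The resume logic is worth the extra few lines. If your run is interrupted partway through on Day 6 (a crash, a timeout, or simply needing your machine back), you can restart without losing progress. Calling f.flush() after each write hands the record to the operating system straight away, so it is not lost from Python's buffer if the process is killed. Note that a prompt whose generation failed is still written, with completion set to None, and is therefore treated as complete on resume.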
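
One caveat: flush() protects against the process being killed, but not against the machine itself losing power or crashing, because the data may still sit in the operating system's cache. If that matters for an overnight run, a stricter variant is sketched below. The helper name append_record_durably is hypothetical, and the extra os.fsync call costs some time per record.

```python
import json
import os


def append_record_durably(f, record: dict) -> None:
    """Write one JSONL record and ask the OS to commit it to disk before returning."""
    f.write(json.dumps(record) + "\n")
    f.flush()              # Python buffer -> operating system
    os.fsync(f.fileno())   # operating system cache -> storage device
```

For a run of a few hundred prompts where each generation takes seconds, the added cost is likely negligible, but plain flush() is usually enough on a machine you trust to stay up.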


Block 3: Run on 50-100 prompts and time it (45-60 min, partially compute time)

Load your Day 2 subset and take a stratified sample of 50-100 prompts, ensuring multiple demographic groups are represented:

import json
import random
from collections import defaultdict

random.seed(42)  # fix seed for reproducibility

with open("data/subset.jsonl", "r") as f:
    all_prompts = [json.loads(line) for line in f if line.strip()]  # ignore blank lines

# Stratified sample: take N per demographic group
by_group = defaultdict(list)
for p in all_prompts:
    by_group[p["demographic_group"]].append(p)

sample = []
per_group = 10  # adjust to hit your 50-100 target
for group, items in by_group.items():
    # Groups with fewer than per_group prompts contribute everything they have
    sample.extend(random.sample(items, min(per_group, len(items))))

print(f"Sample size: {len(sample)} across {len(by_group)} groups")

Then run the pipeline and measure wall-clock time:

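import time

# Delete any existing results/day5_smoke_test.jsonl before timing: prompts that
# are skipped on resume would make the per-prompt figure look faster than it is.
start = time.perf_counter()  # monotonic clock, better suited to measuring durations
run_pipeline_batch(sample, "results/day5_smoke_test.jsonl")
elapsed = time.perf_counter() - start

per_prompt = elapsed / len(sample)
print(f"Total time: {elapsed:.1f}s")
print(f"Per prompt: {per_prompt:.2f}s")
print(f"Estimated time for 500 prompts: {500 * per_prompt / 60:.1f} min")
print(f"Estimated time for 1000 prompts: {1000 * per_prompt / 60:.1f} min")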

This time estimate is the primary practical output of Day 5. It determines whether you can run the full evaluation in a single session on Day 6 or need to run it overnight.


Block 4: Calculate baseline metrics (20-25 min)

import json
import statistics
from collections import defaultdict

from config import TOXICITY_THRESHOLD

records = []
with open("results/day5_smoke_test.jsonl") as f:
    for line in f:
        if line.strip():  # ignore blank lines
            records.append(json.loads(line))

# Filter to records with valid scores
scored = [r for r in records if r["toxicity_scores"]]
if len(scored) < 2:
    # statistics.stdev needs at least two values
    raise SystemExit("Need at least two scored records to compute summary statistics")

tox_scores = [r["toxicity_scores"]["toxicity"] for r in scored]
id_attack_scores = [r["toxicity_scores"]["identity_attack"] for r in scored]

print(f"N scored: {len(scored)} / {len(records)}")
print("\n--- Toxicity ---")
print(f"Mean:   {statistics.mean(tox_scores):.3f}")
print(f"Median: {statistics.median(tox_scores):.3f}")
print(f"Stdev:  {statistics.stdev(tox_scores):.3f}")
print(f"Max:    {max(tox_scores):.3f}")
print(f"Above threshold: {sum(s > TOXICITY_THRESHOLD for s in tox_scores)}/{len(tox_scores)}")

print("\n--- Identity Attack ---")
print(f"Mean:   {statistics.mean(id_attack_scores):.3f}")
print(f"Above threshold: {sum(s > TOXICITY_THRESHOLD for s in id_attack_scores)}/{len(id_attack_scores)}")

# Per-group breakdown
by_group = defaultdict(list)
for r in scored:
    # str() so that a missing (None) group still sorts and prints
    by_group[str(r["demographic_group"])].append(r["toxicity_scores"]["toxicity"])

print("\n--- Per Group ---")
for group, scores in sorted(by_group.items()):
    print(f"{group:30s}  mean={statistics.mean(scores):.3f}  n={len(scores)}")

At this stage the sample is too small for statistical conclusions, but you are looking for:

  • Whether scores are distributed across the full 0-1 range or clustered
  • Whether any demographic groups appear notably higher or lower
  • Whether the proportion above threshold is non-trivial. If it is close to zero on the baseline, the mitigation experiment will have little room to show improvement, which is worth flagging early
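
The mean and median alone will not show whether scores are spread out or clustered. A quick text histogram is one way to see the shape. This is a rough sketch, assuming scores lie in the 0-1 range; score_histogram and print_histogram are hypothetical helper names.

```python
from collections import Counter


def score_histogram(scores: list[float], n_bins: int = 10) -> list[int]:
    """Count scores in equal-width bins over [0, 1]; a score of exactly 1.0 goes in the last bin."""
    counts = Counter(min(int(s * n_bins), n_bins - 1) for s in scores)
    return [counts.get(b, 0) for b in range(n_bins)]


def print_histogram(scores: list[float], n_bins: int = 10) -> None:
    """Print one row per bin with a bar of '#' characters and the raw count."""
    for b, count in enumerate(score_histogram(scores, n_bins)):
        low, high = b / n_bins, (b + 1) / n_bins
        print(f"{low:.1f}-{high:.1f}  {'#' * count} ({count})")
```

Calling print_histogram(tox_scores) after the summary statistics shows at a glance whether nearly everything sits in the lowest bin, which is the case where the mitigation experiment has little room to show improvement.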

Block 5: Bug check and pipeline hardening (15-20 min)

Review the output file and look for:

  • Any records where completion is None — if there are many, revisit your Ollama timeout setting
  • Any records where toxicity_scores is None but completion is not — indicates an issue in the evaluation function
  • Unexpected characters or encoding issues in completions
  • Consistency of demographic_group values across records — if there are inconsistent spellings or casing from the Day 2 dataset, normalise them now before you have 1000 records to fix
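
Most of these checks can be counted rather than eyeballed. The sketch below is one possible way to do it, assuming records is the list loaded from the results file; quality_report is a hypothetical helper. It treats an empty completion the same as None, and compares group labels after trimming whitespace and folding case.

```python
from collections import defaultdict


def quality_report(records: list[dict]) -> dict:
    """Summarise missing completions, unscored completions, and inconsistent group labels."""
    no_completion = sum(1 for r in records if not r.get("completion"))
    completion_but_no_scores = sum(
        1 for r in records
        if r.get("completion") and r.get("toxicity_scores") is None
    )

    # Collect every spelling seen for each normalised group label.
    variants = defaultdict(set)
    for r in records:
        label = r.get("demographic_group")
        if label is not None:
            variants[str(label).strip().casefold()].add(str(label))

    return {
        "no_completion": no_completion,
        "completion_but_no_scores": completion_but_no_scores,
        # Only labels that appear with more than one spelling are reported.
        "inconsistent_group_labels": {
            key: sorted(spellings)
            for key, spellings in variants.items()
            if len(spellings) > 1
        },
    }
```

Encoding problems are harder to count automatically, so reading a handful of completions by eye is still worthwhile.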

Also confirm that the output file is valid JSONL throughout. A partially written line left over from an earlier interrupted test can break or skew any downstream step that parses the file line by line:

import json

with open("results/day5_smoke_test.jsonl") as f:
    for line_no, line in enumerate(f, start=1):  # 1-based, to match editor line numbers
        try:
            json.loads(line)
        except json.JSONDecodeError as e:
            print(f"Bad line {line_no}: {e}")
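
If the check does report bad lines, one option is to salvage the valid records into a clean copy rather than editing the file by hand. This is a sketch under the assumption that dropping unreadable lines is acceptable; split_valid_jsonl and write_clean_copy are hypothetical names.

```python
import json


def split_valid_jsonl(path: str) -> tuple[list[dict], list[int]]:
    """Return (parsed records, 1-based numbers of lines that failed to parse)."""
    records, bad_lines = [], []
    with open(path, encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # blank lines are ignored, not reported
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                bad_lines.append(line_no)
    return records, bad_lines


def write_clean_copy(path: str, clean_path: str) -> int:
    """Write only the parseable records to clean_path; return how many lines were dropped."""
    records, bad_lines = split_valid_jsonl(path)
    with open(clean_path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    return len(bad_lines)
```

Writing to a separate file keeps the original intact for inspection. The prompts behind the dropped lines no longer appear in the clean copy, so a resumed run against it should generate them again.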

Deliverables Checklist

  • pipeline.py (or equivalent) with run_pipeline_single and run_pipeline_batch
  • results/day5_smoke_test.jsonl with 50-100 scored records
  • Time-per-prompt recorded and extrapolated to 500 and 1000 prompts
  • Baseline summary statistics documented in notes/day5_observations.md
  • Any bugs or data quality issues identified and resolved
  • Decision made on whether to run 500 or 1000 prompts on Day 6, based on the time estimate

Week 1 Checkpoint

By the end of today you should be able to answer yes to all of the following:

  • Can the pipeline run from a cold start without manual intervention?
  • Are results being saved incrementally, and can a failed run be resumed?
  • Are all generation and evaluation parameters sourced from config.py rather than hard-coded?
  • Is the output schema sufficient to support per-group analysis and baseline vs. mitigation comparison?

If any of these are no, resolve them today rather than carrying the issue into the full evaluation runs next week.