AGI Strategy - Day 5
## Day 5 Detailed Plan: End-to-End Pipeline Test
Context from prior days: You have a generate_completion function from Day 3 and an evaluate_toxicity function from Day 4, each tested independently. Today you combine them into a single pipeline, run it on 50-100 prompts, and produce your first real baseline metrics. This is also the end of Week 1, so the goal is to leave the day with full confidence that the pipeline is ready to run at scale on Day 6.
## Session Structure (2-3 hours)
### Block 1 — Integrate the two components (30-40 min)
The core task is writing a pipeline function that calls generate_completion and feeds the result directly into evaluate_toxicity, then packages everything into a structured record.
```python
import json
import time
from pathlib import Path

from generation import generate_completion, MODEL_NAME
from evaluation import evaluate_toxicity
from config import TOXICITY_THRESHOLD, TEMPERATURE, MAX_TOKENS, TOP_P


def run_pipeline_single(
    prompt_record: dict,
    system_prompt: str = "",
) -> dict:
    """
    Run generation + evaluation for a single prompt record.
    Returns a result dict ready for serialisation.
    """
    prompt_text = prompt_record["prompt"]  # adjust key to match your Day 2 schema

    completion = generate_completion(
        prompt=prompt_text,
        system_prompt=system_prompt,
        temperature=TEMPERATURE,
        max_tokens=MAX_TOKENS,
        top_p=TOP_P,
    )

    toxicity_scores = evaluate_toxicity(completion) if completion else None

    return {
        "prompt_id": prompt_record.get("id"),
        "prompt": prompt_text,
        "demographic_group": prompt_record.get("demographic_group"),  # from Day 2
        "system_prompt": system_prompt if system_prompt else None,
        "completion": completion,
        "toxicity_scores": toxicity_scores,
        "above_threshold": (
            toxicity_scores["toxicity"] > TOXICITY_THRESHOLD
            if toxicity_scores else None
        ),
        "model": MODEL_NAME,
        "temperature": TEMPERATURE,
        "max_tokens": MAX_TOKENS,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }
```
A few points on the schema:
- Including `demographic_group` in every record now avoids a painful join later when you do per-group breakdowns in Week 3.
- Recording `system_prompt` in every record — even as `None` for the baseline — means your baseline and mitigation results files share an identical schema, which simplifies comparative analysis on Days 10 and 11.
- `timestamp` is cheap to include and useful if you need to correlate results with logs later.
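To make the shared-schema point concrete, here is a sketch of a baseline record and a mitigation record side by side. All field values below are hypothetical placeholders, not real output:

```python
# Sketch: a baseline record and a mitigation record differ only in values,
# never in keys, so both files can be loaded and compared with the same code.
baseline_record = {
    "prompt_id": "p001",                 # hypothetical ID
    "prompt": "The new neighbour was",   # hypothetical prompt text
    "demographic_group": "group_a",      # hypothetical group label
    "system_prompt": None,               # baseline: no system prompt
    "completion": "friendly and quiet.",
    "toxicity_scores": {"toxicity": 0.12, "identity_attack": 0.03},
    "above_threshold": False,
    "model": "example-model",            # placeholder for MODEL_NAME
    "temperature": 0.7,
    "max_tokens": 256,
    "timestamp": "2024-01-01T00:00:00",
}

# The mitigation run changes only the system_prompt value.
mitigation_record = {**baseline_record, "system_prompt": "Respond respectfully."}

# Identical key sets are what make the Day 10-11 comparison code simple.
assert baseline_record.keys() == mitigation_record.keys()
```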
### Block 2 — Write the batch runner (20-30 min)
```python
def run_pipeline_batch(
    prompts: list[dict],
    output_path: str,
    system_prompt: str = "",
    resume: bool = True,
) -> None:
    """
    Run pipeline over a list of prompt records, saving results incrementally.
    If resume=True, skips prompts whose IDs are already in the output file.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Load already-completed IDs if resuming
    completed_ids = set()
    if resume and output_path.exists():
        with open(output_path, "r") as f:
            for line in f:
                record = json.loads(line)
                completed_ids.add(record["prompt_id"])
        print(f"Resuming: {len(completed_ids)} records already complete")

    with open(output_path, "a") as f:
        for i, prompt_record in enumerate(prompts):
            if prompt_record.get("id") in completed_ids:
                continue
            result = run_pipeline_single(prompt_record, system_prompt=system_prompt)
            f.write(json.dumps(result) + "\n")
            f.flush()  # ensure writes are not buffered during long runs
            if (i + 1) % 10 == 0:
                print(f"[{i + 1}/{len(prompts)}] completed")
```
The resume logic is worth the extra few lines. If your run is interrupted partway through on Day 6 — due to a crash, a timeout, or because you simply need your machine back — you can restart without losing progress. `f.flush()` after each write ensures no records are lost to buffering if the process is killed.
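The resume behaviour can be sanity-checked without any model calls. This sketch simulates an interrupted run by writing two fake records to a temp file, then applies the same ID-loading logic as `run_pipeline_batch` to confirm only the unfinished prompt remains:

```python
import json
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as d:
    out = Path(d) / "results.jsonl"

    # Simulate an interrupted run that finished two prompts
    with open(out, "w") as f:
        for pid in ["p1", "p2"]:
            f.write(json.dumps({"prompt_id": pid}) + "\n")

    # Same loading logic as the batch runner
    completed_ids = set()
    with open(out) as f:
        for line in f:
            completed_ids.add(json.loads(line)["prompt_id"])

    prompts = [{"id": "p1"}, {"id": "p2"}, {"id": "p3"}]
    remaining = [p for p in prompts if p["id"] not in completed_ids]
    assert [p["id"] for p in remaining] == ["p3"]
```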
### Block 3 — Run on 50-100 prompts and time it (45-60 min, partially compute time)
Load your Day 2 subset and take a stratified sample of 50-100 prompts, ensuring multiple demographic groups are represented:
```python
import json
import random
from collections import defaultdict

random.seed(42)  # fix seed for reproducibility

with open("data/subset.jsonl", "r") as f:
    all_prompts = [json.loads(line) for line in f]

# Stratified sample: take N per demographic group
by_group = defaultdict(list)
for p in all_prompts:
    by_group[p["demographic_group"]].append(p)

sample = []
per_group = 10  # adjust to hit your 50-100 target
for group, items in by_group.items():
    sample.extend(random.sample(items, min(per_group, len(items))))

print(f"Sample size: {len(sample)} across {len(by_group)} groups")
```
Then run the pipeline and measure wall-clock time:
```python
start = time.time()
run_pipeline_batch(sample, "results/day5_smoke_test.jsonl")
elapsed = time.time() - start

per_prompt = elapsed / len(sample)
print(f"Total time: {elapsed:.1f}s")
print(f"Per prompt: {per_prompt:.2f}s")
print(f"Estimated time for 500 prompts: {500 * per_prompt / 60:.1f} min")
print(f"Estimated time for 1000 prompts: {1000 * per_prompt / 60:.1f} min")
```
This time estimate is the primary practical output of Day 5. It determines whether you can run the full evaluation in a single session on Day 6 or need to run it overnight.
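One way to turn the estimate into the Day 6 decision is a simple threshold rule. The 90-minute session budget below is an assumption — substitute whatever block of time you actually have:

```python
SESSION_BUDGET_MIN = 90  # assumed single-session budget; adjust to your schedule

def plan_day6(per_prompt_s: float, n_prompts: int) -> str:
    """Return a rough go/no-go: run in one sitting, or schedule overnight."""
    est_min = n_prompts * per_prompt_s / 60
    return "single session" if est_min <= SESSION_BUDGET_MIN else "overnight"

# e.g. 4 s/prompt over 1000 prompts is ~67 min, which fits in one session;
# 12 s/prompt over 1000 prompts is ~200 min, so plan an overnight run.
assert plan_day6(4.0, 1000) == "single session"
assert plan_day6(12.0, 1000) == "overnight"
```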
### Block 4 — Calculate baseline metrics (20-25 min)
```python
import statistics
from collections import defaultdict

records = []
with open("results/day5_smoke_test.jsonl") as f:
    for line in f:
        records.append(json.loads(line))

# Filter to records with valid scores
scored = [r for r in records if r["toxicity_scores"]]
tox_scores = [r["toxicity_scores"]["toxicity"] for r in scored]
id_attack_scores = [r["toxicity_scores"]["identity_attack"] for r in scored]

print(f"N scored: {len(scored)} / {len(records)}")

print("\n--- Toxicity ---")
print(f"Mean: {statistics.mean(tox_scores):.3f}")
print(f"Median: {statistics.median(tox_scores):.3f}")
print(f"Stdev: {statistics.stdev(tox_scores):.3f}")
print(f"Max: {max(tox_scores):.3f}")
print(f"Above threshold: {sum(s > TOXICITY_THRESHOLD for s in tox_scores)}/{len(tox_scores)}")

print("\n--- Identity Attack ---")
print(f"Mean: {statistics.mean(id_attack_scores):.3f}")
print(f"Above threshold: {sum(s > TOXICITY_THRESHOLD for s in id_attack_scores)}/{len(id_attack_scores)}")

# Per-group breakdown
by_group = defaultdict(list)
for r in scored:
    by_group[r["demographic_group"]].append(r["toxicity_scores"]["toxicity"])

print("\n--- Per Group ---")
for group, scores in sorted(by_group.items()):
    print(f"{group:30s} mean={statistics.mean(scores):.3f} n={len(scores)}")
```
At this stage the sample is too small for statistical conclusions, but you are looking for:
- Whether scores are distributed across the full 0-1 range or clustered
- Whether any demographic groups appear notably higher or lower
- Whether the proportion above threshold is non-trivial — if it is close to zero on the baseline, the mitigation experiment will have little room to show improvement, which is worth flagging early
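For the first check — clustering versus spread — a stdlib text histogram is enough at this sample size. The scores below are made up for illustration; in practice you would pass in the `tox_scores` list built above:

```python
from collections import Counter

# Hypothetical scores standing in for tox_scores from the metrics block
tox_scores = [0.02, 0.03, 0.05, 0.41, 0.08, 0.77, 0.04, 0.06]

# Bucket into tenths of the 0-1 range; clamp 1.0 into the top bucket
buckets = Counter(min(int(s * 10), 9) for s in tox_scores)
for b in range(10):
    bar = "#" * buckets.get(b, 0)
    print(f"{b/10:.1f}-{(b+1)/10:.1f} | {bar}")
```

A heavy bottom bucket with a few isolated outliers (as in this fake data) is the typical picture; scores spread across many buckets would suggest the baseline has more headroom for mitigation to act on.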
### Block 5 — Bug check and pipeline hardening (15-20 min)
Review the output file and look for:
- Any records where `completion` is `None` — if there are many, revisit your Ollama timeout setting
- Any records where `toxicity_scores` is `None` but `completion` is not — indicates an issue in the evaluation function
- Unexpected characters or encoding issues in completions
- Consistency of `demographic_group` values across records — if there are inconsistent spellings or casing from the Day 2 dataset, normalise them now before you have 1000 records to fix
Also confirm that the output file is valid JSONL throughout — a partially written line from an earlier interrupted test can silently corrupt downstream reads:
```python
with open("results/day5_smoke_test.jsonl") as f:
    for i, line in enumerate(f):
        try:
            json.loads(line)
        except json.JSONDecodeError as e:
            print(f"Bad line {i}: {e}")
```
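If the scan does turn up bad lines, one repair strategy that plays well with the resume logic is to rewrite the file keeping only lines that parse — the dropped records will simply be regenerated on the next run. A sketch, with a tiny demonstration on a temp file:

```python
import json
import tempfile
from pathlib import Path

def drop_bad_lines(path: str) -> int:
    """Rewrite a JSONL file in place, keeping only lines that parse as JSON.
    Returns the number of lines dropped."""
    p = Path(path)
    good, dropped = [], 0
    with open(p) as f:
        for line in f:
            try:
                json.loads(line)
                good.append(line)
            except json.JSONDecodeError:
                dropped += 1
    if dropped:
        p.write_text("".join(good))
    return dropped

# Demonstration: a file whose second line was truncated mid-write
with tempfile.TemporaryDirectory() as d:
    path = str(Path(d) / "demo.jsonl")
    Path(path).write_text('{"prompt_id": "p1"}\n{"prompt_id": "p2"\n')
    n = drop_bad_lines(path)
    assert n == 1
    remaining = Path(path).read_text()
```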
## Deliverables Checklist
- `pipeline.py` (or equivalent) with `run_pipeline_single` and `run_pipeline_batch`
- `results/day5_smoke_test.jsonl` with 50-100 scored records
- Time-per-prompt recorded and extrapolated to 500 and 1000 prompts
- Baseline summary statistics documented in `notes/day5_observations.md`
- Any bugs or data quality issues identified and resolved
- Decision made on whether to run 500 or 1000 prompts on Day 6, based on the time estimate
## Week 1 Checkpoint
By the end of today you should be able to answer yes to all of the following:
- Can the pipeline run from a cold start without manual intervention?
- Are results being saved incrementally, and can a failed run be resumed?
- Are all generation and evaluation parameters sourced from `config.py` rather than hard-coded?
- Is the output schema sufficient to support per-group analysis and baseline vs. mitigation comparison?
If any of these are no, resolve them today rather than carrying the issue into the full evaluation runs next week.