// Handles a "dataset-run" event: formats the prompt against its configured
// dataset, runs the model once per dataset item, optionally evaluates and
// scores each output, and streams one newline-delimited JSON line per item.
//
// Stream protocol (one JSON object per line):
//   { type: "dataset", result: { input, expectedOutput, actualOutput, tokens, evals }, runId, runName }
//   { error, type: "error" }   -- terminal; the stream is closed right after it
//
// The response carries the "AgentMark-Streaming: true" header. If the prompt
// has neither `text_config` nor `object_config`, this branch returns nothing
// and control falls through to whatever follows it.
if (event.type === "dataset-run") {
  const data = event.data;
  // NOTE(review): the front-matter shape is not visible from here, so the
  // loose `any` cast is kept; every access below is guarded with `?.`.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const frontmatter = getFrontMatter(data.prompt) as any;
  // One id shared by every item of this run; each item also gets its own traceId.
  const runId = crypto.randomUUID();
  const datasetPath = frontmatter?.test_settings?.dataset;

  const encoder = new TextEncoder();
  // Serializes a payload as a single newline-terminated JSON line.
  const encodeLine = (payload: unknown): Uint8Array =>
    encoder.encode(JSON.stringify(payload) + "\n");
  // Turns an arbitrary thrown value into a message that survives JSON.stringify.
  const describeError = (err: unknown): string =>
    err instanceof Error ? err.message : String(err);

  if (frontmatter?.text_config) {
    const prompt = await agentmarkClient.loadTextPrompt(data.prompt);
    const dataset = await prompt.formatWithDataset({
      datasetPath,
      telemetry: { isEnabled: true },
    });
    // Set by `cancel` when the consumer goes away, so the loop stops issuing
    // model calls and never touches a controller that is already closed.
    let cancelled = false;
    const stream = new ReadableStream({
      async start(controller) {
        let index = 0;
        try {
          for await (const item of dataset) {
            if (cancelled) return;
            if (item.type === "error") {
              // A dataset-level error terminates the run.
              controller.enqueue(
                encodeLine({ error: item.error, type: "error" })
              );
              controller.close();
              return;
            }
            const traceId = crypto.randomUUID();
            const result = await generateText({
              ...item.formatted,
              experimental_telemetry: {
                ...item.formatted.experimental_telemetry,
                metadata: {
                  ...item.formatted.experimental_telemetry?.metadata,
                  dataset_run_id: runId,
                  dataset_path: datasetPath,
                  dataset_run_name: data.datasetRunName,
                  dataset_item_name: index,
                  traceName: `ds-run-${data.datasetRunName}-${index}`,
                  traceId,
                  dataset_expected_output: item.dataset.expected_output,
                },
              },
            });
            // Optional: run evaluations if evaluators are configured.
            let evalResults: unknown[] = [];
            if (evalRegistry && item.evals) {
              // Evaluator names with no registry entry are skipped.
              const evaluators = item.evals.flatMap((evaluatorName: string) => {
                const evalFn = evalRegistry.get(evaluatorName);
                return evalFn ? [{ name: evaluatorName, fn: evalFn }] : [];
              });
              evalResults = await Promise.all(
                evaluators.map(async (evaluator) => {
                  const evalResult = await evaluator.fn({
                    input: item.formatted.messages,
                    output: result.text,
                    expectedOutput: item.dataset.expected_output,
                  });
                  // Optional: report the score if the AgentMark SDK is available.
                  // Scoring stays non-blocking. NOTE(review): if score() returns
                  // a promise (not visible here), its rejection would otherwise
                  // be unhandled, so it is caught and logged.
                  void Promise.resolve(
                    agentmarkSDK?.score({
                      resourceId: traceId,
                      label: evalResult.label,
                      reason: evalResult.reason,
                      score: evalResult.score,
                      name: evaluator.name,
                    })
                  ).catch((err: unknown) => {
                    console.warn(
                      `Failed to score evaluator "${evaluator.name}":`,
                      describeError(err)
                    );
                  });
                  return {
                    name: evaluator.name,
                    ...evalResult,
                  };
                })
              );
            }
            // The consumer may have disconnected while we were awaiting.
            if (cancelled) return;
            controller.enqueue(
              encodeLine({
                type: "dataset",
                result: {
                  input: item.dataset.input,
                  expectedOutput: item.dataset.expected_output,
                  actualOutput: result.text,
                  tokens: result.usage?.totalTokens,
                  evals: evalResults,
                },
                runId,
                runName: data.datasetRunName,
              })
            );
            index++;
          }
          if (!cancelled) controller.close();
        } catch (err: unknown) {
          // Surface generation/evaluation failures in-band, using the same
          // error line as dataset errors, instead of erroring the stream.
          if (cancelled) return;
          controller.enqueue(
            encodeLine({ error: describeError(err), type: "error" })
          );
          controller.close();
        }
      },
      cancel() {
        cancelled = true;
      },
    });
    return new Response(stream, {
      headers: {
        "AgentMark-Streaming": "true",
      },
    });
  }

  if (frontmatter?.object_config) {
    const prompt = await agentmarkClient.loadObjectPrompt(data.prompt);
    const dataset = await prompt.formatWithDataset({
      datasetPath,
      telemetry: { isEnabled: true },
    });
    // Set by `cancel` when the consumer goes away, so the loop stops issuing
    // model calls and never touches a controller that is already closed.
    let cancelled = false;
    const stream = new ReadableStream({
      async start(controller) {
        let index = 0;
        try {
          for await (const item of dataset) {
            if (cancelled) return;
            if (item.type === "error") {
              // A dataset-level error terminates the run.
              controller.enqueue(
                encodeLine({ error: item.error, type: "error" })
              );
              controller.close();
              return;
            }
            const traceId = crypto.randomUUID();
            const result = await generateObject({
              ...item.formatted,
              experimental_telemetry: {
                ...item.formatted.experimental_telemetry,
                metadata: {
                  ...item.formatted.experimental_telemetry?.metadata,
                  dataset_run_id: runId,
                  dataset_path: datasetPath,
                  dataset_run_name: data.datasetRunName,
                  dataset_item_name: index,
                  traceName: `ds-run-${data.datasetRunName}-${index}`,
                  traceId,
                  dataset_expected_output: item.dataset.expected_output,
                },
              },
            });
            // Optional: run evaluations if evaluators are configured.
            let evalResults: unknown[] = [];
            if (evalRegistry && item.evals) {
              // Evaluator names with no registry entry are skipped.
              const evaluators = item.evals.flatMap((evaluatorName: string) => {
                const evalFn = evalRegistry.get(evaluatorName);
                return evalFn ? [{ name: evaluatorName, fn: evalFn }] : [];
              });
              evalResults = await Promise.all(
                evaluators.map(async (evaluator) => {
                  const evalResult = await evaluator.fn({
                    input: item.formatted.messages,
                    output: result.object,
                    expectedOutput: item.dataset.expected_output,
                  });
                  // Optional: report the score if the AgentMark SDK is available.
                  // Scoring stays non-blocking. NOTE(review): if score() returns
                  // a promise (not visible here), its rejection would otherwise
                  // be unhandled, so it is caught and logged.
                  void Promise.resolve(
                    agentmarkSDK?.score({
                      resourceId: traceId,
                      label: evalResult.label,
                      reason: evalResult.reason,
                      score: evalResult.score,
                      name: evaluator.name,
                    })
                  ).catch((err: unknown) => {
                    console.warn(
                      `Failed to score evaluator "${evaluator.name}":`,
                      describeError(err)
                    );
                  });
                  return {
                    name: evaluator.name,
                    ...evalResult,
                  };
                })
              );
            }
            // The consumer may have disconnected while we were awaiting.
            if (cancelled) return;
            controller.enqueue(
              encodeLine({
                type: "dataset",
                result: {
                  input: item.dataset.input,
                  expectedOutput: item.dataset.expected_output,
                  actualOutput: result.object,
                  tokens: result.usage?.totalTokens,
                  evals: evalResults,
                },
                runId,
                runName: data.datasetRunName,
              })
            );
            index++;
          }
          if (!cancelled) controller.close();
        } catch (err: unknown) {
          // Surface generation/evaluation failures in-band, using the same
          // error line as dataset errors, instead of erroring the stream.
          if (cancelled) return;
          controller.enqueue(
            encodeLine({ error: describeError(err), type: "error" })
          );
          controller.close();
        }
      },
      cancel() {
        cancelled = true;
      },
    });
    return new Response(stream, {
      headers: {
        "AgentMark-Streaming": "true",
      },
    });
  }
}