When you run a dataset in the AgentMark platform, it sends a dataset-run event to your webhook endpoint. The event contains the dataset run name and the prompt configuration (a prompt AST) that your handler uses to process the run.

Event Format

{
  "event": {
    "type": "dataset-run",
    "data": {
      "datasetRunName": "string",
      "prompt": "// Prompt AST object"
    }
  }
}
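
A minimal sketch of a webhook endpoint that receives this payload, assuming a Next.js App Router route (the route shape and request parsing are assumptions; adapt them to your framework):
import { NextResponse } from "next/server";

export async function POST(req: Request) {
  const { event } = await req.json();

  if (event.type === "dataset-run") {
    // Replace this placeholder with the handling shown in
    // "Processing Dataset Runs" below, which returns a streaming Response.
    return new Response("Not implemented", { status: 501 });
  }

  return NextResponse.json({ message: "Unhandled event type" }, { status: 400 });
}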

Processing Dataset Runs

The webhook handler processes dataset runs by executing the prompt for each item in the dataset:
// This handler assumes generateText / generateObject from the Vercel AI SDK,
// a getFrontMatter helper for reading the prompt AST's frontmatter, and an
// initialized agentmarkClient, evalRegistry, and agentmarkSDK in scope
// (evaluator setup is shown under "Setting up Evaluators" below).
if (event.type === "dataset-run") {
  const data = event.data;
  const frontmatter = getFrontMatter(data.prompt) as any;
  const runId = crypto.randomUUID();

  if (frontmatter.text_config) {
    const prompt = await agentmarkClient.loadTextPrompt(data.prompt);
    const dataset = await prompt.formatWithDataset({
      datasetPath: frontmatter?.test_settings?.dataset,
      telemetry: { isEnabled: true },
    });

    const stream = new ReadableStream({
      async start(controller) {
        const encoder = new TextEncoder();
        let index = 0;
        for await (const item of dataset) {
          if (item.type === "error") {
            controller.enqueue(
              encoder.encode(
                JSON.stringify({
                  error: item.error,
                  type: "error",
                }) + "\n"
              )
            );
            controller.close();
            return;
          }

          const traceId = crypto.randomUUID();
          const result = await generateText({
            ...item.formatted,
            experimental_telemetry: {
              ...item.formatted.experimental_telemetry,
              metadata: {
                ...item.formatted.experimental_telemetry?.metadata,
                dataset_run_id: runId,
                dataset_path: frontmatter?.test_settings?.dataset,
                dataset_run_name: data.datasetRunName,
                dataset_item_name: index,
                traceName: `ds-run-${data.datasetRunName}-${index}`,
                traceId,
                dataset_expected_output: item.dataset.expected_output,
              },
            },
          });

          // Optional: Run evaluations if evaluators are configured
          let evalResults: any = [];
          if (evalRegistry && item.evals) {
            const evaluators = item.evals
              .map((evaluator: string) => {
                const evalFn = evalRegistry.get(evaluator);
                if (evalFn) {
                  return {
                    name: evaluator,
                    fn: evalFn,
                  };
                }
              })
              .filter((evaluator) => evaluator !== undefined);

            evalResults = await Promise.all(
              evaluators.map(async (evaluator) => {
                const evalResult = await evaluator.fn({
                  input: item.formatted.messages,
                  output: result.text,
                  expectedOutput: item.dataset.expected_output,
                });

                // Optional: Score the evaluation if AgentMarkSDK is available
                agentmarkSDK?.score({
                  resourceId: traceId,
                  label: evalResult.label,
                  reason: evalResult.reason,
                  score: evalResult.score,
                  name: evaluator.name,
                });

                return {
                  name: evaluator.name,
                  ...evalResult,
                };
              })
            );
          }

          const chunk = encoder.encode(
            JSON.stringify({
              type: "dataset",
              result: {
                input: item.dataset.input,
                expectedOutput: item.dataset.expected_output,
                actualOutput: result.text,
                tokens: result.usage?.totalTokens,
                evals: evalResults,
              },
              runId,
              runName: data.datasetRunName,
            }) + "\n"
          );

          controller.enqueue(chunk);
          index++;
        }
        controller.close();
      },
    });

    return new Response(stream, {
      headers: {
        "AgentMark-Streaming": "true",
      },
    });
  }

  if (frontmatter.object_config) {
    const prompt = await agentmarkClient.loadObjectPrompt(data.prompt);
    const dataset = await prompt.formatWithDataset({
      datasetPath: frontmatter?.test_settings?.dataset,
      telemetry: { isEnabled: true },
    });

    const stream = new ReadableStream({
      async start(controller) {
        const encoder = new TextEncoder();
        let index = 0;
        for await (const item of dataset) {
          if (item.type === "error") {
            controller.enqueue(
              encoder.encode(
                JSON.stringify({
                  error: item.error,
                  type: "error",
                }) + "\n"
              )
            );
            controller.close();
            return;
          }

          const traceId = crypto.randomUUID();
          const result = await generateObject({
            ...item.formatted,
            experimental_telemetry: {
              ...item.formatted.experimental_telemetry,
              metadata: {
                ...item.formatted.experimental_telemetry?.metadata,
                dataset_run_id: runId,
                dataset_path: frontmatter?.test_settings?.dataset,
                dataset_run_name: data.datasetRunName,
                dataset_item_name: index,
                traceName: `ds-run-${data.datasetRunName}-${index}`,
                traceId,
                dataset_expected_output: item.dataset.expected_output,
              },
            },
          });

          // Optional: Run evaluations if evaluators are configured
          let evalResults: any = [];
          if (evalRegistry && item.evals) {
            const evaluators = item.evals
              .map((evaluator: string) => {
                const evalFn = evalRegistry.get(evaluator);
                if (evalFn) {
                  return {
                    name: evaluator,
                    fn: evalFn,
                  };
                }
              })
              .filter((evaluator) => evaluator !== undefined);

            evalResults = await Promise.all(
              evaluators.map(async (evaluator) => {
                const evalResult = await evaluator.fn({
                  input: item.formatted.messages,
                  output: result.object,
                  expectedOutput: item.dataset.expected_output,
                });

                // Optional: Score the evaluation if AgentMarkSDK is available
                agentmarkSDK?.score({
                  resourceId: traceId,
                  label: evalResult.label,
                  reason: evalResult.reason,
                  score: evalResult.score,
                  name: evaluator.name,
                });

                return {
                  name: evaluator.name,
                  ...evalResult,
                };
              })
            );
          }

          const chunk = encoder.encode(
            JSON.stringify({
              type: "dataset",
              result: {
                input: item.dataset.input,
                expectedOutput: item.dataset.expected_output,
                actualOutput: result.object,
                tokens: result.usage?.totalTokens,
                evals: evalResults,
              },
              runId,
              runName: data.datasetRunName,
            }) + "\n"
          );

          controller.enqueue(chunk);
          index++;
        }
        controller.close();
      },
    });

    return new Response(stream, {
      headers: {
        "AgentMark-Streaming": "true",
      },
    });
  }
}

Streaming Response

Dataset runs return a streaming response so the platform can show results as each item completes. The stream is newline-delimited JSON; each chunk contains:
{
  type: "dataset",
  result: {
    input: any,              // Original dataset item input
    expectedOutput: any,     // Expected output from dataset
    actualOutput: any,       // Generated output from model
    tokens: number,          // Token usage for this item
    evals: Array<{           // Evaluation results (if evaluators configured)
      name: string,          // Evaluator name
      score: number,         // Evaluation score
      label: string,         // Evaluation label
      reason?: string,       // Optional evaluation reasoning
    }>,
  },
  runId: string,            // Unique run identifier
  runName: string,          // Dataset run name
}
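
Because each chunk is a single JSON object followed by a newline, consumers can parse the stream line by line. A minimal sketch of reading the response with standard web APIs (the readDatasetRunStream name and the logging are illustrative only):
async function readDatasetRunStream(response: Response) {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffered = "";

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffered += decoder.decode(value, { stream: true });

    // Each complete line is one chunk of the run.
    let newlineIndex;
    while ((newlineIndex = buffered.indexOf("\n")) !== -1) {
      const line = buffered.slice(0, newlineIndex).trim();
      buffered = buffered.slice(newlineIndex + 1);
      if (!line) continue;

      const chunk = JSON.parse(line);
      if (chunk.type === "error") {
        console.error("Dataset run error:", chunk.error);
      } else {
        console.log(chunk.runName, chunk.result.actualOutput);
      }
    }
  }
}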

Telemetry

Each dataset item is traced with the following metadata, passed through experimental_telemetry.metadata on the generation call:
const telemetry = {
  dataset_run_id: runId, // required
  dataset_path: frontmatter?.test_settings?.dataset, // required
  dataset_run_name: data.datasetRunName, // required
  dataset_item_name: index, // required
  traceName: `ds-run-${data.datasetRunName}-${index}`, // required
  traceId: traceId, // required
  dataset_expected_output: item.dataset.expected_output, // required
};

Evaluations

AgentMark supports running evaluations on dataset results to score outputs automatically. To enable evaluations, register your evaluators (and, optionally, the AgentMark SDK for reporting scores) as shown below.

Setting up Evaluators

import { EvalRegistry } from "@agentmark/agentmark-core";
import { AgentMarkSDK } from "@agentmark/sdk";

// Create an evaluation registry
const evalRegistry = new EvalRegistry();

// Register custom evaluators
evalRegistry.set("accuracy", async ({ input, output, expectedOutput }) => {
  // Your custom evaluation logic
  const score = output === expectedOutput ? 1.0 : 0.0;
  return {
    score,
    label: score >= 0.8 ? "PASS" : "FAIL",
    reason: `Output ${score >= 0.8 ? "matches" : "does not match"} expected result`,
  };
});

evalRegistry.set("relevance", async ({ input, output, expectedOutput }) => {
  // Another evaluator for relevance scoring
  const relevanceScore = calculateRelevance(output, expectedOutput);
  return {
    score: relevanceScore,
    label: relevanceScore >= 0.7 ? "RELEVANT" : "NOT_RELEVANT",
    reason: `Relevance score: ${relevanceScore}`,
  };
});

// Optional: Initialize AgentMarkSDK for scoring
const agentmarkSDK = new AgentMarkSDK({
  apiKey: "your-api-key",
  baseUrl: "your-agentmark-instance-url",
});

Evaluation Results

When evaluations are configured, each dataset item will include evaluation results in the response:
{
  "type": "dataset",
  "result": {
    "input": "What is 2+2?",
    "expectedOutput": "4",
    "actualOutput": "The answer is 4",
    "tokens": 15,
    "evals": [
      {
        "name": "accuracy",
        "score": 1.0,
        "label": "PASS",
        "reason": "Output matches expected result"
      },
      {
        "name": "relevance", 
        "score": 0.85,
        "label": "RELEVANT",
        "reason": "Relevance score: 0.85"
      }
    ]
  },
  "runId": "abc-123",
  "runName": "test-dataset-run"
}

Error Handling

Handle unexpected failures in your webhook so the platform receives a clear error response:
try {
  // Process dataset run
} catch (error) {
  console.error("Dataset run error:", error);
  return NextResponse.json(
    { message: "Error processing dataset run" },
    { status: 500 }
  );
}
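
The handler above closes the stream when the dataset itself yields an error item. If you would rather keep processing when a single model call fails, one option is to catch per-item errors inside the loop and emit an error chunk without ending the run. A rough sketch (variable names match the handler above; whether to continue or abort is up to you):
try {
  const result = await generateText({ ...item.formatted });
  // ... run evals and enqueue the "dataset" chunk as shown above
} catch (error) {
  controller.enqueue(
    encoder.encode(
      JSON.stringify({ type: "error", error: String(error) }) + "\n"
    )
  );
  // Continue with the next item instead of closing the stream.
}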

Best Practices

  1. Streaming
    • Always return streaming responses for dataset runs
    • Use proper headers: "AgentMark-Streaming": "true"
    • Handle stream errors appropriately
  2. Telemetry
    • Include all required metadata in experimental_telemetry
    • Use unique traceId and runId for each execution
    • Track dataset progress and results
  3. Error Handling
    • Validate prompt configuration before processing
    • Handle individual item failures gracefully
    • Return appropriate HTTP status codes
  4. Performance
    • Process dataset items sequentially to avoid overwhelming the model
    • Use appropriate timeouts for long-running datasets (see the timeout sketch after this list)
    • Monitor memory usage for large datasets
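
For the timeout recommendation above, one option is to pass a standard AbortSignal to each generation call, assuming your generation function supports an abortSignal option (the Vercel AI SDK's generateText and generateObject do). A minimal sketch; the 60-second limit is arbitrary:
const result = await generateText({
  ...item.formatted,
  // Abort any single dataset item that runs longer than 60 seconds.
  abortSignal: AbortSignal.timeout(60_000),
});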
