月曜日, 10月 13, 2025
月曜日, 10月 13, 2025
- Advertisment -
ホームニューステックニュースMicrosoft Agent Framework (C#) を見てみよう その7 チェックポイントの永続化

Microsoft Agent Framework (C#) を見てみよう その7 チェックポイントの永続化


シリーズ記事

はじめに

前回は、Microsoft Agent Framework の Executor のステータス管理について掘り下げてきました。その中でワークフローのチェックポイントについても触れましたが、前回の段階ではチェックポイントのデータの管理を行うための CheckpointManager として CheckpointManager.Default を使用していました。CheckpointManager.Default はインメモリでデータを管理しているためプロセスの再起動などでデータが飛んでしまいます。
本格的にワークフローを実行する場合には、チェックポイントのデータを永続化しておく必要があります。今回は、チェックポイントの永続化について掘り下げてみます。

チェックポイントの永続化

Microsoft Agent Framework では、チェックポイントの永続化のために ICheckpointStore というインターフェースが用意されています。このインターフェースを実装することで、チェックポイントのデータを任意のストレージに保存することができます。現時点では、ICheckpointStore を実装したクラスは以下の 3 つが提供されています。

  • JsonCheckpointStore: TStoreObjectJsonElement を指定した抽象クラス。提供する機能は KeyTypeInfo という CheckpointInfo のための JsonTypeInfo だけ。
  • InMemoryCheckpointStore: デフォルトで使用されるストア。JsonCheckpointStore を継承しており、インメモリでデータを管理する。
  • FileSystemJsonCheckpointStore: JsonCheckpointStore を継承しており、ファイルシステムに JSON ファイルとしてデータを保存する。コンストラクタで同期メソッドを使ってファイルを読んでいたりするのでデスクトップアプリとかバッチ処理とかで使う感じかな?Web アプリで使うには心もとない。

ICheckpointStore はジェネリックインターフェースなので、TStoreObject に任意の型を指定して実装することができます。ですが、基本的には JsonCheckpointStore を継承して JSON 形式でシリアライズ/デシリアライズするのが簡単で良いと思います。

FileSystemJsonCheckpointStore の利用

では、FileSystemJsonCheckpointStore を使ってチェックポイントの永続化を試してみましょう。前回の記事のコードの CheckpointManager のインスタンスを作成している部分を以下のように変更します。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Reflection;


var workflow = await BuildWorkflowAsync();


using var fs = new FileSystemJsonCheckpointStore(new(Directory.GetCurrentDirectory()));
var checkpointManager = CheckpointManager.CreateJson(fs);

ListCheckpointInfo> checkpoints = [];


await using CheckpointedStreamingRun> run = await InProcessExecution.StreamAsync(
    workflow,
    new SetAmountMessage(4),
    checkpointManager);


string? output = null;

await foreach (var evt in run.Run.WatchStreamAsync())
{
    
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.Asstring>()}");
        output = outputEvent.Asstring>();
    }
}


Console.WriteLine(output);


foreach (var checkpoint in checkpoints)
{
    Console.WriteLine($"Checkpoint at run id {checkpoint.RunId} with checkpoint {checkpoint.CheckpointId}");
}

Console.WriteLine("--------------------------------------");

await run.RestoreCheckpointAsync(checkpoints[2]);
await foreach (var evt in run.Run.WatchStreamAsync())
{
    Console.WriteLine(evt);
}


async ValueTaskWorkflowSetAmountMessage>> BuildWorkflowAsync()
{
    
    var countUpExecutor = new CountUpExecutor();
    var generateOutputMessageExecutor = new GenerateOutputMessageExecutor();

    
    
    return await new WorkflowBuilder(countUpExecutor)
        .AddSwitch(countUpExecutor, switchBuilder =>
        {
            
            switchBuilder.AddCase(
                (CurrentCountMessage? message) => message is { Value:  10 },
                countUpExecutor);
            
            switchBuilder.WithDefault(generateOutputMessageExecutor);
        })
        
        .WithOutputFrom(generateOutputMessageExecutor)
        .BuildAsyncSetAmountMessage>();
}


record SetAmountMessage(int Amount);

record CurrentCountMessage(int Value);


class CountUpExecutor() : ReflectingExecutorCountUpExecutor>(nameof(CountUpExecutor)),
    IMessageHandlerSetAmountMessage, CurrentCountMessage>,
    IMessageHandlerCurrentCountMessage, CurrentCountMessage>
{
    private int _amount;
    private int _currentCount;

    
    public ValueTaskCurrentCountMessage> HandleAsync(SetAmountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _amount = message.Amount;
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    
    public ValueTaskCurrentCountMessage> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        _currentCount += _amount;
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    protected override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        
        await context.QueueStateUpdateAsync(nameof(_amount), _amount, cancellationToken);
        await context.QueueStateUpdateAsync(nameof(_currentCount), _currentCount, cancellationToken);
    }

    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        
        _amount = await context.ReadStateAsyncint>(nameof(_amount));
        _currentCount = await context.ReadStateAsyncint>(nameof(_currentCount));
    }
}


class GenerateOutputMessageExecutor() : ExecutorCurrentCountMessage, string>(nameof(GenerateOutputMessageExecutor))
{
    public override ValueTaskstring> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        return ValueTask.FromResult($"The final count is {message.Value}");
    }
}

:::

実行すると前回の記事の実行結果と同じように以下の出力が得られます。

CountUpExecutor result: CurrentCountMessage { Value = 0 }
Checkpoint was stored: ed3cd8ee19904a9587dae9e58eadb30f
CountUpExecutor result: CurrentCountMessage { Value = 4 }
Checkpoint was stored: 693bf3b144cc475abec57ea6d7cffcb4
CountUpExecutor result: CurrentCountMessage { Value = 8 }
Checkpoint was stored: 84e744f245074312aa64060350327014
CountUpExecutor result: CurrentCountMessage { Value = 12 }
Checkpoint was stored: af83e8591429451d9c0046b79d00b6b3
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: ff5ff9b3e2e544d9ac42d96cf667e659
The final count is 12
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ed3cd8ee19904a9587dae9e58eadb30f
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 693bf3b144cc475abec57ea6d7cffcb4
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 84e744f245074312aa64060350327014
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint af83e8591429451d9c0046b79d00b6b3
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ff5ff9b3e2e544d9ac42d96cf667e659
--------------------------------------
SuperStepStartedEvent(Step = 5, Data: Microsoft.Agents.AI.Workflows.SuperStepStartInfo = Microsoft.Agents.AI.Workflows.SuperStepStartInfo)
ExecutorInvokedEvent(Executor = CountUpExecutor, Data: Microsoft.Agents.AI.Workflows.PortableValue = Microsoft.Agents.AI.Workflows.PortableValue)
ExecutorCompletedEvent(Executor = CountUpExecutor, Data: CurrentCountMessage = CurrentCountMessage { Value = 12 })
SuperStepCompletedEvent(Step = 5, Data: Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo = Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo)
SuperStepStartedEvent(Step = 6, Data: Microsoft.Agents.AI.Workflows.SuperStepStartInfo = Microsoft.Agents.AI.Workflows.SuperStepStartInfo)
ExecutorInvokedEvent(Executor = GenerateOutputMessageExecutor, Data: CurrentCountMessage = CurrentCountMessage { Value = 12 })
ExecutorCompletedEvent(Executor = GenerateOutputMessageExecutor, Data: System.String = The final count is 12)
WorkflowOutputEvent(Data: System.String = The final count is 12)
SuperStepCompletedEvent(Step = 6, Data: Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo = Microsoft.Agents.AI.Workflows.SuperStepCompletionInfo)

---- で区切られているところから下がチェックポイントを復元した後の出力です。ちゃんと復元できていることがわかります。

FileSystemJsonCheckpointStore を使うと、チェックポイントのデータが実行ディレクトリに index.jsonl というファイル名で保存されます。中身は JSON Lines 形式で、1 行が 1 つのチェックポイントのデータになっています。例えば、上記の実行結果の場合、以下のような内容になります。

{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"ed3cd8ee19904a9587dae9e58eadb30f"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"693bf3b144cc475abec57ea6d7cffcb4"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"84e744f245074312aa64060350327014"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"af83e8591429451d9c0046b79d00b6b3"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"ff5ff9b3e2e544d9ac42d96cf667e659"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"a40f736db5344545bdaefcccb482bc11"}
{"runId":"d032fbdaecf74d80a1ca5c4a7ae5a6f9","checkpointId":"9e391f45551b48368953391ac0828eb7"}

全部で 7 行ありますが、これは 1 回目の実行で 5 回のスーパーステップが実行によるものです。実際に以下の実行結果にある run id と checkpoint id が対応しています。

Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ed3cd8ee19904a9587dae9e58eadb30f
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 693bf3b144cc475abec57ea6d7cffcb4
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint 84e744f245074312aa64060350327014
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint af83e8591429451d9c0046b79d00b6b3
Checkpoint at run id d032fbdaecf74d80a1ca5c4a7ae5a6f9 with checkpoint ff5ff9b3e2e544d9ac42d96cf667e659

2 回目の実行は 3 回目のチェックポイントの後から実行しているので 2 行追加されて合計 7 行になっています。でも、このファイルだけだとワークフローの状態が含まれていません。実際の各チェックポイントの情報は run id_checkpoint id.json というファイル名で保存されます。例えば、3 回目のチェックポイントの場合、d032fbdaecf74d80a1ca5c4a7ae5a6f9_84e744f245074312aa64060350327014.json というファイル名になります。このファイルの中身は以下のようになっています。

{
  "stepNumber": 2,
  "workflow": {
    "executors": {
      "CountUpExecutor": {
        "executorType": {
          "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
          "typeName": "CountUpExecutor"
        },
        "executorId": "CountUpExecutor"
      },
      "GenerateOutputMessageExecutor": {
        "executorType": {
          "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
          "typeName": "GenerateOutputMessageExecutor"
        },
        "executorId": "GenerateOutputMessageExecutor"
      }
    },
    "edges": {
      "CountUpExecutor": [
        {
          "$type": 1,
          "hasAssigner": true,
          "kind": 1,
          "connection": {
            "sourceIds": [
              "CountUpExecutor"
            ],
            "sinkIds": [
              "CountUpExecutor",
              "GenerateOutputMessageExecutor"
            ]
          }
        }
      ]
    },
    "requestPorts": [
    ],
    "startExecutorId": "CountUpExecutor",
    "outputExecutorIds": [
      "GenerateOutputMessageExecutor"
    ]
  },
  "runnerData": {
    "instantiatedExecutors": [
      "CountUpExecutor"
    ],
    "queuedMessages": {
      "CountUpExecutor": [
        {
          "messageType": {
            "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
            "typeName": "CurrentCountMessage"
          },
          "message": {
            "typeId": {
              "assemblyName": "WorkflowHelloWorldApp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
              "typeName": "CurrentCountMessage"
            },
            "value": {
              "value": 8
            }
          },
          "source": {
          }
        }
      ]
    },
    "outstandingRequests": [
    ]
  },
  "stateData": {
    "CountUpExecutor||_amount": {
      "typeId": {
        "assemblyName": "System.Private.CoreLib, Version=9.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e",
        "typeName": "System.Int32"
      },
      "value": 4
    },
    "CountUpExecutor||_currentCount": {
      "typeId": {
        "assemblyName": "System.Private.CoreLib, Version=9.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e",
        "typeName": "System.Int32"
      },
      "value": 8
    }
  },
  "edgeStateData": {
  }
}

JSON にはワークフローの Executor の情報や Edge の情報、実行中のメッセージキューの情報、Executor の状態データなどが含まれています。これらの情報を使ってワークフローの状態を復元しています。

コードを以下のように書き換えて保存されたファイルを読み込むようにしてみましょう。以下のコードでハードコードしている run id は実行するたびに変わるので、実際にはファイルを列挙して最新のものを取得するなどの処理が必要です。

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Reflection;


var workflow = await BuildWorkflowAsync();


using var fs = new FileSystemJsonCheckpointStore(new(Directory.GetCurrentDirectory()));
var checkpointManager = CheckpointManager.CreateJson(fs);

ListCheckpointInfo> checkpoints = [.. await fs.RetrieveIndexAsync("d032fbdaecf74d80a1ca5c4a7ae5a6f9")];


await using CheckpointedStreamingRun> run = await InProcessExecution.ResumeStreamAsync(
    workflow,
    checkpoints[2],
    checkpointManager,
    checkpoints[2].RunId);


string? output = null;

await foreach (var evt in run.Run.WatchStreamAsync())
{
    
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.Asstring>()}");
        output = outputEvent.Asstring>();
    }
}


Console.WriteLine(output);

Executor の定義と BuildWorkflowAsync メソッドは前述のコードと同じです。
ファイルから読み込んだチェックポイントの 3 番目を指定して InProcessExecution.ResumeStreamAsync メソッドでワークフローを復元しています。実行結果は以下のようになります。

CountUpExecutor result: CurrentCountMessage { Value = 12 }
Checkpoint was stored: 81a9770cd56e4ce69b147d5002373c2f
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: c8f25adaf74c46678a3a8d5c330509d0
The final count is 12

ちゃんと 3 回目のチェックポイントの状態から復元されていることがわかります。

永続化を自作してみよう

実際のアプリでは、ファイルシステムではなくデータベースやクラウドストレージに保存したいことが多いと思います。そういう場合には JsonCheckpointStore を継承して特定のストレージに保存するクラスを実装すれば良いです。オーバーライドして実装するメソッドは以下の 3 つです。

  • CreateCheckpointAsync: 新しいチェックポイントを作成して保存する。ここで checkpoint ID を生成して CheckpointInfo を返す
  • RetrieveCheckpointAsync: 指定された run id とチェックポイント ID からチェックポイントデータを取得する
  • RetrieveIndexAsync: 指定された run id のすべてのチェックポイント情報のリストを返す。

やることさえわかれば、そんなに難しくないです。試しにインメモリで保存するだけの簡単な実装を作ってみましょう。

MyInMemoryJsonCheckpointStore.cs

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using System.Text.Json;

namespace WorkflowHelloWorldApp;


public class MyInMemoryJsonCheckpointStore : JsonCheckpointStore
{
    
    private readonly Dictionarystring, ListItemRecord>> _store = new();
    
    
    public override ValueTaskCheckpointInfo> CreateCheckpointAsync(string runId, JsonElement value, CheckpointInfo? parent = null)
    {
        Console.WriteLine($"## CreateCheckpointAsync({runId}, {value.ToString()[0..30]})");
        
        
        if (!_store.TryGetValue(runId, out var checkpoints))
        {
            checkpoints = _store[runId] = [];
        }

        
        var item = new ItemRecord(checkpoints.Count, value);
        checkpoints.Add(item);
        
        
        return ValueTask.FromResult(new CheckpointInfo(runId, item.CheckpointId.ToString()));
    }

    
    public override ValueTaskJsonElement> RetrieveCheckpointAsync(string runId, CheckpointInfo key)
    {
        Console.WriteLine($"## RetrieveCheckpointAsync({runId}, {key})");
        
        
        if (!_store.TryGetValue(runId, out var checkpoints))
        {
            throw new InvalidOperationException($"Not found: {runId}");
        }

        
        return ValueTask.FromResult(checkpoints[int.Parse(key.CheckpointId)].Value);
    }

    
    public override ValueTaskIEnumerableCheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
    {
        Console.WriteLine($"## RetrieveIndexAsync({runId})");
        
        
        return ValueTask.FromResult(
            _store.GetValueOrDefault(runId)
                ?.Select(x => x.CheckpointId)
                .Select(checkpointId => new CheckpointInfo(runId, checkpointId.ToString())) ?? []);
    }

    
    public void Dump()
    {
        Console.WriteLine("Dump all checkpoints");
        foreach (var checkpoints in _store.Values)
        {
            foreach (var item in checkpoints)
            {
                Console.WriteLine(item.ToString()[0..60]);
            }
        }
    }
}


record ItemRecord(int CheckpointId, JsonElement Value);

次に、上記の MyInMemoryJsonCheckpointStore を使うようにコードを書き換えます。カウントアップと結果の出力を生成するワークフローを完走させたあとに 3 番目のチェックポイントに戻して再実行するようにしてみます。

Program.cs

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
using WorkflowHelloWorldApp;


var workflow = await BuildWorkflowAsync();


var myStore = new MyInMemoryJsonCheckpointStore();
var checkpointManager = CheckpointManager.CreateJson(myStore);



await using CheckpointedStreamingRun> run = await InProcessExecution.StreamAsync(
    workflow,
    new SetAmountMessage(4),
    checkpointManager);

ListCheckpointInfo> checkpoints = [];

string? output = null;

await foreach (var evt in run.Run.WatchStreamAsync())
{
    
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.Asstring>()}");
        output = outputEvent.Asstring>();
    }
}


Console.WriteLine(output);


myStore.Dump();


Console.WriteLine("----------------------------------");
await run.RestoreCheckpointAsync(checkpoints[2]);
await foreach (var evt in run.Run.WatchStreamAsync())
{
    
    if (evt is SuperStepCompletedEvent { CompletionInfo: { Checkpoint: not null } } superStepCompletedEvent)
    {
        Console.WriteLine($"Checkpoint was stored: {superStepCompletedEvent.CompletionInfo.Checkpoint.CheckpointId}");
        checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
    }

    
    if (evt is ExecutorCompletedEvent { ExecutorId: nameof(CountUpExecutor) } executorCompletedEvent)
    {
        Console.WriteLine($"CountUpExecutor result: {executorCompletedEvent.Data}");
    }

    
    if (evt is WorkflowOutputEvent { Data: string } outputEvent)
    {
        Console.WriteLine($"WorkflowOutputEvent was raised: {outputEvent.Asstring>()}");
        output = outputEvent.Asstring>();
    }
}


Console.WriteLine(output);


myStore.Dump();

実行すると以下のような結果になります。

CountUpExecutor result: CurrentCountMessage { Value = 0 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":0,"workflow":{"e)
Checkpoint was stored: 0
CountUpExecutor result: CurrentCountMessage { Value = 4 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":1,"workflow":{"e)
Checkpoint was stored: 1
CountUpExecutor result: CurrentCountMessage { Value = 8 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":2,"workflow":{"e)
Checkpoint was stored: 2
CountUpExecutor result: CurrentCountMessage { Value = 12 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":3,"workflow":{"e)
Checkpoint was stored: 3
WorkflowOutputEvent was raised: The final count is 12
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":4,"workflow":{"e)
Checkpoint was stored: 4
The final count is 12
Dump all checkpoints
ItemRecord { CheckpointId = 0, Value = {"stepNumber":0,"work
ItemRecord { CheckpointId = 1, Value = {"stepNumber":1,"work
ItemRecord { CheckpointId = 2, Value = {"stepNumber":2,"work
ItemRecord { CheckpointId = 3, Value = {"stepNumber":3,"work
ItemRecord { CheckpointId = 4, Value = {"stepNumber":4,"work
----------------------------------
## RetrieveCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, CheckpointInfo(RunId: d704fbd5b1be4c058a17cf101d02ee8f, CheckpointId: 2))
CountUpExecutor result: CurrentCountMessage { Value = 12 }
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":5,"workflow":{"e)
Checkpoint was stored: 5
## CreateCheckpointAsync(d704fbd5b1be4c058a17cf101d02ee8f, {"stepNumber":6,"workflow":{"e)
WorkflowOutputEvent was raised: The final count is 12
Checkpoint was stored: 6
The final count is 12
Dump all checkpoints
ItemRecord { CheckpointId = 0, Value = {"stepNumber":0,"work
ItemRecord { CheckpointId = 1, Value = {"stepNumber":1,"work
ItemRecord { CheckpointId = 2, Value = {"stepNumber":2,"work
ItemRecord { CheckpointId = 3, Value = {"stepNumber":3,"work
ItemRecord { CheckpointId = 4, Value = {"stepNumber":4,"work
ItemRecord { CheckpointId = 5, Value = {"stepNumber":5,"work
ItemRecord { CheckpointId = 6, Value = {"stepNumber":6,"work

## CreateCheckpointAsync(...)## RetrieveCheckpointAsync(...) といったログが追加されているのがわかります。チェックポイントの作成と取得がちゃんと呼ばれていることがわかります。また、Dump all checkpoints の出力で、チェックポイントの内容がすべて保存されていることも確認できます。最初の実行で 5 つ、2 回目の実行で 2 つ、合計 7 つのチェックポイントが保存されています。

まとめ

今回は、Microsoft Agent Framework のチェックポイントの永続化について掘り下げてみました。
ICheckpointStore インターフェースを実装することで、チェックポイントのデータを任意のストレージに保存することができます。JsonCheckpointStore を継承して JSON 形式でシリアライズ/デシリアライズするのが簡単で便利です。FileSystemJsonCheckpointStore を使うとファイルシステムに保存できますが、実際のアプリではデータベースやクラウドストレージに保存したいことが多いと思います。その場合には JsonCheckpointStore を継承して特定のストレージに保存するクラスを実装すれば良いです1 つのスーパーステップ実装のたびにちゃんと履歴を保存していれば途中で失敗していたり、ワークフローを一時的に停止しなければいけないようなシナリオにも柔軟に対応可能です。なかなかいい感じですね。

今回の記事でワークフローを途中で止めて、途中から再開する方法がわかったので次は Human in the loop の機能を試してみようと思います。



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -