火曜日, 10月 14, 2025
火曜日, 10月 14, 2025
- Advertisment -
ホームニューステックニュースMicrosoft Agent Framework (C#) を見てみよう その8 Human in the loop を試してみよう

Microsoft Agent Framework (C#) を見てみよう その8 Human in the loop を試してみよう


シリーズ記事

はじめに

前回は、Microsoft Agent Framework のワークフローの永続化の仕組みについて見てきました。ワークフローを任意の場所で止めて、途中から再開することができるようになりました。ワークフローを途中で止めるということは、例えば人間の判断を挟みたい場合などに有効です。そのため今回は、Human in the loop の仕組みを試してみようと思います。

Microsoft Agent Framework の Human in the loop

Human in the loop は、ワークフローの中で人間の判断を挟みたい場合に利用します。例えば、AI が生成した文章を人間が確認して修正したり、AI が提案した選択肢から人間が選んだりする場合などです。 Microsoft Agent Framework ではワークフローに RequestPort を組み込むことで Human in the loop を実現できます。

RequestPort はワークフローから外部に対して追加情報を要求するためのポートです。RequestPort をワークフローに組み込むと、ワークフローの実行が一時停止し、外部からの応答を待つ状態になります。外部から応答が送られると、ワークフローの実行が再開されます。RequestPort にワークフローの実行順が来ると RequestInfoEvent というイベントがワークフローから発行されます。このイベントの Request プロパティの DataAs メソッドでワークフローから渡されたデータが取得できます。DataAs メソッドで取得できたデータをもとにユーザーや外部システムなどから応答を受け取り RunSendMessageAsync メソッドで応答をワークフローに送ります。SendMessageAsync メソッドの引数には RequestInfoEventRequest.CreateResponse(ワークフローに返す値) メソッドで生成したレスポンスを渡します。

実際に Human in the loop を試してみましょう。
今までもずっと使ってきたカウントアップをするワークフローに RequestPort を組み込んで、カウントアップの途中で人間の判断を挟むようにしてみます。具体的には、現在の値を表示してユーザーに、まだカウントアップを続けるかどうかを尋ねるようにします。カウントアップを続ける場合は、カウントアップを行いユーザーに確認を再度求めます。カウントアップを続けない場合は、最終的なメッセージを作成してワークフローを終了します。

処理の流れを図にすると以下のようになります。

ではコードで書いていきましょう。まずは、Executor を定義します。といっても今までとほとんど変わりはありません。唯一の違いは CurrentCountMessageValue 以外に Continue プロパティを追加しているところです。Continue プロパティはユーザーがカウントアップを続けるかどうかの選択肢を表します。


record SetAmountMessage(int Amount);


record CurrentCountMessage(int Value, bool Continue = true);


class CountUpExecutor() : ReflectingExecutorCountUpExecutor>(nameof(CountUpExecutor)),
    IMessageHandlerSetAmountMessage, CurrentCountMessage>,
    IMessageHandlerCurrentCountMessage, CurrentCountMessage>
{
    
    private int _amount;
    
    
    private int _currentCount;

    
    public ValueTaskCurrentCountMessage> HandleAsync(SetAmountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        
        _amount = message.Amount;
        
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    
    public ValueTaskCurrentCountMessage> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        
        _currentCount += _amount;
        
        return ValueTask.FromResult(new CurrentCountMessage(_currentCount));
    }

    
    protected override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        
        await context.QueueStateUpdateAsync(nameof(_amount), _amount, cancellationToken);
        
        await context.QueueStateUpdateAsync(nameof(_currentCount), _currentCount, cancellationToken);
    }

    
    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        
        _amount = await context.ReadStateAsyncint>(nameof(_amount));
        
        _currentCount = await context.ReadStateAsyncint>(nameof(_currentCount));
    }
}


class GenerateOutputMessageExecutor() : ExecutorCurrentCountMessage, string>(nameof(GenerateOutputMessageExecutor))
{
    
    public override ValueTaskstring> HandleAsync(CurrentCountMessage message, IWorkflowContext context, CancellationToken cancellationToken = default) => 
        ValueTask.FromResult($"The final count is {message.Value}");
}

次にワークフローを組み立てます。ポイントは RequestPortCountUpExecutorGenerateOutputMessageExecutor の間に挟んで、RequestPort から返される CurrentCountMessageContinue プロパティの値を見て行先の ExecutorAddSwitch で決めています。


static async ValueTaskWorkflowSetAmountMessage>> BuildWorkflowAsync()
{
    
    var countUpExecutor = new CountUpExecutor();
    var generateOutputMessageExecutor = new GenerateOutputMessageExecutor();
    
    
    var requestPort = RequestPort.CreateCurrentCountMessage, CurrentCountMessage>("HumanInTheLoop");

    
    return await new WorkflowBuilder(countUpExecutor)
        
        .AddEdge(countUpExecutor, requestPort)
        
        .AddSwitch(requestPort, switchBuilder =>
        {
            
            switchBuilder.AddCase((CurrentCountMessage? m) => m?.Continue ?? false, countUpExecutor)
                
                .WithDefault(generateOutputMessageExecutor);
        })
        
        .WithOutputFrom(generateOutputMessageExecutor)
        .BuildAsyncSetAmountMessage>();
}

では、ワークフローを実行してみましょう。ワークフローを実行するコードで新しい所は RequestInfoEvent のハンドリングの箇所です。ここでユーザーからの入力を受け取り Continue プロパティの値を設定してワークフローにメッセージを返しています。


var workflow = await BuildWorkflowAsync();


await using var run = await InProcessExecution.StreamAsync(workflow, new SetAmountMessage(4));


await foreach (var evt in run.WatchStreamAsync())
{
    
    if (evt is RequestInfoEvent requestInfoEvent)
    {
        
        var currentCountMessage = requestInfoEvent.Request.DataAsCurrentCountMessage>();
        if (currentCountMessage is null)
        {
            throw new InvalidOperationException("Expected CurrentCountMessage");
        }

        
        Console.WriteLine($"Current count is {currentCountMessage.Value}, please type 'continue' to proceed.");
        var input = Console.ReadLine();
        
        
        if (input != "continue")
        {
            currentCountMessage = currentCountMessage with { Continue = false };
        }

        
        await run.SendResponseAsync(requestInfoEvent.Request.CreateResponse(currentCountMessage));
    }

    
    if (evt is WorkflowOutputEvent outputEvent)
    {
        
        Console.WriteLine($"Workflow completed with output: {outputEvent.Asstring>()}");
    }
}

実行すると以下のような結果になります。

Current count is 0, please type 'continue' to proceed.
continue
Current count is 4, please type 'continue' to proceed.
continue
Current count is 8, please type 'continue' to proceed.
continue
Current count is 12, please type 'continue' to proceed.
continue
Current count is 16, please type 'continue' to proceed.
NO!!!!
Workflow completed with output: The final count is 16

continue と入力するとカウントアップが行われて、それ以外を入力すると最終的なメッセージが生成されてワークフローが終わっていることが確認できます。

CheckpointManager との連携

先ほどの例では、イベントを処理する await foreach の中でユーザーからの入力を処理していました。実際のシステムではループ内で処理するのではなく、一旦ワークフローを止めてユーザーに確認をとることになると思います。ユーザーからの応答もいつ来るかわからないと思うのでワークフローは状態の保存をして、ユーザーからの応答が返ってきた段階で状態を復元して続きを実行するという形になります。

このような動作を実現するには、前回やった CheckpointManager を使ってワークフローのチェックポイントを保存するのが効果的です。

やってみましょう。まずはワークフローを細切れに実行できるようにします。
細切れに実行するために以下のように WorkflowResult というレコードを定義します。WorkflowResult にはワークフローの実行結果として、RequestInfoEvent、ワークフローの出力イベント、エラーイベント、チェックポイント情報を含めます。


record WorkflowResult(
    RequestInfoEvent? RequestInfoEvent,
    IReadOnlyCollectionWorkflowOutputEvent> Outputs,
    IReadOnlyCollectionWorkflowErrorEvent> Errors,
    IReadOnlyCollectionCheckpointInfo> Checkpoints);

RequestInfoEventnull でない場合は Human in the loop の入力待ち状態、Outputs に値が含まれる場合はワークフローの出力が完了した状態、Errors に値が含まれる場合はワークフローの実行中にエラーが発生した状態を表します。Checkpoints にはワークフローのチェックポイント情報が含まれます。

次にワークフローを開始して、RequestInfoEvent が返ってくるまで実行する StartWorkflowAsync メソッドを定義します。


static async ValueTaskWorkflowResult> StartWorkflowAsync(SetAmountMessage message, CheckpointManager checkpointManager)
{
    
    var workflow = await BuildWorkflowAsync();
    
    
    await using var checkpointedRun = await InProcessExecution.StreamAsync(workflow, message, checkpointManager);
    
    
    RequestInfoEvent? lastRequestInfoEvent = null;
    ListWorkflowOutputEvent> outputs = [];
    ListWorkflowErrorEvent> errors = [];
    ListCheckpointInfo> checkpoints = [];

    
    await foreach (var workflowEvent in checkpointedRun.Run.WatchStreamAsync())
    {
        switch (workflowEvent)
        {
            
            case RequestInfoEvent requestInfoEvent:
                lastRequestInfoEvent = requestInfoEvent;
                return new(lastRequestInfoEvent, outputs, errors, checkpoints);
            
            
            case WorkflowOutputEvent outputEvent:
                outputs.Add(outputEvent);
                break;
            
            
            case WorkflowErrorEvent errorEvent:
                errors.Add(errorEvent);
                break;
            
            
            case SuperStepCompletedEvent superStepCompletedEvent when superStepCompletedEvent.CompletionInfo?.Checkpoint is not null:
                checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
                break;
        }
    }

    
    return new(null, outputs, errors, checkpoints);
}

次に続きからワークフローを実行する ResumeWorkflowAsync メソッドを定義します。
このメソッドはチェックポイント、CheckpointManager、ユーザーからの応答メッセージを受け取ります。ワークフローの定義を再構築して、チェックポイントからワークフローを復元し、ユーザーからの応答メッセージをワークフローに送信します。RequestInfoEvent が返ってくるまでワークフローを実行し、イベントを収集して結果を返します。


static async ValueTaskWorkflowResult> ResumeWorkflowAsync(
    CheckpointInfo checkpoint, 
    CheckpointManager checkpointManager,
    CurrentCountMessage message)
{
    CurrentCountMessage? responseMessage = message;
    
    
    var workflow = await BuildWorkflowAsync();
    
    
    await using var checkpointedRun = await InProcessExecution.ResumeStreamAsync(workflow, checkpoint, checkpointManager, checkpoint.RunId);
    
    
    RequestInfoEvent? lastRequestInfoEvent = null;
    ListWorkflowOutputEvent> outputs = [];
    ListWorkflowErrorEvent> errors = [];
    ListCheckpointInfo> checkpoints = [];
    
    
    await foreach (var workflowEvent in checkpointedRun.Run.WatchStreamAsync())
    {
        switch (workflowEvent)
        {
            case RequestInfoEvent requestInfoEvent:
                lastRequestInfoEvent = requestInfoEvent;
                
                
                if (responseMessage is not null)
                {
                    await checkpointedRun.Run.SendResponseAsync(requestInfoEvent.Request.CreateResponse(responseMessage));
                    responseMessage = null;
                }
                else
                {
                    
                    return new(lastRequestInfoEvent, outputs, errors, checkpoints);
                }
                break;
            
            
           case WorkflowOutputEvent outputEvent:
                outputs.Add(outputEvent);
                break;
            
            
            case WorkflowErrorEvent errorEvent:
                errors.Add(errorEvent);
                break;
            
            
            case SuperStepCompletedEvent superStepCompletedEvent when superStepCompletedEvent.CompletionInfo?.Checkpoint is not null:
                checkpoints.Add(superStepCompletedEvent.CompletionInfo.Checkpoint);
                break;
        }
    }

    
    return new(null, outputs, errors, checkpoints);
}

これを組み合わせると以下のようにして Human in the loop を実現できます。


var checkpointManager = CheckpointManager.Default;


var result = await StartWorkflowAsync(new SetAmountMessage(4), checkpointManager);


while(result.RequestInfoEvent != null)
{
    
    var currentCountMessage = result.RequestInfoEvent.Request.DataAsCurrentCountMessage>();
    if (currentCountMessage == null) throw new InvalidOperationException("Received null CurrentCountMessage.");
    
    
    Console.WriteLine($"Current count is {currentCountMessage.Value}, please type 'continue' to proceed.");
    var input = Console.ReadLine();
    var continueFlag = string.Equals(input, "continue", StringComparison.OrdinalIgnoreCase);
    
    
    var responseMessage = currentCountMessage with { Continue = continueFlag };
    
    
    
    var latestCheckpoint = result.Checkpoints.Last();
    
    
    result = await ResumeWorkflowAsync(latestCheckpoint, checkpointManager, responseMessage);
}



Console.WriteLine(result.Outputs.Last().Asstring>());

実行すると以下のような結果になります。

Current count is 0, please type 'continue' to proceed.
continue
Current count is 4, please type 'continue' to proceed.
continue
Current count is 8, please type 'continue' to proceed.
continue
Current count is 12, please type 'continue' to proceed.
continue
Current count is 16, please type 'continue' to proceed.
no
The final count is 16

ちゃんと動いてますね。

この例では 1 回のプログラムの実行でワークフローを完了させていますが、実際のシステムではユーザーからの応答を受け取ったタイミングでプログラムを終了し、次回ユーザーからの応答があったタイミングでプログラムを再度起動して ResumeWorkflowAsync を呼び出す形になると思います。こうすることで、ユーザーからの応答を待つ間にシステムのリソースを消費し続けることを防げます。

例えば以下のようにコードを書き換えるとプロセスの終了を跨いでワークフローを継続できます。










using var store = new FileSystemJsonCheckpointStore(
    new DirectoryInfo(Directory.GetCurrentDirectory()));

var checkpointManager = CheckpointManager.CreateJson(store);


WorkflowResult? workflowResult = null;
if (File.Exists("state.json"))
{
    
    using (var fs = File.OpenRead("state.json"))
    {
        workflowResult = await JsonSerializer.DeserializeAsyncWorkflowResult>(fs);
    }
    
    File.Delete("state.json");
}

if (workflowResult == null)
{
    
    workflowResult = await StartWorkflowAsync(new SetAmountMessage(4), checkpointManager);
}
else
{
    
    
    var json = workflowResult.RequestInfoEvent?.Request.DataAsJsonElement>();
    var currentCountMessage = json?.DeserializeCurrentCountMessage>();
    if (currentCountMessage == null) throw new InvalidOperationException("Received null CurrentCountMessage.");

    
    Console.WriteLine($"Current count is {currentCountMessage.Value}, please type 'continue' to proceed.");
    var input = Console.ReadLine();
    var continueFlag = string.Equals(input, "continue", StringComparison.OrdinalIgnoreCase);

    
    workflowResult = await ResumeWorkflowAsync(
        workflowResult.Checkpoints.Last(), 
        checkpointManager, 
        currentCountMessage with { Continue = continueFlag });
}

if (workflowResult.RequestInfoEvent == null)
{
    
    Console.WriteLine(workflowResult.Outputs.Last().Asstring>());
}
else
{
    
    
    using var fs = File.Create("state.json");
    await JsonSerializer.SerializeAsync(fs, workflowResult);
    Console.WriteLine("State saved. Please run the program again to continue.");
}

このプログラムを実行すると、ユーザーの入力を待つ状態で state.json に状態が保存されてプログラムが終了します。再度プログラムを実行すると state.json から状態が復元されて、ユーザーに続行するかどうかの確認が再度表示されます。ユーザーが continue と入力するとカウントアップが続行され、state.json が作成されてプロセスが終了します。continue 以外を入力すると最終的なメッセージが表示されてワークフローが終了します。

処理の流れを図にすると以下のようになります。

実行すると以下のような結果になります。初回実行では Human in the loop の所まで進んで state.json が作成されてプロセスが終了します。

State saved. Please run the program again to continue.

次に実行をすると state.json から状態が復元されて、前回の続きからワークフローが再開されます。

Current count is 0, please type 'continue' to proceed.
continue
State saved. Please run the program again to continue.

さらに実行を続けると、カウントアップが進んでいきます。

Current count is 4, please type 'continue' to proceed.
continue
State saved. Please run the program again to continue.

continue 以外を入力するとワークフローが終了して最終的な出力が表示されます。

Current count is 8, please type 'continue' to proceed.
no
The final count is 8

いい感じに動いてますね。

まとめ

今回は Microsoft Agent Framework の Human in the loop の仕組みを試してみました。RequestPort をワークフローに組み込むことで、ワークフローの途中で人間の判断を挟むことができるようになりました。また、CheckpointManager と組み合わせることで、ワークフローの状態を保存して、ユーザーからの応答があったタイミングでワークフローを復元して再開することもできました。 これにより、より柔軟でインタラクティブなワークフローの構築が可能になります。

これまでのシリーズでワークフローの基本的な使い方から、条件分岐、ループ、チェックポイントの永続化、Human in the loop まで見てきました。次回からは Agent の機能を見ていこうと思います。



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -