如何在不等待其他方法完成的情况下从服务结构参与者获取状态? [英] How to get state from service fabric actor without waiting for other methods to complete?

查看:56
本文介绍了如何在不等待其他方法完成的情况下从服务结构参与者获取状态?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个正在运行的服务,该服务正在X角色上进行迭代,并使用ActorProxy要求它们的状态.

I have a service running that is iterating over X actors asking them for their state using the ActorProxy.

对我来说重要的是,该服务不会在ect提醒回调中等待actor中其他长时间运行的方法而被阻止.

Its important to me that this service is not blocked waiting for some other long running method in the actor from ect reminder callbacks.

有什么方法可以调用下面的简单示例GetState(),该方法将允许该方法以正确的方式完成而不会阻止某些提醒的运行.

Is there some way to call the below simple example GetState() that would allow the method to complete right way without blocking incase of some reminder running.

class Actor : IMyActor{

public Task<MyState> GetState() => StateManager.GetAsync("key")
}

替代.

服务调用的正确方式是什么,如果服务在5秒钟内未回复,则只需遏制.

what is the proper way form the service to call and if it dont reply within 5 sec, just containue.

var proxy = ActorProxy.Create<IMyActor();
var state = await proxy.GetState(); // this will wait until the actor is ready to send back the state. 

推荐答案

即使当前正在执行阻塞方法的Actor,也可以读取actor状态.参与者使用 IActorStateManager 依次使用 .实例化了 IActorStateProvider 每个 ActorService 一次.每个分区都实例化 ActorService 负责主持和运行演员. actor服务是StatefulService的核心(或更确切地说是

It is possible to read the actor state even for Actors that are currently executing a blocking method. Actors store their state using an IActorStateManager which in turn uses an IActorStateProvider. The IActorStateProvider is instantiated once per ActorService. Each partition instantiates the ActorService that is responsible for hosting and running actors. The actor service is at the core a StatefulService (or rather StatefulServiceBase which is the base class that regular stateful service uses). With this in mind, we can work with the ActorService that caters to our Actors the same way we would work with a regular service, i.e. with a service interface based on IService.

IActorStateProvider (由 KvsActorStateProvider (如果您使用的是持久状态)可以使用两种方法:

The IActorStateProvider (Implemented by KvsActorStateProvider if you are using Persisted state) has two methods that we can use:

Task<T> LoadStateAsync<T>(ActorId actorId, string stateName, CancellationToken cancellationToken = null);
Task<PagedResult<ActorId>> GetActorsAsync(int numItemsToReturn, ContinuationToken continuationToken, CancellationToken cancellationToken);

对这些方法的调用不受actor锁的影响,这很有意义,因为这些锁旨在支持分区上的所有actor.

Calls to these methods are not affected by actors' locks, which makes sense since these are designed to support all actors on a partition.

示例:

创建一个自定义ActorService并使用该自定义角色来托管您的演员:

Create a custom ActorService and use that one to host your actors:

public interface IManyfoldActorService : IService
{
    Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken);
}

public class ManyfoldActorService : ActorService, IManyfoldActorService
{
    ...
}

Program.Main中注册新的ActorService:

Register the new ActorService in Program.Main:

ActorRuntime.RegisterActorAsync<ManyfoldActor>(
    (context, actorType) => new ManyfoldActorService(context, actorType)).GetAwaiter().GetResult();

假设我们有一个使用以下方法的简单Actor:

Assuming we have a simple Actor with the following method:

    Task IManyfoldActor.SetCountAsync(int count, CancellationToken cancellationToken)
    {
        Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).GetAwaiter().GetResult();
        var task = this.StateManager.SetStateAsync("count", count, cancellationToken);
        ActorEventSource.Current.ActorMessage(this, $"Finished set {count} on {this.Id.GetLongId()}");
        return task;
    }

等待30秒(以模拟长时间运行,阻塞,方法调用),然后将状态值"count"设置为int.

It waits for 30 seconds (to simulate long running, blocking, method calls) and then set a state value "count" to an int.

在单独的服务中,我们现在可以调用SetCountAsync以便Actor生成一些状态数据:

In a separate service we can now call the SetCountAsync for the Actors to generate some state data:

    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        var actorProxyFactory = new ActorProxyFactory();
        long iterations = 0;
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            iterations += 1;
            var actorId = iterations % 10;
            var count = Environment.TickCount % 100;
            var manyfoldActor = actorProxyFactory.CreateActorProxy<IManyfoldActor>(new ActorId(actorId));
            manyfoldActor.SetCountAsync(count, cancellationToken).ConfigureAwait(false);
            ServiceEventSource.Current.ServiceMessage(this.Context, $"Set count {count} on {actorId} @ {iterations}");
            await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
        }
    }

此方法只是无限循环地循环更改actor的值. (请注意,总共10个actor,3秒的延迟和actor的30秒的延迟之间的相关性.简单设计这种方法是为了防止等待锁定的Actor调用无限增加.)每个调用也被当作即发即弃执行,因此我们可以在下一个参与者返回之前继续更新其下一个参与者的状态.这是一段愚蠢的代码,它只是通过这种方式设计的,以证明这一理论.

This method simply loops endlessly changing the values of the actors. (Note the correlation between total of 10 actors, delay for 3 second and the 30 second delay in actor. Simply designed this way to prevent an infinite buildup of Actor calls waiting for a lock). Each call is also executed as fire-and-forget so we can continue on to updating the state of the next actor before that one returns. Its a silly piece of code, it is just designed this way to prove the theory.

现在在actor服务中,我们可以像这样实现方法GetCountsAsync:

Now in the actor service we can implement the method GetCountsAsync like this:

    public async Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken)
    {
        ContinuationToken continuationToken = null;
        var actors = new Dictionary<long, int>();

        do
        {
            var page = await this.StateProvider.GetActorsAsync(100, continuationToken, cancellationToken);

            foreach (var actor in page.Items)
            {
                var count = await this.StateProvider.LoadStateAsync<int>(actor, "count", cancellationToken);
                actors.Add(actor.GetLongId(), count);
            }

            continuationToken = page.ContinuationToken;
        }
        while (continuationToken != null);

        return actors;
    }

这使用基础ActorStateProvider查询所有已知的Actor(针对该分区),然后以这种方式直接读取状态,从而绕过" Actor,而不会被actor的方法执行所阻塞.

This uses the underlying ActorStateProvider to query for all known Actors (for that partition) and then directly reads the state for each this way 'bypassing' the Actor and not being blocked by the actor's method execution.

最后一部分,一些可以调用我们的ActorService并在所有分区上调用GetCountsAsync的方法:

Final piece, some method that can call our ActorService and call GetCountsAsync across all partitions:

    public IDictionary<long, int> Get()
    {
        var applicationName = FabricRuntime.GetActivationContext().ApplicationName;
        var actorServiceName = $"{typeof(IManyfoldActorService).Name.Substring(1)}";
        var actorServiceUri = new Uri($"{applicationName}/{actorServiceName}");

        var fabricClient = new FabricClient();
        var partitions = new List<long>();
        var servicePartitionList = fabricClient.QueryManager.GetPartitionListAsync(actorServiceUri).GetAwaiter().GetResult();
        foreach (var servicePartition in servicePartitionList)
        {
            var partitionInformation = servicePartition.PartitionInformation as Int64RangePartitionInformation;
            partitions.Add(partitionInformation.LowKey);
        }

        var serviceProxyFactory = new ServiceProxyFactory();

        var actors = new Dictionary<long, int>();
        foreach (var partition in partitions)
        {
            var actorService = serviceProxyFactory.CreateServiceProxy<IManyfoldActorService>(actorServiceUri, new ServicePartitionKey(partition));

            var counts = actorService.GetCountsAsync(CancellationToken.None).GetAwaiter().GetResult();
            foreach (var count in counts)
            {
                actors.Add(count.Key, count.Value);
            }
        }
        return actors;
    }

现在运行此代码将为我们提供10个角色,每33:d秒更新一次状态,每个角色每次忙30秒.每个actor方法返回后,Actor服务就会看到更新的状态.

Running this code will now give us 10 actors that every 33:d second gets it's state updated and where each actor is busy for 30 seconds each time. The Actor service sees the updated state as soon as each actor method returns.

此示例中省略了一些内容,例如,当您在actor服务中加载状态时,我们可能应该防止超时.

There are some things omitted in this sample, for instance, when you load the state in the actor service we should probably guard against timeouts.

这篇关于如何在不等待其他方法完成的情况下从服务结构参与者获取状态?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆