动作块可以包含状态吗? [英] Can an ActionBlock contain a state?

查看:71
本文介绍了动作块可以包含状态吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个使用TPL数据流的应用程序.我正在尝试配置一个动作块以写入数据库.

I'm writing an application that is using TPL dataflow. I'm trying to configure an action block to write to a database.

但是,我需要此操作块对收到的第一条消息执行初始化步骤(请注意,我必须等待第一条消息并且不能在创建操作块期间执行初始化操作.)

However, I need this action block to perform an initialization step on the first message it receives (note that I must wait for the first message and cannot perform the initialization during the action block creation).

因此,我的操作块需要保持某种状态,以指示其是否已经收到第一条消息.

Because of this, my action block needs to maintain some sort of state that indicates if its already received the first message.

ActionBlock是否可以维持状态?

Is it possible for an ActionBlock to maintain a state?

参考下面的Microsoft示例代码,如何将状态变量添加到ActionBlock?似乎它仅维护局部变量.

Referencing the Microsoft sample code below, how would I go about adding a state variable to the ActionBlock? It seems like it only maintains local variables.

// Performs several computations by using dataflow and returns the elapsed
// time required to perform the computations.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
   int messageCount)
{
   // Create an ActionBlock<int> that performs some work.
   var workerBlock = new ActionBlock<int>(
      // Simulate work by suspending the current thread.
      millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
      // Specify a maximum degree of parallelism.
      new ExecutionDataflowBlockOptions
      {
         MaxDegreeOfParallelism = maxDegreeOfParallelism
      });

   // Compute the time that it takes for several messages to 
   // flow through the dataflow block.

   Stopwatch stopwatch = new Stopwatch();
   stopwatch.Start();

   for (int i = 0; i < messageCount; i++)
   {
      workerBlock.Post(1000);
   }
   workerBlock.Complete();

   // Wait for all messages to propagate through the network.
   workerBlock.Completion.Wait();

   // Stop the timer and return the elapsed number of milliseconds.
   stopwatch.Stop();
   return stopwatch.Elapsed;
}

推荐答案

您可以像这样实现自己的 StatefulActionBlock< T> .根据您的 MaxDegreeOfParallelism ,您可能不需要锁(即使这样做,也可能有实现线程安全性的更好方法).感谢@TheodorZoulias帮助我改进了这种方法.

You could implement your own StatefulActionBlock<T>, like so. Depending on your MaxDegreeOfParallelism you may not need the lock (and even if you do there may be better ways of achieving thread-safety). Thanks to @TheodorZoulias for helping me refine this approach.

public class StatefulActionBlock<TInput, TState> : IDataflowBlock, ITargetBlock<TInput>
{
   private bool _initialized;

   private Action<TState> _initializer;

   private object _lock = new object();

   private ITargetBlock<TInput> _actionBlock;

   private TState _state;

   public Task Completion => _actionBlock.Completion;

   public StatefulActionBlock(Action<TInput> action, Action<TState> initializer, TState state, ExecutionDataflowBlockOptions options)
   {
       //null checks omitted...

       _initializer = initializer;
       _actionBlock = new ActionBlock<TInput>(action, options);
       _state = state;
   }

   void Initialize()
   {
       _initializer(_state);
       _initialized = true;
   }

   public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept) 
   {
       lock (_lock)
       {
           if (!_initialized)
               Initialize();
       }
       return _actionBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
   }

   public void Complete() =>
       _actionBlock.Complete();

   public void Fault(Exception exception) =>
       _actionBlock.Fault(exception);
}


您还可以锁定并检查是否在操作中进行了初始化.


You could also lock and check to see if you're initialized in your Action.

private static object _lock = new Object();
private static bool _isInitialized = false;

// Performs several computations by using dataflow and returns the elapsed
// time required to perform the computations.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism, int messageCount)
{
   // Create an ActionBlock<int> that performs some work.
   var workerBlock = new ActionBlock<int>(
      // Simulate work by suspending the current thread.
      DoStuff,
      // Specify a maximum degree of parallelism.
      new ExecutionDataflowBlockOptions
      {
         MaxDegreeOfParallelism = maxDegreeOfParallelism
      });

   // Compute the time that it takes for several messages to 
   // flow through the dataflow block.

   Stopwatch stopwatch = new Stopwatch();
   stopwatch.Start();

   for (int i = 0; i < messageCount; i++)
   {
      workerBlock.Post(1000);
   }
   workerBlock.Complete();

   // Wait for all messages to propagate through the network.
   workerBlock.Completion.Wait();

   // Stop the timer and return the elapsed number of milliseconds.
   stopwatch.Stop();
   return stopwatch.Elapsed;
}

private static void DoStuff(int i)
{
    lock (_lock)
    {
       if (!_initialized)
       {
          Initialize();
          _initialized = true;
       }
    }
    Thread.Sleep(i); //make a snack
}

private static void Initialize()
{
   //...
}

这篇关于动作块可以包含状态吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆