如何在excel-DNA中处理市场数据UDF中的多个来源 [英] How to handle multiple sources in market data UDF in excel-DNA

查看:208
本文介绍了如何在excel-DNA中处理市场数据UDF中的多个来源的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在参照一个使用 Rx 的市场数据模拟器示例。



下面的代码只能处理一个数据提供程序来源。

处理多个数据提供者的最佳方法是什么?



我尝试过的方法:



使用System; 
使用System.Collections.Generic;
使用System.Threading.Tasks;使用System.Reactive.Disposables
;使用System.Reactive.Linq
;

命名空间MarketDataSimulatorRx
{
using System.Collections.Concurrent;
使用System.Diagnostics;
使用System.Runtime.InteropServices;
使用System.Threading;使用ExcelDna.Integration

;
使用ExcelDna.Integration.Rtd;

公共静态类SimulatorFunctions
{
private static readonly Random _random = new Random();
private static readonly ConcurrentDictionary< string,IObserver< object>> _observers = new ConcurrentDictionary< string,IObserver< object>>();
private static readonly Timer _marketDataTickerProvider = new Timer(MarketDataTickingSimulator,null,0,100);
private static readonly ConcurrentDictionary< string,object> _processing = new ConcurrentDictionary< string,object>();
private static long _demoCount;


[ExcelFunction(Name =MD.GetMD)]
public static object MrktDataSubscibe(object source,string ticker,string field)
{
返回RxExcel.Observe(MarketDataSubscription,${ticker} | {field},GetObserver(${ticker} | {field}));
}

//这个将强制ExcelDna调用OnCompleted(),然后Excel将不会重新生成RTD tpoics
[ExcelFunction(Name =MD.Contribute1)]
public static object MrktDataSubscibe1(object source,string ticker,string field,object value)
{
var key = ${ticker} | {field};
// field1应该是关键,因为它永远不会改变
var asyncResult = ExcelAsyncUtil.Run(MarketDataContribution,key,h =>
{
ProcessContributionTask1(h,value );
});

返回asyncResult.Equals(ExcelError.ExcelErrorNA)
? #Processing ...
:asyncResult;
}

[ExcelFunction(Name =MD.Contribute2)]
公共静态对象MarketDataContribution2(对象源,字符串自动收录器,字符串字段,对象值)
{
var key = ${ticker} | {field};
MarketDataContriButionRtdServer.TopicInfoHolder [key] = value; //我们模仿更新值
返回XlCall.RTD(ExcelDnaMemoryLeak.MarketDataContriButionRtdServer,null,${ticker} | {field});
}

//这个工作完美但需要一点技巧
[ExcelFunction(Name =MD.Contribute3)]
公共静态对象MarketDataContribution3(对象源,字符串自动收录器,字符串字段,对象值)
{
//放置一个贡献前缀以避免弄乱其他观察者
var key = $贡献| {ticker} | {字段};
var asyncResult = RxExcel.Observe(MarketDataContribution3,key,GetObserver(key));


if(((double)value)< 0)
{
return#Value不能小于零;
}

if(!_processing.ContainsKey(key))
ProcessContributionTask3(key,value);

返回asyncResult.Equals(ExcelError.ExcelErrorNA)
? #Processing ...
:asyncResult;
}

private static void ProcessContributionTask3(字符串键,对象值)
{
Task.Run(async()=>
{
//模仿一些长期任务。例如,http请求
IObserver< object> obs;
if(_observers.TryGetValue(key,out obs))
{
_processing [key] = null;
await Task.Delay(_random.Next(500,997));
var result = $Observer OK:{DateTime.Now:yyyy-MM-dd HH: mm:ss}({value});
obs.OnNext(result);

await Task.Delay(1);

object o;
_processing.TryRemove(key,out o);
}
});
}

private static void ProcessContributionTask1(ExcelAsyncHandle句柄,对象值)
{
Task.Run(async()=>
{
//模仿一些长期运行任务。例如,http请求
等待Task.Delay(_random.Next(200,997));
var result = $LEAK OK:{DateTime.Now :yyyy-MM-dd HH:mm:ss}({value});
handle.SetResult(result);
});
}

private static void ProcessContributionTask(ExcelAsyncHandle句柄,对象值)
{
Task.Run(async()=>
{
//模仿一些长期运行任务。例如,http请求
等待Task.Delay(_random.Next(200,997));
var result = $LEAK OK:{DateTime.Now :yyyy-MM-dd HH:mm:ss}({value});
handle.SetResult(result);
});
}

public static Func< IObservable< object>> GetObserver0(字符串键)
{
return()=> Observable.Create< object>(observer =>
{
IObserver< object> obsEx;
if(!_observers.TryGetValue(key,out obsEx))
{
//Logger.Trace($\"{LogCaption}添加观察者:{key});

if(_observers.TryAdd(key,observer))
{
observer.OnNext(#Loading ...);
}
}

返回Disposable.Create(()=>
{
IObserver< object> obs;

if(_observers.TryRemove(key,out obs))
{
obs.OnCompleted();
}
});
});
}

public static Func< IObservable< object>> GetObserver(字符串键)
{
return()=> Observable.Create< object>(observer =>
{
IObserver< object> obsEx;
if(!_observers.TryGetValue(key,out obsEx))
{
if(_observers.TryAdd(key,observer))
{
observer.OnNext(ExcelError.ExcelErrorNA);
}
}

return Disposable.Create(()=>
{
IObserver< object> obs;

if(_observers.TryRemove(key,out obs))
{
obs.OnCompleted(); //这不是必要但可能有帮助吗?
}
});
});
}

private static void MarketDataTickingSimulator(object state)
{
foreach(_observers中的var observer)
{
// - if((int)(_ random.NextDouble()* 100)%5 == 0)//我们只想要一些随机的滴答
observer.Value.OnNext(_random.NextDouble());
}

if(_demoCount == 0)//给它初始值
{
foreach(_observers中的var观察者)
{
observer.Value.OnNext(_random.NextDouble());
}
}
_demoCount ++;
}
}

[ComVisible(true)]
公共类MarketDataContriButionRtdServer:ExcelRtdServer
{
private readonly ConcurrentDictionary< int,Topic> _话题;
private Timer _timer;
private readonly Random _rand;
public static ConcurrentDictionary< string,object> TopicInfoHolder = new ConcurrentDictionary< string,object>();
public static ConcurrentDictionary< int,string> TopicIdHolder = new ConcurrentDictionary< int,string>();

public MarketDataContriButionRtdServer()
{
_topics = new ConcurrentDictionary< int,Topic>();
_rand = new Random();
_timer = new Timer(委托
{
foreach(_topics中的var主题)
{
字符串键;
if(TopicIdHolder.TryGetValue(主题。 Key,out key))
{
对象值;
if(TopicInfoHolder.TryGetValue(key,out value))
{
var result = $NOLEAK OK :{DateTime.Now:yyyy-MM-dd HH:mm:ss}({value});
topic.Value.UpdateValue(result);
}
}
}
},null,0,500);
}

protected override主题CreateTopic(int topicId,IList< string> topicInfo)
{
TopicIdHolder [topicId] = topicInfo [0];
返回base.CreateTopic(topicId,topicInfo);
}

受保护的覆盖对象ConnectData(主题,IList< string> topicInfo,ref bool newValues)
{
Debug.WriteLine(ConnectData:{0} - {{{1}}},topic.TopicId,string.Join(,,topicInfo));
_topics [topic.TopicId] =主题;
返回ExcelErrorUtil.ToComError(ExcelError.ExcelErrorNA);
}

protected override void DisconnectData(主题)
{
主题t;
if(_topics.TryRemove(topic.TopicId,out t))
{
string key;
if(TopicIdHolder.TryRemove(topic.TopicId,out key))
{
对象值;
TopicInfoHolder.TryRemove(key,out value);
}
}
Debug.WriteLine(DisconnectData:{0},topic.TopicId);
}
}

公共静态类RxExcel
{
公共静态IExcelObservable ToExcelObservable< T>(此IObservable< T> observable)
{
返回新的ExcelObservable< T>(可观察);
}

公共静态对象Observe< T>(字符串functionName,对象参数,Func< IObservable< T>> observableSource)
{
返回ExcelAsyncUtil.Observe (functionName,parameters,()=> observableSource()。ToExcelObservable());
}
}

公共类ExcelObservable< T> :IExcelObservable
{
private readonly IObservable< T> _observable;

public ExcelObservable(IObservable< T> observable)
{
_observable = observable;
}

公共虚拟IDisposable订阅(IExcelObserver观察者)
{
return _observable.Subscribe(value => observer.OnNext(value),observer.OnError, observer.OnCompleted);
}
}
}

解决方案

{ticker} | {field},GetObserver (

{股票} | {字段}));
}

//这个将强制ExcelDna调用OnCompleted(),然后Excel将不会重新生成RTD tpoics
[ExcelFunction(Name =MD.Contribute1)]
public static object MrktDataSubscibe1(object source,string ticker,string field,object value)
{
var key =


{ticker} | {领域};
// field1应该是关键,因为它永远不会改变
var asyncResult = ExcelAsyncUtil.Run(MarketDataContribution,key,h =>
{
ProcessContributionTask1(h,value );
});

返回asyncResult.Equals(ExcelError.ExcelErrorNA)
? #Processing ...
:asyncResult;
}

[ExcelFunction(Name =MD.Contribute2)]
公共静态对象MarketDataContribution2(对象源,字符串自动收录器,字符串字段,对象值)
{
var key =


I am following a Sample of Market Data Simulator using Rx

The code below handles only one Data Provider source.
What would be the best way to handle multiple Data providers?

What I have tried:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace MarketDataSimulatorRx
{
  using System.Collections.Concurrent;
  using System.Diagnostics;
  using System.Runtime.InteropServices;
  using System.Threading;

  using ExcelDna.Integration;
  using ExcelDna.Integration.Rtd;

  /// <summary>
  /// Excel worksheet functions (UDFs) for the market data simulator sample.
  /// The class keeps one registered observer per topic key, pushes a random tick
  /// to every registered observer from a 100 ms timer, and demonstrates three
  /// ways of delivering a "contribution" result back to a cell.
  /// </summary>
  /// <remarks>
  /// NOTE(review): every UDF accepts a <c>source</c> argument but none of them uses it;
  /// topic keys are built from ticker and field only. Supporting several data providers
  /// presumably means making the source part of the key — confirm before changing,
  /// because that alters topic identity for existing workbooks.
  /// </remarks>
  public static class SimulatorFunctions
  {
    private static readonly Random _random = new Random();

    // System.Random is not thread-safe, and it is used from both the timer callback
    // and Task.Run workers, so every access goes through this gate.
    private static readonly object _randomGate = new object();

    // Topic key -> the observer currently registered for that key.
    private static readonly ConcurrentDictionary<string, IObserver<object>> _observers = new ConcurrentDictionary<string, IObserver<object>>();

    // Held in a static field so the ticking timer stays referenced.
    private static readonly Timer _marketDataTickerProvider = new Timer(MarketDataTickingSimulator, null, 0, 100);

    // Keys that currently have a contribution task in flight (values are unused).
    private static readonly ConcurrentDictionary<string, object> _processing = new ConcurrentDictionary<string, object>();

    // Number of timer callbacks that have run so far.
    private static long _demoCount;


    /// <summary>Subscribes the calling cell to the simulated ticks for <c>ticker|field</c>.</summary>
    /// <param name="source">Currently unused.</param>
    /// <param name="ticker">First part of the topic key.</param>
    /// <param name="field">Second part of the topic key.</param>
    /// <returns>Whatever <see cref="RxExcel.Observe{T}"/> returns for the topic.</returns>
    [ExcelFunction(Name = "MD.GetMD")]
    public static object MrktDataSubscibe(object source, string ticker, string field)
    {
      var key = $"{ticker}|{field}";
      return RxExcel.Observe("MarketDataSubscription", key, GetObserver(key));
    }

    /// <summary>
    /// Contribution variant built on <c>ExcelAsyncUtil.Run</c>.
    /// </summary>
    /// <remarks>
    /// Original author's note: this one will force ExcelDna to call OnCompleted()
    /// and then Excel will not reuse the RTD topics.
    /// </remarks>
    /// <returns>"#Processing..." while the async result is still #N/A, otherwise the result.</returns>
    [ExcelFunction(Name = "MD.Contribute1")]
    public static object MrktDataSubscibe1(object source, string ticker, string field, object value)
    {
      // Only ticker and field form the key; value is captured by the worker but is not part of it.
      var key = $"{ticker}|{field}";
      var asyncResult = ExcelAsyncUtil.Run("MarketDataContribution", key, h =>
      {
        ProcessContributionTask1(h, value);
      });

      return PendingAsText(asyncResult);
    }

    /// <summary>
    /// Contribution variant built on the RTD server: stores <paramref name="value"/> under
    /// <c>ticker|field</c> in <c>MarketDataContriButionRtdServer.TopicInfoHolder</c> and
    /// returns the RTD call for the same key.
    /// </summary>
    [ExcelFunction(Name = "MD.Contribute2")]
    public static object MarketDataContribution2(object source, string ticker, string field, object value)
    {
      var key = $"{ticker}|{field}";
      MarketDataContriButionRtdServer.TopicInfoHolder[key] = value; // we mimic updating the value
      return XlCall.RTD("ExcelDnaMemoryLeak.MarketDataContriButionRtdServer", null, key);
    }

    /// <summary>
    /// Contribution variant built on the shared observer registry.
    /// </summary>
    /// <returns>
    /// #VALUE! when <paramref name="value"/> is not a number, an error text when it is negative,
    /// "#Processing..." while the observed result is still #N/A, otherwise the observed result.
    /// </returns>
    [ExcelFunction(Name = "MD.Contribute3")]
    public static object MarketDataContribution3(object source, string ticker, string field, object value)
    {
      // A "Contribution" prefix keeps these keys apart from the MD.GetMD subscriptions.
      var key = $"Contribution|{ticker}|{field}";
      var asyncResult = RxExcel.Observe("MarketDataContribution3", key, GetObserver(key));

      // The previous unconditional (double) cast threw InvalidCastException for text,
      // empty or missing arguments. Return the #VALUE! error explicitly instead.
      if (!(value is double))
      {
        return ExcelError.ExcelErrorValue;
      }

      if ((double)value < 0)
      {
        return "#Value cannot be less than zero";
      }

      // TryAdd is an atomic "claim": only the caller that wins it starts a task, so
      // overlapping recalculations cannot launch several tasks for the same key.
      if (_processing.TryAdd(key, null))
      {
        ProcessContributionTask3(key, value);
      }

      return PendingAsText(asyncResult);
    }

    /// <summary>
    /// Simulates a long-running contribution and pushes its result to the observer
    /// registered for <paramref name="key"/>. The caller must already have claimed
    /// <paramref name="key"/> in <c>_processing</c>; the claim is released here.
    /// </summary>
    private static void ProcessContributionTask3(string key, object value)
    {
      Task.Run(async () =>
      {
        try
        {
          IObserver<object> obs;
          if (!_observers.TryGetValue(key, out obs))
          {
            return;
          }

          // mimic some long run task. For example, http request
          await Task.Delay(NextRandomInt(500, 997));
          var result = $"Observer OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
          obs.OnNext(result);

          // Short pause before releasing the key. Presumably this is the "little trick"
          // that lets the recalculation triggered by OnNext still see the key as busy — TODO confirm.
          await Task.Delay(1);
        }
        finally
        {
          // Always release the claim, even if the work above throws.
          object ignored;
          _processing.TryRemove(key, out ignored);
        }
      });
    }

    /// <summary>Same work as <see cref="ProcessContributionTask"/>; kept as a separate name for its caller.</summary>
    private static void ProcessContributionTask1(ExcelAsyncHandle handle, object value)
    {
      ProcessContributionTask(handle, value);
    }

    /// <summary>
    /// Simulates a long-running contribution and completes <paramref name="handle"/> with a text result.
    /// </summary>
    private static void ProcessContributionTask(ExcelAsyncHandle handle, object value)
    {
      Task.Run(async () =>
      {
        // mimic some long run task. For example, http request
        await Task.Delay(NextRandomInt(200, 997));
        var result = $"LEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
        handle.SetResult(result);
      });
    }

    /// <summary>
    /// Observable factory for <paramref name="key"/> whose first value is the text "#Loading...".
    /// </summary>
    public static Func<IObservable<object>> GetObserver0(string key)
    {
      return CreateKeyedSource(key, "#Loading...");
    }

    /// <summary>
    /// Observable factory for <paramref name="key"/> whose first value is the #N/A error.
    /// </summary>
    public static Func<IObservable<object>> GetObserver(string key)
    {
      return CreateKeyedSource(key, ExcelError.ExcelErrorNA);
    }

    /// <summary>
    /// Builds the factory shared by <see cref="GetObserver0"/> and <see cref="GetObserver"/>.
    /// On subscription the observer is registered under <paramref name="key"/> if the key is
    /// free, and then receives <paramref name="initialValue"/>. On disposal the registration is
    /// removed and the observer completed.
    /// </summary>
    /// <remarks>
    /// If the key is already taken, the new observer is not registered and receives nothing,
    /// exactly as before.
    /// </remarks>
    private static Func<IObservable<object>> CreateKeyedSource(string key, object initialValue)
    {
      return () => Observable.Create<object>(observer =>
      {
        if (_observers.TryAdd(key, observer))
        {
          observer.OnNext(initialValue);
        }

        return Disposable.Create(() =>
        {
          // Remove the entry only when it is still THIS observer. The previous code removed
          // whatever was registered under the key, so disposing an unregistered subscription
          // tore down (and completed) another subscription's observer.
          var registration = new KeyValuePair<string, IObserver<object>>(key, observer);
          if (((ICollection<KeyValuePair<string, IObserver<object>>>)_observers).Remove(registration))
          {
            observer.OnCompleted(); // This is not necessary but may help?
          }
        });
      });
    }

    /// <summary>
    /// Timer callback: pushes one random tick to every registered observer, and a second
    /// one on the very first callback ("give it initial values").
    /// </summary>
    private static void MarketDataTickingSimulator(object state)
    {
      PushTickToAllObservers();

      // Interlocked keeps the counter correct if timer callbacks overlap.
      if (Interlocked.Increment(ref _demoCount) == 1)
      {
        PushTickToAllObservers();
      }
    }

    /// <summary>Sends a fresh random double to each observer currently in the registry.</summary>
    private static void PushTickToAllObservers()
    {
      foreach (var entry in _observers)
      {
        entry.Value.OnNext(NextRandomDouble());
      }
    }

    /// <summary>Thread-safe wrapper around <c>_random.Next(min, max)</c>.</summary>
    private static int NextRandomInt(int minInclusive, int maxExclusive)
    {
      lock (_randomGate)
      {
        return _random.Next(minInclusive, maxExclusive);
      }
    }

    /// <summary>Thread-safe wrapper around <c>_random.NextDouble()</c>.</summary>
    private static double NextRandomDouble()
    {
      lock (_randomGate)
      {
        return _random.NextDouble();
      }
    }

    /// <summary>
    /// Maps a still-pending (#N/A) async result to the text "#Processing..."; any other
    /// result is returned unchanged.
    /// </summary>
    private static object PendingAsText(object asyncResult)
    {
      if (object.Equals(asyncResult, ExcelError.ExcelErrorNA))
      {
        return "#Processing...";
      }

      return asyncResult;
    }
  }

  /// <summary>
  /// RTD server used by MD.Contribute2. Every 500 ms it republishes, for each connected
  /// topic, a text built from the value last stored for that topic's key in
  /// <see cref="TopicInfoHolder"/>.
  /// </summary>
  [ComVisible(true)]
  public class MarketDataContriButionRtdServer : ExcelRtdServer
  {
    // Connected topics by topic id.
    private readonly ConcurrentDictionary<int, Topic> _topics = new ConcurrentDictionary<int, Topic>();

    // Drives the periodic republish; disposed in ServerTerminate.
    private readonly Timer _timer;

    /// <summary>Topic key -> last contributed value (written by the UDF, read by the timer).</summary>
    public static ConcurrentDictionary<string, object> TopicInfoHolder = new ConcurrentDictionary<string, object>();

    /// <summary>Topic id -> topic key (the first topic string passed to the RTD call).</summary>
    public static ConcurrentDictionary<int, string> TopicIdHolder = new ConcurrentDictionary<int, string>();

    /// <summary>Starts the 500 ms publishing timer; the first callback is due immediately.</summary>
    public MarketDataContriButionRtdServer()
    {
      _timer = new Timer(PublishContributions, null, 0, 500);
    }

    /// <summary>
    /// Stops the publishing timer when the server shuts down. Previously the timer was
    /// never disposed and kept firing after the server had terminated.
    /// </summary>
    protected override void ServerTerminate()
    {
      _timer.Dispose();
      base.ServerTerminate();
    }

    /// <summary>Remembers the key (first topic string) for the new topic id, then defers to the base class.</summary>
    protected override Topic CreateTopic(int topicId, IList<string> topicInfo)
    {
      TopicIdHolder[topicId] = topicInfo[0];
      return base.CreateTopic(topicId, topicInfo);
    }

    /// <summary>Registers the topic for periodic updates and returns #N/A as its initial value.</summary>
    protected override object ConnectData(Topic topic, IList<string> topicInfo, ref bool newValues)
    {
      Debug.WriteLine("ConnectData: {0} - {{{1}}}", topic.TopicId, string.Join(", ", topicInfo));
      _topics[topic.TopicId] = topic;
      return ExcelErrorUtil.ToComError(ExcelError.ExcelErrorNA);
    }

    /// <summary>Forgets the topic together with its id-to-key mapping and stored value.</summary>
    protected override void DisconnectData(Topic topic)
    {
      Topic removedTopic;
      if (_topics.TryRemove(topic.TopicId, out removedTopic))
      {
        string key;
        if (TopicIdHolder.TryRemove(topic.TopicId, out key))
        {
          object value;
          TopicInfoHolder.TryRemove(key, out value);
        }
      }
      Debug.WriteLine("DisconnectData: {0}", topic.TopicId);
    }

    /// <summary>
    /// Timer callback: for each connected topic that has both a key and a stored value,
    /// pushes a timestamped text containing that value.
    /// </summary>
    private void PublishContributions(object state)
    {
      foreach (var topic in _topics)
      {
        string key;
        if (!TopicIdHolder.TryGetValue(topic.Key, out key))
        {
          continue;
        }

        object value;
        if (!TopicInfoHolder.TryGetValue(key, out value))
        {
          continue;
        }

        var result = $"NOLEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
        topic.Value.UpdateValue(result);
      }
    }
  }

  /// <summary>
  /// Glue between Rx observables and Excel-DNA's observable support.
  /// </summary>
  public static class RxExcel
  {
    /// <summary>Wraps an Rx observable in an <see cref="ExcelObservable{T}"/>.</summary>
    public static IExcelObservable ToExcelObservable<T>(this IObservable<T> observable)
      => new ExcelObservable<T>(observable);

    /// <summary>
    /// Passes <paramref name="functionName"/> and <paramref name="parameters"/> to
    /// <c>ExcelAsyncUtil.Observe</c>, together with a factory that invokes
    /// <paramref name="observableSource"/> and wraps its result for Excel-DNA.
    /// </summary>
    public static object Observe<T>(string functionName, object parameters, Func<IObservable<T>> observableSource)
      => ExcelAsyncUtil.Observe(
           functionName,
           parameters,
           () => ToExcelObservable(observableSource()));
  }

  /// <summary>
  /// Adapter exposing an <see cref="IObservable{T}"/> through <c>IExcelObservable</c>.
  /// </summary>
  public class ExcelObservable<T> : IExcelObservable
  {
    // The wrapped Rx sequence.
    private readonly IObservable<T> _observable;

    /// <summary>Stores the sequence to adapt.</summary>
    public ExcelObservable(IObservable<T> observable)
    {
      _observable = observable;
    }

    /// <summary>
    /// Subscribes to the wrapped sequence and forwards each value, error and completion
    /// to <paramref name="observer"/>.
    /// </summary>
    /// <returns>The subscription returned by the wrapped sequence.</returns>
    public virtual IDisposable Subscribe(IExcelObserver observer)
    {
      Action<T> forwardValue = value => observer.OnNext(value);
      Action<Exception> forwardError = observer.OnError;
      Action forwardCompleted = observer.OnCompleted;

      return _observable.Subscribe(forwardValue, forwardError, forwardCompleted);
    }
  }
}

解决方案

"{ticker}|{field}", GetObserver(


"{ticker}|{field}")); } // This one will force ExcelDna to call OnCompleted() and then Excel will not resue the RTD tpoics [ExcelFunction(Name = "MD.Contribute1")] public static object MrktDataSubscibe1(object source, string ticker, string field, object value) { var key =


"{ticker}|{field}"; // field1 should be the key because it never changes var asyncResult = ExcelAsyncUtil.Run("MarketDataContribution", key, h => { ProcessContributionTask1(h, value); }); return asyncResult.Equals(ExcelError.ExcelErrorNA) ? "#Processing..." : asyncResult; } [ExcelFunction(Name = "MD.Contribute2")] public static object MarketDataContribution2(object source, string ticker, string field, object value) { var key =


这篇关于如何在excel-DNA中处理市场数据UDF中的多个来源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆