自定义管道组件 - 从文件文件夹中读取多个消息会导致管道多次触发 [英] Custom Pipeline component - Reading multiple messages from a file folder is causing the pipeline to fire off multiple times

查看:46
本文介绍了自定义管道组件 - 从文件文件夹中读取多个消息会导致管道多次触发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个自定义管道组件,可以从许多其他文件创建批处理文件。

I have a custom pipeline component that creates a batch file from many other files.

自定义管道组件(发送)将传入的文件复制到文件夹地点。

The custom pipeline component(Send) copies the incoming file(s) to the folder location.

如果批次中的所有文件都已收到,它将一次一个地读入文件夹中的所有文件,然后"缝制"。将它们分成一条消息(通过写入传出流),然后清除临时消息并返回传出消息,
否则它只返回null。

If all the files in the batch have been received it will read in all the files from the folder one at a time and "sew" them into one message(by writing to the outgoing stream) and then it cleans up the temp messages and returns the outgoing message, otherwise it simply returns null.

管道创建传出的信息完美无缺。问题在于,出于某种原因,当我称之为"聚合"时,函数(读取消息的函数)它以某种方式导致另一条消息通过管道传出。

The pipeline creates the outgoing message perfectly. The problem is that for some reason when I call the "Aggregate" function(the one that reads in the messages) it somehow causes another message to come through the pipeline for some reason.

如果我注释掉"outboundStream = Aggregate(_BatchCreatorFolder,batchSize, interchangeId,_RepeatingNodeName);"行管道不会触发另一条消息,但当然不会创建出站消息。

If I comment out the "outboundStream = Aggregate(_BatchCreatorFolder, batchSize, interchangeId, _RepeatingNodeName);" line the pipeline doesn't fire off another message but the outbound message is never created of course.

还有其他人遇到过这个问题吗?

Has anyone else ran into this problem?

回复之前:

- 是的我已经使用调试器来逐步执行代码。

-Yes I have used the debugger to step through the code.

- 我不能使用信封模式和聚合器编排。

-No I can't use an envelope schema and an aggregator orchestration instead.

- 如果你不知道答案,不要回答。我不想猜测或代码Natzi回复。 

-Don't answer if you don't know the answer. I don't want guesses or code Natzi replies. 

       public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(Microsoft.BizTalk.Component.Interop.IPipelineContext pc, Microsoft.BizTalk.Message.Interop.IBaseMessage inMsg)
        {
            bool batchComplete = false;
            interchangeId = inMsg.Context.Read("InterchangeID", "http://schemas.microsoft.com/BizTalk/2003/system-properties") as string;
            messageId = inMsg.Context.Read("MessageID", "http://schemas.microsoft.com/BizTalk/2003/system-properties") as string;

            object bnObj = inMsg.Context.Read("BatchNumber", _PSNamespace);
            object bsObj = inMsg.Context.Read("BatchSize", _PSNamespace);

            IBaseMessagePart bodyPart = inMsg.BodyPart;
            MemoryStream outboundStream = new MemoryStream();

            if (bnObj != null && bsObj != null)
            {
                int batchNumber = Convert.ToInt32(bnObj);
                batchSize = Convert.ToInt32(bsObj);

                //check if this batch has been started
                if (BCMessageCount.ContainsKey(interchangeId))
                {
                    BCMessageCount[interchangeId]++;
                }
                else
                {
                    BCMessageCount.Add(interchangeId, 1);
                }

                try
                {
                    if (batchSize == 1)
                        return inMsg;

                    //Copies the file to the BatchCreator path
                    SendOutFile(bodyPart.GetOriginalDataStream(), batchNumber.ToString() + "_" + interchangeId);

                    //If all the messages have been received "sew" them back together
                    if (BCMessageCount[interchangeId] >= batchSize)
                    {
                        outboundStream = Aggregate(_BatchCreatorFolder, batchSize, interchangeId, _RepeatingNodeName);

                        //Remove entry
                        BCMessageCount.Remove(interchangeId);
                        batchComplete = true;
                    }
                }
                catch (Exception exception)
                {
                    EventLog.WriteEntry("BatchCreator", exception.Message, EventLogEntryType.Error);
                }
            }

            if (batchComplete)
            {
                outboundStream.Position = 0;
                bodyPart.Data = outboundStream;
                pc.ResourceTracker.AddResource(outboundStream);
                return inMsg;
            }
            else
            {
                return null;
            }
        }

public MemoryStream Aggregate(string inboundPath, int batchSize, string interchangeId, string repeatingNode)
        {
            Dictionary<string, NodeInfo> nodes = new Dictionary<string, NodeInfo>();

            bool writeRoot = true;
            MemoryStream outboundStream = new MemoryStream();
            StreamWriter writer = new StreamWriter(outboundStream);

            for (int messageNumber = 1; messageNumber <= batchSize; messageNumber++)
            {
                string filePath = @inboundPath + messageNumber.ToString() + "_" + interchangeId;

                FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
                StreamReader reader = new StreamReader(fs);

                try
                {
                    int currentChar;
                    string line = null;

                    while (reader.Peek() >= 0)
                    {
                        currentChar = reader.Read();

                        if (currentChar == (int)'>')
                        {
                            line = line + (char)currentChar;

                            //Ending node
                            if (line.Substring(1, 1) == "/")
                            {
                                string nodeName = line.Trim().Substring(2, line.Length - 3);

                                if (!nodes.ContainsKey(nodeName) || messageNumber == batchSize)
                                {
                                    writer.Write(line);
                                }
                            }
                            else //Starting node
                            {
                                string nodeName = line.Substring(1, line.IndexOf(' ') - 1);

                                if (!nodes.ContainsKey(nodeName))
                                {
                                    writer.Write(line);

                                    if (nodeName == repeatingNode)//Repeating node
                                    {
                                        writeRoot = false;
                                    }
                                    else if (writeRoot)//Writes out the starting elements
                                    {
                                        nodes.Add(nodeName, new NodeInfo(nodeName, line));
                                    }
                                }
                            }
                            line = null;
                            continue;
                        }

                        line = line + (char)currentChar;
                    }
                }
                catch (Exception exception)
                {
                    throw exception;
                }
                finally
                {
                    writer.Flush();
                    reader.DiscardBufferedData();
                    fs.Close();
                }
            }
            try
            {
                ////delete temp files
                for (int mn = 1; mn <= batchSize; mn++)
                {
                    string fp = @inboundPath + mn.ToString() + "_" + interchangeId;
                    File.Delete(fp);
                }
            }
            catch (IOException ex)
            {
                //We don't really care if the message is there...
            }

            return outboundStream;
        }






推荐答案


- 如果您不知道答案,请不要回答。我不想猜测或代码Natzi回复。 

-Don't answer if you don't know the answer. I don't want guesses or code Natzi replies. 

我认为你在那里设置的酒吧太高了。怎么会有人知道答案? ;)

I think you set the bar a smidge too high there. How would anyone just know the answer? ;)

无论如何,完全是另一条消息?重复?一条空信息?第一条消息?

Anyway, what exactly is this other message? A duplicate? An empty message? The first message?

为什么不能使用聚合器模式? 对不起,不得不问。

Why can't you use an Aggregator pattern?  Sorry, have to ask.


这篇关于自定义管道组件 - 从文件文件夹中读取多个消息会导致管道多次触发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆