Multithreading task to process files in C#

Question

I've been reading a lot about threading but can't figure out how to find a solution to my issue. First let me introduce the problem. I have files which need to be processed. The hostname and filepath are located in two arrays.



Now I want to setup several threads to process the files. The number of threads to create is based on three factors:
A) The maximum thread count cannot exceed the number of unique hostnames in all scenarios.
B) Files with the same hostname MUST be processed sequentially, i.e. we cannot process host1_file1 and host1_file2 at the same time. (Data integrity would be put at risk, and this is beyond my control.)
C) The user may throttle the number of threads available for processing. The number of threads is still limited by condition A above. This is purely because, if we had a large number of hosts, say 50, we might not want 50 threads processing at the same time.

In the example below, a maximum of 6 threads can be created (one per unique hostname).
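As a rough sketch of how conditions A and C combine, the worker count can be taken as the smaller of the distinct host count and the user's throttle. The names `ThreadLimit.Compute` and `userThrottle` are illustrative assumptions, not part of the original code.

```csharp
using System;
using System.Linq;

public static class ThreadLimit
{
    // Condition A: never more workers than distinct hosts.
    // Condition C: never more workers than the user allows.
    // The result is clamped to at least 1 so it is always a usable count.
    public static int Compute(string[] hostnames, int userThrottle)
    {
        int uniqueHosts = hostnames.Distinct().Count();
        return Math.Max(1, Math.Min(uniqueHosts, userThrottle));
    }

    public static void Main()
    {
        var hostnames = new[] { "host1", "host1", "host1", "host2", "host2",
                                "host3", "host4", "host4", "host5", "host6" };
        Console.WriteLine(Compute(hostnames, 50)); // limited by the 6 hosts
        Console.WriteLine(Compute(hostnames, 4));  // limited by the user
    }
}
```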

The optimal processing routine is shown below.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class file_prep_obj
{
    public string[] file_paths;
    public string[] hostname;
    public Dictionary<string, int> my_dictionary;

    public void get_files()
    {
        hostname = new string[]{ "host1", "host1", "host1", "host2", "host2", "host3", "host4","host4","host5","host6" };
        //file_paths[i] belongs to hostname[i], so both arrays must have the same length and order.
        file_paths=new string[]{"C:\\host1_file1","C:\\host1_file2","C:\\host1_file3","C:\\host2_file1","C:\\host2_file2",
                                "C:\\host3_file1","C:\\host4_file1","C:\\host4_file2","C:\\host5_file1","C:\\host6_file1"};
        //The dictionary provides a count on the number of files that need to be processed for a particular host.
        my_dictionary = hostname.GroupBy(x => x)
                        .ToDictionary(g => g.Key,
                        g => g.Count());
    }
}

//This class contains a list of file_paths associated with the same host.
//The group_file_host_name will be the same for a host.
class host_file_thread
{
    public string[] group_file_paths;
    public string[] group_file_host_name;

    public void process_file(string file_path_in)
    {
        var time_delay_random=new Random();
        Console.WriteLine("Started processing File: " + file_path_in);
        Task.Delay(time_delay_random.Next(3000)+1000);
        Console.WriteLine("Completed processing File: " + file_path_in);
    }
}

class Program
{
    static void Main(string[] args)
    {
        file_prep_obj my_files=new file_prep_obj();
        my_files.get_files();
        //Create our host objects... my_files.my_dictionary.Count represents the max number of threads
        host_file_thread[] host_thread=new host_file_thread[my_files.my_dictionary.Count];

        int key_pair_count=0;
        int file_path_position=0;
        foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)
        {
            host_thread[key_pair_count] = new host_file_thread();   //Initialise the host_file_thread object. Because we have an array of a customised object
            host_thread[key_pair_count].group_file_paths=new string[pair.Value];        //Initialise the group_file_paths
            host_thread[key_pair_count].group_file_host_name=new string[pair.Value];    //Initialise the group_file_host_name


            for(int j=0;j<pair.Value;j++)
            {
                host_thread[key_pair_count].group_file_host_name[j]=pair.Key.ToString();                        //Group the hosts
                host_thread[key_pair_count].group_file_paths[j]=my_files.file_paths[file_path_position];        //Group the file_paths
                file_path_position++;
            }
            key_pair_count++;
        }//Close foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)

        //TODO PROCESS FILES USING host_thread objects. 
    }//Close static void Main(string[] args)
}//Close Class Program

I guess what I'm after is a guide on how to code the threaded processing routines that are in accordance with the specs above.

Recommended answer

You can use Stephen Toub's ForEachAsync extension method to process the files. It lets you specify the degree of parallelism (how many items are processed concurrently), and it is non-blocking, so your main thread stays free to do other work. Here is the method from the article:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
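To try the method in isolation, the sketch below repeats it inside a static class (extension methods must live in one) together with the namespaces it needs, and measures how many work items are ever in flight at once. `ForEachAsyncDemo` and `MeasurePeakAsync` are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Same method as above, repeated so the sketch compiles on its own.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Runs itemCount dummy work items and returns the highest number
    // that were in flight at the same time.
    public static async Task<int> MeasurePeakAsync(int itemCount, int dop)
    {
        int inFlight = 0, peak = 0;
        var gate = new object();
        await Enumerable.Range(0, itemCount).ForEachAsync(dop, async _ =>
        {
            lock (gate) { inFlight++; if (inFlight > peak) peak = inFlight; }
            await Task.Delay(50); // stand-in for real work
            lock (gate) { inFlight--; }
        });
        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakAsync(20, 3).GetAwaiter().GetResult();
        Console.WriteLine("Peak concurrency: " + peak); // never more than dop (3)
    }
}
```

Each partition awaits one item before moving to the next, so the number of items in flight is bounded by `dop`. The number of distinct threads involved may differ, because continuations after an `await` can resume on any thread-pool thread.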

In order to use it I refactored your code slightly. I changed the dictionary to be of type Dictionary<string, List<string>> and it basically holds the host as the key and then all the paths as the values. I assumed the file path will contain the host name in it.

   my_dictionary = (from h in hostname
                    from f in file_paths
                    where f.Contains(h)
                    select new { Hostname = h, File = f }).GroupBy(x => x.Hostname)
                    .ToDictionary(x => x.Key, x => x.Select(s => s.File).Distinct().ToList());
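One caveat with matching on `Contains`: a host named "host1" would also match a path belonging to "host10". If the two arrays are index-aligned (an assumption, since the question only says they are parallel arrays), pairing them by position avoids substring matching altogether. `HostGrouping.GroupByHost` is a hypothetical helper, shown only as a sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HostGrouping
{
    // Pairs hostnames[i] with filePaths[i], then groups the paths per host.
    // Assumes both arrays have the same length and matching order.
    public static Dictionary<string, List<string>> GroupByHost(string[] hostnames, string[] filePaths)
    {
        if (hostnames.Length != filePaths.Length)
            throw new ArgumentException("hostnames and filePaths must be the same length.");

        return hostnames
            .Zip(filePaths, (host, path) => new { host, path })
            .GroupBy(x => x.host, x => x.path)
            .ToDictionary(g => g.Key, g => g.ToList());
    }

    public static void Main()
    {
        var hosts = new[] { "host1", "host1", "host10" };
        var paths = new[] { @"C:\host1_file1", @"C:\host1_file2", @"C:\host10_file1" };
        foreach (var pair in GroupByHost(hosts, paths))
            Console.WriteLine(pair.Key + ": " + string.Join(", ", pair.Value));
    }
}
```

Within each host, the paths keep the order they had in the input array, which matters because they are processed sequentially.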

I also changed your process_file method to be async, because you were using Task.Delay inside it. Task.Delay only returns a task; unless you await it, the method continues immediately and the delay has no effect.

public static async Task process_file(string file_path_in)
{
    var time_delay_random = new Random();
    Console.WriteLine("Started:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
    await Task.Delay(time_delay_random.Next(3000) + 1000);
    Console.WriteLine("Completed:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
}
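The difference between awaiting and not awaiting Task.Delay can be seen with a stopwatch. This is a small sketch; `DelayDemo` and its method names are made up for the illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class DelayDemo
{
    // Task.Delay only creates a timer-backed task. Without await,
    // the caller carries on straight away.
    public static long ElapsedWithoutAwait(int milliseconds)
    {
        var sw = Stopwatch.StartNew();
        Task.Delay(milliseconds); // task is discarded, nothing waits on it
        return sw.ElapsedMilliseconds;
    }

    // Awaiting the task suspends the method until the delay has elapsed.
    public static async Task<long> ElapsedWithAwaitAsync(int milliseconds)
    {
        var sw = Stopwatch.StartNew();
        await Task.Delay(milliseconds);
        return sw.ElapsedMilliseconds;
    }

    public static void Main()
    {
        Console.WriteLine("Without await: " + ElapsedWithoutAwait(500) + " ms");
        Console.WriteLine("With await:    " +
            ElapsedWithAwaitAsync(500).GetAwaiter().GetResult() + " ms");
    }
}
```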

To use the code, you get the maximum number of threads you want to use and pass that to my_files.my_dictionary.ForEachAsync. You also supply an asynchronous delegate which processes each of the files for a particular host and sequentially awaits each one to be processed.

public static async Task MainAsync()
{
    var my_files = new file_prep_obj();
    my_files.get_files();

    const int userSuppliedMaxThread = 5;
    // Condition A: never use more workers than there are unique hosts.
    var maxThreads = Math.Min(userSuppliedMaxThread, my_files.my_dictionary.Count);
    Console.WriteLine("MaxThreads = " + maxThreads);

    foreach (var pair in my_files.my_dictionary)
    {
        foreach (var path in pair.Value)
        {
            Console.WriteLine("Key= {0}, Value={1}", pair.Key, path);   
        }            
    }

    await my_files.my_dictionary.ForEachAsync(maxThreads, async (pair) =>
    {
        foreach (var path in pair.Value)
        {
            // serially process each path for a particular host.
            await process_file(path);
        }
    });

}

static void Main(string[] args)
{
    MainAsync().Wait();
    Console.ReadKey();

}//Close static void Main(string[] args)

Output

MaxThreads = 5
Key= host1, Value=C:\host1_file1
Key= host1, Value=C:\host1_file2
Key= host1, Value=C:\host1_file3
Key= host2, Value=C:\host2_file1
Key= host2, Value=C:\host2_file2
Key= host3, Value=C:\host3_file1
Key= host4, Value=C:\host4_file1
Key= host4, Value=C:\host4_file2
Key= host5, Value=C:\host5_file1
Key= host6, Value=C:\host6_file1
Started:C:\host1_file1 ThreadId:10
Started:C:\host2_file1 ThreadId:12
Started:C:\host3_file1 ThreadId:13
Started:C:\host4_file1 ThreadId:11
Started:C:\host5_file1 ThreadId:10
Completed:C:\host1_file1 ThreadId:13
Completed:C:\host2_file1 ThreadId:12
Started:C:\host1_file2 ThreadId:13
Started:C:\host2_file2 ThreadId:12
Completed:C:\host2_file2 ThreadId:11
Completed:C:\host1_file2 ThreadId:13
Started:C:\host6_file1 ThreadId:11
Started:C:\host1_file3 ThreadId:13
Completed:C:\host5_file1 ThreadId:11
Completed:C:\host4_file1 ThreadId:12
Completed:C:\host3_file1 ThreadId:13
Started:C:\host4_file2 ThreadId:12
Completed:C:\host1_file3 ThreadId:11
Completed:C:\host6_file1 ThreadId:13
Completed:C:\host4_file2 ThreadId:12
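If you would rather not depend on the partitioner, a similar throttle can be sketched with SemaphoreSlim: one task per host, with the semaphore limiting how many hosts run at once. This is an alternative under the same assumptions, not the method from the article; `ThrottledHosts.ProcessAsync` and its parameters are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledHosts
{
    // Starts one task per host. The semaphore lets at most maxParallel
    // hosts run at once, and each host's files are awaited in order.
    public static async Task ProcessAsync(
        Dictionary<string, List<string>> filesByHost,
        int maxParallel,
        Func<string, Task> processFile)
    {
        using (var throttle = new SemaphoreSlim(maxParallel))
        {
            var hostTasks = filesByHost.Select(async pair =>
            {
                await throttle.WaitAsync();
                try
                {
                    foreach (var path in pair.Value)
                        await processFile(path); // sequential within a host
                }
                finally
                {
                    throttle.Release();
                }
            }).ToList(); // ToList forces every host task to start now

            await Task.WhenAll(hostTasks);
        }
    }

    public static void Main()
    {
        var files = new Dictionary<string, List<string>>
        {
            { "host1", new List<string> { @"C:\host1_file1", @"C:\host1_file2" } },
            { "host2", new List<string> { @"C:\host2_file1" } },
            { "host3", new List<string> { @"C:\host3_file1" } },
        };

        ProcessAsync(files, 2, async path =>
        {
            Console.WriteLine("Started:   " + path);
            await Task.Delay(200); // stand-in for real work
            Console.WriteLine("Completed: " + path);
        }).GetAwaiter().GetResult();
    }
}
```

The trade-off is that every host gets its own task up front, which is fine for tens of hosts but may be wasteful for very large host lists.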
