多线程任务以处理C#中的文件 [英] Multithreading task to process files in c#

查看:93
本文介绍了多线程任务以处理C#中的文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经阅读了很多有关线程的知识,但无法弄清楚如何找到解决问题的方法. 首先让我介绍一下这个问题.我有需要处理的文件.主机名和文件路径位于两个数组中.

I've been reading a lot about threading but can't figure out how to find a solution to my issue. First let me introduce the problem. I have files which need to be processed. The hostname and filepath are located in two arrays.


现在,我想设置多个线程来处理文件.创建的线程数基于三个因素:
A)在所有情况下,最大线程数不能超过唯一主机名的数量.
B)具有相同主机名必须的文件将被顺序处理.即我们不能同时处理 host1 _file1和 host1 _file2. (数据完整性将面临风险,这是我无法控制的.
C)用户可以限制可用于处理的线程数.线程数仍受上面条件A的限制.这完全是由于以下事实:如果我们有大量的主机,比如说50 ..我们可能不希望同时处理50个线程.


Now I want to setup several threads to process the files. The number of threads to create is based on three factors:
A) The maximum thread count cannot exceed the number of unique hostnames in all scenarios.
B) Files with the same hostname MUST be processed sequentially. I.E We cannot process host1_file1 and host1_file2 at the same time. (Data integrity will be put at risk and this is beyond my control.
C) The user may throttle the number of threads available for processing. The number of threads is still limited by condition A from above. This is purely due to the fact that if we had an large number of hosts let's say 50.. we might not want 50 threads processing at the same time.

在上面的示例中,最多可以创建6个线程.

In the example above a maximum of 6 threads can be created.

最佳处理例程如下所示.

The optimal processing routine is shown below.


public class file_prep_obj
{
    public string[] file_paths;
    public string[] hostname;
    public Dictionary<string, int> my_dictionary;

    public void get_files()
    {
        hostname = new string[]{ "host1", "host1", "host1", "host2", "host2", "host3", "host4","host4","host5","host6" };
        file_paths=new string[]{"C:\\host1_file1","C:\\host1_file2","C:\\host1_file3","C:\\host2_file1","C:\\host2_file2","C:\\host2_file2",
                                "C:\\host3_file1","C:\\host4_file1","C:\\host4_file2","C:\\host5_file1","C:\\host6_file1"};
        //The dictionary provides a count on the number of files that need to be processed for a particular host.
        my_dictionary = hostname.GroupBy(x => x)
                        .ToDictionary(g => g.Key,
                        g => g.Count());
    }
}

//This class contains a list of file_paths associated with the same host.
//The group_file_host_name will be the same for a host.
class host_file_thread
{
    public string[] group_file_paths;
    public string[] group_file_host_name;

    public void process_file(string file_path_in)
    {
        var time_delay_random=new Random();
        Console.WriteLine("Started processing File: " + file_path_in);
        Task.Delay(time_delay_random.Next(3000)+1000);
        Console.WriteLine("Completed processing File: " + file_path_in);
    }
}

class Program
{
    static void Main(string[] args)
    {
        file_prep_obj my_files=new file_prep_obj();
        my_files.get_files();
        //Create our host objects... my_files.my_dictionary.Count represents the max number of threads
        host_file_thread[] host_thread=new host_file_thread[my_files.my_dictionary.Count];

        int key_pair_count=0;
        int file_path_position=0;
        foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)
        {
            host_thread[key_pair_count] = new host_file_thread();   //Initialise the host_file_thread object. Because we have an array of a customised object
            host_thread[key_pair_count].group_file_paths=new string[pair.Value];        //Initialise the group_file_paths
            host_thread[key_pair_count].group_file_host_name=new string[pair.Value];    //Initialise the group_file_host_name


            for(int j=0;j<pair.Value;j++)
            {
                host_thread[key_pair_count].group_file_host_name[j]=pair.Key.ToString();                        //Group the hosts
                host_thread[key_pair_count].group_file_paths[j]=my_files.file_paths[file_path_position];        //Group the file_paths
                file_path_position++;
            }
            key_pair_count++;
        }//Close foreach (KeyValuePair<string, int> pair in my_files.my_dictionary)

        //TODO PROCESS FILES USING host_thread objects. 
    }//Close static void Main(string[] args)
}//Close Class Program



我想我所需要的是有关如何按照上述规范对线程处理例程进行编码的指南.


I guess what I'm after is a guide on how to code the threaded processing routines that are in accordance with the specs above.

推荐答案

您可以使用Stephen Toub的

You can use Stephen Toub's ForEachAsync extension method to process the files. It allows you to specify how many concurrent threads you want to use, and it is non-blocking so it frees up your main thread to do other processing. Here is the method from the article:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}

为了使用它,我稍微重构了您的代码.我将字典更改为Dictionary<string, List<string>>类型,它基本上将主机作为键,然后将所有路径作为值.我以为文件路径中将包含主机名.

In order to use it I refactored your code slightly. I changed the dictionary to be of type Dictionary<string, List<string>> and it basically holds the host as the key and then all the paths as the values. I assumed the file path will contain the host name in it.

   my_dictionary = (from h in hostname
                    from f in file_paths
                    where f.Contains(h)
                    select new { Hostname = h, File = f }).GroupBy(x => x.Hostname)
                    .ToDictionary(x => x.Key, x => x.Select(s => s.File).Distinct().ToList());

我也将您的process_file方法更改为async,就像您在其中使用Task.Delay一样,您需要使用await,否则它不会执行任何操作.

I also changed your process_file method to be async as you were using Task.Delay inside it, which you need to await otherwise it doesn't do anything.

public static async Task process_file(string file_path_in)
{
    var time_delay_random = new Random();
    Console.WriteLine("Started:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
    await Task.Delay(time_delay_random.Next(3000) + 1000);
    Console.WriteLine("Completed:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId);
}

要使用该代码,您将获得要使用的最大线程数,并将其传递给my_files.my_dictionary.ForEachAsync.您还提供了一个异步委托,该委托可以处理特定主机的每个文件,并依次等待每个文件进行处理.

To use the code, you get the maximum number of threads you want to use and pass that to my_files.my_dictionary.ForEachAsync. You also supply an asynchronous delegate which processes each of the files for a particular host and sequentially awaits each one to be processed.

public static async Task MainAsync()
{
    var my_files = new file_prep_obj();
    my_files.get_files();

    const int userSuppliedMaxThread = 5;
    var maxThreads = Math.Min(userSuppliedMaxThread, my_files.my_dictionary.Values.Count());
    Console.WriteLine("MaxThreads = " + maxThreads);

    foreach (var pair in my_files.my_dictionary)
    {
        foreach (var path in pair.Value)
        {
            Console.WriteLine("Key= {0}, Value={1}", pair.Key, path);   
        }            
    }

    await my_files.my_dictionary.ForEachAsync(maxThreads, async (pair) =>
    {
        foreach (var path in pair.Value)
        {
            // serially process each path for a particular host.
            await process_file(path);
        }
    });

}

static void Main(string[] args)
{
    MainAsync().Wait();
    Console.ReadKey();

}//Close static void Main(string[] args)

输出

MaxThreads = 5
Key= host1, Value=C:\host1_file1
Key= host1, Value=C:\host1_file2
Key= host1, Value=C:\host1_file3
Key= host2, Value=C:\host2_file1
Key= host2, Value=C:\host2_file2
Key= host3, Value=C:\host3_file1
Key= host4, Value=C:\host4_file1
Key= host4, Value=C:\host4_file2
Key= host5, Value=C:\host5_file1
Key= host6, Value=C:\host6_file1
Started:C:\host1_file1 ThreadId:10
Started:C:\host2_file1 ThreadId:12
Started:C:\host3_file1 ThreadId:13
Started:C:\host4_file1 ThreadId:11
Started:C:\host5_file1 ThreadId:10
Completed:C:\host1_file1 ThreadId:13
Completed:C:\host2_file1 ThreadId:12
Started:C:\host1_file2 ThreadId:13
Started:C:\host2_file2 ThreadId:12
Completed:C:\host2_file2 ThreadId:11
Completed:C:\host1_file2 ThreadId:13
Started:C:\host6_file1 ThreadId:11
Started:C:\host1_file3 ThreadId:13
Completed:C:\host5_file1 ThreadId:11
Completed:C:\host4_file1 ThreadId:12
Completed:C:\host3_file1 ThreadId:13
Started:C:\host4_file2 ThreadId:12
Completed:C:\host1_file3 ThreadId:11
Completed:C:\host6_file1 ThreadId:13
Completed:C:\host4_file2 ThreadId:12

这篇关于多线程任务以处理C#中的文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆