Параллельный Линк

искусственный интеллект
  • Параллельный LINQ
  1. Параллельный запрос

.NET 4 включает в себя новый класс ParallelIEnumerable в пространстве имен System.Linq, который разбивает работу над запросом на несколько потоков. Хотя класс Enmerable определяет методы расширения для интерфейса IEnunerable,

Большинство методов расширения класса ParallelIEnumerable являются расширениями класса ParallelQuery. Важным исключением является метод AsParallel(), который расширяет интерфейс IEnumerable и возвращает класс ParallelQuery, так что обычные классы коллекций могут запрашиваться параллельно.

пример:

    const int arraySize = 100000000;

            var data = new int[arraySize];

            var r = new Random();

            for (int i = 0; i < arraySize; i++)

            {

                data[i] = r.Next(40);

            }

Теперь вы можете фильтровать данные с помощью запроса LINQ, чтобы получить сумму отфильтрованных данных. Запрос определяет фильтр с предложением where для выбора только тех элементов, чье соответствующее значение меньше 20, а затем вызывает метод суммирующей функции Sum(). Единственное отличие от предыдущего запроса LINQ заключается в том, что на этот раз вызывается метод AsParallel().

    var sum = (from x in data.AsParallel()

                                   where x < 20

                                   select x).Sum();

Как и в предыдущем запросе LINQ, компилятор изменяет синтаксис для вызова методов AsParallel(), Where(), Select() и Sum(). Метод AsParallel() определен в классе ParallelEnumerable для расширения интерфейса IEnumerable, поэтому вызывайте его для простых данных. Метод AsParallel() возвращает ParallelQuery. Из-за возвращаемого типа метод Where(), выбранный компилятором, является ParallelEnumerable.Where(), а не Enumerable.Where(). В приведенном ниже коде методы Select() и Sum() также относятся к классу ParallelEnumerable. В отличие от кода реализации класса Enumerable, в классе ParallelEnumerable запрос разбивается на разделы, чтобы его могли обрабатывать несколько потоков одновременно. Массивы можно разделить на разделы, где каждый раздел обрабатывается отдельным потоком для фильтрации оставшихся элементов. После того, как вы сделали свою работу с разделами, вам нужно объединить, чтобы получить сумму всех частей.

        var sum=data.AsParallel().Where(x=>x<20).Select(x=>x).Sum();

Выполнение этой строки кода запустит диспетчер задач, и вы увидите, что все процессоры системы заняты. Если вы удалите метод AsParallel(), использование нескольких процессоров будет невозможно. Конечно, если в вашей системе нет нескольких процессоров, вы не увидите улучшений по сравнению с параллельной версией.

  1. Разделитель

Метод AsParallel() не только расширяет интерфейс IEnumerable, но также расширяет класс Partition. Через него можно влиять на создаваемый раздел.

Класс Partitioner определяется пространством имен System.Collection.Concurrent и имеет различные варианты. Метод Create принимает массив или объект, реализующий класс IList. На основании этого вместе с параметром типа loadBalance и некоторыми перегруженными версиями этого метода возвращается другой тип Partitioner. Для массивов .Net4 включает классы DynamicPartitionerForArray и StaticPartitionerForArray, производные от абстрактного базового класса OrderablePartitioner.

var q1 = (from x in Partitioner.Create(data).AsParallel()

                      where x < 20

                     select x).Sum();

Вы также можете вызвать методы WithExecutionMode() и WithDegreeOfParallelism(), чтобы передать значение по умолчанию или значение ForceParallelism для ParallelExecutionMode. По умолчанию параллельный LINQ позволяет избежать дорогостоящего параллелизма. Для метода WithDegreeOfParallelism() можно передать целочисленное значение, чтобы указать максимальное количество задач для параллельного выполнения.

пример:

  const int arraySize = 100000000;

            var data = new int[arraySize];

            var r = new Random();

            for (int i = 0; i < arraySize; i++)

            {

                data[i] = r.Next(40);

            }

            Stopwatch watch = new Stopwatch();

            watch.Start();

//Способ записи, без добавления динамической балансировки нагрузки, время, необходимое для завершения выполнения, составляет 1300 миллисекунд

            var q1 = (from x in Partitioner.Create(data).AsParallel()

                      where x < 80

                     select x).Sum();

//Второй способ записи добавляет динамическую балансировку нагрузки, а время, необходимое для завершения выполнения, составляет 660 миллисекунд.

var q1 = (from x in Partitioner.Create(data,true).AsParallel()

                      where x < 80

                     select x).Sum();

            watch.Stop();

            Console.WriteLine(watch.ElapsedMilliseconds.ToString());

  1. Отмена

.Net предоставляет стандартный способ отмены длительных задач, который также работает с параллельным LINQ. Чтобы отменить длинный запрос, добавьте в запрос метод WithCancellation(), передав в качестве параметра CancellationToken. Маркеры CancellationToken создаются из класса CancellationTokenSource. Запрос выполняется в отдельном потоке, в котором перехватывается исключение типа OperationCancelException. Это исключение запускается, если запрос отменен. В основном потоке вызов метода Cancel() класса CancellationTokenSource может отменить задачу.

  const int arraySize = 100000000;

            var data = new int[arraySize];

            var r = new Random();

            for (int i = 0; i < arraySize; i++)

            {

                data[i] = r.Next(40);

            }

            var cts = new CancellationTokenSource();

            new Thread(() =>

                {

                    try

                    {

                        var sum = (from x in data.AsParallel().WithCancellation(cts.Token)

                                   where x < 80

                                   select x).Sum();

                        Console.WriteLine("query finished, sum: {0}", sum);

                    }

                    catch (OperationCanceledException ex)

                    {

                        Console.WriteLine(ex.Message);

                    }

                }).Start();

            Console.WriteLine("query started");

            Console.Write("cancel? ");

            int input = Console.Read();

            if (input == 'Y' || input == 'y')

            {

                // cancel!

                cts.Cancel();

               }