Параллельное программирование с...

80

Transcript of Параллельное программирование с...

Page 1: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!
Page 2: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Параллельное программирование сOmniThreadLibrary

Primož Gabrijelčič and Alex Egorov

This book is for sale at http://leanpub.com/omnithreadlibrary-ru

This version was published on 2013-01-28

This is a Leanpub book. Leanpub empowers authors and publishers with the Lean Publishing process.

Lean Publishing is the act of publishing an in-progress ebook using lightweight tools and many iterations toget reader feedback, pivot until you have the right book and build traction once you do.

To learn more about Lean Publishing, go to http://leanpub.com/manifesto.

To learn more about Leanpub, go to http://leanpub.com.

©2012 - 2013 Primož Gabrijelčič and Alex Egorov

Page 3: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Tweet This Book!Please help Primož Gabrijelčič and Alex Egorov by spreading the word about this book on Twitter!

The suggested hashtag for this book is #omnithreadlibraryru.

Find out what other people are saying about the book by clicking on this link to search for this hashtag onTwitter:

https://twitter.com/search/#omnithreadlibraryru

Page 4: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Also By Primož Gabrijelčič

Parallel Programming with OmniThreadLibrary

A Smart Book

Programación Paralela con OmniThreadLibrary

Page 5: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Оглавление

Образец Книги i

Благодарности ii

Перевод iii

Введение iv

Соглашения форматирования . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . v

В разработке . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . vi

Объявление . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . vii

Примечания к выпуску viii

1 Введение в OmniThreadLibrary 1

1.1 Требования . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2

1.2 Лицензия . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 3

1.3 Установка . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 5

1.3.1 Установка пакета времени разработки . . . . . . . . . . . . . . . . . . . . . . . . 5

1.4 Зачем использовать OmniThreadLibrary? . . . . . . . . . . . . . . . . . . . . . . . . . . . 7

1.5 Задачи или потоки . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 8

2 Высокоуровневая многопоточность 9

2.1 Асинхронность (Async) . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 10

2.1.1 Обработка исключений . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 12

2.2 Async/Await . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 14

2.3 Для каждого (For Each) . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 15

2.3.1 Сотрудничество . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 16

2.3.2 Перебираем … . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 16

2.3.2.1 … Числовые диапазоны . . . . . . . . . . . . . . . . . . . . . . . . . . . 16

2.3.2.2 … Перечисляемые коллекции . . . . . . . . . . . . . . . . . . . . . . . . 17

2.3.2.3 … Потокобезопасные перечисляемые коллекции . . . . . . . . . . . . . 18

2.3.2.4 … Блокирующие коллекции . . . . . . . . . . . . . . . . . . . . . . . . . 19

Page 6: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

ОГЛАВЛЕНИЕ

2.3.2.5 … Всё . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 19

2.3.3 Обеспечение внешнего ввода . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 20

2.3.4 IOmniParallelLoop . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 22

2.3.5 Сохранение выходного порядка . . . . . . . . . . . . . . . . . . . . . . . . . . . . 25

2.3.6 Агрегация . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 26

2.3.7 Отмена . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 29

2.3.8 Инициализация и завершение задачи . . . . . . . . . . . . . . . . . . . . . . . . 30

2.3.9 Обработка исключений . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31

2.3.10 Внутреннее устройство . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31

2.3.10.1 Поставщик источника . . . . . . . . . . . . . . . . . . . . . . . . . . . . 37

2.3.10.2 Менеджер данных . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 39

2.3.10.3 Локальная очередь . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 41

2.3.10.4 Упорядочивание вывода . . . . . . . . . . . . . . . . . . . . . . . . . . . 42

2.3.11 Примеры . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 46

3 Синхронизация 47

3.1 Критические секции . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 48

3.1.1 IOmniCriticalSection . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 48

3.1.2 TOmniCS . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 49

3.1.3 Locked . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51

3.1.3.1 Почему бы не использовать TMonitor? . . . . . . . . . . . . . . . . . . 53

4 Как сделать… (How to…) 55

4.1 Async/Await . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 57

4.2 Быстрая сортировка и параллельный максимум . . . . . . . . . . . . . . . . . . . . . . . 61

4.2.1 Быстрая сортировка . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 61

4.2.1.1 Параллельный максимум . . . . . . . . . . . . . . . . . . . . . . . . . . 63

5 Демонстрационные приложения 65

Page 7: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Образец КнигиЭто просто образец настоящей книги. Здесь включены отдельные части из некоторых глав.

Чтобы проверить состояние полной книги, посетите блог The Delphi Geek¹.

Вы можете купить книгу на сайте LeanPub².

¹http://thedelphigeek.org²http://leanpub.com/omnithreadlibrary

i

Page 8: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

БлагодарностиАвтор хотел бы поблагодарить Горазда Джернейка (Gorazd Jernejc, также известного как GJ) за еговклад в проект OmniThreadLibrary. Горазд, OTL не был бы таким без тебя!

Часть кода была предоставлена АнтономАлисовым (Anton Alisov) и ЛиНовером (Lee Nover). Спасибо!

Следующие программисты (в алфавитном порядке) помогли с отчётами и исправлением ошибок:ajasja, andi, Anton Alisov, dottor_jeckill, geskill, Hallvard Vassbotn, Jamie, M.J. Brandt, Mason Wheeler,Mayjest, meishier, morlic, Passella, Qmodem, Unspoken, Zarko.

Огромная благодарность Пьеру ле Ричу (Pierre le Riche), который написал прекрасный менеджерпамяти FastMM³ и позволил мне включить его в состав дистрибутива.

Обложка (c) Дейв Гингрич (Dave Gingrich), http://www.flickr.com/photos/ndanger/2744507570/

³http://sourceforge.net/projects/fastmm/

ii

Page 9: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

ПереводЭту книгу перевёл ЕгоровАлександр (Alex Egorov), большойпоклонник библиотекиOmniThreadLibrary.

О переводчике: Егоров Александр является Delphi разработчиком с стажем более 18 лет, с самойпервой версии этого языка Borland Delphi 1.0. Основная направленность в программировании -работа с принтерами и контроль печати. Основные программы: Print Censor (контроль печати) иRemote Queue Manager (продвинутый менеджер очереди печати), которые можно найти по адресуhttp://usefulsoft.com

Связаться с переводчиком можно по электронной почте: [email protected]

iii

Page 10: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

ВведениеЭта книга о OmniThreadLibrary⁴, многопоточной библиотеки для среды разработки EmbarcaderoDelphi.

В книге предполагается, что читатель имеет некоторое представление о многопоточном программи-ровании. Если вы новичок в многопоточности, то прекрасным введением в многопоточность будеткнига Мартина Харвея (Martin Harvey) Multithreading - The Delphi Way⁵. Эта книга выпушена давно,но она одна из лучших.

Более свежее описание возможностей многопоточности в Delphi доступно в книге Криса Роллистона(Chris Rolliston) Delphi XE2 Foundations, Part 3⁶, которая сейчас доступна на Амазоне (Amazon).

⁴http://otl.17slon.com⁵http://thaddy.co.uk/threads/⁶http://delphifoundations.com/

iv

Page 11: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение v

Соглашения форматирования

Эта книга описывает последний официальный релиз OmniThreadLibrary (т.е. последний доступныйв Google Code⁷).

Когда часть книги описывает некоторые другие версии, то в индексе [тэг версии] будет указана со-ответствующая версия или версии. Номера версий (напр. 2.1) используются для старых релизов, аномера ревизий SVN (напр. r1184) используются для функциональности, которая была добавленапосле выхода последнего официального релиза.

Одиночная версия или номер релиза (напр. [r1184]) означает, что тема была представлена в этой версии,и что это по-прежнему поддерживается в текущем релизе.

Диапазон версий (напр. [1.0-1.1]) означает, что тема была представлена в первой версии указанногодиапазона и поддерживалась до второй версии диапазона. После этого поддержка этой темы былаудалена или изменена так, что был добавлен дополнительный раздел в описании функциональныхвозможностей.

⁷https://code.google.com/p/omnithreadlibrary/downloads/list

Page 12: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение vi

В разработке

Эта книга находится в стадии разработки - она публикуется по мере написания. Когда вы покупаетеэту книгу, то вы покупаете её не только ‘как есть’ - в соответствии с Lean Publishing Manifesto⁸(Манифест Издательства Lean) вы получите все последующие версии книги бесплатно, навсегда.

Для проверки статуса книги и для того, чтобы повлиять на написание книги с помощью голосованияпо значимости тех или иных тем, посетите блог The Delphi Geek⁹.

⁸http://leanpub.com/manifesto⁹http://thedelphigeek.com

Page 13: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение vii

Объявление

Я преподаю различные темы Delphi и Smart Mobile Studio. Я также доступен для консультаций поDelphi или Smart Mobile Studio. Подробности на http://www.glagolite.si/training.

(Примечание переводчика: обращаться нужно на английском языке).

Page 14: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Примечания к выпуску2013-01-27

• Завершена глава “Синхронизация”.

2012-10-08

• Адаптировано для релиза 3.02.

– Документированы инициализатор и финализатор фонового рабочего.– Документированы Async/Await.

• Исправлены некоторые проблемы, найденные [jachguate].• Некоторые исправления перевода и добавлен перевод некоторых комментариев в коде.

2012-09-04

• Завершена глава “Как сделать”.• Добавлены “Примечания к выпуску”.• Исправлены изображения и переносы строк в главе “Высокоуровневая многопоточность”.

2012-07-17

• Первый выпуск.

viii

Page 15: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

1 Введение в OmniThreadLibraryOmniThreadLibrary¹ является многопоточной библиотекой для Delphi, написанной главным образомавтором этой книги (см. Благодарности, где перечислен весь список участников). OmniThreadLibraryможно условно разделить на три части. Во-первых, существуют строительные блоки, которые могутбыть использованы либо с потоковыми помощниками OmniThreadLibrary или с любыми другимипотоковыми подходами (с TThread из Delphi или с AsyncCalls²). Большинство этих строительных бло-ков описаны в главе Разное, а некоторые части освещены в других разделах книги: (Неблокируемыеколлекции, Блокирующие коллекции, Синхронизация).

Во-вторых, OmniThreadLibrary привносит фреймворк низкоуровневой многопоточности, которыйможет рассматриваться как окружение, описывающее класс TThread. Этот фреймворк упрощаетпередачу сообщений из фоновых потоков, запуск фоновых задач, использование пула потоков имногое другое.

В-третьих, OmniThreadLibrary представляет концепцию высокоуровневой многопоточности. Высоко-уровневый фреймворк содержит несколько готовых решения (так называемые абстракции; pipeline,fork/join …), которые можно использовать в своём коде. Идея состоит в том, что пользователю нужнопросто выбрать подходящую абстракцию и написать рабочий код, а библиотека OmniThreadLibraryслужит основой, которая реализует сложные многопоточные части, заботится о синхронизации и такдалее.

¹http://otl.17slon.com²http://andy.jgknet.de/blog/?page_id=100

1

Page 16: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 2

1.1 Требования

OmniThreadLibrary требует, по крайней мере, Delphi 2007 и не работает с FreePascal. Причиной этогоявляется то, что большая часть OmniThreadLibrary использует языковые конструкции, которые ещёне поддерживаются компилятором FreePascal.

Фреймворк высокоуровневой многопоточности требует, по крайней мере, Delphi 2009.

В настоящее время OmniThreadLibrary предназначена только для установки на Windows системы.Поддерживаются 32-х и 64-х битные платформы.

Page 17: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 3

1.2 Лицензия

OmniThreadLibrary является библиотекой с открытым исходным кодом с лицензией OpenBSD.

Это программное обеспечение распространяется под лицензией BSD.

Copyright (c) 2012, Primoz Gabrijelcic

Все права защищены.

Распространение и использование в виде исходных кодов, с изменениями или без,допускается при соблюдении следующих условий:

• При повторном распространении исходного кода должно оставаться указанное вы-ше уведомление об авторском праве, этот список условий и последующий отказ отгарантий.

• При повторном распространении бинарного кода должно оставаться указанноевыше уведомление об авторском праве, этот список условий и последующий отказот гарантий в документации и/или других материалах, поставляемых при распро-странении.

• Имя Primoz Gabrijelcic не может быть использовано для рекламы продуктов, полу-ченныхиспользуя это программное обеспечение без предварительного письменногоразрешения.

ЭТО ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ ПРЕДОСТАВЛЕНО ВЛАДЕЛЬЦЕМ АВТОРСКИХПРАВ”КАК ЕСТЬ” И ЛЮБЫЕ ЯВНЫЕ ИЛИ ПОДРАЗУМЕВАЕМЫЕ ГАРАНТИИ, ВКЛЮ-ЧАЯ, НОНЕОГРАНИЧИВАЯСЬ, ГАРАНТИИПРИГОДНОСТИДЛЯОПРЕДЕЛЁННОЙЦЕ-ЛИНЕПРЕДОСТАВЛЯЮТСЯ. НИПРИКАКИХОБСТОЯТЕЛЬСТВАХВЛАДЕЛЕЦАВТОР-СКИХ ПРАВ НЕ НЕСЁТ ОТВЕТСТВЕННОСТИ ЗА ПРЯМЫЕ, НЕПРЯМЫЕ, СПЕЦИАЛЬ-НЫЕ, ШТРАФНЫЕ ИЛИ КОСВЕННЫЕ ПОТЕРИ (ВКЛЮЧАЯ, НО НЕ ОГРАНИЧИВАЯСЬ,ПРИОБРЕТЕНИЕ ИЛИ ЗАМЕНУ ТОВАРА ИЛИ УСЛУГ, ПОТЕРЮ ДАННЫХИЛИ ПРИБЫ-ЛИ, ПРИОСТАНОВЛЕНИЕ БИЗНЕСА), ВОЗНИКШИЕ ПО КАКОЙ-ЛИБО ПРИЧИНЕ, ВЫ-ЗВАННЫЕИВЛЮБОЙТЕОРИИОТВЕТСТВЕННОСТИ,НЕЗАВИСИМООТКОНТРАКТА,СТРОГОЙОТВЕТСТВЕННОСТИ,ИЛИПРАВОНАРУШЕНИЯ (ВКЛЮЧАЯНЕБРЕЖНОСТЬИЛИ ИНОЕ), ВОЗНИКАЮЩИЕ В РЕЗУЛЬТАТЕ ИСПОЛЬЗОВАНИЯ ДАННОГО ПРО-ГРАММНОГО ОБЕСПЕЧЕНИЯ, ДАЖЕ ВОЗМОЖНОСТИ ТАКОГО УЩЕРБА.

Короче говоря, это означает, что:

1. Вы можете использовать библиотеку в любом проекте, бесплатном, с открытым исходнымкодом или коммерческом, без необходимости упоминать моё имя или название библиотекив любом месте проекта, документации или на веб-сайте.

2. Вы можете изменять исходные коды для вашего собственного использования. Вы также можетепоместить модифицированную версию в Интернете, но вы не должны удалять моё имя илилицензию из исходного кода.

3. Я не виноват, если программное обеспечение ударит вас лицом в грязь. Помните, что выполучили OmniThreadLibrary бесплатно.

Page 18: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 4

В случает, если ваша компания хотела бы получить договор на поддержку OmniThreadLibrary,пожалуйста свяжитесь со мной.

Page 19: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 5

1.3 Установка

1. Загрузите последнюю стабильную редакцию из Google Code³ или проверьте сборку SVN⁴. Какправило, безопасно брать код из сборки, как только проверенный код зафиксирован. [Сказавэто, я должен признать, что время от времени туда пробираются ошибки, но они быстроустраняются].

2. Если вы загрузили последнюю стабильную версию, распакуйте её в папку.3. Добавьте папку, в которую вы распаковали последнюю стабильную версию или сборку SVN,

в Delphi Library path. Кроме того, добавьте подпапку src в Library path. В случае, если выуже используете модули из моего проекта GpDelphiUnits⁵, вы можете игнорировать копии вподпапке src и использовать версию GpDelphiUnits.

4. Добавьте необходимые модули в объявлении uses и начните использовать библиотеку!

1.3.1 Установка пакета времени разработки

OmniThreadLibrary включает в себя компонент времени разработки (TOmniEventMonitor), которыйможет быть использован для получения сообщений, отправленных из фоновых задач и контролиролясоздания/уничтожения потоков. Он используется в некоторых демонстрационных приложениях.

Чтобы скомпилировать и установить пакет, содержащий этот компонент, выполните следующиедействия:

• Из Delphi, откройте подпапку packages в установочной папке OmniThreadLibrary и выберитефайлOmniThreadLibraryPackages{VER}.groupproj (где {VER} указывает версиюDelphi, которую выиспользуете; На момент написания {VER} может быть 2007, 2009, 2010, XE, XE2 или XE3).

• В окне менеджера проектов вы найдёте два проекта - OmniThreadLibraryRuntime{VER}.bpl иOmniThreadLibraryDesigntime{VER}.bpl. Если окноменеджера проектов не видно, выберите пунктменю View, Project Manager.

³http://code.google.com/p/omnithreadlibrary/downloads/list⁴http://code.google.com/p/omnithreadlibrary/source/checkout⁵http://code.google.com/p/gpdelphiunits/

Page 20: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 6

• Щёлкните правой кнопкой мыши на OmniThreadLibraryRuntime{VER}.bpl и выберите Build извсплывающего меню.

• Щёлкните правой кнопкой мыши наOmniThreadLibraryDesigntime{VER}.bpl и выберите Build извсплывающего меню.

• Щёлкните снова правой кнопкой мыши на OmniThreadLibraryDesigntime{VER}.bpl и выберитеInstall из всплывающего меню.

• Delphi сообщит, что компонент TOmniEventMonitor был установлен.

• Загройте группу проекта, используя меню File, Close All. Если Delphi спрашивает, хотите ли высохранить изменённые файлы, выберите No.

Вы должны повторить эти шаги при установке обновления OmniThreadLibrary.

Page 21: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 7

1.4 Зачем использовать OmniThreadLibrary?

OmniThreadLibrary подходит к потоковой проблеме с другой точки зрения, чем TThread. В то время,как родной подход Delphi ориентирован на создание и управление потоками на очень низком уровне,основное руководство по проектированию OmniThreadLibrary это: “Сделать работу программиста спотоками такой простой и свободной, насколько это только возможно”. Код в идеале должен избавитьвас от всех тягот, которые обычно связаны с многопоточностью.

OmniThreadLibrary была разработана, чтобы сделать “VCL для многопоточности” - библиотека, кото-рая сделает типичные задачи многопоточности очень простыми, но все же позволяет вам работатьс многопоточностью на уровне операционной системы. Хотя по-прежнему позволяя низкоуровневоеобращение, OmniThreadLibrary позволяет работать большую часть времени на более высоком уровнеабстракций.

Есть два важных отличия между TThread и OmniThreadLibrary, оба объясняются далее в этой главе.Во-первых, OmniThreadLibrary фокусируется на задачах, а не потоках и второе, в OmniThreadLibraryсообщения пытаются заменить блокирование когда это возможно.

Перемещая наиболее критический многопоточный код в повторно используемые компоненты (клас-сы и абстракции высокого уровня), OmniThreadLibrary позволяет писать хороший многопоточныйкод быстрее.

Page 22: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Введение в OmniThreadLibrary 8

1.5 Задачи или потоки

В OmniThreadLibrary Вы создаёте не потоки, а задачи. Задача может быть выполнена в новом потокеили в уже существующем, взятом из пула потоков.

Задача создаётся с помощью функции CreateTask, которая принимает в качестве параметра глобаль-ную процедуру, метод или экземпляр класса TOmniWorker (или, обычно, наследника этого класса)или анонимный метод (в Delphi 2009 и выше). CreateTask возвращает интерфейс IOmniTaskControl,который может быть использован для контроля задачи. Задача всегда создаётся в приостановленномсостоянии и вы должны вызвать Run, чтобы активировать её (или Schedule, чтобы запустить её в пулепотоков).

Задача имеет доступ к интерфейсу IOmniTask и может использовать его для связи с владельцем (частьпрограммы, которая стартовала задачу). Оба интерфейса подробно объясняются в главе Низкоуров-невая многопоточность.

Различия между задачей и потоком могут быть изложены в нескольких простых пунктах:

Задача является частью кода, который должен быть выполнен.

Поток является средой выполнения.

Вы заботитесь о задаче, а OmniThreadLibrary заботится о потоке.

Page 23: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

2 Высокоуровневая многопоточностьПосмотрим правде в глаза - многопоточное программирование сложно. Трудно разработать многопо-точную программу, трудно написать и протестировать её, и очень трудно отладить её. Чтобы решитьэту проблему OmniThreadLibrary вводит ряд готовых многопоточных решений, так называемыеабстракции.

Идея высокоуровневых абстракций состоит в том, что пользователю нужно просто выбрать подхо-дящую абстракцию и написать рабочий код, а OmniThreadLibrary будет служить основой, котораяреализует сложные многопоточные части, позаботиться о синхронизации и так далее.

9

Page 24: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 10

2.1 Асинхронность (Async)

Абстракция Асинхронность (Async) это простейший уровень высокоуровневой абстракции и обычноиспользуется в сценариях “выстрелил и забыл”. Для создания асинхронных задач нужно вызватьParallel.Async.

..

Когда вы вызываете Parallel.Async, код запускается в новом потоке (выделенная жирнымвертикальная линия) и как основной, так и фоновый потоки продолжат своё выполнение. Вкакой-то момент фоновая задача завершает своё выполнение и исчезает.

См. также демо 46_Async.

Page 25: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 11

Пример:

1 Parallel.Async(

2 procedure

3 begin

4 MessageBeep($FFFFFFFF);

5 end);

Эта простая программа создаёт задачу в фоновом потоке с единственной целью, чтобы издать из этойзадачи некоторый звук. Задача написана как анонимныйметод, но вы также можете использовать длянаписания задачи обычный метод или обычную процедуру.

Класс Parallel определяет два перегруженных метода Async. Первый принимает параметр - фоновуюзадачу и опционально блок конфигурации задачи, второй принимает фоновую задачу с параметромIOmniTask и опционально блок конфигурации задачи.

1 type

2 TOmniTaskDelegate = reference to procedure(const task: IOmniTask);

3

4 Parallel = class

5 class procedure Async(task: TProc;

6 taskConfig: IOmniTaskConfig = nil); overload;

7 class procedure Async(task: TOmniTaskDelegate;

8 taskConfig: IOmniTaskConfig = nil); overload;

9 ...

10 end;

Вторая форма полезна, если фоновому коду необходим доступ к интерфейсу IOmniTask, например,для отправки сообщения владельцу или для выполнения кода в потоке владельца (обычно это будетосновной поток).

В приведённом ниже примере используется асинхронная задача для извлечения содержимого веб-страницы (используя таинственный вызов функции HttpGet) и затем использует метод Invoke длявыполнения кода, который записывает длину результата в основной поток.

Page 26: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 12

1 Parallel.Async(

2 procedure (const task: IOmniTask)

3 var

4 page: string;

5 begin

6 HttpGet('otl.17slon.com', 80, 'tutorials.htm', page, '');

7 task.Invoke(

8 procedure

9 begin

10 lbLogAsync.Items.Add(Format('Async GET: %d ms; page length = %d',

11 [time, Length(page)]))

12 end);

13 end);

Тот же результат может быть достигнут путём отправки сообщения из фонового потока в основной.TaskConfig блок используется для настройки обработчика сообщений.

1 const

2 WM_RESULT = WM_USER;

3

4 procedure LogResult(const task: IOmniTaskControl; const msg: TOmniMessage);

5 begin

6 lbLogAsync.Items.Add(Format('Async GET: %d ms; page length = %d',

7 [time, Length(page)]))

8 end;

9

10 Parallel.Async(

11 procedure (const task: IOmniTask)

12 var

13 page: string;

14 begin

15 HttpGet('otl.17slon.com', 80, 'tutorials.htm', page, '');

16 task.Comm.Send(WM_RESULT, page);

17 end,

18 TaskConfig.OnMessage(WM_RESULT, LogResult)

19 );

Позвольте мне предупредить Вас, что в случае, если вы хотите вернуть результат из фоновой задачи,абстракция Async не является наиболее подходящей. Для этого будет лучше использовать абстракциюБудущее (Future).

2.1.1 Обработка исключений

Если фоновый код вызывает необработанное исключение, то OmniThreadLibrary перехватит этоисключение и сгенерирует его в обработчике OnTerminated. Таким образом исключение может бытьперемещено из фонового потока в основной поток, в котором может быть обработано.

Page 27: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 13

Вызов обработчика OnTerminated происходит в неопределённый момент, когда Windows обрабаты-вает сообщения окон, поэтому не очень хорошо ловить это сообщение, используя блок try..except.Вызывающий вместо этого должен установить свой собственный обработчик OnTerminated и обраба-тывать исключения там.

В следующем примере используется обработчик OnTerminated для того, чтобы отделить фатальноеисключение от задачи, записать детали исключение и уничтожить объект исключения.

1 Parallel.Async(

2 procedure

3 begin

4 Sleep(1000);

5 raise Exception.Create('Exception in Async');

6 end,

7 Parallel.TaskConfig.OnTerminated(

8 procedure (const task: IOmniTaskControl)

9 var

10 excp: Exception;

11 begin

12 if assigned(task.FatalException) then begin

13 excp := task.DetachException;

14 Log('Caught async exception %s:%s',[excp.ClassName, excp.Message]);

15 FreeAndNil(excp);

16 end;

17 end

18 ));

Если вы не установите обработчик OnTerminated, то исключение будет обработано фильтром уровняприложения, который по умолчанию будет вызывать появление окна с сообщением.

См. также демо 48_OtlParallelExceptions.

Page 28: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 14

2.2 Async/Await

јсинхронность/ќжидание (Async/Await) ¤вл¤етс¤ упрощЄнной версией абстракции Async, котора¤имитирует механизм .NET Async/Await¹.²

 ороче говор¤, Async/Await принимает параметрами 2 анонимных метода. ѕервый выполн¤етс¤ вфоновом режиме, а второй выполн¤етс¤ в главном потоке после завершени¤ работы фонового потока.

—м. также демо 53_AsyncAwait.

»спользу¤ Async/Await, вы можете, например, создать фоновую операцию, котора¤ запускаетс¤щелчком мыши и котора¤ повторно активирует кнопку после завершени¤ работы фоновой задачи.

1 procedure TForm1.Button1Click(Sender: TObject);

2 var

3 button: TButton;

4 begin

5 button := Sender as TButton;

6 button.Caption := 'Working ...';

7 button.Enabled := false;

8 Async(

9 // выполн¤етс¤ в фоновом потоке

10 procedure begin

11 Sleep(5000);

12 end).

13 Await(

14 // выполн¤етс¤ в главном потоке после

15 // того как анонимного метода передал

16 // Async завершение своей работы

17 procedure begin

18 button.Enabled := true;

19 button.Caption := 'Done!';

20 end);

21 end;

..

—уществует лишь небольшое изменение между Async и Async/Await. ѕервыйвызываетс¤ Parallel.Async, а второй вызываетс¤ Async без префикса.

»сключени¤ в части Async в насто¤щее врем¤ не обрабатываютс¤ с помощью OmniThreadLibrary.

¹http://blogs.msdn.com/b/pfxteam/archive/2012/04/12/10293335.aspx²”знайте больше на http://www.thedelphigeek.com/2012/07/asyncawait-in-delphi.html.

Page 29: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 15

2.3 Для каждого (For Each)

Абстракция Для каждого (For Each) создаёт параллельный цикл for, который перебирает диапазонданных (численный диапазон, списки, очереди, наборы данных …) в несколько потоков. Чтобысоздать параллельный цикл вызовите Parallel.ForEach.

..

Если вы используете Parallel.ForEach, то OmniThreadLibrary запускает несколько фоновыхзадач и связывает их с источником данных через механизм сериализации. Вывод дополни-тельно сортируется в соответствии с порядком входных данных. По умолчанию, ForEachждёт окончания всех фоновых потоков перед возратом управления вызвавшему.

См. также демо 35_ParallelFor и 36_OrderedFor.

Page 30: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 16

Пример:

1 PrimeCount.Value := 0;

2 Parallel.ForEach(1, 1000000).Execute(

3 procedure (const value: integer)

4 begin

5 if IsPrime(value) then

6 PrimeCount.Increment;

7 end;

8 end);

Эта программа вычисляет количество простых чисел в диапазоне от одного до миллиона. ОбъектPrimeCount должен иметь атомарный (потокобезопасный) инкремент, который очень просто полу-чить используя запись TGp4AlignedInt. Задача ForEach кодируется как анонимный метод, но вы такжеможете использовать для этого обычный метод или обычную процедуру.

2.3.1 Сотрудничество

Суть абстракции ForEach это сотрудничество между параллельными задачами. ForEach делает всёвозможное, чтобы свести к минимуму возможные столкновения между потоками при доступе кисточнику данных. За исключением особых случаев (числовой диапазон, IOmniBlockingCollection),могут использоваться не потокобезопасные и блокируемые источники данных.

Чтобы минимизировать блокирование, исходные данные выделяются рабочим задачам блоками.ForEach создаёт объект поставщика данных, который будет иметь доступ к источнику данных впотоке потокобезопасным способом. Этот поставщик данных всегда гарантирует возвращение блокаданных соответствующего размера (размер будет зависеть от количества задач, типа исходныхданных и других факторов), когда он требуются задаче.

Поскольку исходные данные выделяются блоками, не исключено, что одна из задач окончит вы-числения в то время как другие задачи будут по-прежнему заняты. В этом случае задача будетворовать данные из этой задачи, которая еще не окончила вычисления. Такой подход делает всезадачи загруженными насколько это возможно, чтобы минимизировать простои.

Детали этого процесса обсуждаются в разделе Внутреннее устройство ниже.

2.3.2 Перебираем …

Класс Parallel определяет много перегруженных ForEach, каждая из которых поддерживает различ-ные типы контейнеров. Мы рассмотрим их более подробно в следующих разделах.

2.3.2.1 … Числовые диапазоны

Для перебора диапазона передаётся первый и последний индекс в вызов ForEach. Опционально, выможете передать параметр шаг, который по умолчанию равен 1. ForEach будет перебирать с первогодо последнего с шагом инкремента.

Page 31: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 17

1 class function ForEach(low, high: integer; step: integer = 1):

2 IOmniParallelLoop<integer>; overload;

Псевдокод для числового ForEach может быть записан так.

1 i := low;

2 while ((step > 0) and (i <= high)) or

3 ((step < 0) and (i >= high)) do

4 begin

5 // обработать 'i' параллельно

6 if low < high then Inc(i, step)

7 else Dec(i, step);

8 end;

2.3.2.2 … Перечисляемые коллекции

Если вам нужно перебрать коллекцию (скажем, TStringList), то вы можете сделать это двумяспособами.

Одним из них является использование эквивалента for i := 0 to sl.Count-1 do Something(sl[i]).

1 Parallel.ForEach(0, sl.Count-1).Execute(

2 procedure (const value: integer)

3 begin

4 Something(sl[value]);

5 end);

Другой способ заключается в использовании эквивалента for s in sl do Something(s).

1 Parallel.ForEach(sl).Execute(

2 procedure (const value: TOmniValue)

3 begin

4 Something(value);

5 end);

Во втором примере значение передаётся функции как параметр TOmniValue. В приведённом вышепримере оно будет автоматически преобразовано в строку, но иногда вам придётся это делатьвручную, с помощью вызова value.AsString (или использовать другие соответствующие функции,когда перебираете различные контейнеры).

Вариацией второхо подходаможно назвать ForEach, когда контейнер содержит строки. OmniThreadLibraryбудет делать преобразования за вас.

Page 32: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 18

1 Parallel.ForEach<string>(sl).Execute(

2 procedure (const value: string)

3 begin

4 Something(value);

5 end);

Вы можете задаться вопросом, какой из этих подходов лучше. Ответ на этот вопрос зависит от того,можно ли одновременно обращаться к различным элементам контейнера одновременно из разныхпотоков. Другими словами, вы должны знать, является ли контейнер потокобезопасным для чтения.К счастью все важные контейнеры Delphi (TList, TObjectList, TStringList) попадают в это категорию.

Если контейнер является потокобезопасным для чтения, то числовой подход (ForEach(0, sl.Count-1))является гораздо быстрее, чем подход for..in (ForEach(sl)). Разница в скорости происходит из-заблокировки - в первом примере ForEach никогда не блокирует ничего, а в во втором примереблокирование используется для синхронизации доступа к контейнеру.

Однако, если контейнер не потокобезопасный для чтения, вы должны использовать второй подход.

Существуют три способа перебора перечислимых контейнеров. При вызове ForEach вы можетеиспользовать интерфейс IEnumerable, с интерфейсом IEnumerator или с самой перечислимой кол-лекцией. В последнем случае, OmniThreadLibrary будет использовать RTTI для доступа к итераторудля коллекции. Чтобы это работало, итератор сам должен быть реализован как объект, а не как записьили интерфейс. К счастью большинство, если не все итераторы VCL реализованы таким образом.

1 class function ForEach(const enumerable: IEnumerable):

2 IOmniParallelLoop; overload;

3 class function ForEach(const enum: IEnumerator):

4 IOmniParallelLoop; overload;

5 class function ForEach(const enumerable: TObject):

6 IOmniParallelLoop; overload;

7 class function ForEach<T>(const enumerable: IEnumerable):

8 IOmniParallelLoop<T>; overload;

9 class function ForEach<T>(const enum: IEnumerator):

10 IOmniParallelLoop<T>; overload;

11 class function ForEach<T>(const enumerable: TEnumerable<T>):

12 IOmniParallelLoop<T>; overload;

13 class function ForEach<T>(const enum: TEnumerator<T>):

14 IOmniParallelLoop<T>; overload;

15 class function ForEach<T>(const enumerable: TObject):

16 IOmniParallelLoop<T>; overload;

2.3.2.3 … Потокобезопасные перечисляемые коллекции

Перечисление коллекций использует блокировку для синхронизации доступа к итератору кол-лекции, которая замедляет процесс перечисления. В некоторых особых случаях коллекция можетперечисляться без блокирования. Для перечисления таких коллекций они должны реализовывать ин-терфейсы IOmniValueEnumerable и IOmniValueEnumerator, которые определены в модуле OtlCommon.

Page 33: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 19

1 class function ForEach(const enumerable: IOmniValueEnumerable):

2 IOmniParallelLoop; overload;

3 class function ForEach(const enum: IOmniValueEnumerator):

4 IOmniParallelLoop; overload;

5 class function ForEach<T>(const enumerable: IOmniValueEnumerable):

6 IOmniParallelLoop<T>; overload;

7 class function ForEach<T>(const enum: IOmniValueEnumerator):

8 IOmniParallelLoop<T>; overload;

2.3.2.4 … Блокирующие коллекции

Для упрощения перечисления блокирующих коллекций, класс Parallel реализует две перегрузкиForEach принимающих блокирующие коллекции. Внутренне, блокирующая коллекция перечисляетсяс интерфейсом IOmniValueEnumerable.

1 class function ForEach(const source: IOmniBlockingCollection):

2 IOmniParallelLoop; overload;

3 class function ForEach<T>(const source: IOmniBlockingCollection):

4 IOmniParallelLoop<T>; overload;

2.3.2.5 … Всё

В крайнем случае, класс Parallel реализует три перегрузки ForEach, которые будут перечислятьлюбые данные (с некоторой помощью программиста).

Использование TOmniSourceProvider является мощным, но сложным.

1 class function ForEach(const sourceProvider: TOmniSourceProvider):

2 IOmniParallelLoop; overload;

Вы должны реализовать наследник класса TOmniSourceProvider. Все методы должны быть пото-кобезопасными. Для получения дополнительной информации об источнике данных см. секциюВнутреннее устройство ниже.

1 TOmniSourceProvider = class abstract

2 public

3 function Count: int64; virtual; abstract;

4 function CreateDataPackage: TOmniDataPackage; virtual; abstract;

5 function GetCapabilities: TOmniSourceProviderCapabilities;

6 virtual; abstract;

7 function GetPackage(dataCount: integer; package: TOmniDataPackage):

8 boolean; virtual; abstract;

9 function GetPackageSizeLimit: integer; virtual; abstract;

10 end;

Так как этот подход не для слабонервных, OmniThreadLibrary предоставляет более медленную, ногораздо более простую версию.

Page 34: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 20

1 class function ForEach(enumerator: TEnumeratorDelegate):

2 IOmniParallelLoop; overload;

3 class function ForEach<T>(enumerator: TEnumeratorDelegate<T>):

4 IOmniParallelLoop<T>; overload;

Здесь необходимо указать функцию, которая будет возвращать следующие данные, когда ForEach ихзапрашивает.

1 TEnumeratorDelegate = reference to function(var next: TOmniValue): boolean;

2 TEnumeratorDelegate<T> = reference to function(var next: T): boolean;

OmniThreadLibrary обеспечивает синхронизацию (блокирование), поэтому вы можете быть уверены,что этот метод будет вызываться только из одного потока в любой момент времени. Как вы можетеожидать, это замедлит ход событий, но распараллеливание всё-ещё может дать вам разумныйприрост производительности, если полезная нагрузка ForEach является существенной (например,если метод, который вы выполняете в цикле ForEach требует некоторое время для выполнения).

Функция TEnumeratorDelegate также может быть использована в качестве генератора. То есть онаможет вычислять значения, которые затем будут обрабатываться в параллельном цикле.

2.3.3 Обеспечение внешнего ввода

Иногда, особенно когда вы имеете дело с наборами данных, синхронизированного доступа будетне достаточно. Когда вы имеете дело с подключениями к базам данных, наборам данных и т.д., толегко может возникнуть проблема соответствия потоку - неспособность некоторых компонентовкорректно работать, если их вызывают из потока, отличающегося от потока их породившего.

[Всегда создавайте соединение с базой данных и набором данных в потоке, из которогобудете их использовать. Ваш код может работать без этой предосторожности, но дажеесли вы тщательно протестируете работу с компонентами базы данных в несколькихпотоках, вы не должны считать, что они будут работать правильно, если это условие невыполнено (инициализация и использование в том же потоке).]

В таком случае лучше всего обеспечить ввод напрямую из основного потока. Есть несколько способовдостижения этой цели.

1. Упаковать данные в другую коллекцию, которуюможно легко использовать в ForEach (TObjectList,TStringList, TOmniBlockingCollection).

2. Запустить ForEach в режиме NoWait, а затем записывать данные в входную очередь и после этогождать завершения цикла ForEach. Этот подход также полезен, если вы хотите использоватьForEach в фоновом режиме и предоставить ему данные из некоторых асинхронных обработ-чиков событий.

Пример второго подхода может прояснить эту ситуацию.

Page 35: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 21

1 uses

2 OtlCommon,

3 OtlCollections,

4 OtlParallel;

5

6 procedure Test;

7 var

8 i : integer;

9 input: IOmniBlockingCollection;

10 loop : IOmniParallelLoop<integer>;

11 wait : IOmniWaitableValue;

12 begin

13 // создание контейнера

14 input := TOmniBlockingCollection.Create;

15 // создание сигнала 'конец работы'

16 wait := CreateWaitableValue;

17 loop := Parallel.ForEach<integer>(input);

18 // создание метода завершения, который будет сигнализировать 'конец работ\

19 ы'

20 loop.OnStop(

21 procedure

22 begin

23 wait.Signal;

24 end);

25 // запуск параллельного цикла в режиме NoWait

26 loop.NoWait.Execute(

27 procedure (const value: integer)

28 begin

29 // делаем что-либо с входным значением

30 OutputDebugString(PChar(Format('%d', [value])));

31 end

32 );

33 // предоставление данных в параллельный цикл

34 for i := 1 to 1000 do

35 input.Add(i);

36 // сигнализировать в параллельный цикл, что больше нет данных для обработ\

37 ки

38 input.CompleteAdding;

39 // wait for the parallel for loop to stop

40 wait.WaitFor;

41 // уничтожить параллельный цикл

42 loop := nil;

43 end;

Page 36: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 22

2.3.4 IOmniParallelLoop

Parallel.ForEach возвращает интерфейс IOmniParallelLoop, который используется для настройки изапуска параллельного цикла.

1 IOmniParallelLoop = interface

2 function Aggregate(defaultAggregateValue: TOmniValue;

3 aggregator: TOmniAggregatorDelegate): IOmniParallelAggregatorLoop;

4 function AggregateSum: IOmniParallelAggregatorLoop;

5 procedure Execute(loopBody: TOmniIteratorDelegate); overload;

6 procedure Execute(loopBody: TOmniIteratorTaskDelegate); overload;

7 function CancelWith(const token: IOmniCancellationToken):

8 IOmniParallelLoop;

9 function Initialize(taskInitializer: TOmniTaskInitializerDelegate):

10 IOmniParallelInitializedLoop;

11 function Into(const queue: IOmniBlockingCollection):

12 IOmniParallelIntoLoop; overload;

13 function NoWait: IOmniParallelLoop;

14 function NumTasks(taskCount : integer): IOmniParallelLoop;

15 function OnMessage(eventDispatcher: TObject):

16 IOmniParallelLoop; overload; deprecated 'use TaskConfig';

17 function OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):

18 IOmniParallelLoop; overload; deprecated 'use TaskConfig';

19 function OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):

20 IOmniParallelLoop; overload; deprecated 'use TaskConfig';

21 function OnTaskCreate(taskCreateDelegate: TOmniTaskCreateDelegate):

22 IOmniParallelLoop; overload;

23 function OnTaskCreate(taskCreateDelegate:

24 TOmniTaskControlCreateDelegate): IOmniParallelLoop; overload;

25 function OnStop(stopCode: TProc): IOmniParallelLoop;

26 function PreserveOrder: IOmniParallelLoop;

27 function TaskConfig(const config: IOmniTaskConfig): IOmniParallelLoop;

28 end;

ForEach<T> возвращает интерфейс IOmniParallelLoop<T>, который такой же, как и IOmniParallelLoop,только каждый его метод возвращает соответствующую <T> версию интерфейса.

Aggregate и AggregateSum используются для реализации агрегации. См. секцию Агрегация ниже.

Execute принимает блок кода, который будет выполнен для каждого значения входного контейнера.Поддерживаются две версии сигнатуры, обе с вариантом <T>. Одна принимает только значенияитераций, а вторая принимает параметр IOmniTask.

Page 37: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 23

1 TOmniIteratorDelegate = reference to procedure(const value: TOmniValue);

2 TOmniIteratorDelegate<T> = reference to procedure(const value: T);

3 TOmniIteratorTaskDelegate =

4 reference to procedure(const task: IOmniTask; const value: TOmniValue);

5 TOmniIteratorTaskDelegate<T> =

6 reference to procedure(const task: IOmniTask; const value: T);

CancelWith допускает механизм отмены.

С Initialize и OnTaskCreate можно инициализировать данные для каждой задачи перед началомисполнения задачи. См. раздел Инициализация задачи ниже.

Into устанавливает выходную очередь, см. Сохранение выходного порядка.

Если вы вызываете функцию NoWait, то параллельный цикл стартует в фоне и управление немедленнобудет возвращено в основной поток. Если NoWait не вызывается, то Execute возвратит управлениетолько после завершения работы всех задач.

Вызывая NumTasks, вы можете настроить количество работающих задач. По умолчанию число задачустановлено равное [число ядер, доступных для процесса] - 1, если используются модификаторыNoWait или PreserveOrder, и [число ядер, доступных для процесса] для всех других случаев.

Функция OnMessage является устаревшей, используйте вместо неё TaskConfig.

OnStop устанавливает обработчик завершения, который будет вызван после завершения работы всехпараллельных задач. Если вызвана функция NoWait, то OnStop будет вызвана из одного из рабочихпотоков. Однако, если функция NoWait не вызвана, OnStop будет вызываться из потока, создавшегоабстракцию ForEach. Такое поведение делает трудным выполнение VCL кода из OnStop и релиз 3.02вводит ещё один вариант делегата с параметром IOmniTask

1 TOmniTaskStopDelegate = reference to procedure (const task: IOmniTask);

2 IOmniParallelLoop = interface

3 function OnStop(stopCode: TOmniTaskStopDelegate): IOmniParallelLoop;

4 overload;

5 end;

Используя эту версию OnStop, обработчик завершения может вызывать task.Invoke для выполнениякода в главном потоке. Это, однако, требует от абстракции ForEach оставаться доступной покавыполняется код Invoke, так что вы должны хранить результат ForEach в глобальной переменой (полеформы, например) и уничтожать его только в обработчике завершения.

Page 38: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 24

1 var

2 loop: IOmniParallelLoop<integer>;

3

4 loop := Parallel.ForEach(1, N).NoWait;

5 loop.OnStop(

6 procedure (const task: IOmniTask)

7 begin

8 task.Invoke(

9 procedure

10 begin

11 // do anything

12 loop := nil;

13 end);

14 end);

15 loop.Execute(

16 procedure (const value: integer)

17 begin

18 ...

19 end);

PreserveOrder изменяет поведение параллельности так, что выходные значения создаются в порядке,соответствующем значениям входным данным. См. раздел Сохранение выходного порядка ниже.

TaskConfig устанавливает блок конфигурации задачи. Этот же блок конфигурации задачи будетиспользоваться для всех задач ForEach.

В следующем примере TaskConfig используется для установки обработчика сообщений, которыйбудет получать сообщения, отправленные из рабочих задач ForEach.

1 FParallel := Parallel.ForEach(1, 17)

2 .TaskConfig(Parallel.TaskConfig.OnMessage(Self))

3 .NoWait

4 .OnStop(procedure begin FParallel := nil; end);

5 FParallel

6 .Execute(

7 procedure (const task: IOmniTask; const value: integer)

8 begin

9 task.Comm.Send(WM_LOG, value);

10 end);

Сообщения, отправленные из рабочих задач, принимаются и управляются интерфейсом IOmniParallelLoop.Это требует, чтобы абстракция ForEach была доступной, пока обрабатываются сообщения, так что выдолжны хранить результат ForEach в глобальной переменной (поле формы, например) и уничтожатьеё только в обработчике OnStop.

Некоторые функции возвращают другой интерфейс. Как правило, он реализует только функциюExecute, принимающую параметры, отличающиеся от ‘нормальной’ Execute. Например, Aggregateвозвращает интерфейс IOmniParallelAggregatorLoop.

Page 39: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 25

1 TOmniIteratorIntoDelegate =

2 reference to procedure(const value: TOmniValue; var result: TOmniValue);

3

4 IOmniParallelAggregatorLoop = interface

5 function Execute(loopBody: TOmniIteratorIntoDelegate): TOmniValue;

6 end;

Эти варианты интерфейса IOmniParallelLoop будут описаны в последующих разделах.

2.3.5 Сохранение выходного порядка

Когда вы запускаете цикл ForEach, вы не можете сказать заранее, в каком порядке будут обработаныэлементы из входной коллекции. Например, следующий код будет генерировать все простые числаот 1 до CMaxPrime и записывать их в выходную очередь (primeQueue) в неопределённом порядке.

1 primeQueue := TOmniBlockingCollection.Create;

2 Parallel.ForEach(1, CMaxPrime).Execute(

3 procedure (const value: integer)

4 begin

5 if IsPrime(value) then begin

6 primeQueue.Add(value);

7 end;

8 end);

Иногда это будет представлять большую проблему, и вы будете должны написать функцию сорти-ровки, которая пересортирует выходных данные перед их дальнейшей обработкой. Чтобы решитьпроблему, IOmniParallelLoop реализует модификатор PreserveOrder. При его использовании, циклForEach самостоятельно упорядочит результат, полученный в результате работы задачи, переданнойв метод Execute.

Использование PreserveOrder, также заставит вас использовать метод Into, который возвращаетинтерфейс IOmniParallelIntoLoop. (Как вы можете ожидать, есть также версия <T> этого интерфейса.)

1 TOmniIteratorIntoDelegate =

2 reference to procedure(const value: TOmniValue; var result: TOmniValue);

3 TOmniIteratorIntoTaskDelegate =

4 reference to procedure(const task: IOmniTask; const value: TOmniValue;

5 var result: TOmniValue);

6

7 IOmniParallelIntoLoop = interface

8 procedure Execute(loopBody: TOmniIteratorIntoDelegate); overload;

9 procedure Execute(loopBody: TOmniIteratorIntoTaskDelegate); overload;

10 end;

Page 40: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 26

Как вы можете видеть, метод Execute в IOmniParallelIntoLoop принимает параметры, которыеотличаются от параметров ‘нормальной’ Execute. В связи с этим вам придётся изменить код, которыйпередаётся в Execute для возвращения результата.

1 primeQueue := TOmniBlockingCollection.Create;

2 Parallel.ForEach(1, CMaxPrime)

3 .PreserveOrder

4 .Into(primeQueue)

5 .Execute(

6 procedure (const value: integer; var res: TOmniValue)

7 begin

8 if IsPrime(value) then

9 res := value;

10 end);

При использовании PreserveOrder и Into, ForEach вызывает ваш рабочий код для каждого входногозначения. Если рабочий код присваивает выходному параметру (res) любое значение, оно будетвставлено во временный буфер. Затем произойдёт таинство (см. раздел Внутреннее устройство ниже),и как только соответствующее (отсортированное) значение становится доступным во временномбуфере, оно вставляется в выходную очередь (передаётся в параметре Into).

Вы также можете использовать Into без PreserveOrder. Это предоставит вам управление очередью,но не порядком.

2.3.6 Агрегация

Агрегирование позволяет собирать данные из параллельных задач и вычислять одно значение,которое возвращается пользователю.

Давайте начнём с примера - и очень не плохого! Следующий фрагмент кода пытается вычислитьколичество простых чисел между 1 и CMaxPrime.

1 numPrimes := 0;

2 Parallel.ForEach(1, CMaxPrime).Execute(

3 procedure (const value: integer)

4 begin

5 if IsPrime(value) then

6 Inc(numPrimes);

7 end);

Давайте скажем прямо - этот код неправильный! Доступ к общей переменной не синхронизиро-ван между потоками и это сделает результат непредсказуемым. Один из способов решения этойпроблемы является обёртка с блокировкой для Inc(numPrimes), а другой является использованиеInterlockedIncrement вместо Inc, но это значительно замедлит выполнение.

Решение этой проблемы заключается в использовании функции Aggregate.

Page 41: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 27

1 procedure SumPrimes(var aggregate: TOmniValue; const value: TOmniValue)

2 begin

3 aggregate := aggregate.AsInt64 + value.AsInt64;

4 end;

5

6 procedure CheckPrime(const value: integer; var result: TOmniValue)

7 begin

8 if IsPrime(value) then

9 Result := 1;

10 end;

11

12 numPrimes :=

13 Parallel.ForEach(1, CMaxPrime)

14 .Aggregate(0, SumPrimes)

15 .Execute(CheckPrime);

Aggregate принимает два параметра - первый это начальное значение для агрегации, а второйфункцию агрегации - часть кода, которая будет принимать текущее значение агрегации и обновлятьего для значения, возвращаемого из параллельной задачи.

При использовании Aggregate, параллельная задача (код, переданный функции Execute) имеет ту жесигнатуру, как при работе с Into. Она принимает текущее значение итерации и, возможно, выдаётрезультат.

Мы могли бы заменить приведённый выше код с циклом for.

Page 42: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 28

1 agg := 0;

2 result.Clear;

3 for value := 1 to CMaxPrime do begin

4 CheckPrime(value, result);

5 if not result.IsEmpty then begin

6 SumPrimes(agg, result);

7 result.Clear;

8 end;

9 end;

10 numPrimes := agg;

ForEach выполняет агрегирование в два этапа. В то время как параллельная задача выполняется, онабудет использовать этот подход для агрегирования данных в локальной переменной. Когда задачаокажется без работы, то она использует агрегирующий метод для агрегирования этой локальнойпеременной в глобальный результат. В этом втором этапе, однако, будет использовано блокированиедля защиты доступа к глобальному результату.

Поскольку суммирование является наиболее распространённымиспользованием агрегации, IOmniParallelLoopреализует функцию AggregateSum, которая работает точно так же, как приведённая выше SumPrimes.

Page 43: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 29

1 numPrimes :=

2 Parallel.ForEach(1, CMaxPrime)

3 .AggregateSum

4 .Execute(

5 procedure (const value: integer; var result: TOmniValue)

6 begin

7 if IsPrime(value) then

8 Result := 1;

9 end

10 );

Функция агрегации может делать кое-что ещё, кроме суммирования. Следующий фрагмент кодаиспользует агрегацию, чтобы найти длину самой длинной строки в файле.

1 function GetLongestLineInFile(const fileName: string): integer;

2 var

3 maxLength: TOmniValue;

4 sl : TStringList;

5 begin

6 sl := TStringList.Create;

7 try

8 sl.LoadFromFile(fileName);

9 maxLength := Parallel.ForEach<string>(sl)

10 .Aggregate(0,

11 procedure(var aggregate: TOmniValue; const value: TOmniValue)

12 begin

13 if value.AsInteger > aggregate.AsInteger then

14 aggregate := value.AsInteger;

15 end)

16 .Execute(

17 procedure(const value: string; var result: TOmniValue)

18 begin

19 result := Length(value);

20 end);

21 Result := maxLength;

22 finally FreeAndNil(sl); end;

23 end;

2.3.7 Отмена

ForEach имеет встроенный механизм отмены. Чтобы его использовать, создайте токен отмены ипередайте его функции CancelWith. При токен отмены получает сигнал, все рабочие циклы завершаттекущую итерацию и затем остановятся.

Пример использования механизма отмены можно найти в разделе Параллельный поиск в дереве.

Page 44: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 30

2.3.8 Инициализация и завершение задачи

В некоторых случаях было бы неплохо, если бы каждая параллельная задача имела некоторые данные,инициализированные в начале и доступные коду итератора (переданные в Execute). Для такихслучаев, ForEach реализует функцию Initialize.

1 TOmniTaskInitializerDelegate =

2 reference to procedure(var taskState: TOmniValue);

3 TOmniTaskFinalizerDelegate =

4 reference to procedure(const taskState: TOmniValue);

5 TOmniIteratorStateDelegate =

6 reference to procedure(const value: TOmniValue; var taskState: TOmniValue\

7 );

8

9 IOmniParallelInitializedLoop = interface

10 function Finalize(taskFinalizer: TOmniTaskFinalizerDelegate):

11 IOmniParallelInitializedLoop;

12 procedure Execute(loopBody: TOmniIteratorStateDelegate);

13 end;

14

15 IOmniParallelLoop = interface

16 ...

17 function Initialize(taskInitializer: TOmniTaskInitializerDelegate):

18 IOmniParallelInitializedLoop;

19 end;

Вы предоставляете Initialize с инициализатором задачи, процедурой, которая будет вызыватьсяв каждой параллельной рабочей задаче, когда она создана и перед тем, как она начнёт перебиратьзначения. Эта процедура может инициализировать параметр taskState любым значением.

Initialize возвращает интерфейс IOmniParallelInitializedLoop, который реализует две функции -Finalize и Execute. Вызовите Finalize, чтобы установить завершитель задачи, процедурой, котораявызывается после перечисления всех значений и перез тем, как параллельная задача завершит работу.

Execute принимает рабочий метод с двумя параметрами - первый из них является обычным значе-нием из перечисляемого контейнера, а второй содержит состояние общего задания.

Конечно, все эти функции и интерфейсы реализованы также в версии с <T>.

Следующий пример показывает, как вычислить количество простых чисел от 1 до CHighPrime спомощью Initialize и Finalize.

Page 45: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 31

1 var

2 lockNum : TOmniCS;

3 numPrimes: integer;

4 begin

5 numPrimes := 0;

6 Parallel.ForEach(1, CHighPrime)

7 .Initialize(

8 procedure (var taskState: TOmniValue)

9 begin

10 taskState.AsInteger := 0;

11 end)

12 .Finalize(

13 procedure (const taskState: TOmniValue)

14 begin

15 lockNum.Acquire;

16 try

17 numPrimes := numPrimes + taskState.AsInteger;

18 finally lockNum.Release; end;

19 end)

20 .Execute(

21 procedure (const value: integer; var taskState: TOmniValue)

22 begin

23 if IsPrime(value) then

24 taskState.AsInteger := taskState.AsInteger + 1;

25 end

26 );

27 end;

2.3.9 Обработка исключений

Абстракция ForEach пока не реализует какую либо обработку исключений. Вы всегда должныоборачивать рабочий метод (код, передаваемый в Execute) в try..except если вы ожидаете, что кодможет вызвать исключение.

2.3.10 Внутреннее устройство

В этом разделе попытаемся объяснить, как реализован ForEach. Рассмотрим это в качестве бонусногоматериала для тех пользователей, которые хотят знать больше. Вы не обязаны это читать и понимать,чтобы использовать ForEach, так что вы можете пропустить эту часть книги.

Давайте начнём с очень простого кода.

Page 46: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 32

1 Parallel.ForEach(1, 1000)

2 .Execute(

3 procedure (const elem: integer)

4 begin

5 end);

Этот простой код перебирает с 1 до 1000 на всех доступных ядрах параллельно и выполняет простуюпроцедуру, которая не содержит рабочей нагрузки. В целом, код ничего не будет делать, но он будетэто делать очень сложным образом.

Метод ForEach создаёт новый объект TOmniParallelLoop<integer> (это объект, который будет коорди-нировать параллельные задачи) и передаёт его поставщику источника - объекту, который знает, какполучить доступ к значениям, которые в настоящее время перебираются (целые от 1 до 1000 в этомпримере).

Page 47: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 33

Модуль OtlDataManager содержит четыре различных поставщика источника - по одному для каждоготипа источника, который может быть передан в метод ForEach. Если есть необходимость расширенияForEach с новым итератором источника, я хотел бы добавить несколько простых методов в модульOtlParallel и написать новый поставщик источника.

Page 48: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 34

1 class function Parallel.ForEach(low, high: integer; step: integer):

2 IOmniParallelLoop<integer>;

3 begin

4 Result := TOmniParallelLoop<integer>.Create(

5 CreateSourceProvider(low, high, step), true);

6 end;

Параллельность для задач запускается в InternalExecuteTask. Этот метод сначала создает менеджерданных и присоединяет его к поставщику источника (сравните с рисунком выше - есть одинпоставщик источника и один менеджер данных). Далее он создаёт необходимое количество задачи вызывает специфичные для задач делегаты методов от каждого из них. [Этот делегат обёртываетваш параллельный код и обеспечивает его надлежащим вводом (а иногда и выводом). В модулеOtlParallelЕсть много вызовов InternalExecuteTask, каждый с разным taskDelegate и каждый изкоторых обеспечивает поддержу различных видов цикла.]

1 procedure TOmniParallelLoopBase.InternalExecuteTask(

2 taskDelegate: TOmniTaskDelegate);

3 var

4 dmOptions : TOmniDataManagerOptions;

5 iTask : integer;

6 numTasks : integer;

7 task : IOmniTaskControl;

8 begin

9 ...

10 oplDataManager := CreateDataManager(oplSourceProvider,

11 numTasks, dmOptions);

12 ...

13 for iTask := 1 to numTasks do begin

14 task := CreateTask(

15 procedure (const task: IOmniTask)

16 begin

17 ...

18 taskDelegate(task);

19 ...

20 end,

21 ...

22 task.Schedule(GParallelPool);

23 end;

24 ...

25 end;

26 end;

Менеджер данных это глобальное поле в объекте TOmniParallelLoop<T>, так что его можно использо-вать из делегатов задачи. Простейшие делегаты задачи (ниже) просто создают локальные очереди, и

Page 49: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 35

выбираю значения из локальной очереди по одному. Эти результаты во многих локальных очередях- по одной на задачу - все подключены к одному менеджеру данных.

В случае, если вам интересно, чем является loopBody - это анонимный метод, который вы передали вметод Parallel.ForEach.Execute.

Page 50: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 36

1 procedure InternalExecuteTask(const task: IOmniTask)

2 var

3 localQueue: TOmniLocalQueue;

4 value : TOmniValue;

5 begin

6 localQueue := oplDataManager.CreateLocalQueue;

7 try

8 while (not Stopped) and localQueue.GetNext(value) do

9 loopBody(task, value);

10 finally FreeAndNil(localQueue); end;

11 end;

Давайте ещё раз:

• Поставщик источника создан.• Менеджер данных создан и связан с поставщиком источника.• Каждое задание создаёт свою собственную локальную очередь и использует её для доступа кданным источника.

• Как вы увидите в следующем разделе, локальная очередь извлекает данные пакетами (паке-тами данных) и отправляет их в выходной буфер, который гарантирует, что вывод произве-дён в правильном порядке (часть выходного буфера вызывается, только если вызван методPreserveOrder в вызывающем высокоуровневом коде).

• Если задача остаётся без работы, она запрашивает новый пакет данных из менеджера данных,который получает эти данные от поставщика источника (подробнее об этом ниже). Еслипоставщик источника выбирает все данные, менеджер данных попытается украсть некоторыеданные из других задач.

Page 51: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 37

Всё это было разработано, чтобы обеспечить быстрый доступ к данным (блокировка ограничива-ется поставщиком источника, все другие взаимодействия неблокируемые), хорошее распределениенагрузки (когда задача остаётся без работы, то она ворует часть работы из других задач) и упорядо-чивает данные на выходе (при необходимости).

2.3.10.1 Поставщик источника

Поставщик источника это объект, который получает данные от перечисления источника (данные,которые передаются параллельно) и заново упаковывает их в формат, пригодный для параллельногоупотребления. В настоящее время в модуле OtlDataManager определены три поставщика источника.

Page 52: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 38

• TOmniIntegerRangeProvider

Обрабатывает целые диапазоны (так же, как делает ‘обычный’ оператор for). Как таковая, онана самом деле не извлекает данные из перечисляемого источника, а генерирует их внутри.

• TOmniValueEnumeratorProvider

Обрабатывает IOmniValueEnumerator, являющийся специальнымитератором, к которомуможнообратиться из нескольких читателей и не требует блокировки. В настоящее время поставляетсятолько интерфейсом IOmniBlockingCollection.

• TOmniEnumeratorProvider

Обрабатывает итераторы Windows (IEnumerator) или итераторы Delphi (GetEnumerator, завёр-нутый в класс TOmniValueEnumerator).

Все поставщики источников происходят от абстрактного класса TOmniSourceProvider, который обес-печивает общий интерфейс поставщика источника. В принципе, для этого должен быть использованинтерфейс, но на практике поставщик источника очень высокопроизводительный и поэтому отказот использования интерфейсов очень ускоряет программу.

1 TOmniSourceProvider = class abstract

2 public

3 function Count: int64; virtual; abstract;

4 function CreateDataPackage: TOmniDataPackage; virtual; abstract;

5 function GetCapabilities: TOmniSourceProviderCapabilities;

6 virtual; abstract;

7 function GetPackage(dataCount: integer;

8 package: TOmniDataPackage): boolean; virtual; abstract;

9 function GetPackageSizeLimit: integer; virtual; abstract;

10 end;

Не все поставщики источников созданы равными и поэтому функция GetCapabilities возвращаетвозможности поставщика источника:

1 TOmniSourceProviderCapability = (

2 spcCountable, // поставщик источкика, который знает,

3 // сколько данных он содержит

4 spcFast, // поставщик источника, операции O(1)

5 spcDataLimit // пакет данных может содержать только

6 // ограниченное количество данных

7 );

8

9 TOmniSourceProviderCapabilities = set of

10 TOmniSourceProviderCapability;

TOmniIntegerRangeProvider является перечислимым (очень просто узнать, сколько, например, значе-ний между 1 и 10) и быстрым (займёт столько же времени чтобы получить 10 значений или 10.000

Page 53: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 39

значений), а два других поставщика не являются ни перечислимыми, не быстрыми. Третья возмож-ность: spcDataLimit устарела и не используется. Она была заменена методом GetPackageSizeLimit.

Другим важным аспектом поставщика источника является GetPackage. Он обращается к источнику(путём обеспечения блокированного доступа, если необходима), извлекает данные и возвращает ихв пакете данных. Реализация очень сильно зависит от исходных данных. Например, поставщикцелых чисел просто перемещает текущее нижнее значение и возвращает пакет данных, который несодержит кучу значений, а только нижнюю и верхнюю границы (и именно поэтому он считаетсябыстрым). Итератор поставщика источника блокирует источник, извлекает данные и составляет, та-ким образом, пакет данных значение за значением. И в самом простом случае, поставщик источникаTOmniValueEnumerator просто выбирает значения и собирает пакет данных.

1 function TOmniValueEnumeratorProvider.GetPackage(dataCount: integer;

2 package: TOmniDataPackage): boolean;

3 var

4 iData : integer;

5 intPackage: TOmniValueEnumeratorDataPackage absolute package;

6 timeout : cardinal;

7 value : TOmniValue;

8 begin

9 Assert(not StorePositions);

10 Result := false;

11 dataCount := intPackage.Prepare(dataCount);

12 timeout := INFINITE;

13 for iData := 1 to dataCount do begin

14 if not vepEnumerator.TryTake(value, timeout) then

15 break; //for

16 intPackage.Add(value);

17 timeout := 0;

18 Result := true;

19 end;

20 end;

2.3.10.2 Менеджер данных

Менеджер данных является центром в иерархии OtlDataManager. Он находится между несколькимилокальными очередями и одним поставщиком источника, и гарантирует, что все параллельныезадачи всегда имеют некоторую работу для исполнения.

На данный момент реализованы два разных менеджера данных - перечислимый менеджер данныхи эвристический. Первый из них используется, если поставщик источника перечислимый и второй,если это не так. Оба они происходят он абстрактного класса TOmniDataManager.

Page 54: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 40

1 TOmniDataManager = class abstract

2 public

3 function CreateLocalQueue: TOmniLocalQueue; virtual; abstract;

4 function AllocateOutputBuffer: TOmniOutputBuffer;

5 virtual; abstract;

6 function GetNext(package: TOmniDataPackage): boolean;

7 virtual; abstract;

8 procedure ReleaseOutputBuffer(buffer: TOmniOutputBuffer);

9 virtual; abstract;

10 procedure SetOutput(const queue: IOmniBlockingCollection);

11 overload; virtual; abstract;

12 end;

Основное различие между ними заключается в функции GetNextFromProvider, которая считываетданные из поставщика источника (вызывая метод GetPackage). В перечислимом поставщике этопростой перенаправитель, а в эвристическом поставщике эта функция пытается найти хорошийразмер пакета, который позволит всем параллельным задачам работать на полной скорости.

1 function TOmniHeuristicDataManager.GetNextFromProvider(

2 package: TOmniDataPackage; generation: integer): boolean;

3 const

4 CDataLimit = Trunc(High(integer) / CFetchTimeout_ms);

5 var

6 dataPerMs: cardinal;

7 dataSize : integer;

8 time : int64;

9 begin

10 // целью является получить столько же (но не более <fetch_limit>)

11 // данных, сколько возможно в <fetch_timeout> миллисекунд;

12 // наибольший объём данных ограничен методом GetDataCountForGeneration.

13 dataSize := GetDataCountForGeneration(generation);

14 if dataSize > hdmEstimatedPackageSize.Value then

15 dataSize := hdmEstimatedPackageSize.Value;

16 time := DSiTimeGetTime64;

17 Result := SourceProvider.GetPackage(dataSize, package);

18 time := DSiTimeGetTime64 - time;

19 if Result then begin

20 if time = 0 then

21 dataPerMs := CDataLimit

22 else begin

23 dataPerMs := Round(dataSize / time);

24 if dataPerMs >= CDataLimit then

25 dataPerMs := CDataLimit;

26 end;

27 // среднее за последние четыре выборки для динамической адаптации

Page 55: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 41

28 hdmEstimatedPackageSize.Value := Round

29 ((hdmEstimatedPackageSize.Value / 4 * 3) +

30 (dataPerMs / 4) * CFetchTimeout_ms);

31 end;

32 end;

2.3.10.3 Локальная очередь

Каждая параллельная задача считывает данные из локальной очереди, которая является простыминтерфейсом для менеджера данных. Наиболее важной частью локальной очереди является методGetNext, который предоставляет заданию следующее значение.

1 function TOmniLocalQueueImpl.GetNext(var value: TOmniValue): boolean;

2 begin

3 Result := lqiDataPackage.GetNext(value);

4 if not Result then begin

5 Result := lqiDataManager_ref.GetNext(lqiDataPackage);

6 if Result then

7 Result := lqiDataPackage.GetNext(value);

8 end;

9 end;

Каждая локальная очередь содержит локальный пакет данных. GetNext сначала пытается читатьследующее значение из этого пакета данных. Если это не удаётся (пакет данных пуст - он уже былополностью обработан), она пытается получить новый пакет данных от менеджера данных, и (в случаеуспеха) попытается извлечь следующие данные из полученного пакета данных.

GetNext в менеджере данных сначала пытается получить следующий пакет от поставщика источника(через приватный метод GetNextFromProvider, который вызывает метод GetPackage поставщикаисточника). Если это не удаётся, он пытается “украсть” часть нагрузки из другой задачи.

Кража это возможность, которая позволяет всем параллельным задачам быть активными до пере-числения последнего значения. Чтобы это осуществить, менеджер данных перебирает все локальныеочереди и пытается разбить пакет данных каждой локальной очереди пополам. Если это удаётся,половина пакета данных остаётся в исходной локальной очереди, а другая половина возвращается влокальную очередь, которая запросила данные.

Разделение пакета сильно зависит от типа данных. Например, целочисленный пакет данных простопересчитывает границы, а пакет, основанный на итераторе, должен копировать соседние данные.

Page 56: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 42

1 function TOmniValueEnumeratorDataPackage.Split(

2 package: TOmniDataPackage): boolean;

3 var

4 intPackage: TOmniValueEnumeratorDataPackage absolute package;

5 iValue : integer;

6 value : TOmniValue;

7 begin

8 Result := false;

9 for iValue := 1 to intPackage.Prepare(vedpApproxCount.Value div 2)

10 do begin

11 if not GetNext(value) then

12 break; //for

13 intPackage.Add(value);

14 Result := true;

15 end;

16 end;

2.3.10.4 Упорядочивание вывода

Упорядочивание (PreserveOrder) обычно используется вместе с модификатором Into. Причина этогозаключается в интеграции между инфраструктурой Parallel.ForEach и вашим параллельным кодом(тот, который выполняется как полезная нагрузка Execute). В ‘обычной’ ForEach, вывод из этойпараллельной нагрузки не определён. Вы имеете право формировать любой вывод в полезнойнагрузке, но ForEach не будет ничего об этом знать. В этом случае OTL не имеет возможностисохранить порядок, потому что - по крайней мере с точки зрения библиотеки - параллельный кодне производит никакого вывода.

Когда используется Into, то ваш код использует различные сигнатуры (различные параметры).

1 Parallel.ForEach(1, CMaxTest)

2 .PreserveOrder

3 .Into(primeQueue)

4 .Execute(

5 procedure (const value: integer; var res: TOmniValue)

6 begin

7 if IsPrime(value) then

8 res := value;

9 end);

Параллельная полезная нагрузка теперь принимает два параметра. Во-первых, это - как и в болееобщем случае - входное значение, в то время как второй имеет выходное значение. Как вы можетевидеть из приведённого примера, параллельный код может произвести ноль или один вывод, но неболее того.

Эта небольшая модификация всё меняет. Параллельная инфраструктура имеет контроль над выход-ным параметром, им можно управлять внутри, связать его с вводом и убедиться, что вывод создаётсяв том же порядке, каким был ввод.

Page 57: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 43

Давайте посмотрим на внутренний код - часть, которая планирует параллельные задачи. Когдаиспользуется Into, то InternalExecuteTask выполняет следующий довольно сложный код.

1 InternalExecuteTask(

2 procedure (const task: IOmniTask)

3 var

4 localQueue : TOmniLocalQueue;

5 outputBuffer_ref: TOmniOutputBuffer;

6 position : int64;

7 result : TOmniValue;

8 value : TOmniValue;

9 begin

10 oplDataManager.SetOutput(oplIntoQueueIntf);

11 localQueue := oplDataManager.CreateLocalQueue;

12 try

13 outputBuffer_ref := oplDataManager.AllocateOutputBuffer;

14 try

15 localQueue.AssociateBuffer(outputBuffer_ref);

16 result := TOmniValue.Null;

17 while (not Stopped) and

18 localQueue.GetNext(position, value) do

19 begin

20 loopBody(task, value, result);

21 if not result.IsEmpty then begin

22 outputBuffer_ref.Submit(position, result);

23 result := TOmniValue.Null;

24 end;

25 end;

26 finally

27 oplDataManager.ReleaseOutputBuffer(outputBuffer_ref);

28 end;

29 finally

30 FreeAndNil(localQueue);

31 end;

32 end);

Здесь перечислены важные моменты:

• Менеджер данных связан с выходной очередью. (поле oplIntoQueueIntf содержит значение,передаваемое в метод Into.)

• Создаётся локальная очередь, как и при выполнении ‘обычного’ ForEach.• Выходной буфер создаётся менеджером данных и связывается с локальной очередью.• Для каждого входа выполняется пользовательский код и каждое непустое выходное значениезаписывается в выходной буфер.

Page 58: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 44

• Выходной буфер освобождается, как локальная очередь.

Самое интересное спрятано в тени; внутри локальной очереди, менеджера данных и выходногобуфера.

Первое изменение заключается в источнике данных. Когда используется PreserveOrder, каждый пакетданных знает положение в источника, откуда он был прочитан. Чтобы упростить дело, разделениепакета данных в этом случае не используется. [И поэтому кража данных не может быть использована,вызывая чуть менее эффективное использование CPU, как и в случае простого ForEach.]

Каждая локальная очередь имеет связанный с ней набор выходного буфера.

Каждыйнабор выходного буфера управляет двумя буферами. Один из них является активными задачазаписывает в него, а другой может быль либо пустой, либо полный. Каждый выходной буфер связанвходной позицией - так же, как пакет данных.

Когда мы смотрим на чтение/запись данных с точки зрения одной задачи, всё очень просто. Задачачитает данные из локальной очереди (которая считывает данные из пакета данных, связанных сопределённой позицией) и записывает их в выходной буфер (связанный с той же позицией).

Сложная часть начинается, когда пакет данных будет исчерпан (часть if not Result приведена в кодениже).

Page 59: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 45

1 function TOmniLocalQueueImpl.GetNext(var position: int64; var value: TOmniV\

2 alue): boolean;

3 begin

4 Result := lqiDataPackage.GetNext(position, value);

5 if not Result then begin

6 lqiBufferSet.ActiveBuffer.MarkFull;

7 lqiBufferSet.ActivateBuffer;

8 // это будет заблокировано, если альтернативный буфер тоже полный

9 Result := lqiDataManager_ref.GetNext(lqiDataPackage);

10 if Result then begin

11 Result := lqiDataPackage.GetNext(position, value);

12 if Result then

13 lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;

14 end;

15 end;

16 end;

Вопервых, активный буфер будет помечен как полный. Это приведёт к вызову NotifyBufferFull

(см. ниже). Затем активируется альтернативный буфер. Этот вызов (ActivateBuffer) будет на самомделе заблокирован, если альтернативный буфер не свободен. В этом случае текущий поток будетзаблокирован, пока один из его буферов записывает в выходную очередь.

С этой точки зрения GetNext работает так же, как при использовании обычного ForEach, крометого, что он устанавливает позицию активного буфера, когда новый пакет данных считывается изменеджера данных.

Другая часть волшебства происходит в методе, который вызывается из MarkFull. Он проходит посписку буферов и проверяет, если ли выходные буферы, которые a) полные и b) предназначены длятекущей позиции вывода. Такие буферы копируются в выход и возвращаются для использования.

1 procedure TOmniBaseDataManager.NotifyBufferFull(

2 buffer: TOmniOutputBufferImpl);

3 begin

4 // Удалить буфер из списка. Проверить, что следующий буфер ждёт в

5 // списке. Скопировать буфер, если он полный и повторить процесс.

6 dmBufferRangeLock.Acquire;

7 try

8 while (dmBufferRangeList.Count > 0) and

9 (BufferList[0].Range.First = dmNextPosition) and

10 BufferList[0].IsFull do

11 begin

12 buffer := TOmniOutputBufferImpl(

13 dmBufferRangeList.ExtractObject(0));

14 dmNextPosition := buffer.Range.Last + 1;

15 buffer.CopyToOutput;

16 end;

Page 60: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Высокоуровневая многопоточность 46

17 finally dmBufferRangeLock.Release; end;

18 end;

Напомним:

• Каждый буфер данных связан с позицией.• Каждая локальная очередь имеет два выходных буфера, один активный, а другой или свобод-ный или полный.

• Каждый выходной буфер также связан с позицией.• Локальная очередь записывает данные в выходной буфер.• Когда буфер полон, он помещается в список ожидающих буферов. В этот момент все соответ-ствующие ожидающие буферы копируются в выход.

2.3.11 Примеры

Практический пример использования For Each можно найти в главах Параллельный For с синхрони-зированным выводом и Параллельный поиск в дереве.

Page 61: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

3 СинхронизацияНесмотря на то, что OmniThreadLibrary рассматривает коммуникацию как превосходный подходк блокировке, есть ещё случаи, когда использование “стандартных” примитивов синхронизации,таких как критические секции, неизбежно. Стандартный подход Delphi/Windows к блокировке оченьнизкоуровневый, OmniThreadLibrary основывается на нём и улучшает его в нескольких важныхнаправлениях. Все эти улучшения собраны в модуле OtlSync и описаны в следующих разделах.Единственное исключение это ожидающее значение класса/интерфейса, которое объявлено и модулеOtlCommon.

Эта часть книги предполагает, что у вас есть базовое понимание блокировки. Если вы новичок в этойтеме, Вы должны скачала прочитать соответствующие главы одной из книг, упомянутых в введении.

47

Page 62: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 48

3.1 Критические секции

Самый полезный примитив синхронизации для многопоточного программирования бесспорнокритическая секция¹.

OmniThreadLibrary упрощает совместное использование критической секции между владельцемзадачи и задачей с использованием метода WithLock. Высокоуровневые задачи могут получитьдоступ к этому методу через блок конфигурации задачи.

Я всегда придерживаюсь мнения, что блокировки должны быть прозрачны, насколько это возможно.Использование множества небольших блокировок вокруг многих несвязанных фрагментов кода луч-ше, чем использование одной гигантской блокировки для всего, но программисты часто используютодну или несколько блокировок, поскольку управление многими критическими секциями можетбыть беспокойно.

Чтобы помочь вам в написании лучшего кода, OmniThreadLibrary реализует три расширения Delphiкласса TCriticalSection - IOmniCriticalSection, TOmniCS и Locked<T>.

3.1.1 IOmniCriticalSection

Delphi реализует поддержку критических секций с классом TCriticalSection, который должен быть со-здан и уничтожен в коде. (Существует также запись TRTLCriticalSection, но она поддерживается толь-ко в Windows.) OmniThreadLibrary расширяет эту реализацию с интерфейсом IOmniCriticalSection,который вы только должны создать. Компилятор сделает так, что он будет уничтожен автоматическив нужное время.

1 type

2 IOmniCriticalSection = interface

3 ['{AA92906B-B92E-4C54-922C-7B87C23DABA9}']

4 function GetLockCount: integer;

5 //

6 procedure Acquire;

7 procedure Release;

8 function GetSyncObj: TSynchroObject;

9 property LockCount: integer read GetLockCount;

10 end;

11

12 function CreateOmniCriticalSection: IOmniCriticalSection;

IOmniCriticalSection внутри использует TCriticalSection. Он действует как прокси, который вызы-вает функции TCriticalSection. Кроме того, он обеспечивает дополнительную функциональность,подсчитывая, сколько раз была вызвана критическая секция, что может очень помочь при отладке.Этот счётчик может быть прочитан через свойство LockCount.

¹http://en.wikipedia.org/wiki/Critical_section, http://docwiki.embarcadero.com/Libraries/XE3/en/System.SyncObjs.TCriticalSection

Page 63: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 49

Критическая секция может быть вызвана (acquired) несколько раз из одного потока.Например, следующий код является вполне допустимым:

1 cSec := CreateOmniCriticalSection; //LockCount = 0

2 cSec.Acquire; //LockCount = 1

3 cSec.Acquire; //LockCount = 2

4 cSec.Release; //LockCount = 1

5 cSec.Release; //LockCount = 0

Кроме того, IOmniCriticalSection не использует TCriticalSection напрямую, но оборачивает её вбольший объект, как было предложено Eric Grange².

3.1.2 TOmniCS

Другое расширение TCriticalSection находится в OmniThreadLibrary, это запись TOmniCS. Это позво-ляет вам использовать критическую секцию, просто объявляя запись в соответствующем месте.

Использование TOmniCS, блокирование может быть столь же простым, как это:

1 uses

2 GpLists,

3 OtlSync;

4

5 procedure ProcessList(const intf: IGpIntegerList);

6 begin

7 //...

8 end;

9

10 var

11 lock: TOmniCS;

12 intf: IGpIntegerList;

13

14 procedure Test1;

15 begin

16 intf := TGpIntegerList.Create;

17 //...

18 lock.Acquire;

19 try

20 ProcessList(intf);

21 finally lock.Release; end;

22 end;

TOmniCS реализован в виде записи с однимприватнымполем, содержащиминтерфейс IOmniCriticalSection.

²http://delphitools.info/2011/11/30/fixing-tcriticalsection/

Page 64: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 50

1 type

2 TOmniCS = record

3 strict private

4 ocsSync: IOmniCriticalSection;

5 private

6 function GetLockCount: integer; inline;

7 function GetSyncObj: TSynchroObject; inline;

8 public

9 procedure Initialize;

10 procedure Acquire; inline;

11 procedure Release; inline;

12 property LockCount: integer read GetLockCount;

13 property SyncObj: TSynchroObject read GetSyncObj;

14 end;

Метод Release просто вызывает метод Release внутреннего интерфейса, в то время как метод Acquire

является более сложным, т.к. он сначала инициализирует поле ocsSync.

1 procedure TOmniCS.Acquire;

2 begin

3 Initialize;

4 ocsSync.Acquire;

5 end;

6

7 procedure TOmniCS.Release;

8 begin

9 ocsSync.Release;

10 end;

Инициализация, спрятанная внутри метода Initialize (который вы также можете вызвать явно изкода инициализации критической секции) довольно сложна, потому что у неё есть для инициали-зации ocsSync только однажды и должна работать надлежащим образом при вызове из друх мест(двух потоков) в то же время. Это достигается с помощью подхода оптимистичной оптимизации,описанного далее в этой главе.

Page 65: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 51

1 procedure TOmniCS.Initialize;

2 var

3 syncIntf: IOmniCriticalSection;

4 begin

5 Assert(cardinal(@ocsSync) mod SizeOf(pointer) = 0,

6 'TOmniCS.Initialize: ocsSync is not properly aligned!');

7 Assert(cardinal(@syncIntf) mod SizeOf(pointer) = 0,

8 'TOmniCS.Initialize: syncIntf is not properly aligned!');

9 if not assigned(ocsSync) then begin

10 syncIntf := CreateOmniCriticalSection;

11 if CAS(nil, pointer(syncIntf), ocsSync) then

12 pointer(syncIntf) := nil;

13 end;

14 end;

3.1.3 Locked

TOmniCS это большое упрощение концепции критических секций, но оно по прежнему требует от васобявление отдельной блокирующей сущности. Если эта блокирующая сущность используется толькодля синхронизации доступа к конкретному экземпляру (объекту, записи, интерфейсу или дажепростому типу), то часто лучше объявить переменную/поле типа Locked<T> которая комбинируетлюбой тип с критической секцией.

Использование Locked<T>, пример из раздела TOmniCS можно переписать следущим образом.

1 uses

2 GpLists,

3 OtlSync;

4

5 procedure ProcessList(const intf: IGpIntegerList);

6 begin

7 //...

8 end;

9

10 var

11 lockedIntf: Locked<IGpIntegerList>;

12

13 procedure Test2;

14 begin

15 lockedIntf := TGpIntegerList.CreateInterface;

16 //...

17 lockedIntf.Acquire;

18 try

19 ProcessList(lockedIntf);

20 finally lockedIntf.Release; end;

21 end;

Page 66: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 52

Интересный факт, обратите внимание на то, что хотя lockedIntf объявлена как переменная типаLocked<IGpIntegerList>, её можно инициализировать и использовать, как будто она типа IGpIntegerList.Это достигается путём предоставления Неявных операторов для преобразования из Locked<T> в T иобратно. Компилятор Delphi (к сожалению) не достаточно умён, чтобы использовать этот операторпреобразования в некоторых случаях, так что вам всё равно иногда приходится использовать предо-ставленное свойство Value. Например, вам придётся сделать это, чтобы освободить обёрнутый объект.(В приведённом выше примере мы обернули интерфейс, и сам компилятор обработал уничтожение.)

1 procedure ProcessObjList(obj: TGpIntegerList);

2 begin

3 //...

4 end;

5

6 var

7 lockedObj: Locked<TGpIntegerList>;

8

9 procedure Test3;

10 begin

11 lockedObj := TGpIntegerList.Create;

12 try

13 //...

14 lockedObj.Acquire;

15 try

16 ProcessObjList(lockedObj);

17 finally lockedObj.Release; end;

18 //...

19 finally lockedObj.Value.Free; end;

20 end;

Помимо стандартныхметодов Acquire/Release, Locked<T> также реализует методы, используемые дляпессимистичной блокировки, которые описаны далее в этой главе, и два почти идентичных метода,названных Locked, которые позволяют выполнять сегмент кода (процедуры, метода или анонимногометода) который критическая секция получила. (Другими словами, вы можете быть уверены, чтокод, передаваемый в метод Locked всегда выполняется только один раз при условии, что весь код впрограмме правильно блокирует доступ к общей переменной.)

Page 67: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 53

1 type

2 Locked<T> = record

3 public

4 type TFactory = reference to function: T;

5 type TProcT = reference to procedure(const value: T);

6 constructor Create(const value: T; ownsObject: boolean = true);

7 class operator Implicit(const value: Locked<T>): T; inline;

8 class operator Implicit(const value: T): Locked<T>; inline;

9 function Initialize(factory: TFactory): T; overload;

10 {$IFDEF OTL_ERTTI}

11 function Initialize: T; overload;

12 {$ENDIF OTL_ERTTI}

13 procedure Acquire; inline;

14 procedure Locked(proc: TProc); overload; inline;

15 procedure Locked(proc: TProcT); overload; inline;

16 procedure Release; inline;

17 procedure Free; inline;

18 property Value: T read GetValue;

19 end;

20

21 procedure Locked<T>.Locked(proc: TProc);

22 begin

23 Acquire;

24 try

25 proc;

26 finally Release; end;

27 end;

28

29 procedure Locked<T>.Locked(proc: TProcT);

30 begin

31 Acquire;

32 try

33 proc(Value);

34 finally Release; end;

35 end;

3.1.3.1 Почему бы не использовать TMonitor?

Существует альтернатива, встроенная в Delphi начиная с версии 2009, которая обеспечивает функци-ональность, аналогичную Locked<T> – TMonitor. В современнойDelphi, каждый объект может быть за-блокирован с помощьюфункции System.TMonitor.Enterи разблокирован с помощью System.TMonitor.Exit.Приведённый выше пример может быть переписан с использованием TMonitor без особого труда.

Page 68: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Синхронизация 54

1 var

2 obj: TGpIntegerList;

3

4 procedure Test4;

5 begin

6 obj := TGpIntegerList.Create;

7 try

8 //...

9 System.TMonitor.Enter(obj);

10 try

11 ProcessObjList(obj);

12 finally System.TMonitor.Exit(obj); end;

13 //...

14 finally FreeAndNil(obj); end;

15 end;

Возникает резонный вопрос, почему реализован Locked<T>. Почему TMonitor не достаточно хорош?Есть множество причин для этого.

• TMonitor был с ошибками с момента его создания³ (хотя я считаю, что Enter и Exit могут бытьдостаточно стабильны для финального кода) и я не хотел бы его использовать.

• Использование TMonitor не передаёт ваши намерения. Просто глядя на объявление переен-ной/поля вы не знаете, что сущность должна быть использована потокобезопасным способом.Используя Locked<T>, однако, прямо заявляет ваши намерения.

• TMonitor.Enter/Exitне работает и интерфейсами, записямиипримитивными типами. Locked<T>может.

³http://stackoverflow.com/questions/4856306/tthreadedqueue-not-capable-of-multiple-consumers, http://www.thedelphigeek.com/2011/05/tmonitor-bug.html

Page 69: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

4 Как сделать… (How to…)Эта часть книги содержит практические примеры использования OmniThreadLibrary. Каждый из нихначинается с вопроса, который вводит проблему, а дальше продолжается обсуждение решения.

Затронуты следующие темы:

• Фоновое сканирование файлов

Сканирование папок и файлов в фоновом режиме.

• Async/Await

Как написать многопоточный ‘блокирующий’ код с механизмом, аналогичным в .NET async/await.

• Загрузка из интернет и хранилище базы данных

Несколько рабочих загружают данные и сохраняют их в одной базе данных.

• Параллельный For с синхронизированным выводом

Перенаправление вывода из параллельного цикла for в структуру, которая не поддерживает много-поточный доступ.

• Фоновый рабочий и разделение списка

Написание сервероподобной фоновой обработки.

• Параллельное производство данных

Несколько рабочих производят данные и записывают их в один файл.

• Создание пула соединений

Использование OmniThreadLibrary для создания пула соединений с базой данных.

• Быстрая сортировка и параллельный максимум

Как отсортировать массив и как его обрабатывать используя несколько потоков.

• Параллельный поиск в дереве

55

Page 70: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 56

Поиск данных в дереве.

• Несколько рабочих с несколькими фреймами

Графический пользовательский интерфейс содержит несколько фреймов, где каждый фрейм работаеткак оболочка для фоновой задачи.

• OmniThreadLibrary и базы данных

Использование баз данных из OmniThreadLibrary.

• OmniThreadLibrary и COM/OLE

Использование COM/OLE из OmniThreadLibrary.

Page 71: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 57

4.1 Async/Await

..

.NET 4.5 представила очень удобные концепции асинхронности - async и await. Есть ли способреализации этих языковых конструкций в Delphi?

Короче говоря – они не могут быть реализованы, только сэмулированы.

Прежде чем показать, как это сделать, давайте вернёмся к основам и посмотрим как async/awaitмоглибы быть использованы, если бы они существовали в Delphi.

Давайте предположим, что вы унаследовали этот довольно бесполезный код.

1 procedure TForm125.Button1Click(Sender: TObject);

2 var

3 button: TButton;

4 begin

5 button := Sender as TButton;

6 button.Caption := 'Working ...';

7 button.Enabled := false;

8 Sleep(5000);

9 button.Enabled := true;

10 button.Caption := 'Done!';

11 end;

Теперь ваш бос говорит, что вы должны сделать это параллельно, так чтобы пользователь могзапустить три копии. (Вы также должны добавить две новые кнопки на форму, чтобы стартоватьэти две копии, но это легко сделать.)

Есть много способов решить эту проблему, некоторые более, а некоторые менее сложные. Я хотел быотметить наиболее простое решение. Но сначала давайте проведём экскурс в .NET…

.NET 4.5 вводит сложные магические концепции ‘async’ и ‘await’¹. Короче говоря, это позволяет писатьвроде этого:

¹http://blogs.msdn.com/b/pfxteam/archive/2012/04/12/10293335.aspx

Page 72: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 58

1 procedure TForm125.Button1Click(Sender: TObject); async;

2 var

3 button: TButton;

4 begin

5 button := Sender as TButton;

6 button.Caption := 'Working ...';

7 button.Enabled := false;

8 await CreateTask(

9 procedure begin

10 Sleep(5000);

11 end);

12 button.Enabled := true;

13 button.Caption := 'Done!';

14 end;

[Пожалуйста, обратите внимание, что этот синтаксис не поддерживается, это просто пример того, каксинтаксис .NET мог бы выглядеть, если бы Delphi поддерживал его.]

Хитрость тут в том, что await на самом деле не ждёт. Он возвращает управление обратно в основнойцикл, который должен обрабатывать события и т.д. Другими словами – вся остальная программаработает в обычном режиме. Это также может вызвать другую асинхронную функцию и await для неё.Только тогда, когда асинхронная функция завершается (любая из них, если их запущено более одной),управление возвращается в соответствующую точку вызова await и код продолжается со следующейстроки. [Карло Кок (Carlo Kok) написал хорошую статью о как работает await² в блоге RemObjects.]

Async/await нуждается в экстенсивной поддержке компилятора, и нет абсолютно никакого способа,чтобы написать клон async/await в Delphi. Но… есть простой трюк, который позволяет писать кодпочти также. Он использует конструкцию OmniThreadLibrary’s Async и магию анонимных методов.

1 procedure TForm125.Button1Click(Sender: TObject);

2 var

3 button: TButton;

4 begin

5 button := Sender as TButton;

6 button.Caption := 'Working ...';

7 button.Enabled := false;

8 Parallel.Async(

9 procedure begin

10 Sleep(5000);

11 end,

12 Parallel.TaskConfig.OnTerminate(

13 procedure begin

14 button.Enabled := true;

15 button.Caption := 'Done!';

²http://blogs.remobjects.com/blogs/ck/2012/08/08/p4690

Page 73: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 59

16 end));

17 end;

Async выполняет свой параметр (делегат, содержащий вызов Sleep) в фоновом режиме. Когда этафоновая задача выполнена, он выполняет второй параметр (делегат OnTerminate) в основном потоке.Во время работы фоновой задачи основной поток обрабатывает свой собственный цикл сообщенийи запускает пользовательский интерфейс – так же, как это было бы в случае .NET.

С некоторыми синтаксическими уловками вы можете очень убедительно подделать .NET-подобноеповедение.

1 type

2 IAwait = interface

3 procedure Await(proc: TProc);

4 end;

5

6 TAwait = class(TInterfacedObject, IAwait)

7 strict private

8 FAsync: TProc;

9 public

10 constructor Create(async: TProc);

11 procedure Await(proc: TProc);

12 end;

13

14 function Async(proc: TProc): IAwait;

15 begin

16 Result := TAwait.Create(proc);

17 end;

18

19 { TAwait }

20

21 constructor TAwait.Create(async: TProc);

22 begin

23 inherited Create;

24 FAsync := async;

25 end;

26

27 procedure TAwait.Await(proc: TProc);

28 begin

29 Parallel.Async(FAsync, Parallel.TaskConfig.OnTerminated(

30 procedure begin

31 proc;

32 end));

33 end;

34

35 { TForm125 }

Page 74: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 60

36

37 procedure TForm125.Button1Click(Sender: TObject);

38 var

39 button: TButton;

40 begin

41 button := Sender as TButton;

42 button.Caption := 'Working ...';

43 button.Enabled := false;

44 Async(

45 procedure begin

46 Sleep(5000);

47 end).

48 Await(

49 procedure begin

50 button.Enabled := true;

51 button.Caption := 'Done!';

52 end);

53 end;

Для теста, разместите на форме три кнопки и назначьте обработчик Button1Click для всех трёх.Нажмите кнопки и наслаждайтесь.

Page 75: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 61

4.2 Быстрая сортировка и параллельный максимум

..

Я хотел бы отсортировать большой массив данных, но моя функция сравнения достаточнозапутана и сортировка занимает много времени. Могу ли я использовать OmniThreadLibrary,чтобы ускорить сортировку?

На аналогичную тему – иногда я хотел бы найти максимальный элемент данных в этомбольшом массиве, не делая сортировку. Как я мог бы подойти к этой проблеме?

Ответ на обе части проблемы один – использование абстракции Fork/Join.

4.2.1 Быстрая сортировка

Первая часть реализует известный алгоритм быстрой сортировки³ параллельным способом (см. демоприложение 44_Fork-Join QuickSort для полного кода).

Давайте начнём с неоптимизированного однопоточного сортировщика. Эта простая реализацияочень легко превращается в многопоточную форму.

1 procedure TSequentialSorter.QuickSort(left, right: integer);

2 var

3 pivotIndex: integer;

4 begin

5 if right > left then begin

6 if (right - left) <= CSortThreshold then

7 InsertionSort(left, right)

8 else begin

9 pivotIndex := Partition(left, right, (left + right) div 2);

10 QuickSort(left, pivotIndex - 1);

11 QuickSort(pivotIndex + 1, right);

12 end;

13 end;

14 end;

Как видите, код переключается на InsertionSort, когда размерность массива падает ниже некоторогопорога. Это не очень важно для однопоточной версии (только приносит небольшое ускорение), ноэто сильно поможет в многопоточной версии.

Преобразование этой быстрой сортировки в многопоточную версию довольно просто.

Во-первых, мы должны создать вычислительный пул fork/join. В данном примере он хранится вглобальной области.

³http://en.wikipedia.org/wiki/Quicksort

Page 76: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 62

1 FForkJoin := Parallel.ForkJoin;

Во-вторых, мы должны адаптировать метод QuickSort.

1 procedure TParallelSorter.QuickSort(left, right: integer);

2 var

3 pivotIndex: integer;

4 sortLeft : IOmniCompute;

5 sortRight : IOmniCompute;

6 begin

7 if right > left then begin

8 if (right - left) <= CSortThreshold then

9 InsertionSort(left, right)

10 else begin

11 pivotIndex := Partition(left, right, (left + right) div 2);

12 sortLeft := FForkJoin.Compute(

13 procedure

14 begin

15 QuickSort(left, pivotIndex - 1);

16 end);

17 sortRight := FForkJoin.Compute(

18 procedure

19 begin

20 QuickSort(pivotIndex + 1, right);

21 end);

22 sortLeft.Await;

23 sortRight.Await;

24 end;

25 end;

26 end;

Код выглядит гораздо длиннее, но изменения очень просты. Каждый рекурсивный вызов QuickSort

заменяется на вызов Compute …

1 sortLeft := FForkJoin.Compute(

2 procedure

3 begin

4 QuickSort(left, pivotIndex - 1);

5 end);

… и на код Await в обеих подзадачах.

Вместо вызова QuickSort напрямую, параллельная версия создаёт интерфейс IOmniCompute, вызываяFForkJoin.Compute. Это создаёт подзадачу с анонимной функцией, переданной в Compute и помещаетэту подзадачу в вычислительный пул fork/join.

Page 77: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 63

Позже, подзадачи читаются из этого пула fork/join одна за другой и обрабатываются в фоновомпотоке.

Вызов Await проверяет, завершила ли подзадача работу. Если да, то Await просто возвращаетрезультат и код может быть продолжен. В противном случае (подзадача всё ещё работает), Awaitпробует получить одну подзадачу из вычислительного пула, выполняет её, а затем повторяет всё ссамого начала (с проверкой завершения работы подзадачи). Таким образом, все потоки всегда занятылибо выполнением своего кода, либо кода подзадач из вычислительного пула.

Т.к. в каждом вызове QuickSort в стеке хранятся два интерфейса IOmniCompute, этот код используетстек больше, чем его однопоточная версия. Это основная причина, почему параллельное выполнениеостанавливается на определённом уровне и для сортировки оставшихся полей используется простойпоследовательный вариант.

4.2.1.1 Параллельный максимум

Этот пример находит максимальный элемент массива параллельным образом (см. демо 45_Fork-Joinmax для получения полного кода).

Параллельное решение похоже на быструю сортировку в приведённом выше примере с несколькимиважными отличиями, связанными с тем, что код должен возвращать значение (код быстрой сорти-ровки только сортирует массив и не возвращает ничего).

Это напрямую влияет на использование интерфейся - вместо работы с IOmniForkJoin и IOmniCompute,код использует IOmniForkJoin<T> и IOmniCompute<T>. В нашем примере массив содержит целые числа,параллельный код создаёт IOmniForkJoin<integer> и передаёт его функции ParallelMax.

1 max := ParallelMax(Parallel.ForkJoin<integer>, Low(FData), High(FData));

В этом примере вычислительный пул fork/join передаётся в качестве параметра. Такой подход болеегибкий, но немного медленней, что ещё более важно - использует больше стековой памяти.

1 function ParallelMax(

2 const forkJoin: IOmniForkJoin<integer>;

3 left, right: integer): integer;

4

5 var

6 computeLeft : IOmniCompute<integer>;

7 computeRight: IOmniCompute<integer>;

8 mid : integer;

9

10 function Compute(left, right: integer): IOmniCompute<integer>;

11 begin

12 Result := forkJoin.Compute(

13 function: integer

14 begin

15 Result := ParallelMax(forkJoin, left, right);

16 end

Page 78: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Как сделать… (How to…) 64

17 );

18 end;

19

20 begin

21 if (right - left) < CSeqThreshold then

22 Result := SequentialMax(left, right)

23 else begin

24 mid := (left + right) div 2;

25 computeLeft := Compute(left, mid);

26 computeRight := Compute(mid + 1, right);

27 Result := Max(computeLeft.Value, computeRight.Value);

28 end;

29 end;

Когда поддиапазон массива достаточно мал, ParallelMax вызывает последовательную (однопоточ-ную) версию - подобно тому, как делал QuickSort, и из-за той же причины - не переполняет стек.

С большим поддиапазоном, код создаёт две подзадачи IOmniCompute<integer>, каждая упакована вфункцию, возвращающую целое число. Эта функция, в свою очередь, снова вызывает ParallelMax

(но с меньшим диапазоном). Чтобы получить результат анонимной функции, переданной в Compute,код вызывает функцию Value. Также, как Await, Value либо возвращает результат (если он уже былвычислен), либо выполняет другую подзадачу fork/join из вычислительного пула.

..

При создании программ fork/join, помните этот анти-паттерн. Следующий кодошибочный!

1 Result := Max(Compute(left, mid).Value,

2 Compute(mid + 1, right).Value);

Вы всегда должны создавать все подзадачи перед вызовом Await или Value! Впротивном случае ваш код не будет выполняться параллельно - всё это будетобрабатываться в одном потоке!

Page 79: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

5 Демонстрационные приложенияДистрибутив OmniThreadLibrary и сборка SVN включают большое количество демонстрационныхприложений, которые помогут вам начать работу. Они хранятся в папке tests. В этой главе пере-числены все тестовые приложения.

• 0_Beep Самый простой поточный код OmniThreadLibrary.• 1_HelloWorld Поточный “Hello, World” с компонентом TOmniEventMonitor, созданным во времяисполнения.

• 2_TwoWayHello “Hello, World” с двунаправленной связью; TOmniEventMonitor создаётся во времяисполнения.

• 3_HelloWorld_with_package Поточный “Hello, World” с компонентом TOmniEventMonitor наформе.

• 4_TwoWayHello_with_package “Hello, World” с двунаправленной связью; TOmniEventMonitor наформе.

• 5_TwoWayHello_without_loop “Hello, World” с двунаправленной связью, способ OTL.• 6_TwoWayHello_with_object_worker Устаревший, почти полностью эквивалентен примеру 5_-TwoWayHello_without_loop.

• 7_InitTest Демонстрирует .WaitForInit, .ExitCode, .ExitMessage и .SetPriority.• 8_RegisterComm Демонстрирует создание дополнительных каналов связи.• 9_Communications Простой тестер подсистемы связи.• 10_Containers Полномасштабный тестер коммуникационной подсистемы. Используется дляпроверки правильности неблокируемого кода.

• 11_ThreadPool Пример пула потоков.• 12_Lock Демонстрирует .WithLock.• 13_Exceptions Демонстрирует обработку исключений.• 14_TerminateWhen Демонстрирует .TerminateWhen и .WithCounter.• 15_TaskGroup Пример группы задач.• 16_ChainTo Демонстрирует .ChainTo.• 17_MsgWait Демонстрирует .MsgWait и обработку сообщений Windows внутри задачи.• 18_StringMsgDispatch Вызывает методы задачи по имени и адресу.• 19_StringMsgBenchmark Различные тесты вызова методов задач.• 20_QuickSort Пример параллельной быстрой сортировки.• 21_Anonymous_methods Демонстрирует использование анонимных методов как рабочих задач вDelphi 2009.

• 22_Termination Тесты для .Terminate и .Terminated.• 23_BackgroundFileSearch Демонстрирует сканирование файлов в фоновом потоке.• 24_ConnectionPool Демонстрирует, как создать соединительный пул с OmniThreadLibrary.• 25_WaitableComm Пример для ReceiveWait и SendWait.• 26_MultiEventMonitorКак работать с несколькимимониторами событий примногопоточности.• 27_RecursiveTree Параллельная обработка дерева.• 28_Hooks Пример для новой системы хуков.

65

Page 80: Параллельное программирование с OmniThreadLibrarysamples.leanpub.com/omnithreadlibrary-ru-sample.pdf · 03/02/2011  · TweetThisBook! PleasehelpPrimožGabrijelčičandAlexEgorovbyspreadingthewordaboutthisbookonTwitter!

Демонстрационные приложения 66

• 29_ImplicitEventMonitor Пример для OnMessage и OnTerminated с именованным методом.• 30_AnonymousEventMonitor Пример для OnMessage и OnTerminated с анонимным методом.• 31_WaitableObjects Пример для API RegisterWaitObject/UnregisterWaitObject.• 32_Queue Стресс-тест для новых TOmniBaseQueue и TOmniQueue.• 33_BlockingCollection Стресс-тест для TOmniBlockingCollection, а также демонстрирует ис-пользование окружающей среды, чтобы установить соответствие процесса (process affinity).

• 34_TreeScan Параллельное сканирование дерева, используя TOmniBlockingCollection.• 35_ParallelFor Параллельное сканирование дерева, используя Parallel.ForEach (Delphi 2009 ивыше).

• 37_ParallelJoin Пример Parallel.Join.• 38_OrderedFor Упорядоченая параллельность для циклов.• 39_Future Пример Futures.• 40_Mandelbrot Очень простой пример параллельных графиков.• 41_Pipeline Многоступенчатые параллельные процессы.• 42_MessageQueue Стресс-тест для TOmniMessageQueue.• 43_InvokeAnonymous Пример для IOmniTask.Invoke.• 44_Fork-Join QuickSort Реализация быстрой сортировки, используя Parallel.ForkJoin.• 45_Fork-Join max Реализация поискамаксимального значениямассива, используя Parallel.ForkJoin.• 46_Async Пример для Parallel.Async.• 47_TaskConfig Пример для конфигурации задачи с Parallel.TaskConfig.• 48_OtlParallelExceptions Обработка исключений в высокоуровневых конструкциях OTL.• 49_FramedWorkers Несколько структур, каждая связывается с собственной рабочей задачей.• 50_OmniValueArray Упаковка массивов, хешей и записей в TOmniValue.• 51_PipelineStressTest Стресс-тест для Pipeline от [Антона Алисова].• 52_BackgroundWorker Пример для Parallel.BackgroundWorker.• 53_AsyncAwait Пример для абстракции Async/Await.