Параллельная обработка очереди сообщений

15.06.21

Разработка - Математика и алгоритмы

Описание алгоритма обработки очереди последовательных сообщений регистрации изменений записей регистра сведений. Алгоритм может быть применим к любым объектам метаданных. Алгоритм основан на обработке объектов по их ключам.

Альтернативная реализация РИБ на RabbitMQ хорошо показала себя на практике. Более того она получила своё развитие. Тем не менее узким местом стала загрузка данных на стороне 1С. Средняя производительность составила приблизительно 1000 сообщений в минуту в один поток (фоновое задание) при среднем размере сообщения равным 3 Кб. В конечном итоге встал вопрос оптимизации загрузки, так как количество входящих сообщений по некоторым типам сообщений (объектам метаданных) составляло в пиках нагрузки несколько десятков, а иногда сотен тысяч сообщений за достаточно короткий промежуток времени.

В данной публикации приводится подробное описание и алгоритм решения проблемы.

Алгоритм рассмотрен на примере параллельной обработки регистра сведений "КурсыВалют".

Тем не менее, всё ниже сказанное в равной степени может быть применимо к ссылочным объектам метаданных и регистрам накопления, учитывая их специфические особенности.

 
1. Описание проблемы.
 
2. Алгоритм решения. 
 
3. Реализация алгоритма.
 
4. Применимость алгоритма.
 
5. Дополнительный алгоритм "срез последних".
 
6. Примерный код 1С. 

 

параллельная многопоточная последовательность обмен данными очереди сообщений

См. также

Метод Дугласа-Пойкера для эффективного хранения метрик

Математика и алгоритмы Платформа 1C v8.2 Конфигурации 1cv8 Россия Абонемент ($m)

На написание данной работы меня вдохновила работа @glassman «Переход на ClickHouse для анализа метрик». Автор анализирует большой объем данных, много миллионов строк, и убедительно доказывает, что ClickHouse справляется лучше PostgreSQL. Я же покажу как можно сократить объем данных в 49.9 раз при этом: 1. Сохранить значения локальных экстремумов 2. Отклонения от реальных значений имеют наперед заданную допустимую погрешность.

1 стартмани

30.01.2024    1754    stopa85    12    

33

Алгоритм симплекс-метода для решения задачи раскроя

Математика и алгоритмы Бесплатно (free)

Разработка алгоритма, построенного на модели симплекс-метода, для нахождения оптимального раскроя.

19.10.2023    4419    user1959478    50    

34

Регулярные выражения на 1С

Математика и алгоритмы Инструментарий разработчика Платформа 1С v8.3 Мобильная платформа Россия Абонемент ($m)

Что ж... лучше поздно, чем никогда. Подсистема 1С для работы с регулярными выражениями: разбор выражения, проверка на соответствие шаблону, поиск вхождений в тексте.

1 стартмани

09.06.2023    7462    4    SpaceOfMyHead    17    

56

Модель распределения суммы по базе

Математика и алгоритмы Платформа 1С v8.3 Россия Абонемент ($m)

Обычно под распределением понимают определение сумм пропорционально коэффициентам. Предлагаю включить сюда также распределение по порядку (FIFO, LIFO) и повысить уровень размерности до 2-х. 1-ое означает, что распределение может быть не только пропорциональным, но и по порядку, а 2-ое - это вариант реализации матричного распределения: по строкам и столбцам. Возможно вас заинтересует также необычное решение этой задачи через создание DSL на базе реализации текучего интерфейса

1 стартмани

21.03.2022    7855    7    kalyaka    11    

44

Изменения формата файлов конфигурации (CF) в 8.3.16

Математика и алгоритмы Платформа 1С v8.3 Бесплатно (free)

Дополнение по формату файлов конфигурации (*.cf) в версии 8.3.16.

16.12.2021    4446    fishca    13    

36

Интересная задача на Yandex cup 2021

Математика и алгоритмы Бесплатно (free)

Мое решение задачи на Yandex cup 2021 (frontend). Лабиринт. JavaScript.

12.10.2021    8839    John_d    73    

46

Механизм анализа данных. Кластеризация.

Математика и алгоритмы Анализ учета Платформа 1С v8.3 Анализ и прогнозирование Бесплатно (free)

Подробный разбор, с примером использования, встроенного механизма кластеризации 1С.

31.08.2021    7805    dusha0020    8    

70
Комментарии
В избранное Подписаться на ответы Сортировка: Древо развёрнутое
Свернуть все
1. Cyberhawk 135 15.06.21 08:44 Сейчас в теме
регистрация изменений по всем полным ключам, удаляемых записей. Интересно отметить, что именно так работает регистрация изменений в таблицах изменений планов обмена 1С - там всегда региструются только полные ключи
Наверное, стоит отметить, что регистрируются "полные ключи" гранул регистрации изменений (т.е. все измерения, входящие в основной отбор), а не "полные ключи" записей регистра (которые могут состоять из большего числа измерений, чем кол-во входящих в основной отбор)
dabu-dabu; zhichkin; tormozit; +3 Ответить
3. zhichkin 1438 15.06.21 10:47 Сейчас в теме
(1) Согласен. "Гранула" - будет более точным подходом, тем более, что её состав может определяться не только свойством измерения "Основной отбор", но и бизнес-логикой приложения. Спасибо за уточнение.
2. Cyberhawk 135 15.06.21 08:56 Сейчас в теме
Неудобством алгоритма является то, что для каждого регистра сведений, для которого требуется выполнить оптимизацию параллельной обработки очереди сообщений, необходимо вручную создавать дополнительные специализированные регистры сведений
Предпринимались ли попытки построить для независимых регистров единый регистр-очередь для регистрации? В котором количество ресурсов, описывающих основной отбор, было бы равно максимальному числу измерений, входящих в основной отбор регистрируемых к обмену регистров сведений (Измерение1, Измерение2, Измерение3 и т.д.)?
zhichkin; +1 Ответить
4. zhichkin 1438 15.06.21 10:55 Сейчас в теме
(2) Нет, но мысль такая была. Стоит отметить вот эту статью - в ней используется ключ строкового типа длиной 200 символов, который получается путём "сложения" значений измерений - по сути хэш измерений, входящих в состав ключа.
5. DarkAn 1079 24.06.21 13:45 Сейчас в теме
(0) Ваша, задача без проблем решается с помощью Менеджер потоков 2.1, за счет "расчета ресурсов" и построения графа зависимостей, где номер сообщения является этим самым "ресурсом" и все последующие сообщения будут выстраиваться строго за ним и абсолютно не как не мешая параллельно обрабатывать другие сообщения с другим ключом.
(2) При этом не надо создавать промежуточные таблицы (справочники / регистры) на уровне метаданных. Сам менеджер состоит только из общих модулей - этого достаточно!!! При этом проблем с размером графа нет. В памяти могут строиться графы неограниченного размера, но если с памятью проблемы, в параметрах можно указать ограничение.
6. zhichkin 1438 24.06.21 16:17 Сейчас в теме
(5) Добрый день! Спасибо за комментарий!

Возник вопрос по использованию Вашей разработки.
Посоветуйте, пожалуйста, как её использовать в моём сценарии.

Имеем регистр сведений (очередь сообщений одного типа):
- НомерСообщения (число)
- ТелоСообщения (строка в формате JSON)

Ключи объектов лежат в поле ТелоСообщения, то есть предварительно нужно их "достать" из JSON'а, чтобы использовать для построения графа зависимостей. Требуется обработать 10 000 сообщений (записей регистра сведений) за один запуск регламентного задания. При этом ТелоСообщения может иметь размер от 2-3 Кб до 2-3 Мб. Заранее размер сообщений неизвестен.

Как будет правильно решить такую задачу при помощи Вашей разработки ?
7. DarkAn 1079 25.06.21 14:08 Сейчас в теме
(6)Добрый день!
Архитектура решения, следующая:
1) Запуск
• На клиенте вызывается (через метод) фоновое задание менеджера с передачей в него параметров;
• далее менеджер стартует фоновые задания (столько сколько указано было в параметрах, по умолчанию 10) обработки данных.
2) Следом на клиенте запускается цикл обработки сообщений – в вашем случае перебор данных из регистра сведений. Каждое сообщение (НомерСообщения и ТелоСообщения) постепенно скармливаются в менеджер, через метод «ОбработатьОбъект».
3) Далее менеджер в первую очередь оправляет в потоки данные для расчета ресурса – это определяется использованным методом «ОбработатьОбъект». Отправляет в те же самые фоновые задания запущенные для обработки, так что и расчет тоже проходит в многопоточном режиме (по умолчанию коэффициент определения сколько потоков выделено под расчет = 0,2 * на количество потоков для обработки, но не меньше 1 - это позволяет даже для длительных обработках иметь «коридор» - свободные потоки для расчета ресурсов). Как только все объекты будут рассчитаны или граф достигнет лимита (по умолчанию лимита нет) – потоки выделенные для расчета так же переходят в обработку и не простаивают.
4) После того как произошел расчет ресурса, менеджер по результатам строит граф зависимости. Результатом расчет ресурса может быть не только одно значение. На самом деле результатом является массив из массивов значений. Где массив верхнего уровня - это массив ресурсОВ, в вложенный массив это массив значений ресурсА. Что является ресурсом вы как разработчик определяете сами.
5) Как строится граф зависимости. Менеджер перебирает все ресурсы, полученные из расчета объекта. Если находит что в графе уже есть объект имеющий точно такой же ресурс, то текущий объект ставится в зависимость то объекта в графе. Далее берется следующий ресурс этого же объекта, и процедура повторяется, в итоге один объект может быть зависим от множества других объектов в графе и пока не будут обработаны все вышестоящие объекты графа текущий объект не будет передан в обработку потоками.
6) Далее менеджер выбирает из графа «свободные» объекты не зависящие от других и вот уже эти объекты менеджер передает в потоки на обработку.
В упрощенном виде это реализовано так.
Если я правильно понял Вашу архитектуру регистра, но НомерСообщения – это уникальный номер сообщения, и он не повторяется, а в ТелоСообщения – лежит как ключ, так и данные.
Для уменьшения нагрузки для работы менеджера (для передачи данных между ФЗ менеджер делает запись в БД и перегонять полное тело сообщения не выгодно), я бы сделал так:
В регистр добавил бы ресурс «Ключ» - перед записью «ТелаСообщения», я бы извлекал «Ключ» из «Тела» и складывал бы рядом в этом же регистре. Тогда в менеджер я бы передавал только «НомерСообщения» и его «Ключ». «Ключ» определял бы как результат расчета ресурсов (все сообщения с таким ключом становятся зависимыми и обрабатываются «последовательно»). А уже в ФЗ обработки я бы по «НомеруСообщения» вычитывал бы полное тело и обрабатывал его как требуется.
zhichkin; +1 Ответить
10. zhichkin 1438 26.06.21 14:27 Сейчас в теме
(7) Спасибо за подробные объяснения!
В регистр добавил бы ресурс «Ключ» - перед записью «ТелаСообщения», я бы извлекал «Ключ» из «Тела» и складывал бы рядом в этом же регистре.

Интересно чем это отличается от того, о чём я писал в своей публикации ? Мне кажется, что концептуально ничем.

Ваш алгоритм построения графа зависимостей основан на использовании объекта платформы 1С "Соответствие". Вам не кажется, что Ваш алгоритм аналогичен использованию оператора GROUP BY (СГРУППИРОВАТЬ ПО) СУБД, использование которого я тоже описываю в статье ? Разница есть конецно же - она в агрегирующей функции. Например SQL Server имеет аналогичную функцию STRING_AGG, а PostgreSQL - ARRAY_AGG. Так как язык запросов 1С не поддерживает эти функции, мне пришлось делать ЛЕВОЕ СОЕДИНЕНИЕ в запросе. Другими словами я использовал встроенные алгоритмы СУБД для решения задачи вместо того, чтобы реализовывать их средствами 1С.

Учитывая то, что Вы же сами и предлагаете добавить ресурс "Ключ" (см. цитату выше), Ваши первоначальные выводы относительно применимости Вашей разработки к решаемой задаче:
Ваша, задача без проблем решается с помощью "Менеджер потоков"

и
При этом не надо создавать промежуточные таблицы

выглядят, на мой взгляд, несколько поспешными.

В связи с этим у меня вопрос: как Вы понимаете термин "без проблем" ?
11. DarkAn 1079 27.06.21 14:16 Сейчас в теме
(10)"ключ" я предложил сделать только для того чтобы не гонять тело сообщения между ФЗ, можно конечно же без этого - это лишь кусок оптимизации. Прошу прощения за ошибки пишу с телефона
8. DarkAn 1079 25.06.21 14:15 Сейчас в теме
(6) На скринах я выложил построенный граф и алгоритм по которому программа его строила, ну и на всякий случай руководство тоже прикрепил, описание и эти же картинки приведены в руководстве на 43-44 стр.

Согласитесь, что описать столь сложную взаимосвязь объектов даже простым языком сложно, не говоря уже о программной линейной обработке, а алгоритм расчета ресурсов как видите весьма простой.
Прикрепленные файлы:
Руководство.pdf
9. DarkAn 1079 25.06.21 14:34 Сейчас в теме
(6) я Вам даже скажу так: если для обработки ТелаСообщения у Вас есть экспортная процедура, куда вы просто скидываете это тело, а дальше "оно само" обрабатывается, то менеджер потоков можно адаптировать под вашу задачу очень быстро - за 1-2 часа думаю можно легко уложиться. Изучите демо обработка в частности Вам будет интересна мпОбработатьОбъект

По большому счету весь код который надо написать написан ниже, а все остальное делает Менеджер :)

Вот основной код из обработки:
//*****************************************************************
&НаСервере
Процедура ВыполнитьВПотоках()
	ПараметрыМП = мпОсновнаяПрограммаВызовСервера.ПолучитьПараметрыИнициализацииМенеджераПотоков(
										"ОбработатьОбъект", 
										"мпСобытияРазработчикаОбработатьОбъект");
	ПараметрыМП.Граф.УчитыватьОбъектыБезРесурсов = ОбработатьОбъектыБезРесурса;
	ПараметрыМП.Общие.КоличествоПотоков = КоличествоПотоков;
	
	ПараметрыДляОбъекта = Новый Структура;
	ПараметрыДляОбъекта.Вставить("Алгоритм", Алгоритм);
	
	мпОсновнаяПрограммаВызовСервера.ИнициализироватьМенеджерПотоков(ПараметрыМП);
	Для Сч = 1 По РазмерКоллекции Цикл
		мпОсновнаяПрограммаВызовСервера.ОбработатьОбъект(ПараметрыМП, Сч, ПараметрыДляОбъекта);
	КонецЦикла;
	
	мпОсновнаяПрограммаВызовСервера.ДождатьсяОстановкиМенеджераПотоков(ПараметрыМП);	
КонецПроцедуры
Показать


А Вот модуль обработчиков:
#Область СлужебныйПрограммныйИнтерфейс

//*****************************************************************
Функция ОбработатьСобытиеРазработчика(пПараметрыСобытия) Экспорт
	ИмяСобытия      = пПараметрыСобытия.ИмяСобытия;
	Параметры       = пПараметрыСобытия.Параметры;
	
	ОтветСобытия = Неопределено;
	
	Если ИмяСобытия = "ПередЗапускомМенеджераПотоков" Тогда			//Основная программа
	ИначеЕсли ИмяСобытия = "ПриЗапускеМенеджераПотоков" Тогда		//ФЗ Менеджера потоков
	ИначеЕсли ИмяСобытия = "ПередЗапускомПотока" Тогда				//ФЗ Менеджера потоков
	ИначеЕсли ИмяСобытия = "ПриЗапускеПотока" Тогда   				//ФЗ Потока
	ИначеЕсли ИмяСобытия = "ПриРасчетеРесурсов" Тогда				//ФЗ Потока
		ОтветСобытия = ПриРасчетеРесурсов(пПараметрыСобытия);
	ИначеЕсли ИмяСобытия = "ПриОбработкеПотоком" Тогда				//ФЗ Потока
		ОтветСобытия = ПриОбработкеПотоком(пПараметрыСобытия);
	ИначеЕсли ИмяСобытия = "ПриОбработкеОшибки" Тогда				//Основная программа
	ИначеЕсли ИмяСобытия = "ПриОбработкеПропуска" Тогда				//Основная программа
		ПриОбработкеПропуска(пПараметрыСобытия);
	ИначеЕсли ИмяСобытия = "ПриОбработкеРезультата" Тогда			//Основная программа
		ПриОбработкеРезультата(пПараметрыСобытия);
	ИначеЕсли ИмяСобытия = "ПриПроизвольнойОбработкеГрафа" Тогда	//ФЗ Менеджера потоков
	ИначеЕсли ИмяСобытия = "ПередЗавершениемМенеджераПотоков" Тогда	//ФЗ Менеджера потоков
	ИначеЕсли ИмяСобытия = "ПриСохраненииГрафа" Тогда				//Основная программа
	ИначеЕсли ИмяСобытия = "ПриСборкеФрагментовКоллекции" Тогда		//Основная программа
	ИначеЕсли ИмяСобытия = "ПослеЗавершенияМенеджераПотоков" Тогда	//Основная программа
	КонецЕсли;
	
	Возврат ОтветСобытия;
КонецФункции

#КонецОбласти

#Область СлужебныеПроцедурыИФункции

#Область События_ФЗ_Потока

//*****************************************************************
Функция ПриОбработкеПотоком(пПараметрыСобытия)
	ОбрабатываемыйОбъект = пПараметрыСобытия.Данные.ОбрабатываемыйОбъект;

	Сообщение = Новый СообщениеПользователю;
	Сообщение.Текст = СтрШаблон(НСтр("ru='ОБРАБОТАН объект: %1 (сообщение из потока)'"), ОбрабатываемыйОбъект);
	Сообщение.Сообщить(); 
	
	Возврат НСтр("ru='Обработано'");
КонецФункции

//*****************************************************************
Функция ПриРасчетеРесурсов(пПараметрыСобытия)
	МассивРесурсов = Новый Массив;
	
	ОбрабатываемыйОбъект = пПараметрыСобытия.Данные.ОбрабатываемыйОбъект;
	ПараметрыДляОбъекта  = пПараметрыСобытия.Данные.ПараметрыДляОбъекта;
	
	Алгоритм = ПараметрыДляОбъекта.Алгоритм;
	Если Алгоритм = "СвязьПоДелителям4_9" Тогда
		Для Сч = 4 По 9 Цикл
			Если ОбрабатываемыйОбъект % Сч = 0 Тогда
				МассивРесурса = Новый Массив;
				МассивРесурса.Добавить(Сч);
				МассивРесурсов.Добавить(МассивРесурса);
			КонецЕсли;
		КонецЦикла;
		
	ИначеЕсли Алгоритм = "СвязьПоСуммеДелителей2_9" Тогда
		СуммаДелителей = 0;
		Для Сч = 2 По 9 Цикл
			Если ОбрабатываемыйОбъект % Сч = 0 Тогда
				СуммаДелителей = СуммаДелителей + Сч;
			КонецЕсли;
		КонецЦикла; 
		
		Если СуммаДелителей > 0 Тогда
			МассивРесурса = Новый Массив;
			МассивРесурса.Добавить(СуммаДелителей);
			МассивРесурсов.Добавить(МассивРесурса);
		КонецЕсли;	
	КонецЕсли;
	
	Возврат МассивРесурсов;
КонецФункции

#КонецОбласти

#Область События_ОсновнойПрограммы

//*****************************************************************
Процедура ПриОбработкеПропуска(пПараметрыСобытия)
	Данные = пПараметрыСобытия.Данные;
	
	Статус 				 = Данные.Статус;
	ОбрабатываемыйОбъект = Данные.ОбрабатываемыйОбъект;
	
	СтрокаДляПользователя = СтрШаблон(НСтр("ru='Статус обработки: %1; Объект: %2'"), Статус, ОбрабатываемыйОбъект);
	
	Сообщение = Новый СообщениеПользователю;
	Сообщение.Текст = СтрокаДляПользователя;
	Сообщение.Сообщить();
КонецПроцедуры

//*****************************************************************
Процедура ПриОбработкеРезультата(пПараметрыСобытия)
	Данные = пПараметрыСобытия.Данные;
	
	Статус 				 = Данные.Статус;
	ОбрабатываемыйОбъект = Данные.ОбрабатываемыйОбъект;
	РезультатОбработки   = Данные.РезультатОбработки;
	
	СтрокаДляПользователя = СтрШаблон(НСтр("ru='Статус обработки: %1; Объект: %2; Результат: %3'"), Статус, ОбрабатываемыйОбъект, РезультатОбработки);
	
	Сообщение = Новый СообщениеПользователю;
	Сообщение.Текст = СтрокаДляПользователя;
	Сообщение.Сообщить();	
КонецПроцедуры

#КонецОбласти

#КонецОбласти
Показать
Оставьте свое сообщение