Skip to content

Commit 9e588e2

Browse files
Кутергин Антон. Технология SEQ-MPI. Передача от всех одному и рассылка. Вариант 3 (#302)
## Описание <!-- Пожалуйста, предоставьте подробное описание вашей реализации, включая: - основные детали решения (описание выбранного алгоритма) - применение технологии параллелизма (если применимо) --> - **Задача**: Передачи от всех одному и рассылка - **Вариант**: 3 - **Технология**: Технологии SEQ и MPI - **Описание** В рамках задачи реализованы последовательный и параллельный алгоритмы вычисления глобальной суммы элементов вектора. Отчёт содержит анализ производительности на системе с 4 физическими ядрами (8 логических), расчет ускорения и эффективности, а также обсуждение ограничений масштабируемости из-за специфики доступа к памяти. - **Реализация: Последовательная версия: Реализована в классе AllreduceSequential. Использует стандартный алгоритм std::accumulate для прохода по вектору и вычисления суммы за один проход Параллельная версия: Реализована в классе AllreduceMPI. Каждый процесс вычисляет локальную сумму своей части данных, после чего результаты агрегируются и распределяются между всеми участниками. --- ## Чек-лист <!-- Пожалуйста, убедитесь, что следующие пункты выполнены **до** отправки pull request'а и запроса его ревью: --> - [x] **Статус CI**: Все CI-задачи (сборка, тесты, генерация отчёта) успешно проходят на моей ветке в моем форке - [x] **Директория и именование задачи**: Я создал директорию с именем `<фамилия>_<первая_буква_имени>_<короткое_название_задачи>` - [x] **Полное описание задачи**: Я предоставил полное описание задачи в теле pull request - [x] **clang-format**: Мои изменения успешно проходят `clang-format` локально в моем форке (нет ошибок форматирования) - [x] **clang-tidy**: Мои изменения успешно проходят `clang-tidy` локально в моем форке (нет предупреждений/ошибок) - [x] **Функциональные тесты**: Все функциональные тесты успешно проходят локально на моей машине - [x] **Тесты производительности**: Все тесты производительности успешно проходят локально на моей машине - [x] **Ветка**: Я работаю в ветке, названной точно так же, как директория моей задачи (например, `nesterov_a_vector_sum`), а не в `master` - [x] **Правдивое содержание**: Я подтверждаю, что все сведения, указанные в этом pull request, являются точными и достоверными <!-- ПРИМЕЧАНИЕ: Ложные сведения в этом чек-листе могут привести к отклонению PR и получению нулевого балла за соответствующую задачу. -->
1 parent c0191b8 commit 9e588e2

11 files changed

Lines changed: 501 additions & 0 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <tuple>
5+
#include <vector>
6+
7+
#include "task/include/task.hpp"
8+
9+
namespace kutergin_a_allreduce {
10+
11+
struct InData {
12+
std::vector<int> elements;
13+
int root;
14+
15+
bool operator==(const InData &other) const {
16+
return elements == other.elements && root == other.root;
17+
}
18+
};
19+
20+
using InType = InData;
21+
using OutType = int;
22+
using BaseTask = ppc::task::Task<InType, OutType>;
23+
using TestType = std::tuple<int, int, std::string>;
24+
25+
} // namespace kutergin_a_allreduce
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"student": {
3+
"first_name": "Антон",
4+
"last_name": "Кутергин",
5+
"middle_name": "Андреевич",
6+
"group_number": "3823Б1ФИ1",
7+
"task_number": "3"
8+
}
9+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#pragma once
2+
3+
#include <mpi.h>
4+
5+
#include "../../common/include/common.hpp"
6+
#include "task/include/task.hpp"
7+
8+
namespace kutergin_a_allreduce {
9+
10+
int Allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
11+
12+
class AllreduceMPI : public BaseTask {
13+
public:
14+
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
15+
return ppc::task::TypeOfTask::kMPI;
16+
}
17+
18+
explicit AllreduceMPI(const InType &in);
19+
20+
protected:
21+
bool ValidationImpl() override;
22+
bool PreProcessingImpl() override;
23+
bool RunImpl() override;
24+
bool PostProcessingImpl() override;
25+
};
26+
27+
} // namespace kutergin_a_allreduce
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#include "../include/ops_mpi.hpp"
2+
3+
#include <mpi.h>
4+
5+
#include <cstdint>
6+
#include <cstring>
7+
#include <numeric>
8+
#include <vector>
9+
10+
#include "../../common/include/common.hpp"
11+
12+
namespace kutergin_a_allreduce {
13+
14+
namespace {
15+
16+
void ApplyOp(void *a, const void *b, int count, MPI_Datatype datatype, MPI_Op op) {
17+
if (op == MPI_SUM && datatype == MPI_INT) {
18+
for (int i = 0; i < count; ++i) {
19+
reinterpret_cast<int *>(a)[i] += reinterpret_cast<const int *>(b)[i];
20+
}
21+
}
22+
}
23+
24+
} // namespace
25+
26+
AllreduceMPI::AllreduceMPI(const InType &in) {
27+
SetTypeOfTask(GetStaticTypeOfTask());
28+
GetInput() = in;
29+
GetOutput() = 0;
30+
}
31+
32+
int Allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
33+
int rank = 0;
34+
int size = 0;
35+
MPI_Comm_rank(comm, &rank);
36+
MPI_Comm_size(comm, &size);
37+
38+
int type_size = 0;
39+
MPI_Type_size(datatype, &type_size);
40+
41+
std::memcpy(recvbuf, sendbuf, static_cast<size_t>(count) * type_size);
42+
43+
for (int mask = 1; mask < size; mask <<= 1) {
44+
if ((rank & mask) != 0) {
45+
MPI_Send(recvbuf, count, datatype, rank - mask, 0, comm);
46+
break;
47+
}
48+
49+
if (rank + mask < size) {
50+
std::vector<uint8_t> tmp(static_cast<size_t>(count) * type_size);
51+
MPI_Recv(tmp.data(), count, datatype, rank + mask, 0, comm, MPI_STATUS_IGNORE);
52+
ApplyOp(recvbuf, tmp.data(), count, datatype, op);
53+
}
54+
}
55+
56+
for (int mask = 1; mask < size; mask <<= 1) {
57+
if (rank < mask && rank + mask < size) {
58+
MPI_Send(recvbuf, count, datatype, rank + mask, 0, comm);
59+
} else if (rank >= mask && rank < 2 * mask) {
60+
MPI_Recv(recvbuf, count, datatype, rank - mask, 0, comm, MPI_STATUS_IGNORE);
61+
}
62+
}
63+
64+
return MPI_SUCCESS;
65+
}
66+
67+
bool AllreduceMPI::ValidationImpl() {
68+
return true;
69+
}
70+
71+
bool AllreduceMPI::PreProcessingImpl() {
72+
return true;
73+
}
74+
75+
bool AllreduceMPI::RunImpl() {
76+
int rank = 0;
77+
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
78+
79+
const auto &input_struct = GetInput();
80+
81+
int local_sum = 0;
82+
if (!input_struct.elements.empty()) {
83+
local_sum = std::accumulate(input_struct.elements.begin(), input_struct.elements.end(), 0);
84+
}
85+
86+
int global_sum = 0;
87+
88+
Allreduce(&local_sum, &global_sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
89+
90+
GetOutput() = global_sum;
91+
92+
return true;
93+
}
94+
95+
bool AllreduceMPI::PostProcessingImpl() {
96+
return true;
97+
}
98+
99+
} // namespace kutergin_a_allreduce
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Реализация коллективной операции AllReduce
2+
3+
**Студент** Кутергин Антон
4+
**Группа** 3823Б1ФИ1
5+
**Вариант** 3
6+
7+
## 1 Введение
8+
Задача AllReduce заключается в объединении данных со всех процессов с использованием некоторой операции (в данном случае — суммирования) и последующей рассылке результата обратно всем участникам.Данная работа мотивирована необходимостью эффективного вычисления глобальных агрегатов на распределенных системах, где прямой последовательный расчет становится неэффективным из-за больших объемов данных.
9+
10+
## 2 Постановка задачи
11+
Требуется реализовать функцию, повторяюшая MPI_Reduce.
12+
Необходимо вычислить общую сумму всех элементов вектора со всех процессов и сделать этот результат доступным каждому процессу.
13+
14+
## 3 Последовательная версия
15+
Последовательный алгоритм реализован в классе AllreduceSequential. Он принимает на вход весь вектор данных и использует стандартную функцию std::accumulate из заголовочного файла <numeric>. Алгоритм проходит по вектору один раз, выполняя N-1 операций сложения, где $N$ — общее количество элементов.
16+
17+
## 4 Параллельная версия
18+
Схема параллельной реализации базируется на распределении данных и коллективном взаимодействии:
19+
* Распределение данных: Общий объем данных делится поровну между P процессами.
20+
* Локальное вычисление: Каждый процесс параллельно вычисляет сумму своей части вектора с помощью std::accumulate.
21+
* Коммуникационный паттерн: Используется функция MPI_Allreduce. Внутри она может быть реализована через дерево (Recursive Doubling) или алгоритм кольца (Bruck algorithm), что обеспечивает логарифмическую сложность обмена данными O(log P).
22+
* Роли рангов: Все процессы симметричны, по завершении каждый получает итоговое значение.
23+
24+
## 5 Детали реализации
25+
ops_seq.cpp/hpp: Последовательная реализация.
26+
ops_mpi.cpp/hpp: Параллельная реализация с вызовом MPI.
27+
Память: Использование std::vector обеспечивает эффективное управление памятью. Данные в тестах производительности генерируются «на лету» для экономии места.
28+
29+
## 6 Экспериментальная установка
30+
* Hardware/OS: Intel Core i5-8300H, 4 ядра, 12 Gb RAM, Windows 10
31+
* Toolchain: MSVC (Visual Studio 2022)
32+
* Build type: Release
33+
* Environment: Тесты запускались через mpiexec с флагами -n 1, -n 2, -n 4, -n 8.
34+
* Data: Вектор из 100 000 000 элементов.
35+
36+
## 7 Результаты
37+
38+
# 7.1 Корректность
39+
Корректность была подтверждена с помощью функциональных тестов, покрывающих:
40+
* Случаи с пустыми векторами.
41+
* Векторы разного размера.
42+
* Различные идентификаторы корневого процесса.
43+
* Сравнение результата MPI с эталонным последовательным результатом.
44+
45+
# 7.2 Перфоманс
46+
| Mode | Count | Time, s | Speedup | Efficiency |
47+
|-------------|-------|---------|---------|------------|
48+
| seq | 1 | 0.022678| 1.00 | N/A |
49+
| mpi | 2 | 0.017855| 1.27 | 63.5% |
50+
| mpi | 4 | 0.017777| 1.28 | 32.0% |
51+
| mpi | 8 | 0.016690| 1.36 | 17.0% |
52+
53+
# 7.3 Обсуждение
54+
На объеме в 100 млн элементов удалось достичь стабильного ускорения. Максимальное ускорение (1.36x) наблюдается при 8 процессах. Относительно низкая эффективность при росте числа ядер объясняется тем, что операция суммирования является memory-bound (ограничена скоростью памяти). Процессы конкурируют за пропускную способность шины памяти при чтении вектора, что не дает времени выполнения сокращаться линейно.
55+
56+
# 8 Вывод
57+
В ходе работы реализован параллельный алгоритм AllReduce, полностью повторяющий MPI_Reduce. Эксперименты показали, что параллелизация эффективна для больших массивов данных, однако для простых арифметических операций основным сдерживающим фактором является пропускная способность памяти. MPI-реализация успешно проходит все тесты на корректность и демонстрирует преимущество над последовательной версией.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once
2+
3+
#include "../../common/include/common.hpp"
4+
#include "task/include/task.hpp"
5+
6+
namespace kutergin_a_allreduce {
7+
8+
class AllreduceSequential : public BaseTask {
9+
public:
10+
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
11+
return ppc::task::TypeOfTask::kSEQ;
12+
}
13+
14+
explicit AllreduceSequential(const InType &in);
15+
16+
protected:
17+
bool ValidationImpl() override;
18+
bool PreProcessingImpl() override;
19+
bool RunImpl() override;
20+
bool PostProcessingImpl() override;
21+
};
22+
23+
} // namespace kutergin_a_allreduce
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#include "../include/ops_seq.hpp"
2+
3+
#include <numeric>
4+
5+
#include "../../common/include/common.hpp"
6+
7+
namespace kutergin_a_allreduce {
8+
9+
AllreduceSequential::AllreduceSequential(const InType &in) {
10+
SetTypeOfTask(GetStaticTypeOfTask());
11+
GetInput() = in;
12+
GetOutput() = 0;
13+
}
14+
15+
bool AllreduceSequential::ValidationImpl() {
16+
return true;
17+
}
18+
19+
bool AllreduceSequential::PreProcessingImpl() {
20+
GetOutput() = 0;
21+
return true;
22+
}
23+
24+
bool AllreduceSequential::RunImpl() {
25+
const InType &input_data = GetInput();
26+
27+
if (input_data.elements.empty()) {
28+
GetOutput() = 0;
29+
} else {
30+
GetOutput() = std::accumulate(input_data.elements.begin(), input_data.elements.end(), 0);
31+
}
32+
return true;
33+
}
34+
35+
bool AllreduceSequential::PostProcessingImpl() {
36+
return true;
37+
}
38+
39+
} // namespace kutergin_a_allreduce
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"tasks_type": "processes",
3+
"tasks": {
4+
"mpi": "enabled",
5+
"seq": "enabled"
6+
}
7+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
InheritParentConfig: true
2+
3+
Checks: >
4+
-modernize-loop-convert,
5+
-cppcoreguidelines-avoid-goto,
6+
-cppcoreguidelines-avoid-non-const-global-variables,
7+
-misc-use-anonymous-namespace,
8+
-modernize-use-std-print,
9+
-modernize-type-traits
10+
11+
CheckOptions:
12+
- key: readability-function-cognitive-complexity.Threshold
13+
value: 50 # Relaxed for tests

0 commit comments

Comments
 (0)