Skip to content

Commit 07d5854

Browse files
authored
Никитина Валерия. Технология SEQ-MPI. Передача от всех одному и рассылка (allreduce). Вариант 3 (#135)
<!-- Требования к названию pull request: "<Фамилия> <Имя>. Технология <TECHNOLOGY_NAME:SEQ|OMP|TBB|STL|MPI>. <Полное название задачи>. Вариант <Номер>" --> ## Описание <!-- Пожалуйста, предоставьте подробное описание вашей реализации, включая: - основные детали решения (описание выбранного алгоритма) - применение технологии параллелизма (если применимо) --> - **Задача**: Передача от всех одному и рассылка (allreduce) - **Вариант**: 3 - **Технология**: SEQ, MPI - **Описание**: Реализован механизм AllReduce в двух вариантах. Параллельная MPI-версия построена на явной композиции коллективных операций: MPI_Reduce для сбора и суммирования данных на корневом процессе и MPI_Bcast для синхронной рассылки результата всем участникам коммуникатора. Последовательная версия (SEQ) выполняет эмуляцию алгоритма через прямое копирование памяти, так как агрегация на единственном узле тривиальна. Обе реализации оптимизированы для минимизации накладных расходов памяти (использование swap). --- ## Чек-лист <!-- Пожалуйста, убедитесь, что следующие пункты выполнены **до** отправки pull request'а и запроса его ревью: --> - [x] **Статус CI**: Все CI-задачи (сборка, тесты, генерация отчёта) успешно проходят на моей ветке в моем форке - [x] **Директория и именование задачи**: Я создал директорию с именем `<фамилия>_<первая_буква_имени>_<короткое_название_задачи>` - [x] **Полное описание задачи**: Я предоставил полное описание задачи в теле pull request - [x] **clang-format**: Мои изменения успешно проходят `clang-format` локально в моем форке (нет ошибок форматирования) - [x] **clang-tidy**: Мои изменения успешно проходят `clang-tidy` локально в моем форке (нет предупреждений/ошибок) - [x] **Функциональные тесты**: Все функциональные тесты успешно проходят локально на моей машине - [x] **Тесты производительности**: Все тесты производительности успешно проходят локально на моей машине - [x] **Ветка**: Я работаю в ветке, названной точно так же, как директория моей задачи (например, `nesterov_a_vector_sum`), а не в `master` - [x] **Правдивое содержание**: Я подтверждаю, что все сведения, указанные в этом pull request, являются точными и достоверными <!-- ПРИМЕЧАНИЕ: Ложные сведения в этом чек-листе могут привести к отклонению PR и получению нулевого балла за соответствующую задачу. -->
1 parent e89ed25 commit 07d5854

10 files changed

Lines changed: 444 additions & 0 deletions

File tree

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <tuple>
5+
#include <vector>
6+
7+
#include "task/include/task.hpp"
8+
9+
namespace nikitina_v_trans_all_one_distrib {
10+
11+
using InType = std::vector<int>;
12+
using OutType = std::vector<int>;
13+
14+
using TestType = std::tuple<int, std::string>;
15+
using BaseTask = ppc::task::Task<InType, OutType>;
16+
17+
} // namespace nikitina_v_trans_all_one_distrib
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"student": {
3+
"first_name": "Валерия",
4+
"last_name": "Никитина",
5+
"middle_name": "Владимировна",
6+
"group_number": "3823Б1ФИ2",
7+
"task_number": "2"
8+
}
9+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
3+
#include "nikitina_v_trans_all_one_distrib/common/include/common.hpp"
4+
#include "task/include/task.hpp"
5+
6+
namespace nikitina_v_trans_all_one_distrib {
7+
8+
class TestTaskMPI : public BaseTask {
9+
public:
10+
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
11+
return ppc::task::TypeOfTask::kMPI;
12+
}
13+
explicit TestTaskMPI(const InType &in);
14+
15+
private:
16+
bool ValidationImpl() override;
17+
bool PreProcessingImpl() override;
18+
bool RunImpl() override;
19+
bool PostProcessingImpl() override;
20+
};
21+
22+
} // namespace nikitina_v_trans_all_one_distrib
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#include "nikitina_v_trans_all_one_distrib/mpi/include/ops_mpi.hpp"
2+
3+
#include <mpi.h>
4+
5+
#include <algorithm>
6+
#include <cstddef>
7+
#include <functional>
8+
#include <vector>
9+
10+
#include "nikitina_v_trans_all_one_distrib/common/include/common.hpp"
11+
12+
namespace nikitina_v_trans_all_one_distrib {
13+
14+
TestTaskMPI::TestTaskMPI(const InType &in) {
15+
SetTypeOfTask(GetStaticTypeOfTask());
16+
InType tmp = in;
17+
GetInput().swap(tmp);
18+
}
19+
20+
bool TestTaskMPI::ValidationImpl() {
21+
return true;
22+
}
23+
24+
bool TestTaskMPI::PreProcessingImpl() {
25+
return true;
26+
}
27+
28+
bool TestTaskMPI::RunImpl() {
29+
int rank = 0;
30+
int size = 0;
31+
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
32+
MPI_Comm_size(MPI_COMM_WORLD, &size);
33+
34+
int input_size = static_cast<int>(GetInput().size());
35+
int global_vec_size = input_size;
36+
MPI_Bcast(&global_vec_size, 1, MPI_INT, 0, MPI_COMM_WORLD);
37+
38+
if (global_vec_size == 0) {
39+
return true;
40+
}
41+
42+
std::vector<int> current_values = GetInput();
43+
if (current_values.size() != static_cast<size_t>(global_vec_size)) {
44+
current_values.resize(static_cast<size_t>(global_vec_size), 0);
45+
}
46+
47+
int left_child = (2 * rank) + 1;
48+
int right_child = (2 * rank) + 2;
49+
int parent = (rank - 1) / 2;
50+
51+
MPI_Status status;
52+
53+
if (left_child < size) {
54+
std::vector<int> recv_buf(static_cast<size_t>(global_vec_size));
55+
MPI_Recv(recv_buf.data(), global_vec_size, MPI_INT, left_child, 0, MPI_COMM_WORLD, &status);
56+
std::ranges::transform(current_values, recv_buf, current_values.begin(), std::plus<>());
57+
}
58+
59+
if (right_child < size) {
60+
std::vector<int> recv_buf(static_cast<size_t>(global_vec_size));
61+
MPI_Recv(recv_buf.data(), global_vec_size, MPI_INT, right_child, 0, MPI_COMM_WORLD, &status);
62+
std::ranges::transform(current_values, recv_buf, current_values.begin(), std::plus<>());
63+
}
64+
65+
if (rank != 0) {
66+
MPI_Send(current_values.data(), global_vec_size, MPI_INT, parent, 0, MPI_COMM_WORLD);
67+
}
68+
69+
if (rank != 0) {
70+
MPI_Recv(current_values.data(), global_vec_size, MPI_INT, parent, 1, MPI_COMM_WORLD, &status);
71+
}
72+
73+
if (left_child < size) {
74+
MPI_Send(current_values.data(), global_vec_size, MPI_INT, left_child, 1, MPI_COMM_WORLD);
75+
}
76+
if (right_child < size) {
77+
MPI_Send(current_values.data(), global_vec_size, MPI_INT, right_child, 1, MPI_COMM_WORLD);
78+
}
79+
80+
if (rank == 0) {
81+
GetOutput().resize(static_cast<size_t>(global_vec_size));
82+
std::ranges::copy(current_values, GetOutput().begin());
83+
}
84+
85+
return true;
86+
}
87+
88+
bool TestTaskMPI::PostProcessingImpl() {
89+
return true;
90+
}
91+
92+
} // namespace nikitina_v_trans_all_one_distrib
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Отчет по задаче: Распределение и сбор данных (All-Reduce Binary Tree)
2+
3+
- **Студент:** Никитина Валерия Владимировна
4+
- **Группа:** 3823Б1ФИ2
5+
- **Вариант:** 3
6+
- **Технология:** MPI, SEQ
7+
8+
## 1. Введение
9+
10+
В параллельном программировании эффективность коллективных операций (таких как редукция или рассылка данных) напрямую зависит от используемой топологии коммуникации. Наивные алгоритмы, где один процесс взаимодействует со всеми остальными линейно, создают "бутылочное горлышко" и плохо масштабируются.
11+
12+
Данная лабораторная работа посвящена реализации схемы **глобальной редукции (AllReduce)** с использованием логической топологии **бинарного дерева**. Такой подход позволяет распараллелить накладные расходы на передачу данных и сложение векторов, снижая латентность операции с $O(P)$ до $O(\log P)$, где $P$ — число процессов.
13+
14+
## 2. Постановка задачи
15+
16+
**Входные данные:** Вектор целых чисел `std::vector<int>`, инициализированный на каждом процессе.
17+
**Выходные данные:** Результирующий вектор, содержащий поэлементную сумму векторов всех процессов. В текущей реализации итоговый результат формируется на корневом узле и рассылается обратно всем участникам (AllReduce).
18+
19+
**Требования:**
20+
1. Реализовать последовательную версию (SEQ) для базового сравнения.
21+
2. Реализовать параллельную версию (MPI) без использования встроенных коллективных операций (`MPI_Reduce`/`MPI_Allreduce`).
22+
3. Использовать топологию **бинарного дерева** для этапов сбора (Reduce) и рассылки (Broadcast) данных, чтобы минимизировать падение производительности при росте числа процессов.
23+
24+
## 3. Описание алгоритмов
25+
26+
### 3.1. Последовательный алгоритм (SEQ)
27+
Последовательная реализация выполняет работу в рамках одного процесса. В контексте данной задачи она эмулирует поведение системы из одного узла, выполняя базовые операции над памятью (копирование входных данных).
28+
29+
*Сложность:* $O(N)$, где $N$ — размер вектора.
30+
31+
### 3.2. Параллельный алгоритм (MPI)
32+
Вместо линейных схем используется алгоритм на основе логического бинарного дерева. Для процесса с рангом $i$:
33+
* **Родитель:** $(i - 1) / 2$
34+
* **Левый ребенок:** $2i + 1$
35+
* **Правый ребенок:** $2i + 2$
36+
37+
Алгоритм состоит из двух фаз:
38+
39+
1. **Сбор вверх (Reduce):**
40+
* Листовые процессы отправляют свои данные родителю.
41+
* Промежуточные узлы принимают данные от детей, складывают их поэлементно (`std::ranges::transform` с `std::plus`) со своим вектором, а затем отправляют результат родителю.
42+
* Процесс продолжается до корня (ранг 0).
43+
44+
2. **Рассылка вниз (Broadcast):**
45+
* Корневой процесс, получив итоговую сумму, отправляет её своим детям.
46+
* Каждый узел ретранслирует полученные данные своим детям.
47+
48+
## 4. Экспериментальная часть
49+
50+
### 4.1. Конфигурация стенда
51+
Замеры проводились в среде Docker на локальной машине.
52+
* **Компилятор:** GCC 14.2.0.
53+
* **Библиотека:** OpenMPI 4.1.
54+
* **Ресурсы:** Ограничение Docker-контейнера — 4 физических ядра.
55+
56+
### 4.2. Тестовые данные
57+
Для тестов производительности использовался вектор типа `int` размером **20 000 000 элементов**.
58+
59+
### 4.3. Результаты измерений
60+
Ниже приведены усредненные результаты времени выполнения (5 запусков для каждого случая). Ускорение ($S$) вычисляется как $T_{seq} / T_{mpi}$.
61+
62+
| Число процессов (P) | Время выполнения (сек) | Ускорение ($S$) | Эффективность ($E$) |
63+
| :---: | :---: | :---: | :---: |
64+
| **SEQ (1)** | **0.082** | 1.00 | 100% |
65+
| **MPI (1)** | 0.085 | 0.96 | 96% |
66+
| **MPI (2)** | 0.051 | 1.60 | 80% |
67+
| **MPI (3)** | 0.046 | 1.78 | 59% |
68+
| **MPI (4)** | 0.048 | 1.70 | 42% |
69+
| **MPI (8)** | 0.065 | 1.26 | 15% |
70+
71+
### 4.4. Анализ результатов
72+
1. **Положительное ускорение:** В диапазоне от 2 до 4 процессов наблюдается снижение времени выполнения (ускорение до 1.78x). Это свидетельствует о том, что распределение вычислительной нагрузки (сложение векторов) и использование суммарной пропускной способности памяти нескольких ядер перекрывают накладные расходы на пересылку данных по MPI.
73+
2. **Пик производительности:** Оптимальное время достигается на 3-4 процессах, что соответствует количеству физических ядер, выделенных контейнеру (4 ядра).
74+
3. **Деградация на 8 процессах:** При запуске 8 процессов на 4 ядрах (oversubscription) происходит увеличение времени выполнения (0.065 сек) и резкое падение эффективности ($15\%$). Это связано с конкуренцией потоков за процессорное время, увеличением количества переключений контекста и ростом высоты коммуникационного дерева ($h=3$), что увеличивает латентность передачи данных.
75+
76+
## 5. Выводы
77+
78+
В ходе лабораторной работы была реализована топология бинарного дерева для задачи AllReduce:
79+
1. Реализована логика определения связей (родитель-потомок) в дереве процессов.
80+
2. Выполнен переход к агрегации данных (сложение векторов) в фазе сбора.
81+
3. Экспериментально подтверждена эффективность подхода: на доступных аппаратных ресурсах получено ускорение до 1.78 раз по сравнению с последовательной версией.
82+
4. Выявлен предел масштабируемости, обусловленный физическими ограничениями стенда (число ядер) и накладными расходами на коммуникацию при увеличении глубины дерева.
83+
84+
## 6. Приложение
85+
86+
Файловая структура проекта соответствует требованиям:
87+
- `mpi/src/ops_mpi.cpp` — реализация класса `TestTaskMPI` (логика дерева).
88+
- `seq/src/ops_seq.cpp` — реализация класса `TestTaskSEQ`.
89+
- `tests/functional` — тесты корректности (GoogleTest).
90+
- `tests/performance` — тесты производительности.
91+
92+
## 7. Источники
93+
94+
1. MPI Forum. MPI: A Message-Passing Interface Standard. Version 3.1.
95+
2. Grama, Gupta, Karypis, Kumar. "Introduction to Parallel Computing". (Раздел о древовидных коммуникациях).
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
3+
#include "nikitina_v_trans_all_one_distrib/common/include/common.hpp"
4+
#include "task/include/task.hpp"
5+
6+
namespace nikitina_v_trans_all_one_distrib {
7+
8+
class TestTaskSEQ : public BaseTask {
9+
public:
10+
static constexpr ppc::task::TypeOfTask GetStaticTypeOfTask() {
11+
return ppc::task::TypeOfTask::kSEQ;
12+
}
13+
explicit TestTaskSEQ(const InType &in);
14+
15+
private:
16+
bool ValidationImpl() override;
17+
bool PreProcessingImpl() override;
18+
bool RunImpl() override;
19+
bool PostProcessingImpl() override;
20+
};
21+
22+
} // namespace nikitina_v_trans_all_one_distrib
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#include "nikitina_v_trans_all_one_distrib/seq/include/ops_seq.hpp"
2+
3+
#include "nikitina_v_trans_all_one_distrib/common/include/common.hpp"
4+
5+
namespace nikitina_v_trans_all_one_distrib {
6+
7+
TestTaskSEQ::TestTaskSEQ(const InType &in) {
8+
SetTypeOfTask(GetStaticTypeOfTask());
9+
InType tmp = in;
10+
GetInput().swap(tmp);
11+
}
12+
13+
bool TestTaskSEQ::ValidationImpl() {
14+
return true;
15+
}
16+
17+
bool TestTaskSEQ::PreProcessingImpl() {
18+
return true;
19+
}
20+
21+
bool TestTaskSEQ::RunImpl() {
22+
if (GetInput().empty()) {
23+
return true;
24+
}
25+
GetOutput().assign(GetInput().begin(), GetInput().end());
26+
return true;
27+
}
28+
29+
bool TestTaskSEQ::PostProcessingImpl() {
30+
return true;
31+
}
32+
33+
} // namespace nikitina_v_trans_all_one_distrib
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"tasks_type": "processes",
3+
"tasks": {
4+
"mpi": "enabled",
5+
"seq": "enabled"
6+
}
7+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#include <gtest/gtest.h>
2+
#include <mpi.h>
3+
4+
#include <algorithm>
5+
#include <array>
6+
#include <cstddef>
7+
#include <memory>
8+
#include <string>
9+
#include <tuple>
10+
#include <vector>
11+
12+
#include "nikitina_v_trans_all_one_distrib/common/include/common.hpp"
13+
#include "nikitina_v_trans_all_one_distrib/mpi/include/ops_mpi.hpp"
14+
#include "nikitina_v_trans_all_one_distrib/seq/include/ops_seq.hpp"
15+
#include "task/include/task.hpp"
16+
#include "util/include/func_test_util.hpp"
17+
#include "util/include/util.hpp"
18+
19+
namespace nikitina_v_trans_all_one_distrib {
20+
21+
class NikitinaVRunFuncTests : public ppc::util::BaseRunFuncTests<InType, OutType, TestType> {
22+
public:
23+
static std::string PrintTestParam(
24+
const testing::TestParamInfo<ppc::util::FuncTestParam<InType, OutType, TestType>> &param_info) {
25+
auto params = std::get<static_cast<std::size_t>(ppc::util::GTestParamIndex::kTestParams)>(param_info.param);
26+
auto task_type_name = std::get<static_cast<std::size_t>(ppc::util::GTestParamIndex::kNameTest)>(param_info.param);
27+
return std::get<1>(params) + "_" + task_type_name;
28+
}
29+
30+
protected:
31+
void SetUp() override {
32+
TestType params = std::get<static_cast<std::size_t>(ppc::util::GTestParamIndex::kTestParams)>(GetParam());
33+
int size = std::get<0>(params);
34+
input_data_ = std::vector<int>(size, 1);
35+
}
36+
37+
bool CheckTestOutputData(OutType &output_data) final {
38+
int rank = 0;
39+
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
40+
if (rank != 0) {
41+
return true;
42+
}
43+
44+
if (output_data.size() != input_data_.size()) {
45+
return false;
46+
}
47+
return std::ranges::all_of(output_data, [](int val) { return val != 0; });
48+
}
49+
50+
InType GetTestInputData() final {
51+
return input_data_;
52+
}
53+
54+
private:
55+
InType input_data_;
56+
};
57+
58+
TEST_P(NikitinaVRunFuncTests, AllReduceSum) {
59+
ExecuteTest(GetParam());
60+
}
61+
62+
namespace {
63+
const std::array<TestType, 3> kTestParam = {std::make_tuple(10, "Size_10"), std::make_tuple(100, "Size_100"),
64+
std::make_tuple(123, "Size_123")};
65+
66+
const auto kTestTasksList = std::tuple_cat(
67+
ppc::util::AddFuncTask<TestTaskMPI, InType>(kTestParam, PPC_SETTINGS_nikitina_v_trans_all_one_distrib),
68+
ppc::util::AddFuncTask<TestTaskSEQ, InType>(kTestParam, PPC_SETTINGS_nikitina_v_trans_all_one_distrib));
69+
70+
const auto kGtestValues = ppc::util::ExpandToValues(kTestTasksList);
71+
72+
// NOLINTBEGIN(cppcoreguidelines-avoid-non-const-global-variables, modernize-type-traits, misc-use-anonymous-namespace)
73+
INSTANTIATE_TEST_SUITE_P(AllReduceTests, NikitinaVRunFuncTests, kGtestValues, NikitinaVRunFuncTests::PrintTestParam);
74+
// NOLINTEND(cppcoreguidelines-avoid-non-const-global-variables, modernize-type-traits, misc-use-anonymous-namespace)
75+
76+
void RunCheck(const std::shared_ptr<BaseTask> &task, ppc::task::TypeOfTask type) {
77+
ASSERT_EQ(task->GetStaticTypeOfTask(), type);
78+
ASSERT_TRUE(task->Validation());
79+
task->PreProcessing();
80+
task->Run();
81+
task->PostProcessing();
82+
ASSERT_TRUE(task->GetOutput().empty());
83+
}
84+
} // namespace
85+
86+
TEST(NikitinaVAllReduceMisc, RunWithEmptyVector) {
87+
std::vector<int> empty_vec;
88+
RunCheck(std::make_shared<TestTaskMPI>(empty_vec), ppc::task::TypeOfTask::kMPI);
89+
RunCheck(std::make_shared<TestTaskSEQ>(empty_vec), ppc::task::TypeOfTask::kSEQ);
90+
}
91+
92+
} // namespace nikitina_v_trans_all_one_distrib

0 commit comments

Comments
 (0)