Skip to content

Commit 0b001a4

Browse files
rroblakphlax
authored andcommitted
peak_ewma: fix segfault from timer thread-safety violation (#43526)
Commit Message: peak_ewma: fix segfault from timer thread-safety violation (#43513) Additional Description: Hopefully fixes #43513. The Peak EWMA LB constructor took an `Event::Dispatcher&` and called `createTimer()` on it. When instantiated on worker threads (via dynamic config such as Istio EnvoyFilter or Envoy Gateway EnvoyPatchPolicy), this violated Envoy's thread-safety model — timers must be created on the dispatcher's owning thread — causing `assert failure: isThreadSafe()` (debug) or segfault (release). This PR: - Replaces timer-based aggregation with inline aggregation in `chooseHost()`, removing the `Event::Dispatcher&` dependency entirely - Removes the destructor that cleared host `lbPolicyData` (raced with workers still reading) - Cleans up `all_host_stats_` entries on host removal (shared_ptr leak) Risk Level: Low — peak_ewma is a contrib extension; changes are isolated to its source and tests. Testing: New `peak_ewma_lb_host_lifecycle_test.cc` with regression tests for all 3 bugs. All existing peak_ewma tests pass. Docs Changes: N/A Release Notes: N/A Platform Specific Features: N/A --------- Signed-off-by: Ryan Oblak <rroblak@gmail.com> Signed-off-by: Christian Rohmann <christian.rohmann@inovex.de>
1 parent af80a09 commit 0b001a4

9 files changed

Lines changed: 448 additions & 106 deletions

File tree

contrib/peak_ewma/load_balancing_policies/source/config.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Upstream::LoadBalancerPtr PeakEwmaCreator::operator()(
2020
priority_set, nullptr, cluster_info.lbStats(), runtime, random,
2121
PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(cluster_info.lbConfig(),
2222
healthy_panic_threshold, 100, 50),
23-
cluster_info, time_source, config->lb_config_, config->main_dispatcher_);
23+
cluster_info, time_source, config->lb_config_);
2424
}
2525

2626
/**

contrib/peak_ewma/load_balancing_policies/source/config.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#pragma once
22

3-
#include "envoy/event/dispatcher.h"
4-
#include "envoy/thread_local/thread_local.h"
53
#include "envoy/upstream/load_balancer.h"
64

75
#include "source/common/common/logger.h"
@@ -20,11 +18,9 @@ using PeakEwmaLbProto = envoy::extensions::load_balancing_policies::peak_ewma::v
2018

2119
class TypedPeakEwmaLbConfig : public Upstream::LoadBalancerConfig {
2220
public:
23-
TypedPeakEwmaLbConfig(const PeakEwmaLbProto& lb_config, Event::Dispatcher& main_dispatcher)
24-
: lb_config_(lb_config), main_dispatcher_(main_dispatcher) {}
21+
explicit TypedPeakEwmaLbConfig(const PeakEwmaLbProto& lb_config) : lb_config_(lb_config) {}
2522

2623
PeakEwmaLbProto lb_config_;
27-
Event::Dispatcher& main_dispatcher_;
2824
};
2925

3026
struct PeakEwmaCreator : public Logger::Loggable<Logger::Id::upstream> {
@@ -41,11 +37,10 @@ class Factory
4137
Factory() : FactoryBase("envoy.load_balancing_policies.peak_ewma") {}
4238

4339
absl::StatusOr<Upstream::LoadBalancerConfigPtr>
44-
loadConfig(Server::Configuration::ServerFactoryContext& context,
40+
loadConfig(Server::Configuration::ServerFactoryContext&,
4541
const Protobuf::Message& config) override {
4642
const auto& typed_config = dynamic_cast<const PeakEwmaLbProto&>(config);
47-
return Upstream::LoadBalancerConfigPtr{
48-
new TypedPeakEwmaLbConfig(typed_config, context.mainThreadDispatcher())};
43+
return Upstream::LoadBalancerConfigPtr{new TypedPeakEwmaLbConfig(typed_config)};
4944
}
5045
};
5146

contrib/peak_ewma/load_balancing_policies/source/peak_ewma_lb.cc

Lines changed: 34 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,18 @@ PeakEwmaLoadBalancer::PeakEwmaLoadBalancer(
5151
Upstream::ClusterLbStats& /*stats*/, Runtime::Loader& runtime, Random::RandomGenerator& random,
5252
uint32_t /* healthy_panic_threshold */, const Upstream::ClusterInfo& cluster_info,
5353
TimeSource& time_source,
54-
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config,
55-
Event::Dispatcher& main_dispatcher)
54+
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config)
5655
: LoadBalancerBase(priority_set, cluster_info.lbStats(), runtime, random,
5756
PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
5857
cluster_info.lbConfig(), healthy_panic_threshold, 100, 50)),
5958
priority_set_(priority_set), config_proto_(config), random_(random),
6059
time_source_(time_source), stats_scope_(cluster_info.statsScope()),
6160
cost_(config.has_penalty_value() ? config.penalty_value().value() : 1000000.0),
62-
main_dispatcher_(main_dispatcher),
6361
aggregation_interval_(config_proto_.has_aggregation_interval()
6462
? std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
6563
config_proto_.aggregation_interval()))
6664
: std::chrono::milliseconds(100)),
65+
last_aggregation_time_(time_source_.monotonicTime()),
6766
tau_nanos_(config_proto_.has_decay_time()
6867
? DurationUtil::durationToMilliseconds(config_proto_.decay_time()) * 1000000LL
6968
: kDefaultDecayTimeSeconds * 1000000000LL),
@@ -76,41 +75,16 @@ PeakEwmaLoadBalancer::PeakEwmaLoadBalancer(
7675
addPeakEwmaLbPolicyDataToHosts(host_set->hosts());
7776
}
7877

79-
// Setup callback to add data to new hosts.
80-
priority_update_cb_ =
81-
priority_set_.addPriorityUpdateCb([this](uint32_t, const Upstream::HostVector& hosts_added,
82-
const Upstream::HostVector&) -> absl::Status {
78+
// Setup callback to add data to new hosts and clean up removed hosts.
79+
priority_update_cb_ = priority_set_.addPriorityUpdateCb(
80+
[this](uint32_t, const Upstream::HostVector& hosts_added,
81+
const Upstream::HostVector& hosts_removed) -> absl::Status {
8382
addPeakEwmaLbPolicyDataToHosts(hosts_added);
83+
for (const auto& host : hosts_removed) {
84+
all_host_stats_.erase(host);
85+
}
8486
return absl::OkStatus();
8587
});
86-
87-
// Create timer for EWMA aggregation.
88-
aggregation_timer_ = main_dispatcher_.createTimer([this]() -> void { onAggregationTimer(); });
89-
aggregation_timer_->enableTimer(aggregation_interval_);
90-
91-
// Peak EWMA load balancer initialized successfully.
92-
}
93-
94-
PeakEwmaLoadBalancer::~PeakEwmaLoadBalancer() {
95-
// Post timer cancellation to main thread to avoid cross-thread timer operations.
96-
// Timer must be disabled from the same thread that created it (main_dispatcher_).
97-
if (aggregation_timer_) {
98-
main_dispatcher_.post([timer = std::move(aggregation_timer_)]() mutable {
99-
if (timer) {
100-
timer->disableTimer();
101-
timer.reset();
102-
}
103-
});
104-
}
105-
106-
// EWMA snapshot cleanup is automatic via shared_ptr destructor.
107-
108-
// Clean up host data.
109-
for (const auto& host_set : priority_set_.hostSetsPerPriority()) {
110-
for (const auto& host : host_set->hosts()) {
111-
host->setLbPolicyData(nullptr);
112-
}
113-
}
11488
}
11589

11690
// Host management.
@@ -130,12 +104,12 @@ PeakEwmaHostLbPolicyData* PeakEwmaLoadBalancer::getPeakEwmaData(Upstream::HostCo
130104
return dynamic_cast<PeakEwmaHostLbPolicyData*>(lb_data.ptr());
131105
}
132106

133-
void PeakEwmaLoadBalancer::onAggregationTimer() {
134-
// Timer callback - aggregate EWMA data from all hosts.
135-
aggregateWorkerData();
136-
137-
// Reschedule timer for next cycle.
138-
aggregation_timer_->enableTimer(aggregation_interval_);
107+
void PeakEwmaLoadBalancer::maybeAggregate() {
108+
const auto now = time_source_.monotonicTime();
109+
if (now - last_aggregation_time_ >= aggregation_interval_) {
110+
aggregateWorkerData();
111+
last_aggregation_time_ = now;
112+
}
139113
}
140114

141115
double PeakEwmaLoadBalancer::calculateHostCost(Upstream::HostConstSharedPtr host) {
@@ -192,6 +166,9 @@ PeakEwmaLoadBalancer::selectFromTwoCandidates(const Upstream::HostVector& hosts,
192166

193167
Upstream::HostSelectionResponse
194168
PeakEwmaLoadBalancer::chooseHost(Upstream::LoadBalancerContext* /* context */) {
169+
// Lazily aggregate EWMA data if the interval has elapsed.
170+
maybeAggregate();
171+
195172
// Power of Two Choices selection using host-attached EWMA data.
196173
const auto& host_sets = priority_set_.hostSetsPerPriority();
197174

@@ -268,23 +245,9 @@ void PeakEwmaLoadBalancer::aggregateWorkerData() {
268245
// Aggregation cycle complete.
269246
}
270247

271-
size_t PeakEwmaLoadBalancer::calculateNewSampleCount(size_t last_processed, size_t current_write,
272-
size_t max_samples) {
273-
if (last_processed == current_write) {
274-
return 0;
275-
}
276-
277-
if (current_write >= last_processed) {
278-
return current_write - last_processed;
279-
} else {
280-
// Write index wrapped around.
281-
return (max_samples - last_processed) + current_write;
282-
}
283-
}
284-
285-
double PeakEwmaLoadBalancer::calculateTimeBasedAlpha(uint64_t current_time_ns,
286-
uint64_t sample_time_ns) {
287-
int64_t time_delta_ns = static_cast<int64_t>(current_time_ns - sample_time_ns);
248+
double PeakEwmaLoadBalancer::calculateTimeBasedAlpha(uint64_t later_time_ns,
249+
uint64_t earlier_time_ns) {
250+
int64_t time_delta_ns = static_cast<int64_t>(later_time_ns - earlier_time_ns);
288251
if (time_delta_ns <= 0) {
289252
return 1.0; // Use full weight for future/concurrent samples.
290253
}
@@ -316,17 +279,20 @@ void PeakEwmaLoadBalancer::processHostSamples(Upstream::HostConstSharedPtr /* ho
316279

317280
// Get the range of new samples to process (atomic ring buffer).
318281
auto [last_processed, current_write] = data->getNewSampleRange();
319-
320-
size_t num_new_samples =
321-
calculateNewSampleCount(last_processed, current_write, data->max_samples_);
322-
if (num_new_samples == 0)
282+
if (last_processed == current_write)
323283
return;
324284

285+
// If ring buffer was fully overwritten, skip to oldest valid slot.
286+
// Uses unsigned arithmetic (always correct since write_index_ only increments).
287+
if (current_write - last_processed > data->max_samples_) {
288+
last_processed = current_write - data->max_samples_;
289+
}
290+
291+
size_t num_new_samples = current_write - last_processed;
292+
325293
// Get current EWMA state.
326294
double current_ewma = data->getEwmaRtt();
327-
uint64_t current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
328-
time_source_.monotonicTime().time_since_epoch())
329-
.count();
295+
uint64_t reference_time = data->last_update_timestamp_.load();
330296

331297
// Process all new samples in chronological order.
332298
size_t processed_index = last_processed;
@@ -342,13 +308,14 @@ void PeakEwmaLoadBalancer::processHostSamples(Upstream::HostConstSharedPtr /* ho
342308
continue;
343309
}
344310

345-
double alpha = calculateTimeBasedAlpha(current_time_ns, timestamp_ns);
311+
double alpha = calculateTimeBasedAlpha(timestamp_ns, reference_time);
346312
current_ewma = updateEwmaWithSample(current_ewma, rtt_ms, alpha);
313+
reference_time = timestamp_ns;
347314
processed_index++;
348315
}
349316

350317
// Update atomic EWMA in host data.
351-
data->updateEwma(current_ewma, current_time_ns);
318+
data->updateEwma(current_ewma, reference_time);
352319
data->markSamplesProcessed(current_write);
353320
}
354321

contrib/peak_ewma/load_balancing_policies/source/peak_ewma_lb.h

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88
#include <new>
99
#include <vector>
1010

11-
#include "envoy/event/dispatcher.h"
12-
#include "envoy/event/timer.h"
13-
#include "envoy/thread_local/thread_local.h"
14-
#include "envoy/thread_local/thread_local_object.h"
1511
#include "envoy/upstream/load_balancer.h"
1612

1713
#include "source/common/common/callback_impl.h"
@@ -70,7 +66,7 @@ struct GlobalHostStats {
7066
*
7167
* Architecture:
7268
* - HTTP filter records RTT samples in host-attached ring buffers (lock-free)
73-
* - Timer aggregates samples every 100ms and updates EWMA values
69+
* - Aggregation happens lazily inline in chooseHost() when the interval elapses
7470
* - P2C selection uses current EWMA + active requests for cost calculation
7571
*/
7672
class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
@@ -80,10 +76,7 @@ class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
8076
Upstream::ClusterLbStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random,
8177
uint32_t healthy_panic_threshold, const Upstream::ClusterInfo& cluster_info,
8278
TimeSource& time_source,
83-
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config,
84-
Event::Dispatcher& main_dispatcher);
85-
86-
~PeakEwmaLoadBalancer();
79+
const envoy::extensions::load_balancing_policies::peak_ewma::v3alpha::PeakEwma& config);
8780

8881
// LoadBalancer interface
8982
Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
@@ -96,19 +89,18 @@ class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
9689
void addPeakEwmaLbPolicyDataToHosts(const Upstream::HostVector& hosts);
9790
PeakEwmaHostLbPolicyData* getPeakEwmaData(Upstream::HostConstSharedPtr host);
9891

99-
// Timer-based aggregation - processes host-attached sample data.
92+
// Inline aggregation - processes host-attached sample data.
10093
void aggregateWorkerData();
10194
void processHostSamples(Upstream::HostConstSharedPtr host, PeakEwmaHostLbPolicyData* data);
102-
void onAggregationTimer();
95+
void maybeAggregate();
10396

10497
// Power of Two Choices selection.
10598
Upstream::HostConstSharedPtr selectFromTwoCandidates(const Upstream::HostVector& hosts,
10699
uint64_t random_value);
107100
double calculateHostCost(Upstream::HostConstSharedPtr host);
108101

109102
// EWMA calculation helpers.
110-
size_t calculateNewSampleCount(size_t last_processed, size_t current_write, size_t max_samples);
111-
double calculateTimeBasedAlpha(uint64_t current_time_ns, uint64_t sample_time_ns);
103+
double calculateTimeBasedAlpha(uint64_t later_time_ns, uint64_t earlier_time_ns);
112104
double updateEwmaWithSample(double current_ewma, double new_rtt_ms, double alpha);
113105

114106
// Core infrastructure.
@@ -121,10 +113,9 @@ class PeakEwmaLoadBalancer : public Upstream::LoadBalancerBase {
121113
// Business logic components.
122114
Cost cost_;
123115

124-
// Timer infrastructure for periodic EWMA calculation.
125-
Event::Dispatcher& main_dispatcher_;
126-
Event::TimerPtr aggregation_timer_;
116+
// Inline aggregation state.
127117
const std::chrono::milliseconds aggregation_interval_;
118+
MonotonicTime last_aggregation_time_;
128119

129120
// Host stats for admin interface visibility.
130121
absl::flat_hash_map<Upstream::HostConstSharedPtr, std::unique_ptr<GlobalHostStats>>

contrib/peak_ewma/load_balancing_policies/test/BUILD

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ envoy_cc_test(
1616
"//source/common/network:address_lib",
1717
"//test/common/stats:stat_test_utility_lib",
1818
"//test/mocks:common_lib",
19-
"//test/mocks/event:event_mocks",
2019
"//test/mocks/runtime:runtime_mocks",
2120
"//test/mocks/upstream:upstream_mocks",
2221
],
@@ -62,7 +61,19 @@ envoy_cc_test(
6261
"//source/common/network:address_lib",
6362
"//test/common/stats:stat_test_utility_lib",
6463
"//test/mocks:common_lib",
65-
"//test/mocks/event:event_mocks",
64+
"//test/mocks/runtime:runtime_mocks",
65+
"//test/mocks/upstream:upstream_mocks",
66+
],
67+
)
68+
69+
envoy_cc_test(
70+
name = "peak_ewma_lb_host_lifecycle_test",
71+
srcs = ["peak_ewma_lb_host_lifecycle_test.cc"],
72+
deps = [
73+
"//contrib/peak_ewma/load_balancing_policies/source:peak_ewma_lb_lib",
74+
"//source/common/network:address_lib",
75+
"//test/common/stats:stat_test_utility_lib",
76+
"//test/mocks:common_lib",
6677
"//test/mocks/runtime:runtime_mocks",
6778
"//test/mocks/upstream:upstream_mocks",
6879
],

contrib/peak_ewma/load_balancing_policies/test/config_test.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
#include "test/common/stats/stat_test_utility.h"
88
#include "test/mocks/common.h"
9-
#include "test/mocks/event/mocks.h"
109
#include "test/mocks/runtime/mocks.h"
1110
#include "test/mocks/server/factory_context.h"
1211
#include "test/mocks/upstream/cluster_info.h"
@@ -65,7 +64,6 @@ class PeakEwmaConfigTest : public ::testing::Test {
6564
NiceMock<Runtime::MockLoader> runtime_;
6665
NiceMock<Random::MockRandomGenerator> random_;
6766
NiceMock<MockTimeSystem> time_source_;
68-
NiceMock<Event::MockDispatcher> dispatcher_;
6967
MockThreadLocalInstance tls_;
7068
};
7169

@@ -178,13 +176,13 @@ TEST_F(PeakEwmaConfigTest, ConfigValidation) {
178176
// Very small decay time
179177
proto_config.mutable_decay_time()->set_nanos(1000000); // 1ms
180178

181-
TypedPeakEwmaLbConfig config(proto_config, dispatcher_);
179+
TypedPeakEwmaLbConfig config(proto_config);
182180
EXPECT_EQ(config.lb_config_.decay_time().nanos(), 1000000);
183181

184182
// Very large decay time
185183
proto_config.mutable_decay_time()->set_seconds(300);
186184

187-
TypedPeakEwmaLbConfig config2(proto_config, dispatcher_);
185+
TypedPeakEwmaLbConfig config2(proto_config);
188186
EXPECT_EQ(config2.lb_config_.decay_time().seconds(), 300);
189187
}
190188

contrib/peak_ewma/load_balancing_policies/test/peak_ewma_lb_comprehensive_test.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include "test/common/stats/stat_test_utility.h"
44
#include "test/mocks/common.h"
5-
#include "test/mocks/event/mocks.h"
65
#include "test/mocks/runtime/mocks.h"
76
#include "test/mocks/stats/mocks.h"
87
#include "test/mocks/upstream/cluster_info.h"
@@ -58,8 +57,7 @@ class PeakEwmaLoadBalancerComprehensiveTest : public ::testing::Test {
5857

5958
void createLoadBalancer() {
6059
lb_ = std::make_unique<PeakEwmaLoadBalancer>(priority_set_, nullptr, *stats_, runtime_, random_,
61-
50, *cluster_info_, time_source_, config_,
62-
dispatcher_);
60+
50, *cluster_info_, time_source_, config_);
6361
}
6462

6563
// Note: In a real test we would access host data, but for simplicity
@@ -76,7 +74,6 @@ class PeakEwmaLoadBalancerComprehensiveTest : public ::testing::Test {
7674
NiceMock<Runtime::MockLoader> runtime_;
7775
NiceMock<Random::MockRandomGenerator> random_;
7876
NiceMock<MockTimeSystem> time_source_;
79-
NiceMock<Event::MockDispatcher> dispatcher_;
8077

8178
std::vector<Upstream::HostSharedPtr> hosts_;
8279
std::unique_ptr<PeakEwmaLoadBalancer> lb_;

0 commit comments

Comments
 (0)