Skip to content

Commit 84f75f7

Browse files
committed
dym sdk: add the config scheduler support to go sdk and cpp sdk (#43748)
Commit Message: dym sdk: add the config scheduler support to go sdk and cpp sdk Additional Description: Add the config scheduler to go sdk and cpp sdk. Risk Level: Testing: Docs Changes: Release Notes: Platform Specific Features: [Optional Runtime guard:] [Optional Fixes #Issue] [Optional Fixes commit #PR or SHA] [Optional Deprecated:] [Optional [API Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):] --------- Signed-off-by: wbpcode/wangbaiping <wbphub@gmail.com>
1 parent 167ae08 commit 84f75f7

8 files changed

Lines changed: 171 additions & 37 deletions

File tree

source/extensions/dynamic_modules/sdk/cpp/sdk.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,13 @@ class HttpFilterConfigHandle {
757757
* @param message The message to log.
758758
*/
759759
virtual void log(LogLevel level, absl::string_view message) = 0;
760+
761+
/**
762+
* Returns a scheduler for deferred task execution. This can only be called on config loading
763+
* event and then the returned Scheduler can be used in other threads.
764+
* @return Unique pointer to Scheduler instance.
765+
*/
766+
virtual std::shared_ptr<Scheduler> getScheduler() = 0;
760767
};
761768

762769
/**

source/extensions/dynamic_modules/sdk/cpp/sdk_internal.cc

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include <cstddef>
22
#include <cstdint>
3+
#include <memory>
4+
#include <mutex>
35

46
#include "source/extensions/dynamic_modules/abi/abi.h"
57

@@ -142,50 +144,76 @@ using ResponseHeaders = HeaderMapImpl<envoy_dynamic_module_type_http_header_type
142144
using ResponseTrailers = HeaderMapImpl<envoy_dynamic_module_type_http_header_type_ResponseTrailer>;
143145

144146
// Scheduler implementation
145-
class SchedulerImpl : public Scheduler {
147+
template <bool IsConfigScheduler> class SchedulerImplBase : public Scheduler {
146148
public:
147-
SchedulerImpl(void* host_plugin_ptr)
148-
: scheduler_ptr_(envoy_dynamic_module_callback_http_filter_scheduler_new(host_plugin_ptr)) {}
149+
SchedulerImplBase(void* host_ptr) : scheduler_ptr_(newScheduler(host_ptr)) {}
149150

150151
void schedule(std::function<void()> func) override {
152+
uint64_t task_id = 0;
153+
151154
// Lock to protect access to tasks_ and next_task_id_ manually
152-
mutex_.lock();
153-
const uint64_t task_id = next_task_id_++;
154-
tasks_[task_id] = std::move(func);
155-
mutex_.unlock();
155+
{
156+
std::lock_guard<std::mutex> lock(mutex_);
157+
task_id = next_task_id_++;
158+
tasks_[task_id] = std::move(func);
159+
}
156160

157-
envoy_dynamic_module_callback_http_filter_scheduler_commit(scheduler_ptr_, task_id);
161+
commitScheduler(scheduler_ptr_, task_id);
158162
}
159163

160164
void onScheduled(uint64_t task_id) {
161165
std::function<void()> func;
162166

163-
// Lock to protect access to tasks_ manually
164-
mutex_.lock();
165-
auto it = tasks_.find(task_id);
166-
if (it != tasks_.end()) {
167-
func = std::move(it->second);
168-
tasks_.erase(it);
167+
{
168+
// Lock to protect access to tasks_ manually
169+
std::lock_guard<std::mutex> lock(mutex_);
170+
auto it = tasks_.find(task_id);
171+
if (it != tasks_.end()) {
172+
func = std::move(it->second);
173+
tasks_.erase(it);
174+
}
169175
}
170-
mutex_.unlock();
171176

172177
if (func) {
173178
func();
174179
}
175180
}
176181

177-
~SchedulerImpl() override {
178-
envoy_dynamic_module_callback_http_filter_scheduler_delete(scheduler_ptr_);
179-
}
182+
~SchedulerImplBase() override { deleteScheduler(scheduler_ptr_); }
180183

181184
private:
185+
static void* newScheduler(void* host_ptr) {
186+
if constexpr (IsConfigScheduler) {
187+
return envoy_dynamic_module_callback_http_filter_config_scheduler_new(host_ptr);
188+
} else {
189+
return envoy_dynamic_module_callback_http_filter_scheduler_new(host_ptr);
190+
}
191+
}
192+
static void deleteScheduler(void* scheduler_ptr) {
193+
if constexpr (IsConfigScheduler) {
194+
envoy_dynamic_module_callback_http_filter_config_scheduler_delete(scheduler_ptr);
195+
} else {
196+
envoy_dynamic_module_callback_http_filter_scheduler_delete(scheduler_ptr);
197+
}
198+
}
199+
static void commitScheduler(void* scheduler_ptr, uint64_t task_id) {
200+
if constexpr (IsConfigScheduler) {
201+
envoy_dynamic_module_callback_http_filter_config_scheduler_commit(scheduler_ptr, task_id);
202+
} else {
203+
envoy_dynamic_module_callback_http_filter_scheduler_commit(scheduler_ptr, task_id);
204+
}
205+
}
206+
182207
void* scheduler_ptr_{};
183208

184-
absl::Mutex mutex_;
185-
uint64_t next_task_id_ ABSL_GUARDED_BY(mutex_){1}; // 0 is reserved.
186-
absl::flat_hash_map<uint64_t, std::function<void()>> tasks_ ABSL_GUARDED_BY(mutex_);
209+
std::mutex mutex_;
210+
uint64_t next_task_id_{1}; // 0 is reserved.
211+
absl::flat_hash_map<uint64_t, std::function<void()>> tasks_;
187212
};
188213

214+
using SchedulerImpl = SchedulerImplBase<false>;
215+
using ConfigSchedulerImpl = SchedulerImplBase<true>;
216+
189217
// HttpFilterHandle implementation
190218
class HttpFilterHandleImpl : public HttpFilterHandle {
191219
public:
@@ -626,6 +654,15 @@ class HttpFilterConfigHandleImpl : public HttpFilterConfigHandle {
626654
envoy_dynamic_module_type_module_buffer{message.data(), message.size()});
627655
}
628656

657+
std::shared_ptr<Scheduler> getScheduler() override {
658+
if (!scheduler_) {
659+
scheduler_ = std::make_shared<ConfigSchedulerImpl>(host_config_ptr_);
660+
}
661+
return scheduler_;
662+
}
663+
664+
std::shared_ptr<ConfigSchedulerImpl> scheduler_;
665+
629666
private:
630667
void* host_config_ptr_;
631668
};
@@ -657,7 +694,7 @@ template <class T> void* wrapPointer(const T* ptr) {
657694
}
658695

659696
struct HttpFilterFactoryWrapper {
660-
std::unique_ptr<HttpFilterConfigHandle> config_handle_;
697+
std::unique_ptr<HttpFilterConfigHandleImpl> config_handle_;
661698
std::unique_ptr<HttpFilterFactory> factory_;
662699
};
663700

@@ -992,6 +1029,17 @@ void envoy_dynamic_module_on_http_filter_downstream_below_write_buffer_low_water
9921029
plugin_handle->downstream_watermark_callbacks_->onBelowWriteBufferLowWatermark();
9931030
}
9941031
}
1032+
1033+
void envoy_dynamic_module_on_http_filter_config_scheduled(
1034+
envoy_dynamic_module_type_http_filter_config_envoy_ptr,
1035+
envoy_dynamic_module_type_http_filter_config_module_ptr filter_config_ptr, uint64_t event_id) {
1036+
auto* factory_wrapper = unwrapPointer<HttpFilterFactoryWrapper>(filter_config_ptr);
1037+
if (factory_wrapper == nullptr || factory_wrapper->config_handle_ == nullptr ||
1038+
factory_wrapper->config_handle_->scheduler_ == nullptr) {
1039+
return;
1040+
}
1041+
factory_wrapper->config_handle_->scheduler_->onScheduled(event_id);
1042+
}
9951043
}
9961044

9971045
} // namespace DynamicModules

source/extensions/dynamic_modules/sdk/go/abi/internal.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
type httpFilterConfigWrapper struct {
2828
pluginFactory shared.HttpFilterFactory
29+
configHandle *dymConfigHandle
2930
}
3031

3132
type httpFilterConfigWrapperPerRoute struct {
@@ -349,18 +350,21 @@ func (b *dymBodyBuffer) Drain(size uint64) {
349350
}
350351

351352
type dymScheduler struct {
352-
schedulerPtr C.envoy_dynamic_module_type_http_filter_scheduler_module_ptr
353+
schedulerPtr unsafe.Pointer
353354
schedulerLock sync.Mutex
354355
nextTaskID uint64
355356
tasks map[uint64]func()
357+
commitFunc func(unsafe.Pointer, C.uint64_t)
356358
}
357359

358360
func newDymScheduler(
359-
schedulerPtr C.envoy_dynamic_module_type_http_filter_scheduler_module_ptr,
361+
schedulerPtr unsafe.Pointer,
362+
commitFunc func(unsafe.Pointer, C.uint64_t),
360363
) *dymScheduler {
361364
return &dymScheduler{
362365
schedulerPtr: schedulerPtr,
363366
tasks: make(map[uint64]func()),
367+
commitFunc: commitFunc,
364368
}
365369
}
366370

@@ -372,10 +376,8 @@ func (s *dymScheduler) Schedule(task func()) {
372376
s.tasks[taskID] = task
373377
s.schedulerLock.Unlock()
374378

375-
C.envoy_dynamic_module_callback_http_filter_scheduler_commit(
376-
s.schedulerPtr,
377-
(C.uint64_t)(taskID),
378-
)
379+
// Call the host to schedule the task, passing the task ID as context
380+
s.commitFunc(s.schedulerPtr, C.uint64_t(taskID))
379381
}
380382

381383
func (s *dymScheduler) onScheduled(taskID uint64) {
@@ -850,11 +852,19 @@ func (h *dymHttpFilterHandle) GetScheduler() shared.Scheduler {
850852
// in practice. But it will be nil in mock tests.
851853
schedulerPtr := C.envoy_dynamic_module_callback_http_filter_scheduler_new(
852854
h.hostPluginPtr)
853-
h.scheduler = newDymScheduler(schedulerPtr)
855+
h.scheduler = newDymScheduler(
856+
unsafe.Pointer(schedulerPtr),
857+
func(schedulerPtr unsafe.Pointer, taskID C.uint64_t) {
858+
C.envoy_dynamic_module_callback_http_filter_scheduler_commit(
859+
(C.envoy_dynamic_module_type_http_filter_scheduler_module_ptr)(schedulerPtr),
860+
taskID,
861+
)
862+
},
863+
)
854864

855865
runtime.SetFinalizer(h.scheduler, func(s *dymScheduler) {
856866
C.envoy_dynamic_module_callback_http_filter_scheduler_delete(
857-
s.schedulerPtr,
867+
(C.envoy_dynamic_module_type_http_filter_scheduler_module_ptr)(s.schedulerPtr),
858868
)
859869
})
860870
}
@@ -1113,6 +1123,7 @@ func newDymStreamPluginHandle(
11131123

11141124
type dymConfigHandle struct {
11151125
hostConfigPtr C.envoy_dynamic_module_type_http_filter_config_envoy_ptr
1126+
scheduler *dymScheduler
11161127
}
11171128

11181129
func (h *dymConfigHandle) Log(level shared.LogLevel, format string, args ...any) {
@@ -1195,6 +1206,31 @@ func (h *dymConfigHandle) DefineCounter(name string,
11951206
return shared.MetricID(metricID), shared.MetricsResult(result)
11961207
}
11971208

1209+
func (h *dymConfigHandle) GetScheduler() shared.Scheduler {
1210+
if h.scheduler == nil {
1211+
// The scheduler is created lazily and should never be nil
1212+
// in practice. But it will be nil in mock tests.
1213+
schedulerPtr := C.envoy_dynamic_module_callback_http_filter_config_scheduler_new(
1214+
h.hostConfigPtr)
1215+
h.scheduler = newDymScheduler(
1216+
unsafe.Pointer(schedulerPtr),
1217+
func(schedulerPtr unsafe.Pointer, taskID C.uint64_t) {
1218+
C.envoy_dynamic_module_callback_http_filter_config_scheduler_commit(
1219+
(C.envoy_dynamic_module_type_http_filter_config_scheduler_module_ptr)(schedulerPtr),
1220+
taskID,
1221+
)
1222+
},
1223+
)
1224+
1225+
runtime.SetFinalizer(h.scheduler, func(s *dymScheduler) {
1226+
C.envoy_dynamic_module_callback_http_filter_config_scheduler_delete(
1227+
(C.envoy_dynamic_module_type_http_filter_config_scheduler_module_ptr)(s.schedulerPtr),
1228+
)
1229+
})
1230+
}
1231+
return h.scheduler
1232+
}
1233+
11981234
type dymRouteConfigHandle struct{}
11991235

12001236
func (h *dymRouteConfigHandle) Log(level shared.LogLevel, format string, args ...any) {
@@ -1236,7 +1272,7 @@ func envoy_dynamic_module_on_http_filter_config_new(
12361272
configHandle.Log(shared.LogLevelWarn, "Failed to load configuration: %v", err)
12371273
return nil
12381274
}
1239-
configPtr := configManager.record(&httpFilterConfigWrapper{pluginFactory: factory})
1275+
configPtr := configManager.record(&httpFilterConfigWrapper{pluginFactory: factory, configHandle: configHandle})
12401276
return C.envoy_dynamic_module_type_http_filter_config_module_ptr(configPtr)
12411277
}
12421278

@@ -1248,6 +1284,7 @@ func envoy_dynamic_module_on_http_filter_config_destroy(
12481284
if factoryWrapper == nil {
12491285
return
12501286
}
1287+
factoryWrapper.configHandle.scheduler = nil
12511288
factoryWrapper.pluginFactory.OnDestroy()
12521289
configManager.remove(unsafe.Pointer(configPtr))
12531290
}
@@ -1594,3 +1631,20 @@ func envoy_dynamic_module_on_http_filter_downstream_below_write_buffer_low_water
15941631
pluginWrapper.downstreamWatermarkCallbacks.OnBelowWriteBufferLowWatermark()
15951632
}
15961633
}
1634+
1635+
//export envoy_dynamic_module_on_http_filter_config_scheduled
1636+
func envoy_dynamic_module_on_http_filter_config_scheduled(
1637+
_ C.envoy_dynamic_module_type_http_filter_config_envoy_ptr,
1638+
configPtr C.envoy_dynamic_module_type_http_filter_config_module_ptr,
1639+
taskID C.uint64_t,
1640+
) {
1641+
configWrapper := configManager.unwrap(unsafe.Pointer(configPtr))
1642+
if configWrapper == nil || configWrapper.configHandle == nil {
1643+
return
1644+
}
1645+
ch := configWrapper.configHandle
1646+
1647+
if ch.scheduler != nil {
1648+
ch.scheduler.onScheduled(uint64(taskID))
1649+
}
1650+
}

source/extensions/dynamic_modules/sdk/go/shared/base.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,4 +656,9 @@ type HttpFilterConfigHandle interface {
656656
// @Return the counter metric id. This metric can never be used after the plugin
657657
// config is unloaded.
658658
DefineCounter(name string, tagKeys ...string) (MetricID, MetricsResult)
659+
660+
// GetScheduler retrieves a scheduler for deferred task execution in the config context.
661+
// This should be called only during the plugin configuration phase, and the returned
662+
// Scheduler can be used later even outside of the callbacks and at other threads.
663+
GetScheduler() Scheduler
659664
}

source/extensions/dynamic_modules/sdk/go/shared/mocks/mock_base.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/extensions/dynamic_modules/test_data/cpp/http_integration_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ class ConfigSchedulerConfigFactory : public HttpFilterConfigFactory {
9090
auto shared_status = std::make_shared<std::atomic<bool>>(false);
9191

9292
// Simulate async config update.
93-
std::thread([shared_status]() {
93+
handle.getScheduler()->schedule([shared_status]() {
9494
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NO_CHECK_FORMAT(real_time)
9595
shared_status->store(true);
96-
}).detach();
96+
});
9797

9898
return std::make_unique<ConfigSchedulerFilterFactory>(shared_status);
9999
}

test/extensions/dynamic_modules/test_data/go/http_integration_test/http_integration_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,10 @@ func (f *ConfigSchedulerConfigFactory) Create(handle shared.HttpFilterConfigHand
5353
sharedStatus := new(atomic.Bool)
5454
sharedStatus.Store(false)
5555

56-
// TODO(wbpcode): to support the actual config scheduler in golang SDK.
57-
go func() {
56+
handle.GetScheduler().Schedule(func() {
5857
time.Sleep(100 * time.Millisecond)
5958
sharedStatus.Store(true)
60-
}()
59+
})
6160

6261
return &ConfigSchedulerFilterFactory{sharedStatus: sharedStatus}, nil
6362
}
@@ -67,6 +66,10 @@ type ConfigSchedulerFilterFactory struct {
6766
sharedStatus *atomic.Bool
6867
}
6968

69+
func (f *ConfigSchedulerFilterFactory) OnDestroy() {
70+
runtime.GC()
71+
}
72+
7073
func (f *ConfigSchedulerFilterFactory) Create(handle shared.HttpFilterHandle) shared.HttpFilter {
7174
return &ConfigSchedulerFilter{sharedStatus: f.sharedStatus}
7275
}

tools/code_format/check_format.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,10 @@ def fix_source_path(self, file_path):
882882

883883
def check_source_path(self, file_path):
884884
error_messages = []
885-
if self.run_code_validation:
885+
# This dynamic module SDK will be built into a shared library which we prefer to use
886+
# standard library rather then the absl equivalents. We simply skip the content check
887+
# for this directory to avoid false positives.
888+
if self.run_code_validation and "dynamic_modules/sdk/cpp" not in file_path:
886889
error_messages = self.check_file_contents(file_path, self.check_source_line)
887890
if file_path.endswith((".cc", ".h")):
888891
error_messages += self.check_namespace(file_path)

0 commit comments

Comments
 (0)