diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d7d318..b3625a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,6 +82,7 @@ include(cmake/install.cmake) # --- Tests --- if(BUILD_UNIT_TESTS) find_package(GTest REQUIRED CONFIG) + find_package(argparse REQUIRED CONFIG) enable_testing() add_subdirectory(tests) endif() diff --git a/conan/conanfile.py b/conan/conanfile.py index 6032f2a..f0be2bf 100644 --- a/conan/conanfile.py +++ b/conan/conanfile.py @@ -23,6 +23,7 @@ class KickmsgDev(ConanFile): def requirements(self): if self.options.unit_tests: self.requires("gtest/1.15.0") + self.requires("argparse/3.2") # stress-test CLI if self.options.benchmarks: self.requires("benchmark/1.9.1") diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e5d935a..a01d654 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -15,6 +15,7 @@ add_test(NAME kickmsg COMMAND kickmsg_unit WORKING_DIRECTORY ${CMAKE_BINARY_DIR} add_executable(kickmsg_stress_test stress/main.cc + stress/common.cc stress/treiber.cc stress/churn.cc stress/gc_recovery.cc @@ -24,17 +25,18 @@ add_executable(kickmsg_stress_test stress/live_repair.cc stress/edge_cases.cc ) -target_link_libraries(kickmsg_stress_test PRIVATE kickmsg) +target_link_libraries(kickmsg_stress_test PRIVATE kickmsg argparse::argparse) set_target_properties(kickmsg_stress_test PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) -# Deliberately NOT registered with add_test. The suite oversubscribes (up to -# 32 threads) and the publisher spin-waits do not yield, so on the 2-4 core -# CI runners the spinners starve the very threads they wait on and the suite -# fails to finish within any sane timeout (observed: >900s vs ~10s on a -# 24-core host). Run it manually via scripts/validate.sh or tests/endurance.sh. -# TODO: re-register once the spin-wait backoff/yield lands (tracked: the -# busy-spin starvation under oversubscription is a real bug, not a test flake). +# Contention thread counts scale to the host core count (see contention_count +# / the --oversub flag), keeping the suite bounded on 2-4 core CI runners +# instead of the oversubscription livelock a fixed 32-thread count caused. +# Generous timeout to absorb slow or TSAN-instrumented runs. +# Follow-up: split this integration/stress suite from the unit tests so it can +# run as its own CI job rather than alongside kickmsg_unit. +add_test(NAME stress COMMAND kickmsg_stress_test WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_tests_properties(stress PROPERTIES TIMEOUT 300) # Crash test uses fork/waitpid/SIGKILL — POSIX only if(NOT WIN32) diff --git a/tests/stress/common.cc b/tests/stress/common.cc new file mode 100644 index 0000000..8a10606 --- /dev/null +++ b/tests/stress/common.cc @@ -0,0 +1,15 @@ +#include "common.h" + +uint16_t g_oversub_pct = 150; + +uint16_t contention_count() +{ + uint32_t hw = std::thread::hardware_concurrency(); + if (hw == 0) + { + hw = 4; + } + uint32_t total = (hw * g_oversub_pct + 99) / 100; // ceil(hw * pct / 100) + uint32_t per_side = std::max(2, (total + 1) / 2); + return static_cast(std::min(per_side, UINT16_MAX)); +} diff --git a/tests/stress/common.h b/tests/stress/common.h index c2ac8b0..ef8927e 100644 --- a/tests/stress/common.h +++ b/tests/stress/common.h @@ -29,6 +29,20 @@ using namespace kickmsg; constexpr int TSAN_SCALE = 1; #endif +// Target TOTAL contention threads (publishers + subscribers) as a percentage +// of the host core count. Default 150 (~1.5x cores: oversubscribed enough to +// contend, bounded enough to finish). Settable from the stress binary's +// command line so a run can be dialed up or down. Read by contention_count(). +extern uint16_t g_oversub_pct; + +// Per-side thread count for a contention scenario, derived from the host core +// count and g_oversub_pct. This SCALES with the machine (a 192-core box gets +// hundreds of threads, still oversubscribed) instead of a fixed count that +// would leave a big box undersubscribed -- while staying bounded on a 2-core +// CI runner. Floored at 2. Callers must size max_subs / max_subscribers to +// match the returned value. +uint16_t contention_count(); + struct Payload { static constexpr uint32_t MAGIC = 0xCAFEBABE; diff --git a/tests/stress/fairness.cc b/tests/stress/fairness.cc index 637f0d6..3e2b98f 100644 --- a/tests/stress/fairness.cc +++ b/tests/stress/fairness.cc @@ -2,13 +2,17 @@ bool run_fairness_test() { - std::printf("--- Fairness test: 1 pub x 100000 msgs, 16 subs (ring=256, pool=512) ---\n"); - g_all_publishers_done = false; - constexpr int NUM_SUBS = 16; + // Subscriber count scales to the host (max_subscribers tracks it, so the + // ring count scales too): bounded on a low-core CI box, oversubscribed on + // a big one. See contention_count(). + int const NUM_SUBS = static_cast(contention_count()); uint32_t const NUM_MSGS = 100000 / TSAN_SCALE; + std::printf("--- Fairness test: 1 pub x %u msgs, %d subs (ring=256, pool=512) ---\n", + NUM_MSGS, NUM_SUBS); + kickmsg::channel::Config cfg; cfg.max_subscribers = NUM_SUBS; cfg.sub_ring_capacity = 256; diff --git a/tests/stress/main.cc b/tests/stress/main.cc index 5a0452b..5099cd5 100644 --- a/tests/stress/main.cc +++ b/tests/stress/main.cc @@ -1,6 +1,8 @@ #include "common.h" #include "kickmsg/version.h" +#include + // Forward declarations bool run_treiber_stress(); bool run_subscriber_churn(); @@ -12,15 +14,47 @@ bool run_live_repair(); bool run_single_slot_ring(); bool run_subscriber_saturation(); -int main() +int main(int argc, char** argv) { + argparse::ArgumentParser program("kickmsg_stress_test"); + program.add_description( + "Lock-free shared-memory stress suite. Thread counts scale to the host " + "CPU; --oversub tunes how hard it contends."); + program.add_argument("--oversub") + .help("total contention threads as a percentage of CPU cores " + "(150 = ~1.5x cores, 50 = light, 400 = heavy)") + .metavar("PCT") + .scan<'i', int>() + .default_value(static_cast(g_oversub_pct)); + + try + { + program.parse_args(argc, argv); + } + catch (std::exception const& e) + { + std::fprintf(stderr, "%s\n", e.what()); + return 2; + } + + int pct = program.get("--oversub"); + if (pct > 0) + { + g_oversub_pct = static_cast(std::min(pct, 65535)); + } + std::printf("=== Kickmsg Lock-Free Stress Tests ===\n"); - // Build stamp: confirm which binary is running. __DATE__/__TIME__ is this - // harness TU's compile time; shm ABI version confirms the layout in use. - std::printf("kickmsg %s | shm ABI v%u | harness built %s %s\n\n", + // Build stamp + resolved contention so a run is self-describing: + // __DATE__/__TIME__ is this harness TU's compile time; the ABI version + // confirms the layout; oversub/cores show the contention actually used. + std::printf("kickmsg %s | shm ABI v%u | harness built %s %s\n", KICKMSG_VERSION_STRING, static_cast(kickmsg::VERSION), __DATE__, __TIME__); + std::printf("contention: %u%% of %u cores -> %u threads/side\n\n", + static_cast(g_oversub_pct), + std::thread::hardware_concurrency(), + static_cast(contention_count())); TestRunner runner; diff --git a/tests/stress/mpmc.cc b/tests/stress/mpmc.cc index 05d2701..5a3e465 100644 --- a/tests/stress/mpmc.cc +++ b/tests/stress/mpmc.cc @@ -169,14 +169,15 @@ void run_all_mpmc(TestRunner& runner) } { + int const n = contention_count(); TestConfig tc; - tc.num_publishers = 8; - tc.num_subscribers = 8; + tc.num_publishers = n; + tc.num_subscribers = n; tc.msgs_per_pub = 50000 / TSAN_SCALE; tc.pool_size = 128; tc.ring_capacity = 32; - tc.max_subs = 16; - runner.run("mpmc 8p/8s", [&]{ return run_stress_test(tc); }); + tc.max_subs = static_cast(n); + runner.run("mpmc contended (pool 128)", [&]{ return run_stress_test(tc); }); } { @@ -190,16 +191,17 @@ void run_all_mpmc(TestRunner& runner) runner.run("mpmc 1p/1s", [&]{ return run_stress_test(tc); }); } - // High contention: many pubs, small pool, heavy overflow + // High contention: small pool, heavy overflow { + int const n = contention_count(); TestConfig tc; - tc.num_publishers = 16; - tc.num_subscribers = 16; + tc.num_publishers = n; + tc.num_subscribers = n; tc.msgs_per_pub = 20000 / TSAN_SCALE; tc.pool_size = 32; tc.ring_capacity = 8; - tc.max_subs = 16; - runner.run("mpmc 16p/16s hi-contention", [&]{ return run_stress_test(tc); }); + tc.max_subs = static_cast(n); + runner.run("mpmc contended (pool 32, tiny)", [&]{ return run_stress_test(tc); }); } // Zero-copy receive tests -- exercises SampleView pin CAS, @@ -217,27 +219,29 @@ void run_all_mpmc(TestRunner& runner) } { + int const n = contention_count(); TestConfig tc; - tc.num_publishers = 8; - tc.num_subscribers = 8; + tc.num_publishers = n; + tc.num_subscribers = n; tc.msgs_per_pub = 50000 / TSAN_SCALE; tc.pool_size = 128; tc.ring_capacity = 32; - tc.max_subs = 16; + tc.max_subs = static_cast(n); tc.use_zerocopy = true; - runner.run("mpmc 8p/8s zerocopy", [&]{ return run_stress_test(tc); }); + runner.run("mpmc contended (pool 128) zerocopy", [&]{ return run_stress_test(tc); }); } - // High contention zero-copy + // High contention zero-copy: small pool, heavy overflow { + int const n = contention_count(); TestConfig tc; - tc.num_publishers = 16; - tc.num_subscribers = 16; + tc.num_publishers = n; + tc.num_subscribers = n; tc.msgs_per_pub = 20000 / TSAN_SCALE; tc.pool_size = 32; tc.ring_capacity = 8; - tc.max_subs = 16; + tc.max_subs = static_cast(n); tc.use_zerocopy = true; - runner.run("mpmc 16p/16s zerocopy hi-contention", [&]{ return run_stress_test(tc); }); + runner.run("mpmc contended (pool 32, tiny) zerocopy", [&]{ return run_stress_test(tc); }); } }