Skip to content

Commit cc87e5e

Browse files
committed
WIP Controller refactor.
1 parent 2a83691 commit cc87e5e

13 files changed

Lines changed: 295 additions & 176 deletions

core/network_interface.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ class NetworkInterface {
160160
template <typename... ARGs>
161161
void RpcHandle<ARGs...>::rpc(NetworkInterface &p_interface, int p_peer_id, ARGs... p_args) const {
162162
ENSURE(p_interface.rpcs_info.size() > index);
163+
ASSERT_COND_MSG(p_interface.fetch_local_peer_id() != p_peer_id, "Sending an rpc to self is not allowed.");
163164

164165
DataBuffer db;
165166
db.begin_write(0);

core/object_data.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ struct ObjectData {
6969
/// The sync variables of this node. The order of this vector matters
7070
/// because the index is the `VarId`.
7171
std::vector<VarDescriptor> vars;
72-
NS::Processor<float> functions[PROCESSPHASE_COUNT];
72+
NS::Processor<double> functions[PROCESSPHASE_COUNT];
7373

7474
std::function<void(DataBuffer & /*out_buffer*/, float /*update_rate*/)> func_trickled_collect;
75-
std::function<void(float /*delta*/, float /*interpolation_alpha*/, DataBuffer & /*past_buffer*/, DataBuffer & /*future_buffer*/)> func_trickled_apply;
75+
std::function<void(double /*delta*/, float /*interpolation_alpha*/, DataBuffer & /*past_buffer*/, DataBuffer & /*future_buffer*/)> func_trickled_apply;
7676

7777
public:
7878
void set_net_id(ObjectNetId p_id);

net_utilities.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ void NS::PeerData::set_latency(int p_latency) {
1515
}
1616

1717
int NS::PeerData::get_latency() const {
18-
return compressed_latency * 4.0;
18+
return int(compressed_latency) * 4;
1919
}
2020

2121
void NS::PeerData::make_controller(NS::SceneSynchronizerBase &p_scene_sync, int p_peer) {
@@ -63,6 +63,9 @@ void NS::SyncGroup::mark_changes_as_notified() {
6363

6464
void NS::SyncGroup::add_listening_peer(int p_peer) {
6565
NS::VecFunc::insert_unique(listening_peers, p_peer);
66+
if (NS::VecFunc::insert_unique(networked_peers, p_peer)) {
67+
NS::VecFunc::insert_unique(peers_with_newly_calculated_latency, p_peer);
68+
}
6669

6770
// Make all the controlled objects as simulated.
6871
const std::vector<ObjectData *> *controlled_objects_ptr = scene_sync->get_peer_controlled_objects_data(p_peer);
@@ -184,6 +187,7 @@ void NS::SyncGroup::remove_sync_object(std::size_t p_index, bool p_is_simulated)
184187
// If no other simulated objects controlled by `associated_peer` remove it from
185188
bool is_simulating = false;
186189
bool is_networking = false;
190+
187191
for (auto &soi : simulated_sync_objects) {
188192
if (soi.od->get_controlled_by_peer() == associted_peer) {
189193
is_networking = true;
@@ -192,6 +196,12 @@ void NS::SyncGroup::remove_sync_object(std::size_t p_index, bool p_is_simulated)
192196
}
193197
}
194198

199+
if (!is_networking) {
200+
if (NS::VecFunc::has(listening_peers, associted_peer)) {
201+
is_networking = true;
202+
}
203+
}
204+
195205
if (!is_networking) {
196206
for (auto &toi : trickled_sync_objects) {
197207
if (toi.od->get_controlled_by_peer() == associted_peer) {

networked_controller.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ void NetworkedControllerBase::on_state_validated(FrameIndex p_frame_index) {
314314
}
315315

316316
void NetworkedControllerBase::on_rewind_frame_begin(FrameIndex p_input_id, int p_index, int p_count) {
317-
if (controller && is_realtime_enabled()) {
317+
if (controller && can_simulate()) {
318318
controller->queue_instant_process(p_input_id, p_index, p_count);
319319
}
320320
}
@@ -374,13 +374,15 @@ bool NetworkedControllerBase::player_has_new_input() const {
374374
return has_player_new_input;
375375
}
376376

377-
bool NetworkedControllerBase::is_realtime_enabled() {
378-
if (is_server_controller()) {
379-
return true;
380-
} else if (scene_synchronizer) {
381-
// TODO optimize by avoiding fetching the controlled objects in this way?
382-
const std::vector<ObjectData *> *controlled_objects = scene_synchronizer->get_peer_controlled_objects_data(get_authority_peer());
383-
if (controlled_objects) {
377+
bool NetworkedControllerBase::can_simulate() {
378+
NS_PROFILE
379+
380+
const std::vector<ObjectData *> *controlled_objects = scene_synchronizer ? scene_synchronizer->get_peer_controlled_objects_data(get_authority_peer()) : nullptr;
381+
if (controlled_objects) {
382+
if (is_server_controller() || is_player_controller()) {
383+
return controlled_objects->size() > 0;
384+
} else {
385+
// TODO optimize by avoiding fetching the controlled objects in this way?
384386
for (const ObjectData *od : *controlled_objects) {
385387
if (od->realtime_sync_enabled_on_client) {
386388
return true;

networked_controller.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ struct NoNetController;
3737
// instantiated.
3838
// The most important part is inside the `PlayerController`, `ServerController`,
3939
// `DollController`, `NoNetController`.
40-
class NetworkedControllerBase {
40+
class NetworkedControllerBase final {
4141
friend class NS::SceneSynchronizerBase;
4242
template <class NetInterfaceClass>
4343
friend class NetworkedController;
@@ -214,7 +214,7 @@ class NetworkedControllerBase {
214214
public:
215215
bool player_has_new_input() const;
216216

217-
bool is_realtime_enabled();
217+
bool can_simulate();
218218

219219
protected:
220220
void notify_controller_reset();

scene_synchronizer.cpp

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ void SceneSynchronizerBase::setup(SynchronizerManager &p_synchronizer_interface)
132132
// Make sure to reset all the assigned controllers.
133133
reset_controllers();
134134

135+
// Spawn the self peer.
136+
on_peer_connected(get_network_interface().fetch_local_peer_id());
137+
135138
// Init the peers already connected.
136139
std::vector<int> peer_ids;
137140
network_interface->fetch_connected_peers(peer_ids);
@@ -624,7 +627,7 @@ void SceneSynchronizerBase::untrack_variable_changes(ListenerHandle p_handle) {
624627
delete listener;
625628
}
626629

627-
NS::PHandler SceneSynchronizerBase::register_process(ObjectLocalId p_id, ProcessPhase p_phase, std::function<void(float)> p_func) {
630+
NS::PHandler SceneSynchronizerBase::register_process(ObjectLocalId p_id, ProcessPhase p_phase, std::function<void(double)> p_func) {
628631
ERR_FAIL_COND_V(p_id == NS::ObjectLocalId::NONE, NS::NullPHandler);
629632
ERR_FAIL_COND_V(!p_func, NS::NullPHandler);
630633

@@ -651,7 +654,7 @@ void SceneSynchronizerBase::unregister_process(ObjectLocalId p_id, ProcessPhase
651654
void SceneSynchronizerBase::set_trickled_sync(
652655
ObjectLocalId p_id,
653656
std::function<void(DataBuffer & /*out_buffer*/, float /*update_rate*/)> p_func_trickled_collect,
654-
std::function<void(float /*delta*/, float /*interpolation_alpha*/, DataBuffer & /*past_buffer*/, DataBuffer & /*future_buffer*/)> p_func_trickled_apply) {
657+
std::function<void(double /*delta*/, float /*interpolation_alpha*/, DataBuffer & /*past_buffer*/, DataBuffer & /*future_buffer*/)> p_func_trickled_apply) {
655658
ERR_FAIL_COND(p_id == ObjectLocalId::NONE);
656659

657660
NS::ObjectData *od = get_object_data(p_id);
@@ -884,7 +887,13 @@ bool SceneSynchronizerBase::is_peer_networking_enabled(int p_peer) const {
884887
void SceneSynchronizerBase::on_peer_connected(int p_peer) {
885888
PeerData npd;
886889
auto pd_it = MapFunc::insert_if_new(peer_data, p_peer, std::move(npd));
890+
if (pd_it->second.get_controller()) {
891+
// Nothing to do, already initialized.
892+
return;
893+
}
894+
887895
pd_it->second.make_controller(*this, p_peer);
896+
reset_controller(*pd_it->second.get_controller());
888897

889898
event_peer_status_updated.broadcast(p_peer, true, true);
890899

@@ -1372,7 +1381,7 @@ void SceneSynchronizerBase::process_functions__execute() {
13721381
cached_process_functions_valid = true;
13731382
}
13741383

1375-
SceneSynchronizerDebugger::singleton()->debug_print(network_interface, "Process functions START", true);
1384+
SceneSynchronizerDebugger::singleton()->print(INFO, "Process functions START");
13761385

13771386
// Pre process phase
13781387
for (int process_phase = PROCESSPHASE_EARLY; process_phase < PROCESSPHASE_PROCESS; ++process_phase) {
@@ -1386,9 +1395,9 @@ void SceneSynchronizerBase::process_functions__execute() {
13861395
const std::string info = "process phase -- CONTROLLER --";
13871396
NS_PROFILE_WITH_INFO(info);
13881397

1389-
// TODO optimize this, as `is_realtime_enabled` is expensive.
1398+
// TODO optimize this, as `can_simulate` is expensive.
13901399
for (auto &pd : peer_data) {
1391-
if (pd.second.get_controller() && pd.second.get_controller()->is_realtime_enabled()) {
1400+
if (pd.second.get_controller() && pd.second.get_controller()->can_simulate()) {
13921401
pd.second.get_controller()->process(get_fixed_frame_delta());
13931402
}
13941403
}
@@ -1935,6 +1944,11 @@ void ServerSynchronizer::process_snapshot_notificator() {
19351944
delta_snapshot.begin_write(MD_SIZE);
19361945

19371946
for (int peer_id : group.get_listening_peers()) {
1947+
if (peer_id == scene_synchronizer->get_network_interface().fetch_local_peer_id()) {
1948+
// Never send the snapshot to self (notice `self` is the server).
1949+
continue;
1950+
}
1951+
19381952
NS::PeerData *peer = MapFunc::get_or_null(scene_synchronizer->peer_data, peer_id);
19391953
if (peer == nullptr) {
19401954
SceneSynchronizerDebugger::singleton()->print(ERROR, "The `process_snapshot_notificator` failed to lookup the peer_id `" + std::to_string(peer_id) + "`. Was it removed but never cleared from sync_groups. Report this error, as this is a bug.");
@@ -2243,6 +2257,11 @@ void ServerSynchronizer::process_latency_update() {
22432257
const std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
22442258

22452259
for (auto &[peer, peer_data] : scene_synchronizer->get_peers()) {
2260+
if (peer == scene_synchronizer->get_network_interface().fetch_local_peer_id()) {
2261+
// No need to update the ping for `self` (the server).
2262+
continue;
2263+
}
2264+
22462265
std::map<int, PeerServerData>::iterator peer_server_data_it = NS::MapFunc::insert_if_new(peers_data, peer, PeerServerData());
22472266
if (peer_server_data_it->second.latency_calculation_in_progress) {
22482267
continue;
@@ -2291,7 +2310,7 @@ void ServerSynchronizer::process_adjust_clients_controller_tick_rate(double p_de
22912310
}
22922311

22932312
void ServerSynchronizer::process_adjust_client_controller_tick_rate(double p_delta, int p_controller_peer, NetworkedControllerBase &p_controller) {
2294-
CRASH_COND(!p_controller.is_server_controller());
2313+
ASSERT_COND(p_controller.is_server_controller());
22952314

22962315
if (!p_controller.get_server_controller_unchecked()->streaming_paused) {
22972316
return;
@@ -2368,7 +2387,7 @@ void ClientSynchronizer::process(double p_delta) {
23682387
process_trickled_sync(p_delta);
23692388

23702389
#if DEBUG_ENABLED
2371-
if (player_controller) {
2390+
if (player_controller && player_controller->can_simulate()) {
23722391
const int client_peer = scene_synchronizer->network_interface->fetch_local_peer_id();
23732392
SceneSynchronizerDebugger::singleton()->write_dump(client_peer, player_controller->get_current_frame_index().id);
23742393
SceneSynchronizerDebugger::singleton()->start_new_frame();
@@ -2447,7 +2466,7 @@ void ClientSynchronizer::signal_end_sync_changed_variables_events() {
24472466
}
24482467

24492468
void ClientSynchronizer::on_controller_reset(NetworkedControllerBase &p_controller) {
2450-
if (player_controller->is_player_controller()) {
2469+
if (p_controller.is_player_controller()) {
24512470
// This can't trigger because the reset function creates the player
24522471
// controller when the following condition is true.
24532472
ASSERT_COND(p_controller.get_authority_peer() == scene_synchronizer->get_network_interface().fetch_local_peer_id());
@@ -2461,7 +2480,7 @@ void ClientSynchronizer::on_controller_reset(NetworkedControllerBase &p_controll
24612480
}
24622481

24632482
const std::vector<ObjectData *> &ClientSynchronizer::get_active_objects() const {
2464-
if make_likely (player_controller && enabled) {
2483+
if make_likely (player_controller && player_controller->can_simulate() && enabled) {
24652484
return active_objects;
24662485
} else {
24672486
// Since there is no player controller or the sync is disabled, this
@@ -2901,7 +2920,7 @@ int ClientSynchronizer::calculates_sub_ticks(const double p_delta) {
29012920
void ClientSynchronizer::process_simulation(double p_delta) {
29022921
NS_PROFILE
29032922

2904-
if make_unlikely (player_controller == nullptr || enabled == false) {
2923+
if make_unlikely (player_controller == nullptr || enabled == false || !player_controller->can_simulate()) {
29052924
// No player controller so can't process the simulation.
29062925
// TODO Remove this constraint?
29072926

@@ -3434,7 +3453,7 @@ bool ClientSynchronizer::parse_snapshot(DataBuffer &p_snapshot) {
34343453
return false;
34353454
}
34363455

3437-
if make_unlikely (received_snapshot.input_id == FrameIndex::NONE && player_controller != nullptr) {
3456+
if make_unlikely (received_snapshot.input_id == FrameIndex::NONE && player_controller && player_controller->can_simulate()) {
34383457
// We espect that the player_controller is updated by this new snapshot,
34393458
// so make sure it's done so.
34403459
SceneSynchronizerDebugger::singleton()->print(ERROR, "The player controller (" + std::to_string(player_controller->get_authority_peer()) + ") was not part of the received snapshot, this happens when the server destroys the peer controller.");

scene_synchronizer.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class SceneSynchronizerBase {
211211
std::vector<ChangesListener *> changes_listeners;
212212

213213
bool cached_process_functions_valid = false;
214-
Processor<float> cached_process_functions[PROCESSPHASE_COUNT];
214+
Processor<double> cached_process_functions[PROCESSPHASE_COUNT];
215215

216216
// Set at runtime by the constructor by reading the project settings.
217217
bool debug_rewindings_enabled = false;
@@ -374,7 +374,7 @@ class SceneSynchronizerBase {
374374
void untrack_variable_changes(ListenerHandle p_handle);
375375

376376
/// You can use the macro `callable_mp()` to register custom C++ function.
377-
NS::PHandler register_process(ObjectLocalId p_id, ProcessPhase p_phase, std::function<void(float)> p_func);
377+
NS::PHandler register_process(ObjectLocalId p_id, ProcessPhase p_phase, std::function<void(double)> p_func);
378378
void unregister_process(ObjectLocalId p_id, ProcessPhase p_phase, NS::PHandler p_func_handler);
379379

380380
/// Setup the trickled sync method for this specific object.
@@ -383,7 +383,7 @@ class SceneSynchronizerBase {
383383
void set_trickled_sync(
384384
ObjectLocalId p_id,
385385
std::function<void(DataBuffer & /*out_buffer*/, float /*update_rate*/)> p_func_trickled_collect,
386-
std::function<void(float /*delta*/, float /*interpolation_alpha*/, DataBuffer & /*past_buffer*/, DataBuffer & /*future_buffer*/)> p_func_trickled_apply);
386+
std::function<void(double /*delta*/, float /*interpolation_alpha*/, DataBuffer & /*past_buffer*/, DataBuffer & /*future_buffer*/)> p_func_trickled_apply);
387387

388388
/// Returns the latency (RTT in ms) for this peer or -1 if the latency is not available.
389389
int get_peer_latency(int p_peer) const;

tests/local_network.cpp

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include "core/error/error_macros.h"
55
#include "core/math/vector3.h"
6+
#include "modules/network_synchronizer/core/ensure.h"
67
#include "modules/network_synchronizer/core/network_interface.h"
78
#include "modules/network_synchronizer/core/processor.h"
89
#include "modules/network_synchronizer/core/var_data.h"
@@ -39,11 +40,25 @@ void LocalNetwork::start_as_client(LocalNetwork &p_server_network) {
3940
this_peer = peer;
4041
p_server_network.peer_counter += 1;
4142

43+
// Insert this peer into the server connected peer.
4244
p_server_network.connected_peers[peer] = this;
45+
46+
// Put the server into the list of connected peers.
4347
connected_peers[1] = &p_server_network;
4448

49+
// Emit the connected event
4550
p_server_network.connected_event.broadcast(peer);
4651
connected_event.broadcast(1);
52+
53+
// Mark all the other peers as connected too.
54+
for (auto [other_peer, other_local_network] : p_server_network.connected_peers) {
55+
if (peer != other_peer) {
56+
connected_peers[other_peer] = other_local_network;
57+
other_local_network->connected_peers[peer] = this;
58+
other_local_network->connected_event.broadcast(peer);
59+
connected_event.broadcast(other_peer);
60+
}
61+
}
4762
}
4863

4964
void LocalNetwork::register_object(LocalNetworkInterface &p_interface) {
@@ -99,14 +114,16 @@ void LocalNetwork::process(float p_delta) {
99114
}
100115

101116
void LocalNetwork::rpc_send_internal(const std::shared_ptr<PendingPacket> &p_packet) {
117+
ASSERT_COND_MSG(p_packet->peer_recipient != get_peer(), "During the integration test was generated an RPC to self. This is a bug.");
118+
102119
auto recipient = connected_peers.find(p_packet->peer_recipient);
103-
CRASH_COND(recipient == connected_peers.end());
120+
ASSERT_COND(recipient != connected_peers.end());
104121

105122
auto object_map_it = registered_objects.find(p_packet->object_name);
106-
CRASH_COND(object_map_it == registered_objects.end());
123+
ASSERT_COND(object_map_it != registered_objects.end());
107124

108125
LocalNetworkInterface *object_net_interface = object_map_it->second;
109-
CRASH_COND(object_net_interface == nullptr);
126+
ASSERT_COND(object_net_interface);
110127

111128
recipient->second->rpc_receive_internal(this_peer, p_packet);
112129
}
@@ -155,7 +172,7 @@ void LocalNetworkInterface::fetch_connected_peers(std::vector<int> &p_connected_
155172
p_connected_peers.clear();
156173
// Get all the connected peers.
157174
for (const auto &[peer_id, _] : network->get_connected_peers()) {
158-
if (peer_id != get_unit_authority()) {
175+
if (peer_id != fetch_local_peer_id()) {
159176
p_connected_peers.push_back(peer_id);
160177
}
161178
}
@@ -288,14 +305,22 @@ void NS_Test::test_local_network() {
288305
CRASH_COND(peer_2_connection_event[0] != server.get_peer());
289306

290307
// Check the connected peers list is valid
291-
std::vector<int> connected_peers;
292-
server_obj_1.fetch_connected_peers(connected_peers);
293-
CRASH_COND(std::find(connected_peers.begin(), connected_peers.end(), peer_1.get_peer()) == connected_peers.end());
294-
CRASH_COND(std::find(connected_peers.begin(), connected_peers.end(), peer_2.get_peer()) == connected_peers.end());
295-
peer_1_obj_1.fetch_connected_peers(connected_peers);
296-
CRASH_COND(std::find(connected_peers.begin(), connected_peers.end(), server.get_peer()) == connected_peers.end());
297-
peer_2_obj_1.fetch_connected_peers(connected_peers);
298-
CRASH_COND(std::find(connected_peers.begin(), connected_peers.end(), server.get_peer()) == connected_peers.end());
308+
{
309+
std::vector<int> connected_peers;
310+
server_obj_1.fetch_connected_peers(connected_peers);
311+
ASSERT_COND(NS::VecFunc::has(connected_peers, peer_1.get_peer()));
312+
ASSERT_COND(NS::VecFunc::has(connected_peers, peer_2.get_peer()));
313+
314+
connected_peers.clear();
315+
peer_1_obj_1.fetch_connected_peers(connected_peers);
316+
ASSERT_COND(NS::VecFunc::has(connected_peers, server.get_peer()));
317+
ASSERT_COND(NS::VecFunc::has(connected_peers, peer_2.get_peer()));
318+
319+
connected_peers.clear();
320+
peer_2_obj_1.fetch_connected_peers(connected_peers);
321+
ASSERT_COND(NS::VecFunc::has(connected_peers, server.get_peer()));
322+
ASSERT_COND(NS::VecFunc::has(connected_peers, peer_1.get_peer()));
323+
}
299324

300325
Vector<uint8_t> vec;
301326
vec.push_back(1);

tests/local_scene.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ void LocalScene::remove_object(const char *p_object_name) {
165165
}
166166
}
167167

168-
void LocalScene::process(float p_delta) {
168+
void LocalScene::process(double p_delta) {
169169
scene_sync->process(p_delta);
170170
// Clear any pending RPC.
171171
// NOTE: The network process is executed after the scene_sync so any pending

tests/local_scene.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class LocalScene {
8181

8282
void remove_object(const char *p_object_name);
8383

84-
void process(float p_delta);
84+
void process(double p_delta);
8585
};
8686

8787
template <class T>

0 commit comments

Comments
 (0)