snen.dev blog

Taking "Flight" with Bevy

Part 3 - Integrating Online Networking (feat. NAIA)

This series is about writing a Bevy project. If you aren't familiar with this series, start here to read about the background. To summarize, we have a small Bevy project called Crabber which we could consider a rough draft of a Frogger clone.

This post adds online networking to Crabber. Since there are a number of subproblems involved in writing netcode, this post focuses on a lot of nuances involved in integrating networking code into an existing game repository. Ultimately, I want to improve the implementation here in many ways, but in this post, we will first get our code to a "working" state.

To accomplish this, I use a networking library called NAIA to implement the messaging protocol and handle a lot of the required synchronization behavior between client and server. NAIA provides an extremely convenient set of APIs, so in my experience, it feels generally quite easy to use. I want to specify this ahead of time, because although I do run into several challenges throughout this post, the overall result is powerful, and those challenges are more often a result of the overall complexity of the problem we're taking on.

The workbook repository contains two commits for this post, tagged part-3a and part-3b. Feel free to explore these commits to see for yourself how the code behaves.

With that, let's get started.

Multiplayer Crabber

Before we worry about adding online networking, let's first try to consider what it takes to enable local multiplayer.

Turns out, we don't have to do much. First we'll make sure there's a controller that Player 2 can use:

// src/inputs.rs
#[derive(Bundle)]
pub struct ArrowKeysControllerBundle {
    input_manager: InputManagerBundle<Action>,
}

impl ArrowKeysControllerBundle {
    pub fn new() -> Self {
        ArrowKeysControllerBundle {
            input_manager: InputManagerBundle::<Action> {
                action_state: ActionState::default(),
                input_map: InputMap::new([
                    (KeyCode::Up, Action::Up),
                    (KeyCode::Left, Action::Left),
                    (KeyCode::Down, Action::Down),
                    (KeyCode::Right, Action::Right),
                    (KeyCode::Escape, Action::Menu),
                ])
                .build(),
            },
        }
    }
}

With that, we can copy our full-game test and add logic to spawn a second player:

// e2e/local-multi.rs
use bevy::{
    prelude::{Commands, IntoSystemAppConfig, OnEnter, Res},
    sprite::{SpriteSheetBundle, TextureAtlasSprite},
};

use common_e2e::Test;

use crabber::{
    components::{Crab, StepMotor, Transform},
    constants::PLAYER_Z,
    resources::SpriteSheetAssets,
    AppState, ArrowKeysControllerBundle, CoreGameLoopPlugin, GraphicsPlugin as CrabGraphicsPlugin,
    InputPlugin, LevelPlugin, TileRow, WASDControllerBundle,
};

fn spawn_players(mut commands: Commands, spritesheets: Res<SpriteSheetAssets>) {
    commands.spawn((
        Crab,
        SpriteSheetBundle {
            texture_atlas: spritesheets.crab.clone(),
            sprite: TextureAtlasSprite::new(0),
            transform: Transform::from_xyz(0., f32::from(TileRow(0)), PLAYER_Z),
            ..Default::default()
        },
        WASDControllerBundle::new(),
        StepMotor::new(),
    ));
    // !!! NEW CODE !!!
    commands.spawn((
        Crab,
        SpriteSheetBundle {
            texture_atlas: spritesheets.crab.clone(),
            sprite: TextureAtlasSprite::new(0),
            transform: Transform::from_xyz(0., f32::from(TileRow(0)), PLAYER_Z),
            ..Default::default()
        },
        ArrowKeysControllerBundle::new(),
        StepMotor::new(),
    ));
}

fn main() {
    Test {
        label: "Test local multiplayer".to_string(),
        setup: |app| {
            app.add_state::<AppState>()
                .add_plugin(InputPlugin)
                .add_plugin(CoreGameLoopPlugin)
                .add_plugin(LevelPlugin)
                .add_system(spawn_players.in_schedule(OnEnter(AppState::InGame)));
        },
        setup_graphics: |app| {
            app.add_plugin(CrabGraphicsPlugin);
        },
    }
    .run();
}

Since our tick logic was implemented in a way that iterated through the queried entities, all entities spawned with the right components receive the inputs for their attached "controllers". As long as each has the appropriate components, each inherits the appropriate behaviors.

Running the test and spamming some inputs shows that things work:


Both crabs move independently. I'm not showing it in the video, but please trust me that the controllers work as intended :). Also, it's a little more obvious at this point that we haven't programmed any sort of "game over" state, but I'm going to ignore that for now.

To see this code in action, check out the associated commit in the workbook repository.

So supporting offline multiplayer is straightforward. Unfortunately, online takes just a smidge more work.

Multiplayer Networking Concepts & Background

When considering "netcode" options for a game, there are a lot of strategies to choose from. Each aims to smooth over the fundamental problem of latency between clients: unfortunately, unlike our local example above, it takes time for computers to send messages to each other. This means that online, players can never be perfectly in sync in a real-time game. And the problem only gets worse if we consider the possibility of unreliable connection speeds, arbitrary disconnections, and all sorts of other Very Real and Unavoidable Properties of the Internet.

Different netcode strategies have very different approaches to smoothing over this synchronization problem. Typically, a solution would either define a source of truth for a given game's state, such as an authoritative server or a host client, or some conflict resolution strategy in a peer-to-peer system. In both cases, it is useful for clients to have some amount of client-side prediction as well.

I will try to provide background on networking related topics whenever I can appropriately introduce them, but if you're totally unfamiliar with game networking, a good place to start may be this Gaffer On Games post. There's a lot more in that blog on the topic, and elsewhere on the internet, because there's a lot of approaches and a lot of challenging subproblems.

For this game, we will adopt a server-authoritative strategy. To be clear, based on the actual gameplay in Crabber, this isn't the appropriate choice at all: as it is now, there are no player-to-player interactions that we have even suggested. So, for example, it would be fine to decide that our clients play in "parallel" and only see delayed information about other players. However, from a game design perspective, this is pretty limiting. The goal of this project isn't necessarily to build Crabber, but to show how one could build something larger out of it! So, I figure we can justify the choice from a game design perspective later. Pinky-promise.

Server-authoritative State Synchronization

Consider the nature of each tick for clients and the server. A tick refers to a full cycle of the game loop, which can be thought of as some point in the game's timeline. Since there is some latency between client and server, in our server-oriented strategy, we make it so that clients process inputs for ticks farther in the "future" of the game state than what the server would be processing at the same real-time moment. This allows the server to guarantee a convenient fact: by the time it needs to process some tick T, it would already have received a message from each client carrying the inputs to be executed at tick T. Therefore, the server does not need to wait for inputs and can avoid any sort of conflict resolution.
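
To make the "inputs arrive before they are needed" idea concrete, here is a minimal sketch of a server-side buffer keyed by tick. This is not NAIA's implementation: `InputBuffer`, `Tick`, `ClientId`, and `Input` are hypothetical names I made up for illustration, and the sketch assumes each client sends at most one input per tick.

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical stand-ins for this sketch (not NAIA types).
pub type Tick = u32;
pub type ClientId = u8;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Input {
    Up,
    Down,
    Left,
    Right,
}

/// Holds inputs that arrived early, keyed by the tick they apply to.
#[derive(Default)]
pub struct InputBuffer {
    pending: BTreeMap<Tick, HashMap<ClientId, Input>>,
}

impl InputBuffer {
    /// Called when a message arrives. Clients run "ahead" of the server,
    /// so `tick` is normally later than the tick the server is processing.
    pub fn receive(&mut self, tick: Tick, client: ClientId, input: Input) {
        self.pending.entry(tick).or_default().insert(client, input);
    }

    /// Called once per server tick: removes and returns the inputs stamped
    /// for exactly `tick`. A client with no entry sent nothing in time.
    pub fn take(&mut self, tick: Tick) -> HashMap<ClientId, Input> {
        self.pending.remove(&tick).unwrap_or_default()
    }
}
```

With a buffer like this, the server never blocks: when it reaches tick T it calls `take(T)` and simulates with whatever is there.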

On the client-side, however, since what is shown to the player is a simulation of the future game state, we will need to do some conflict resolution. Without information from other clients, this simulation is necessarily incorrect. And when clients do receive updates from the server, even those updates refer to past state. For good gameplay experience, it's critical to provide users the most accurate representation of state possible, so we need to hide this problem somehow.

What are our strategies here? We don't want to, but for thoroughness's sake, we could ignore the latency problem, let the client be the authority of its own state, and expect delayed information about one's peers. Alternatively, we could perform interpolation between the update messages and the simulated state each tick so that eventual consistency is always guaranteed, and accept some time-local inconsistencies.
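
As a rough illustration of the interpolation option, the following sketch nudges a predicted value toward the latest confirmed value by some fraction each tick. The function name `blend_toward` and the choice of a single `f32` coordinate are assumptions for this example; a real game would blend whole positions and would need to pick the factor carefully.

```rust
/// Moves `predicted` a fraction of the way toward `confirmed`.
/// `factor` is clamped to [0, 1]: 0 ignores the server, 1 snaps to it.
pub fn blend_toward(predicted: f32, confirmed: f32, factor: f32) -> f32 {
    let factor = factor.clamp(0.0, 1.0);
    predicted + (confirmed - predicted) * factor
}
```

Applied every tick with a factor between 0 and 1, the remaining error shrinks geometrically, which is where the "eventually consistent, temporarily wrong" trade-off comes from.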

Another, more heavy-handed alternative would be to have clients roll back the "clock" when receiving updates, and then play forward from the "confirmed" tick to the current client tick. Additionally, it often makes sense to minimize the number of times we need to roll back, since, under healthy network conditions, we should expect to receive many update messages every tick. One famous peer-to-peer rollback implementation, GGPO (docs), popularized a prediction strategy that assumes each player's predicted inputs are the same as that player's latest confirmed input. When that prediction turns out to be correct, no rollback is needed, and it usually is correct: 60 frames per second is quite fast, and player actions typically persist through many frames.
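
Here is a small sketch of both ideas together: repeating the last known input as the prediction, and replaying from a confirmed state up to the current tick. Everything in it (`State`, `step`, `predicted_input`, `rollback_and_replay`, and the one-dimensional "game") is hypothetical and exists only to show the shape of the technique. It is not GGPO's or NAIA's code.

```rust
use std::collections::BTreeMap;

pub type Tick = u32;

/// Hypothetical one-dimensional game state, just for this sketch.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct State {
    pub tick: Tick,
    pub x: i32,
}

/// One simulation step: apply a horizontal input (-1, 0, or 1).
pub fn step(state: State, input: i32) -> State {
    State { tick: state.tick + 1, x: state.x + input }
}

/// Prediction in the style described above: assume the input at `tick`
/// equals the most recent known input at or before it (0 if none).
pub fn predicted_input(known: &BTreeMap<Tick, i32>, tick: Tick) -> i32 {
    known
        .range(..=tick)
        .next_back()
        .map(|(_, input)| *input)
        .unwrap_or(0)
}

/// Roll back to `confirmed`, then replay inputs up to `current_tick`.
pub fn rollback_and_replay(
    confirmed: State,
    inputs: &BTreeMap<Tick, i32>,
    current_tick: Tick,
) -> State {
    let mut state = confirmed;
    while state.tick < current_tick {
        // The input stored under tick T advances the state from T to T + 1.
        let input = predicted_input(inputs, state.tick);
        state = step(state, input);
    }
    state
}
```

If a confirmed state arrives and matches what the client had already predicted for that tick, the replay can be skipped entirely, which is the saving the prediction strategy is after.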

Using an ECS means we have convenient strategies available that help integrate these techniques. In particular, we can maintain a separate "source" and "prediction" copy of each entity to track the confirmed and predicted versions of its state. The source entity is, in a sense, in the "past" relative to the prediction entity, and receives updates from the server. The prediction entity, on the other hand, is predicted-forward with each client tick, so that player inputs and the like can be performed in real-time.

Networking with NAIA

NAIA is (from the README) "a networking architecture for interactive applications". NAIA can be used on its own or with another game engine: right now, the repository includes a variety of demos that show how to set it up with Bevy, hecs, and Macroquad. For Bevy and hecs, NAIA offers "adapter" crates that provide plugins which integrate the NAIA World into (in our case) Bevy's World. We haven't talked about deployment much yet, but NAIA even supports building for native and for WebAssembly, meaning it's easy to create builds for web too.

For us, NAIA handles all the complexity of synchronizing clients and the server: tick-rates, "replicated" components, and more. We will use its traits and its Bevy plugins to break our current Crabber crate into distinct crates for shared code and for the client and server binaries.

To use NAIA, we need to first define a protocol that encapsulates the set of types that can be synced across the network and the messages that can be sent between client and server. As part of defining the protocol, we can choose how messages should be transported, allowing developers to optimize message-passing strategies for individual use-cases. (We haven't touched on reliable-vs-unreliable message passing yet, but if you're unfamiliar, there is a nice Gaffer On Games post about TCP vs UDP in game development which provides some insights on why these types of strategic choices can be critical for performance.)

NAIA's Bevy Demo

Before continuing, it may be helpful to take a close look at NAIA's Bevy demo. There are some significant differences in how we will approach certain specific problems, but the codebase I wrote that led to this series was originally built off this demo, albeit a much earlier version.

One useful note is where the different logical pieces of the game logic are located in the code. For example, graphics-related logic and application logic reside strictly in the client crate. Some behavior needs to be shared: both the client and server need some ability to process game ticks and other logic. There is also relevant network-related logic, about connections, authentication, and more.

Unlike this series, that demo uses a client-authoritative approach, which is worth comparing. This is a relatively new feature that I haven't yet worked with at all (this project was originally forked from an earlier version of that demo, before this was supported). Familiarity with the demo isn't required, but it does provide useful context as we dive into the next sections.

In a similar vein, this post will not show how to build this for the web, but I aim to cover that in a future post. I recommend looking at the Makefile.toml for now, for those who are curious (and see cargo-make if you are unfamiliar). Bevy also recently announced WebGPU support which is extremely cool but I have not tried working with it yet.

The Protocol crate

First, we need a network protocol object that can be used to construct either of NAIA's server- and client-side adapter Plugins. In particular, we need to build a naia_bevy_shared::Protocol that registers all the necessary types. In place of our single crabber crate, we will build the 3 required crates, which I name protocol, server, and app. We can move our old code somewhere else in the meantime, but either way our directories will ultimately look something like the following:

- crabber/
 |- app/
   |- src/
   |- Cargo.toml
 |- protocol/
   |- src/
   |- Cargo.toml
 |- server/
   |- src/
   |- Cargo.toml
- lib/
 |- common-e2e/
- Cargo.toml

We will also have to update the workspace Cargo.toml (the one at the root) in order to recognize our new crates. We can do that one at a time as we build our new crates.

Components

Starting with the (shared) protocol crate, we can update the workspace manifest accordingly:

[workspace]
resolver = "2"
members = [
    "crabber/protocol", # we will change this to crabber/* later
    "lib/*", # the * is a glob matching every immediate subdirectory
]

Next, let's fix up crabber/protocol/Cargo.toml and add the NAIA packages that we need:

# protocol
[package]
name = "crabber_protocol"
version = "0.1.0"
authors = ["Sean Sullivan <me@snen.dev>"]
workspace = "../.."
edition = "2021"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
bevy_ecs = { version = "0.10", default-features=false }
bevy_math = { version = "0.10", default-features=false }
bevy_transform = { version = "0.10", default-features=false }
naia-bevy-shared = { version = "0.20" }
rand = { version = "0.8" }

We don't need all of Bevy, so we're now only importing the specific crates from Bevy that we will use.

Next, we copy the components.rs and constants.rs files from the old crabber crate into crabber/protocol/src. In fact, let's move components.rs to components/mod.rs, since we will add a components/level.rs later to bring in the Level component. Now we can add a lib.rs for our crate:

// crabber/protocol/src/lib.rs
pub mod components;
pub mod constants;

Nothing in constants.rs needs to be changed. However, we need to update our Bevy components so that they can be serialized and synced with NAIA by implementing the naia_bevy_shared::Replicate trait on them. NAIA provides a derive macro for this, which expects that any properties are decorated with the naia_bevy_shared::Property type. The Replicate trait requires that its properties are Property so that it can track mutations on these components and send network messages as-needed when properties are mutated. When we have custom-defined properties, such as Direction, we can derive the naia_bevy_shared::Serde trait on them to enable the serialization and deserialization needed for replication.
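
To give some intuition for why a wrapper type helps with tracking mutations, here is a minimal sketch of a change-tracking wrapper built on Deref and DerefMut. To be clear, this is not NAIA's Property implementation, and `Tracked` and `take_dirty` are hypothetical names. It only shows how routing mutable access through a wrapper gives a library a hook for noticing changes.

```rust
use std::ops::{Deref, DerefMut};

/// Hypothetical stand-in for a change-tracking wrapper. This is NOT
/// NAIA's `Property`, only an illustration of the general idea.
pub struct Tracked<T> {
    value: T,
    dirty: bool,
}

impl<T> Tracked<T> {
    pub fn new(value: T) -> Self {
        Tracked { value, dirty: false }
    }

    /// Returns whether the value was mutably accessed since the last
    /// call, and resets the flag (as if an update had just been sent).
    pub fn take_dirty(&mut self) -> bool {
        std::mem::replace(&mut self.dirty, false)
    }
}

impl<T> Deref for Tracked<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> DerefMut for Tracked<T> {
    fn deref_mut(&mut self) -> &mut T {
        // Any mutable access is conservatively treated as a change.
        self.dirty = true;
        &mut self.value
    }
}
```

Reading through `*speed` leaves the flag alone, while `*speed += 1.5` goes through `deref_mut` and marks the value as changed. This is also why the component code in this post is sprinkled with `*` dereferences.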

We can see how this works with the ConstantMotor component:

// crabber/protocol/src/components/mod.rs
use bevy_ecs::prelude::Component;
use bevy_math::prelude::Vec3;

use naia_bevy_shared::{Property, Replicate, Serde};

#[derive(Clone, Copy, PartialEq, Serde)]
//                               ^^^^^
pub enum Direction {
    Up,
    Right,
    Down,
    Left,
}

#[derive(Component, Replicate)]
pub struct ConstantMotor {
    pub speed: Property<f32>,
    //         ^^^^^^^^
    pub direction: Property<Direction>,
    //             ^^^^^^^^
}

impl ConstantMotor {
    pub fn drive_offscreen(&self, transform: &mut Transform) -> bool {
        // dereferencing the `Property` type returns the underlying type;
        // method calls like `to_vec` auto-deref, so they need no `*`
        transform.translation += self.direction.to_vec() * *self.speed;
        //                                                 ^
        is_offscreen(transform, *self.direction)
        //                      ^
    }

    pub fn drive_and_loop(&self, transform: &mut Transform) {
        if self.drive_offscreen(transform) {
            transform.translation += get_offset_for_loop(*self.direction);
            //                                           ^
        }
    }
}

We need to construct Property values to build our types now, so Replicate exposes a new_complete constructor. It's often useful to define a new method which wraps it:

// impl ConstantMotor { ...
    pub fn new(speed: f32, direction: Direction) -> Self {
        Self::new_complete(speed, direction)
    }

In the above case, new doesn't do anything differently from new_complete, but some of our components will prefer to initialize with specific arguments. That's the case for StepMotor:

// impl StepMotor { ...
    pub fn new() -> Self {
        Self::new_complete(None)
    }

A particularly attentive reader may have noticed that I skipped Transform here, which was used for positioning in the last post. Specifically, it's used in ConstantMotor methods, but since this component is exported by Bevy, we can't implement external traits on it due to the orphan rule. We could try to make a newtype over Transform, but we would also have to implement Serde on Transform in order to create a Property<Transform>, which then fails for the same reason. So, ultimately, we need to define our own struct that enables replication.

For our purposes, since we really only need to represent a 2D position and orientation, we can define a much simpler struct than Transform, which also means smaller messages to send over the network.

#[derive(Component, Replicate)]
pub struct Position {
    // represents transform.translation.x
    pub x: Property<f32>,
    // represents transform.translation.y
    pub y: Property<f32>,
    // represents rotation around the z axis to control
    // the direction the entity is "facing"
    pub direction: Property<Direction>,
}

impl Position {
    pub fn new(x: f32, y: f32, direction: Direction) -> Self {
        Position::new_complete(x, y, direction)
    }
}

Later on, we'll need a system to synchronize the Position and Transform of each entity so that the sprites move on the camera as the position updates. To support that, it might be nice to implement From<Position> for Transform and From<Transform> for Position. This would be particularly appropriate if our game operated in 3D, but in our case, we will want to pass values for z and Direction when converting between these types, and we will only use Transform on the client-side. So instead we will define these later, when we define the client-side crate.

Next, we can replace any references to Transform in ConstantMotor-related code with our Position struct:

fn is_offscreen(position: &Position, direction: Direction) -> bool {
    let max_abs = match direction {
        Direction::Left | Direction::Right => MAX_X_F32,
        Direction::Up | Direction::Down => MAX_Y_F32,
    };
    let current_value = match direction {
        Direction::Left | Direction::Right => *position.x,
        Direction::Up | Direction::Down => *position.y,
    };
    let motion_vector = match direction {
        Direction::Left | Direction::Right => direction.to_vec().x,
        Direction::Up | Direction::Down => direction.to_vec().y,
    };
    current_value.abs() > max_abs
        && current_value.is_sign_positive() == motion_vector.is_sign_positive()
}

impl ConstantMotor {
    // ...

    pub fn drive_offscreen(&self, position: &mut Position) -> bool {
        let delta = self.direction.to_vec() * *self.speed;
        *position.x += delta.x;
        *position.y += delta.y;
        is_offscreen(position, *self.direction)
    }

    pub fn drive_and_loop(&self, position: &mut Position) {
        if self.drive_offscreen(position) {
            let delta = get_offset_for_loop(*self.direction);
            *position.x += delta.x;
            *position.y += delta.y;
        }
    }
}
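
The sign comparison at the end of is_offscreen is easy to skim past, so here is a stripped-down sketch of the same rule on a single axis. `is_past_edge` is a hypothetical helper, and the value of `MAX_X` is an assumption for this example only (the real bound lives in constants.rs).

```rust
/// Assumed bound for this sketch only; the real value lives in constants.rs.
pub const MAX_X: f32 = 320.0;

/// The one-axis core of `is_offscreen`: `value` is the coordinate along the
/// axis of travel and `motion` is the matching component of the direction.
pub fn is_past_edge(value: f32, motion: f32) -> bool {
    // Out of bounds AND still heading further out.
    value.abs() > MAX_X && value.is_sign_positive() == motion.is_sign_positive()
}
```

An entity at x = -330 moving right is outside the bounds but heading back in, so it should not count as offscreen. Without the sign check, an entity that had just been looped to the opposite edge could be looped again immediately.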

StepMotor is more complicated because we need to define how to update the position based on the direction of movement. We can add some convenient methods on Position, and use them in StepMotor:

impl Position {
    // ...
    pub fn move_direction(&mut self, delta: f32, direction: Direction) {
        match direction {
            Direction::Up => {
                *self.y += delta;
            }
            Direction::Down => {
                *self.y -= delta;
            }
            Direction::Right => {
                *self.x += delta;
            }
            Direction::Left => {
                *self.x -= delta;
            }
        }
    }

    pub fn move_forward(&mut self, delta: f32) {
        self.move_direction(delta, *self.direction);
    }
}

impl StepMotor {
    // ...
    pub fn start(&mut self, position: &mut Position, direction: Direction) {
        // the facing direction now lives on Position instead of Transform
        *position.direction = direction;
        *self.step = Some(0);
    }

    pub fn drive(&mut self, position: &mut Position) {
        if self.is_running() {
            position.move_forward(STEP_SPEED);
            *self.step = self
                .step
                .map(|step| step + 1)
                .filter(|&step| step < MOTION_STEPS);
        }
    }
}
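
The map-then-filter chain in drive is compact, so here is the same counter logic in isolation. The `advance` function is a hypothetical helper, and both the `u8` step type and the value of `MOTION_STEPS` are assumptions for this sketch.

```rust
/// Assumed value for this sketch; the real constant lives in constants.rs.
pub const MOTION_STEPS: u8 = 3;

/// Advances a step counter the way `StepMotor::drive` does: increment
/// while running, and become `None` once the motion is finished.
pub fn advance(step: Option<u8>) -> Option<u8> {
    step.map(|step| step + 1).filter(|&step| step < MOTION_STEPS)
}
```

Starting from Some(0), the counter walks through Some(1) and Some(2) and then becomes None, which is what marks the motor as no longer running. A motor that is already None stays None.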

One other note: Our Score component is currently implemented as a tuple struct. NAIA supports this, but the developer diagnostics are a little nicer when it's written as a struct, so I've changed that as well.

#[derive(Component, Replicate)]
pub struct Score {
    pub value: Property<u16>,
}

I don't want to bloat this post with the code for every single component, but this is the gist of it. For the rest, feel free to check the workbook repository.

Next, we'll bring in the relevant code for Level.

The Level

Previously, the Level component handled two somewhat distinct purposes: we used it to build a randomly-generated vector of "rows", and we used it to spawn the tilemap that visualizes these rows. The first influences game logic, but the second is only graphics. Since we're focused on a crate that is shared with the server, we don't want to include the graphics-related code. That means we need to take create_level_and_spawn_entities from our previous implementation and separate the logic that builds the Level's vector of rows from the logic that spawns the cars and renders the tilemaps, and make other adjustments of that sort.

I don't want to copy all the code from our previous level.rs here and go through it all in detail because it's quite a lot. The methods on Level defined below contain condensed logic from create_level_and_spawn_entities and build_random_row. The code for LevelRow, TileRow, and TileCol is copied over intact. Everything else was deleted. I want to avoid dwelling too long on the code, so here's a big block:

// crabber/protocol/src/components/level.rs
#[derive(Component, Replicate)]
pub struct Level {
    pub rows: Property<Vec<LevelRow>>,
}

impl Level {
    // this builds a random vector of rows
    pub fn new_random() -> Self {
        let mut rows = Vec::new();
        // The level should start with grass
        let mut level_row_kind = LevelRow::Grass;
        rows.push(level_row_kind);
        // Then we should go up to the N-1 row from there
        for _ in 1..(LEVEL_HEIGHT_I16 - 1) {
            level_row_kind = level_row_kind.get_random_next();
            rows.push(level_row_kind);
        }
        // Finally, add a finish line.
        rows.push(LevelRow::Finish);
        Level::new_complete(rows)
    }

    // this spawns cars and rafts for a level
    pub fn spawn_level_entities(&self, commands: &mut Commands) {
        // Spawn a set of moving entities for each road and river row
        for (row_index, row_kind) in self.rows.iter().enumerate() {
            if LevelRow::Road == *row_kind {
                for (position, motor) in build_random_motors(row_index as i16).into_iter() {
                    commands.spawn((Car, position, motor));
                }
            }
            if LevelRow::River == *row_kind {
                for (position, motor) in build_random_motors(row_index as i16).into_iter() {
                    commands.spawn((Raft, position, motor));
                }
            }
        }
    }

    // a useful helper for determining the row kind
    pub fn is_row_of_kind(&self, row: TileRow, target: LevelRow) -> bool {
        self.rows
            .get(row.0 as usize)
            .map_or(false, |row_kind| *row_kind == target)
    }
}

Nice. Let's export the level from mod.rs, and that should do the trick.

// .../components/mod.rs
mod level;
pub use level::{Level, LevelRow};

Input Messages

When we handle inputs, we're going to send them as messages over the network. To do that using NAIA, we'll define types that implement the Message and Channel traits.

First, let's create a file called messages.rs and add two types: an InputAction type that defines the set of relevant inputs and an InputMessage type that associates it with an entity. The latter will derive NAIA's Message trait, and in order to add our InputAction as a property, we will also derive NAIA's Serde trait on InputAction, which enables serialization and deserialization.

// .../messages.rs
use naia_bevy_shared::{EntityProperty, Message, Serde};

#[derive(Clone, Copy, Debug, PartialEq, Serde)]
pub enum InputAction {
    Up,
    Down,
    Left,
    Right,
}

#[derive(Message)]
pub struct InputMessage {
    pub entity: EntityProperty,
    pub action: Option<InputAction>,
}

impl InputMessage {
    pub fn new(action: Option<InputAction>) -> Self {
        InputMessage {
            entity: EntityProperty::new_empty(),
            action,
        }
    }
}

To send this message, we define a Channel, which we do in a new file named channels.rs:

// .../channels.rs
use naia_bevy_shared::{
    Channel, ChannelDirection, ChannelMode, Protocol, TickBufferSettings,
};

// We will use this channel to send inputs to the server
#[derive(Channel)]
pub struct PlayerInputChannel;
impl PlayerInputChannel {
    pub fn add_to_protocol(protocol: &mut Protocol) {
        protocol.add_channel::<PlayerInputChannel>(
            // the channel allows messages to be sent from client to server
            ChannelDirection::ClientToServer,
            // the channel is "tick-buffered",
            // meaning it is synchronized with NAIA's tick event loop
            ChannelMode::TickBuffered(TickBufferSettings::default()),
        );
    }
}

Take note of the ChannelDirection and the ChannelMode. The data travels from client to server, so we use the ClientToServer variant. We want to process inputs each tick, so we use a tick-buffered channel, which has a number of useful properties related to syncing ticks between server and client that I won't go into here. Later on, we can use NAIA's Client or Server struct to access the messages for the current tick and read each event from that queue.

Building the Protocol

All that's left now is to build the "protocol plugin" that we can use to instantiate NAIA's network protocol. To do that, we need to build a NAIA Protocol object for registering our types. NAIA supports defining ProtocolPlugins which I find very useful: later, we could compartmentalize and compose other types of network logic in convenient ways.

Let's define a CrabberProtocolPlugin and then export a protocol builder function that uses it appropriately:

// .../lib.rs
use std::time::Duration;
use naia_bevy_shared::{LinkConditionerConfig, Protocol, ProtocolPlugin};

pub mod channels;
pub mod components;
pub mod constants;
pub mod messages;

struct CrabberProtocolPlugin;

impl ProtocolPlugin for CrabberProtocolPlugin {
    fn build(&self, protocol: &mut Protocol) {
        // register the channels
        channels::PlayerInputChannel::add_to_protocol(protocol);

        protocol
            // register the message types
            .add_message::<messages::InputMessage>()
            // register the component types
            .add_component::<components::Crab>()
            .add_component::<components::Car>()
            .add_component::<components::Raft>()
            .add_component::<components::Knockout>()
            .add_component::<components::ConstantMotor>()
            .add_component::<components::StepMotor>()
            .add_component::<components::Position>()
            .add_component::<components::Level>()
            .add_component::<components::Score>();
    }
}

pub fn protocol() -> Protocol {
    Protocol::builder()
        .tick_interval(Duration::from_millis(16))
        .link_condition(LinkConditionerConfig::good_condition())
        .add_plugin(CrabberProtocolPlugin)
        .build()
}

Nice! We can call protocol() whenever we want to build one of NAIA's protocol plugins.

Getting Ready for the Server & App Crates

Now that we have implemented Replicate on our components and have defined our protocol, we can build the client and server crates that run our game.

First, I have to acknowledge, for those following along in code: we're going to leave these crates broken for a while. This is a big refactor. As we go, I won't detail every tiny change to lib.rs, and I may miss some details. But we'll build our way back to cargo build soon enough.

Now that we've accepted fate, let's define the Cargo.tomls for the app and server crates.

# crabber/app/Cargo.toml
[package]
name = "crabber_app"
version = "0.1.0"
authors = ["Sean Sullivan <me@snen.dev>"]
workspace = "../.."
edition = "2021"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
crabber_protocol = { path = "../protocol" }
bevy = "0.10.0"
bevy_asset_loader = { version = "0.15.0", features = [ "2d" ] }
bevy_ecs_tilemap = { version = "0.10", features = [ "atlas" ] }
leafwing-input-manager = "0.9"
naia-bevy-client = { version = "0.20", features = ["transport_webrtc"]  }
rand = "0.8"

# crabber/server/Cargo.toml
[package]
name = "crabber_server"
version = "0.1.0"
authors = ["Sean Sullivan <me@snen.dev>"]
workspace = "../.."
edition = "2021"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
crabber_protocol = { path = "../protocol" }
naia-bevy-server = { version = "0.20", features = ["transport_webrtc"]  }
bevy_app = { version = "0.10", default-features=false }
bevy_core = { version = "0.10", default-features=false }
bevy_ecs = { version = "0.10", default-features=false }
bevy_log = { version = "0.10", default-features=false }

There are also a few files remaining from our old crabber code that I haven't said anything about, so let's find a convenient home in our new crates for those modules before we continue.

I'm tempted to mention at this moment that we could split these crates up even further. It is helpful to prefer small crates. For example, I could attempt to refactor separate crates for each of CoreGameLoopPlugin, GraphicsPlugin, and InputPlugin, since none of these rely on each other, but all rely on the protocol, and each could be useful for creating different flavors of test environments for tests in any crabber crate. This is a very nice approach, but doing so would introduce somewhat too many things to explain at once. So we will come back to that idea in the next post.

The remaining crabber code: Shared Behavior & Tests

We have four modules to consider:

  1. graphics.rs,
  2. the rest of level.rs (also graphics-related)
  3. inputs.rs, and
  4. tick.rs

...and all of our tests in crabber/e2e.

Of these, the first three are client-only, whereas tick.rs contains code that both client and server will share. So we can maintain the first three in crabber_app and tick.rs in crabber_protocol.

Since our end-to-end tests rely on graphics systems, we can put those in the app crate in some app/e2e, the same way as before. We won't quite be able to just grab our top-level plugin to run tests, however, since the code used in the app binary will soon run over the network and connect to a server. Of course, it would be wise to build network-integration tests as well, but I would prefer the tests written thus far to operate unchanged. So I will aim to keep them unchanged.

From here, we can build out the systems that support NAIA's networking strategy, and the files we just moved will be integrated and refactored naturally as we continue.

Crabber Client and Server Interactions

Since the server is the authority on game state, we should describe that first. For our purposes, the server will have some initialization system, and from there we need to consider only a few overall kinds of events from clients:

  1. Socket events (connection/disconnection) or errors
  2. Authentication events
  3. Input events from clients (which will be processed in a tick-buffered queue)
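To make "tick-buffered" a little more concrete, here is a minimal, std-only sketch of the idea: inputs are stored under the tick they are meant for, and the server drains exactly one tick's worth at a time. The TickBuffer type, its methods, and the Tick/UserId aliases are hypothetical stand-ins for illustration. NAIA's real implementation also has to deal with wrapping tick counters, late messages, and serialization, none of which appear here.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for NAIA's tick and user key types.
pub type Tick = u16;
pub type UserId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputAction {
    Up,
    Down,
    Left,
    Right,
}

/// Stores inputs under the tick they should be applied on.
#[derive(Default)]
pub struct TickBuffer {
    pending: BTreeMap<Tick, Vec<(UserId, InputAction)>>,
}

impl TickBuffer {
    /// Queue an input that a client stamped with `tick`.
    pub fn push(&mut self, tick: Tick, user: UserId, action: InputAction) {
        self.pending.entry(tick).or_default().push((user, action));
    }

    /// Remove and return every input queued for `tick`, in arrival order.
    pub fn receive(&mut self, tick: Tick) -> Vec<(UserId, InputAction)> {
        self.pending.remove(&tick).unwrap_or_default()
    }
}
```

The useful property is that an input which arrives early simply waits in the buffer until the server reaches its tick, so arrival order across ticks does not matter.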

The server also will send some messages to the client:

  1. Spawn/despawn events for entities on which naia_bevy_server::CommandsExt::enable_replication has been called
  2. Component insert/update messages for such entities when the component is registered with the protocol
  3. Some player-related messages, such as letting a user know which player they are (which we will skip for now, but enjoy the foreshadowing)

NAIA also determines which users to send messages to, using two mechanisms: entities and users can join "rooms" and users typically have some "scope".
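As a rough mental model of rooms, consider the following std-only sketch. It is not NAIA's implementation: the Rooms type and the RoomId/UserId/EntityId aliases are hypothetical, and the only behavior modeled is "a user and an entity become a candidate pair when they share a room". Deciding whether each candidate pair is actually in scope is left to the caller.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical stand-ins for NAIA's RoomKey and UserKey, and Bevy's Entity.
pub type RoomId = u32;
pub type UserId = u64;
pub type EntityId = u32;

#[derive(Default)]
pub struct Rooms {
    users: HashMap<RoomId, BTreeSet<UserId>>,
    entities: HashMap<RoomId, BTreeSet<EntityId>>,
}

impl Rooms {
    pub fn add_user(&mut self, room: RoomId, user: UserId) {
        self.users.entry(room).or_default().insert(user);
    }

    pub fn add_entity(&mut self, room: RoomId, entity: EntityId) {
        self.entities.entry(room).or_default().insert(entity);
    }

    /// Every (room, user, entity) triple in which the user and the entity
    /// share a room, sorted so the result is deterministic.
    pub fn scope_checks(&self) -> Vec<(RoomId, UserId, EntityId)> {
        let mut checks = Vec::new();
        for (room, users) in &self.users {
            let Some(entities) = self.entities.get(room) else {
                continue;
            };
            for user in users {
                for entity in entities {
                    checks.push((*room, *user, *entity));
                }
            }
        }
        checks.sort_unstable();
        checks
    }
}
```

In this model, rooms are the coarse filter and scope is the fine one: a server system walks the candidate triples and chooses, pair by pair, which entities each user should hear about.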

On the client-side, a typical user would authenticate and attempt to connect with the server after some explicit input action from some menu. This isn't a post about GUIs nor authentication strategies, so even though that is an important facet of building games, we will skip enforcing any sort of authorization on the server for now.

Once connected, the client can attach client-only behavior like graphics and such to the appropriate entities. For now, we will implement basically no client prediction, but later we can implement a rollback strategy.

Implementing the Server crate

To start, I will borrow some code from the NAIA Bevy demo to build a first draft of crabber/server/src/main.rs.

// crabber/server/src/main.rs
use std::time::Duration;

use bevy_app::{App, ScheduleRunnerPlugin, ScheduleRunnerSettings};
use bevy_core::{FrameCountPlugin, TaskPoolPlugin, TypeRegistrationPlugin};
use bevy_log::{info, LogPlugin};

use naia_bevy_server::{Plugin as ServerPlugin, ServerConfig};

use crabber_protocol::protocol;

fn main() {
    info!("Starting up Crabber server...");

    App::default()
        // Some base Bevy plugins
        .add_plugin(TaskPoolPlugin::default())
        .add_plugin(TypeRegistrationPlugin::default())
        .add_plugin(FrameCountPlugin::default())
        .insert_resource(ScheduleRunnerSettings::run_loop(Duration::from_millis(3)))
        .add_plugin(ScheduleRunnerPlugin::default())
        .add_plugin(LogPlugin::default())
        // NAIA's server plugin
        .add_plugin(ServerPlugin::new(
            // settings to monitor ping, jitter, manage connection details, etc.
            ServerConfig {
                // we don't have any form of auth yet
                require_auth: false,
                ..Default::default()
            },
            // our protocol!
            protocol(),
        ))
        // Run App
        .run();
}

With naia_bevy_server::Plugin attached, we can now write systems that use the naia_bevy_server::Server type, which implements SystemParam, and use it to execute server commands.

We'll use this Server type to instantiate the network socket and listen on it. In other systems, we will use it to perform a number of other server operations.

Handling Connections

Let's start by writing some init.rs and a system to initialize the server instance.

use bevy_log::info;

use naia_bevy_server::{transport::webrtc, Server};

pub fn init(mut server: Server) {
    info!("Starting Crabber server");

    let server_addresses = webrtc::ServerAddrs::new(
        "127.0.0.1:14191" // TODO: replace this with an env variable or something
            .parse()
            .expect("could not parse Signaling address/port"),
        // IP Address to listen on for UDP WebRTC data channels
        "127.0.0.1:14192" // TODO: replace this with an env variable or something
            .parse()
            .expect("could not parse WebRTC data address/port"),
        // The public WebRTC IP address to advertise
        "http://127.0.0.1:14192", // TODO: replace this with an env variable or something
    );
    let socket = webrtc::Socket::new(&server_addresses, server.socket_config());
    // Initialize the server
    server.listen(socket);
}
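The TODOs above could be resolved with something like the following sketch, which reads each address from an environment variable and falls back to the local default. The helper name addr_from_env and the variable names in the usage note are my own inventions for illustration, not anything NAIA defines.

```rust
use std::env;
use std::net::SocketAddr;

/// Read a socket address from the environment variable `name`,
/// falling back to `default` when the variable is not set.
/// Panics with a descriptive message if the value does not parse.
pub fn addr_from_env(name: &str, default: &str) -> SocketAddr {
    env::var(name)
        .unwrap_or_else(|_| default.to_string())
        .parse()
        .unwrap_or_else(|err| panic!("could not parse {name} as an address/port: {err}"))
}
```

In init, the hard-coded strings would then become calls such as addr_from_env("CRABBER_SIGNALING_ADDR", "127.0.0.1:14191") and addr_from_env("CRABBER_WEBRTC_ADDR", "127.0.0.1:14192"), where both variable names are hypothetical.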

Once the server is running, we need to respond to connection events from clients. Let's define some connection.rs, which will grab a room key from the server and spawn an entity to represent the player in that room.

// .../connection.rs
pub fn connect_events(
    mut commands: Commands,
    mut server: Server,
    mut event_reader: EventReader<ConnectEvent>,
) {
    for ConnectEvent(user_key) in event_reader.iter() {
        // get the server's first key if there is one
        // TODO: check if the room is "full"
        let room_key = server
            .room_keys()
            .into_iter()
            // get the first RoomKey
            .next()
            // if there are no RoomKeys, spawn one
            .unwrap_or_else(|| server.make_room().key());

        // add user to the room
        let address = server
            .user_mut(user_key)
            .enter_room(&room_key)
            .address();

        // write a note to the log
        info!("Client connected from: {}", address);

        // spawn new entity for the player
        let entity = commands
            .spawn_empty()
            // call this to register the entity for replication with NAIA
            .enable_replication(&mut server)
            .insert(todo!("Add components here")) // N.B. the todo
            .id();

        server.room_mut(&room_key).add_entity(&entity);
    }
}

Similarly, it is useful to add some systems to handle disconnection events:

// .../connection.rs
pub fn disconnect_events(
    mut commands: Commands,
    mut server: Server,
    mut event_reader: EventReader<DisconnectEvent>,
) {
    for DisconnectEvent(user_key, user) in event_reader.iter() {
        info!("Crabber Server disconnected from: {:?}", user.address);
        
        // TODO Remove entities that are associated with user_key
        let entity = todo!();
        server
            .room_mut(todo!()) // also need to figure out which room they are in
            .remove_entity(&entity);
        commands.entity(entity).despawn();
        // later on, this code also might want to set fields that help identify reconnecting users, etc.
    }
}

pub fn error_events(mut event_reader: EventReader<ErrorEvent>) {
    for ErrorEvent(error) in event_reader.iter() {
        info!("Crabber Server Error: {:?}", error);
    }
}

As my comments in the middle show, we haven't yet implemented a way to differentiate which entity is associated with which user, nor to recall which room a user is in. To do that, we can add a Resource that maintains mappings we can use for these purposes.

// .../resources.rs
// this is similar to the naia-bevy-server-demo "Global" resource
#[derive(Resource, Default)]
pub struct UserEntities {
    user_to_entity_map: HashMap<UserKey, Entity>,
    entity_to_user_map: HashMap<Entity, UserKey>,
}

// Some methods that operate on the inner HashMaps for ease-of-use
impl UserEntities {
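    pub fn get_by_user(&self, user_key: &UserKey) -> Option<&Entity> { /* ... */ }
    pub fn get_by_entity(&self, entity: &Entity) -> Option<&UserKey> { /* ... */ }
    pub fn insert(&mut self, user_key: UserKey, entity: Entity) { /* ... */ }
    // returns the removed Entity by value, since the map no longer owns it
    pub fn remove(&mut self, user_key: &UserKey) -> Option<Entity> { /* ... */ }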
}

fn main() {
    App::default()
        // ...
        // initialize the resource using its `Default` implementation
        .init_resource::<UserEntities>()
        // ...
        .run();
}
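For reference, here is one way the elided method bodies might look. This is a sketch under assumptions: UserKey and Entity below are small stand-in structs so that the example compiles without NAIA or Bevy, and the Resource derive is left out for the same reason. The goal is to keep the two maps in sync on every insert and remove.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for naia_bevy_server::UserKey and bevy_ecs's Entity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UserKey(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Entity(pub u32);

#[derive(Default)]
pub struct UserEntities {
    user_to_entity_map: HashMap<UserKey, Entity>,
    entity_to_user_map: HashMap<Entity, UserKey>,
}

impl UserEntities {
    pub fn get_by_user(&self, user_key: &UserKey) -> Option<&Entity> {
        self.user_to_entity_map.get(user_key)
    }

    pub fn get_by_entity(&self, entity: &Entity) -> Option<&UserKey> {
        self.entity_to_user_map.get(entity)
    }

    /// Pair a user with an entity, first dropping any stale pairing
    /// that involves either of them.
    pub fn insert(&mut self, user_key: UserKey, entity: Entity) {
        self.remove(&user_key);
        if let Some(old_user) = self.entity_to_user_map.remove(&entity) {
            self.user_to_entity_map.remove(&old_user);
        }
        self.user_to_entity_map.insert(user_key, entity);
        self.entity_to_user_map.insert(entity, user_key);
    }

    /// Remove a user's pairing from both maps and return the entity by value.
    pub fn remove(&mut self, user_key: &UserKey) -> Option<Entity> {
        let entity = self.user_to_entity_map.remove(user_key)?;
        self.entity_to_user_map.remove(&entity);
        Some(entity)
    }
}
```

Returning the Entity by value from remove matters for the disconnect handler: a reference into the map could not outlive the removal.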

Now we can add users to this map once we spawn their player entity, and handle deletion accordingly:

// .../connection.rs
pub fn connect_events(
    /* ... */
    mut user_entities: ResMut<UserEntities>,
) {
    for ConnectEvent(user_key) in event_reader.iter() {
        /* ... */
        user_entities.insert(*user_key, entity);
    }
}

pub fn disconnect_events(
    /* ... */
    mut user_entities: ResMut<UserEntities>,
) {
    for DisconnectEvent(user_key, user) in event_reader.iter() {
        if let Some(entity) = user_entities.remove(user_key) {
            server
                .room_mut(todo!()) // still TODO
                .remove_entity(&entity);
            commands.entity(entity).despawn();
        }
    }
}

To remove the player entity from any rooms it belongs to, we can iterate over the keys returned by UserRef::room_keys, reachable through the user's user_key, and make sure that the player's entity no longer belongs to any of those rooms. (In practice, each player/entity will only belong to one room, but others could choose otherwise.)

// .../connection.rs
pub fn disconnect_events(/* ... */) {
    for DisconnectEvent(user_key, user) in event_reader.iter() {
        if let Some(entity) = user_entities.remove(user_key) {
            let room_keys = {
                let user_ref = server.user(user_key);
                // collecting the iterator so that we don't maintain a reference to server
                user_ref.room_keys().map(|key| *key).collect::<Vec<_>>()
            };
            for room_key in room_keys.into_iter() {
                server.room_mut(&room_key).remove_entity(&entity);
            }
            commands.entity(entity).despawn();
        }
    }
}

Finally, we do need a way to spawn the components necessary to track player entities. These are Crab, StepMotor, Position, and Score. Since we haven't yet, it would be useful to define a Bundle that encapsulates this. (I unintentionally showed the value of this by defining this struct incorrectly three separate times during writing. Which is impressive since there are only four components included.)

Let's add a new file to protocol, bundles.rs, and define a CrabBundle:

// crabber/protocol/src/bundles.rs
use bevy_ecs::prelude::Bundle;

use crate::components::{Crab, Direction, Position, StepMotor, Score};

#[derive(Bundle)]
pub struct CrabBundle {
    crab: Crab,
    motor: StepMotor,
    position: Position,
    score: Score,
}

impl CrabBundle {
    pub fn new() -> Self {
        CrabBundle {
            crab: Crab,
            motor: StepMotor::new(),
            position: Position::new(0., f32::from(TileRow(0)), Direction::Up),
            score: Score::new()
        }
    }
}

Again, since this will be used server-side, this should not include any SpriteSheetBundle or anything like that. With this, we can update our connect_events to spawn the player entity:

// crabber/server/src/connection.rs
pub fn connect_events(
    mut commands: Commands,
    mut server: Server,
    mut user_entities: ResMut<UserEntities>,
    mut event_reader: EventReader<ConnectEvent>,
) {
    for ConnectEvent(user_key) in event_reader.iter() {
        // ...
        let entity = commands
            .spawn_empty()
            .enable_replication(&mut server)
            .insert(CrabBundle::new())
            // new: ^^^^^^^^^^^^^^^^^
            .id();
        // ...
    }
}

With these systems defined, let's add them to our App. Since many of them perform client operations, it's important to attach these systems with a special Bevy tool known as a SystemSet. Bevy's 0.10 release docs are probably the best resource on this. In short, SystemSets can be used as powerful labeling and categorization mechanisms that help configure how Bevy runs the attached systems. In this case, we add the systems into the ReceiveEvents system set so that, for example, NAIA's client operations and mutations to Property are flushed at the right moments.

// .../main.rs
use bevy_ecs::schedule::IntoSystemConfigs;
use naia_bevy_server::ReceiveEvents;

mod connection;
mod init;

fn main() {
    App::default()
        // ... (skipping the base Bevy plugins and ServerPlugin)
        .init_resource::<UserEntities>()
        .add_startup_system(init::init)
        .add_systems(
            (
                connection::connect_events,
                connection::disconnect_events,
                connection::error_events,
            )
                // Add them to the SystemSet that NAIA uses for network-related systems
                .in_set(ReceiveEvents),
        )
        .run();
}

To recap, the server can now handle connecting and disconnecting clients. In the process, we add all clients to the same Room and spawn CrabBundles for each player. There are two remaining parts of the server-side logic to implement: first, the code that handles ticks, and second, the code to initialize a game instance with a Level and actually start running the game for each player.

The second of these is quicker, so let's do that first.

Spawning the Level

To determine when to instantiate a Level, we can add some more code to our connect_events system. Let's do the easy thing and say that the game is active once two players load, and then treat everyone else as a spectator (by adding them to a room but ignoring them otherwise).

Technically, once this level spawns, systems relating to it will start acting on the game state. It won't wait for players at all. In fact, there is currently no server-side system code that stops the first player from executing inputs while waiting for a second player. Later on, we can add checks for this, and we can perform a "ready" handshake (using Message channels) to each client to determine when to activate the game state. Then we could track the "State" of each room in a Resource or Component.

For now, let's just spawn the level as soon as the second player joins, and the game instance management code can be added in additional systems later on.

pub fn connect_events(/* ... */) {
    for ConnectEvent(user_key) in event_reader.iter() {
        // ...

        let num_players = players_query.into_iter().count();

        // spawn a level if we are about to spawn the second player
        if num_players == 1 {
            let level = Level::new_random();
            // level.spawn_level_entities(&mut commands);
            // TODO! ^^^ how to `enable_replication`?
            let entity = commands
                .spawn_empty()
                .enable_replication(&mut server)
                .insert(level)
                .id(); // spawn our level!
            server.room_mut(&room_key).add_entity(&entity);
        }

        // only spawn player entities for the first two players
        if num_players < 2 {
            let entity = commands
                .spawn_empty()
                .enable_replication(&mut server)
                .insert(CrabBundle::new())
                .id();
            server.room_mut(&room_key).add_entity(&entity);
            // ...
        }
    }
}

As hinted in the code comments, we need a way to call enable_replication on each entity's EntityCommands when we call spawn_level_entities! Let's make this easy by turning spawn_level_entities into a new function that returns some vectors with the bundles to spawn:

// crabber/protocol/src/components/level.rs
// impl Level
    pub fn create_level_bundles(
        &self,
    ) -> (
        Vec<(Car, Position, ConstantMotor)>,
        Vec<(Raft, Position, ConstantMotor)>,
    ) {
        let mut car_bundles = Vec::new();
        let mut raft_bundles = Vec::new();
        // Then we should go up to the N-1 row from there
        for (row_index, row_kind) in self.rows.iter().enumerate() {
            if LevelRow::Road == *row_kind {
                for (position, motor) in build_random_motors(row_index as i16).into_iter() {
                    car_bundles.push((Car, position, motor));
                }
            }
            if LevelRow::River == *row_kind {
                for (position, motor) in build_random_motors(row_index as i16).into_iter() {
                    raft_bundles.push((Raft, position, motor));
                }
            }
        }
        (car_bundles, raft_bundles)
    }

Not the prettiest code, and we are initializing vectors where we otherwise didn't have to, but it does the trick:

// crabber/server/src/connection.rs
// pub fn connect_events(...) { ...
//  let room_key = ...;
    if num_players == 1 {
        let level = Level::new_random();
        let (car_bundles, raft_bundles) = level.create_level_bundles();
        // for every entity we want to create:
        // 1. spawn the bundle
        // 2. enable replication
        // 3. add the entity to room_key's room
        for bundle in car_bundles.into_iter() {
            let entity = commands.spawn(bundle).enable_replication(&mut server).id();
            server.room_mut(&room_key).add_entity(&entity);
        }
        for bundle in raft_bundles.into_iter() {
            let entity = commands.spawn(bundle).enable_replication(&mut server).id();
            server.room_mut(&room_key).add_entity(&entity);
        }
        let entity = commands
            .spawn_empty()
            .enable_replication(&mut server)
            .insert(level)
            .id();
        server.room_mut(&room_key).add_entity(&entity);
    }

Now the Level is spawned and replicated across the network, and so are all the Car and Raft entities.

Ticks and Inputs

Finally, to process ticks, we have to react to TickEvents, which we use to track server ticks. Here is the example given from NAIA's demo, marked up with my own comments:

// naia-bevy-demo-server/src/events.rs
pub fn tick_events(
    mut server: Server,
    mut position_query: Query<&mut Position>,
    mut tick_reader: EventReader<TickEvent>,
) {
    let mut has_ticked = false;

    for TickEvent(server_tick) in tick_reader.iter() {
        has_ticked = true;

        // All game logic happens here!
        // Meaning all the logic in our "CoreGameLoopPlugin" systems should be moved into here
        // (Or into other systems that read from the EventReader<TickEvent>)

        // This lets us read from the InputMessageChannel for the current tick
        let mut messages = server.receive_tick_buffer_messages(server_tick);
        for (_user_key, command) in messages.read::<PlayerInputChannel, InputMessage>() {
            let Some(entity) = &command.entity.get(&server) else {
                continue;
            };
            let Ok(mut position) = position_query.get_mut(*entity) else {
                continue;
            };
            shared_behavior::process_command(&command, &mut position);
            // ^^^ we don't have this! but we can define it
        }
    }

    // this block determines which users should be sent information about which entities
    if has_ticked {
        for (_, user_key, entity) in server.scope_checks() {
            server.user_scope(&user_key).include(&entity);
        }
    }
}

We can add this to the App too:

// crabber/server/src/main.rs

mod tick;

fn main() {
    App::default()
        // ...
        .add_systems(
            (
                connection::connect_events,
                connection::disconnect_events,
                connection::error_events,
                tick::tick_events, // <-- new!
            )
                .in_set(ReceiveEvents),
        )
        .run();
}

Remember when I talked about how there will be a moment when we need to change how tick logic is run? This is that moment. This moment also reveals a small lie that I told before, which is that we wouldn't need the logic from the original (offline) crate's crabber/src/inputs.rs. Our equivalent to the register_inputs code, in the original inputs.rs, looks like this:

fn register_inputs(
    mut player_query: Query<(&mut Position, &mut StepMotor, &PlayerActionState), Without<Knockout>>,
) {
    for (mut position, mut motor, action) in player_query.iter_mut() {
        let direction = if action.pressed(Action::Up) {
            Some(Direction::Up)
        } else if action.pressed(Action::Down) {
            Some(Direction::Down)
        } else if action.pressed(Action::Left) {
            Some(Direction::Left)
        } else if action.pressed(Action::Right) {
            Some(Direction::Right)
        } else {
            None
        };
        if let Some(direction) = direction {
            if !motor.is_running() {
                motor.start(&mut position, direction);
            }
        }
    }
}

But on the server-side we won't be adding any leafwing_input_manager::PlayerActionState components: only clients need controllers. The server will instead accept InputMessages based on the tick-buffered event queue, as seen in tick_events above. We can split out a utility function into the protocol crate in a new protocol/src/inputs.rs:

// crabber/protocol/src/inputs.rs
pub fn process_input_messages(
    // Each player entity and the associated input action for this tick
    inputs: Vec<(Entity, Option<InputAction>)>,
    // we will need a mutable reference to this query since it will be called in a loop
    player_query: &mut Query<(&mut Position, &mut StepMotor), Without<Knockout>>,
) {
    for (entity, action) in inputs.into_iter() {
        let direction = action.map(|action| match action {
            InputAction::Up => Direction::Up,
            InputAction::Down => Direction::Down,
            InputAction::Left => Direction::Left,
            InputAction::Right => Direction::Right,
        });
        if let Some(direction) = direction {
            if let Ok((mut position, mut motor)) = player_query.get_mut(entity) {
                if !motor.is_running() {
                    motor.start(&mut position, direction);
                }
            }
        }
    }
}

Now we can call this function with all the input actions received during each tick:

// crabber/server/src/tick.rs
use crabber_protocol::inputs;

pub fn tick_events(
    mut server: Server,
    // new query type
    mut player_query: Query<(&mut Position, &mut StepMotor), Without<Knockout>>,
    mut tick_reader: EventReader<TickEvent>,
) {
    for TickEvent(server_tick) in tick_reader.iter() {
        let mut messages = server.receive_tick_buffer_messages(server_tick);
        let mut player_actions = Vec::new();
        for (_user_key, input) in messages.read::<PlayerInputChannel, InputMessage>() {
            if let Some(entity) = input.entity.get(&server) {
                player_actions.push((entity, input.action))
            };
        }
        inputs::process_input_messages(player_actions, &mut player_query);
        // TODO: other tick systems
    }
}

And then we can add our other tick systems in the same way. We can start by bringing in tick_constant_motors, tick_step_motors, and tick_score. These can be updated similarly: each becomes a utility that accepts a reference to the required query, so that we can call them in the above TickEvent loop:

fn tick_constant_motors(motor_query: &mut Query<(&mut Position, &ConstantMotor)>) {}

fn tick_step_motors(motor_query: &mut Query<(&mut Position, &mut StepMotor), Without<Knockout>>) {}

fn tick_score(player_query: &mut Query<(&mut Score, &Position), Without<Knockout>>) {}

All good, right? Unfortunately, no. The problem here is that to call these from the same system, we would have to introduce each of these queries as separate arguments in tick_events. But tick_score's player_query argument and tick_step_motors's motor_query argument actually refer to overlapping sets of entities, meaning some entities appear in both queries! This is a big problem: if it were allowed, users could break Rust's borrowing rules by mutably borrowing the same entity's components from separate queries.

Bevy will catch this when registering the system if we try, though. The following code:

pub fn tick_events(
    mut server: Server,
    mut player_query: Query<(&mut Position, &mut StepMotor), Without<Knockout>>,
    mut score_query: Query<(&mut Score, &Position), Without<Knockout>>,
    mut tick_reader: EventReader<TickEvent>,
) { /* ... */ }

...leads to the following error on startup:

2023-05-05T15:48:40.455622Z  INFO crabber_server::systems::init: Starting Crabber server
thread 'main' panicked at 'error[B0001]: Query<(&mut crabber_protocol::components::Score,
&crabber_protocol::components::Position), bevy_ecs::query::filter::Without<crabber_protocol::components::Knockout>>
in system  crabber_server::systems::tick::tick_events accesses component(s) crabber_protocol::components::Position
in a way that conflicts with a previous system parameter. Consider using `Without<T>` to create disjoint Queries
or merging conflicting Queries into a `ParamSet`.'

...which says something to the effect of, "either add these queries to a ParamSet or write non-conflicting queries." The queries are currently "conflicting" because both include the Position component, and neither includes any filter that guarantees exclusivity. By wrapping the conflicting queries in a ParamSet, we ensure that we only read from each query in a way that respects Rust's borrow checker.
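To see why this wrapper helps, here is a toy, Bevy-free sketch of the same idea. All of the names (World, ToyParamSet, MotorView, ScoreView) are hypothetical and much simpler than Bevy's real types. The point is only that each accessor borrows the whole set mutably, so the borrow checker refuses to let two conflicting views exist at once.

```rust
/// A toy "world" holding the data that both views want to touch.
pub struct World {
    pub positions: Vec<f32>,
    pub scores: Vec<u32>,
}

/// Toy stand-in for ParamSet: hands out one view at a time.
pub struct ToyParamSet<'w> {
    world: &'w mut World,
}

/// Needs mutable access to positions.
pub struct MotorView<'a> {
    pub positions: &'a mut [f32],
}

/// Needs shared access to positions and mutable access to scores.
pub struct ScoreView<'a> {
    pub positions: &'a [f32],
    pub scores: &'a mut [u32],
}

impl<'w> ToyParamSet<'w> {
    pub fn new(world: &'w mut World) -> Self {
        ToyParamSet { world }
    }

    // Both accessors take `&mut self`, so a second view cannot be created
    // while an earlier one is still alive.
    pub fn p0(&mut self) -> MotorView<'_> {
        MotorView { positions: &mut self.world.positions }
    }

    pub fn p1(&mut self) -> ScoreView<'_> {
        ScoreView {
            positions: &self.world.positions,
            scores: &mut self.world.scores,
        }
    }
}

pub fn tick_motors(view: &mut MotorView<'_>) {
    for position in view.positions.iter_mut() {
        *position += 1.0;
    }
}

pub fn tick_score(view: &mut ScoreView<'_>) {
    for (score, position) in view.scores.iter_mut().zip(view.positions.iter()) {
        *score = (*score).max(*position as u32);
    }
}
```

Calling tick_motors(&mut set.p0()) and then tick_score(&mut set.p1()) compiles, because each temporary view is dropped at the end of its statement. Binding both views to variables and then using the first one after creating the second does not compile, which is exactly the guarantee we want.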

To fix this, as the error states, we can change the argument into a ParamSet, or try to finagle our code into playing nicely some other way. We could also refactor the code into smaller helpers and one large system, but I think that in a larger game this would quickly become a restriction, so I want to see what strategies are available to avoid that.

The ParamSet option will suffice for the time being. Essentially, the ParamSet owns a set of queries which might reference intersecting sets of entities, and only lets us access one of them at a time. Most of the sub-queries will be used for one specific subsystem each. After a few minor adjustments, the following code works:

pub fn tick_events(
    mut server: Server,
    mut query_set: ParamSet<(
        Query<(&mut Position, &mut StepMotor), Without<Knockout>>,
        Query<(&mut Position, &ConstantMotor)>,
        Query<(&mut Score, &Position), Without<Knockout>>,
        Query<(Entity, &mut Position, &StepMotor), (With<Crab>, Without<Knockout>)>,
    )>,
    mut tick_reader: EventReader<TickEvent>,
) {
    for TickEvent(server_tick) in tick_reader.iter() {
        let mut messages = server.receive_tick_buffer_messages(server_tick);
        let mut player_actions = Vec::new();
        for (_user_key, input) in messages.read::<PlayerInputChannel, InputMessage>() {
            if let Some(entity) = input.entity.get(&server) {
                player_actions.push((entity, input.action))
            };
        }
        inputs::process_input_messages(player_actions, &mut query_set.p0());
        tick::tick_step_motors(&mut query_set.p0());
        tick::tick_constant_motors(&mut query_set.p1());
        tick::tick_score(&mut query_set.p2());
    }
    // ...
}

Some readers may be wondering whether it is possible to split this into a few different systems that each read from the EventReader<TickEvent>. This would work just fine for any logic we could execute in parallel. However, it's important here (and, probably, in most cases) that all of our tick steps are processed before any system from the next tick runs. Such an approach could fail if the server ever needed to process two ticks in the same frame.
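The ordering concern is easier to see with a small, std-only sketch. It records the order in which (tick, step) pairs would run under each design when two ticks are pending in one frame. The step names and both function names are made up for this illustration.

```rust
/// The per-tick steps, in the order they must run within a single tick.
const STEPS: [&str; 3] = ["inputs", "motors", "score"];

/// One system loops over the pending ticks and runs every step for each tick.
pub fn single_system_order(ticks: &[u16]) -> Vec<(u16, &'static str)> {
    let mut log = Vec::new();
    for &tick in ticks {
        for step in STEPS {
            log.push((tick, step));
        }
    }
    log
}

/// One system per step: each system drains every pending tick
/// before the next system gets a chance to run.
pub fn split_systems_order(ticks: &[u16]) -> Vec<(u16, &'static str)> {
    let mut log = Vec::new();
    for step in STEPS {
        for &tick in ticks {
            log.push((tick, step));
        }
    }
    log
}
```

With pending ticks 7 and 8, the single system finishes all of tick 7 before touching tick 8. The split design applies tick 8's inputs before tick 7's motors have moved, which is the failure mode described above.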

Anyway, we still need to add tick_road_collisions and tick_river_collisions, which have multiple query arguments. Any conflicting ones must be wrapped in a ParamSet or must include additional query filters. Admittedly, this can get a little out of hand, especially since this function signature bloat extends to the system helper functions.

After a series of these changes -- primarily adding With<Crab> and Without<Crab> to the appropriate queries, and creating helper functions that accept references to each query and the Commands struct -- it works again:

pub fn tick_events(
    mut commands: Commands,
    mut server: Server,
    mut player_query_set: ParamSet<(
        Query<(&mut Position, &mut StepMotor), (With<Crab>, Without<Knockout>)>,
        Query<(&mut Score, &Position), (With<Crab>, Without<Knockout>)>,
        Query<(Entity, &mut Position, &StepMotor), (With<Crab>, Without<Knockout>)>,
    )>,
    mut objects_query_set: ParamSet<(
        Query<(&mut Position, &ConstantMotor), Without<Crab>>,
        Query<&Position, (With<Car>, Without<Raft>, Without<Crab>)>,
        Query<(&Position, &ConstantMotor), (With<Raft>, Without<Car>, Without<Crab>)>,
    )>,
    level_query: Query<&Level>,
    mut tick_reader: EventReader<TickEvent>,
) {
    for TickEvent(server_tick) in tick_reader.iter() {
        // ...
        inputs::process_input_messages(player_actions, &mut player_query_set.p0());
        tick::tick_step_motors(&mut player_query_set.p0());
        tick::tick_constant_motors(&mut objects_query_set.p0());
        tick::tick_score(&mut player_query_set.p1());
        tick::tick_road_collisions(
            &mut commands,
            &level_query,
            &player_query_set.p2().to_readonly(),
            &objects_query_set.p1(),
        );
        tick::tick_river_collisions(
            &mut commands,
            &level_query,
            &mut player_query_set.p2(),
            &objects_query_set.p2(),
        );
    }
    // ...
}

As I said, it's a bit cumbersome. In a future post, we can use Bevy Schedules to help get around this problem.

Anyway, for the time being, the server compiles and starts up without error, so let's continue.

Before we do that, I want to mention some other server-side NAIA event types that we aren't using. This most importantly includes AuthEvents (to authenticate connecting users). I am ignoring this for now since we don't have a "real" way to authenticate users without introducing a number of other problems. Also relevant are SpawnEntityEvent, DespawnEntityEvent, RemoveComponentEvents, InsertComponentEvents and UpdateComponentEvents, which all help track updates to client-authoritative entities. Since we aren't using that feature, we will not use these server-side, but we will use the equivalent client-side event types.

With that out of the way, let's move on to the client app.

Implementing the App crate

At this point, we should be able to rely on the server to instantiate the game state and replicate it to each connected client. Since I haven't been able to run the game end-to-end for a while, I want to start by writing the systems that connect to the server. With NAIA's behavior integrated end-to-end into the application, we can verify as soon as possible that our entities and components are replicated to clients.

This section follows much the same path as the server crate, with a few twists. First, we need to add NAIA's Plugin to our App, and add logic that will connect to our server. From there, we can reintegrate the graphics systems and synchronize entity Transforms (which determine location on camera) with their Positions. Finally, we will write similar systems to tick_events above, using TickEvent and UpdateComponentEvent to send input messages to the server, perform clientside predictions, and execute rollback.

Connection

Spawn a socket, just like the server's init function, and connect using it:

use naia_bevy_client::{
    transport::webrtc,
    Client,
};
use crate::AppState;

pub fn inititate_connection(mut client: Client) {
    // create a socket
    let socket = webrtc::Socket::new("http://127.0.0.1:14191", client.socket_config());
    // connect to it
    client.connect(socket);
}

We used to use the AppState enum for graphics, but we will be changing some of the semantics around that. Now, when assets are loaded, we will transition to the Connecting state, in which we will listen to connection events. Then, we can transition to the InGame state once connected to the server instance.

use bevy::prelude::{info, EventReader, NextState, ResMut};
use naia_bevy_client::events::ConnectEvent;

pub fn connection_events(
    mut event_reader: EventReader<ConnectEvent>,
    client: Client,
    mut state: ResMut<NextState<AppState>>,
) {
    for _event in event_reader.iter() {
        info!("Client connected to: {:?}", client.server_address());
        // enter the "InGame" state
        state.set(AppState::InGame);
    }
}

Next, I want to verify that when the server instantiates entities, they are correctly replicated to the client app. With no graphics or behavior systems on the client-side, we will only see the state represented server-side, with some delay. But that's fine.

Before we continue, let's also add event readers to detect when the connection drops or is rejected.

use naia_bevy_client::events::{DisconnectEvent, RejectEvent};

pub fn disconnection_events(
    client: Client,
    mut state: ResMut<NextState<AppState>>,
    mut event_reader: EventReader<DisconnectEvent>,
) {
    for _event in event_reader.iter() {
        info!("Client disconnected from: {:?}", client.server_address());
        // reset to loading state for now
        // eventually, we could use this to show an alert supporting user actions
        // or handle some sort of reconnection
        state.set(AppState::Loading);
    }
}

pub fn rejection_events(mut event_reader: EventReader<RejectEvent>) {
    for _ in event_reader.iter() {
        info!("Client rejected from connecting to Server");
    }
}

Since this build also will spawn a window, and there aren't any graphics being rendered yet, it will be easiest to add these systems using our common_e2e from before.

// crabber/app/src/lib.rs
use bevy::prelude::{App, IntoSystemConfigs, States};
use naia_bevy_client::{ClientConfig, Plugin as ClientPlugin, ReceiveEvents};
use crabber_protocol::protocol;

mod connection;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, States)]
pub enum AppState {
    #[default]
    Loading,
    InGame,
}

pub fn build(app: &mut App) -> &mut App {
    app.add_state::<AppState>()
        // add the client plugin
        .add_plugin(ClientPlugin::new(ClientConfig::default(), protocol()))
        // our init system
        .add_startup_system(connection::inititate_connection)
        // and the event systems
        .add_systems(
            (
                connection::connection_events,
                connection::disconnection_events,
                connection::rejection_events,
            )
                .in_set(ReceiveEvents),
        )
}

// crabber/app/e2e/full-client.rs
use common_e2e::Test;

use crabber_app::build;

fn main() {
    Test {
        label: "Test full client".to_string(),
        setup: |app| {
            build(app);
        },
        setup_graphics: |app| {},
    }
    .run();
}

Now we can run the server crate with cargo run -p crabber_server and simultaneously run the test with cargo test --test e2e-full-client.

The output of bevy_inspector_egui when running the client crate, which shows several entities displaying the components added above.

Come to think of it, I could have set this up with a test on the server-side too. At first I thought that it wouldn't make much of a difference since we won't need to spawn a window for anything, but there would be value in being able to see e.g. the bevy_inspector_egui panel. We won't be able to do much with it now, but perhaps in the future we could spawn a test console and do server actions or other neat stuff.

But the image above does show that the server instantiates the objects we expect. So this should be enough for now. Let's reintegrate graphics next.

Graphics

For graphics code, the primary structs we want to consider are SpriteSheetBundle and Transform. Before, in the original crabber crate, we attached SpriteSheetBundles (or entire tilemaps) wherever the matching Crab, Car, Raft or Level component was spawned. Transform is also attached by SpriteSheetBundle, so we should probably figure out the relationship between Transform and our Position struct first.

Right now, the association is relatively straightforward. Position's x and y fields directly map to the Transform.translation field's x and y fields. Direction is equivalent to a rotation around the z-axis. Let's add some quick useful conversion functions.

// crabber/app/src/graphics.rs
fn direction_to_angle(direction: Direction) -> f32 {
    match direction {
        Direction::Up => 0.,
        Direction::Right => -std::f32::consts::FRAC_PI_2,
        Direction::Down => std::f32::consts::PI,
        Direction::Left => std::f32::consts::FRAC_PI_2,
    }
}

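fn position_to_transform(position: &Position, z: f32) -> Transform {
    // `Position` fields are NAIA properties, so dereference them to get the values
    Transform::from_xyz(*position.x, *position.y, z)
        .with_rotation(Quat::from_rotation_z(direction_to_angle(*position.direction)))
}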
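
As a quick sanity check on these angles, here is a minimal standalone sketch in plain Rust with no Bevy types. It assumes that "up" is the unrotated orientation of the sprite and that a positive z-rotation turns counter-clockwise. The local Direction enum and the rotate_up helper are stand-ins introduced only for illustration; they are not part of the Crabber code.

```rust
use std::f32::consts::{FRAC_PI_2, PI};

// Stand-in for the protocol's `Direction` type (illustration only).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Direction {
    Up,
    Right,
    Down,
    Left,
}

// Same mapping as in the post: angle of a rotation around the z-axis.
pub fn direction_to_angle(direction: Direction) -> f32 {
    match direction {
        Direction::Up => 0.,
        Direction::Right => -FRAC_PI_2,
        Direction::Down => PI,
        Direction::Left => FRAC_PI_2,
    }
}

// Hypothetical helper: rotate the unit vector (0, 1) counter-clockwise by `angle`.
pub fn rotate_up(angle: f32) -> (f32, f32) {
    let (sin, cos) = angle.sin_cos();
    // [cos -sin; sin cos] * (0, 1) = (-sin, cos)
    (-sin, cos)
}
```

Rotating the "up" vector by the angle for Right should land on roughly (1, 0), and Left on roughly (-1, 0), which is why Right is the negative quarter turn.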

With these defined, the system that will forward Position data to Transforms each tick is only a few lines:

// .../graphics.rs
fn sync_transforms(mut position_query: Query<(&Position, &mut Transform)>) {
    for (position, mut transform) in position_query.iter_mut() {
        // don't change any z's
        *transform = position_to_transform(position, transform.translation.z);
    }
}

That should do it. These conversion functions will also come in handy when we spawn SpriteSheetBundles and the tilemaps.

Now that we're spawning the entities via the server, we need to attach these components after the entity has been replicated to the client. In essence, NAIA will handle spawning the entities as-needed for entities spawned with enable_replication. When it does spawn those entities, it will fire a SpawnEntityEvent for the entity and InsertComponentEvents for each attached component with the Replicate trait. So, if we want, we can use those SpawnEntityEvent and InsertComponentEvent events as entrypoints to attach sprites and level graphics:

pub fn insert_component_events(
    mut commands: Commands,
    mut event_reader: EventReader<InsertComponentEvents>,
    spritesheets: Res<SpriteSheetAssets>,
) {
    for events in event_reader.iter() {
        for entity in events.read::<Raft>() {
            info!("raft spawned");
            commands.entity(entity)
                .insert(SpriteSheetBundle { /* ... */ });
        }
        /* ... more types */
    }
}

This is a solid approach. As an alternative, we can also rely on Bevy's query filter type Added. When used with a Query, Added filters for entities for which the new component was inserted within the last one or two frames.

(Be extra careful with this if your system is attached under run conditions. If an instance of the component is attached in another system while the Added-querying system isn't running, it could "skip" those entities, since the query was not run during a tick where the Added filter would have applied.)

We were doing this before to support the graphics for Knockout:

// .../graphics.rs
fn handle_knockout(mut ko_query: Query<&mut TextureAtlasSprite, Added<Knockout>>) {
    for mut sprite in ko_query.iter_mut() {
        sprite.color = Color::rgba(1., 1., 1., 0.5);
        sprite.flip_y = true;
    }
}

But this approach can lead to some tricky behavior. In particular, it could suffer from a 1-frame delay because of the way that the Commands buffer works. The details are actively under development and were changed slightly with the Schedule v3 API, so I will be a little hand-wavey here rather than trying to be exactly correct.

In essence, Bevy chooses to buffer operations like spawning entities and attaching components; commands.spawn and such functions do not immediately execute the behavior, but instead queue that behavior to be executed later. These queues are flushed when the apply_system_buffers exclusive system is run. That system is already attached by Bevy in certain places by default, and you can schedule it yourself if needed. However, it is recommended to avoid doing so frequently.
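
To make the buffering idea concrete, here is a toy model in plain Rust. It is only a sketch of the general pattern (queue now, apply at a flush point) and not how Bevy implements Commands; the World, Command and CommandQueue types below are hypothetical and exist only for this illustration.

```rust
use std::collections::HashMap;

// A toy "world": entity id -> names of attached components.
#[derive(Default)]
pub struct World {
    pub components: HashMap<u32, Vec<&'static str>>,
}

// A queued operation. Only inserts are modeled here.
pub enum Command {
    Insert { entity: u32, component: &'static str },
}

#[derive(Default)]
pub struct CommandQueue {
    pending: Vec<Command>,
}

impl CommandQueue {
    // Queue the insert; the world is untouched until `flush` runs.
    pub fn insert(&mut self, entity: u32, component: &'static str) {
        self.pending.push(Command::Insert { entity, component });
    }

    // Apply all queued operations in order, similar in spirit to a flush point.
    pub fn flush(&mut self, world: &mut World) {
        for command in self.pending.drain(..) {
            match command {
                Command::Insert { entity, component } => {
                    world.components.entry(entity).or_default().push(component);
                }
            }
        }
    }
}

// What a query running against the world would be able to observe.
pub fn has_component(world: &World, entity: u32, component: &str) -> bool {
    world
        .components
        .get(&entity)
        .map_or(false, |names| names.iter().any(|name| *name == component))
}
```

In this model, a "system" that calls has_component between insert and flush does not see the new component yet, which is the same kind of gap that can delay an Added query.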

In the case we've described here, if the system buffers are not flushed between the system that inserted the component and the system making the Added query, a full tick could be executed before the Added query picks up the new component. To get around this, we can attach our graphics systems so that they run after the ReceiveEvents set. If we wanted to be certain about system buffers being flushed, we could add the apply_system_buffers system in between ReceiveEvents and our graphics-related systems. (It may already occur, and this is worth testing, but I'll save that for later.)

I'm not very concerned about 1-frame delays for our sprites, since the delay should be relatively invisible to the user, and graphics systems don't impact game state. Furthermore, it will be useful later on for our graphics code to work in a way that's relatively network-agnostic. So here, we will use normal Bevy queries to spawn graphics components.

In comparison, other use-cases are likely to require that such events are processed immediately during the same ReceiveEvents set tick. (This is especially true for core game logic or server-side events, such as when client-authoritative updates reach the server in NAIA's demo.) So, I would probably recommend not using this approach for things other than graphics.

Anyway, moving on: let's consider what it looks like to render our players' Crabs.

// .../graphics.rs
fn setup_crab_sprites(
    mut commands: Commands,
    added_crabs_query: Query<(Entity, &Position), Added<Crab>>,
    spritesheets: Res<SpriteSheetAssets>,
) {
    for (entity, position) in added_crabs_query.iter() {
        commands.entity(entity).insert(SpriteSheetBundle {
            texture_atlas: spritesheets.crab.clone(),
            sprite: TextureAtlasSprite::new(0),
            // Convert the `Position` to a `Transform`
            transform: position_to_transform(position, PLAYER_Z),
            ..Default::default()
        });
    }
}

The code to render Car sprites, for example, doesn't look much different from the previous level.rs function spawn_level_entities:

// .../graphics.rs
fn setup_car_sprites(
    mut commands: Commands,
    added_cars_query: Query<(Entity, &Position), Added<Car>>,
    spritesheets: Res<SpriteSheetAssets>,
) {
    let mut rng = rand::thread_rng();
    for (entity, position) in added_cars_query.iter() {
        let random_color_offset = rng.gen_range(0..=2);
        let start_sprite_index = if Direction::Left == *position.direction {
            3
        } else {
            0
        };
        commands.entity(entity).insert(SpriteSheetBundle {
            texture_atlas: spritesheets.car.clone(),
            sprite: TextureAtlasSprite::new(start_sprite_index + random_color_offset),
            transform: position_to_transform(position, LEVEL_Z),
            ..Default::default()
        });
    }
}

The code for Raft and adding the tilemap for Level works in the same way, so I will avoid posting it here. There is something slightly wrong about this code, leading (if we skip ahead a little) to a funny little bug:


The graphics code implemented in the original crabber crate used Transform rotation to orient the crab sprite, but for Cars and Rafts we relied on setting specific directional sprites. Rather than editing our sprite behavior, I think it is reasonable to support both possibilities here, since readers may prefer either approach for a given situation. After a small change to position_to_transform, we can then use Option in query arguments as a convenient condition to use for determining rotation.

// .../graphics.rs
fn position_to_transform(position: &Position, z: f32, rotated: bool) -> Transform {
    if rotated {
        Transform::from_xyz(*position.x, *position.y, z).with_rotation(Quat::from_rotation_z(
            direction_to_angle(*position.direction),
        ))
    } else {
        Transform::from_xyz(*position.x, *position.y, z)
    }
}

fn sync_transforms(mut position_query: Query<(&Position, &mut Transform, Option<&Crab>)>) {
    for (position, mut transform, crab /*: Option<&Crab> */) in position_query.iter_mut() {
        *transform = position_to_transform(position, transform.translation.z, crab.is_some());
    }
}

For other cases where we call position_to_transform, we just set true or false directly.

Now we are ready to fix the GraphicsPlugin.

Right now, we have asset loading configured via Bevy States. The mechanism involves picking a "loading" state and a "continue" state. The game starts loading spritesheets into Resources once the loading state is reached, and once all assets have finished loading, it continues to the continue state. We also need some connecting state where we connect to the server, so let's update AppState to include the other possible states we can be in:

// .../lib.rs
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, States)]
pub enum AppState {
    #[default]
    Loading,      // loading assets
    Connecting,   // connecting to game
    InGame,       // in game actively
    Disconnected, // disconnected
}
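
The intended flow between these states can be sketched as a small transition function in plain Rust. This is only an illustration: the AppEvent enum and next_state function are hypothetical names, and moving to Disconnected when the connection drops is my assumption about how the new variant would be used, since the disconnection system shown earlier still resets to Loading.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AppState {
    Loading,
    Connecting,
    InGame,
    Disconnected,
}

// Hypothetical events, named for illustration only.
#[derive(Clone, Copy, Debug)]
pub enum AppEvent {
    AssetsLoaded,
    Connected,
    Disconnected,
}

pub fn next_state(state: AppState, event: AppEvent) -> AppState {
    match (state, event) {
        // asset loading finishes, so we can start connecting
        (AppState::Loading, AppEvent::AssetsLoaded) => AppState::Connecting,
        // the server accepted the connection
        (AppState::Connecting, AppEvent::Connected) => AppState::InGame,
        // assumption: a dropped connection moves us to `Disconnected`
        (AppState::Connecting | AppState::InGame, AppEvent::Disconnected) => {
            AppState::Disconnected
        }
        // any other combination leaves the state unchanged
        (other, _) => other,
    }
}
```

Writing the transitions out this way makes it easier to spot events that arrive in an unexpected state, such as a connection event while assets are still loading.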

Thinking further on this, we don't need to couple loading the assets and loading the server connection in the way that we do. In fact, it seems like for a larger-scale game with many assets, it could be valuable to define many small batches that can load as-needed. But it isn't very interesting to do this here besides maybe to save a bit of loading time.

Anyway, the plugin looks like the following:

// .../graphics.rs
// these traits enable `.run_if` and `.after` below
use bevy::prelude::{IntoSystemConfig, IntoSystemConfigs};

use bevy_ecs_tilemap::prelude::TilemapPlugin;

pub struct GraphicsPlugin;

impl Plugin for GraphicsPlugin {
    fn build(&self, app: &mut App) {
        app
        .add_plugin(TilemapPlugin) // make sure to keep bevy_ecs_tilemap's plugin
        // add the asset loading code
        .add_loading_state(
            LoadingState::new(AppState::Loading).continue_to_state(AppState::Connecting),
        )
        .add_collection_to_loading_state::<_, SpriteSheetAssets>(AppState::Loading)
        // now our systems
        .add_startup_system(camera)
        .add_systems(
            (
                // this code adds the run condition to each system, but could use distributive_run_if instead
                handle_knockout.run_if(in_state(AppState::InGame)),
                setup_crab_sprites.run_if(in_state(AppState::InGame)),
                setup_car_sprites.run_if(in_state(AppState::InGame)),
                setup_raft_sprites.run_if(in_state(AppState::InGame)),
                setup_level_tilemap.run_if(in_state(AppState::InGame)),
                animate_sprites.run_if(in_state(AppState::InGame)),
                sync_transforms.run_if(in_state(AppState::InGame)),
            )
                .after(ReceiveEvents),
        );
    }
}

Now we can confirm that the level behaves reasonably well even when relying on updates from the server:


Nice! Up next, we can start getting the game loop integrated.

Controllers

Let's take a quick moment to migrate the player controller code. The goal is to map from player controls to game actions, and consume those game actions each client tick. We can move this code into a new controller.rs file.

// .../controller.rs
pub enum Action { /* ... */ }
pub type PlayerActionState = // ...

#[derive(Bundle)]
pub struct WASDControllerBundle { /* ... */ }
impl WASDControllerBundle { /* ... */ }

#[derive(Bundle)]
pub struct ArrowKeysControllerBundle { /* ... */ }
impl ArrowKeysControllerBundle { /* ... */ }

fn get_action(action: &PlayerActionState) -> Option<InputAction> {
    if action.pressed(Action::Up) {
        Some(InputAction::Up)
    } else if action.pressed(Action::Down) {
        Some(InputAction::Down)
    } else if action.pressed(Action::Left) {
        Some(InputAction::Left)
    } else if action.pressed(Action::Right) {
        Some(InputAction::Right)
    } else {
        None
    }
}

Since client ticks will not necessarily occur every execution of Bevy's main schedule, it's useful to buffer the player's most recent inputs into a collection of queued inputs. We can write a system that populates a HashMap and later drain from that HashMap to iterate through entity actions.

#[derive(Default, Resource)]
pub struct QueuedInputs(pub HashMap<Entity, InputAction>);

fn register_inputs(
    mut player_query: Query<(Entity, &PlayerActionState), Without<Knockout>>,
    mut player_inputs: ResMut<QueuedInputs>,
) {
    for (entity, action_state) in player_query.iter_mut() {
        if let Some(input_action) = get_action(action_state) {
            player_inputs.0.insert(entity, input_action);
        }
    }
}
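
The buffering behavior is easy to check in isolation. The following sketch uses plain integers in place of Bevy's Entity and a local InputAction enum, and the register and drain_for_tick method names are made up for this example; it only shows the "latest input wins, then drain on tick" idea.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InputAction {
    Up,
    Down,
    Left,
    Right,
}

// Stand-in for the `QueuedInputs` resource, keyed by a plain integer entity id.
#[derive(Default)]
pub struct QueuedInputs(pub HashMap<u32, InputAction>);

impl QueuedInputs {
    // Called every frame: a newer input replaces an older one for the same entity,
    // and frames without input leave the queued value in place.
    pub fn register(&mut self, entity: u32, action: Option<InputAction>) {
        if let Some(action) = action {
            self.0.insert(entity, action);
        }
    }

    // Called on a client tick: hand back everything queued and leave the map empty.
    pub fn drain_for_tick(&mut self) -> Vec<(u32, InputAction)> {
        self.0.drain().collect()
    }
}
```

If three frames pass between ticks with the inputs Up, nothing, Left for the same entity, the next tick sees only Left, and the tick after that sees no input at all.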

Then we can write a simple Plugin that attaches requirements and adds our input system.

pub struct ControllerPlugin;

impl Plugin for ControllerPlugin {
    fn build(&self, app: &mut App) {
        app.add_plugin(InputManagerPlugin::<Action>::default())
            .init_resource::<QueuedInputs>()
            .add_system(
                register_inputs
                    .in_base_set(CoreSet::PreUpdate)
                    .after(InputSystem),
            );
    }
}

So when we add ControllerPlugin, we know that any system run after CoreSet::PreUpdate (which includes everything in ReceiveEvents) can read from the QueuedInputs resource to pull the latest tick's inputs.

Next, we will rely on QueuedInputs to run prediction ticks and send client inputs to the server.

Input Messages & Tick Prediction

With that in mind, we can start defining the system that handles tick events, where we can drain the queued inputs to both iterate and clear the map. What we're looking for should look something like the following:

// .../tick.rs
pub fn tick(
    mut client: Client,
    mut tick_reader: EventReader<ClientTickEvent>,
    mut player_query: Query<
        (Entity, &mut Position, &mut StepMotor),
        (With<Crab>, Without<Knockout>),
    >,
    mut player_inputs: ResMut<QueuedInputs>,
) {
    // each of these represents a client tick
    // the variable `client_tick` is a number representing the current tick count
    // we iterate through each predicted tick and execute game logic on predicted entities
    let mut did_tick = false;
    for ClientTickEvent(client_tick) in tick_reader.iter() {
        did_tick = true;
        // iterate through player inputs
        // if multiple ticks are processed in one frame, send the same inputs for both
        for (entity, action) in player_inputs.0.iter() {
            if let Ok((entity, mut position, mut motor)) = player_query.get_mut(*entity) {
                let mut input_message = InputMessage::new(Some(*action));
                input_message.entity.set(&client, &entity);
                // Send command to server
                client.send_tick_buffer_message::<PlayerInputChannel, InputMessage>(
                    client_tick,
                    &input_message,
                );
                // also execute them
                process_input(input_message.action, &mut position, &mut motor); // ***
            }
        }

        tick_players(...); // ***
        tick_level(...); // ***
    }
    // if we ticked, clear the map
    if did_tick {
        player_inputs.0.clear();
    }
}

But this isn't quite right. Earlier in this post, I wrote about the strategy for prediction and rollback that will be used here, but let's take a moment to recontextualize that intent around the above code. The problem with that code is that it creates an ambiguity about which "timeline" the entities are on: the client's or the server's. In doing so, we're blurring the lines between server state and client state.

As-is, the client doesn't spawn any of its own game objects; it only attaches clientside behavior as server events are received. NAIA handles updating client entities to match whatever server state updates are received. So, for any entity we spawn server-side, we could consider some specific tick; let's pretend it's number 50. We might receive an update message from the server for tick 50, and in the same frame, a ClientTickEvent for tick 62 might be processed.

For example, looking at the code above, we can examine the behavior if we remove the lines marked with the // *** comments (i.e., the calls to process_input, tick_players, and tick_level). In that case, inputs do work, but they are delayed by the network round-trip: When I press the "up" key, I only see feedback once the server has processed the tick and sent my client the update. However, with those lines marked // ***, the behavior would be pretty unpredictable: we might perform some actions on the entity, only for NAIA to overwrite those operations because of messages from the server. So there's always going to be some timeline mismatch if we want to process inputs on the server-spawned entities.
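
The overwrite described above can be reproduced with a toy model. This sketch is plain Rust and makes no claims about NAIA's internals: the Crab struct with a single coordinate and the three functions are hypothetical, and the tick numbers in the comments just reuse the 50 and 62 from the example.

```rust
// A toy crab with a single coordinate, shared by client prediction and server updates.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Crab {
    pub y: i32,
}

// Client-side prediction: apply the input immediately on a client tick.
pub fn predict_step_up(crab: &mut Crab) {
    crab.y += 1;
}

// Replication: overwrite local state with the server's (older) confirmed state.
pub fn apply_server_update(crab: &mut Crab, confirmed_y: i32) {
    crab.y = confirmed_y;
}

// One frame in which a prediction and a stale server update both touch the same entity.
pub fn frame_with_shared_entity() -> Crab {
    let mut crab = Crab { y: 0 };
    predict_step_up(&mut crab); // client tick 62: y becomes 1
    apply_server_update(&mut crab, 0); // server state for tick 50: y is reset to 0
    crab
}
```

The predicted step is silently lost because both timelines write to the same value, which is the ambiguity the systems above would run into.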

The appropriate decision, since we want two "copies" of each entity, is to spawn a duplicate entity. Each client-side "server entity" then could have a "prediction" counterpart with all the relevant shared components. The prediction entity is displayed to the client and performs tick behavior on client ticks, whereas the server entity is updated as the client receives messages confirming each server tick. Server entities represent the confirmed source of truth for that entity at the confirmed server tick (I will sometimes call them "source" entities), and we can use that to apply whatever sort of clientside smoothing effects we need to the "prediction" entities as data is received from the server.

Which specific kinds of game entities should get a duplicated prediction entity? It makes sense, at a minimum, that the client predicts any local player entities, so that inputs can be processed and displayed to the user in real-time. Things that players can interact with may often need some level of prediction too, depending on the situation. However, some level of inaccuracy can be acceptable too. Ultimately, in a complex enough game, it will be important to create a tailored overall strategy.

Crabber is somewhat simple. Since players don't interact in any way, other players' positions shouldn't significantly impact player decision-making. The behavior of cars and rafts, on the other hand, would appear inconsistent without clientside prediction: the player might observe themselves landing squarely on a raft, but due to the timeline mismatch, could still miss on the serverside. This would be a bad experience, so we should avoid it.

Let's focus for now on prediction for the level and the player's crab, so that the player is at least correctly visualizing their game state. We can start with the player.

There are a number of ways to set up this type of entity relationship. My approach relies on two components, PredictionOf and SourceOf, which will be used to query for either prediction entities or source (server) entities as-needed. They are defined as follows:

// .../components.rs

// e.g. getting the server entity from a predicted entity
// fn system(query: Query<(Entity, &PredictionOf)>) {
//     for (prediction_entity, PredictionOf(source_entity)) in query.iter() { ... }
// }
#[derive(Component)]
pub struct PredictionOf(pub Entity);
// e.g. getting the predicted entity from a server entity
// fn system(query: Query<(Entity, &SourceOf)>) {
//     for (source_entity, SourceOf(prediction_entity)) in query.iter() { ... }
// }
#[derive(Component)]
pub struct SourceOf(pub Entity);

Now each client can use these components to distinguish between server and client entities and to retrieve the other entity when needed.
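
To illustrate how the two links keep the timelines apart, here is a minimal model in plain Rust. It replaces the ECS with a few HashMaps, uses integers as entity ids, and the World type with its spawn and duplicate_as_prediction methods is hypothetical; the real code relies on Bevy components and NAIA's duplicate instead.

```rust
use std::collections::HashMap;

pub type Entity = u32;

#[derive(Default)]
pub struct World {
    pub positions: HashMap<Entity, i32>,
    pub prediction_of: HashMap<Entity, Entity>, // prediction -> source
    pub source_of: HashMap<Entity, Entity>,     // source -> prediction
    next_id: Entity,
}

impl World {
    // Create a new entity with the given position.
    pub fn spawn(&mut self, position: i32) -> Entity {
        let id = self.next_id;
        self.next_id += 1;
        self.positions.insert(id, position);
        id
    }

    // Copy the source's state into a new entity and link the two both ways.
    // Panics if `source` was never spawned, which is fine for a sketch.
    pub fn duplicate_as_prediction(&mut self, source: Entity) -> Entity {
        let position = self.positions[&source];
        let prediction = self.spawn(position);
        self.prediction_of.insert(prediction, source);
        self.source_of.insert(source, prediction);
        prediction
    }
}
```

With this split, a client tick can move the prediction entity while a server update rewrites only the source entity, and either one can be found from the other through the two maps.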

Let's focus on the player first. We don't have any way to identify the player yet, but what we want is some sort of event that we can listen on when the server spawns an entity associated with the local player.

// .../events.rs
use naia_bevy_client::CommandsExt;

fn receive_player_assignment_message(
    mut commands: Commands,
) {
    for /* TODO */ {
        let entity = /* TODO: get_entity(event); */
        // create the prediction entity
        let prediction_entity =
            commands.entity(entity)
                .duplicate() // this requires `CommandsExt`
                .insert(PredictionOf(entity))
                .id();
        // attach controls to the source entity
        commands.entity(entity).insert((
            SourceOf(prediction_entity),
            WASDControllerBundle::new(),
        ));
    }
}

To support this, we need to add a new message type, so that the server can trigger an event on the clientside. Let's add a PlayerAssignment message and a corresponding channel:

// crabber/protocol/src/messages.rs
use naia_bevy_shared::{EntityProperty, Message};

#[derive(Message)]
pub struct PlayerAssignmentMessage {
    pub entity: EntityProperty,
}

impl PlayerAssignmentMessage {
    pub fn new() -> Self {
        PlayerAssignmentMessage {
            entity: EntityProperty::new_empty(),
        }
    }
}

// crabber/protocol/src/channels.rs
use naia_bevy_shared::{
    Channel, ChannelDirection, ChannelMode, Protocol, ReliableSettings,
};

// This one will be used to tell clients which player entity is theirs
#[derive(Channel)]
pub struct PlayerAssignmentChannel;
impl PlayerAssignmentChannel {
    pub fn add_to_protocol(protocol: &mut Protocol) {
        protocol.add_channel::<PlayerAssignmentChannel>(
            // the channel sends server -> client
            ChannelDirection::ServerToClient,
            // the channel sends an unordered reliable message
            // we need to ensure that the player receives the message,
            // but the order and tick do not matter, and it is OK
            // to spend additional time to ensure reliability
            ChannelMode::UnorderedReliable(ReliableSettings::default()),
        );
    }
}

(Don't forget to add it to the Protocol!)
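
For intuition about what "unordered reliable" means for the receiver, here is a conceptual sketch in plain Rust. It is not NAIA's implementation, and the UnorderedReliableReceiver type and u16 message ids are assumptions made for the example. The idea is that a reliable sender keeps resending until a message is acknowledged, so the receiver may see duplicates and arbitrary ordering; it delivers each message once, as soon as it arrives.

```rust
use std::collections::HashSet;

// Conceptual receiver for an unordered reliable channel.
#[derive(Default)]
pub struct UnorderedReliableReceiver {
    // ids of messages that have already been delivered to the application
    seen: HashSet<u16>,
}

impl UnorderedReliableReceiver {
    // Returns the payload if this message id has not been delivered before.
    // Arrival order does not matter; resent duplicates are dropped.
    pub fn receive<'a>(&mut self, id: u16, payload: &'a str) -> Option<&'a str> {
        if self.seen.insert(id) {
            Some(payload)
        } else {
            None
        }
    }
}
```

An ordered channel would instead have to hold message 2 back until message 1 arrived, which is extra latency that the assignment message does not need.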

When the server spawns player entities, it can send these messages to the appropriate client:

// crabber/server/src/connection.rs
pub fn connect_events(/* ... */) {
    // ...
    let entity = commands
        .spawn_empty()
        .enable_replication(&mut server)
        .insert(CrabBundle::new())
        .id();

    server.room_mut(&room_key).add_entity(&entity);
    user_entities.insert(*user_key, entity);

    // new: assign the player when we spawn its entity
    let mut assignment_message = PlayerAssignmentMessage::new();
    assignment_message.entity.set(&server, &entity);
    server.send_message::<PlayerAssignmentChannel, PlayerAssignmentMessage>(
        user_key,
        &assignment_message,
    );
}

Then we can read from these events on the client-side:

// crabber/app/src/events.rs
use naia_bevy_client::CommandsExt;

pub fn receive_entity_assignment_message(
    mut event_reader: EventReader<MessageEvents>,
    mut commands: Commands,
    client: Client,
) {
    for event in event_reader.iter() {
        for assignment in event.read::<PlayerAssignmentChannel, PlayerAssignmentMessage>() {
            let entity = assignment.entity.get(&client).unwrap();
            let prediction_entity = commands
                .entity(entity)
                .duplicate() // create a new entity and copy all `Replicate`
                .insert(PredictionOf(entity)) // this is the prediction entity
                .id();

            commands
                .entity(entity)
                .insert((
                    SourceOf(prediction_entity),
                    WASDControllerBundle::new(),
                ));
        }
    }
}

Next we need to recontextualize some of the code we've already written. We no longer want to render graphics for the player's source entity, but instead its prediction entity.

It would also be nice to distinguish the player's crab from other crabs. An Option in the sprite-spawning system's query lets us detect whether an entity is a prediction or source entity, which enables this type of conditional logic.

// .../graphics.rs
fn setup_crab_sprites(
    mut commands: Commands,
    added_crabs_query: Query<(Entity, &Position, Option<&SourceOf>), Added<Crab>>,
    //                                           ^^^^^^^^^^^^^^^^^
    spritesheets: Res<SpriteSheetAssets>,
) {
    for (entity, position, is_source) in added_crabs_query.iter() {
        let mut sprite = TextureAtlasSprite::new(0);
        // add a tint for non-player crabs
        if is_source.is_some() {
            sprite.color = Color::rgba(1., 1., 1., 0.9);
        }
        commands.entity(entity).insert(SpriteSheetBundle {
            texture_atlas: spritesheets.crab.clone(),
            sprite,
            transform: position_to_transform(position, PLAYER_Z, true),
            ..Default::default()
        });
    }
}

Furthermore, in what we currently have for tick.rs, the input messages are associated with the server's spawned entity, not our prediction entity. However, the tick behavior will operate on the prediction entity, not the server entity. So we need queries for both in this system:

// .../tick.rs
pub fn tick(
    mut client: Client,
    mut tick_reader: EventReader<ClientTickEvent>,
    source_query: Query<&SourceOf>,
    mut prediction_query: Query<
        (&mut Position, &mut StepMotor),
        (With<PredictionOf>, With<Crab>, Without<Knockout>),
    >,
    mut player_inputs: ResMut<QueuedInputs>,
) {
    // ...
    for ClientTickEvent(client_tick) in tick_reader.iter() {
        for (entity, action) in player_inputs.0.iter() {
            let Ok(SourceOf(prediction)) = source_query.get(*entity) else { continue };
            if let Ok((mut position, mut motor)) = prediction_query.get_mut(*prediction) {
                // send inputs to source entity, process inputs with prediction components
            }
            // process the rest of the tick with prediction components
        }
    }
    // ...
}

Similarly, when we spawn Cars and Rafts, we can duplicate them and attach the source and prediction components.

// .../events.rs
use naia_bevy_client::events::InsertComponentEvents;

pub fn receive_insert_component_events(
    mut commands: Commands,
    mut event_reader: EventReader<InsertComponentEvents>,
) {
    for event in event_reader.iter() {
        for entity in event.read::<Raft>() {
            let prediction_entity = commands
                .entity(entity)
                .duplicate()
                .insert(PredictionOf(entity))
                .id();

            commands.entity(entity)
                .insert(SourceOf(prediction_entity));
        }
        // ... same thing but for `Car`
    }
}

Then we can restrict the graphics code to only render the prediction rafts and cars:

// .../graphics.rs
fn setup_car_sprites(
    // ...
    added_cars_query: Query<(Entity, &Position), (Added<Car>, With<PredictionOf>)>,
    //                                                        ^^^^^^^^^^^^^^^^^^
) {
    // ...
}
fn setup_raft_sprites(
    // ...
    added_rafts_query: Query<(Entity, &Position), (Added<Raft>, With<PredictionOf>)>,
    //                                                          ^^^^^^^^^^^^^^^^^^
) {
    // ...
}

...and now the cars and rafts we render are the ones that run on client ticks, whereas their source entities are updated by the server. Arguably, we could merge a couple of these systems, but they will become useful later, and I am not concerned about a 1-frame delay for these sprites.

One last touch: since Knockout is attached on the server, we need to detect when that occurs and also attach it to the prediction entity.
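
The mirroring logic itself is small. The sketch below models it in plain Rust with integer entity ids, a HashMap standing in for the SourceOf links, and a HashSet standing in for "has the Knockout component"; the mirror_knockouts function is a hypothetical name. In the actual client this would presumably live in a system that reads insert events for Knockout, but that wiring is not shown here.

```rust
use std::collections::{HashMap, HashSet};

pub type Entity = u32;

// Given the source entities that just received `Knockout`,
// mark their prediction counterparts as knocked out too.
pub fn mirror_knockouts(
    newly_knocked_out: &[Entity],
    source_of: &HashMap<Entity, Entity>,
    knocked_out: &mut HashSet<Entity>,
) {
    for source in newly_knocked_out {
        knocked_out.insert(*source);
        // Entities without a prediction counterpart are simply skipped.
        if let Some(prediction) = source_of.get(source) {
            knocked_out.insert(*prediction);
        }
    }
}
```

Once the prediction entity is marked, the Without<Knockout> filters used by the input and tick queries exclude it in the same way they exclude the source entity.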

Finally, we can use the techniques from the server-side to finish implementing tick. The following wall of code shows a first attempt. It is mostly identical to its server-side counterpart, with minor differences in the ParamSets such as the addition of With<PredictionOf> to each query's filter parameters, and the fact that inputs are sent, not received. (It also removes tick_score since we can wait until the server confirms that.)

// .../tick.rs
pub fn tick(
    mut client: Client,
    mut commands: Commands,
    mut tick_reader: EventReader<ClientTickEvent>,
    level_query: Query<&Level>,
    source_query: Query<&SourceOf>,
    mut player_query_set: ParamSet<(
        Query<
            (Entity, &mut Position, &mut StepMotor),
            (With<PredictionOf>, With<Crab>, Without<Knockout>),
        >,
        Query<(&mut Position, &mut StepMotor), (With<PredictionOf>, With<Crab>, Without<Knockout>)>,
        Query<
            (Entity, &mut Position, &StepMotor),
            (With<PredictionOf>, With<Crab>, Without<Knockout>),
        >,
    )>,
    mut objects_query_set: ParamSet<(
        Query<(&mut Position, &ConstantMotor), (With<PredictionOf>, Without<Crab>)>,
        Query<&Position, (With<PredictionOf>, With<Car>, Without<Raft>, Without<Crab>)>,
        Query<
            (&Position, &ConstantMotor),
            (With<PredictionOf>, With<Raft>, Without<Car>, Without<Crab>),
        >,
    )>,
    mut player_inputs: ResMut<QueuedInputs>,
) {
    let mut did_tick = false;
    for ClientTickEvent(client_tick) in tick_reader.iter() {
        did_tick = true;
        for (entity, action) in player_inputs.0.iter() {
            let Ok(SourceOf(prediction)) = source_query.get(*entity) else { continue };

            if let Ok((_, mut position, mut motor)) = player_query_set.p0().get_mut(*prediction) {
                let mut input_message = InputMessage::new(Some(*action));
                input_message.entity.set(&client, &entity);
                // Send command to server
                client.send_tick_buffer_message::<PlayerInputChannel, InputMessage>(
                    client_tick,
                    &input_message,
                );
                process_input(input_message.action, &mut position, &mut motor);
            }
        }
        tick::tick_step_motors(&mut player_query_set.p1());
        tick::tick_constant_motors(&mut objects_query_set.p0());
        tick::tick_road_collisions(
            &mut commands,
            &level_query,
            &player_query_set.p2().to_readonly(),
            &objects_query_set.p1(),
        );
        tick::tick_river_collisions(
            &mut commands,
            &level_query,
            &mut player_query_set.p2(),
            &objects_query_set.p2(),
        );
    }
    if did_tick {
        player_inputs.0.clear();
    }
}

However, it does not compile. Since we added the extra filter parameter, With<PredictionOf>, to many of our queries, their types no longer match the signatures of the tick helper functions we pass them to.

To make this work, we could do a few things. One option, which I used for a while, is to make the tick helpers generic over a ReadOnlyWorldQuery type. This lets the client enforce a stricter filter than the server. An example might look like:

// crabber/protocol/src/tick.rs
pub fn tick_step_motors<Filter: ReadOnlyWorldQuery>(
    motor_query: &mut Query<
        (&mut Position, &mut StepMotor),
        (With<Crab>, Without<Knockout>, Filter),
    >,
) { /* ... */ }

If we don't want to do anything special, such as on the server, the empty struct () implements ReadOnlyWorldQuery, opting out of any additional behavior. On the client, however, this enables us to configure systems that target predicted (or source) components specifically.
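
To show the role of `()` without pulling in Bevy, here is a minimal standalone sketch of the same idea. The `EntityFilter` trait, the `Predicted` marker, and the `Entity` struct are hypothetical stand-ins for `ReadOnlyWorldQuery`, `With<PredictionOf>`, and real ECS storage. Only the shape of the generic parameter mirrors the code above.

```rust
// Hypothetical stand-ins for illustration; none of these are Bevy or NAIA types.
#[derive(Debug, Clone, PartialEq)]
pub struct Entity {
    pub position: i32,
    pub velocity: i32,
    pub is_prediction: bool,
}

/// Plays the role of an extra query filter.
pub trait EntityFilter {
    fn matches(entity: &Entity) -> bool;
}

/// The unit type opts out of any extra filtering.
impl EntityFilter for () {
    fn matches(_entity: &Entity) -> bool {
        true
    }
}

/// Client-only filter: accept prediction entities only.
pub struct Predicted;

impl EntityFilter for Predicted {
    fn matches(entity: &Entity) -> bool {
        entity.is_prediction
    }
}

/// Shared tick helper, generic over the extra filter.
pub fn tick_constant_motors<F: EntityFilter>(entities: &mut [Entity]) {
    for entity in entities.iter_mut() {
        if F::matches(entity) {
            entity.position += entity.velocity;
        }
    }
}
```

In this sketch the server would call `tick_constant_motors::<()>(...)` and the client `tick_constant_motors::<Predicted>(...)`. Every call site has to name the filter, which hints at where the boilerplate comes from.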

This approach can be a little kludgy, though, since it adds a significant amount of complexity and boilerplate down the line. Still, it can be pretty useful when used in limited circumstances.

Instead, I'm going to solve this by adding an additional marker component that defines when an entity should be "controlled" by the core game logic.

// crabber/protocol/src/components/controlled.rs
#[derive(Component)] // n.b.: do not want to `Replicate` this
pub struct Controlled; // `pub` so the server and app crates can use it

With this, input, graphics, and tick systems can query for With<Controlled> instead of With<PredictionOf>.

// crabber/protocol/src/tick.rs
fn tick_constant_motors_system(
    mut motor_query: Query<(&mut Position, &ConstantMotor), (Without<Crab>, With<Controlled>)>,
    //                                                                      ^^^^^^^^^^^^^^^^
) { /* ... */ }

// etc

Now, wherever we currently add PredictionOf, we also add Controlled, and the server adds it to the entities it simulates. Some examples in the server and client:

// crabber/server/src/connection.rs
fn connect_event(/* ... */) {
    // ...
    if should_spawn_player {
        let entity = commands
            .spawn_empty()
            .enable_replication(&mut server)
            .insert((CrabBundle::new(), Controlled))
            .id();
    }
}

// crabber/app/src/graphics.rs
fn setup_raft_sprites(
    mut commands: Commands,
    added_rafts_query: Query<(Entity, &Position), (Added<Raft>, With<Controlled>)>,
    spritesheets: Res<SpriteSheetAssets>,
) { /* ... */ }

// crabber/app/src/events.rs
fn handle_whichever_event(/* ... */) {
    for event in event.read() {
        let source_entity = /* ... */;
        let prediction_entity = commands
            .entity(source_entity)
            .duplicate()
            .insert((PredictionOf(source_entity), Controlled))
            .id();
        commands.entity(source_entity).insert(SourceOf(prediction_entity));
    }
}

As this shows, marker component types are very useful for enabling opt-in behavior across abstraction boundaries. Once this is added in the appropriate places, the tick systems work again, and on the client they run only on the prediction entities. I have chosen not to show every instance of the above code, but feel free to look at the repository for more specific information.
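
As a rough illustration of why the marker keeps the shared code simple, here is a small standalone sketch. The `World` struct and its `controlled` set are hypothetical and stand in for Bevy's storage and the `Controlled` component. The point is only that the shared tick logic never has to know whether it runs on a client or a server.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical miniature world: entity ids map to positions, and the
// `controlled` set plays the role of the `Controlled` marker component.
#[derive(Default)]
pub struct World {
    pub positions: HashMap<u32, i32>,
    pub controlled: HashSet<u32>,
}

impl World {
    /// Spawn an entity, optionally opting it in to the shared game logic.
    pub fn spawn(&mut self, id: u32, position: i32, controlled: bool) {
        self.positions.insert(id, position);
        if controlled {
            self.controlled.insert(id);
        }
    }
}

/// Shared game logic: it moves whatever is marked as controlled
/// and leaves every other entity untouched.
pub fn tick_motors(world: &mut World, velocity: i32) {
    for id in world.controlled.iter() {
        if let Some(position) = world.positions.get_mut(id) {
            *position += velocity;
        }
    }
}
```

In this sketch a server would mark every entity it spawns, while a client would mark only the prediction copy and leave the source entity to be overwritten by server updates.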

Rollback

At this point, however, the server is no longer updating the positions of the entities we display on-screen.

Right now, this could be somewhat acceptable, since no predicted entities can be influenced by other players over the network, nor does the server add any logic that isn't predicted client-side. However, that won't always be true, so I want to show how it works by adding rollback to the player entity.

To start, we need to make sure we can play back the user's commands since the latest confirmed tick. To do that, we can add a Resource that contains an instance of NAIA's CommandHistory struct, which is a wrapper over a VecDeque<(Tick, T)>. Since we want to allow capturing inputs from multiple users on a client (for local play), each tick will hold a vector of all captured inputs:

// crabber/app/src/resources.rs
use naia_bevy_client::CommandHistory;
use crabber_protocol::messages::InputAction;

#[derive(Resource, Default)]
pub struct TickHistory(pub CommandHistory<Vec<(Entity, InputAction)>>);
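
For intuition about what such a history does, here is a minimal standalone sketch of a tick-indexed command buffer. It is a hypothetical stand-in and not NAIA's implementation: the `History` type, its method bodies, and the `tick_is_newer` helper are my own assumptions, ticks are assumed to be wrapping `u16` counters, and this version stores and returns records oldest first, which the real type may not do.

```rust
use std::collections::VecDeque;

// Assumption for this sketch: ticks are wrapping u16 counters.
pub type Tick = u16;

/// Wrapping "is `a` newer than `b`" comparison for u16 sequence numbers.
pub fn tick_is_newer(a: Tick, b: Tick) -> bool {
    a != b && a.wrapping_sub(b) < 32768
}

/// Hypothetical stand-in for a command history; NOT NAIA's implementation.
pub struct History<T> {
    // Oldest record at the front, newest at the back.
    buffer: VecDeque<(Tick, T)>,
}

impl<T: Clone> History<T> {
    pub fn new() -> Self {
        Self { buffer: VecDeque::new() }
    }

    /// Record the commands captured for `tick`.
    pub fn insert(&mut self, tick: Tick, commands: T) {
        self.buffer.push_back((tick, commands));
    }

    /// Drop everything up to and including `confirmed`, then return
    /// copies of the remaining records, oldest first, ready to replay.
    pub fn replays(&mut self, confirmed: Tick) -> Vec<(Tick, T)> {
        while let Some((tick, _)) = self.buffer.front() {
            if tick_is_newer(*tick, confirmed) {
                break;
            }
            self.buffer.pop_front();
        }
        self.buffer.iter().cloned().collect()
    }
}
```

Discarding confirmed records inside `replays` keeps the buffer bounded by the number of ticks the client is running ahead of the server.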

Then in tick, when we process inputs, we store the inputs associated with each tick in this history:

// .../tick.rs
pub fn tick(
    // ...
    mut tick_history: ResMut<TickHistory>,
) {
    // ...
    for ClientTickEvent(client_tick) in tick_reader.iter() {
        // ...
        // Store the tick in tick history for rollback
        let tick_record = player_inputs
            .0
            .iter()
            .map(|(entity, action)| (*entity, action.clone()))
            .collect::<Vec<_>>();
        tick_history.0.insert(*client_tick, tick_record);
        // ...
    }
}

Now, when we receive a server update that qualifies for rollback, we can read from tick_history. For now, we'll do this for every Position update we find in UpdateComponentEvents, with reckless abandon:

// .../events.rs
use naia_bevy_client::{events::UpdateComponentEvents, sequence_greater_than, Tick};

pub fn receive_update_component_events(
    mut commands: Commands,
    mut event_reader: EventReader<UpdateComponentEvents>,
    mut tick_history: ResMut<TickHistory>,
) {
    // We only care about whatever the latest tick is
    // so we check the events for the latest tick count,
    // and use that to get the commands we need to replay
    let mut latest_tick: Option<Tick> = None;
    for events in event_reader.iter() {
        for (server_tick, _entity) in events.read::<Position>() {
            if let Some(last_tick) = latest_tick {
                if sequence_greater_than(server_tick, last_tick) {
                    latest_tick = Some(server_tick);
                }
            } else {
                latest_tick = Some(server_tick);
            }
        }
    }
    // once we know the latest-confirmed tick, execute rollback from there
    if let Some(latest_tick) = latest_tick {
        // Reset all expected entities to their source states
        todo!();
        let replay_ticks = tick_history.0.replays(&latest_tick);
        // For each tick we need to replay,
        // (reversing the iterator to play the ticks in the correct order,)
        for (tick, tick_actions) in replay_ticks.into_iter().rev() {
            // read all the recorded actions and apply them
            for (entity, action) in tick_actions.into_iter() {
                todo!();
            }
            // and then process ticks
            todo!();
        }
    }
}
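
The tick comparison deserves a quick aside, since a plain `max` would pick the wrong tick once the counter wraps around. The following standalone sketch shows the idea, assuming ticks are wrapping `u16` sequence numbers. `tick_is_newer` and `latest_tick` are hypothetical helpers written for this example, not NAIA functions.

```rust
// Assumption for this sketch: ticks are wrapping u16 counters.
pub type Tick = u16;

/// Wrapping "is `a` newer than `b`" comparison for u16 sequence numbers.
pub fn tick_is_newer(a: Tick, b: Tick) -> bool {
    a != b && a.wrapping_sub(b) < 32768
}

/// Returns the newest tick among `ticks`, or `None` if there were no updates.
pub fn latest_tick(ticks: impl IntoIterator<Item = Tick>) -> Option<Tick> {
    ticks.into_iter().fold(None, |latest, tick| match latest {
        // Keep the current candidate unless the new tick is strictly newer.
        Some(current) if !tick_is_newer(tick, current) => Some(current),
        _ => Some(tick),
    })
}
```

With the ticks `65534, 2, 65535`, this picks `2` because the counter wrapped, whereas an ordinary numeric maximum would pick `65535`.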

Once we've identified the latest tick from the server, we know that we will execute a rollback, so we should reset all predicted components to match their source state before playing ticks forward.

NAIA's Replicate trait exposes a mirror method to make this convenient.

pub fn receive_update_component_events(
    // ...
    source_query: Query<
        (&Position, &StepMotor, &SourceOf),
        (With<Crab>, Without<Controlled>),
    >,
    mut player_query: Query<
        (&mut Position, &mut StepMotor),
        (With<Crab>, Without<Knockout>, With<Controlled>),
    >,
    source_objects_query: Query<
        (&Position, &SourceOf),
        (Without<Crab>, Without<Controlled>),
    >,
    mut objects_query: Query<&mut Position, (Without<Crab>, With<Controlled>)>,
) {
    // ...
    // for each source player entity
    for (source_position, source_motor, SourceOf(prediction)) in source_query.iter() {
        // get the prediction components
        if let Ok((mut position, mut motor)) = player_query.get_mut(*prediction) {
            // and reset prediction state back to source state
            position.mirror(source_position);
            motor.mirror(source_motor);
        }
    }
    // same for each source object entity
    for (source_position, SourceOf(prediction)) in source_objects_query.iter() {
        if let Ok(mut position) = objects_query.get_mut(*prediction) {
            position.mirror(source_position);
        }
    }
}

Then, the same approach we used in tick lets us extend the above function with all the tick functions that run the core game loop. (It's almost entirely copied over, so I skipped the ParamSet definitions, and player_query is absorbed into one of its variants.)

pub fn receive_update_component_events(
    mut commands: Commands,
    mut event_reader: EventReader<UpdateComponentEvents>,
    mut tick_history: ResMut<TickHistory>,
    source_query: Query</* ... */>,
    mut player_query_set: ParamSet</* ... */>,
    mut objects_query_set: ParamSet</* ... */>,
    level_query: Query<&Level>,
) {
    // ... (getting latest_tick)
    // ... (resetting state)
    // reverse the iterator to replay the ticks in the correct order,
    // as in the skeleton above
    for (_tick, tick_actions) in replay_ticks.into_iter().rev() {
        // first apply every input recorded for this tick
        for (entity, action) in tick_actions.into_iter() {
            if let Ok((_entity, mut position, mut motor)) =
                player_query_set.p0().get_mut(entity)
            {
                process_input(Some(action), &mut position, &mut motor);
            }
        }
        // then step the simulation once for this tick
        tick::tick_step_motors(&mut player_query_set.p1());
        tick::tick_constant_motors(&mut objects_query_set.p0());
        tick::tick_road_collisions(
            &mut commands,
            &level_query,
            &player_query_set.p2().to_readonly(),
            &objects_query_set.p1(),
        );
        tick::tick_river_collisions(
            &mut commands,
            &level_query,
            &mut player_query_set.p2(),
            &objects_query_set.p2(),
        );
    }
}
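
Stripped of all the ECS plumbing, the rollback boils down to "reset, then replay". Here is a minimal standalone sketch of that shape. `CrabState`, `Action`, and the one-line `tick` are hypothetical simplifications rather than the real components or systems, and the replay list is assumed to already be ordered oldest first.

```rust
// Hypothetical, heavily simplified model of a rollback.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CrabState {
    pub x: i32,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Action {
    Left,
    Right,
}

/// Apply one recorded input to the predicted state.
pub fn process_input(action: Action, state: &mut CrabState) {
    match action {
        Action::Left => state.x -= 1,
        Action::Right => state.x += 1,
    }
}

/// Stand-in for the tick systems: the crab drifts one unit right per tick,
/// as if it were riding a raft.
pub fn tick(state: &mut CrabState) {
    state.x += 1;
}

/// Reset the prediction to the server-confirmed state, then replay every
/// unconfirmed tick (oldest first): that tick's inputs, then one tick step.
pub fn rollback(prediction: &mut CrabState, source: &CrabState, replay_ticks: &[Vec<Action>]) {
    *prediction = *source;
    for actions in replay_ticks {
        for action in actions {
            process_input(*action, prediction);
        }
        tick(prediction);
    }
}
```

If the simulation is deterministic and the server agreed with the client's inputs, the replayed prediction ends up where it already was, so the player sees no correction. A visible snap happens only when the confirmed state differs from what the client had predicted.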

Conclusion

If we run the server binary and two e2e-full-client tests in parallel, we observe good behavior:


This shows that things are working. Keep in mind that it looks like it runs slowly because the tests treat every frame as a tick, whereas the NAIA systems operate on their own variable tick rate, which slows things down. There is a bit of jitter when executing inputs, but we'll iron that out in the next post. The goal here was to get this working, and it works!

There are still plenty of things that I'd love to improve from here. In particular, the next post will focus on improving ergonomics for our tick systems. After that, we can start to build the necessary infrastructure to scale the server, and integrate features like authentication.

Also, getting the tests working again is not particularly interesting and was somewhat cumbersome. Check out the repository to see the details, but I don't think it is worth detailing here. As a short promise, here's the e2e-local-multi test running:


With that all said, however, this post should sufficiently show how NAIA allows us to build online multiplayer for our game. Next time, we'll introduce more techniques that should make it easier to streamline development as we continue to add features to the game.