Allow clients to create new rooms to play

This commit is contained in:
2023-02-19 18:03:01 +01:00
parent 831a5003c3
commit 7c8330465f
6 changed files with 302 additions and 5 deletions

View File

@@ -1,17 +1,103 @@
mod player;
mod room;
use crate::room::{generate_room_name, Room, RoomHandle};
use anyhow::Result;
use futures::StreamExt;
use async_tungstenite::WebSocketStream;
use board_network::protocol::ClientMessage;
use futures::channel::mpsc::{unbounded, UnboundedSender};
use futures::future::join;
use futures::{future, SinkExt, StreamExt};
use smol::Async;
use std::collections::HashMap;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::{env, io};
use tungstenite::Message as WebsocketMessage;
async fn handle_connection(raw_stream: Async<TcpStream>, addr: SocketAddr) -> Result<()> {
type Rooms = Arc<Mutex<HashMap<String, RoomHandle>>>;
async fn run_player(
player_name: String,
addr: SocketAddr,
handle: RoomHandle,
ws_stream: WebSocketStream<Async<TcpStream>>,
) {
let (incoming, outgoing) = ws_stream.split();
let (tx, rx) = unbounded();
{
let room = &mut handle.room.lock().unwrap();
if let Err(e) = room.add_player(addr, player_name.clone(), tx) {
eprintln!("[{}] Failed to add player: {:?}", room.name, e);
return;
}
}
let write = handle.write.clone();
let ra = rx
.map(|c| {
serde_json::to_string(&c).unwrap_or_else(|_| panic!("Could not serialize {:?}", c))
})
.map(WebsocketMessage::Text)
.map(Ok)
.forward(incoming);
let rb = outgoing
.map(|m| match m {
Ok(WebsocketMessage::Text(t)) => serde_json::from_str::<ClientMessage>(&t).ok(),
_ => None,
})
.take_while(|m| future::ready(m.is_some()))
.map(|m| m.unwrap())
.chain(futures::stream::once(async { ClientMessage::Disconnected }))
.map(move |m| Ok((addr, m)))
.forward(write);
let (ra, rb) = join(ra, rb).await;
if let Err(e) = ra {
eprintln!("[{addr}] Got error {e} from player {player_name}'s rx queue");
}
if let Err(e) = rb {
eprintln!("[{addr}] Got error {e} from player {player_name}'s tx queue");
}
println!("[{addr}] Finished session with {player_name}");
}
async fn handle_connection(
rooms: Rooms,
raw_stream: Async<TcpStream>,
addr: SocketAddr,
mut close_room: UnboundedSender<String>,
) -> Result<()> {
println!("[{addr}] Incoming TCP connection");
let mut ws_stream = async_tungstenite::accept_async(raw_stream).await?;
println!("[{addr}] WebSocket connection established");
while let Some(Ok(WebsocketMessage::Text(text))) = ws_stream.next().await {
println!("[{addr}] Received message: {text}");
if let Some(Ok(WebsocketMessage::Text(text))) = ws_stream.next().await {
let msg = serde_json::from_str::<ClientMessage>(&text)?;
match msg {
ClientMessage::CreateRoom(player_name) => {
let (write, read) = unbounded();
let room = Arc::new(Mutex::new(Room::default()));
let handle = RoomHandle { write, room };
let room_name = generate_room_name(&mut rooms.lock().unwrap(), handle.clone());
println!("[{addr}] Creating room '{room_name}' for player '{player_name}'");
let mut h = handle.clone();
join(
h.run_room(read),
run_player(player_name, addr, handle, ws_stream),
)
.await;
if let Err(e) = close_room.send(room_name.clone()).await {
eprintln!("[{room_name}] Failed to close room: {e}");
}
return Ok(());
}
msg => eprintln!("[{addr}] Received illegal message {:?}", msg),
}
}
println!("[{addr}] Dropping connection");
@@ -25,13 +111,30 @@ fn main() -> Result<(), io::Error> {
.parse::<SocketAddr>()
.expect("Invalid address");
let rooms = Rooms::new(Mutex::new(HashMap::new()));
let close_room = {
let (tx, mut rx) = unbounded();
let rooms = rooms.clone();
smol::spawn(async move {
while let Some(room_name) = rx.next().await {
println!("Closing room {room_name}");
rooms.lock().unwrap().remove(&room_name);
}
})
.detach();
tx
};
smol::block_on(async {
println!("Listening on: {addr}");
let listener = Async::<TcpListener>::bind(addr).expect("Could not create listener");
while let Ok((stream, addr)) = listener.accept().await {
let close_room = close_room.clone();
let rooms = rooms.clone();
smol::spawn(async move {
if let Err(e) = handle_connection(stream, addr).await {
if let Err(e) = handle_connection(rooms, stream, addr, close_room).await {
eprintln!("Failed to handle connection from {addr}: {e}");
}
})

View File

@@ -0,0 +1,9 @@
use board_network::protocol::ServerMessage;
use futures::channel::mpsc::UnboundedSender;
#[derive(Debug)]
pub struct Player {
pub name: String,
pub score: u32,
pub ws: Option<UnboundedSender<ServerMessage>>,
}

129
board-server/src/room.rs Normal file
View File

@@ -0,0 +1,129 @@
use crate::player::Player;
use board_network::protocol::{ClientMessage, ServerMessage};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use rand::distributions::{Alphanumeric, DistString};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
type TaggedClientMessage = (SocketAddr, ClientMessage);
#[derive(Debug, Default)]
pub struct Room {
pub name: String,
pub connections: HashMap<SocketAddr, usize>,
pub players: Vec<Player>,
}
impl Room {
pub fn add_player(
&mut self,
addr: SocketAddr,
player_name: String,
tx: UnboundedSender<ServerMessage>,
) -> anyhow::Result<()> {
// If the player name matches an existing player, but with a dropped connection,
// then replace this player.
let mut player_index = None;
for (i, p) in self.players.iter().enumerate() {
if p.name == player_name && p.ws.is_none() {
player_index = Some(i);
break;
}
}
if let Some(i) = player_index {
// Reclaim the player's spot
self.broadcast(ServerMessage::PlayerReconnected(i));
self.players[i].ws = Some(tx.clone());
} else {
self.broadcast(ServerMessage::PlayerConnected(player_name.clone()));
player_index = Some(self.players.len());
self.players.push(Player {
name: player_name,
score: 0,
ws: Some(tx.clone()),
});
}
let player_index = player_index.expect("A player index should have been attributed");
self.connections.insert(addr, player_index);
// Send the player the current state of the room
tx.unbounded_send(ServerMessage::JoinedRoom {
room_name: self.name.clone(),
players: self
.players
.iter()
.map(|p| (p.name.clone(), p.score, p.ws.is_some()))
.collect(),
})?;
Ok(())
}
pub fn on_message(&mut self, addr: SocketAddr, msg: ClientMessage) -> bool {
match msg {
ClientMessage::Disconnected => self.on_client_disconnected(addr),
ClientMessage::CreateRoom(_) | ClientMessage::JoinRoom(_, _) => {
eprintln!("[{}] Illegal client message {:?}", self.name, msg);
}
}
!self.connections.is_empty()
}
fn on_client_disconnected(&mut self, addr: SocketAddr) {
if let Some(p) = self.connections.remove(&addr) {
self.players[p].ws = None;
self.broadcast(ServerMessage::PlayerDisconnected(p));
}
}
fn broadcast(&self, s: ServerMessage) {
for c in self.connections.values() {
if let Some(ws) = &self.players[*c].ws {
if let Err(e) = ws.unbounded_send(s.clone()) {
eprintln!(
"[{}] Failed to send broadcast to {}: {}",
self.name, self.players[*c].name, e
);
}
}
}
}
}
type RoomPtr = Arc<Mutex<Room>>;
#[derive(Clone)]
pub struct RoomHandle {
pub write: UnboundedSender<TaggedClientMessage>,
pub room: RoomPtr,
}
impl RoomHandle {
pub async fn run_room(&mut self, mut read: UnboundedReceiver<TaggedClientMessage>) {
while let Some((addr, msg)) = read.next().await {
if !self.room.lock().unwrap().on_message(addr, msg) {
break;
}
}
}
}
pub type Rooms = HashMap<String, RoomHandle>;
pub fn generate_room_name(rooms: &mut Rooms, room: RoomHandle) -> String {
let mut rng = rand::thread_rng();
loop {
let name = Alphanumeric.sample_string(&mut rng, 5);
if let Entry::Vacant(v) = rooms.entry(name.clone()) {
room.room.lock().unwrap().name = name.clone();
v.insert(room);
return name;
}
}
}