mirror of
https://github.com/redstrate/Kawari.git
synced 2025-07-19 11:17:46 +00:00
Move packet replay to the global server state
This makes it way more stable, and can actually work now.
This commit is contained in:
parent
06debd5eb0
commit
a4c8466088
5 changed files with 156 additions and 91 deletions
|
@ -1,4 +1,3 @@
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
@ -1185,49 +1184,6 @@ async fn client_loop(
|
||||||
intended_use: zone.intended_use,
|
intended_use: zone.intended_use,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(entry) = connection.replay_entries.pop_front() {
|
|
||||||
// only care about Ipc packets
|
|
||||||
let filename = entry.file_name().unwrap().to_str().unwrap();
|
|
||||||
if !filename.contains("Ipc") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// only care about packets from the server
|
|
||||||
if !filename.contains("(to client)") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let path = entry.to_str().unwrap();
|
|
||||||
|
|
||||||
tracing::info!("- Replaying {path}");
|
|
||||||
|
|
||||||
let source_actor_bytes = std::fs::read(format!("{path}/source_actor.bin")).unwrap();
|
|
||||||
let target_actor_bytes = std::fs::read(format!("{path}/target_actor.bin")).unwrap();
|
|
||||||
let source_actor = u32::from_le_bytes(source_actor_bytes[0..4].try_into().unwrap());
|
|
||||||
let target_actor = u32::from_le_bytes(target_actor_bytes[0..4].try_into().unwrap());
|
|
||||||
|
|
||||||
let ipc_header_bytes = std::fs::read(format!("{path}/ipc_header.bin")).unwrap();
|
|
||||||
let opcode = u16::from_le_bytes(ipc_header_bytes[2..4].try_into().unwrap());
|
|
||||||
|
|
||||||
let unk = std::fs::read(format!("{path}/data.bin")).unwrap();
|
|
||||||
|
|
||||||
connection.send_segment(PacketSegment {
|
|
||||||
source_actor,
|
|
||||||
target_actor,
|
|
||||||
segment_type: SegmentType::Ipc,
|
|
||||||
data: SegmentData::Ipc {
|
|
||||||
data: ServerZoneIpcSegment {
|
|
||||||
unk1: 20,
|
|
||||||
unk2: 0,
|
|
||||||
op_code: ServerZoneIpcType::Unknown(opcode),
|
|
||||||
option: 0,
|
|
||||||
timestamp: timestamp_secs(),
|
|
||||||
data: ServerZoneIpcData::Unknown { unk },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
@ -1249,6 +1205,7 @@ async fn client_loop(
|
||||||
FromServer::ActionCancelled() => connection.cancel_action().await,
|
FromServer::ActionCancelled() => connection.cancel_action().await,
|
||||||
FromServer::UpdateConfig(actor_id, config) => connection.update_config(actor_id, config).await,
|
FromServer::UpdateConfig(actor_id, config) => connection.update_config(actor_id, config).await,
|
||||||
FromServer::ActorEquip(actor_id, main_weapon_id, model_ids) => connection.update_equip(actor_id, main_weapon_id, model_ids).await,
|
FromServer::ActorEquip(actor_id, main_weapon_id, model_ids) => connection.update_equip(actor_id, main_weapon_id, model_ids).await,
|
||||||
|
FromServer::ReplayPacket(segment) => connection.send_segment(segment).await,
|
||||||
},
|
},
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
|
@ -1347,7 +1304,6 @@ async fn main() {
|
||||||
gracefully_logged_out: false,
|
gracefully_logged_out: false,
|
||||||
weather_id: 0,
|
weather_id: 0,
|
||||||
obsfucation_data: ObsfucationData::default(),
|
obsfucation_data: ObsfucationData::default(),
|
||||||
replay_entries: VecDeque::default(),
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Some((mut socket, _)) = handle_rcon(&rcon_listener) => {
|
Some((mut socket, _)) = handle_rcon(&rcon_listener) => {
|
||||||
|
|
|
@ -145,7 +145,14 @@ pub type ServerZoneIpcSegment = IpcSegment<ServerZoneIpcType, ServerZoneIpcData>
|
||||||
|
|
||||||
impl ReadWriteIpcSegment for ServerZoneIpcSegment {
|
impl ReadWriteIpcSegment for ServerZoneIpcSegment {
|
||||||
fn calc_size(&self) -> u32 {
|
fn calc_size(&self) -> u32 {
|
||||||
IPC_HEADER_SIZE + self.op_code.calc_size()
|
IPC_HEADER_SIZE
|
||||||
|
+ match &self.op_code {
|
||||||
|
ServerZoneIpcType::Unknown(..) => match &self.data {
|
||||||
|
ServerZoneIpcData::Unknown { unk } => unk.len() as u32,
|
||||||
|
_ => panic!("Unknown packet type doesn't have unknown data?"),
|
||||||
|
},
|
||||||
|
_ => self.op_code.calc_size(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_name(&self) -> &'static str {
|
fn get_name(&self) -> &'static str {
|
||||||
|
|
|
@ -12,8 +12,9 @@ use crate::{
|
||||||
common::Position,
|
common::Position,
|
||||||
ipc::zone::{
|
ipc::zone::{
|
||||||
ActionRequest, ActorControl, ActorControlSelf, ActorControlTarget, ClientTrigger,
|
ActionRequest, ActorControl, ActorControlSelf, ActorControlTarget, ClientTrigger,
|
||||||
CommonSpawn, Config, NpcSpawn,
|
CommonSpawn, Config, NpcSpawn, ServerZoneIpcSegment,
|
||||||
},
|
},
|
||||||
|
packet::PacketSegment,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::Actor;
|
use super::Actor;
|
||||||
|
@ -44,6 +45,7 @@ pub enum FromServer {
|
||||||
UpdateConfig(u32, Config),
|
UpdateConfig(u32, Config),
|
||||||
/// Update an actor's model IDs.
|
/// Update an actor's model IDs.
|
||||||
ActorEquip(u32, u64, [u32; 10]),
|
ActorEquip(u32, u64, [u32; 10]),
|
||||||
|
ReplayPacket(PacketSegment<ServerZoneIpcSegment>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -106,6 +108,8 @@ pub enum ToServer {
|
||||||
Config(ClientId, u32, Config),
|
Config(ClientId, u32, Config),
|
||||||
/// Tell the server what models IDs we have equipped.
|
/// Tell the server what models IDs we have equipped.
|
||||||
Equip(ClientId, u32, u64, [u32; 10]),
|
Equip(ClientId, u32, u64, [u32; 10]),
|
||||||
|
/// Begins a packet replay.
|
||||||
|
BeginReplay(ClientId, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, VecDeque},
|
collections::HashMap,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
path::PathBuf,
|
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
@ -135,8 +134,6 @@ pub struct ZoneConnection {
|
||||||
pub weather_id: u16,
|
pub weather_id: u16,
|
||||||
|
|
||||||
pub obsfucation_data: ObsfucationData,
|
pub obsfucation_data: ObsfucationData,
|
||||||
|
|
||||||
pub replay_entries: VecDeque<PathBuf>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ZoneConnection {
|
impl ZoneConnection {
|
||||||
|
@ -259,11 +256,8 @@ impl ZoneConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn spawn_actor(&mut self, mut actor: Actor, mut spawn: NpcSpawn) {
|
pub async fn spawn_actor(&mut self, mut actor: Actor, mut spawn: NpcSpawn) {
|
||||||
// skip during replay
|
// There is no reason for us to spawn our own player again. It's probably a bug!'
|
||||||
if self.replay_entries.is_empty() {
|
assert!(actor.id.0 != self.player_data.actor_id);
|
||||||
// There is no reason for us to spawn our own player again. It's probably a bug!'
|
|
||||||
assert!(actor.id.0 != self.player_data.actor_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
actor.spawn_index = self.get_free_spawn_index() as u32;
|
actor.spawn_index = self.get_free_spawn_index() as u32;
|
||||||
spawn.common.spawn_index = actor.spawn_index as u8;
|
spawn.common.spawn_index = actor.spawn_index as u8;
|
||||||
|
@ -1493,38 +1487,8 @@ impl ZoneConnection {
|
||||||
|
|
||||||
pub async fn replay_packets(&mut self, path: &str) {
|
pub async fn replay_packets(&mut self, path: &str) {
|
||||||
tracing::info!("Beginning replay from {path}...");
|
tracing::info!("Beginning replay from {path}...");
|
||||||
|
self.handle
|
||||||
let mut entries = std::fs::read_dir(path)
|
.send(ToServer::BeginReplay(self.id, path.to_string()))
|
||||||
.unwrap()
|
.await;
|
||||||
.map(|res| res.map(|e| e.path()))
|
|
||||||
.collect::<Result<Vec<_>, std::io::Error>>()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
entries.sort_by(|a, b| {
|
|
||||||
let a_seq = a
|
|
||||||
.file_name()
|
|
||||||
.unwrap()
|
|
||||||
.to_str()
|
|
||||||
.unwrap()
|
|
||||||
.split_once('-')
|
|
||||||
.unwrap()
|
|
||||||
.0
|
|
||||||
.parse::<i32>()
|
|
||||||
.unwrap();
|
|
||||||
let b_seq = b
|
|
||||||
.file_name()
|
|
||||||
.unwrap()
|
|
||||||
.to_str()
|
|
||||||
.unwrap()
|
|
||||||
.split_once('-')
|
|
||||||
.unwrap()
|
|
||||||
.0
|
|
||||||
.parse::<i32>()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
a_seq.cmp(&b_seq)
|
|
||||||
});
|
|
||||||
|
|
||||||
self.replay_entries = entries.into();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,22 @@
|
||||||
|
use binrw::{BinRead, BinWrite};
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
|
io::Cursor,
|
||||||
|
path::PathBuf,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{CustomizeData, GameData, ObjectId, ObjectTypeId},
|
common::{CustomizeData, GameData, ObjectId, ObjectTypeId, timestamp_secs},
|
||||||
ipc::zone::{
|
ipc::zone::{
|
||||||
ActorControl, ActorControlCategory, ActorControlSelf, ActorControlTarget, BattleNpcSubKind,
|
ActorControl, ActorControlCategory, ActorControlSelf, ActorControlTarget, BattleNpcSubKind,
|
||||||
ClientTriggerCommand, CommonSpawn, NpcSpawn, ObjectKind,
|
ClientTriggerCommand, CommonSpawn, NpcSpawn, ObjectKind, ServerZoneIpcData,
|
||||||
|
ServerZoneIpcSegment,
|
||||||
},
|
},
|
||||||
|
opcodes::ServerZoneIpcType,
|
||||||
|
packet::{PacketSegment, SegmentData, SegmentType},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{Actor, ClientHandle, ClientId, FromServer, ToServer};
|
use super::{Actor, ClientHandle, ClientId, FromServer, ToServer};
|
||||||
|
@ -673,6 +679,134 @@ pub async fn server_main_loop(mut recv: Receiver<ToServer>) -> Result<(), std::i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ToServer::BeginReplay(from_id, path) => {
|
||||||
|
let mut entries = std::fs::read_dir(path)
|
||||||
|
.unwrap()
|
||||||
|
.map(|res| res.map(|e| e.path()))
|
||||||
|
.collect::<Result<Vec<_>, std::io::Error>>()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
entries.sort_by(|a, b| {
|
||||||
|
let a_seq = a
|
||||||
|
.file_name()
|
||||||
|
.unwrap()
|
||||||
|
.to_str()
|
||||||
|
.unwrap()
|
||||||
|
.split_once('-')
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.parse::<i32>()
|
||||||
|
.unwrap();
|
||||||
|
let b_seq = b
|
||||||
|
.file_name()
|
||||||
|
.unwrap()
|
||||||
|
.to_str()
|
||||||
|
.unwrap()
|
||||||
|
.split_once('-')
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.parse::<i32>()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
a_seq.cmp(&b_seq)
|
||||||
|
});
|
||||||
|
|
||||||
|
let send_execution = |from_id: ClientId,
|
||||||
|
data: Arc<Mutex<WorldServer>>,
|
||||||
|
entry: &PathBuf| {
|
||||||
|
let mut data = data.lock().unwrap();
|
||||||
|
|
||||||
|
for (id, (handle, _)) in &mut data.clients {
|
||||||
|
let id = *id;
|
||||||
|
|
||||||
|
if id == from_id {
|
||||||
|
// only care about Ipc packets
|
||||||
|
let filename = entry.file_name().unwrap().to_str().unwrap();
|
||||||
|
if !filename.contains("Ipc") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// only care about packets from the server
|
||||||
|
if !filename.contains("(to client)") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let path = entry.to_str().unwrap();
|
||||||
|
|
||||||
|
tracing::info!("- Replaying {path}");
|
||||||
|
|
||||||
|
let source_actor_bytes =
|
||||||
|
std::fs::read(format!("{path}/source_actor.bin")).unwrap();
|
||||||
|
let target_actor_bytes =
|
||||||
|
std::fs::read(format!("{path}/target_actor.bin")).unwrap();
|
||||||
|
let source_actor =
|
||||||
|
u32::from_le_bytes(source_actor_bytes[0..4].try_into().unwrap());
|
||||||
|
let target_actor =
|
||||||
|
u32::from_le_bytes(target_actor_bytes[0..4].try_into().unwrap());
|
||||||
|
|
||||||
|
let ipc_header_bytes =
|
||||||
|
std::fs::read(format!("{path}/ipc_header.bin")).unwrap();
|
||||||
|
let opcode =
|
||||||
|
u16::from_le_bytes(ipc_header_bytes[2..4].try_into().unwrap());
|
||||||
|
|
||||||
|
let mut ipc_data = std::fs::read(format!("{path}/data.bin")).unwrap();
|
||||||
|
let ipc_len = ipc_data.len() as u32 + 32;
|
||||||
|
let mut cursor = Cursor::new(&mut ipc_data);
|
||||||
|
if let Ok(parsed) =
|
||||||
|
ServerZoneIpcSegment::read_le_args(&mut cursor, (&ipc_len,))
|
||||||
|
{
|
||||||
|
match parsed.data {
|
||||||
|
ServerZoneIpcData::InitZone(mut init_zone) => {
|
||||||
|
tracing::info!("- Fixing up InitZone");
|
||||||
|
|
||||||
|
// stop it from trying to initialize obsfucation
|
||||||
|
init_zone.obsfucation_mode = 0;
|
||||||
|
init_zone.seed1 = 0;
|
||||||
|
init_zone.seed2 = 0;
|
||||||
|
init_zone.seed3 = 0;
|
||||||
|
|
||||||
|
let mut cursor = Cursor::new(Vec::new());
|
||||||
|
init_zone.write_le(&mut cursor).unwrap();
|
||||||
|
ipc_data = cursor.into_inner().to_vec();
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let msg = FromServer::ReplayPacket(PacketSegment {
|
||||||
|
source_actor,
|
||||||
|
target_actor,
|
||||||
|
segment_type: SegmentType::Ipc,
|
||||||
|
data: SegmentData::Ipc {
|
||||||
|
data: ServerZoneIpcSegment {
|
||||||
|
unk1: 20,
|
||||||
|
unk2: 0,
|
||||||
|
op_code: ServerZoneIpcType::Unknown(opcode),
|
||||||
|
option: 0,
|
||||||
|
timestamp: timestamp_secs(),
|
||||||
|
data: ServerZoneIpcData::Unknown { unk: ipc_data },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if handle.send(msg).is_err() {
|
||||||
|
data.to_remove.push(id);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let data = data.clone();
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
for entry in &entries {
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_millis(100));
|
||||||
|
interval.tick().await;
|
||||||
|
interval.tick().await;
|
||||||
|
send_execution(from_id, data.clone(), entry);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
ToServer::Disconnected(from_id) => {
|
ToServer::Disconnected(from_id) => {
|
||||||
let mut data = data.lock().unwrap();
|
let mut data = data.lock().unwrap();
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue