use std::collections::VecDeque; use std::str; extern crate custom_error; use custom_error::custom_error; // Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about // amount of bytes it received since the last update const UE_RECEIVED_FIELD_SIZE: usize = 4; // Defines how many bytes is used to encode "LENGTH" field, describing length of // next JSON message from ue-server const UE_LENGTH_FIELD_SIZE: usize = 4; // Value indicating that next byte sequence from ue-server reports amount of bytes received by // that server so far. Value itself is arbitrary. const HEAD_UE_RECEIVED: u8 = 85; // Value indicating that next byte sequence from ue-server contains JSON message. // Value itself is arbitrary. const HEAD_UE_MESSAGE: u8 = 42; // Maximum allowed size of JSON message sent from ue-server. const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; custom_error! { pub ReadingStreamError InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", MessageTooLong{length: usize} = "Message to receive is too long: {length}", InvalidUnicode = "Invalid utf-8 was received", BrokenStream = "Used stream is broken" } enum ReadingState { Head, ReceivedBytes, Length, Payload, } pub struct MessageReader { is_broken: bool, reading_state: ReadingState, read_bytes: usize, current_message_length: usize, current_message: Vec, read_messages: VecDeque, next_received_bytes: u32, received_bytes: u64, } /// For converting byte stream that is expected from the ue-server into actual messages. /// Expected format is a sequence of either: /// 1. [HEAD_UE_RECEIVED: marker byte | 1 byte] /// [AMOUNT: amount of bytes received by ue-server since last update | 4 bytes: u32 BE] /// 2. [HEAD_UE_MESSAGE: marker byte | 1 byte] /// [LENGTH: length of the JSON message in utf8 encoding | 4 bytes: u32 BE] /// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] /// On any invalid input enters into a failure state (can be checked by `is_broken()`) and /// never recovers from it. /// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to /// retrieve resulting messages. impl MessageReader { pub fn new() -> MessageReader { MessageReader { is_broken: false, reading_state: ReadingState::Head, read_bytes: 0, current_message_length: 0, current_message: Vec::new(), read_messages: VecDeque::new(), next_received_bytes: 0, received_bytes: 0, } } pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> { if self.is_broken { return Err(ReadingStreamError::BrokenStream); } match &self.reading_state { ReadingState::Head => { if input == HEAD_UE_RECEIVED { self.reading_state = ReadingState::ReceivedBytes; } else if input == HEAD_UE_MESSAGE { self.reading_state = ReadingState::Length; } else { self.is_broken = true; return Err(ReadingStreamError::InvalidHead { input }); } } ReadingState::ReceivedBytes => { self.next_received_bytes = self.next_received_bytes << 8; self.next_received_bytes += input as u32; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { self.received_bytes += self.next_received_bytes as u64; self.next_received_bytes = 0; self.read_bytes = 0; self.reading_state = ReadingState::Head; } } ReadingState::Length => { self.current_message_length = self.current_message_length << 8; self.current_message_length += input as usize; self.read_bytes += 1; if self.read_bytes >= UE_LENGTH_FIELD_SIZE { self.read_bytes = 0; self.reading_state = ReadingState::Payload; if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; return Err(ReadingStreamError::MessageTooLong { length: self.current_message_length, }); } self.current_message = Vec::with_capacity(self.current_message_length); } } ReadingState::Payload => { self.current_message.push(input); self.read_bytes += 1 as usize; if self.read_bytes >= self.current_message_length { match str::from_utf8(&self.current_message) { Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), _ => { self.is_broken = true; return Err(ReadingStreamError::InvalidUnicode); } }; self.current_message.clear(); self.current_message_length = 0; self.read_bytes = 0; self.reading_state = ReadingState::Head; } } } Ok(()) } pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> { for &byte in input { self.push_byte(byte)?; } Ok(()) } pub fn pop(&mut self) -> Option { self.read_messages.pop_back() } pub fn received_bytes(&self) -> u64 { self.received_bytes } pub fn is_broken(&self) -> bool { self.is_broken } } #[test] fn message_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(13).unwrap(); reader.push_byte(0x48).unwrap(); // H reader.push_byte(0x65).unwrap(); // e reader.push_byte(0x6c).unwrap(); // l reader.push_byte(0x6c).unwrap(); // l reader.push_byte(0x6f).unwrap(); // o reader.push_byte(0x2c).unwrap(); // , reader.push_byte(0x20).unwrap(); // reader.push_byte(0x77).unwrap(); // w reader.push_byte(0x6f).unwrap(); // o reader.push_byte(0x72).unwrap(); // r reader.push_byte(0x6c).unwrap(); // l reader.push_byte(0x64).unwrap(); // d reader.push_byte(0x21).unwrap(); // reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(3).unwrap(); reader.push_byte(0x59).unwrap(); // Y reader.push_byte(0x6f).unwrap(); // o reader.push_byte(0x21).unwrap(); // assert_eq!(reader.pop().unwrap(), "Hello, world!"); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } #[test] fn received_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); assert_eq!(reader.received_bytes(), 243); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); assert_eq!(reader.received_bytes(), 1092203255); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); reader.push_byte(34).unwrap(); reader.push_byte(154).unwrap(); assert_eq!(reader.received_bytes(), 1092203255); } #[test] fn mixed_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(3).unwrap(); reader.push_byte(0x59).unwrap(); // Y reader.push_byte(0x6f).unwrap(); // o reader.push_byte(0x21).unwrap(); // reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); assert_eq!(reader.received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } #[test] fn pushing_many_bytes_at_once() { let mut reader = MessageReader::new(); reader .push(&[ HEAD_UE_RECEIVED, 0, 0, 0, 243, HEAD_UE_MESSAGE, 0, 0, 0, 3, 0x59, // Y 0x6f, // o 0x21, // HEAD_UE_RECEIVED, 65, 25, 178, 4, ]) .unwrap(); assert_eq!(reader.received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } #[test] fn generates_error_invalid_head() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); assert!(!reader.is_broken()); reader .push_byte(25) .expect_err("Testing failing on incorrect HEAD"); assert!(reader.is_broken()); } #[test] fn generates_error_message_too_long() { let mut reader = MessageReader::new(); let huge_length = MAX_UE_MESSAGE_LENGTH + 1; let bytes = (huge_length as u32).to_be_bytes(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(bytes[0]).unwrap(); reader.push_byte(bytes[1]).unwrap(); reader.push_byte(bytes[2]).unwrap(); assert!(!reader.is_broken()); reader .push_byte(bytes[3]) .expect_err("Testing failing on exceeding allowed message length"); assert!(reader.is_broken()); } #[test] fn generates_error_invalid_unicode() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(2).unwrap(); reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence assert!(!reader.is_broken()); // Bytes inside multi-byte code point have to have `1` for their high bit reader .push_byte(0b01010011) .expect_err("Testing failing on incorrect unicode"); assert!(reader.is_broken()); }