mirror of
https://git.ugnet.gay/CrossTalk/azul.git
synced 2026-05-27 22:59:49 +00:00
250 lines
5.8 KiB
Python
250 lines
5.8 KiB
Python
import struct
|
|
|
|
from array import array
|
|
from dataclasses import dataclass
|
|
|
|
from .tlv import TLV, marshal_tlvs, unmarshal_tlvs
|
|
|
|
|
|
@dataclass
class Buffer:
    """A byte buffer with big-endian read/write helpers.

    Reads consume bytes from the front of ``data``; writes append bytes to
    the end of ``data``. All multi-byte integers use network (big-endian)
    byte order.

    Attributes:
        data: The bytes currently held by the buffer (what is left to read,
            followed by anything written since).
    """

    # NOTE: TLV annotations below are quoted so they are not evaluated when
    # the class is defined.
    data: bytes = b''

    def __len__(self) -> int:
        """Returns the number of bytes currently held by the buffer."""
        return len(self.data)

    def _unpack(self, fmt: str) -> tuple:
        """Unpacks a fixed-size struct from the front of the buffer.

        The buffer is only advanced when unpacking succeeds.

        Args:
            fmt: A ``struct`` format string.

        Returns:
            tuple: The unpacked values.

        Raises:
            struct.error: If the buffer holds fewer bytes than ``fmt`` needs.
        """
        size = struct.calcsize(fmt)
        values = struct.unpack(fmt, self.data[:size])
        self.data = self.data[size:]
        return values

    ########################
    # read_xxx() functions #
    ########################
    def read_bytes(self, length: int) -> bytes:
        """Reads the specified amount of bytes from the buffer.

        If fewer than ``length`` bytes are available, everything that is
        left is returned and the buffer becomes empty.

        Args:
            length: bytes to read

        Returns:
            bytes: the bytes read
        """
        value = self.data[:length]
        self.data = self.data[length:]
        return value

    def read_u8(self) -> int:
        """Reads a byte (8 bits) from the buffer.

        Returns:
            int: The read byte from the buffer.

        Raises:
            struct.error: If the buffer is empty.
        """
        value, = self._unpack('>B')
        return value

    def read_u16(self) -> int:
        """Reads an unsigned short (16 bits) from the buffer.

        Returns:
            int: The read unsigned short from the buffer.

        Raises:
            struct.error: If fewer than 2 bytes are available.
        """
        value, = self._unpack('>H')
        return value

    def read_u32(self) -> int:
        """Reads an unsigned int (32 bits) from the buffer.

        Returns:
            int: The read unsigned int from the buffer.

        Raises:
            struct.error: If fewer than 4 bytes are available.
        """
        value, = self._unpack('>L')
        return value

    def read_string(self, length: int) -> str:
        """Reads a UTF-8 string of the specified byte length from the buffer.

        Args:
            length: The length of the string to read, in bytes.

        Returns:
            str: The read string from the buffer.

        Raises:
            UnicodeDecodeError: If the bytes read are not valid UTF-8. The
                bytes have already been consumed when this is raised.
        """
        return self.read_bytes(length).decode('utf-8')

    def read_string_u8(self) -> str:
        """Reads a string prepended with a u8 showing the byte length of the string.

        Returns:
            str: The read string from the buffer.
        """
        return self.read_string(self.read_u8())

    def read_string_u16(self) -> str:
        """Reads a string prepended with a u16 showing the byte length of the string.

        Returns:
            str: The read string from the buffer.
        """
        return self.read_string(self.read_u16())

    def read_string_u32(self) -> str:
        """Reads a string prepended with a u32 showing the byte length of the string.

        Returns:
            str: The read string from the buffer.
        """
        return self.read_string(self.read_u32())

    def read_tlv_block(self) -> "list[TLV]":
        """Reads a TLV list prepended with a u16 showing the number of TLVs in the list, from the buffer.

        Each TLV is a u16 type, a u16 value length and then the value bytes.

        Returns:
            list[TLV]: The TLVs found in the block.

        Raises:
            struct.error: If the buffer ends before the count, a TLV header
                or a TLV value has been read completely. TLVs parsed before
                the failure have already been consumed.
        """
        count = self.read_u16()
        tlvs = []

        for _ in range(count):
            tlv_type, value_length = struct.unpack('>HH', self.data[:4])
            end = 4 + value_length

            # The header and the whole value must be present; otherwise the
            # slice below would silently yield a shortened value.
            if len(self.data) < end:
                raise struct.error(
                    f'TLV value truncated: expected {value_length} bytes, '
                    f'got {len(self.data) - 4}'
                )

            tlvs.append(TLV(tlv_type, self.data[4:end]))
            self.data = self.data[end:]

        return tlvs

    def read_tlv_l_block(self) -> "list[TLV]":
        """Reads a list of TLVs prepended with a u16 showing the length of the total TLV bytes, from the buffer.

        Returns:
            list[TLV]: The TLVs found in the block, as returned by
                ``unmarshal_tlvs``.
        """
        length = self.read_u16()
        return unmarshal_tlvs(self.read_bytes(length))

    #########################
    # write_xxx() functions #
    #########################
    def write_bytes(self, value: bytes) -> None:
        """Writes the specified bytes into the buffer.

        Args:
            value: The bytes to write into the buffer.
        """
        self.data += value

    def write_u8(self, value: int) -> None:
        """Writes a byte (8 bits) into the buffer.

        Args:
            value: The byte to write into the buffer.

        Raises:
            struct.error: If ``value`` does not fit in 8 unsigned bits.
        """
        self.data += struct.pack('>B', value)

    def write_u16(self, value: int) -> None:
        """Writes an unsigned short (16 bits) into the buffer.

        Args:
            value: The unsigned short to write into the buffer.

        Raises:
            struct.error: If ``value`` does not fit in 16 unsigned bits.
        """
        self.data += struct.pack('>H', value)

    def write_u32(self, value: int) -> None:
        """Writes an unsigned int (32 bits) into the buffer.

        Args:
            value: The unsigned int to write into the buffer.

        Raises:
            struct.error: If ``value`` does not fit in 32 unsigned bits.
        """
        self.data += struct.pack('>L', value)

    def write_string(self, value: str) -> None:
        """Writes a string into the buffer, encoded as UTF-8.

        Args:
            value: The string to write into the buffer.
        """
        self.data += value.encode('utf-8')

    def write_string_u8(self, value: str) -> None:
        """Writes a string prepended with a u8 showing the byte length of the string into the buffer.

        Args:
            value: The string to write into the buffer.

        Raises:
            struct.error: If the encoded string is longer than 255 bytes.
                Nothing is written in that case.
        """
        # The prefix counts encoded bytes, not characters, so that the
        # matching read consumes exactly this string.
        encoded = value.encode('utf-8')
        self.write_u8(len(encoded))
        self.write_bytes(encoded)

    def write_string_u16(self, value: str) -> None:
        """Writes a string prepended with a u16 showing the byte length of the string into the buffer.

        Args:
            value: The string to write into the buffer.

        Raises:
            struct.error: If the encoded string is longer than 65535 bytes.
                Nothing is written in that case.
        """
        # The prefix counts encoded bytes, not characters.
        encoded = value.encode('utf-8')
        self.write_u16(len(encoded))
        self.write_bytes(encoded)

    def write_string_u32(self, value: str) -> None:
        """Writes a string prepended with a u32 showing the byte length of the string into the buffer.

        Args:
            value: The string to write into the buffer.

        Raises:
            struct.error: If the encoded length does not fit in 32 unsigned
                bits. Nothing is written in that case.
        """
        # The prefix counts encoded bytes, not characters.
        encoded = value.encode('utf-8')
        self.write_u32(len(encoded))
        self.write_bytes(encoded)

    def write_tlv(self, tlv: "TLV") -> None:
        """Writes the specified TLV into the buffer.

        Args:
            tlv: The TLV to write into the buffer.
        """
        self.data += tlv.marshal()

    def write_tlvs(self, tlvs: "list[TLV]") -> None:
        """Writes a list of TLVs into the buffer.

        Args:
            tlvs: The list of TLVs to write into the buffer.
        """
        self.data += marshal_tlvs(tlvs)

    def write_tlv_block(self, tlvs: "list[TLV]") -> None:
        """Writes a list of TLVs prepended with a u16 describing the TLV count in the list, into the buffer.

        Args:
            tlvs: The list of TLVs to write into the buffer.
        """
        self.write_u16(len(tlvs))
        self.write_tlvs(tlvs)

    def write_tlv_l_block(self, tlvs: "list[TLV]") -> None:
        """Writes a list of TLVs prepended with a u16 describing the length of the total TLV bytes, into the buffer.

        Args:
            tlvs: The list of TLVs to write into the buffer.
        """
        marshalled_tlvs = marshal_tlvs(tlvs)

        self.write_u16(len(marshalled_tlvs))
        self.write_bytes(marshalled_tlvs)
|