6
6
import cchardet as chardet # type: ignore
7
7
import trio
8
8
9
+ # Assuming latency variance is less than 100%
10
+ MAX_LATENCY_VARIANCE = 2
11
+
12
+
13
+ def encode_codepage (string : str ) -> bytes :
14
+ for codepage in range (1250 , 1259 ):
15
+ try :
16
+ return string .encode (f'cp{ codepage } ' )
17
+ except UnicodeEncodeError :
18
+ continue
19
+
20
+ raise ValueError (f'Unable to encode string "{ string } "' )
21
+
22
+
23
+ def pack_string (string : str , len_type : str ) -> bytes :
24
+ format = f'<{ len_type } '
25
+ return struct .pack (format , len (string )) + encode_codepage (string )
26
+
9
27
10
28
def unpack_string (data : bytes , len_type : str ) -> tuple [str , bytes ]:
11
29
format = f'<{ len_type } '
@@ -16,6 +34,18 @@ def unpack_string(data: bytes, len_type: str) -> tuple[str, bytes]:
16
34
return string .decode (encoding ), data
17
35
18
36
37
+ class MissingRCONPassword (Exception ):
38
+ pass
39
+
40
+
41
+ class InvalidRCONPassword (Exception ):
42
+ pass
43
+
44
+
45
+ class RCONDisabled (Exception ):
46
+ pass
47
+
48
+
19
49
@dataclass
20
50
class ServerInfo :
21
51
name : str
@@ -170,8 +200,7 @@ async def is_omp(self) -> bool:
170
200
ping = await self .ping ()
171
201
payload = random .getrandbits (32 ).to_bytes (4 , 'little' )
172
202
173
- # Assuming latency variance is less than 100%
174
- with trio .move_on_after (2 * ping ):
203
+ with trio .move_on_after (MAX_LATENCY_VARIANCE * ping ):
175
204
await self .send (b'o' , payload )
176
205
assert self .prefix
177
206
data = await self .receive (header = self .prefix + b'o' + payload )
@@ -197,3 +226,37 @@ async def rules(self) -> RuleList:
197
226
assert self .prefix
198
227
data = await self .receive (header = self .prefix + b'r' )
199
228
return RuleList .from_data (data )
229
+
230
+ async def rcon (self , command : str ) -> str :
231
+ if not self .rcon_password :
232
+ raise MissingRCONPassword ()
233
+
234
+ ping = await self .ping ()
235
+ payload = (
236
+ pack_string (self .rcon_password , 'H' )
237
+ + pack_string (command , 'H' )
238
+ )
239
+ await self .send (b'x' , payload )
240
+ assert self .prefix
241
+
242
+ response = ''
243
+
244
+ with trio .move_on_after (MAX_LATENCY_VARIANCE * ping ) as cancel_scope :
245
+ while True :
246
+ start_time = trio .current_time ()
247
+ data = await self .receive (header = self .prefix + b'x' )
248
+ receive_duration = trio .current_time () - start_time
249
+ line_len = struct .unpack_from ('<H' , data )[0 ]
250
+ data = data [2 :] # short, see above
251
+ assert len (data ) == line_len
252
+ encoding : str = chardet .detect (data )['encoding' ]
253
+ response += data .decode (encoding ) + '\n '
254
+ cancel_scope .deadline += receive_duration
255
+
256
+ if not response :
257
+ raise RCONDisabled ()
258
+
259
+ if response == 'Invalid RCON password.\n ' :
260
+ raise InvalidRCONPassword ()
261
+
262
+ return response [:- 1 ] # Strip trailing newline
0 commit comments