Files
Athena Funderburg 21f38ee3e1 production init
2026-05-26 16:41:23 +00:00

93 lines
2.5 KiB
Python

import settings
import asyncio
from typing import Optional
from util.misc import Logger
from core import event, error
from core.backend import Backend, BackendSession
from .msnp import MSNPCtrl
from .misc import Err
MSNP_DIALECTS = ['MSNP{}'.format(d) for d in (
22,
21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11,
10, 9, 8, 7, 6, 5, 4, 3, 2
)]
class MSNPCtrlDP(MSNPCtrl):
__slots__ = (
'backend', 'dialect', 'bs', 'redir_attempts'
)
backend: Backend
dialect: int
bs: Optional[BackendSession]
def __init__(self, logger: Logger, via: str, backend: Backend) -> None:
super().__init__(logger)
self.backend = backend
self.dialect = 0
self.redir_attempts = 0
self.bs = None
def _on_close(self) -> None:
if self.bs:
self.bs.close()
def on_connect(self) -> None:
pass
def _m_ver(self, trid: str, *args: str) -> None:
if self.dialect != 0:
self.send_reply(Err.NotExpected, trid)
self.close()
return
dialects = [a.upper() for a in args]
try:
_ = int(trid)
except ValueError:
self.close()
d = None
for d in MSNP_DIALECTS:
if d in dialects: break
if d not in dialects:
self.send_reply('VER', trid, 0)
self.close()
return
self.dialect = int(d[4:])
self.send_reply('VER', trid, d)
def _m_cvr(self, trid: str, *args: str) -> None:
v = args[5]
self.send_reply('CVR', trid, v, v, '1.0.0000', 'https://crosstalk.im/downloads', 'https://crosstalk.im/compat')
def _m_inf(self, trid: str) -> None:
if self.dialect < 9:
self.send_reply('INF', trid, 'MD5')
else:
self.close()
return
def __getattr__(self, name):
if name.startswith('_m_') and name not in ('_m_ver', '_m_cvr', '_m_inf'):
def handler(trid: str, *args: str):
if self.dialect == 2:
self.send_reply('XFR', trid, 'NS', f'{settings.TARGET_IP}:1864')
elif self.dialect <= 6:
self.send_reply('XFR', trid, 'NS', f'{settings.TARGET_IP}:1864', '0')
elif self.dialect <= 13:
self.send_reply('XFR', trid, 'NS', f'{settings.TARGET_IP}:1864', '0', f'{settings.TARGET_IP}:1863')
elif self.dialect <= 19:
self.send_reply('XFR', trid, 'NS', f'{settings.TARGET_IP}:1864', 'U', 'D')
else:
self.send_reply('XFR', trid, 'NS', f'{settings.TARGET_IP}:1864', 'U', 'D', 'VmVyc2lvbjogMQ0KWGZyQ291bnQ6IDINCg==')
asyncio.create_task(self._close_conn())
return handler
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
async def _close_conn(self):
await asyncio.sleep(5)
self.close()