mirror of
https://git.ugnet.gay/CrossTalk/azul.git
synced 2026-05-27 14:49:50 +00:00
188 lines
5.8 KiB
Python
188 lines
5.8 KiB
Python
from typing import Dict, Any, Optional
|
|
from datetime import datetime
|
|
import sqlalchemy as sa
|
|
from sqlalchemy.orm import declarative_base
|
|
from HLL import HyperLogLog
|
|
|
|
from core.conn import Conn
|
|
from core.client import Client
|
|
from core.models import User
|
|
from util.json_type import JSONType
|
|
|
|
class Stats:
|
|
__slots__ = ('logged_in', 'by_client', '_conn', '_client_id_cache')
|
|
|
|
logged_in: int
|
|
by_client: Dict[int, Dict[str, Any]]
|
|
_conn: Conn
|
|
_client_id_cache: Optional[Dict[Client, int]]
|
|
|
|
def __init__(self, conn: Conn) -> None:
|
|
self.logged_in = 0
|
|
self.by_client = {}
|
|
self._conn = conn
|
|
self._client_id_cache = None
|
|
|
|
hour = _current_hour()
|
|
with self._conn.session() as sess:
|
|
current = sess.query(CurrentStats).filter(CurrentStats.key == 'current_hour').one_or_none()
|
|
if not current:
|
|
return
|
|
if current.value['hour'] != hour:
|
|
return
|
|
self.by_client = {
|
|
int(client_id): _stats_from_json(stats)
|
|
for client_id, stats in current.value['by_client'].items()
|
|
}
|
|
|
|
def on_login(self) -> None:
|
|
self.logged_in += 1
|
|
|
|
def on_logout(self) -> None:
|
|
self.logged_in -= 1
|
|
|
|
def on_user_active(self, user: User, client: Client) -> None:
|
|
self._collect('users_active', user, client)
|
|
|
|
def on_message_sent(self, user: User, client: Client) -> None:
|
|
self._collect('messages_sent', user, client)
|
|
|
|
def on_message_received(self, user: User, client: Client) -> None:
|
|
self._collect('messages_received', user, client)
|
|
|
|
def _collect(self, stat: str, user: User, client: Client) -> None:
|
|
assert user is not None
|
|
assert client is not None
|
|
if self.by_client is None:
|
|
self.by_client = {}
|
|
bc = self.by_client
|
|
client_id = self._get_client_id(client)
|
|
if client_id not in bc:
|
|
bc[client_id] = {}
|
|
bhc = bc[client_id]
|
|
if stat == 'users_active':
|
|
if stat not in bhc:
|
|
bhc[stat] = HyperLogLog(12)
|
|
bhc[stat].add(user.email)
|
|
else:
|
|
if stat not in bhc:
|
|
bhc[stat] = 0
|
|
bhc[stat] += 1
|
|
|
|
def flush(self) -> None:
|
|
hour = _current_hour()
|
|
now = datetime.utcnow()
|
|
|
|
with self._conn.session() as sess:
|
|
current = sess.query(CurrentStats).filter(CurrentStats.key == 'logged_in').one_or_none()
|
|
if not current:
|
|
current = CurrentStats(key = 'logged_in')
|
|
current.date_updated = now
|
|
current.value = self.logged_in
|
|
sess.add(current)
|
|
sess.flush()
|
|
|
|
current = sess.query(CurrentStats).filter(CurrentStats.key == 'current_hour').one_or_none()
|
|
if not current:
|
|
current = CurrentStats(key = 'current_hour', value = { 'hour': hour })
|
|
|
|
cs_hour = current.value['hour']
|
|
current.date_updated = now
|
|
current.value = self._flush_to_hourly(sess, hour)
|
|
sess.add(current)
|
|
|
|
if cs_hour != hour:
|
|
self.by_client = {}
|
|
|
|
def _flush_to_hourly(self, sess: Any, hour: int) -> Dict[str, Any]:
|
|
for client_id, stats in self.by_client.items():
|
|
hcs_opt = sess.query(HourlyClientStats).filter(HourlyClientStats.hour == hour, HourlyClientStats.client_id == client_id).one_or_none()
|
|
if hcs_opt is None:
|
|
hcs = HourlyClientStats(hour = hour, client_id = client_id)
|
|
else:
|
|
hcs = hcs_opt
|
|
hcs.messages_sent = stats.get('messages_sent') or 0
|
|
hcs.messages_received = stats.get('messages_received') or 0
|
|
if 'users_active' in stats:
|
|
hcs.users_active = stats['users_active'].cardinality()
|
|
else:
|
|
hcs.users_active = 0
|
|
sess.add(hcs)
|
|
return {
|
|
'hour': hour,
|
|
'by_client': {
|
|
client_id: _stats_to_json(stats)
|
|
for client_id, stats in self.by_client.items()
|
|
}
|
|
}
|
|
|
|
def _get_client_id(self, client: Client) -> int:
|
|
if self._client_id_cache is None:
|
|
with self._conn.session() as sess:
|
|
self._client_id_cache = {
|
|
Client.FromJSON(row.data): row.id
|
|
for row in sess.query(DBClient).all()
|
|
}
|
|
if client not in self._client_id_cache:
|
|
with self._conn.session() as sess:
|
|
dbobj = DBClient(data = Client.ToJSON(client))
|
|
sess.add(dbobj)
|
|
sess.flush()
|
|
self._client_id_cache[client] = dbobj.id
|
|
return self._client_id_cache[client]
|
|
|
|
def _stats_to_json(stats: Dict[str, Any]) -> Dict[str, Any]:
|
|
json = {}
|
|
if 'messages_sent' in stats:
|
|
json['messages_sent'] = stats['messages_sent']
|
|
if 'messages_received' in stats:
|
|
json['messages_received'] = stats['messages_received']
|
|
if 'users_active' in stats:
|
|
json['users_active'] = list(stats['users_active'].registers())
|
|
return json
|
|
|
|
def _stats_from_json(json: Dict[str, Any]) -> Dict[str, Any]:
|
|
stats = {}
|
|
if 'messages_sent' in json:
|
|
stats['messages_sent'] = json['messages_sent']
|
|
if 'messages_received' in json:
|
|
stats['messages_received'] = json['messages_received']
|
|
if 'users_active' in json:
|
|
hll = HyperLogLog(12)
|
|
hll.set_registers(bytearray(json['users_active']))
|
|
stats['users_active'] = hll
|
|
return stats
|
|
|
|
def _current_hour() -> int:
|
|
now = datetime.utcnow()
|
|
ts = now.timestamp()
|
|
return int(ts // 3600)
|
|
|
|
class Base(declarative_base()): # type: ignore
|
|
__abstract__ = True
|
|
|
|
class DBClient(Base):
|
|
__tablename__ = 'client'
|
|
|
|
id = sa.Column(sa.Integer, nullable = False, primary_key = True, autoincrement = True)
|
|
data = sa.Column(JSONType, nullable = False)
|
|
|
|
class HourlyClientStats(Base):
|
|
__tablename__ = 'stats_hour_client'
|
|
|
|
hour = sa.Column(sa.BigInteger, nullable = False, primary_key = True)
|
|
client_id = sa.Column(sa.Integer, nullable = False, primary_key = True)
|
|
users_active = sa.Column(sa.Integer, nullable = False, server_default = sa.text('0'))
|
|
messages_sent = sa.Column(sa.Integer, nullable = False, server_default = sa.text('0'))
|
|
messages_received = sa.Column(sa.Integer, nullable = False, server_default = sa.text('0'))
|
|
|
|
__table_args__ = (
|
|
sa.Index('idx_hour', 'hour'),
|
|
)
|
|
|
|
class CurrentStats(Base):
|
|
__tablename__ = 'stats_current'
|
|
|
|
key = sa.Column(sa.String(255), nullable = False, primary_key = True)
|
|
date_updated = sa.Column(sa.DateTime, nullable = False, server_default = sa.text('CURRENT_TIMESTAMP'))
|
|
value = sa.Column(JSONType, nullable = False, default = {}) |