New IRCContext, Channel and User classes

Right now these don't do anything, but in the future they will be how
we'll do channel and user handling.
This commit is contained in:
Vgr E. Barry 2016-10-25 18:20:31 -04:00
parent 0ad4af4240
commit 5ec273c6e0
3 changed files with 732 additions and 0 deletions

223
src/channels.py Normal file
View File

@ -0,0 +1,223 @@
import time
from src.context import IRCContext, Features
from src.logger import debuglog
from src import users
Main = None # main channel
all_channels = {}
_states = ("not yet joined", "pending join", "joined", "pending leave", "left channel", "", "deleted", "cleared")
def _strip(name):
return name.lstrip("".join(Features["STATUSMSG"]))
def predicate(name):
return not name.startswith(tuple(Features["CHANTYPES"]))
def get(name):
"""Return an existing channel, or raise a KeyError if it doesn't exist."""
return all_channels[_strip(name)]
def add(name, cli):
"""Add and return a new channel, or an existing one if it exists."""
name = _strip(name)
if name in all_channels:
if cli is not all_channels[name].client:
raise RuntimeError("different IRC client for channel {0}".format(name))
return all_channels[name]
cls = Channel
if predicate(name):
cls = FakeChannel
chan = all_channels[name] = cls(name, cli)
chan.join()
return chan
def exists(name):
"""Return True if a channel with the name exists, False otherwise."""
return _strip(name) in all_channels
class Channel(IRCContext):
is_channel = True
def __init__(self, name, client, **kwargs):
super().__init__(name, client, **kwargs)
self.users = set()
self.modes = {}
self.timestamp = None
self.state = 0
def __del__(self):
self.users.clear()
self.modes.clear()
self.state = -2
self.client = None
self.timestamp = None
def __str__(self):
return "{self.__class__.__name__}: {self.name} ({0})".format(_states[self.state], self=self)
def __repr__(self):
return "{self.__class__.__name__}({self.name!r})".format(self=self)
def join(self, key=""):
if self.state in (0, 4):
self.state = 1
self.client.send("JOIN {0} :{1}".format(self.name, key))
def part(self, message=""):
if self.state == 2:
self.state = 3
self.client.send("PART {0} :{1}".format(self.name, message))
def kick(self, target, message=""):
if self.state == 2:
self.client.send("KICK {0} {1} :{2}".format(self.name, target, message))
def mode(self, *changes):
if not changes:
self.client.send("MODE", self.name)
return
max_modes = Features["MODES"]
params = []
for change in changes:
if isinstance(change, str):
change = (change, None)
params.append(change)
params.sort(key=lambda x: x[0][0])
while params:
cur, params = params[:max_modes], params[max_modes:]
modes, targets = zip(*cur)
prefix = ""
final = []
for mode in modes:
if mode[0] == prefix:
mode = mode[1:]
elif mode.startswith(("+", "-")):
prefix = mode[0]
final.append(mode)
for target in targets:
if target is not None:
final.append(" ")
final.append(target)
self.client.send("MODE", self.name, "".join(final))
def update_modes(self, rawnick, mode, targets):
set_time = int(time.time()) # for list modes timestamp
list_modes, all_set, only_set, no_set = Features["CHANMODES"]
status_modes = Features["PREFIX"].values()
i = 0
for c in mode:
if c in ("+", "-"):
prefix = c
continue
if prefix == "+":
if c in status_modes:
if c not in self.modes:
self.modes[c] = set()
user = users.get(targets[i], allow_bot=True)
self.modes[c].add(user)
user.channels[self].add(c)
i += 1
elif c in list_modes:
if c not in self.modes:
self.modes[c] = {}
self.modes[c][targets[i]] = (rawnick, set_time)
i += 1
else:
if c in no_set:
targ = None
else:
targ = targets[i]
i += 1
if c in only_set and targ.isdigit(): # +l/+j
targ = int(targ)
self.modes[c] = targ
else:
if c in status_modes:
if c in self.modes:
user = users.get(targets[i], allow_bot=True)
self.modes[c].discard(user)
user.channels[self].discard(c)
if not self.modes[c]:
del self.modes[c]
i += 1
elif c in list_modes:
if c in self.modes:
self.modes[c].pop(targets[i], None)
if not self.modes[c]:
del self.modes[c]
i += 1
else:
if c in all_set:
i += 1 # -k needs a target, but we don't care about it
del self.modes[c]
def remove_user(self, user):
self.users.remove(user)
for mode in Features["PREFIX"].values():
if mode in self.modes:
self.modes[mode].discard(user)
if not self.modes[mode]:
del self.modes[mode]
del user.channels[self]
def clear(self):
for user in self.users:
del user.channels[self]
self.users.clear()
self.modes.clear()
self.state = -1
self.timestamp = None
del all_channels[self.name]
class FakeChannel(Channel):
is_fake = True
def join(self, key=""):
pass # don't actually do anything
def part(self, message=""):
pass
def send(self, data, *, notice=False, privmsg=False):
debuglog("Would message fake channel {0}: {1!r}".format(self.name, data))
def mode(self, *changes):
if not changes:
return
modes = []
targets = []
for change in changes:
if isinstance(change, str):
modes.append(change)
else:
mode, target = change
modes.append(mode)
if target is not None:
targets.append(target)
self.update_modes(users.Bot.rawnick, "".join(modes), targets)

69
src/context.py Normal file
View File

@ -0,0 +1,69 @@
Features = {"CASEMAPPING": "rfc1459", "CHARSET": "utf-8", "STATUSMSG": {"@", "+"}, "CHANTYPES": {"#"}} # IRC server features (these are defaults)
def lower(nick):
if nick is None:
return None
if isinstance(nick, IRCContext):
return nick.lower()
mapping = {
"[": "{",
"]": "}",
"\\": "|",
"^": "~",
}
if Features["CASEMAPPING"] == "strict-rfc1459":
mapping.pop("^")
elif Features["CASEMAPPING"] == "ascii":
mapping.clear()
return nick.lower().translate(str.maketrans(mapping))
class IRCContext:
"""Base class for channels and users."""
is_channel = False
is_user = False
is_fake = False
def __init__(self, name, client, *, ref=None):
self.name = name
self.client = client
self.ref = ref
def lower(self):
return type(self)(lower(name), client, ref=(self.ref or ref))
def get_send_type(self, *, is_notice=False, is_privmsg=False):
if is_notice and not is_privmsg:
return "NOTICE"
return "PRIVMSG"
@staticmethod
def raw_send(data, client, send_type, name):
full_address = "{cli.nickname}!{cli.ident}@{cli.hostmask}".format(cli=client)
# Maximum length of sent data is 0x200 (512) bytes. However,
# we have to reduce the maximum length allowed to account for:
# 1 (1) - The initial colon at the front of the data
# 2 (1) - The space between the command and the target
# 2 (1) - The space between the target and the data
# 3 (1) - The colon at the front of the data to send
# 4 (3) - I don't know why, but we need 3 more/less characters
length = 0x200 - 7
# Next, we need to reduce the length to account for our address
length -= len(full_address)
# Then we also need to account for the target's length
length -= len(name)
# Finally, we need to account for the send type's length
length -= len(send_type)
for line in data.splitlines():
while line:
extra, line = line[:length], line[length:]
client.send("{0} {1} :{2}".format(send_type, name, extra))
def send(self, data, target=None, *, notice=False, privmsg=False):
send_type = self.get_send_type(is_notice=notice, is_privmsg=privmsg)
self.raw_send(data, self.client, send_type, self.name)

440
src/users.py Normal file
View File

@ -0,0 +1,440 @@
from collections import defaultdict
from weakref import WeakSet
import fnmatch
import re
from src.context import IRCContext, Features, lower
from src.logger import debuglog
from src import settings as var
from src import db
import botconfig
Bot = None # bot instance
all_users = WeakSet()
_arg_msg = "(nick={0}, ident={1}, host={2}, realname={3}, account={4}, allow_bot={5})"
predicate = re.compile(r"^[0-9]+$").search
def get(nick=None, ident=None, host=None, realname=None, account=None, *, allow_multiple=False, allow_none=False, allow_bot=False, raw_nick=False):
"""Return the matching user(s) from the user list.
This takes up to 5 positional arguments (nick, ident, host, realname,
account) and may take up to four keyword-only arguments:
- allow_multiple (defaulting to False) allows multiple matches,
and returns a list, even if there's only one match;
- allow_none (defaulting to False) allows no match at all, and
returns None instead of raising an error; an empty list will be
returned if this is used with allow_multiple;
- allow_bot (defaulting to False) allows the bot to be matched and
returned;
- raw_nick (defaulting to False) means that the nick has not been
yet parsed, and so ident and host will be None, and nick will be
a raw nick of the form nick!ident@host.
If allow_multiple is not set and multiple users match, a ValueError
will be raised. If allow_none is not set and no users match, a KeyError
will be raised.
"""
if raw_nick:
if ident is not None or host is not None:
raise ValueError("ident and host need to be None if raw_nick is True")
nick, ident, host = parse_rawnick(nick)
potential = []
users = set(all_users)
if allow_bot:
users.add(Bot)
for user in users:
if nick is not None and user.nick != nick:
continue
if ident is not None and user.ident != ident:
continue
if host is not None and user.host != host:
continue
if realname is not None and user.realname != realname:
continue
if account is not None and user.account != account:
continue
if not potential or allow_multiple:
potential.append(user)
else:
raise ValueError("More than one user matches: " +
_arg_msg.format(nick, ident, host, realname, account, allow_bot))
if not potential and not allow_none:
raise KeyError(_arg_msg.format(nick, ident, host, realname, account, allow_bot))
if allow_multiple:
return potential
if not potential: # allow_none
return None
return potential[0]
def add(cli, *, nick, ident=None, host=None, realname=None, account=None, channels=None, raw_nick=False):
"""Create a new user, add it to the user list and return it.
This function takes up to 6 keyword-only arguments (and no positional
arguments): nick, ident, host, realname, account and channels.
With the exception of the first one, any parameter can be omitted.
If a matching user already exists, a ValueError will be raised.
The raw_nick keyword argument may be set if the nick has not yet
been parsed. In that case, ident and host must both be None, and
nick must be in the form nick!ident@host.
"""
if raw_nick:
if ident is not None or host is not None:
raise ValueError("ident and host need to be None if raw_nick is True")
nick, ident, host = parse_rawnick(nick)
if exists(nick, ident, host, realname, account, allow_multiple=True, allow_bot=True):
raise ValueError("User already exists: " + _arg_msg.format(nick, ident, host, realname, account, True))
if channels is None:
channels = {}
else:
channels = dict(channels)
cls = User
if predicate(nick):
cls = FakeUser
new = cls(cli, nick, ident, host, realname, account, channels)
all_users.add(new)
return new
def exists(*args, allow_none=False, **kwargs):
"""Return True if a matching user exists.
Positional and keyword arguments are the same as get(), with the
exception that allow_none may not be used (a RuntimeError will be
raised in that case).
"""
if allow_none: # why would you even want to do that?
raise RuntimeError("Cannot use allow_none=True with exists()")
try:
get(*args, **kwargs)
except (KeyError, ValueError):
return False
return True
_raw_nick_pattern = re.compile(
r"""
\A
(?P<nick> [^!@\s]+ (?=!|$) )? !?
(?P<ident> [^!@\s]+ )? @?
(?P<host> \S+ )?
\Z
""",
re.VERBOSE
)
def parse_rawnick(rawnick, *, default=None):
"""Return a tuple of (nick, ident, host) from rawnick."""
return _raw_nick_pattern.search(rawnick).groups(default)
def parse_rawnick_as_dict(rawnick, *, default=None):
"""Return a dict of {"nick": nick, "ident": ident, "host": host}."""
return _raw_nick_pattern.search(rawnick).groupdict(default)
def equals(nick1, nick2):
return lower(nick1) == lower(nick2)
class User(IRCContext):
is_user = True
_messages = defaultdict(list)
def __init__(self, cli, nick, ident, host, realname, account, channels, **kwargs):
super().__init__(nick, cli, **kwargs)
self.nick = nick
self.ident = ident
self.host = host
self.realname = realname
self.account = account
self.channels = channels
def __str__(self):
return "{self.__class__.__name__}: {self.nick}!{self.ident}@{self.host}#{self.realname}:{self.account}".format(self=self)
def __repr__(self):
return "{self.__class__.__name__}({self.nick!r}, {self.ident!r}, {self.host!r}, {self.realname!r}, {self.account!r}, {self.channels!r})".format(self=self)
def lower(self):
return type(self)(self.client, lower(self.nick), lower(self.ident), lower(self.host), lower(self.realname), lower(self.account), channels, ref=(self.ref or self))
def is_owner(self):
if self.is_fake:
return False # fake nicks can't ever be owner
hosts = set(botconfig.OWNERS)
accounts = set(botconfig.OWNERS_ACCOUNTS)
if not var.DISABLE_ACCOUNTS and self.account is not None:
for pattern in accounts:
if fnmatch.fnmatch(lower(self.account), lower(pattern)):
return True
for hostmask in hosts:
if match_hostmask(hostmask, self.nick, self.ident, self.host):
return True
return False
def is_admin(self):
if self.is_fake:
return False # they can't be admin, either
flags = var.FLAGS[self.rawnick] + var.FLAGS_ACCS[self.account]
if "F" not in flags:
try:
hosts = set(botconfig.ADMINS)
accounts = set(botconfig.ADMINS_ACCOUNTS)
if not var.DISABLE_ACCOUNTS and self.account is not None:
for pattern in accounts:
if fnmatch.fnmatch(lower(self.account), lower(pattern)):
return True
for hostmask in hosts:
if match_hostmask(hostmask, self.nick, self.ident, self.host):
return True
except AttributeError:
pass
return self.is_owner()
return True
def get_send_type(self, *, is_notice=False, is_privmsg=False):
if is_privmsg:
return "PRIVMSG"
if is_notice:
return "NOTICE"
if self.prefers_notice():
return "NOTICE"
return "PRIVMSG"
def match_hostmask(self, hostmask):
"""Match n!u@h, u@h, or just h by itself."""
nick, ident, host = re.match("(?:(?:(.*?)!)?(.*?)@)?(.*)", hostmask).groups("")
temp = self.lower()
return (fnmatch.fnmatch(temp.nick, lower(nick)) and
fnmatch.fnmatch(temp.ident, lower(ident)) and
fnmatch.fnmatch(temp.host, lower(host)))
def prefers_notice(self):
temp = self.lower()
if temp.account in var.PREFER_NOTICE_ACCS:
return True
if not var.ACCOUNTS_ONLY:
for hostmask in var.PREFER_NOTICE:
if temp.match_hostmask(hostmask):
return True
return False
def prefers_simple(self):
temp = self.lower()
if temp.account in var.SIMPLE_NOTIFY_ACCS:
return True
if not var.ACCOUNTS_ONLY:
for hostmask in var.SIMPLE_NOTIFY:
if temp.match_hostmask(hostmask):
return True
return False
def get_pingif_count(self):
temp = self.lower()
if not var.DISABLE_ACCOUNTS and temp.account is not None:
if temp.account in var.PING_IF_PREFS_ACCS:
return var.PING_IF_PREFS_ACCS[temp.account]
elif not var.ACCOUNTS_ONLY:
for hostmask, pref in var.PING_IF_PREFS.items():
if temp.match_hostmask(hostmask):
return pref
return 0
def set_pingif_count(self, value, old=None):
temp = self.lower()
if not value:
if not var.DISABLE_ACCOUNTS and temp.account:
if temp.account in var.PING_IF_PREFS_ACCS:
del var.PING_IF_PREFS_ACCS[temp.account]
db.set_pingif(0, temp.account, None)
if old is not None:
with var.WARNING_LOCK:
if old in var.PING_IF_NUMS_ACCS:
var.PING_IF_NUMS_ACCS[old].discard(temp.account)
if not var.ACCOUNTS_ONLY:
for hostmask in list(var.PING_IF_PREFS):
if temp.match_hostmask(hostmask):
del var.PING_IF_PREFS[hostmask]
db.set_pingif(0, None, hostmask)
if old is not None:
with var.WARNING_LOCK:
if old in var.PING_IF_NUMS:
var.PING_IF_NUMS[old].discard(hostmask)
var.PING_IF_NUMS[old].discard(temp.host)
else:
if not var.DISABLE_ACCOUNTS and temp.account:
var.PING_IF_PREFS[temp.account] = value
db.set_pingif(value, temp.account, None)
with var.WARNING_LOCK:
if value not in var.PING_IF_NUMS_ACCS:
var.PING_IF_NUMS_ACCS[value] = set()
var.PING_IF_NUMS_ACCS[value].add(temp.account)
if old is not None:
if old in var.PING_IF_NUMS_ACCS:
var.PING_IF_NUMS_ACCS[old].discard(temp.account)
elif not var.ACCOUNTS_ONLY:
var.PING_IF_PREFS[temp.userhost] = value
db.set_pingif(value, None, temp.userhost)
with var.WARNING_LOCK:
if value not in var.PING_IF_NUMS:
var.PING_IF_NUMS[value] = set()
var.PING_IF_NUMS[value].add(temp.userhost)
if old is not None:
if old in var.PING_IF_NUMS:
var.PING_IF_NUMS[old].discard(temp.host)
var.PING_IF_NUMS[old].discard(temp.userhost)
def wants_deadchat(self):
temp = self.lower()
if temp.account in var.DEADCHAT_PREFS_ACCS:
return False
elif var.ACCOUNTS_ONLY:
return True
elif temp.host in var.DEADCHAT_PREFS:
return False
return True
def stasis_count(self):
"""Return the number of games the user is in stasis for."""
temp = self.lower()
amount = 0
if not var.DISABLE_ACCOUNTS:
amount = var.STASISED_ACCS.get(temp.account, 0)
for hostmask in var.STASISED:
if temp.match_hostmask(hostmask):
amount = max(amount, var.STASISED[hostmask])
return amount
def queue_message(self, message):
self._messages[message].append(self)
@classmethod
def send_messages(cls, *, notice=False, privmsg=False):
for message, targets in cls._messages.items():
send_types = defaultdict(list)
for target in targets:
send_types[target.get_send_type(is_notice=notice, is_privmsg=privmsg)].append(target)
for send_type, targets in send_types.items():
max_targets = Features["TARGMAX"][send_type]
while targets:
using, targets = targets[:max_targets], targets[max_targets:]
cls.raw_send(message, targets[0].client, send_type, ",".join([t.nick for t in using]))
cls._messages.clear()
@property
def nick(self): # name should be the same as nick (for length calculation)
return self.name
@nick.setter
def nick(self, nick):
self.name = nick
if self is Bot: # update the client's nickname as well
self.client.nickname = nick
@property
def account(self): # automatically converts "0" and "*" to None
return self._account
@account.setter
def account(self, account):
if account in ("0", "*"):
account = None
self._account = account
@property
def rawnick(self):
if self.nick is None or self.ident is None or self.host is None:
return None
return "{self.nick}!{self.ident}@{self.host}".format(self=self)
@rawnick.setter
def rawnick(self, rawnick):
self.nick, self.ident, self.host = parse_rawnick(rawnick)
@property
def userhost(self):
if self.ident is None or self.host is None:
return None
return "{self.ident}@{self.host}".format(self=self)
@userhost.setter
def userhost(self, userhost):
nick, self.ident, self.host = parse_rawnick(userhost)
class FakeUser(User):
is_fake = True
def queue_message(self, message):
self.send(message) # don't actually queue it
def send(self, data, *, notice=False, privmsg=False):
debuglog("Would message fake user {0}: {1!r}".format(self.nick, data))
@property
def rawnick(self):
return self.nick # we don't have a raw nick
@rawnick.setter
def rawnick(self, rawnick):
self.nick = parse_rawnick_as_dict(rawnick)["nick"]