Merge pull request #248 from lykoss/context-base-stuff

New IRCContext, Channel and User classes
This commit is contained in:
Ryan Schmidt 2016-10-27 15:26:45 -07:00 committed by GitHub
commit af96a2544e
5 changed files with 762 additions and 1 deletions

View File

@ -1 +1,2 @@
typing
enum

271
src/channels.py Normal file
View File

@ -0,0 +1,271 @@
import time
from enum import Enum
from src.context import IRCContext, Features
from src.logger import debuglog
from src import users
Main = None # main channel
_channels = {}
_states = ("not yet joined", "pending join", "joined", "pending leave", "left channel", "", "deleted", "cleared")
class _States(Enum):
NotJoined = "not yet joined"
PendingJoin = "pending join"
Joined = "joined"
PendingLeave = "pending leave"
Left = "left channel"
Deleted = "deleted"
Cleared = "cleared"
# This is used to tell if this is a fake channel or not. If this
# function returns a true value, then it's a fake channel. This is
# useful for testing, where we might want the users in fake channels.
def predicate(name):
return not name.startswith(tuple(Features["CHANTYPES"]))
get = _channels.__getitem__
def add(name, cli, key=""):
"""Add and return a new channel, or an existing one if it exists."""
# We use add() in a bunch of places where the channel probably (but
# not surely) already exists. If it does, obviously we want to use
# that one. However, if the client is *not* the same, that means we
# would be trying to send something from one connection over to
# another one (or some other weird stuff like that). Instead of
# jumping through hoops, we just disallow it here.
if name in _channels:
if cli is not _channels[name].client:
raise RuntimeError("different IRC client for channel {0}".format(name))
return _channels[name]
cls = Channel
if predicate(name):
cls = FakeChannel
chan = _channels[name] = cls(name, cli)
chan.join(key)
return chan
exists = _channels.__contains__
def channels():
"""Iterate over all the current channels."""
yield from _channels.values()
class Channel(IRCContext):
is_channel = True
def __init__(self, name, client, **kwargs):
super().__init__(name, client, **kwargs)
self.users = set()
self.modes = {}
self.timestamp = None
self.state = _States.NotJoined
def __del__(self):
self.users.clear()
self.modes.clear()
self.state = _States.Deleted
self.client = None
self.timestamp = None
def __str__(self):
return "{self.__class__.__name__}: {self.name} ({0})".format(self.state.value, self=self)
def __repr__(self):
return "{self.__class__.__name__}({self.name!r})".format(self=self)
def join(self, key=""):
if self.state in (_States.NotJoined, _States.Left):
self.state = _States.PendingJoin
self.client.send("JOIN {0} :{1}".format(self.name, key))
def part(self, message=""):
if self.state is _States.Joined:
self.state = _States.PendingLeave
self.client.send("PART {0} :{1}".format(self.name, message))
def kick(self, target, message=""):
if self.state is _States.Joined:
self.client.send("KICK {0} {1} :{2}".format(self.name, target, message))
def mode(self, *changes):
"""Perform a mode change on the channel.
Usage:
chan.mode() # Will get back the modes on the channel
chan.mode("b") # Will get the banlist back
chan.mode("-m")
chan.mode(["-v", "woffle"], ["+o", "Vgr"])
chan.mode("-m", ("+v", "jacob1"), ("-o", "nyuszika7h"))
This performs both single and complex mode changes.
Note: Not giving a prefix with the mode is the same as giving a
'+ prefix. For instance, the following are identical:
chan.mode(("+o", "woffle"))
chan.mode(("o", "woffle"))
"""
if not changes: # bare call; get channel modes
self.client.send("MODE", self.name)
return
max_modes = Features["MODES"]
params = []
for change in changes:
if isinstance(change, str):
change = (change, None)
mode, target = change
if len(mode) < 2:
mode = "+" + mode
params.append((mode, target))
params.sort(key=lambda x: x[0][0]) # sort by prefix
while params:
cur, params = params[:max_modes], params[max_modes:]
modes, targets = zip(*cur)
prefix = ""
final = []
for mode in modes:
if mode[0] == prefix:
mode = mode[1:]
elif mode.startswith(("+", "-")):
prefix = mode[0]
final.append(mode)
for target in targets:
if target is not None: # target will be None if the mode is parameter-less
final.append(" ")
final.append(target)
self.client.send("MODE", self.name, "".join(final))
def update_modes(self, rawnick, mode, targets):
"""Update the channel's mode registry with the new modes.
This is called whenever a MODE event is received. All of the
modes are kept up-to-date in the channel, even if we don't need
it. For instance, banlists are updated properly when the bot
receives them. We don't need all the mode information, but it's
better to have everything stored than only some parts.
"""
set_time = int(time.time()) # for list modes timestamp
list_modes, all_set, only_set, no_set = Features["CHANMODES"]
status_modes = Features["PREFIX"].values()
i = 0
for c in mode:
if c in ("+", "-"):
prefix = c
continue
if prefix == "+":
if c in status_modes: # op/voice status; keep it here and update the user's registry too
if c not in self.modes:
self.modes[c] = set()
user = users.get(targets[i], allow_bot=True)
self.modes[c].add(user)
user.channels[self].add(c)
i += 1
elif c in list_modes: # stuff like bans, quiets, and ban and invite exempts
if c not in self.modes:
self.modes[c] = {}
self.modes[c][targets[i]] = (rawnick, set_time)
i += 1
else:
if c in no_set: # everything else; e.g. +m, +i, +f, etc.
targ = None
else:
targ = targets[i]
i += 1
if c in only_set and targ.isdigit(): # +l/+j
targ = int(targ)
self.modes[c] = targ
else:
if c in status_modes:
if c in self.modes:
user = users.get(targets[i], allow_bot=True)
self.modes[c].discard(user)
user.channels[self].discard(c)
if not self.modes[c]:
del self.modes[c]
i += 1
elif c in list_modes:
if c in self.modes:
self.modes[c].pop(targets[i], None)
if not self.modes[c]:
del self.modes[c]
i += 1
else:
if c in all_set:
i += 1 # -k needs a target, but we don't care about it
del self.modes[c]
def remove_user(self, user):
self.users.remove(user)
for mode in Features["PREFIX"].values():
if mode in self.modes:
self.modes[mode].discard(user)
if not self.modes[mode]:
del self.modes[mode]
del user.channels[self]
def _clear(self):
for user in self.users:
del user.channels[self]
self.users.clear()
self.modes.clear()
self.state = _States.Cleared
self.timestamp = None
del _channels[self.name]
class FakeChannel(Channel):
is_fake = True
def join(self, key=""):
pass # don't actually do anything
def part(self, message=""):
pass
def send(self, data, *, notice=False, privmsg=False):
debuglog("Would message fake channel {0}: {1!r}".format(self.name, data))
def mode(self, *changes):
if not changes:
return
modes = []
targets = []
for change in changes:
if isinstance(change, str):
modes.append(change)
else:
mode, target = change
modes.append(mode)
if target is not None:
targets.append(target)
self.update_modes(users.Bot.rawnick, "".join(modes), targets)

70
src/context.py Normal file
View File

@ -0,0 +1,70 @@
Features = {"CASEMAPPING": "rfc1459", "CHARSET": "utf-8", "STATUSMSG": {"@", "+"}, "CHANTYPES": {"#"}}
def lower(nick):
if nick is None:
return None
if isinstance(nick, IRCContext):
return nick.lower()
mapping = {
"[": "{",
"]": "}",
"\\": "|",
"^": "~",
}
if Features["CASEMAPPING"] == "strict-rfc1459":
mapping.pop("^")
elif Features["CASEMAPPING"] == "ascii":
mapping.clear()
return nick.lower().translate(str.maketrans(mapping))
class IRCContext:
"""Base class for channels and users."""
is_channel = False
is_user = False
is_fake = False
def __init__(self, name, client, *, ref=None):
self.name = name
self.client = client
self.ref = ref
def lower(self):
return type(self)(lower(name), client, ref=(self.ref or ref))
def get_send_type(self, *, is_notice=False, is_privmsg=False):
if is_notice and not is_privmsg:
return "NOTICE"
return "PRIVMSG"
@staticmethod
def _send(data, client, send_type, name):
full_address = "{cli.nickname}!{cli.ident}@{cli.hostmask}".format(cli=client)
# Maximum length of sent data is 512 bytes. However, we have to
# reduce the maximum length allowed to account for:
# 1 (1) - The initial colon at the front of the data
# 2 (1) - The space between the sender (us) and the command
# 3 (1) - The space between the command and the target
# 4 (1) - The space between the target and the data
# 5 (1) - The colon at the front of the data to send
# 6 (2) - The trailing \r\n
length = 512 - 7
# Next, we need to reduce the length to account for our address
length -= len(full_address)
# Then we also need to account for the target's length
length -= len(name)
# Finally, we need to account for the send type's length
length -= len(send_type)
for line in data.splitlines():
while line:
extra, line = line[:length], line[length:]
client.send("{0} {1} :{2}".format(send_type, name, extra))
def send(self, data, target=None, *, notice=False, privmsg=False):
send_type = self.get_send_type(is_notice=notice, is_privmsg=privmsg)
self._send(data, self.client, send_type, self.name)

418
src/users.py Normal file
View File

@ -0,0 +1,418 @@
from collections import defaultdict
from weakref import WeakSet
import fnmatch
import re
from src.context import IRCContext, Features, lower
from src.logger import debuglog
from src import settings as var
from src import db
import botconfig
Bot = None # bot instance
_users = WeakSet()
_arg_msg = "(nick={0}, ident={1}, host={2}, realname={3}, account={4}, allow_bot={5})"
# This is used to tell if this is a fake nick or not. If this function
# returns a true value, then it's a fake nick. This is useful for
# testing, where we might want everyone to be fake nicks.
predicate = re.compile(r"^[0-9]+$").search
def get(nick=None, ident=None, host=None, realname=None, account=None, *, allow_multiple=False, allow_none=False, allow_bot=False):
"""Return the matching user(s) from the user list.
This takes up to 5 positional arguments (nick, ident, host, realname,
account) and may take up to three keyword-only arguments:
- allow_multiple (defaulting to False) allows multiple matches,
and returns a list, even if there's only one match;
- allow_none (defaulting to False) allows no match at all, and
returns None instead of raising an error; an empty list will be
returned if this is used with allow_multiple;
- allow_bot (defaulting to False) allows the bot to be matched and
returned;
If allow_multiple is not set and multiple users match, a ValueError
will be raised. If allow_none is not set and no users match, a KeyError
will be raised.
"""
if ident is None and host is None and nick is not None:
nick, ident, host = parse_rawnick(nick)
potential = []
users = set(_users)
if allow_bot:
users.add(Bot)
for user in users:
if nick is not None and user.nick != nick:
continue
if ident is not None and user.ident != ident:
continue
if host is not None and user.host != host:
continue
if realname is not None and user.realname != realname:
continue
if account is not None and user.account != account:
continue
if not potential or allow_multiple:
potential.append(user)
else:
raise ValueError("More than one user matches: " +
_arg_msg.format(nick, ident, host, realname, account, allow_bot))
if not potential and not allow_none:
raise KeyError(_arg_msg.format(nick, ident, host, realname, account, allow_bot))
if allow_multiple:
return potential
if not potential: # allow_none
return None
return potential[0]
def add(cli, *, nick, ident=None, host=None, realname=None, account=None, channels=None):
"""Create a new user, add it to the user list and return it.
This function takes up to 6 keyword-only arguments (and no positional
arguments): nick, ident, host, realname, account and channels.
With the exception of the first one, any parameter can be omitted.
If a matching user already exists, a ValueError will be raised.
"""
if ident is None and host is None and nick is not None:
nick, ident, host = parse_rawnick(nick)
if exists(nick, ident, host, realname, account, allow_multiple=True, allow_bot=True):
raise ValueError("User already exists: " + _arg_msg.format(nick, ident, host, realname, account, True))
if channels is None:
channels = {}
else:
channels = dict(channels)
cls = User
if predicate(nick):
cls = FakeUser
new = cls(cli, nick, ident, host, realname, account, channels)
_users.add(new)
return new
def exists(nick=None, ident=None, host=None, realname=None, account=None, *, allow_multiple=False, allow_bot=False):
"""Return True if a matching user exists.
Positional and keyword arguments are the same as get(), with the
exception that allow_none may not be used (a RuntimeError will be
raised in that case).
"""
try:
get(nick, ident, host, realname, account, allow_multiple=allow_multiple, allow_bot=allow_bot)
except (KeyError, ValueError):
return False
return True
def users():
"""Iterate over the users in the registry."""
yield from _users
_raw_nick_pattern = re.compile(r"^(?P<nick>.+?)(?:!(?P<ident>.+?)@(?P<host>.+))?$")
def parse_rawnick(rawnick, *, default=None):
"""Return a tuple of (nick, ident, host) from rawnick."""
return _raw_nick_pattern.search(rawnick).groups(default)
def parse_rawnick_as_dict(rawnick, *, default=None):
"""Return a dict of {"nick": nick, "ident": ident, "host": host}."""
return _raw_nick_pattern.search(rawnick).groupdict(default)
def equals(nick1, nick2):
return lower(nick1) == lower(nick2)
class User(IRCContext):
is_user = True
_messages = defaultdict(list)
def __init__(self, cli, nick, ident, host, realname, account, channels, **kwargs):
super().__init__(nick, cli, **kwargs)
self.nick = nick
self.ident = ident
self.host = host
self.realname = realname
self.account = account
self.channels = channels
def __str__(self):
return "{self.__class__.__name__}: {self.nick}!{self.ident}@{self.host}#{self.realname}:{self.account}".format(self=self)
def __repr__(self):
return "{self.__class__.__name__}({self.nick!r}, {self.ident!r}, {self.host!r}, {self.realname!r}, {self.account!r}, {self.channels!r})".format(self=self)
def lower(self):
return type(self)(self.client, lower(self.nick), lower(self.ident), lower(self.host), lower(self.realname), lower(self.account), channels, ref=(self.ref or self))
def is_owner(self):
if self.is_fake:
return False
hosts = set(botconfig.OWNERS)
accounts = set(botconfig.OWNERS_ACCOUNTS)
if not var.DISABLE_ACCOUNTS and self.account is not None:
for pattern in accounts:
if fnmatch.fnmatch(lower(self.account), lower(pattern)):
return True
for hostmask in hosts:
if match_hostmask(hostmask, self.nick, self.ident, self.host):
return True
return False
def is_admin(self):
if self.is_fake:
return False
flags = var.FLAGS[self.rawnick] + var.FLAGS_ACCS[self.account]
if "F" not in flags:
try:
hosts = set(botconfig.ADMINS)
accounts = set(botconfig.ADMINS_ACCOUNTS)
if not var.DISABLE_ACCOUNTS and self.account is not None:
for pattern in accounts:
if fnmatch.fnmatch(lower(self.account), lower(pattern)):
return True
for hostmask in hosts:
if match_hostmask(hostmask, self.nick, self.ident, self.host):
return True
except AttributeError:
pass
return self.is_owner()
return True
def get_send_type(self, *, is_notice=False, is_privmsg=False):
if is_privmsg:
return "PRIVMSG"
if is_notice:
return "NOTICE"
if self.prefers_notice():
return "NOTICE"
return "PRIVMSG"
def match_hostmask(self, hostmask):
"""Match n!u@h, u@h, or just h by itself."""
nick, ident, host = re.match("(?:(?:(.*?)!)?(.*?)@)?(.*)", hostmask).groups("")
temp = self.lower()
return (fnmatch.fnmatch(temp.nick, lower(nick)) and
fnmatch.fnmatch(temp.ident, lower(ident)) and
fnmatch.fnmatch(temp.host, lower(host)))
def prefers_notice(self):
temp = self.lower()
if temp.account in var.PREFER_NOTICE_ACCS:
return True
if not var.ACCOUNTS_ONLY:
for hostmask in var.PREFER_NOTICE:
if temp.match_hostmask(hostmask):
return True
return False
def prefers_simple(self):
temp = self.lower()
if temp.account in var.SIMPLE_NOTIFY_ACCS:
return True
if not var.ACCOUNTS_ONLY:
for hostmask in var.SIMPLE_NOTIFY:
if temp.match_hostmask(hostmask):
return True
return False
def get_pingif_count(self):
temp = self.lower()
if not var.DISABLE_ACCOUNTS and temp.account is not None:
if temp.account in var.PING_IF_PREFS_ACCS:
return var.PING_IF_PREFS_ACCS[temp.account]
elif not var.ACCOUNTS_ONLY:
for hostmask, pref in var.PING_IF_PREFS.items():
if temp.match_hostmask(hostmask):
return pref
return 0
def set_pingif_count(self, value, old=None):
temp = self.lower()
if not value:
if not var.DISABLE_ACCOUNTS and temp.account:
if temp.account in var.PING_IF_PREFS_ACCS:
del var.PING_IF_PREFS_ACCS[temp.account]
db.set_pingif(0, temp.account, None)
if old is not None:
with var.WARNING_LOCK:
if old in var.PING_IF_NUMS_ACCS:
var.PING_IF_NUMS_ACCS[old].discard(temp.account)
if not var.ACCOUNTS_ONLY:
for hostmask in list(var.PING_IF_PREFS):
if temp.match_hostmask(hostmask):
del var.PING_IF_PREFS[hostmask]
db.set_pingif(0, None, hostmask)
if old is not None:
with var.WARNING_LOCK:
if old in var.PING_IF_NUMS:
var.PING_IF_NUMS[old].discard(hostmask)
var.PING_IF_NUMS[old].discard(temp.host)
else:
if not var.DISABLE_ACCOUNTS and temp.account:
var.PING_IF_PREFS[temp.account] = value
db.set_pingif(value, temp.account, None)
with var.WARNING_LOCK:
if value not in var.PING_IF_NUMS_ACCS:
var.PING_IF_NUMS_ACCS[value] = set()
var.PING_IF_NUMS_ACCS[value].add(temp.account)
if old is not None:
if old in var.PING_IF_NUMS_ACCS:
var.PING_IF_NUMS_ACCS[old].discard(temp.account)
elif not var.ACCOUNTS_ONLY:
var.PING_IF_PREFS[temp.userhost] = value
db.set_pingif(value, None, temp.userhost)
with var.WARNING_LOCK:
if value not in var.PING_IF_NUMS:
var.PING_IF_NUMS[value] = set()
var.PING_IF_NUMS[value].add(temp.userhost)
if old is not None:
if old in var.PING_IF_NUMS:
var.PING_IF_NUMS[old].discard(temp.host)
var.PING_IF_NUMS[old].discard(temp.userhost)
def wants_deadchat(self):
temp = self.lower()
if temp.account in var.DEADCHAT_PREFS_ACCS:
return False
elif var.ACCOUNTS_ONLY:
return True
elif temp.host in var.DEADCHAT_PREFS:
return False
return True
def stasis_count(self):
"""Return the number of games the user is in stasis for."""
temp = self.lower()
amount = 0
if not var.DISABLE_ACCOUNTS:
amount = var.STASISED_ACCS.get(temp.account, 0)
amount = max(amount, var.STASISED.get(temp.userhost, 0))
return amount
def queue_message(self, message):
self._messages[message].append(self)
@classmethod
def send_messages(cls, *, notice=False, privmsg=False):
for message, targets in cls._messages.items():
send_types = defaultdict(list)
for target in targets:
send_types[target.get_send_type(is_notice=notice, is_privmsg=privmsg)].append(target)
for send_type, targets in send_types.items():
max_targets = Features["TARGMAX"][send_type]
while targets:
using, targets = targets[:max_targets], targets[max_targets:]
cls._send(message, targets[0].client, send_type, ",".join([t.nick for t in using]))
cls._messages.clear()
@property
def nick(self): # name should be the same as nick (for length calculation)
return self.name
@nick.setter
def nick(self, nick):
self.name = nick
if self is Bot: # update the client's nickname as well
self.client.nickname = nick
@property
def account(self): # automatically converts "0" and "*" to None
return self._account
@account.setter
def account(self, account):
if account in ("0", "*"):
account = None
self._account = account
@property
def rawnick(self):
if self.nick is None or self.ident is None or self.host is None:
return None
return "{self.nick}!{self.ident}@{self.host}".format(self=self)
@rawnick.setter
def rawnick(self, rawnick):
self.nick, self.ident, self.host = parse_rawnick(rawnick)
@property
def userhost(self):
if self.ident is None or self.host is None:
return None
return "{self.ident}@{self.host}".format(self=self)
@userhost.setter
def userhost(self, userhost):
nick, self.ident, self.host = parse_rawnick(userhost)
class FakeUser(User):
is_fake = True
def queue_message(self, message):
self.send(message) # don't actually queue it
def send(self, data, *, notice=False, privmsg=False):
debuglog("Would message fake user {0}: {1!r}".format(self.nick, data))
@property
def rawnick(self):
return self.nick # we don't have a raw nick
@rawnick.setter
def rawnick(self, rawnick):
self.nick = parse_rawnick_as_dict(rawnick)["nick"]

View File

@ -28,7 +28,8 @@ if sys.version_info < (3, 3):
sys.exit(1)
try: # need to manually add dependencies here
import typing
import typing # Python >= 3.5
import enum # Python >= 3.4
except ImportError:
command = "python3"
if os.name == "nt":