# -*- coding: utf-8 -*- # # Copyright (C) 2013 - 2018 Stefan Wold # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # (This script requires WeeChat 0.4.2 or higher). # # WeeChat script that enables automatic OTP (OATH-TOTP) support for UnderNET's X # and Login on Connect (LoC) authentication. # # The script will generate an OTP and automatically append it when it # notices /msg x@channels.undernet.org login command. # This allows OTP login when using irc.server.*.command to automatically # sign in to the X service when connecting to an undernet server. # # Commands: # /uotp otp [server] # /uotp list # /uotp add # /uotp remove # /uotp enable # /uotp disable SCRIPT_NAME = "undernet_totp" SCRIPT_AUTHOR = "Stefan Wold " SCRIPT_VERSION = "0.4.0" SCRIPT_LICENSE = "GPL3" SCRIPT_DESC = "Automatic OTP (OATH-TOTP) authentication with UnderNET's channel services (X) and Login on Connect (LoC)." SCRIPT_COMMAND = "uotp" HOOKS = {} SETTINGS = { "otp_server_names": ("", "List of undernet server for which to enable OTP, use comma as separator"), "debug" : ("off", "Debug output"), } import_ok = True try: import weechat except ImportError: print "This script must be run under WeeChat." import_ok = False try: import hmac import re from base64 import b32decode from hashlib import sha1 from struct import pack, unpack from time import time from binascii import unhexlify except ImportError as err: print "Missing module(s) for %s: %s" % (SCRIPT_NAME, err) import_ok = False def print_debug(message): if weechat.config_get_plugin('debug') == 'on': weechat.prnt("", "%s DEBUG: %s" % (SCRIPT_NAME, message)) def sprint(message, buffer=""): weechat.prnt(buffer, "%s: %s" % (SCRIPT_NAME, message)) def unhook(hook): global HOOKS if hook in HOOKS: print_debug('Unhooking %s' % hook) weechat.unhook(HOOKS[hook]) del HOOKS[hook] def unhook_all(server): for hook in [server+'.notice', server+'.modifier', server+'.modifier2']: unhook(hook) def hook_all(server): print_debug("hook_all(%s)" % server) global HOOKS notice = server + '.notice' modifier = server + '.modifier' modifier2 = server + '.modifier2' if notice not in HOOKS: HOOKS[notice] = weechat.hook_signal("%s,irc_raw_in_notice" % server, "auth_success_cb", server) if modifier not in HOOKS: HOOKS[modifier] = weechat.hook_modifier("irc_out_privmsg", "totp_login_modifier_cb", server) if modifier2 not in HOOKS: HOOKS[modifier2] = weechat.hook_modifier("irc_out_pass", "totp_login_modifier_cb", server) def totp_login_modifier_cb(data, modifier, server, cmd): if server == data and server in enabled_servers(): if re.match(r'(?i)^(PRIVMSG x@channels.undernet.org :login .+ .+|PASS .*)', cmd): print_debug("totp_login_modifier_cb(%s)" % cmd) otp = generate_totp(server) if otp is not None: cmd += " %s" % otp return cmd def auth_success_cb(server, signal, signal_data): if signal_data.startswith(":X!cservice@undernet.org NOTICE"): if re.match(r'^:X!cservice@undernet.org NOTICE .+ :AUTHENTICATION SUCCESSFUL', signal_data): unhook_all(server) return weechat.WEECHAT_RC_OK def signal_cb(data, signal, server): if server in enabled_servers(): print_debug('signal_cb(%s)' % signal) if signal == 'irc_server_connecting': hook_all(server) elif signal == 'irc_server_disconnected': unhook_all(server) return weechat.WEECHAT_RC_OK def get_otp_cb(data, buffer, server): if server: server = [server] else: server = enabled_servers() for _server in server: otp = generate_totp(_server) if otp is not None: weechat.prnt("", "%s OTP: %s" % (_server, otp)) return weechat.WEECHAT_RC_OK def get_irc_servers(): """ Returns a list of configured IRC servers in weechat""" serverptrlist = weechat.infolist_get('irc_server', '', '') serverlist = [] while weechat.infolist_next(serverptrlist): serverlist.append(weechat.infolist_string(serverptrlist, 'name')) return serverlist def enabled_servers(): """ Return a list of TOTP enabled servers. """ serverlist = get_irc_servers() return [s for s in get_config_as_list('otp_server_names') if s in serverlist] def disabled_servers(): """ Return a list of configured TOTP servers that are currently disabled. """ serverlist = get_irc_servers() server_seed_list = [server for server in serverlist if weechat.string_eval_expression("${sec.data.%s_seed}" % server, {}, {}, {}) and server not in get_config_as_list('otp_server_names')] return [s for s in server_seed_list if s in serverlist] def configured_servers(): """ Return a lost of servers with an existing seed. """ serverlist = get_irc_servers() return [s for s in serverlist if weechat.string_eval_expression("${sec.data.%s_seed}" % s, {}, {}, {})] def generate_totp(server, period=30, buffer=""): print_debug('generate_totp(%s)' % server) seed = weechat.string_eval_expression("${sec.data.%s_seed}" % server, {}, {}, {}) if not seed: sprint("No OATH-TOTP secret set, use: /uotp add %s " % server, buffer) return None if len(seed) == 40: # Assume hex format seed = unhexlify(seed) else: seed = b32decode(seed.replace(" ", ""), True) t = pack(">Q", int(time() / period)) _hmac = hmac.new(seed, t, sha1).digest() o = ord(_hmac[19]) & 15 otp = (unpack(">I", _hmac[o:o+4])[0] & 0x7fffffff) % 1000000 return '%06d' % otp def config_update_cb(data, option, value): """ Reload hooks on configuration change. """ print_debug("config_cb(%s)" % value) [hook_all(s.strip()) for s in value.split(',')] return weechat.WEECHAT_RC_OK def options_cb(data, buffer, args): """ Script configuration callback """ if not args: weechat.command("", "/help %s" % SCRIPT_COMMAND) args = args.strip().split(' ') opt = args[0] opt_args = args[1:] if opt == 'otp': if opt_args: servers = [opt_args[0]] else: servers = enabled_servers() for server in servers: otp = generate_totp(server, buffer=buffer) if otp: sprint("%s = %s" % (server, otp), buffer) elif opt == 'list': sprint("List of configured servers", buffer) for server in enabled_servers(): weechat.prnt(buffer, " - %s [enabled]" % server) for server in disabled_servers(): weechat.prnt(buffer, " - %s [disabled]" % server) elif opt == 'add': if len(opt_args) >= 2: if opt_args[0] not in enabled_servers() and opt_args[0] in get_irc_servers(): #weechat.command("", "/secure set %s_seed %s" % (opt_args[0], opt_args[1])) try: add_server(opt_args[0], opt_args[1:]) sprint("server '%s' was successfully added" % opt_args[0], buffer) except Exception as ex: sprint("invalid TOTP seed provided", buffer) elif opt_args[0] not in get_irc_servers(): sprint("No server named '%s' was found, see /help server" % opt_args[0], buffer) else: sprint("OTP already configured for '%s', to change remove the existing one first." % opt_args[0], buffer) else: sprint("/uotp -- invalid argument, valid command is /uotp add ", buffer) elif opt == 'remove': if opt_args[0] in enabled_servers() or opt_args[0] in disabled_servers(): remove_server(opt_args[0], True) sprint("server '%s' was successfully removed" % opt_args[0], buffer) else: sprint("failed to remove server, '%s' not found" % opt_args[0], buffer) elif opt == 'enable': if opt_args and opt_args[0] not in enabled_servers(): if opt_args[0] in get_irc_servers(): add_server(opt_args[0]) sprint("server '%s' was successfully enabled" % opt_args[0], buffer) else: sprint("No server named '%s' was found, see /help server" % opt_args[0], buffer) else: sprint("OTP is already enabled for the server '%s'." % opt_args[0], buffer) elif opt == 'disable': if opt_args and opt_args[0] in enabled_servers(): remove_server(opt_args[0]) else: sprint("OTP does not seem to be enabled for '%s'" % opt_args[0], buffer) elif opt: sprint("/uotp: invalid option -- '%s'" % opt, buffer) weechat.command("", "/help %s" % SCRIPT_COMMAND) return weechat.WEECHAT_RC_OK def get_config_as_list(option): """ Return comma-separated