Add some commands

This commit is contained in:
Richard Chien 2016-12-30 22:01:50 +08:00
parent e14b74cf2f
commit c62fafa69e
15 changed files with 407 additions and 46 deletions

View File

@ -7,6 +7,16 @@ class ApiClient:
qq_api_url = os.environ.get('QQ_API_URL') qq_api_url = os.environ.get('QQ_API_URL')
wx_api_url = os.environ.get('WX_API_URL') wx_api_url = os.environ.get('WX_API_URL')
def send_message(self, content: str, ctx_msg: dict):
msg_type = ctx_msg.get('type')
if msg_type == 'group_message':
return self.send_group_message(content=content, ctx_msg=ctx_msg)
elif msg_type == 'discuss_message':
return self.send_discuss_message(content=content, ctx_msg=ctx_msg)
elif msg_type == 'friend_message':
return self.send_friend_message(content=content, ctx_msg=ctx_msg)
return None
def send_group_message(self, content: str, ctx_msg: dict): def send_group_message(self, content: str, ctx_msg: dict):
try: try:
if ctx_msg.get('via') == 'qq' and self.qq_api_url: if ctx_msg.get('via') == 'qq' and self.qq_api_url:

1
app.py
View File

@ -30,7 +30,6 @@ def _main(ctx_msg: dict):
raise SkipException raise SkipException
if not apply_filters(ctx_msg): if not apply_filters(ctx_msg):
raise SkipException raise SkipException
print(ctx_msg)
except SkipException: except SkipException:
# Skip this message # Skip this message
pass pass

View File

@ -8,18 +8,15 @@ __registry__ = cr = CommandRegistry()
@cr.register('echo', '重复', '跟我念') @cr.register('echo', '重复', '跟我念')
def echo(args_text, ctx_msg): def echo(args_text, ctx_msg, internal=False):
msg_type = ctx_msg.get('type') if internal:
if msg_type == 'group_message': return None
api.send_group_message(content=args_text, ctx_msg=ctx_msg) else:
elif msg_type == 'discuss_message': return api.send_message(args_text, ctx_msg)
api.send_discuss_message(content=args_text, ctx_msg=ctx_msg)
elif msg_type == 'friend_message':
api.send_friend_message(content=args_text, ctx_msg=ctx_msg)
@cr.register('chat', '聊天') @cr.register('tuling123', 'chat', '聊天')
def chat(args_text, ctx_msg): def tuling123(args_text, ctx_msg, internal=False):
url = 'http://www.tuling123.com/openapi/api' url = 'http://www.tuling123.com/openapi/api'
data = { data = {
'key': os.environ.get('TURING123_API_KEY'), 'key': os.environ.get('TURING123_API_KEY'),
@ -28,15 +25,42 @@ def chat(args_text, ctx_msg):
if ctx_msg.get('sender_uid'): if ctx_msg.get('sender_uid'):
data['userid'] = ctx_msg.get('sender_uid') data['userid'] = ctx_msg.get('sender_uid')
elif ctx_msg.get('sender_id'): elif ctx_msg.get('sender_id'):
data['userid'] = ctx_msg.get('sender_id')[-32:] data['userid'] = ctx_msg.get('sender_id').strip('@')[-32:]
resp = requests.post(url, data=data) resp = requests.post(url, data=data)
if resp.status_code == 200: if resp.status_code == 200:
json = resp.json() json = resp.json()
if internal:
return json
if int(json.get('code', 0)) == 100000: if int(json.get('code', 0)) == 100000:
reply = json.get('text', '') reply = json.get('text', '')
else: else:
# Is not text type # Is not text type
reply = '腊鸡图灵机器人返回了一堆奇怪的东西,就不发出来了' reply = '腊鸡图灵机器人返回了一堆奇怪的东西,就不发出来了'
else: else:
if internal:
return None
reply = '腊鸡图灵机器人出问题了,先不管他,过会儿再玩他' reply = '腊鸡图灵机器人出问题了,先不管他,过会儿再玩他'
echo(reply, ctx_msg) echo(reply, ctx_msg)
@cr.register('help', '帮助', '用法', '使用帮助', '使用指南', '使用说明', '使用方法', '怎么用')
def help(_, ctx_msg):
echo(
'你好!我是 CCZU 小开机器人,由常州大学开发者协会开发。\n'
'我可以为你做一些简单的事情,如发送知乎日报内容、翻译一段文字等。\n'
'下面是我现在能做的一些事情:\n\n'
'(1)/查天气 常州\n'
'(2)/翻译 こんにちは\n'
'(3)/翻译到 英语 你好\n'
'(4)/历史上的今天\n'
'(5)/知乎日报\n'
'(6)/记笔记 笔记内容\n'
'(7)/查看所有笔记\n'
'(8)/查百科 常州大学\n'
'(9)/说个笑话\n'
'(10)/聊天 你好啊\n\n'
'把以上内容之一(包括斜杠,不包括序号,某些部分替换成你需要的内容)发给我,我就会按你的要求去做啦。\n'
'上面只给出了 10 条功能,还有更多功能和使用方法,请查看 http://t.cn/RIr177e\n\n'
'祝你使用愉快~',
ctx_msg
)

42
commands/encode.py Normal file
View File

@ -0,0 +1,42 @@
import base64 as b64lib
import hashlib
from command import CommandRegistry
from commands import core
__registry__ = cr = CommandRegistry()
@cr.register('base64')
def base64(args_text, ctx_msg, internal=False):
encoded = b64lib.b64encode(args_text.encode('utf-8')).decode('utf-8')
core.echo(encoded, ctx_msg, internal)
return encoded
@cr.register('base64_decode', 'base64-decode', 'base64d')
def base64(args_text, ctx_msg, internal=False):
decoded = b64lib.b64decode(args_text.encode('utf-8')).decode('utf-8')
core.echo(decoded, ctx_msg, internal)
return decoded
@cr.register('md5')
def md5(args_text, ctx_msg, internal=False):
encoded = hashlib.md5(args_text.encode('utf-8')).hexdigest()
core.echo(encoded, ctx_msg, internal)
return encoded
@cr.register('sha1')
def sha1(args_text, ctx_msg, internal=False):
encoded = hashlib.sha1(args_text.encode('utf-8')).hexdigest()
core.echo(encoded, ctx_msg, internal)
return encoded
@cr.register('sha256')
def sha1(args_text, ctx_msg, internal=False):
encoded = hashlib.sha256(args_text.encode('utf-8')).hexdigest()
core.echo(encoded, ctx_msg, internal)
return encoded

60
commands/networktools.py Normal file
View File

@ -0,0 +1,60 @@
import json
import requests
from lxml import etree
from command import CommandRegistry
from commands import core
__registry__ = cr = CommandRegistry()
@cr.register('ip')
def ip(args_text, ctx_msg):
query = args_text.strip()
if not query:
core.echo('请指定要查询的 IP 或域名', ctx_msg)
return
core.echo('正在查询,请稍等……', ctx_msg)
chinaz_url = 'http://ip.chinaz.com/%s'
ipcn_url = 'http://ip.cn/?ip=%s'
ipipnet_url = 'http://freeapi.ipip.net/%s'
found = False
# Get data from ChinaZ.com
resp = requests.get(chinaz_url % query)
if resp.status_code == 200:
html = etree.HTML(resp.text)
p_elems = html.xpath('//*[@id="leftinfo"]/div[3]/div[2]/p')
if len(p_elems) > 1:
p_elems = p_elems[1:]
reply = 'ChinaZ:'
for p_elem in p_elems:
span_elems = p_elem.getchildren()
reply += '\n' + span_elems[1].text.replace('.', '_') + ', ' + span_elems[3].text
core.echo(reply, ctx_msg)
found = True
# Get data from ip.cn
resp = requests.get(ipcn_url % query, headers={'User-Agent': 'curl/7.47.0'})
if resp.status_code == 200:
# Example: 'IP123.125.114.144 来自:北京市 联通'
items = resp.text.strip().split('')
if len(items) == 3:
reply = 'IP_CN:\n' + items[1].split(' ')[0].replace('.', '_') + ', ' + items[2]
core.echo(reply, ctx_msg)
found = True
# Get data from ipip.net
resp = requests.get(ipipnet_url % query, headers={'User-Agent': 'curl/7.47.0'})
if resp.status_code == 200 and resp.text.strip():
# Example: '["中国","江苏","常州","","教育网"]'
parts = json.loads(resp.text)
reply = 'IPIP_NET\n' + query.replace('.', '_') + ' ' + ''.join(parts)
core.echo(reply, ctx_msg)
found = True
core.echo('以上' if found else '查询失败', ctx_msg)

View File

@ -65,7 +65,7 @@ def take(args_text, ctx_msg, allow_interactive=True):
core.echo('好的,记下了~', ctx_msg) core.echo('好的,记下了~', ctx_msg)
@cr.register('列出所有笔记') @cr.register('列出所有笔记', '查看所有笔记', '所有笔记')
@cr.register('list', hidden=True) @cr.register('list', hidden=True)
@_check_target @_check_target
def list_all(_, ctx_msg): def list_all(_, ctx_msg):

View File

@ -3,6 +3,7 @@ import re
from functools import reduce, wraps from functools import reduce, wraps
import pytz import pytz
import requests
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor from apscheduler.executors.pool import ProcessPoolExecutor
@ -73,6 +74,24 @@ def _check_target(func):
return wrapper return wrapper
@cr.register('cron_check', 'cron-check', 'cron_test', 'cron-test')
def cron_check(args_text, ctx_msg):
cron = args_text.strip()
if not cron:
core.echo('请指定要检查的 Cron 时间表达式', ctx_msg)
return
resp = requests.post('http://tool.lu/crontab/ajax.html', data={'expression': cron})
if resp.status_code == 200:
data = resp.json()
if data.get('status') and 'dates' in data:
reply = '接下来 7 次的执行时间:\n' + '\n'.join(data['dates'])
core.echo(reply, ctx_msg)
return
core.echo('检查失败,可能因为表达式格式错误或服务器连接不上', ctx_msg)
@cr.register('add_job', 'add-job', 'add') @cr.register('add_job', 'add-job', 'add')
@cr.restrict(full_command_only=True, group_admin_only=True) @cr.restrict(full_command_only=True, group_admin_only=True)
@_check_target @_check_target

117
commands/simpletools.py Normal file
View File

@ -0,0 +1,117 @@
import base64
import requests
from lxml import etree
from command import CommandRegistry
from commands import core
__registry__ = cr = CommandRegistry()
@cr.register('money_zh', 'money-zh')
@cr.register('人民币大写', '金额大写', '人民币金额大写')
def money_zh(args_text, ctx_msg):
query = args_text.strip()
try:
_ = float(query)
except ValueError:
query = None
if not query:
core.echo('请在命令后加上要转换成大写的人民币金额哦~(命令和数字用空格或逗号隔开)', ctx_msg)
return
resp = requests.get('http://tool.lu/daxie/ajax.html?number=%s' % query)
if resp.status_code == 200:
data = resp.json()
if data.get('status') and 'text' in data:
reply = query + ' 的汉字大写是:' + data['text'].strip()
core.echo(reply, ctx_msg)
return
@cr.register('short_url', 'short-url')
@cr.register('生成短网址', '生成短链接', '短网址', '短链接')
def short_url(args_text, ctx_msg):
raw_url = args_text.strip()
if not raw_url:
core.echo('请在命令后加上要转换成短链接的网址哦~(命令和网址用空格或逗号隔开)', ctx_msg)
return
core.echo('正在生成,请稍等……', ctx_msg)
session = requests.Session()
short_urls = []
resp = session.get(
'http://dwz.wailian.work/',
headers={
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36',
'Referer': 'http://dwz.wailian.work/'
}
)
if resp.status_code == 200:
api_url = 'http://dwz.wailian.work/api.php?url=%s&site=%s'
encoded_url = base64.b64encode(bytes(raw_url, 'utf-8')).decode('utf-8')
for site in ('sina', 'googl'):
resp = session.get(api_url % (encoded_url, site))
data = resp.json()
if resp.status_code == 200 and data.get('result') == 'ok':
short_urls.append(data['data']['short_url'])
if short_urls:
core.echo('\n'.join(short_urls), ctx_msg)
else:
core.echo('生成失败,可能因为链接格式错误或服务器连接不上', ctx_msg)
@cr.register('weather')
@cr.register('天气', '查天气')
def weather(args_text, ctx_msg):
city = args_text.strip()
if not city:
core.echo('请在命令后加上要查的城市哦~(命令和城市用空格或逗号隔开)', ctx_msg)
return
data = core.tuling123(city + '天气', ctx_msg, internal=True)
core.echo(data.get('text', ''), ctx_msg)
@cr.register('joke')
@cr.register('笑话', '说笑话', '说个笑话')
def weather(_, ctx_msg):
data = core.tuling123('说个笑话', ctx_msg, internal=True)
core.echo(data.get('text', ''), ctx_msg)
@cr.register('baike')
@cr.register('百科', '查百科')
def weather(args_text, ctx_msg):
query = args_text.strip()
if not query:
core.echo('请在命令后加上要查的关键词哦~(命令和关键词用空格或逗号隔开)', ctx_msg)
return
data = core.tuling123('百科 ' + query, ctx_msg, internal=True)
core.echo(data.get('text', ''), ctx_msg)
@cr.register('today_in_history', 'today-in-history', '历史上的今天')
def today_in_history(_, ctx_msg):
resp = requests.get('http://tool.lu/todayonhistory/')
ok = False
if resp.status_code == 200:
core.echo('历史上的今天:', ctx_msg)
html = etree.HTML(resp.text)
li_elems = html.xpath('//ul[@id="tohlis"]/li')
# reply = reduce(lambda x, y: x.text + '\n' + y.text, li_elems)
step = 10
for start in range(0, len(li_elems), step):
reply = ''
for item in li_elems[start:start + step]:
reply += item.text + '\n'
reply = reply.rstrip()
core.echo(reply, ctx_msg)
core.echo('以上~', ctx_msg)
ok = True
if not ok:
core.echo('很抱歉,网络出错了……建议等会儿再试吧~', ctx_msg)

View File

@ -1,29 +0,0 @@
import requests
from lxml import etree
from command import CommandRegistry
from commands import core
__registry__ = cr = CommandRegistry()
@cr.register('today_in_history', '历史上的今天')
def today_in_history(_, ctx_msg):
resp = requests.get('http://tool.lu/todayonhistory/')
ok = False
if resp.status_code == 200:
core.echo('历史上的今天:', ctx_msg)
html = etree.HTML(resp.text)
li_elems = html.xpath('//ul[@id="tohlis"]/li')
# reply = reduce(lambda x, y: x.text + '\n' + y.text, li_elems)
step = 10
for start in range(0, len(li_elems), step):
reply = ''
for item in li_elems[start:start + step]:
reply += item.text + '\n'
reply = reply.rstrip()
core.echo(reply, ctx_msg)
core.echo('以上~', ctx_msg)
ok = True
if not ok:
core.echo('很抱歉,网络出错了……建议等会儿再试吧~', ctx_msg)

102
commands/translate.py Normal file
View File

@ -0,0 +1,102 @@
import os
import hashlib
from datetime import datetime
import requests
from command import CommandRegistry
from commands import core
__registry__ = cr = CommandRegistry()
_app_id = os.environ.get('BAIDU_FANYI_APP_ID')
_api_key = os.environ.get('BAIDU_FANYI_API_KEY')
_lang_map = {
'中文': 'zh',
'繁体中文': 'cht',
'英语': 'en',
'粤语': 'yue',
'文言文': 'wyw',
'日语': 'jp',
'韩语': 'kor',
'法语': 'fra',
'西班牙语': 'spa',
'阿拉伯语': 'ara',
'俄语': 'ru',
'葡萄牙语': 'pt',
'德语': 'de',
'意大利语': 'it',
'希腊语': 'el',
'荷兰语': 'nl',
'波兰语': 'pl',
'保加利亚语': 'bul',
'爱沙尼亚语': 'est',
'丹麦语': 'dan',
'芬兰语': 'fin',
'捷克语': 'cs',
'罗马尼亚语': 'rom',
'斯洛文尼亚语': 'slo',
'瑞典语': 'swe',
'匈牙利语': 'hu',
'越南语': 'vie'
}
_lang_alias_map = {
'简体中文': 'zh',
'汉语': 'zh',
'英文': 'en',
'日文': 'jp',
'韩文': 'kor'
}
@cr.register('translate', '翻译', '翻訳')
def translate(args_text, ctx_msg):
query = args_text.strip()
if not query:
core.echo('请在命令后加上要翻译的内容哦~(命令和要翻译的内容用空格或逗号隔开)', ctx_msg)
return
cmd = ctx_msg.get('command')
if cmd == 'translate':
return translate_to('英语 ' + args_text, ctx_msg)
elif cmd == '翻訳':
return translate_to('日语 ' + args_text, ctx_msg)
else:
return translate_to('简体中文 ' + args_text, ctx_msg)
@cr.register('translate_to', 'translate-to', '翻译到', '翻译成')
def translate_to(args_text, ctx_msg):
args = args_text.strip().split(' ', 1)
if len(args) < 2 or (args[0] not in _lang_map and args[0] not in _lang_alias_map):
core.echo(
'请指定目标语言和要翻译的内容哦~(命令、目标语言、要翻译的内容之间用空格或逗号隔开\n目前支持的语言:'
+ ''.join(_lang_map.keys()),
ctx_msg
)
return
core.echo('正在翻译,请稍等……', ctx_msg)
to_lang = _lang_map.get(args[0]) or _lang_alias_map.get(args[0])
query = args[1]
api_url = 'https://fanyi-api.baidu.com/api/trans/vip/translate'
salt = str(int(datetime.now().timestamp()))
sign = hashlib.md5((_app_id + query + salt + _api_key).encode('utf-8')).hexdigest()
resp = requests.post(api_url, data={
'q': query,
'from': 'auto',
'to': to_lang,
'appid': _app_id,
'salt': salt,
'sign': sign
})
if resp.status_code == 200:
data = resp.json()
print(data)
if 'trans_result' in data:
core.echo('翻译结果:\n' + '\n'.join([x['dst'] for x in data['trans_result']]), ctx_msg)
return
core.echo('翻译失败,可能因为后台接口的频率限制或服务器连接不上', ctx_msg)

View File

@ -12,7 +12,7 @@ from little_shit import SkipException, get_source
__registry__ = cr = CommandRegistry() __registry__ = cr = CommandRegistry()
@cr.register('zhihu-daily', 'zhihu', '知乎日报') @cr.register('zhihu_daily', 'zhihu-daily', 'zhihu', '知乎日报')
def zhihu_daily(args_text, ctx_msg): def zhihu_daily(args_text, ctx_msg):
arg = args_text.strip() arg = args_text.strip()
reply = None reply = None

View File

@ -1,6 +1,6 @@
config = { config = {
'fallback_command': 'core.chat', 'fallback_command': 'core.chat',
'command_start_flags': ('/', '', '来,'), 'command_start_flags': ('/', '', '来,', '来,'),
'command_name_separators': ('\.', '->', '::', '/'), # Regex 'command_name_separators': ('\.', '->', '::', '/'), # Regex
'command_args_start_flags': ('', '', ', ', ': '), # Regex 'command_args_start_flags': ('', '', ',', ', ', ':', ': '), # Regex
} }

View File

@ -10,5 +10,5 @@ def apply_filters(ctx_msg):
return True return True
def add_filter(func, priority): def add_filter(func, priority=0):
_filters.append((priority, func)) _filters.append((priority, func))

View File

@ -74,6 +74,8 @@ def _dispatch_command(ctx_msg):
# Starting a new command, so remove any previous command session # Starting a new command, so remove any previous command session
interactive.remove_session(source) interactive.remove_session(source)
command[0] = command[0].lower()
ctx_msg['command'] = command[0]
cmdhub.call(command[0], command[1], ctx_msg) cmdhub.call(command[0], command[1], ctx_msg)
except SkipException: except SkipException:
# Skip this message # Skip this message

15
filters/how_to_use.py Normal file
View File

@ -0,0 +1,15 @@
from filter import add_filter
from commands import core
def _print_help_message(ctx_msg):
a = ['help', '怎么用', '怎么用啊', '你好', '你好啊', '帮助',
'用法', '使用帮助', '使用指南', '使用说明', '使用方法',
'你能做什么', '你能做些什么', '你会做什么', '你会做些什么']
if ctx_msg.get('content', '') in a:
core.help('', ctx_msg)
return False
return True
add_filter(_print_help_message, priority=1)