Initial commit

This commit is contained in:
Richard Chien 2016-12-02 22:24:19 +08:00
commit 3f50b7d3f7
14 changed files with 875 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.idea
*.iml
.DS_Store

7
README.md Normal file
View File

@ -0,0 +1,7 @@
# QQBot
此 QQBot 非彼 QQBot不是对 SmartQQ 的封装,而是基于开源的 [sjdy521/Mojo-Webqq](https://github.com/sjdy521/Mojo-Webqq) 实现的处理命令的逻辑。
现在基本框架已经完成,不过还有部分基础性的命令没有实现,待完成之后,再来进行命令的扩充。
由于还没有完成,代码的各个部分、程序的功能等可能会变动比较频繁,此 README 先不详细写。目前可以参考 [编写命令](Write_Command.md) 来了解如何编写命令,因为命令时本程序的重要内容,所以这个文档会更新比较及时。

93
Write_Command.md Normal file
View File

@ -0,0 +1,93 @@
# 编写命令
当你需要自己编写命令时,可能需要了解或参考以下内容。
## 命令仓库 Command Registry
每个 `.py` 文件,就是一个命令仓库,里面可以注册多个命令,每个命令也可以注册多个命令名。
程序启动时,会自动加载 `commands` 目录下的所有 `.py` 文件(模块)中的 `__registry__` 对象,这是一个 `CommandRegistry` 类型的对象,在创建这个对象的时候,可以指定一个 `init_func` 参数作为初始化函数,将会在命令仓库被加载时调用。
使用 `__registry__` 对象的 `register` 装饰器可用来将一个函数注册为一个命令,装饰器的一个必填参数为命令名,可选参数 `hidden` 表示是否将命令名暴露为可直接调用(即形如 `/command_name` 这样调用),如果此项设为 True 则只能在命令名前加仓库名调用。
同一个函数可以注册多次不同命令名(相当于别名),以适应不同的语境。
`CommandRegistry` 类的 `restrict` 装饰器用于限制命令的调用权限,这个装饰器必须在注册命令到仓库之前调用,表现在代码上就是 `restrict` 装饰器必须在所有 `register` 装饰器下方。关于此装饰器的参数基本上从参数名就能看出含义,具体见 `command.py` 中的代码。
有一点需要注意的是,`full_command_only` 参数和 `register` 装饰器的 `hidden` 参数表现出的特性相同(都是防止直接访问),但不同之处在于它对整个命令有效,无论命令被以不同命令名注册过多少次。这导致 `restrict``full_command_only` 参数和 `register``hidden` 参数的建议使用场景有所不同,比如,当需要添加一个可以确定不希望普通用户意外调用到的命令时,可使用如下方式注册:
```python
@__registry__.register('pro_command')
@__registry__.restrict(full_command_only=True, allow_group=False)
def pro_command(args_text, ctx_msg):
pass
```
而如果需要添加一个希望普通用户使用、同时也让高级用户使用起来更舒适的命令,可能用如下方式注册:
```python
@__registry__.register('list_all', hidden=True)
@__registry__.register('列出所有笔记')
@__registry__.restrict(group_admin_only=True)
def list_all(args_text, ctx_msg):
pass
```
这样可以在保持高级用户可以通过简洁的方式调用命令的同时,避免不同命令仓库下同名命令都被调用的问题(因为在默认情况下命令中心在调用命令时,不同仓库中的同名命令都会被依次调用)。
## 命令中心 Command Hub
程序启动时加载的命令仓库全部被集中在了命令中心,以 `.py` 文件名(除去后缀)(也即模块名)为仓库名。命令中心实际上应作为单例使用(插件编写者不应当自己创建实例),即 `command.py` 中的 `hub` 对象,类型是 CommandHub调用它的 `call` 方法将会执行相应的命令,如果调用失败(如命令不存在、没有权限等)会抛出相应的异常。
## 命令之间内部调用
调用命令中心的 `call` 方法是一种可行的命令内部调用方法,不过由于 `call` 方法内会进行很多额外操作(例如命令名匹配、权限检查等),所以并不建议使用这个方法来进行内部调用。
一般而言,当编写命令时发现需要调用另一个已有的命令,可以直接导入相应的模块,然后调用那个命令函数,这样避免了冗杂的命令名匹配过程,例如:
```python
from commands import core
@__registry__.register('cmd_need_call_another')
def cmd_need_call_another(args_text, ctx_msg):
core.echo(args_text, ctx_msg)
```
这里直接调用了 `core.echo`
## 数据持久化
可以使用数据库或文件来对数据进行持久化,理论上只要自行实现数据库和文件的操作即可,这里为了方便起见,在 `little_shit.py` 提供了获取默认数据库路径、默认临时文件路径等若干函数。
使用默认的路径,可以保持文件结构相对比较清晰。
## Source 和 Target
对于用户发送的消息,我们需要用某种标志来区分来源,对于用户保存的数据,也要区分这份数据属于谁。在私聊消息的情况下,这个很容易理解,不同的 QQ 号就是不同的来源,然而在群消息的情况下,会产生一点区别,因此这里引入 Source 和 Target 两个概念。
Source 表示命令的来源由谁发出Target 表示命令将对谁产生效果。
在私聊消息中,这两者没有区别。在群聊中,每个用户(如果限制了权限,则可能只有管理员或群主,但这不影响理解)都可以给 bot 发送命令,但命令在后台保存的数据,应当是属于整个群组的,而不是发送命令的这个用户。与此同时,不同的用户在群聊中发送命令时,命令应当能区分他们,并在需要交互时正确地区分不同用户的会话(关于会话的概念,在下一个标题下)。
`commands/note.py` 中的命令为例,多个管理员可以同时开启会话来添加笔记,最终,这些笔记都会存入群组的数据中,因此这些命令通过 Source 来区分会话,用 Target 来在数据库中区分数据的归属。这也是建议的用法。
至于如何获取 Source 和 Target可用 `little_shit.py` 中的 `get_source``get_target` 函数,当然,如果你对默认的行为感到不满意,也可以自己去实现不一样的区分方法。
## 交互式命令 Interactive Command
通常我们会希望一条命令直接在一条消息中发出然后直接执行就好,但这在某些情况下对普通用户并不友好,因此这里支持了交互式命令,也就是在一条命令调用之后,进行多次后续互动,来引导用户完成数据的输入。
为了实现交互式命令,引入了「会话 Session」概念。
我们以 `commands/note.py` 里的 `note.take` 命令为例,如果发送命令 `/记笔记`(此为一个别名,和 `/note.take` 等价),且不加参数,那么 `note.take` 命令就认为需要开启交互式会话来引导用户输入需要记录的内容,调用 `interactive.py` 中的 `get_session` 函数,由于原先不存在该 Source 的会话,且传入了 `cmd` 参数,就会新创建一个会话,并注册在 `interactive.py` 中的 `_sessions` 字典(这是一个 TTL 字典,目前写死了有效时间 5 分钟,如果需要实现其它有效时间的会话,请先自行实现)。在这个获取的会话对象中,可以保存当前会话的状态和数据。
注意这里的会话对象,是每个 Source 对应一个。一旦创建了一个会话对象,该 Source 原来可能对应的会话就会被关闭。
另外,主程序 `app.py` 中处理接收到的消息时,面临两种消息,一种是命令,一种是不带命令的普通消息,对这两种消息,分别作如下处理:
- 如果是命令,那么不管前面是不是在会话中,都会清除原来的会话,然后启动新的命令(至于新的命令会不会开启新的会话,并没有影响);
- 如果是普通消息,那么如果当前 Source 在某个会话中,就会讲消息内容作为参数,调用该会话的命令,如果没有在会话中,则调用 fallback 命令(一般让图灵机器人去处理)。
除了获取会话对象,还需要在命令中自己实现一个状态机,根据会话对象中保存的状态来判断当前这个 Source 处在交互式命令的哪一个阶段。
总体来说交互式命令相比普通命令写起来更复杂一点,具体写法可以参考 `commands/note.py`

16
apiclient.py Normal file
View File

@ -0,0 +1,16 @@
import requests
class ApiClient:
def __init__(self, base_url):
self.url = base_url
def __getattr__(self, item):
newclient = ApiClient(self.url + '/' + item)
return newclient
def __call__(self, *args, **kwargs):
return requests.get(self.url, params=kwargs)
client = ApiClient('http://127.0.0.1:5000/openqq')

110
app.py Normal file
View File

@ -0,0 +1,110 @@
import re
import sys
import importlib
from flask import Flask, request
import interactive
from little_shit import *
from config import config
from command import hub as cmdhub
from command import CommandNotExistsError, CommandScopeError, CommandPermissionError
from apiclient import client as api
app = Flask(__name__)
_fallback_command = config.get('fallback_command')
_command_start_flags = get_command_start_flags()
_command_args_start_flags = get_command_args_start_flags()
def _send_text(text, ctx_msg):
msg_type = ctx_msg.get('type')
if msg_type == 'group_message':
api.send_group_message(gnumber=ctx_msg.get('gnumber'), content=text)
elif msg_type == 'message':
api.send_message(qq=ctx_msg.get('sender_qq'), content=text)
@app.route('/', methods=['POST'])
def _index():
ctx_msg = request.json
source = get_source(ctx_msg)
try:
if ctx_msg.get('msg_class') != 'recv':
raise SkipException
content = ctx_msg.get('content', '')
if content.startswith('@'):
my_group_nick = ctx_msg.get('receiver')
if not my_group_nick:
raise SkipException
at_me = '@' + my_group_nick
if not content.startswith(at_me):
my_nick = api.get_user_info().json().get('nick', my_group_nick)
at_me = '@' + my_nick
if not content.startswith(at_me):
raise SkipException
content = content[len(at_me):]
else:
# Not starts with '@'
if ctx_msg.get('type') == 'group_message':
# And it's a group message, so we don't reply
raise SkipException
content = content.lstrip()
start_flag = None
for flag in _command_start_flags:
# Match the command start flag
if content.startswith(flag):
start_flag = flag
break
if not start_flag or len(content) <= len(start_flag):
# No command, check if a session exists
if interactive.has_session(source):
command = [interactive.get_session(source).cmd, content]
else:
# Use fallback
if _fallback_command:
command = [_fallback_command, content]
else:
# No fallback
raise SkipException
else:
# Split command and arguments
command = re.split('|'.join(_command_args_start_flags),
content[len(start_flag):], 1)
if len(command) == 1:
# Add an empty argument
command.append('')
# Starting a new command, so remove any previous command session
interactive.remove_session(source)
cmdhub.call(command[0], command[1], ctx_msg)
except SkipException:
# Skip this message
pass
except CommandNotExistsError:
_send_text('暂时还没有这个命令哦~', ctx_msg)
except CommandPermissionError:
_send_text('你没有权限使用这个命令哦~', ctx_msg)
except CommandScopeError as se:
_send_text('这个命令不支持' + se.msg_type + '哦~', ctx_msg)
return '', 204
def load_commands():
command_mod_files = filter(
lambda filename: filename.endswith('.py') and not filename.startswith('_'),
os.listdir(get_commands_dir())
)
command_mods = [os.path.splitext(file)[0] for file in command_mod_files]
for mod_name in command_mods:
cmd_mod = importlib.import_module('commands.' + mod_name)
try:
cmdhub.add_registry(mod_name, cmd_mod.__registry__)
except AttributeError:
print('Failed to load command module "' + mod_name + '.py".', file=sys.stderr)
if __name__ == '__main__':
load_commands()
app.run(host='127.0.0.1', port=8888)

278
command.py Normal file
View File

@ -0,0 +1,278 @@
import functools
import re
from apiclient import client as api
from little_shit import SkipException, get_command_name_separators
_command_name_seps = get_command_name_separators()
class CommandNotExistsError(BaseException):
pass
class CommandPermissionError(BaseException):
pass
class CommandScopeError(BaseException):
def __init__(self, msg_type):
self.msg_type = msg_type
class CommandRegistry:
"""
Represent a map of commands and functions.
"""
def __init__(self, init_func=None):
self.init_func = init_func
self.command_map = {}
self.alias_map = {}
self.hidden_command_names = []
def register(self, command_name, *other_names, hidden=False):
"""
Register command names and map them to a command function.
:param command_name: command name to register
:param other_names: other names of this command
:param hidden: hide the command name or not
NOTE: This is kind of like the 'full_command_only' in restrict(),
but only controls ONE command name,
while the later controls the whole command.
"""
def decorator(func):
if hidden:
self.hidden_command_names.append(command_name)
if not hasattr(func, 'restricted'):
# Apply a default restriction
func = self.restrict()(func)
self.command_map[command_name] = func
for name in other_names:
self.command_map[name] = func
@functools.wraps(func)
def wrapper(*args, **kwargs):
func(*args, **kwargs)
return wrapper
return decorator
# noinspection PyMethodMayBeStatic
def restrict(self, full_command_only=False, superuser_only=False,
group_owner_only=False, group_admin_only=False,
allow_private=True, allow_group=True):
"""
Give a command some restriction.
This decorator must be put below all register() decorators.
Example:
@cr.register('wow', hidden=True)
@cr.register('another_command_name')
@cr.restrict(full_command_only=True)
def wow(_1, _2):
pass
:param full_command_only: whether to be called with full command (including registry name)
:param superuser_only: superuser only
:param group_owner_only: group owner only when processing group message
:param group_admin_only: group admin only when processing group message
:param allow_private: allow private message
:param allow_group: allow group message
"""
def decorator(func):
func.restricted = True
# Visibility
func.full_command_only = full_command_only
# Permission
func.superuser_only = superuser_only
func.group_owner_only = group_owner_only
func.group_admin_only = group_admin_only
# Scope
func.allow_private = allow_private
func.allow_group = allow_group
return func
return decorator
def call(self, command_name, args_text, ctx_msg, **options):
"""
Call the command matching the specified command name.
:param command_name: command name
:param args_text: arguments as a string
:param ctx_msg: context message
:param options: other possible options
:return: things returned by the command function
:raises CommandScopeError: the message scope (group or private) is not allowed
:raises CommandPermissionError: the user is not permitted to call this command
"""
if command_name in self.command_map:
func = self.command_map[command_name]
if not self._check_scope(func, ctx_msg):
raise CommandScopeError(
'群组消息' if ctx_msg.get('type') == 'group_message' else '私聊消息'
)
if not self._check_permission(func, ctx_msg):
raise CommandPermissionError
return func(args_text, ctx_msg, **options)
@staticmethod
def _check_scope(func, ctx_msg):
"""
Check if current message scope (group or private) is allowed.
:param func: command function to check
:param ctx_msg: context message
:return: allowed or not
"""
allowed_msg_type = set()
if func.allow_group:
allowed_msg_type.add('group_message')
if func.allow_private:
allowed_msg_type.add('message')
if ctx_msg.get('type') in allowed_msg_type:
return True
return False
@staticmethod
def _check_permission(func, ctx_msg):
"""
Check if current message sender is permitted to call this command.
:param func: command function to check
:param ctx_msg: context message
:return: permitted or not
"""
def check(b):
if not b:
raise SkipException
try:
if func.superuser_only:
check(str(ctx_msg.get('sender_qq')) == '1002647525')
if ctx_msg.get('type') == 'group_message':
allowed_roles = {'owner', 'admin', 'member'}
if func.group_admin_only:
allowed_roles = allowed_roles.intersection({'owner', 'admin'})
if func.group_owner_only:
allowed_roles = allowed_roles.intersection({'owner'})
groups = list(filter(
lambda g: str(g.get('gnumber')) == str(ctx_msg.get('gnumber')),
api.get_group_info().json()
))
if len(groups) <= 0 or 'member' not in groups[0]:
# This is strange, not likely happens
raise SkipException
members = list(filter(
lambda m: str(m.get('qq')) == str(ctx_msg.get('sender_qq')),
groups[0].get('member')
))
if len(members) <= 0 or members[0].get('role') not in allowed_roles:
# This is strange, not likely happens
raise SkipException
except SkipException:
# Not allowed
return False
# Still alive, so let it go
return True
def has(self, command_name):
"""
Check if this registry has the specified command name,
except command names that is hidden and full command only.
:param command_name: command name
:return: has or not
"""
return command_name in self.command_map \
and command_name not in self.hidden_command_names \
and not self.command_map.get(command_name).full_command_only
def has_include_hidden(self, command_name):
"""
Check if this registry has the specified command name,
including command names that is hidden and full command only.
:param command_name: command name
:return: has or not
"""
return command_name in self.command_map
class CommandHub:
"""
Represent series of command registries,
which means it's used as a collection of different registries
and allows same command names.
"""
def __init__(self):
self.registry_map = {}
def add_registry(self, registry_name, registry):
"""
Add a registry to the hub, running the init function of the registry.
:param registry_name: registry name
:param registry: registry object
"""
if registry.init_func:
registry.init_func()
self.registry_map[registry_name] = registry
def call(self, command_name, args_text, ctx_msg, **options):
"""
Call the commands matching the specified command name.
:param command_name: command name
:param args_text: arguments as a string
:param ctx_msg: context message
:param options: other possible options
:return: things returned by the command function
(list of things if more than one matching command)
:raises CommandNotExistsError: no command exists
:raises CommandScopeError: the message scope is disallowed by all commands
:raises CommandPermissionError: the user is baned by all commands
"""
if not command_name:
# If the command name is empty, we just return
return None
command = re.split('|'.join(_command_name_seps), command_name, 1)
if len(command) == 2 and command[0] in self.registry_map:
registry = self.registry_map.get(command[0])
if registry.has_include_hidden(command[1]):
return registry.call(command[1], args_text, ctx_msg, **options)
else:
raise CommandNotExistsError
else:
results = []
cmd_exists = False
permitted = False
for registry in self.registry_map.values():
# Trying to call all commands with the name
if registry.has(command_name):
cmd_exists = True
try:
results.append(
registry.call(command_name, args_text, ctx_msg, **options))
permitted = True # If it's permitted, this will be set
except CommandPermissionError:
pass
if not cmd_exists:
raise CommandNotExistsError
if not permitted:
# No command was permitted
raise CommandPermissionError
return results
hub = CommandHub()

38
commands/core.py Normal file
View File

@ -0,0 +1,38 @@
import os
import requests
from command import CommandRegistry
from apiclient import client as api
__registry__ = cr = CommandRegistry()
@cr.register('echo', '重复', '跟我念')
def echo(args_text, ctx_msg):
msg_type = ctx_msg.get('type')
if msg_type == 'group_message':
api.send_group_message(gnumber=ctx_msg.get('gnumber'), content=args_text)
elif msg_type == 'message':
api.send_message(qq=ctx_msg.get('sender_qq'), content=args_text)
@cr.register('chat', '聊天')
def chat(args_text, ctx_msg):
url = 'http://www.tuling123.com/openapi/api'
data = {
'key': os.environ.get('TURING123_API_KEY'),
'info': args_text
}
if 'sender_qq' in ctx_msg:
data['userid'] = ctx_msg.get('sender_qq')
resp = requests.post(url, data=data)
if resp.status_code == 200:
json = resp.json()
if int(json.get('code', 0)) == 100000:
reply = json.get('text', '')
else:
# Is not text type
reply = '腊鸡图灵机器人返回了一堆奇怪的东西,就不发出来了'
else:
reply = '腊鸡图灵机器人出问题了,先不管他,过会儿再玩他'
echo(reply, ctx_msg)

159
commands/note.py Normal file
View File

@ -0,0 +1,159 @@
import sqlite3
from datetime import datetime
import pytz
from command import CommandRegistry
from commands import core
from interactive import get_session, has_session, remove_session
from little_shit import get_default_db_path, get_source, get_target
__registry__ = cr = CommandRegistry()
_create_table_sql = """CREATE TABLE IF NOT EXISTS cmd_note (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
content TEXT NOT NULL,
dt INTEGER NOT NULL,
target TEXT NOT NULL
)"""
def _open_db_conn():
conn = sqlite3.connect(get_default_db_path())
conn.execute(_create_table_sql)
conn.commit()
return conn
_cmd_take = 'note.take'
_cmd_remove = 'note.remove'
@cr.register('记笔记', '添加笔记')
@cr.register('take', 'add', hidden=True)
@cr.restrict(group_admin_only=True)
def take(args_text, ctx_msg, force=False):
source = get_source(ctx_msg)
if not force and (not args_text or has_session(source, _cmd_take)):
# Be interactive
return _take_interactively(args_text, ctx_msg, source)
conn = _open_db_conn()
dt_unix = int(datetime.now(tz=pytz.utc).timestamp())
target = get_target(ctx_msg)
conn.execute(
'INSERT INTO cmd_note (content, dt, target) VALUES (?, ?, ?)',
(args_text, dt_unix, target)
)
conn.commit()
conn.close()
core.echo('好的,记下了~', ctx_msg)
@cr.register('列出所有笔记')
@cr.register('list', hidden=True)
def list_all(_, ctx_msg):
conn = _open_db_conn()
target = get_target(ctx_msg)
cursor = conn.execute('SELECT id, dt, content FROM cmd_note WHERE target = ?', (target,))
rows = list(cursor)
conn.close()
if len(rows) == 0:
core.echo('还没有笔记哦~', ctx_msg)
return
for row in rows:
tz_china = pytz.timezone('Asia/Shanghai')
dt_raw = datetime.fromtimestamp(row[1], tz=pytz.utc)
core.echo('ID' + str(row[0])
+ '\n时间:' + dt_raw.astimezone(tz_china).strftime('%Y.%m.%d %H:%M')
+ '\n内容:' + str(row[2]),
ctx_msg)
core.echo('以上~', ctx_msg)
@cr.register('删除笔记')
@cr.register('remove', 'delete', hidden=True)
@cr.restrict(group_admin_only=True)
def remove(args_text, ctx_msg, force=False):
source = get_source(ctx_msg)
if not force and (not args_text or has_session(source, _cmd_remove)):
# Be interactive
return _remove_interactively(args_text, ctx_msg, source)
try:
note_id = int(args_text)
except ValueError:
# Failed to cast
core.echo('你输入的 ID 格式不正确哦~应该是个数字才对~', ctx_msg)
return
conn = _open_db_conn()
target = get_target(ctx_msg)
cursor = conn.cursor()
cursor.execute('DELETE FROM cmd_note WHERE target = ? AND id = ?', (target, note_id))
if cursor.rowcount > 0:
core.echo('删除成功了~', ctx_msg)
else:
core.echo('没找到这个 ID 的笔记哦~', ctx_msg)
conn.commit()
conn.close()
@cr.register('清空笔记', '清空所有笔记', '删除所有笔记')
@cr.register('clear', hidden=True)
@cr.restrict(group_admin_only=True)
def clear(_, ctx_msg):
conn = _open_db_conn()
target = get_target(ctx_msg)
cursor = conn.cursor()
cursor.execute('DELETE FROM cmd_note WHERE target = ?', (target,))
if cursor.rowcount > 0:
core.echo('成功删除了所有的笔记,共 %s 条~' % cursor.rowcount, ctx_msg)
else:
core.echo('本来就没有笔记哦~', ctx_msg)
conn.commit()
conn.close()
_state_machines = {}
def _take_interactively(args_text, ctx_msg, source):
def wait_for_content(s, a, c):
core.echo('请发送你要记录的内容:', c)
s.state += 1
def save_content(s, a, c):
take(a, c, force=True)
return True
if _cmd_take not in _state_machines:
_state_machines[_cmd_take] = (
wait_for_content, # 0
save_content # 1
)
sess = get_session(source, _cmd_take)
if _state_machines[_cmd_take][sess.state](sess, args_text, ctx_msg):
# Done
remove_session(source, _cmd_take)
def _remove_interactively(args_text, ctx_msg, source):
def wait_for_note_id(s, a, c):
core.echo('请发送你要删除的笔记的 ID', c)
s.state += 1
def remove_note(s, a, c):
remove(a, c, force=True)
return True
if _cmd_remove not in _state_machines:
_state_machines[_cmd_remove] = (
wait_for_note_id, # 0
remove_note # 1
)
sess = get_session(source, _cmd_remove)
if _state_machines[_cmd_remove][sess.state](sess, args_text, ctx_msg):
# Done
remove_session(source, _cmd_remove)

62
commands/zhihu.py Normal file
View File

@ -0,0 +1,62 @@
import re
from datetime import date, timedelta
import requests
from little_shit import SkipException
from command import CommandRegistry
from commands import core
__registry__ = cr = CommandRegistry()
@cr.register('zhihu', 'zhihu-daily', '知乎日报')
def zhihu_daily(args_text, ctx_msg):
param = args_text.strip()
reply = None
try:
if not param:
sub_url = '/latest'
elif re.match('\d{8}', param) and param >= '20130519':
thedate = date(year=int(param[:4]), month=int(param[4:6]), day=int(param[6:]))
sub_url = '/before/' + (thedate + timedelta(days=1)).strftime('%Y%m%d')
else:
reply = '命令格式错误,正确的命令格式:\n' \
'/zhihu\n' \
'\n' \
'/zhihu 20161129\n' \
'注意如果指定日期,格式一定要对,且日期需在 20130519 之后。'
raise SkipException
full_url = 'https://news-at.zhihu.com/api/4/news' + sub_url
resp = requests.get(
full_url,
headers={
'Host': 'news-at.zhihu.com',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36'
' (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36'
}
)
if resp.status_code == 200:
json = resp.json()
if 'stories' not in json:
reply = '获取知乎日报数据失败,知乎返回了一堆迷之数据'
raise SkipException
reply = ('今天' if sub_url == '/latest' else '这天') + '的知乎日报内容如下:'
core.echo(reply, ctx_msg)
step = 6 # Send 8 items per time
items = list(reversed(json.get('stories')))
for start in range(0, len(items), step):
reply = ''
for item in items[start:min(start + step, len(items))]:
reply += item.get('title') + '\n' + \
'https://daily.zhihu.com/story/' + str(item.get('id')) + '\n\n'
reply = reply.rstrip()
core.echo(reply, ctx_msg)
return
else:
reply = '获取知乎日报数据失败,可能知乎服务器又宕机了(('
raise SkipException
except SkipException:
reply = reply if reply else '发生了未知错误……'
pass
core.echo(reply, ctx_msg)

6
config.py Normal file
View File

@ -0,0 +1,6 @@
config = {
'fallback_command': 'core.chat',
'command_start_flags': ('/', ''),
'command_name_separators': ('\.', '->', '::', '/'), # Regex
'command_args_start_flags': ('', '', ', ', ': '), # Regex
}

BIN
data/db/default.db Normal file

Binary file not shown.

35
interactive.py Normal file
View File

@ -0,0 +1,35 @@
from cachetools import TTLCache as TTLDict
class _Session:
__dict__ = ('cmd', 'state', 'data')
def __init__(self, cmd):
self.cmd = cmd
self.state = 0
self.data = {}
_sessions = TTLDict(maxsize=10000, ttl=5 * 60)
def get_session(source, cmd=None):
if cmd:
if source in _sessions and _sessions[source].cmd == cmd:
# It's already in a session of this command
return _sessions[source]
sess = _Session(cmd)
_sessions[source] = sess
return sess
else:
return _sessions.get(source)
def has_session(source, cmd=None):
return source in _sessions and (not cmd or _sessions[source].cmd == cmd)
def remove_session(source, cmd=None):
if source in _sessions:
if not cmd or _sessions[source].cmd == cmd:
del _sessions[source]

64
little_shit.py Normal file
View File

@ -0,0 +1,64 @@
import os
from config import config
class SkipException(BaseException):
pass
def _mkdir_if_not_exists_and_return_path(path):
os.makedirs(path, exist_ok=True)
return path
def get_root_dir():
return os.path.split(os.path.realpath(__file__))[0]
def get_commands_dir():
return _mkdir_if_not_exists_and_return_path(os.path.join(get_root_dir(), 'commands'))
def get_db_dir():
return _mkdir_if_not_exists_and_return_path(os.path.join(get_root_dir(), 'data', 'db'))
def get_default_db_path():
return os.path.join(get_db_dir(), 'default.db')
def get_tmp_dir():
return _mkdir_if_not_exists_and_return_path(os.path.join(get_root_dir(), 'data', 'tmp'))
def get_source(ctx_msg):
"""
Source is used to distinguish the interactive sessions.
"""
if ctx_msg.get('type') == 'group_message':
return 'g' + str(ctx_msg.get('gnumber')) + 'p' + str(ctx_msg.get('sender_qq'))
else:
return 'p' + str(ctx_msg.get('sender_qq'))
def get_target(ctx_msg):
"""
Target is used to distinguish the records in database.
"""
if ctx_msg.get('type') == 'group_message':
return 'g' + str(ctx_msg.get('gnumber'))
else:
return 'p' + str(ctx_msg.get('sender_qq'))
def get_command_start_flags():
return tuple(sorted(config['command_start_flags'], reverse=True))
def get_command_name_separators():
return tuple(sorted(config['command_name_separators'], reverse=True))
def get_command_args_start_flags():
return tuple(sorted(('[ \t\n]',) + config['command_args_start_flags'], reverse=True))

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
requests
cachetools
pytz
flask