This commit is contained in:
远野千束 2023-10-04 21:25:49 +08:00
parent e495028198
commit 656a4687f2
25 changed files with 431 additions and 21 deletions

View File

@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="Python 3.10" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (2)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project>

2
r.txt
View File

@ -0,0 +1,2 @@
nonebot-adapter-onebot==2.3.0
nonebot2==2.1.1

View File

@ -21,7 +21,7 @@
1.安装Git使用命令`git clone https://gitee.com/snowykami/liteyuki-bot` 克隆项目至本地
2.切换到轻雪目录,使用`pip install -r requirements.txt`
2.切换到轻雪目录,使用`pip install -r r.txt`
3.`python main.py`!启动!
@ -35,7 +35,7 @@ echo 正在克隆项目...
git clone https://gitee.com/snowykami/liteyuki-bot
cd liteyuki-bot
echo 正在安装依赖...
pip install -r requirements.txt
pip install -r r.txt
echo python main.py > run.bat
echo pause >> run.bat
echo 启动脚本"run.bat"已创建,点击即可启动

View File

@ -0,0 +1,2 @@
send_group_msg message=欢迎新成员[CQ:at,qq=USER_ID]
send_group_msg message=[CQ:poke,qq=USER_ID]

View File

@ -0,0 +1,4 @@
{
"name": "轻雪原版包",
"id": ""
}

View File

@ -0,0 +1,2 @@
name=English(United Kingdom)
fallback=en_US

View File

@ -0,0 +1 @@
name=English(United States)

View File

@ -0,0 +1,5 @@
{
"text.weather.usage": [
"天气插件命令"
]
}

View File

@ -0,0 +1,17 @@
lang.en_US.name=英语(美国)
lang.en_GB.name=英语(英国)
lang.ja_JP.name=日语(日本)
lang.zh_CN.name=简体中文
lang.zh_TW.name=繁体中文
log.main.res_name_not_found=资源名称未定义
log.main.suc_to_load_resource=成功加载资源包:{NAME}
log.main.no_superusers=<r>当前没有超级用户请通过QQ发送 {CS}<y>SU{AUTH_CODE}</y> 以注册为超级用户</r>
log.main.suc_to_reg_su=<g>超级用户 <y>{USER_ID}</y> 注册成功</g>
log.main.fail_to_reg_su=<r>超级用户 <y>{USER_ID}</y> 注册失败,验证码错误</r>
msg.main.suc_to_reg_su=超级用户「{USER_ID}」注册成功
msg.main.fail_to_reg_su=超级用户「{USER_ID}」注册失败,验证码错误
text.weather.name=轻雪天气
text.weather.description=轻雪内置的一个天气插件,很精准

View File

@ -0,0 +1 @@
quote=zh_CN

8
src/api/adapter.py Normal file
View File

@ -0,0 +1,8 @@
from nonebot.adapters import onebot
Bot = onebot.V11Bot | onebot.V12Bot
Message = onebot.V11Message | onebot.V12Message
Event = onebot.V11Event.Event | onebot.V12Event.Event
MessageEvent = onebot.V11Event.MessageEvent | onebot.V12Event.MessageEvent
GroupMessageEvent = onebot.V11Event.GroupMessageEvent | onebot.V12Event.GroupMessageEvent

38
src/api/command.py Normal file
View File

@ -0,0 +1,38 @@
from typing import List, Any, Tuple, Dict
from src.api.adapter import Bot
async def run_function(bot: Bot, function_name: str) -> List[Any]:
await bot.call_api()
async def run_command(bot: Bot, cmd: str) -> Any:
await bot.call_api()
def message_unescape(message: str):
"""把那堆乱七八糟的文本转回原始文本
:param message:
:return:
"""
data = {
'&amp;': '&',
'&#91;': '[',
'&#93;': ']',
'&#44;': ',',
'%20': ' '
}
for old, new in data.items():
message = message.replace(old, new)
return message
def format_command(command_str: str) -> Tuple[str, Tuple[str], Dict[str, str]]:
args, kwargs = [], {}
for element in command_str.split(' '):
if '=' in element:
kwargs[element.split('=')[0]] = kwargs[element.split('=')[1]]
else:
args.append(element)

80
src/api/data.py Normal file
View File

@ -0,0 +1,80 @@
import os
from typing import Any
from keyvalue_sqlite import KeyValueSqlite
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, PrivateMessageEvent
DB_PATH = 'data/liteyuki'
class Data:
def __init__(self, db_name: str = None, table_name: str = None, event: MessageEvent = None):
"""
:param db_name: DB Name, such as users/groups/globals/plugins
:param table_name: Table Name, such as target_id, not start with '_'
"""
if event is not None:
if isinstance(event, PrivateMessageEvent):
db_name = 'users'
table_name = str(event.user_id)
else:
event: GroupMessageEvent
db_name = 'groups'
table_name = str(event.group_id)
if table_name.isalnum():
table_name = '_' + table_name
self.db = KeyValueSqlite(db_path=os.path.join(DB_PATH, str(db_name) + ".db"), table_name=table_name)
def get(self, key: str, default: Any = None) -> Any:
"""Get data from table
:param key: key
:param default: if there is no key, return this
:return:
"""
return self.db.get(key, default)
def get_many(self, key_set: set) -> dict[str, Any]:
"""Get many data from table by a key set
:param key_set:
:return: {key: val,}
"""
return self.db.get_many(key_set)
def set(self, key: str, val: Any):
"""Store data to table
:param key: key
:param val: value
:return:
"""
return self.db.set(key, val)
def set_many(self, data: dict):
"""Store many data to table
:param data: {key: val,}
:return:
"""
for key, val in data.items():
self.set(key, val)
def remove(self, key: str):
"""Remove data from table
:param key: key
:return:
"""
return self.db.remove(key, ignore_missing_key=True)
def detect(self, key: str) -> bool:
"""Detect if the key in the table
:param key: key
:return:
"""
return self.db.has_key(key)
config_db = Data("globals", "config")

0
src/api/message.py Normal file
View File

153
src/api/resource.py Normal file
View File

@ -0,0 +1,153 @@
import json
import locale
import os
import traceback
from typing import Dict, List, Any
from nonebot import logger
from src.api.data import Data
# global variable
language_data: Dict[str, Dict[str, str]] = dict()
resource_data: Dict[str, List[str]] = dict()
# 按照优先级排序
loaded_resource_packs: List['ResourcePack'] = list()
# system language
system_lang = locale.getdefaultlocale()[0]
if not isinstance(system_lang, str): system_lang = 'en_US'
"""
Language Data
{
"zh_CN":{
"example.text": "Hello World"
}
}
"""
class Language:
def __init__(self, lang: str = system_lang, quote: str = system_lang, fallback: str = 'en_US'):
"""语言类
:param lang: 语言id例如zh_CNen_US
:param quote: 语言引用例如中文新加坡可引用于中文简体避免重复造轮子
:param fallback: 语言回退当目前语言和引用语言均没有时回退到en_US默认
"""
self.lang = lang
if self.lang not in language_data: language_data[self.lang] = dict()
self.language_data = language_data[self.lang]
self.quote = self.language_data.get('quote', quote)
self.fallback = fallback
def get(self, key: str, **kwargs) -> str | Any:
"""获取本地化键名的值
:param key:
:return:
"""
val = self.language_data.get(key, language_data.get(self.quote).get(key, language_data.get(self.fallback).get(key, key)))
if isinstance(val, str):
return val.format(**kwargs)
else:
return val
def add(self, key: str, value: Any):
"""添加一个新词条
:param key: 本地化键名
:param value: 键值
:return:
"""
self.language_data[key] = value
def add_data(self, data: dict):
"""添加一组键值对词条
:param data:
:return:
"""
self.language_data.update(data)
@staticmethod
def load_file(fp: str):
f = open(fp, 'r', encoding='utf-8')
lang = os.path.basename(fp).split('.')[0]
language = Language(lang)
if fp.endswith('lang'):
for line in f.read().splitlines():
language.add(line.split('=')[0], line.split('=')[-1])
elif fp.endswith('json'):
language.add_data(json.load(f))
@staticmethod
def get_user_language(user_id: int) -> 'Language':
db = Data('user', f'u{user_id}')
return Language(db.get('language', system_lang))
class ResourcePack:
def __init__(self, path: str):
"""
:param path: resource pack dir path(include metadata.json)
"""
self.path = path
self.dir_name = os.path.basename(self.path)
try:
metadata = json.load(open(os.path.join(path, 'metadata.json'), 'r', encoding='utf-8'))
self.name = metadata.get('name', self.dir_name)
self.description = metadata.get('description', 'main.no_details')
except BaseException as e:
logger.error(Language().get('log.main.load_resource_error'))
def __str__(self):
return f'<ResourcePack {self.dir_name} {Language().get(self.name)}>'
def load(self):
self.load_language()
self.load_file_path()
loaded_resource_packs.insert(0, self)
logger.success(Language().get('log.main.suc_to_load_resource', NAME=self.name))
def load_language(self):
texts_path = os.path.join(self.path, 'texts')
if os.path.exists(texts_path):
for lang_file in os.listdir(texts_path):
Language.load_file(os.path.join(self.path, 'texts', lang_file))
def load_file_path(self):
for path, folders, files in os.walk(self.path):
for f in files:
file_index = os.path.join(path, f).replace(self.path + os.path.sep, '')
file_path = os.path.join(path, f)
file_index = file_index.replace('\\', '/')
if file_index in resource_data:
resource_data[file_index].insert(0, file_path)
else:
resource_data[file_index] = [file_path]
def load_resource_from_index():
resource_index: list = Data('common', 'config').get('resource_index', [])
resource_index.insert(0, 'vanilla_pack')
for dir_name in resource_index:
resource_pack = ResourcePack(os.path.join('resources', dir_name))
try:
resource_pack.load()
except BaseException as e:
traceback.print_exc()
def get_resource_path(resource_name: str) -> str:
pass

View File

@ -1,11 +0,0 @@
print(
'''\033[34m __ ______ ________ ________ __ __ __ __ __ __ ______
/ | / |/ |/ |/ \ / |/ | / |/ | / |/ |
$$ | $$$$$$/ $$$$$$$$/ $$$$$$$$/ $$ \ /$$/ $$ | $$ |$$ | /$$/ $$$$$$/
$$ | $$ | $$ | $$ |__ $$ \/$$/ $$ | $$ |$$ |/$$/ $$ |
$$ | $$ | $$ | $$ | $$ $$/ $$ | $$ |$$ $$< $$ |
$$ | $$ | $$ | $$$$$/ $$$$/ $$ | $$ |$$$$$ \ $$ |
$$ |_____ _$$ |_ $$ | $$ |_____ $$ | $$ \__$$ |$$ |$$ \ _$$ |_
$$ |/ $$ | $$ | $$ | $$ | $$ $$/ $$ | $$ |/ $$ |
$$$$$$$$/ $$$$$$/ $$/ $$$$$$$$/ $$/ $$$$$$/ $$/ $$/ $$$$$$/ \033[0m'''
)

View File

@ -1 +0,0 @@
from nonebot import on_command

View File

@ -0,0 +1 @@
from .autorun import *

View File

@ -0,0 +1,6 @@
import nonebot
from nonebot import get_driver
from nonebot.utils import run_sync
driver = get_driver()

View File

@ -0,0 +1,11 @@
from nonebot.plugin import PluginMetadata
__plugin_meta__ = PluginMetadata(
name='text.weather.name',
description='text.weather.description',
usage='text.weather.usage',
extra={
'liteyuki_plugin': True
}
)

View File

@ -0,0 +1,52 @@
import json
import os
import random
import nonebot
from nonebot import get_driver, logger, on_command
from nonebot.params import CommandArg
from nonebot.permission import SUPERUSER
from src.api.adapter import Bot, Message, MessageEvent
from src.api.resource import Language, ResourcePack, load_resource_from_index, language_data, system_lang
from src.api.data import Data
# register superuser
driver = get_driver()
# load resource pack
load_resource_from_index()
@driver.on_bot_connect
async def detect_superuser(bot: Bot):
db = Data('common', 'config')
db.remove('auth_code')
if not len(bot.config.superusers):
auth_code = str(random.randint(1000, 9999))
db.set('auth_code', auth_code)
nonebot.logger.opt(colors=True).warning(Language().get('log.main.no_superusers', CS=list(bot.config.command_start)[0], AUTH_CODE=auth_code))
cmd_reg_su = on_command(cmd='SU')
su_test = on_command(cmd='ST', permission=SUPERUSER)
@cmd_reg_su.handle()
async def _(bot: Bot, event: MessageEvent, arg: Message = CommandArg(), ):
db = Data('common', 'config')
ul = Language.get_user_language(event.user_id)
auth_code = db.get('auth_code', None)
if isinstance(auth_code, str):
if str(arg) == auth_code:
bot.config.superusers.add(event.user_id)
await cmd_reg_su.send(ul.get('msg.main.suc_to_reg_su', USER_ID=event.user_id))
logger.opt(colors=True).success(Language().get('log.main.suc_to_reg_su', USER_ID=event.user_id))
else:
await cmd_reg_su.send(ul.get('msg.main.fail_to_reg_su', USER_ID=event.user_id))
logger.opt(colors=True).warning(Language().get('log.main.fail_to_reg_su', USER_ID=event.user_id))
@su_test.handle()
async def _():
await su_test.send('suc')

7
src/metadata.json Normal file
View File

@ -0,0 +1,7 @@
{
"version_name": "5.0.2",
"version_id": 50002,
"new_feature": [
"1.更新不了一点"
]
}

View File

@ -2,13 +2,20 @@ import os.path
import threading
import nonebot
import yaml
from src.api.data import Data
from nonebot import get_driver
from nonebot.adapters.onebot.v11 import Adapter as V11Adapter
from nonebot.adapters.onebot.v12 import Adapter as V12Adapter
adapters = [V11Adapter, V12Adapter]
if not os.path.exists('config.yml'):
f = open('config.yml', 'w', encoding='utf-8')
yaml.dump(
{
'liteyuki': {
},
'nonebot': {
@ -22,27 +29,52 @@ config = yaml.safe_load(open('config.yml', encoding='utf-8'))
if not isinstance(config, dict):
config = dict()
class Liteyuki:
def __init__(self, params=None):
print(
'\033[34m' + r''' __ ______ ________ ________ __ __ __ __ __ __ ______
/ | / |/ |/ |/ \ / |/ | / |/ | / |/ |
$$ | $$$$$$/ $$$$$$$$/ $$$$$$$$/ $$ \ /$$/ $$ | $$ |$$ | /$$/ $$$$$$/
$$ | $$ | $$ | $$ |__ $$ \/$$/ $$ | $$ |$$ |/$$/ $$ |
$$ | $$ | $$ | $$ | $$ $$/ $$ | $$ |$$ $$< $$ |
$$ | $$ | $$ | $$$$$/ $$$$/ $$ | $$ |$$$$$ \ $$ |
$$ |_____ _$$ |_ $$ | $$ |_____ $$ | $$ \__$$ |$$ |$$ \ _$$ |_
$$ |/ $$ | $$ | $$ | $$ | $$ $$/ $$ | $$ |/ $$ |
$$$$$$$$/ $$$$$$/ $$/ $$$$$$$$/ $$/ $$$$$$/ $$/ $$/ $$$$$$/ ''' + '\033[0m'
)
if params is None:
params = dict()
params.update()
kwargs = {
'port': 11451,
'host': '127.0.0.1',
'nickname': ['Liteyuki'],
'command_start': ['']
}
kwargs.update(config.get('nonebot', {}))
kwargs.update(params)
self.liteyuki_main = threading.Thread(target=nonebot.run)
self.running_state = 1
nonebot.init(**kwargs)
self.driver = get_driver()
def start(self):
self.liteyuki_main.start()
nonebot.load_plugin('src.builtin.liteyuki_main')
for adapter in adapters:
self.driver.register_adapter(adapter)
nonebot.load_plugin('src.liteyuki_main')
nonebot.load_plugins('src/builtin')
nonebot.run()
def after_load():
nonebot.load_plugins('plugins')
installed_plugins = Data('common', 'system').get('installed_plugins', [])
for plugin_name in installed_plugins:
nonebot.load_plugin(plugin_name)
after_loader = threading.Thread(target=after_load)
after_loader.start()
def stop(self):
pass