mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-01-19 17:58:26 +08:00
35 lines
820 B
Python
35 lines
820 B
Python
from typing import Dict
|
|
|
|
from nonebot.adapters import Event
|
|
from nonebot.message import (
|
|
IgnoredException,
|
|
run_preprocessor,
|
|
run_postprocessor,
|
|
)
|
|
|
|
_running_matcher: Dict[str, int] = {}
|
|
|
|
|
|
@run_preprocessor
|
|
async def preprocess(event: Event):
|
|
try:
|
|
session_id = event.get_session_id()
|
|
except Exception:
|
|
return
|
|
current_event_id = id(event)
|
|
event_id = _running_matcher.get(session_id, None)
|
|
if event_id and event_id != current_event_id:
|
|
raise IgnoredException("Another matcher running")
|
|
|
|
_running_matcher[session_id] = current_event_id
|
|
|
|
|
|
@run_postprocessor
|
|
async def postprocess(event: Event):
|
|
try:
|
|
session_id = event.get_session_id()
|
|
except Exception:
|
|
return
|
|
if session_id in _running_matcher:
|
|
del _running_matcher[session_id]
|