添加通道连接特性,可用+连接两个通道,连接后将交换双方的接收端

This commit is contained in:
远野千束 2024-10-13 01:09:25 +08:00
parent 873123a99d
commit ffd7847fb1
5 changed files with 116 additions and 46 deletions

View File

@ -1 +1,5 @@
from multiprocessing import set_start_method
from magicoca.chan import Chan, T
set_start_method("spawn", force=True)

View File

@ -12,8 +12,8 @@ class Chan(Generic[T]):
Chan is a simple channel implementation using multiprocessing.Pipe.
Attributes:
_send_conn: The sending end of the pipe.
_recv_conn: The receiving end of the pipe.
send_conn: The sending end of the pipe.
recv_conn: The receiving end of the pipe.
Methods:
send: Send a value to the channel.
@ -29,7 +29,7 @@ class Chan(Generic[T]):
"""
Initialize the channel.
"""
self._send_conn, self._recv_conn = Pipe()
self.send_conn, self.recv_conn = Pipe()
def send(self, value: T):
"""
@ -37,9 +37,9 @@ class Chan(Generic[T]):
Args:
value: The value to send.
"""
self._send_conn.send(value)
self.send_conn.send(value)
def recv(self, timeout: float | None) -> T | None:
def recv(self, timeout: float | None = None) -> T | None:
"""Receive a value from the channel.
If the timeout is None, it will block until a value is received.
If the timeout is a positive number, it will wait for the specified time, and if no value is received, it will return None.
@ -55,16 +55,16 @@ class Chan(Generic[T]):
从通道接收的值
"""
if timeout is not None:
if not self._recv_conn.poll(timeout):
if not self.recv_conn.poll(timeout):
return None
return self._recv_conn.recv()
return self.recv_conn.recv()
def close(self):
"""
Close the channel. destructor
"""
self._send_conn.close()
self._recv_conn.close()
self.send_conn.close()
self.recv_conn.close()
def __iter__(self) -> "Chan[T]":
"""
@ -91,3 +91,12 @@ class Chan(Generic[T]):
Returns: The value received from the channel.
"""
return self.recv(None)
def __add__(self, other: "Chan[T]") -> "Chan[T]":
"""
Connect 1 channel.send to another channel.recv.
Args:
other:
Returns:
"""
self.recv_conn, other.recv_conn = other.recv_conn, self.recv_conn

View File

@ -5,7 +5,7 @@
groups = ["default", "dev"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:7448dc54658d779bbdd51b39ae72faf632280702d8719337a0fff377415332ba"
content_hash = "sha256:0036f04f76c33e040729d2763c99d933269bae8a2417a2f5846a78887dd196c9"
[[metadata.targets]]
requires_python = ">=3.10"
@ -45,6 +45,48 @@ files = [
{file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
]
[[package]]
name = "mypy"
version = "1.11.2"
requires_python = ">=3.8"
summary = "Optional static typing for Python"
groups = ["dev"]
dependencies = [
"mypy-extensions>=1.0.0",
"tomli>=1.1.0; python_version < \"3.11\"",
"typing-extensions>=4.6.0",
]
files = [
{file = "mypy-1.11.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d42a6dd818ffce7be66cce644f1dff482f1d97c53ca70908dff0b9ddc120b77a"},
{file = "mypy-1.11.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:801780c56d1cdb896eacd5619a83e427ce436d86a3bdf9112527f24a66618fef"},
{file = "mypy-1.11.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:41ea707d036a5307ac674ea172875f40c9d55c5394f888b168033177fce47383"},
{file = "mypy-1.11.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:6e658bd2d20565ea86da7d91331b0eed6d2eee22dc031579e6297f3e12c758c8"},
{file = "mypy-1.11.2-cp310-cp310-win_amd64.whl", hash = "sha256:478db5f5036817fe45adb7332d927daa62417159d49783041338921dcf646fc7"},
{file = "mypy-1.11.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:75746e06d5fa1e91bfd5432448d00d34593b52e7e91a187d981d08d1f33d4385"},
{file = "mypy-1.11.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a976775ab2256aadc6add633d44f100a2517d2388906ec4f13231fafbb0eccca"},
{file = "mypy-1.11.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:cd953f221ac1379050a8a646585a29574488974f79d8082cedef62744f0a0104"},
{file = "mypy-1.11.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:57555a7715c0a34421013144a33d280e73c08df70f3a18a552938587ce9274f4"},
{file = "mypy-1.11.2-cp311-cp311-win_amd64.whl", hash = "sha256:36383a4fcbad95f2657642a07ba22ff797de26277158f1cc7bd234821468b1b6"},
{file = "mypy-1.11.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e8960dbbbf36906c5c0b7f4fbf2f0c7ffb20f4898e6a879fcf56a41a08b0d318"},
{file = "mypy-1.11.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:06d26c277962f3fb50e13044674aa10553981ae514288cb7d0a738f495550b36"},
{file = "mypy-1.11.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6e7184632d89d677973a14d00ae4d03214c8bc301ceefcdaf5c474866814c987"},
{file = "mypy-1.11.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:3a66169b92452f72117e2da3a576087025449018afc2d8e9bfe5ffab865709ca"},
{file = "mypy-1.11.2-cp312-cp312-win_amd64.whl", hash = "sha256:969ea3ef09617aff826885a22ece0ddef69d95852cdad2f60c8bb06bf1f71f70"},
{file = "mypy-1.11.2-py3-none-any.whl", hash = "sha256:b499bc07dbdcd3de92b0a8b29fdf592c111276f6a12fe29c30f6c417dd546d12"},
{file = "mypy-1.11.2.tar.gz", hash = "sha256:7f9993ad3e0ffdc95c2a14b66dee63729f021968bff8ad911867579c65d13a79"},
]
[[package]]
name = "mypy-extensions"
version = "1.0.0"
requires_python = ">=3.5"
summary = "Type system extensions for programs checked with the mypy type checker."
groups = ["dev"]
files = [
{file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
{file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
]
[[package]]
name = "packaging"
version = "24.1"
@ -97,3 +139,14 @@ files = [
{file = "tomli-2.0.2-py3-none-any.whl", hash = "sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38"},
{file = "tomli-2.0.2.tar.gz", hash = "sha256:d46d457a85337051c36524bc5349dd91b1877838e2979ac5ced3e710ed8a60ed"},
]
[[package]]
name = "typing-extensions"
version = "4.12.2"
requires_python = ">=3.8"
summary = "Backported and Experimental Type Hints for Python 3.8+"
groups = ["dev"]
files = [
{file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"},
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
]

View File

@ -23,6 +23,7 @@ tag_regex = '^v(?:\D*)?(?P<version>([1-9][0-9]*!)?(0|[1-9][0-9]*)(\.(0|[1-9][0-9
[tool.pdm.dev-dependencies]
dev = [
"pytest>=8.3.3",
"mypy>=1.11.2",
]
[tool.pdm]
distribution = true

View File

@ -1,30 +1,34 @@
import time
from magicoca.chan import Chan
from multiprocessing import Process
from multiprocessing import Process, set_start_method
def p1f(chan: Chan[int]):
for i in range(10):
chan << i
chan << -1
def p2f(chan: Chan[int]):
recv_ans = []
while True:
a = int << chan
print("Recv", a)
recv_ans.append(a)
if a == -1:
break
if recv_ans != list(range(10)) + [-1]:
raise ValueError("Chan Shift Test Failed")
class TestChan:
def test_test(self):
print("Test is running")
def test_chan_shift(self):
"""测试运算符"""
ch = Chan[int]()
def p1f(chan: Chan[int]):
for i in range(10):
time.sleep(1)
chan << i
chan << -1
def p2f(chan: Chan[int]):
recv_ans = []
while True:
a = int << chan
print("Recv", a)
recv_ans.append(a)
if a == -1:
break
if recv_ans != list(range(10)) + [-1]:
raise ValueError("Chan Shift Test Failed")
print("Test Chan Shift")
p1 = Process(target=p1f, args=(ch,))
p2 = Process(target=p2f, args=(ch,))
@ -34,25 +38,9 @@ class TestChan:
p2.join()
def test_chan_sr(self):
"""测试收发"""
ch = Chan[int]()
def p1f(chan: Chan[int]):
for i in range(10):
time.sleep(1)
chan.send(i)
chan.send(-1)
def p2f(chan: Chan[int]):
recv_ans = []
while True:
a = chan.recv()
recv_ans.append(a)
print("Recv2", a)
if a == -1:
break
if recv_ans != list(range(10)) + [-1]:
raise ValueError("Chan SR Test Failed")
print("Test Chan SR")
p1 = Process(target=p1f, args=(ch,))
p2 = Process(target=p2f, args=(ch,))
@ -60,3 +48,18 @@ class TestChan:
p2.start()
p1.join()
p2.join()
def test_connect(self):
"""测试双通道连接"""
chan1 = Chan[int]()
chan2 = Chan[int]()
chan1 + chan2
print("Test Chan Connect")
p1 = Process(target=p1f, args=(chan1,))
p2 = Process(target=p2f, args=(chan2,))
p1.start()
p2.start()
p1.join()
p2.join()