diff --git a/magicoca/__init__.py b/magicoca/__init__.py index 35a5681..ac10b42 100644 --- a/magicoca/__init__.py +++ b/magicoca/__init__.py @@ -1,4 +1,5 @@ from multiprocessing import set_start_method +from multiprocessing.connection import wait from typing import Any, Callable, Generator from magicoca.chan import Chan, T, NoRecvValue @@ -16,10 +17,14 @@ def select(*args: Chan[T]) -> Generator[T, None, None]: Args: args: channels """ - while True: - for ch in args: - if ch.is_closed: - continue + pipes = [ch.recv_conn for ch in args if not ch.is_closed] - if not isinstance(value := ch.recv(0), NoRecvValue): - yield value + while pipes: + ready_pipes = wait(pipes) + for pipe in ready_pipes: + for ch in args: + if ch.recv_conn == pipe: + if not isinstance(value := ch.recv(0), NoRecvValue): + yield value + if ch.is_closed: + pipes.remove(pipe) \ No newline at end of file diff --git a/magicoca/chan.py b/magicoca/chan.py index b4a7de9..bb80e78 100644 --- a/magicoca/chan.py +++ b/magicoca/chan.py @@ -116,3 +116,4 @@ class Chan(Generic[T]): Close the channel when the object is deleted. """ self.close() + diff --git a/main.py b/main.py new file mode 100644 index 0000000..6cd270f --- /dev/null +++ b/main.py @@ -0,0 +1,37 @@ +import time +from multiprocessing import Process + +from magicoca import select, Chan + + +def send_process(chan: Chan[int], _id: int): + i = 0 + while True: + i += 1 + chan << _id + time.sleep(2) + if i == 50: + chan << -1 + break + + +def recv_process(chan_list: list[Chan[int]]): + for t in select(*chan_list): + print(t) + if t == -1: + break + + +def main(): + chan_list = [] + for i in range(10): + chan = Chan[int]() + chan_list.append(chan) + p = Process(target=send_process, args=(chan, i)) + p.start() + p = Process(target=recv_process, args=(chan_list,)) + p.start() + + +if __name__ == '__main__': + main() diff --git a/pdm.lock b/pdm.lock index b48efcf..31205f8 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:0036f04f76c33e040729d2763c99d933269bae8a2417a2f5846a78887dd196c9" +content_hash = "sha256:1a5afca1143365cebc4c8dbe2a7d96e08a78ff461030ba0f0fd92d9b2e1b3771" [[metadata.targets]] requires_python = ">=3.10" @@ -87,6 +87,17 @@ files = [ {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, ] +[[package]] +name = "objprint" +version = "0.2.3" +requires_python = ">=3.6" +summary = "A library that can print Python objects in human readable format" +groups = ["dev"] +files = [ + {file = "objprint-0.2.3-py3-none-any.whl", hash = "sha256:1721e6f97bae5c5b86c2716a0d45a9dd2c9a4cd9f52cfc8a0dfbe801805554cb"}, + {file = "objprint-0.2.3.tar.gz", hash = "sha256:73d0ad5a7c3151fce634c8892e5c2a050ccae3b1a353bf1316f08b7854da863b"}, +] + [[package]] name = "packaging" version = "24.1" @@ -150,3 +161,51 @@ files = [ {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] + +[[package]] +name = "viztracer" +version = "0.17.0" +requires_python = ">=3.9" +summary = "A debugging and profiling tool that can trace and visualize python code execution" +groups = ["dev"] +dependencies = [ + "objprint>0.1.3", +] +files = [ + {file = "viztracer-0.17.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8c963fca8b32b8f34cfb931836d2214a0939503692ba12f7e7c883e89be558a5"}, + {file = "viztracer-0.17.0-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:486990cd0f5761dbe6c88c6fb4e2ff72b2e4b60f9bddfbf692973268b6d5879f"}, + {file = "viztracer-0.17.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2658dedb31119031d75e9dc82c55a8b6a2d6e4075a6af9afa765718ae8d2bad1"}, + {file = "viztracer-0.17.0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7af07a223e25ec2ada6b7f8a0a4ebaa4ac4120c25910df470d7a85a426e9117d"}, + {file = "viztracer-0.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8b6b7a42bca2eac521afadd33f513fefb08099b8f7dd08fb346d20e012b0fdcf"}, + {file = "viztracer-0.17.0-cp310-cp310-win32.whl", hash = "sha256:c558853385bea8d70735fd36c75a35f37dad99fd3de2064fc9f709046312730f"}, + {file = "viztracer-0.17.0-cp310-cp310-win_amd64.whl", hash = "sha256:455e2cc6f6d69d0caaa20f13217b140070531c3ec35eb6878e7a37ee107acd6b"}, + {file = "viztracer-0.17.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:13b207badfeaa89096c285d7161b4d83db41c7f7721dcec0091e5426a47d636a"}, + {file = "viztracer-0.17.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:d44133a2279266238a3ebdcef00ab9a89f4e7f0596521166d25e5400ed6207ae"}, + {file = "viztracer-0.17.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:929094255fb0bc7de1e415a79a2c5c6fc3c71fc101818c5729991ebd25f89ed1"}, + {file = "viztracer-0.17.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9c98deafdb3439a8cb41b1d5eba8846c5e8c672dac757cead7ecaa2c7e240177"}, + {file = "viztracer-0.17.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3ad758e75f7b5fc8ffb45725f21f4dd1bd2787c0fe80b079ac1dee3b779069c3"}, + {file = "viztracer-0.17.0-cp311-cp311-win32.whl", hash = "sha256:28f38c54db3957b91c582b90b6090ce7c9b693d73d2f2320ebd02e996303d5d0"}, + {file = "viztracer-0.17.0-cp311-cp311-win_amd64.whl", hash = "sha256:371742b31ca2cbfecefa6fcdbc84cfff798c43f7497d57b0d9cb2e3690083486"}, + {file = "viztracer-0.17.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:67784cb54311f5580ea14adf05988bc9f2c180b896d541cea482062fc0495916"}, + {file = "viztracer-0.17.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:3b86ff18e479239bde2bafdf1035d4c3ead8185bdca7446bdb076c13a10dad81"}, + {file = "viztracer-0.17.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7f56fa71aa8ba44870fa35b3b443b0fb49c91dd1ae0d900db714364d26637be3"}, + {file = "viztracer-0.17.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:912ad33d2435797ca0e09c2e044d0d6538ccbadb537b71f4eb2cf27c8910c4d8"}, + {file = "viztracer-0.17.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b01147907c805de78fe4bfaadac3e1fb9f17adb88b30afa6500bf73f18b4d042"}, + {file = "viztracer-0.17.0-cp312-cp312-win32.whl", hash = "sha256:21c00b5b97b9b7ce5afea6288de3234457bdae8aa123df442ed2f8106423ab9c"}, + {file = "viztracer-0.17.0-cp312-cp312-win_amd64.whl", hash = "sha256:a539ed578e4462d0107421197c5fb7345a3572560fb940962418e56b72eaf0ac"}, + {file = "viztracer-0.17.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a36db9b610131ce52ad911a08075e73ece8f863a74cafa846266e73bf49d4fae"}, + {file = "viztracer-0.17.0-cp313-cp313-macosx_11_0_x86_64.whl", hash = "sha256:fb64f867d36c2fff411b5f155e3f6d383bee76e4c927c9df321012ab34e05afe"}, + {file = "viztracer-0.17.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b4cb83b86aebd24f872c4baead1c41ec114ca5f104c7297a8852bd37a1d3fa8"}, + {file = "viztracer-0.17.0-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:cc4de3e6d5a7906472a777ebc1908bd4f7f38b8d509b9fd725e8a568e13365f8"}, + {file = "viztracer-0.17.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bcac5ae4d20980883312e9527af6bfb87fdc5cdeafe7a67caf4e060f9066f5ed"}, + {file = "viztracer-0.17.0-cp313-cp313-win32.whl", hash = "sha256:3e9bf5674da8476027f4c94f33673136bf24d639260b7f5adcd3bcba1987ede7"}, + {file = "viztracer-0.17.0-cp313-cp313-win_amd64.whl", hash = "sha256:2dce28aed18faa8d923e59b76e13c18fe79fa7e9c5dbd632fcbaeae890c6c84f"}, + {file = "viztracer-0.17.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:227fa597fc837697483b91ccc93b12b3da67c1991b4716bc19096ec1419ed4e6"}, + {file = "viztracer-0.17.0-cp313-cp313t-macosx_11_0_x86_64.whl", hash = "sha256:0eb962c2459fb2e781691bfcb4d6cfa1ded90211ee6b1be68e4e31982b9f2f3f"}, + {file = "viztracer-0.17.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:96373fb17a94b96925c146caa7651ad16de5fd4a10d69cf11d58fb2a417943bd"}, + {file = "viztracer-0.17.0-cp313-cp313t-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b606830ee327e0e02d317a545d88f5a52ca8ad723c15e9e3f0063ac39f5668b1"}, + {file = "viztracer-0.17.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:febd0d23c1782461831a8c83df92475ab3ac019b5cf57208272a43eee3bda58b"}, + {file = "viztracer-0.17.0-cp313-cp313t-win32.whl", hash = "sha256:cd0b99c36ed0e1237fcabcd51952ef98c52ec2daab7605979874b89dec0cdeee"}, + {file = "viztracer-0.17.0-cp313-cp313t-win_amd64.whl", hash = "sha256:45803cf94c8c3ea622221c53df5aa9a9afcb457c8272f7e79bcf6eec56f0eac4"}, + {file = "viztracer-0.17.0.tar.gz", hash = "sha256:20397b0c2a6341513596fe4c292994c3db8f3f6b79a0a4f497dadb9d73d988b8"}, +] diff --git a/pyproject.toml b/pyproject.toml index 442832d..f5e488d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ tag_regex = '^v(?:\D*)?(?P([1-9][0-9]*!)?(0|[1-9][0-9]*)(\.(0|[1-9][0-9 dev = [ "pytest>=8.3.3", "mypy>=1.11.2", + "viztracer>=0.17.0", ] [tool.pdm] distribution = true diff --git a/tests/test_select.py b/tests/test_select.py index 580ecbf..8c0513c 100644 --- a/tests/test_select.py +++ b/tests/test_select.py @@ -1,3 +1,4 @@ +import time from multiprocessing import Process from magicoca import Chan, select @@ -13,7 +14,6 @@ def sp2(chan: Chan[int]): chan << i << i * 3 - def rp(chans: list[Chan[int]]): rl = [] for t in select(*chans): @@ -22,21 +22,23 @@ def rp(chans: list[Chan[int]]): break print(rl) +def send_process(chan: Chan[int], _id: int): + while True: + chan << _id + time.sleep(2) + +def recv_process(chan_list: list[Chan[int]]): + for t in select(*chan_list): + print(t) + class TestSelect: def test_select(self): - chan1 = Chan[int]() - chan2 = Chan[int]() - - print("Test Chan Select") - - p1 = Process(target=sp1, args=(chan1,)) - p2 = Process(target=sp2, args=(chan2,)) - p3 = Process(target=rp, args=([chan1, chan2],)) - p3.start() - p1.start() - p2.start() - - p1.join() - p2.join() - p3.join() + chan_list = [] + for i in range(10): + chan = Chan[int]() + chan_list.append(chan) + p = Process(target=send_process, args=(chan, i)) + p.start() + p = Process(target=recv_process, args=(chan_list,)) + p.start()