magicoca/tests/test_select.py

43 lines
787 B
Python
Raw Permalink Normal View History

2024-10-13 01:56:28 +08:00
from multiprocessing import Process
from magicoca import Chan, select
def sp1(chan: Chan[int]):
    """Producer: for each n in 0..9, send n and then 2 * n into ``chan``."""
    for n in range(10):
        # ``*`` binds tighter than ``<<`` and ``<<`` chains left to right, so
        # n is sent first and its double second (20 sends in total).
        # Presumably ``chan << value`` returns the channel itself -- the
        # chaining relies on that; confirm against Chan.__lshift__.
        (chan << n) << (n * 2)
2024-10-13 01:56:28 +08:00
def sp2(chan: Chan[int]):
    """Producer: for each n in 0..9, send n and then 3 * n into ``chan``."""
    for n in range(10):
        # ``*`` binds tighter than ``<<`` and ``<<`` chains left to right, so
        # n is sent first and its triple second (20 sends in total).
        # Presumably ``chan << value`` returns the channel itself -- the
        # chaining relies on that; confirm against Chan.__lshift__.
        (chan << n) << (n * 3)
2024-10-13 01:56:28 +08:00
def rp(chans: list[Chan[int]]):
    """Consumer: collect items yielded by ``select(*chans)`` and print them.

    Iteration stops as soon as 40 items have been collected; the collected
    list is then printed.
    """
    received = []
    for count, item in enumerate(select(*chans), start=1):
        received.append(item)
        if count == 40:
            # Stop pulling from ``select`` once the 40th item has arrived.
            break
    print(received)
class TestSelect:
    """Exercise ``select`` across two channels fed by separate processes."""

    def test_select(self):
        """Run two producers and one selecting consumer to completion.

        Every child process must finish within the join timeout and exit
        with status 0. Without these checks an exception raised inside a
        child would go unnoticed, and a consumer that never receives its
        expected items would block the test run indefinitely.
        """
        join_timeout = 30  # seconds; generous upper bound for each child

        chan1 = Chan[int]()
        chan2 = Chan[int]()
        print("Test Chan Select")
        p1 = Process(target=sp1, args=(chan1,))
        p2 = Process(target=sp2, args=(chan2,))
        p3 = Process(target=rp, args=([chan1, chan2],))

        # Start the consumer before the producers, then wait for the
        # producers first -- the same ordering the test has always used.
        start_order = (p3, p1, p2)
        join_order = (p1, p2, p3)

        started = []
        try:
            for proc in start_order:
                proc.start()
                started.append(proc)
            for proc in join_order:
                proc.join(timeout=join_timeout)
                assert not proc.is_alive(), (
                    f"{proc.name} did not finish within {join_timeout}s"
                )
                # An uncaught exception in a child yields a non-zero exit code.
                assert proc.exitcode == 0, (
                    f"{proc.name} exited with code {proc.exitcode}"
                )
        finally:
            # Do not leave stray children behind when the test fails.
            for proc in started:
                if proc.is_alive():
                    proc.terminate()
                    proc.join()