magicoca/tests/test_select.py

45 lines
921 B
Python
Raw Normal View History

import time
2024-10-13 01:56:28 +08:00
from multiprocessing import Process
from magicoca import Chan, select
def sp1(chan: Chan[int]):
for i in range(10):
2024-10-13 02:06:01 +08:00
chan << i << i * 2
2024-10-13 01:56:28 +08:00
def sp2(chan: Chan[int]):
for i in range(10):
2024-10-13 02:06:01 +08:00
chan << i << i * 3
2024-10-13 01:56:28 +08:00
def rp(chans: list[Chan[int]]):
rl = []
for t in select(*chans):
rl.append(t)
2024-10-13 02:06:01 +08:00
if len(rl) == 40:
2024-10-13 01:56:28 +08:00
break
print(rl)
def send_process(chan: Chan[int], _id: int):
while True:
chan << _id
time.sleep(2)
2024-10-13 01:56:28 +08:00
def recv_process(chan_list: list[Chan[int]]):
for t in select(*chan_list):
print(t)
2024-10-13 01:56:28 +08:00
class TestSelect:
def test_select(self):
chan_list = []
for i in range(10):
chan = Chan[int]()
chan_list.append(chan)
p = Process(target=send_process, args=(chan, i))
p.start()
p = Process(target=recv_process, args=(chan_list,))
p.start()