This is a piece of a big project I'm working on. This is an important part that will massively simplify report transmission in my program. The program tests a function against millions of inputs and uses multiprocessing to speed thing up.
#!/usr/bin/python3
from ctypes import c_bool, c_uint
from io import StringIO
from itertools import cycle, repeat, starmap
from multiprocessing import Event, Lock, Pipe, Process, Value
from typing import Any, Callable, Generator, Iterable, List, Tuple, Union
def p_value(v_type, *i_value, lock: Union[Lock, bool] = False) -> Value:
return Value(v_type, *i_value, lock = lock)
def event_clear_wait(_ev: Event):
_ev.clear()
_ev.wait()
def event_wait_clear(_ev: Event):
_ev.wait()
_ev.clear()
class ProcessPairController:
_event_1: Event
_event_2: Event
def __init__(self):
self._event_1 = Event()
self._event_2 = Event()
@property
def x1(self) -> bool:
return self._event_1.is_set()
@property
def x2(self) -> bool:
return self._event_2.is_set()
def a_clear0wait1(self):
self._event_2.clear()
event_clear_wait(self._event_1)
def a_clear0wait2(self):
self._event_1.clear()
event_clear_wait(self._event_2)
def a_clear1wait1(self):
event_clear_wait(self._event_1)
def a_clear2wait2(self):
event_clear_wait(self._event_2)
def a_clear1wait2(self):
self._event_1.clear()
self._event_2.wait()
def a_clear2wait1(self):
self._event_2.clear()
self._event_1.wait()
def a_clear1set2(self):
self._event_1.clear()
self._event_2.set()
def a_clear2set1(self):
self._event_2.clear()
self._event_1.set()
def b_clear1set2wait1(self):
(e1 := self._event_1).clear()
self._event_2.set()
e1.wait()
def b_clear2set1wait2(self):
(e2 := self._event_2).clear()
self._event_1.set()
e2.wait()
def b_set1wait2clear2(self):
self._event_1.set()
event_wait_clear(self._event_2)
def b_set2wait1clear1(self):
self._event_2.set()
event_wait_clear(self._event_1)
def b_set1wait2clear1(self):
(e1 := self._event_1).set()
self._event_2.wait()
e1.clear()
def b_set2wait1clear2(self):
(e2 := self._event_2).set()
self._event_1.wait()
e2.clear()
def b_wait1clear1set2(self):
event_wait_clear(self._event_1)
self._event_2.set()
def b_wait2clear2set1(self):
event_wait_clear(self._event_2)
self._event_1.set()
def b_set1wait2(self):
self._event_1.set()
self._event_2.wait()
def b_set2wait1(self):
self._event_2.set()
self._event_1.wait()
def b_wait1set2(self):
self._event_1.wait()
self._event_2.set()
def b_wait2set1(self):
self._event_2.wait()
self._event_1.set()
def b_wait1clear1(self):
event_wait_clear(self._event_1)
def b_wait2clear2(self):
event_wait_clear(self._event_2)
def clear0(self):
self._event_1.clear()
self._event_2.clear()
def clear1only(self):
self._event_1.clear()
def clear2only(self):
self._event_2.clear()
def set0(self):
self._event_1.set()
self._event_2.set()
def set1only(self):
self._event_1.set()
def set2only(self):
self._event_2.set()
def wait0(self):
self._event_1.wait()
self._event_2.wait()
def wait1only(self):
self._event_1.wait()
def wait2only(self):
self._event_2.wait()
G_NONE = Generator[None, None, None]
GPE_TYPE = Callable[[], None]
def gpe_give_next(g: Callable[[ProcessPairController, ProcessPairController], Union[G_NONE, GPE_TYPE]]) -> Callable[[ProcessPairController, ProcessPairController], GPE_TYPE]:
def inner(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
xg = g(a, b)
return lambda: next(xg)
return inner
class GeneratePPCExchange:
"""
@staticmethod
@gpe_give_next
def _template(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
pass
"""
@staticmethod
@gpe_give_next
def receive_sync(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
yield from (x.b_set1wait2clear1() for x in cycle((a, b)))
@staticmethod
@gpe_give_next
def send_sync(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
yield from (x.b_set2wait1clear2() for x in cycle((a, b)))
@staticmethod
@gpe_give_next
def send_oscillate(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE: # FIRES FIRST!
yield from (x.b_wait1clear1set2() for x in cycle((a, b)))
@staticmethod
@gpe_give_next
def receive_oscillate(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE: # FIRES SECOND!
yield from (x.b_set1wait2clear2() for x in cycle((a, b)))
NFT_TYPE = Union[bool, None]
L_NFT_TYPE = List[NFT_TYPE]
S_NONE = Union[str, None]
P_RECEIVER_TYPE = Generator[S_NONE, None, None]
GP_RECEIVER_TYPE = Generator[P_RECEIVER_TYPE, None, None]
P_SENDER_TYPE = Generator[bool, S_NONE, None]
def take_while_inclusive(predicate: Callable[[Any], bool], iterable: Iterable) -> Iterable:
for x in iter(iterable):
yield x # inclusive
if not predicate(x):
return
class SimplePipe:
_pipe_send: Pipe
_pipe_receive: Pipe
_c_remaining: Value
_sent_a_none: Value
_e_memory_error: Value
_e_sending_failure: Value
_data_sent_received: ProcessPairController
_connect_ask_answer: ProcessPairController
_receiver_g: Union[P_RECEIVER_TYPE, None]
_sender_g: Union[P_SENDER_TYPE, None]
def __init__(self):
self._pipe_receive, self._pipe_send = Pipe(xf)
self._c_remaining = p_value(c_uint, 0)
self._sent_a_none = p_value(c_bool, xf)
self._e_memory_error = p_value(c_bool, xf)
self._e_sending_failure = p_value(c_bool, xf)
self._data_sent_received = ProcessPairController()
self._connect_ask_answer = ProcessPairController()
self._receiver_g = self._sender_g = None
def init_sender(self):
next(sender_g := self._send_data()) # I <3 WALRUS OPERATOR!
self._sender_g = sender_g
def init_receiver(self):
next(receiver_g := self._receive_data())
self._receiver_g = receiver_g
def __del__(self):
for x in self._pipe_receive, self._pipe_send:
if not x.closed:
x.close()
@property
def receiver(self) -> GP_RECEIVER_TYPE:
yield from starmap(take_while_inclusive, repeat((bool, self._receiver_g)))
@property
def sender(self) -> Callable[[S_NONE], bool]:
return lambda _s: self._sender_g.send(_s)
@property
def memory_error(self) -> bool:
return self._e_memory_error.value
@memory_error.setter
def memory_error(self, n_memory_error: bool):
self._e_memory_error.value = n_memory_error
@property
def sending_failure(self) -> bool:
return self._e_sending_failure.value
@sending_failure.setter
def sending_failure(self, n_sending_failure: bool):
self._e_sending_failure.value = n_sending_failure
@property
def error(self) -> bool:
return self._e_memory_error.value or self._e_sending_failure.value
def _receive_data(self) -> P_RECEIVER_TYPE:
p_receive = self._pipe_receive
c_remaining = self._c_remaining
sent_a_none = self._sent_a_none
memory_error = self._e_memory_error
sending_failure = self._e_sending_failure
switch_processes = GeneratePPCExchange.receive_oscillate(self._connect_ask_answer, self._data_sent_received) # second
id_ps = get_ps_id(xf, xf)
while xt:
print_safe(id_ps, 'Waiting in main loop.')
yield switch_processes()
print_safe(id_ps, 'Proceeding in main loop.')
if sent_a_none.value:
print_safe(id_ps, 'Got a None.')
yield None
switch_processes()
print_safe(id_ps, 'None processed.')
continue
else:
while xt:
print_safe(id_ps, 'Polling pipe...')
if not self.error: # don't hang on error
p_receive.poll(None) # wait for data
print_safe(id_ps, 'done.')
else:
print_safe(id_ps, 'skipped.')
try:
print_safe(id_ps, 'Receiving data...')
data_in = p_receive.recv()
print_safe(id_ps, 'Data received...')
except MemoryError:
memory_error.value = xt
print_safe(id_ps, 'Memory error!')
yield ""
switch_processes()
print_safe(id_ps, 'Memory error handled.')
break
except EOFError:
sending_failure.value = xt
print_safe(id_ps, 'EOFError error!')
yield ''
switch_processes()
print_safe(id_ps, 'EOFError error handled.')
break
rem = c_remaining.value
yield data_in
print_safe(id_ps, 'Data chunk processed.')
switch_processes()
if not self.error:
print_safe(id_ps, 'No errors detected.')
if 0 <= rem <= 10_000:
if rem:
memory_error.value = sending_failure.value = xf
print_safe(id_ps, 'No data remaining in pipe.')
yield ''
switch_processes()
print_safe(id_ps, 'Data chunk stream completed.')
break
else:
print_safe(id_ps, 'Data chunk stream completed with errors!')
break
def _send_data(self) -> P_SENDER_TYPE:
p_send = self._pipe_send
c_remaining = self._c_remaining
sent_a_none = self._sent_a_none
memory_error = self._e_memory_error
sending_failure = self._e_sending_failure
text = StringIO()
start_chunk_size = 10_000 # ~32MiB Pipe() limit
def handoff(error: bool):
memory_error.value = error
sending_failure.value = error
switch_processes()
switch_processes = GeneratePPCExchange.send_oscillate(self._connect_ask_answer, self._data_sent_received) # first
id_ps = get_ps_id(xt, xf)
while xt:
print_safe(id_ps, 'Waiting in main loop.')
switch_processes()
data_in = yield self.error
print_safe(id_ps, 'Data primed.')
sent_a_none.value = data_is_none = data_in is None
c_remaining.value = 0
if data_is_none:
print_safe(id_ps, 'Sending None.')
handoff(xf)
print_safe(id_ps, 'None sent.')
else:
try:
print_safe(id_ps, 'Queuing data.')
c_remaining.value = text.write(data_in)
print_safe(id_ps, 'Data queuing.')
except (MemoryError, IOError) as e:
print_safe(id_ps, f'Queuing failed with error {e}.')
handoff(xt)
print_safe(id_ps, 'Queuing error handled.')
continue
text.seek(0)
chunk = start_chunk_size
chunks, last = divmod((remaining_characters := c_remaining.value), start_chunk_size)
print_safe(id_ps, f'Sending {remaining_characters} characters using ({chunks} chunks @ {start_chunk_size} characters) + (1 last chunk @ {last} characters).')
del chunks, last, remaining_characters # clean up
while c_remaining.value > 0:
print_safe(id_ps, f'Sending data chunk @ {chunk} characters.')
place = text.tell()
try:
p_send.send(s_tmp := text.read(chunk))
c_remaining.value -= len(s_tmp)
chunk = start_chunk_size
print_safe(id_ps, 'Data chunk sent.')
handoff(xf)
print_safe(id_ps, 'Data chunk processed.')
except (ValueError, MemoryError, IOError) as e:
chunk //= 2
print_safe(id_ps, f'Error {e} occurred.')
if chunk > 0:
text.seek(place)
else:
text.seek(0)
print_safe(id_ps, 'Chunk size 0 reached.')
handoff(xt)
print_safe(id_ps, 'Terminating chuck stream.')
break
def ask_remaining(self) -> int:
return self._c_remaining.value
def ask_error_state(self) -> Tuple[bool, bool, bool]:
return self._c_remaining.value > 0, self._e_memory_error.value, self._e_sending_failure.value
def ask_error_state_string(self) -> str:
return ''.join(('T' if x else 'F') for x in self.ask_error_state())
def rejoin(pipe: SimplePipe, data: P_RECEIVER_TYPE) -> S_NONE:
aggregator = StringIO()
while xt:
if pipe.error:
break
try:
if data_chunk := next(data): # None or ''
aggregator.write(data_chunk)
elif data_chunk is None:
return None
else:
break
except MemoryError:
pipe.memory_error = xt
break
aggregator.seek(0)
return aggregator.read()
text_lock = Lock()
def get_ps_id(sender: bool, tester: bool) -> str:
return ('Sender.Test:' if tester else 'Sender.Pipe:') if sender else ('Receiver.Test:' if tester else 'Receiver.Pipe:')
def print_safe(sender_message: str, *args, **kwargs):
with text_lock:
print(sender_message, *args, **kwargs)
def get_input(msg: str) -> str:
with text_lock:
return input(msg)
xt = True
xf = False
def sp_main():
nexis = SimplePipe()
def f_input():
nexis.init_sender()
d_send = nexis.sender
quit_phrases = ('quit', 'end', 'none')
id_ps = get_ps_id(xt, xt)
print_safe(id_ps, 'Initializing.')
while xt:
try:
print_safe(id_ps, 'Sending text...')
error = d_send(d_in := (None if (d_in := get_input('Message: ')).strip().lower() in quit_phrases else d_in)) # I <3 WALRUS OPERATOR!
except EOFError: # input
print_safe(id_ps, 'Input() EOF.')
continue
except Exception as e:
print_safe(id_ps, 'error', e)
continue
finally:
print_safe(id_ps, 'Sent.')
if error:
print_safe(id_ps, f'Error while sending {repr(d_in)} with error state {nexis.ask_error_state_string()}.')
continue
if d_in is None:
break
get_input('Hit [enter].')
print_safe(id_ps, 'Session ended.')
def f_print():
nexis.init_receiver()
d_receive = nexis.receiver
id_ps = get_ps_id(xf, xt)
print_safe(id_ps, 'Initializing.')
while xt:
try:
print_safe(id_ps, 'Receiving data...')
text = rejoin(nexis, next(d_receive))
print_safe(id_ps, 'Message compiled.')
if nexis.error:
print_safe(id_ps, 'Error state', nexis.ask_error_state_string(), 'encountered.')
except MemoryError:
print_safe(id_ps, 'Error rejoining test.')
continue
if text is None:
print_safe(id_ps, 'Output = None.')
break
else:
print_safe(id_ps, 'Output:', text)
print_safe(id_ps, 'Session concluded.')
(p_print := Process(target = f_print, name = "P_print")).start()
f_input()
p_print.join()
return
if __name__ == '__main__':
sp_main()
Put simply, multiprocessing.Pipe()
is inadequate. It should be able to handle massive strings and switch process execution between a sender and receiver. I wrote this to implement:
- Automatic error handling
- Transmission error categorization
- Data transmission chunking and reassembly
- Unlimited data transmission size
- Process synchronization
- Simple abstraction to enhance usability
It has a weird bug I can't find. Days and plenty of documentation later, it's not fixed. I've left in a good many debug lines. Try entering "hi": you don't see "Receiver.Test: Output: hi" but should. Try a second time, it just hangs:
/usr/bin/python3 /home/[redacted]/simple_pipe.py
Sender.Pipe: Waiting in main loop.
Receiver.Pipe: Waiting in main loop.
Sender.Test: Initializing.
Sender.Test: Sending text...
Message: hio
Receiver.Test: Initializing.
Receiver.Test: Receiving data...
Sender.Pipe: Data primed.
Sender.Pipe: Queuing data.
Receiver.Pipe: Proceeding in main loop.
Receiver.Pipe: Polling pipe...
Sender.Pipe: Data queuing.
Sender.Pipe: Sending 3 characters using (0 chunks @ 10000 characters) + (1 last chunk @ 3 characters).
Sender.Pipe: Sending data chunk @ 10000 characters.
Sender.Pipe: Data chunk sent.
Receiver.Pipe: done.
Receiver.Pipe: Receiving data...
Receiver.Pipe: Data received...
Receiver.Pipe: Data chunk processed.
Sender.Pipe: Data chunk processed.
Sender.Pipe: Waiting in main loop.
Receiver.Pipe: No errors detected.
Receiver.Pipe: Waiting in main loop.
Sender.Test: Sent.
Receiver.Test: Message compiled.
Hit [enter].
Receiver.Test: Output = None.
Receiver.Test: Session concluded.
Sender.Test: Sending text...
Message: What?
Sender.Pipe: Data primed.
Sender.Pipe: Queuing data.
Sender.Pipe: Data queuing.
Sender.Pipe: Sending 5 characters using (0 chunks @ 10000 characters) + (1 last chunk @ 5 characters).
Sender.Pipe: Sending data chunk @ 10000 characters.
Sender.Pipe: Data chunk sent.
It's time to ask for help. It is part of a larger project. This should be part of the multiprocessing
module. I'm humbled. Can someone tell me what up? Thank you all in advance!
from Enhanced Python Multiprocessing Data Pipeline Wrapper
No comments:
Post a Comment