Monday, 7 September 2020

Enhanced Python Multiprocessing Data Pipeline Wrapper

This is a piece of a big project I'm working on. This is an important part that will massively simplify report transmission in my program. The program tests a function against millions of inputs and uses multiprocessing to speed thing up.

Source code also on Pastebin.

#!/usr/bin/python3
from ctypes import c_bool, c_uint
from io import StringIO
from itertools import cycle, repeat, starmap
from multiprocessing import Event, Lock, Pipe, Process, Value
from typing import Any, Callable, Generator, Iterable, List, Tuple, Union


def p_value(v_type, *i_value, lock: Union[Lock, bool] = False) -> Value:
    return Value(v_type, *i_value, lock = lock)


def event_clear_wait(_ev: Event):
    _ev.clear()
    _ev.wait()


def event_wait_clear(_ev: Event):
    _ev.wait()
    _ev.clear()


class ProcessPairController:
    _event_1: Event
    _event_2: Event

    def __init__(self):
        self._event_1 = Event()
        self._event_2 = Event()

    @property
    def x1(self) -> bool:
        return self._event_1.is_set()

    @property
    def x2(self) -> bool:
        return self._event_2.is_set()

    def a_clear0wait1(self):
        self._event_2.clear()
        event_clear_wait(self._event_1)

    def a_clear0wait2(self):
        self._event_1.clear()
        event_clear_wait(self._event_2)

    def a_clear1wait1(self):
        event_clear_wait(self._event_1)

    def a_clear2wait2(self):
        event_clear_wait(self._event_2)

    def a_clear1wait2(self):
        self._event_1.clear()
        self._event_2.wait()

    def a_clear2wait1(self):
        self._event_2.clear()
        self._event_1.wait()

    def a_clear1set2(self):
        self._event_1.clear()
        self._event_2.set()

    def a_clear2set1(self):
        self._event_2.clear()
        self._event_1.set()

    def b_clear1set2wait1(self):
        (e1 := self._event_1).clear()
        self._event_2.set()
        e1.wait()

    def b_clear2set1wait2(self):
        (e2 := self._event_2).clear()
        self._event_1.set()
        e2.wait()

    def b_set1wait2clear2(self):
        self._event_1.set()
        event_wait_clear(self._event_2)

    def b_set2wait1clear1(self):
        self._event_2.set()
        event_wait_clear(self._event_1)

    def b_set1wait2clear1(self):
        (e1 := self._event_1).set()
        self._event_2.wait()
        e1.clear()

    def b_set2wait1clear2(self):
        (e2 := self._event_2).set()
        self._event_1.wait()
        e2.clear()

    def b_wait1clear1set2(self):
        event_wait_clear(self._event_1)
        self._event_2.set()

    def b_wait2clear2set1(self):
        event_wait_clear(self._event_2)
        self._event_1.set()

    def b_set1wait2(self):
        self._event_1.set()
        self._event_2.wait()

    def b_set2wait1(self):
        self._event_2.set()
        self._event_1.wait()

    def b_wait1set2(self):
        self._event_1.wait()
        self._event_2.set()

    def b_wait2set1(self):
        self._event_2.wait()
        self._event_1.set()

    def b_wait1clear1(self):
        event_wait_clear(self._event_1)

    def b_wait2clear2(self):
        event_wait_clear(self._event_2)

    def clear0(self):
        self._event_1.clear()
        self._event_2.clear()

    def clear1only(self):
        self._event_1.clear()

    def clear2only(self):
        self._event_2.clear()

    def set0(self):
        self._event_1.set()
        self._event_2.set()

    def set1only(self):
        self._event_1.set()

    def set2only(self):
        self._event_2.set()

    def wait0(self):
        self._event_1.wait()
        self._event_2.wait()

    def wait1only(self):
        self._event_1.wait()

    def wait2only(self):
        self._event_2.wait()


G_NONE = Generator[None, None, None]
GPE_TYPE = Callable[[], None]


def gpe_give_next(g: Callable[[ProcessPairController, ProcessPairController], Union[G_NONE, GPE_TYPE]]) -> Callable[[ProcessPairController, ProcessPairController], GPE_TYPE]:
    def inner(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
        xg = g(a, b)
        return lambda: next(xg)

    return inner


class GeneratePPCExchange:
    """
    @staticmethod
    @gpe_give_next
    def _template(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
        pass
    """

    @staticmethod
    @gpe_give_next
    def receive_sync(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
        yield from (x.b_set1wait2clear1() for x in cycle((a, b)))

    @staticmethod
    @gpe_give_next
    def send_sync(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
        yield from (x.b_set2wait1clear2() for x in cycle((a, b)))

    @staticmethod
    @gpe_give_next
    def send_oscillate(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:  # FIRES FIRST!
        yield from (x.b_wait1clear1set2() for x in cycle((a, b)))

    @staticmethod
    @gpe_give_next
    def receive_oscillate(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:  # FIRES SECOND!
        yield from (x.b_set1wait2clear2() for x in cycle((a, b)))


NFT_TYPE = Union[bool, None]
L_NFT_TYPE = List[NFT_TYPE]

S_NONE = Union[str, None]
P_RECEIVER_TYPE = Generator[S_NONE, None, None]
GP_RECEIVER_TYPE = Generator[P_RECEIVER_TYPE, None, None]
P_SENDER_TYPE = Generator[bool, S_NONE, None]


def take_while_inclusive(predicate: Callable[[Any], bool], iterable: Iterable) -> Iterable:
    for x in iter(iterable):
        yield x  # inclusive
        if not predicate(x):
            return


class SimplePipe:
    _pipe_send: Pipe
    _pipe_receive: Pipe
    _c_remaining: Value
    _sent_a_none: Value
    _e_memory_error: Value
    _e_sending_failure: Value
    _data_sent_received: ProcessPairController
    _connect_ask_answer: ProcessPairController
    _receiver_g: Union[P_RECEIVER_TYPE, None]
    _sender_g: Union[P_SENDER_TYPE, None]

    def __init__(self):
        self._pipe_receive, self._pipe_send = Pipe(xf)
        self._c_remaining = p_value(c_uint, 0)
        self._sent_a_none = p_value(c_bool, xf)
        self._e_memory_error = p_value(c_bool, xf)
        self._e_sending_failure = p_value(c_bool, xf)
        self._data_sent_received = ProcessPairController()
        self._connect_ask_answer = ProcessPairController()
        self._receiver_g = self._sender_g = None

    def init_sender(self):
        next(sender_g := self._send_data())  # I <3 WALRUS OPERATOR!
        self._sender_g = sender_g

    def init_receiver(self):
        next(receiver_g := self._receive_data())
        self._receiver_g = receiver_g

    def __del__(self):
        for x in self._pipe_receive, self._pipe_send:
            if not x.closed:
                x.close()

    @property
    def receiver(self) -> GP_RECEIVER_TYPE:
        yield from starmap(take_while_inclusive, repeat((bool, self._receiver_g)))

    @property
    def sender(self) -> Callable[[S_NONE], bool]:
        return lambda _s: self._sender_g.send(_s)

    @property
    def memory_error(self) -> bool:
        return self._e_memory_error.value

    @memory_error.setter
    def memory_error(self, n_memory_error: bool):
        self._e_memory_error.value = n_memory_error

    @property
    def sending_failure(self) -> bool:
        return self._e_sending_failure.value

    @sending_failure.setter
    def sending_failure(self, n_sending_failure: bool):
        self._e_sending_failure.value = n_sending_failure

    @property
    def error(self) -> bool:
        return self._e_memory_error.value or self._e_sending_failure.value

    def _receive_data(self) -> P_RECEIVER_TYPE:
        p_receive = self._pipe_receive
        c_remaining = self._c_remaining
        sent_a_none = self._sent_a_none
        memory_error = self._e_memory_error
        sending_failure = self._e_sending_failure
        switch_processes = GeneratePPCExchange.receive_oscillate(self._connect_ask_answer, self._data_sent_received)  # second
        id_ps = get_ps_id(xf, xf)
        while xt:
            print_safe(id_ps, 'Waiting in main loop.')
            yield switch_processes()
            print_safe(id_ps, 'Proceeding in main loop.')
            if sent_a_none.value:
                print_safe(id_ps, 'Got a None.')
                yield None
                switch_processes()
                print_safe(id_ps, 'None processed.')
                continue
            else:
                while xt:
                    print_safe(id_ps, 'Polling pipe...')
                    if not self.error:  # don't hang on error
                        p_receive.poll(None)  # wait for data
                        print_safe(id_ps, 'done.')
                    else:
                        print_safe(id_ps, 'skipped.')
                    try:
                        print_safe(id_ps, 'Receiving data...')
                        data_in = p_receive.recv()
                        print_safe(id_ps, 'Data received...')
                    except MemoryError:
                        memory_error.value = xt
                        print_safe(id_ps, 'Memory error!')
                        yield ""
                        switch_processes()
                        print_safe(id_ps, 'Memory error handled.')
                        break
                    except EOFError:
                        sending_failure.value = xt
                        print_safe(id_ps, 'EOFError error!')
                        yield ''
                        switch_processes()
                        print_safe(id_ps, 'EOFError error handled.')
                        break
                    rem = c_remaining.value
                    yield data_in
                    print_safe(id_ps, 'Data chunk processed.')
                    switch_processes()
                    if not self.error:
                        print_safe(id_ps, 'No errors detected.')
                        if 0 <= rem <= 10_000:
                            if rem:
                                memory_error.value = sending_failure.value = xf
                                print_safe(id_ps, 'No data remaining in pipe.')
                                yield ''
                                switch_processes()
                                print_safe(id_ps, 'Data chunk stream completed.')
                            break
                    else:
                        print_safe(id_ps, 'Data chunk stream completed with errors!')
                        break

    def _send_data(self) -> P_SENDER_TYPE:
        p_send = self._pipe_send
        c_remaining = self._c_remaining
        sent_a_none = self._sent_a_none
        memory_error = self._e_memory_error
        sending_failure = self._e_sending_failure
        text = StringIO()
        start_chunk_size = 10_000  # ~32MiB Pipe() limit

        def handoff(error: bool):
            memory_error.value = error
            sending_failure.value = error
            switch_processes()

        switch_processes = GeneratePPCExchange.send_oscillate(self._connect_ask_answer, self._data_sent_received)  # first
        id_ps = get_ps_id(xt, xf)
        while xt:
            print_safe(id_ps, 'Waiting in main loop.')
            switch_processes()
            data_in = yield self.error
            print_safe(id_ps, 'Data primed.')
            sent_a_none.value = data_is_none = data_in is None
            c_remaining.value = 0
            if data_is_none:
                print_safe(id_ps, 'Sending None.')
                handoff(xf)
                print_safe(id_ps, 'None sent.')
            else:
                try:
                    print_safe(id_ps, 'Queuing data.')
                    c_remaining.value = text.write(data_in)
                    print_safe(id_ps, 'Data queuing.')
                except (MemoryError, IOError) as e:
                    print_safe(id_ps, f'Queuing failed with error {e}.')
                    handoff(xt)
                    print_safe(id_ps, 'Queuing error handled.')
                    continue
                text.seek(0)
                chunk = start_chunk_size
                chunks, last = divmod((remaining_characters := c_remaining.value), start_chunk_size)
                print_safe(id_ps, f'Sending {remaining_characters} characters using ({chunks} chunks @ {start_chunk_size} characters) + (1 last chunk @ {last} characters).')
                del chunks, last, remaining_characters  # clean up
                while c_remaining.value > 0:
                    print_safe(id_ps, f'Sending data chunk @ {chunk} characters.')
                    place = text.tell()
                    try:
                        p_send.send(s_tmp := text.read(chunk))
                        c_remaining.value -= len(s_tmp)
                        chunk = start_chunk_size
                        print_safe(id_ps, 'Data chunk sent.')
                        handoff(xf)
                        print_safe(id_ps, 'Data chunk processed.')
                    except (ValueError, MemoryError, IOError) as e:
                        chunk //= 2
                        print_safe(id_ps, f'Error {e} occurred.')
                        if chunk > 0:
                            text.seek(place)
                        else:
                            text.seek(0)
                            print_safe(id_ps, 'Chunk size 0 reached.')
                            handoff(xt)
                            print_safe(id_ps, 'Terminating chuck stream.')
                            break

    def ask_remaining(self) -> int:
        return self._c_remaining.value

    def ask_error_state(self) -> Tuple[bool, bool, bool]:
        return self._c_remaining.value > 0, self._e_memory_error.value, self._e_sending_failure.value

    def ask_error_state_string(self) -> str:
        return ''.join(('T' if x else 'F') for x in self.ask_error_state())


def rejoin(pipe: SimplePipe, data: P_RECEIVER_TYPE) -> S_NONE:
    aggregator = StringIO()
    while xt:
        if pipe.error:
            break
        try:
            if data_chunk := next(data):  # None or ''
                aggregator.write(data_chunk)
            elif data_chunk is None:
                return None
            else:
                break
        except MemoryError:
            pipe.memory_error = xt
            break
    aggregator.seek(0)
    return aggregator.read()


text_lock = Lock()


def get_ps_id(sender: bool, tester: bool) -> str:
    return ('Sender.Test:' if tester else 'Sender.Pipe:') if sender else ('Receiver.Test:' if tester else 'Receiver.Pipe:')


def print_safe(sender_message: str, *args, **kwargs):
    with text_lock:
        print(sender_message, *args, **kwargs)


def get_input(msg: str) -> str:
    with text_lock:
        return input(msg)


xt = True
xf = False


def sp_main():
    nexis = SimplePipe()

    def f_input():
        nexis.init_sender()
        d_send = nexis.sender
        quit_phrases = ('quit', 'end', 'none')
        id_ps = get_ps_id(xt, xt)
        print_safe(id_ps, 'Initializing.')
        while xt:
            try:
                print_safe(id_ps, 'Sending text...')
                error = d_send(d_in := (None if (d_in := get_input('Message: ')).strip().lower() in quit_phrases else d_in))  # I <3 WALRUS OPERATOR!
            except EOFError:  # input
                print_safe(id_ps, 'Input() EOF.')
                continue
            except Exception as e:
                print_safe(id_ps, 'error', e)
                continue
            finally:
                print_safe(id_ps, 'Sent.')
            if error:
                print_safe(id_ps, f'Error while sending {repr(d_in)} with error state {nexis.ask_error_state_string()}.')
                continue
            if d_in is None:
                break
            get_input('Hit [enter].')
        print_safe(id_ps, 'Session ended.')

    def f_print():
        nexis.init_receiver()
        d_receive = nexis.receiver
        id_ps = get_ps_id(xf, xt)
        print_safe(id_ps, 'Initializing.')
        while xt:
            try:
                print_safe(id_ps, 'Receiving data...')
                text = rejoin(nexis, next(d_receive))
                print_safe(id_ps, 'Message compiled.')
                if nexis.error:
                    print_safe(id_ps, 'Error state', nexis.ask_error_state_string(), 'encountered.')
            except MemoryError:
                print_safe(id_ps, 'Error rejoining test.')
                continue
            if text is None:
                print_safe(id_ps, 'Output = None.')
                break
            else:
                print_safe(id_ps, 'Output:', text)
        print_safe(id_ps, 'Session concluded.')

    (p_print := Process(target = f_print, name = "P_print")).start()
    f_input()
    p_print.join()
    return


if __name__ == '__main__':
    sp_main()

Put simply, multiprocessing.Pipe() is inadequate. It should be able to handle massive strings and switch process execution between a sender and receiver. I wrote this to implement:

  • Automatic error handling
  • Transmission error categorization
  • Data transmission chunking and reassembly
  • Unlimited data transmission size
  • Process synchronization
  • Simple abstraction to enhance usability

It has a weird bug I can't find. Days and plenty of documentation later, it's not fixed. I've left in a good many debug lines. Try entering "hi": you don't see "Receiver.Test: Output: hi" but should. Try a second time, it just hangs:

Sample output.

/usr/bin/python3 /home/[redacted]/simple_pipe.py
Sender.Pipe: Waiting in main loop.
Receiver.Pipe: Waiting in main loop.
Sender.Test: Initializing.
Sender.Test: Sending text...
Message: hio
Receiver.Test: Initializing.
Receiver.Test: Receiving data...
Sender.Pipe: Data primed.
Sender.Pipe: Queuing data.
Receiver.Pipe: Proceeding in main loop.
Receiver.Pipe: Polling pipe...
Sender.Pipe: Data queuing.
Sender.Pipe: Sending 3 characters using (0 chunks @ 10000 characters) + (1 last chunk @ 3 characters).
Sender.Pipe: Sending data chunk @ 10000 characters.
Sender.Pipe: Data chunk sent.
Receiver.Pipe: done.
Receiver.Pipe: Receiving data...
Receiver.Pipe: Data received...
Receiver.Pipe: Data chunk processed.
Sender.Pipe: Data chunk processed.
Sender.Pipe: Waiting in main loop.
Receiver.Pipe: No errors detected.
Receiver.Pipe: Waiting in main loop.
Sender.Test: Sent.
Receiver.Test: Message compiled.
Hit [enter].
Receiver.Test: Output = None.
Receiver.Test: Session concluded.
Sender.Test: Sending text...
Message: What?
Sender.Pipe: Data primed.
Sender.Pipe: Queuing data.
Sender.Pipe: Data queuing.
Sender.Pipe: Sending 5 characters using (0 chunks @ 10000 characters) + (1 last chunk @ 5 characters).
Sender.Pipe: Sending data chunk @ 10000 characters.
Sender.Pipe: Data chunk sent.

It's time to ask for help. It is part of a larger project. This should be part of the multiprocessing module. I'm humbled. Can someone tell me what up? Thank you all in advance!



from Enhanced Python Multiprocessing Data Pipeline Wrapper

No comments:

Post a Comment