Monday, 16 July 2018

Python: Websockets in Synchronous Program

I have a bog-standard synchronous python program that needs to be able to read data from websockets and update the GUI with the data. However, asyncio creep is constantly tripping me up.

How do I make a module that:

  1. accepts multiple subscriptions to multiple sources
  2. sends an update to the requester whenever there's data
  3. opens exactly one websocket connection per URL
  4. resets the websocket if it closes

Here's what I have already, but it's failing at many points:

  1. run_forever() means that the loop gets stuck before the subscription completes and then handle() is stuck in the falsey while loop
  2. it does not seem to want to restart sockets when they're down because a websockets object does not have a connected property (websocket without an s does, but I'm not clear on the differences and can't find info online either)
  3. I'm absolutely not sure if my approach is remotely correct.

Been fighting with this for weeks. Would appreciate some pointers.

class WSClient():
    subscriptions = set()
    connections = {}
    started = False

    def __init__(self):
        self.loop = asyncio.get_event_loop()

    def start(self):
        self.started = True
        self.loop.run_until_complete(self.handle())
        self.loop.run_until_forever()  # problematic, because it does not allow new subscribe() events

    async def handle(self):
        while len(self.connections) > 0:
            # listen to every websocket
            futures = [self.listen(self.connections[url]) for url in self.connections]
            done, pending = await asyncio.wait(futures)

            # the following is apparently necessary to avoid warnings
            # about non-retrieved exceptions etc
            try:
                data, ws = done.pop().result()
            except Exception as e:
                print("OTHER EXCEPTION", e)

            for task in pending:
                task.cancel()

    async def listen(self, ws):
        try:
            async for data in ws:
                data = json.loads(data)
                # call the subscriber (listener) back when there's data
                [s.listener._handle_result(data) for s in self.subscriptions if s.ws == ws]
        except Exception as e:
            print('ERROR LISTENING; RESTARTING SOCKET', e)
            await asyncio.sleep(2)
            self.restart_socket(ws)

    def subscribe(self, subscription):
        task = self.loop.create_task(self._subscribe(subscription))
        asyncio.gather(task)

        if not self.started:
            self.start()

    async def _subscribe(self, subscription):
        try:
            ws = self.connections.get(subscription.url, await websockets.connect(subscription.url))
            await ws.send(json.dumps(subscription.sub_msg))

            subscription.ws = ws
            self.connections[subscription.url] = ws
            self.subscriptions.add(subscription)
        except Exception as e:
            print("ERROR SUBSCRIBING; RETRYING", e)
            await asyncio.sleep(2)
            self.subscribe(subscription)

    def restart_socket(self, ws):
        for s in self.subscriptions:
            if s.ws == ws and not s.ws.connected:
                print(s)
                del self.connections[s.url]
                self.subscribe(s)



from Python: Websockets in Synchronous Program

No comments:

Post a Comment