Commits on Source (6)
......@@ -131,10 +131,19 @@ class DStatDaemon(object):
return {'status': 'done'}
async def run(self):
received_commands = set()
while True:
command: str
payload: dict
command, payload = pickle.loads(await self.socket_ctrl.arecv())
received = pickle.loads(await self.socket_ctrl.arecv())
if len(received) == 3:
exp_uuid, command, payload = received
if exp_uuid in received_commands: # Ignore previously handled commands
continue
logger.info('Got Experiment %s', exp_uuid)
received_commands.add(exp_uuid)
else:
command, payload = received
await self.socket_ctrl.asend(pickle.dumps(await (self.command_map[command](payload))))
def close(self):
......