Skip to content
from . import comm
from . import dfu
from . import state
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
import sys
import inspect
import logging
from pkg_resources import parse_version, parse_requirements
logger = logging.getLogger(__name__)
class BaseBoard(object):
pcb_version = 'x.x.x'
booster = False
def __init__(self):
self.max_freq = 5000
self.max_scans = 255
self.max_time = 65535
self.setup()
assert len(self.gain) == self.gain_settings
assert len(self.gain_labels) == self.gain_settings
if self.gain_trim is not None:
assert len(self.gain_trim) == self.gain_settings
def setup(self):
"""Override in subclasses to provide correct numbers"""
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def test_mv(self, mv):
"""Return true if voltage in mV is in range."""
dac = float(mv)*self.re_voltage_scale/(3000./65536) + 32768
if 0 <= dac <= 65535:
return True
else:
return False
def test_freq(self, hz):
"""Return true if frequency in Hz is in range."""
return 0 < float(hz) < self.max_freq
def test_scans(self, n):
"""Return true if number of scans is valid."""
return 0 < int(n) < self.max_scans
def test_s(self, s):
"""Return true if time in integer seconds is valid."""
return 0 < int(s) < self.max_time
class V1_1Board(BaseBoard):
pcb_version = '1.1'
def setup(self):
self.gain = [1e2, 3e2, 3e3, 3e4, 3e5, 3e6, 3e7, 5e8]
self.gain_labels = [None, "300 Ω (5 mA FS)",
"3 kΩ (500 µA FS)", "30 kΩ (50 µA FS)",
"300 kΩ (5 µA FS)", "3 MΩ (500 nA FS)",
"30 MΩ (50 nA FS)", "500 MΩ (3 nA FS)"
]
self.gain_trim = None
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
class V1_2Board(BaseBoard):
pcb_version = '1.2'
def setup(self):
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def __get_all_subclasses(cls):
all_subclasses = []
for subclass in cls.__subclasses__():
all_subclasses.append(subclass)
all_subclasses.extend(__get_all_subclasses(subclass))
return all_subclasses
def find_board(version, booster=False):
"""Returns highest compatible board class or None if none available."""
boards = __get_all_subclasses(BaseBoard)
candidates = []
for board in boards:
req = parse_requirements('dstat~={}'.format(board.pcb_version)).next()
if board.booster == booster and version in req:
candidates.append(board)
try:
picked = sorted(candidates,
key=lambda board: parse_version(board.pcb_version))[-1]
logger.info("Picked %s", picked)
return picked
except IndexError:
logger.warning("No matching board definition for ver: %s.", version)
return None
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import multiprocessing as mp
from collections import OrderedDict
import logging
from pkg_resources import parse_version
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print "ERR: GTK not available"
sys.exit(1)
import serial
from serial.tools import list_ports
from ..errors import InputError, VarError
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
from . import state
class AlreadyConnectedError(Exception):
def __init__(self):
super(AlreadyConnectedError, self).__init__(self,
"Serial instance already connected.")
class NotConnectedError(Exception):
def __init__(self):
super(NotConnectedError, self).__init__(self,
"Serial instance not connected.")
class ConnectionError(Exception):
def __init__(self):
super(ConnectionError, self).__init__(self,
"Could not connect.")
class TransmitError(Exception):
def __init__(self):
super(TransmitError, self).__init__(self,
"No reply received.")
def _serial_process(ser_port, proc_pipe, ctrl_pipe, data_pipe):
ser_logger = logging.getLogger("{}._serial_process".format(__name__))
connected = False
for i in range(5):
time.sleep(1) # Give OS time to enumerate
try:
ser = serial.Serial(ser_port, timeout=1)
# ser = serial.Serial(ser_port, timeout=1)
ser_logger.info("Connecting")
time.sleep(.5)
connected = True
except serial.SerialException:
pass
if connected is True:
break
try:
if ser.isOpen() is False:
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
except UnboundLocalError: # ser doesn't exist
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
ser.write('!0 ')
for i in range(10):
if ser.readline().rstrip()=="@ACK 0":
if ser.readline().rstrip()=="@RCV 0":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!0 ')
time.sleep(.1)
while True:
# These can only be called when no experiment is running
if ctrl_pipe.poll():
ctrl_buffer = ctrl_pipe.recv()
if ctrl_buffer in ('a', "DISCONNECT"):
proc_pipe.send("ABORT")
try:
ser.write('a')
except serial.SerialException:
return 0
ser_logger.info("ABORT")
if ctrl_buffer == "DISCONNECT":
ser_logger.info("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
proc_pipe.send("DISCONNECT")
return 0
else:
ser.write(ctrl_buffer)
elif proc_pipe.poll():
while ctrl_pipe.poll():
ctrl_pipe.recv()
try:
return_code = proc_pipe.recv().run(ser, ctrl_pipe, data_pipe)
except serial.SerialException:
proc_pipe.send("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
return 0
ser_logger.info('Return code: %s', str(return_code))
proc_pipe.send(return_code)
else:
time.sleep(.1)
class SerialConnection(GObject.Object):
__gsignals__ = {
'connected': (GObject.SIGNAL_RUN_FIRST, None, ()),
'disconnected': (GObject.SIGNAL_RUN_FIRST, None, ())
}
def __init__(self):
super(SerialConnection, self).__init__()
self.connected = False
def connect(self, ser_port):
if self.connected is False:
self.proc_pipe_p, self.proc_pipe_c = mp.Pipe(duplex=True)
self.ctrl_pipe_p, self.ctrl_pipe_c = mp.Pipe(duplex=True)
self.data_pipe_p, self.data_pipe_c = mp.Pipe(duplex=True)
self.proc = mp.Process(target=_serial_process, args=(ser_port,
self.proc_pipe_c, self.ctrl_pipe_c,
self.data_pipe_c))
self.proc.start()
time.sleep(2)
if self.proc.is_alive() is False:
raise ConnectionError()
return False
self.connected = True
self.emit('connected')
return True
else:
raise AlreadyConnectedError()
return False
def assert_connected(self):
if self.connected is False:
raise NotConnectedError()
def start_exp(self, exp):
self.assert_connected()
self.proc_pipe_p.send(exp)
def stop_exp(self):
self.send_ctrl('a')
def get_proc(self, block=False):
self.assert_connected()
if block is True:
return self.proc_pipe_p.recv()
else:
if self.proc_pipe_p.poll() is True:
return self.proc_pipe_p.recv()
else:
return None
def get_ctrl(self, block=False):
self.assert_connected()
if block is True:
return self.ctrl_pipe_p.recv()
else:
if self.ctrl_pipe_p.poll() is True:
return self.ctrl_pipe_p.recv()
else:
return None
def get_data(self, block=False):
self.assert_connected()
if block is True:
return self.data_pipe_p.recv()
else:
if self.data_pipe_p.poll() is True:
return self.data_pipe_p.recv()
else:
return None
def flush_data(self):
self.assert_connected()
while self.data_pipe_p.poll() is True:
self.data_pipe_p.recv()
def send_ctrl(self, ctrl):
self.assert_connected()
self.ctrl_pipe_p.send(ctrl)
def disconnect(self):
logger.info("Disconnecting")
self.send_ctrl('DISCONNECT')
self.proc.join()
self.emit('disconnected')
self.connected = False
class VersionCheck(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
try:
ser.reset_input_buffer()
ser.write('!1\n')
for i in range(10):
if ser.readline().rstrip()=="@ACK 1":
ser.write('V\n')
if ser.readline().rstrip()=="@RCV 1":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!1\n')
time.sleep(.1)
for line in ser:
dstat_logger.info(line.decode('utf-8'))
if line.startswith('V'):
input = line.lstrip('V')
elif line.startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
pcb, sep, firmware = input.strip().rpartition('-')
if pcb == "":
pcb = firmware
firmware = False
logger.info("Your firmware does not support version detection.")
data_pipe.send((pcb, False))
else:
logger.info(
"Firmware Version: {}".format(
hex(int(firmware)).lstrip('0x')
)
)
data_pipe.send((
pcb,
hex(int(firmware)).lstrip('0x')
))
logger.info(
"PCB Version: {}".format(pcb)
)
status = "DONE"
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
def version_check(ser_port):
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
state.ser = SerialConnection()
state.ser.connect(ser_port)
state.ser.start_exp(VersionCheck())
result = state.ser.get_proc(block=True)
if result == "SERIAL_ERROR":
state.dstat_version = None
state.firmware_version = None
return False
else:
buffer = state.ser.get_data(block=True)
version, state.firmware_version = buffer
state.dstat_version = parse_version(version)
logger.debug("version_check done")
time.sleep(.1)
return True
class Settings(object):
def __init__(self, task, settings=None):
self.task = task
self.settings = settings
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
self.ser = ser
if 'w' in self.task:
self.write()
if 'r' in self.task:
data_pipe.send(self.read())
status = "DONE"
return status
def read(self):
settings = OrderedDict()
self.ser.reset_input_buffer()
self.ser.write('!2\n')
for i in range(10):
if self.ser.readline().rstrip()=="@ACK 2":
self.ser.write('SR\n')
if self.ser.readline().rstrip()=="@RCV 2":
break
else:
time.sleep(.5)
self.ser.reset_input_buffer()
self.ser.write('!2\n')
time.sleep(.1)
for line in self.ser:
if line.lstrip().startswith('S'):
input = line.lstrip().lstrip('S')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.ser.reset_input_buffer()
break
parted = input.rstrip().split(':')
for i in range(len(parted)):
settings[parted[i].split('.')[0]] = [i, parted[i].split('.')[1]]
return settings
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries = 3):
while True:
reply = self.ser.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.ser.reset_input_buffer()
self.ser.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.ser.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Expected RQP got: {}".format(reply))
continue
tries = 5
for i in params:
while True:
self.ser.write(i + " ")
try:
reply = get_reply()
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def write(self):
write_buffer = range(len(self.settings))
for i in self.settings: # make sure settings are in right order
write_buffer[self.settings[i][0]] = self.settings[i][1]
to_write = " ".join(write_buffer) + " "
n = len(to_write)
logger.debug("to_write = %s", to_write)
if not self.write_command('SW' + to_write):
logger.error("Could not write command.")
def read_settings():
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
state.ser.flush_data()
state.ser.start_exp(Settings(task='r'))
state.settings = state.ser.get_data(block=True)
logger.info("Read settings from DStat")
logger.debug("read_settings: %s", state.ser.get_proc(block=True))
return
def write_settings():
"""Tries to write settings to DStat from global settings var.
"""
logger.debug("Settings to write: %s", state.settings)
state.ser.flush_data()
state.ser.start_exp(Settings(task='w', settings=state.settings))
logger.info("Wrote settings to DStat")
logger.debug("write_settings: %s", state.ser.get_proc(block=True))
return
class LightSensor:
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
ser.reset_input_buffer()
ser.write('!')
while not ser.read()=="@":
self.ser.reset_input_buffer()
ser.write('!')
ser.write('T')
for line in ser:
if line.lstrip().startswith('T'):
input = line.lstrip().lstrip('T')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
parted = input.rstrip().split('.')
data_pipe.send(parted[0])
status = "DONE"
return status
def read_light_sensor():
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
state.ser.flush_data()
state.ser.start_exp(LightSensor())
logger.debug("read_light_sensor: %s", state.ser.get_proc(block=True))
return state.ser.get_data(block=True)
class SerialDevices(object):
"""Retrieves and stores list of serial devices in self.ports"""
def __init__(self):
self.ports = []
self.refresh()
def refresh(self):
"""Refreshes list of ports."""
try:
self.ports, _, _ = zip(*list_ports.grep("DSTAT"))
except ValueError:
self.ports = []
logger.error("No serial ports found")
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import subprocess
import sys
import os
import time
import logging
from tempfile import mkdtemp
from zipfile import ZipFile
if sys.version_info >= (3,):
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2
import urlparse
logger = logging.getLogger(__name__)
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
import serial
from . import state
from .comm import dstat_logger, exp_logger
fwurl = "http://microfluidics.utoronto.ca/gitlab/api/v4/projects/4/jobs/artifacts/master/download?job=1.2.3&private_token=zkgSx1FaaTP7yLyFKkX6"
class FWDialog(object):
def __init__(self, parent, connect, stop_callback, disconnect_callback, signal='activate'):
self.parent = parent
self.stop = stop_callback
self.disconnect = disconnect_callback
connect.connect(signal, self.activate)
def activate(self, widget=None, data=None):
for name, result in assert_deps().items():
if result is not True:
logger.error("Can't use firmware update module.")
self.missing_deps()
return
self.stop() # Stop OCP
version_result, master = test_firmware_version()
if version_result is False:
self.git_error()
return
if version_result == 'latest':
message = "Your firmware is already up to date."
secondary = "Click yes to reflash firmware anyways."
elif version_result == 'devel':
message = "Your firmware is not on the master branch."
secondary = "You may have a development version. " +\
"Click yes to reflash firmware anyways."
elif version_result == 'old':
message = "Your firmware is out of date."
secondary = "Click yes to flash the latest firmware."
dialog = Gtk.MessageDialog(self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.YES_NO, message)
dialog.format_secondary_text(secondary)
dialog.get_content_area().add(
Gtk.Label(
label="Installed version: {}".format(state.firmware_version)))
dialog.get_content_area().add(
Gtk.Label(label="Latest version: {}".format(master)))
dialog.show_all()
response = dialog.run()
if response == Gtk.ResponseType.YES:
try:
download_fw()
except:
self.dl_error()
return
dstat_enter_dfu()
self.dfu_notice()
self.disconnect()
try:
dfu_program()
except:
self.dfu_error()
dialog.destroy()
else:
dialog.destroy()
def missing_deps(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Missing Dependencies")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def git_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Git Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dl_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Download Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dfu_notice(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.OK, "Note about DFU")
dialog.format_secondary_text("Click OK once the DStat has connected in "
+ "DFU mode. Windows doesn't seem to like the automatic reboot. "
+ "Try holding down the reset button while plugging the "
+ 'USB port in (No LEDs should be lit), then click OK. Make sure '
+ 'the DFU driver from the dfu-programmer directory is installed.')
dialog.run()
dialog.destroy()
def dfu_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Could not update over DFU")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def destroy(self, widget=None, data=None):
widget.destroy()
def assert_deps():
deps = {'git' : 'git --version',
'dfu-programmer' : 'dfu-programmer --version'}
result = {}
for key, command in deps.items():
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
result[key] = True
except subprocess.CalledProcessError:
logger.warning("{} is not available.".format(key))
result[key] = False
return result
def download_fw(): # from https://stackoverflow.com/a/16518224
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
u = urllib2.urlopen(fwurl)
scheme, netloc, path, query, fragment = urlparse.urlsplit(fwurl)
filename = os.path.basename(path)
if not filename:
filename = 'downloaded.file'
with open(filename, 'wb') as f:
meta = u.info()
meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
meta_length = meta_func("Content-Length")
file_size = None
if meta_length:
file_size = int(meta_length[0])
logger.info("Downloading: {0} Bytes: {1}".format(fwurl, file_size))
file_size_dl = 0
block_sz = 8192
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
status = "{0:16}".format(file_size_dl)
if file_size:
status += " [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
status += chr(13)
logger.info(status)
with ZipFile(filename, mode='r') as z:
fw_path = z.extract('dstat-firmware.hex')
return fw_path
def test_firmware_version(current=None):
if current is None:
current = state.firmware_version
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
command = "git clone http://microfluidics.utoronto.ca/gitlab/dstat/dstat-firmware.git"
logger.info('Cloning master.')
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.error("git failed with error code {}".format(e.returncode))
logger.error("Output: {}".format(e.output))
return False, None
logger.info(output)
os.chdir("./dstat-firmware")
command = "git rev-parse --short master"
master = subprocess.check_output(command.split(), stderr=subprocess.STDOUT)
logger.info("Current master commit: {}".format(master))
command = "git merge-base --is-ancestor master {}".format(current)
test = subprocess.call(command.split())
if test == 0: # already newest
logger.info('Firmware is latest available.')
return 'latest', master
elif test == 1: # old version
logger.info('Firmware is out of date.')
return 'old', master
elif test == 128: # newer or different branch
logger.info('Firmware is not on the master branch.')
return 'devel', master
else:
logger.error('Unexpected git error. Git exited {}'.format(test))
return False, None
def dfu_program(path='./dstat-firmware.hex'):
"""Tries to program DStat over USB with DFU with hex file at path."""
try:
command = "dfu-programmer atxmega256a3u erase"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u flash {}".format(path)
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u launch"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
except subprocess.CalledProcessError as e:
logger.error("{} failed with output:".format(" ".join(e.cmd)))
logger.error(e.output)
raise
def dstat_enter_dfu():
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
exp = DFUMode()
state.ser.start_exp(exp)
while True:
result = state.ser.get_proc(block=True)
if result in ('SERIAL_ERROR', 'DONE'):
break
logger.info(result)
# state.ser.disconnect()
time.sleep(.1)
return True
class DFUMode(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
status = None
try:
ser.write(b'!2\n')
exp_logger.info('!2')
for i in range(10):
if ser.readline().rstrip() == b"@ACK 2":
dstat_logger.info('@ACK 2')
ser.write(b'SF\n')
exp_logger.info('SF')
status = "DONE"
time.sleep(5)
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write(b'!2\n')
exp_logger.info('!2')
time.sleep(.1)
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except serial.SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
if __name__ == "__main__":
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: [%(name)s] %(message)s',
datefmt='%H:%M:%S'
)
log_handler.setFormatter(log_formatter)
logger.setLevel(level=logging.INFO)
logger.addHandler(log_handler)
dstat_enter_dfu()
time.sleep(2)
dfu_program(sys.argv[1])
\ No newline at end of file
from collections import OrderedDict
def reset():
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
......@@ -17,14 +18,30 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__requires__ = 'PyInstaller==2.1'
class Error(Exception):
"""Copies Exception class"""
pass
class InputError(Error):
"""Exception raised for errors in the input. Extends Error class.
import os, sys
os.chdir(os.path.dirname(sys.argv[0]))
Attributes:
expr -- input expression in which the error occurred
msg -- error message
"""
args = ['dstat.spec']
args.extend(sys.argv[1:])
def __init__(self, expr, msg):
self.expr = expr
self.msg = msg
import PyInstaller.main as pyi #For some reason, it gets the path here, so working dir must be set first
pyi.run(args)
class VarError(Error):
"""Exception raised for internal variable errors. Extends Error class.
Attributes:
var -- var in which the error occurred
msg -- error message
"""
def __init__(self, var, msg):
self.var = var
self.msg = msg
\ No newline at end of file
# __all__ = []
#
# import pkgutil
# import inspect
# from . import cal, chronoamp, cv
#
#
# for loader, name, is_pkg in pkgutil.walk_packages(__path__):
# print loader, name, is_pkg
# module = loader.find_module(name).load_module(name)
#
# for name, value in inspect.getmembers(module):
# if name.startswith('__'):
# continue
#
# globals()[name] = value
# __all__.append(name)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import logging
logger = logging.getLogger(__name__)
import serial
from ..errors import InputError, VarError
from ..dstat import state
from ..experiments.experiment_template import Experiment, dstat_logger
def measure_offset(time):
gain_trim_table = [None, 'r100_trim', 'r3k_trim', 'r30k_trim', 'r300k_trim',
'r3M_trim', 'r30M_trim', 'r100M_trim']
parameters = {}
parameters['time'] = time
gain_offset = {}
for i in range(1,8):
parameters['gain'] = i
state.ser.start_exp(CALExp(parameters))
logger.info("measure_offset: %s", state.ser.get_proc(block=True))
gain_offset[gain_trim_table[i]] = state.ser.get_data(block=True)
return gain_offset
class CALExp(Experiment):
id = 'cal'
"""Offset calibration experiment"""
def __init__(self, parameters):
self.parameters = parameters
self.databytes = 8
self.scan = 0
self.data = []
self.commands = ["EA2 3 1 ", "EG"]
self.commands[1] += str(self.parameters['gain'])
self.commands[1] += " "
self.commands[1] += "0 "
self.commands.append(
("ER1 0", ["32768", str(self.parameters['time'])])
)
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
try:
while True:
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.debug("serial_handler: %s", input)
if input == ('a' or "DISCONNECT"):
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
for line in self.serial:
if self.ctrl_pipe.poll():
if self.ctrl_pipe.recv() == 'a':
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
if line.startswith('B'):
self.data.append(self.data_handler(
self.serial.read(size=self.databytes)))
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.serial.flushInput()
self.experiment_done()
return True
except serial.SerialException:
return False
def data_handler(self, data):
"""Takes data_input as tuple -- (scan, data).
Returns:
current
"""
seconds, milliseconds, current = struct.unpack('<HHl', data)
return current
def experiment_done(self):
"""Averages data points
"""
try:
sum = 0
self.data[0] = 0 # Skip first point
except IndexError:
return
for i in self.data:
sum += i
sum /= len(self.data)
if (sum > 32767):
sum = 32767
elif (sum < -32768):
sum = -32768
self.data_pipe.send(sum)
\ No newline at end of file
import time
import struct
import numpy as np
import serial
from ..interface.plot import mean, plotSpectrum, findBounds
from .experiment_template import PlotBox, Experiment, exp_logger
class ChronoampBox(PlotBox):
def setup(self):
self.plot_format = {
'current_time': {'xlabel': "Time (s)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'current_time': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class Chronoamp(Experiment):
id = 'cae'
"""Chronoamperometry experiment"""
def setup(self):
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.total_time = sum(self.parameters['time'])
self.plotlims = {'current_time': {'xlims': (0, self.total_time)}
}
self.commands.append(
("ER" + str(len(self.parameters['potential'])) + " 0 ", [])
)
for i in self.parameters['potential']:
self.commands[-1][1].append(str(int(i*(65536./3000)+32768)))
for i in self.parameters['time']:
self.commands[-1][1].append(str(i))
plot = ChronoampBox(['current_time'])
plot.setlims('current_time', **self.plotlims['current_time'])
self.plots.append(plot)
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, current = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000.,
(current+self.gain_trim)*(1.5/self.gain/8388607)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['current_time'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['current_time'][line]):
item.append(data[i])
def get_progress(self):
try:
return self.data['current_time'][-1][0][-1]/self.total_time
except IndexError:
return 0
class PDExp(Chronoamp):
"""Photodiode/PMT experiment"""
id = 'pde'
def setup(self):
self.plots.append(ChronoampBox('current_time'))
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.plot_format = {
'current_time' : {
'labels' : self.columns,
'xlims' : (0, int(self.parameters['time']))
}
}
self.total_time = int(self.parameters['time'])
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("EZ")
self.commands[-1] += str(self.parameters['sync_freq'])
self.commands[-1] += " "
else:
self.commands.append(("E2", []))
command = "ER1 "
params = []
if self.parameters['interlock_true']:
command += "1 "
else:
command += "0 "
if self.parameters['voltage'] == 0: # Special case where V=0
params.append("65535")
else:
params.append(str(int(
65535-(self.parameters['voltage']*(65536./3000))))
)
params.append(str(self.parameters['time']))
self.commands.append((command, params))
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("Ez")
else:
self.commands.append("E1")
class FT_Box(PlotBox):
def updateline(self, Experiment, line_number):
def search_value(data, target):
for i in range(len(data)):
if data[i] > target:
return i
y = Experiment.data['data'][line_number][1]
x = Experiment.data['data'][line_number][0]
freq = Experiment.parameters['adc_rate_hz']
i = search_value(x, float(Experiment.parameters['fft_start']))
y1 = y[i:]
x1 = x[i:]
avg = mean(y1)
min_index, max_index = findBounds(y1)
y1[min_index] = avg
y1[max_index] = avg
f, Y = plotSpectrum(y1[min_index:max_index],freq)
self.axe1.lines[line_number].set_ydata(Y)
self.axe1.lines[line_number].set_xdata(f)
Experiment.data['ft'] = [(f, Y)]
def changetype(self, Experiment):
"""Change plot type. Set axis labels and x bounds to those stored
in the Experiment instance. Stores class instance in Experiment.
"""
self.axe1.set_xlabel("Freq (Hz)")
self.axe1.set_ylabel("|Y| (A/Hz)")
self.axe1.set_xlim(0, Experiment.parameters['adc_rate_hz']/2)
Experiment.plots['ft'] = self
self.figure.canvas.draw()
\ No newline at end of file
import time
import struct
from .experiment_template import PlotBox, Experiment
class CVExp(Experiment):
id = 'cve'
"""Cyclic Voltammetry experiment"""
def setup(self):
self.plotlims['current_voltage']['xlims'] = tuple(
sorted((int(self.parameters['v1']), int(self.parameters['v2'])))
)
super(CVExp, self).setup()
self.datatype = "CVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.datalength = 2 * self.parameters['scans'] # x and y for each scan
self.databytes = 6 # uint16 + int32
self.commands += "E"
self.commands[2] += "C"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['v1'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['v2'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['scans'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['slope'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
def get_progress(self):
return (len(self.data['current_voltage'])-1) / float(self.parameters['scans'])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
from ..dstat import state
import logging
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
logger = logging.getLogger(__name__)
class BaseLoop(GObject.GObject):
__gsignals__ = {
b'experiment_done': (GObject.SIGNAL_RUN_FIRST, None, tuple()),
b'progress_update': (GObject.SIGNAL_RUN_FIRST, None, (float,))
}
def __init__(self, experiment, callbacks=None):
GObject.GObject.__init__(self)
self.line = None
self.lastdataline = 0
self.current_exp = experiment
self.experiment_proc = None
for signal, cb in callbacks.items():
try:
self.connect(signal, cb)
except TypeError:
logger.warning("Invalid signal %s", signal)
def run(self):
self.experiment_proc = [
GObject.idle_add(self.experiment_running_data),
GObject.idle_add(self.experiment_running_proc),
GObject.timeout_add(100, self.update_progress)
]
def experiment_running_data(self):
"""Receive data from experiment process and add to
current_exp.data['data].
Run in GTK main loop.
Returns:
True -- when experiment is continuing to keep function in GTK's queue.
False -- when experiment process signals EOFError or IOError to remove
function from GTK's queue.
"""
try:
incoming = state.ser.get_data()
while incoming is not None:
try:
self.line = incoming[0]
if self.line > self.lastdataline:
newline = True
try:
logger.info("running scan_process()")
self.current_exp.scan_process(self.lastdataline)
except AttributeError:
pass
self.lastdataline = self.line
else:
newline = False
self.current_exp.store_data(incoming, newline)
except TypeError:
pass
incoming = state.ser.get_data()
return True
except EOFError as err:
logger.error(err)
self.experiment_done()
return False
except IOError as err:
logger.error(err)
self.experiment_done()
return False
def experiment_running_proc(self):
"""Receive proc signals from experiment process.
Run in GTK main loop.
Returns:
True -- when experiment is continuing to keep function in GTK's queue.
False -- when experiment process signals EOFError or IOError to remove
function from GTK's queue.
"""
try:
ctrl_buffer = state.ser.get_ctrl()
try:
if ctrl_buffer is not None:
self.current_exp.ctrl_loop(ctrl_buffer)
except AttributeError:
pass
proc_buffer = state.ser.get_proc()
if proc_buffer is not None:
if proc_buffer in ["DONE", "SERIAL_ERROR", "ABORT"]:
self.experiment_done()
if proc_buffer == "SERIAL_ERROR":
self.on_serial_disconnect_clicked()
else:
logger.warning("Unrecognized experiment return code: %s",
proc_buffer)
return False
return True
except EOFError as err:
logger.warning("EOFError: %s", err)
self.experiment_done()
return False
except IOError as err:
logger.warning("IOError: %s", err)
self.experiment_done()
return False
def experiment_done(self):
logger.info("Experiment done")
for proc in self.experiment_proc:
GObject.source_remove(proc)
self.current_exp.scan_process(self.lastdataline)
self.current_exp.experiment_done()
self.emit("experiment_done")
def update_progress(self):
try:
progress = self.current_exp.get_progress()
except AttributeError:
progress = -1
self.emit("progress_update", progress)
return True
class PlotLoop(BaseLoop):
def experiment_running_plot(self, force_refresh=False):
"""Plot all data in current_exp.data.
Run in GTK main loop. Always returns True so must be manually
removed from GTK's queue.
"""
if self.line is None:
return True
for plot in self.current_exp.plots:
if (plot.scan_refresh and self.line > self.lastdataline):
while self.line > self.lastline:
# make sure all of last line is added
plot.updateline(self.current_exp, self.lastdataline)
self.lastdataline += 1
plot.updateline(self.current_exp, self.line)
plot.redraw()
else:
while self.line > self.lastdataline:
# make sure all of last line is added
plot.updateline(self.current_exp, self.lastdataline)
self.lastdataline += 1
plot.updateline(self.current_exp, self.line)
if plot.continuous_refresh is True or force_refresh is True:
plot.redraw()
return True
def run(self):
super(PlotLoop, self).run()
self.experiment_proc.append(
GObject.timeout_add(200, self.experiment_running_plot)
)
def experiment_done(self):
logger.info("Experiment done")
for proc in self.experiment_proc:
GObject.source_remove(proc)
self.current_exp.scan_process(self.lastdataline)
self.current_exp.experiment_done()
self.experiment_running_plot(force_refresh=True)
self.emit("experiment_done")
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import struct
import time
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime
from math import ceil
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print "ERR: GTK not available"
sys.exit(1)
from matplotlib.figure import Figure
import matplotlib.gridspec as gridspec
from matplotlib.backends.backend_gtk3agg \
import FigureCanvasGTK3Agg as FigureCanvas
from pandas import DataFrame
import seaborn as sns
sns.set(context='paper', style='darkgrid')
import serial
from ..dstat import state, comm
from ..dstat.comm import TransmitError
from . import experiment_loops
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(comm.__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
class Experiment(GObject.Object):
"""Store and acquire a potentiostat experiment. Meant to be subclassed
to by different experiment types and not used instanced directly. Subclass
must instantiate self.plotbox as the PlotBox class to use and define id as
a class attribute.
"""
id = None
Loops = experiment_loops.PlotLoop
__gsignals__ = {
b'exp_ready': (GObject.SIGNAL_RUN_FIRST, None, ()),
b'exp_done': (GObject.SIGNAL_RUN_FIRST, None, ()),
}
def __init__(self, parameters):
"""Adds commands for gain and ADC."""
super(Experiment, self).__init__()
self.current_command = None
self.parameters = parameters
self.databytes = 8
self.datapoint = 0
self.scan = 0
self.time = 0
self.plots = []
self.re_voltage_scale = state.board_instance.re_voltage_scale
self.gain = state.board_instance.gain[int(self.parameters['gain'])]
try:
self.gain_trim = int(
state.settings[
state.board_instance.gain_trim[int(self.parameters['gain'])]
][1]
)
except AttributeError:
logger.debug("No gain trim table.")
self.commands = ["EA", "EG"]
if self.parameters['buffer_true']:
self.commands[0] += "2"
else:
self.commands[0] += "0"
self.commands[0] += " {p[adc_rate]} {p[adc_pga]} ".format(
p=self.parameters)
self.commands[1] += "{p[gain]} {p[short_true]:d} ".format(
p=self.parameters)
self.plotlims = {'current_voltage' : {'xlims' : (0, 1)}
}
self.setup()
self.time = [datetime.utcnow()]
def setup_loops(self, callbacks):
self.loops = self.__class__.Loops(self, callbacks)
self.loops.run()
def setup(self):
self.data = OrderedDict(current_voltage=[([], [])])
self.columns = ['Voltage (mV)', 'Current (A)']
# list of scans, tuple of dimensions, list of data
self.line_data = ([], [])
plot = PlotBox(['current_voltage'])
plot.setlims('current_voltage', **self.plotlims['current_voltage'])
self.plots.append(plot)
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries=3):
while True:
reply = self.serial.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.serial.reset_input_buffer()
self.serial.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.serial.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
break
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Expected RQP got: {}".format(reply))
continue
tries = 5
for i in params:
while True:
self.serial.write(i + " ")
try:
reply = get_reply()
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def run(self, ser, ctrl_pipe, data_pipe):
"""Execute experiment. Connects and sends handshake signal to DStat
then sends self.commands.
"""
self.serial = ser
self.ctrl_pipe = ctrl_pipe
self.data_pipe = data_pipe
exp_logger.info("Experiment running")
try:
for i in self.commands:
self.current_command = i
status = "DONE"
if isinstance(i, (str, unicode)):
logger.info("Command: %s", i)
if not self.write_command(i):
status = "ABORT"
break
else:
cmd = i[0]
data = i[1]
logger.info("Command: {}".format(cmd))
if not self.write_command(cmd, params=data):
status = "ABORT"
break
if not self.serial_handler():
status = "ABORT"
break
time.sleep(0.5)
except serial.SerialException:
status = "SERIAL_ERROR"
finally:
while self.ctrl_pipe.poll():
self.ctrl_pipe.recv()
return status
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
scan = 0
def check_ctrl():
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.info("serial_handler: %s", input)
if input == "DISCONNECT":
self.serial.write('a')
self.serial.reset_input_buffer()
logger.info("serial_handler: ABORT pressed!")
time.sleep(.3)
return False
elif input == 'a':
self.serial.write('a')
else:
self.serial.write(input)
try:
while True:
check_ctrl()
for line in self.serial:
check_ctrl()
if line.startswith('B'):
data = self.data_handler(
(scan, self.serial.read(size=self.databytes)))
data = self.data_postprocessing(data)
if data is not None:
self.data_pipe.send(data)
try:
self.datapoint += 1
except AttributeError: #Datapoint counting is optional
pass
elif line.lstrip().startswith('S'):
scan += 1
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
time.sleep(.3)
return True
except serial.SerialException:
return False
def data_handler(self, data_input):
"""Takes data_input as tuple -- (scan, data).
Returns:
(scan number, (voltage, current)) -- voltage in mV, current in A
"""
scan, data = data_input
voltage, current = struct.unpack('<Hl', data) #uint16 + int32
return (scan, (
(voltage-32768)*(3000./65536)*self.re_voltage_scale,
(current+self.gain_trim)*(1.5/8388607)/self.gain
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['current_voltage'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['current_voltage'][line]):
item.append(data[i])
def data_postprocessing(self, data):
"""Discards first data point (usually gitched) by default, can be overridden
in subclass.
"""
try:
if self.datapoint <= 1:
return None
except AttributeError: # Datapoint counting is optional
pass
return data
def scan_process(self, line):
pass
def experiment_done(self):
"""Runs when experiment is finished (all data acquired)"""
self.data_to_pandas()
self.time += [datetime.utcnow()]
def export(self):
"""Return a dict containing data for saving."""
output = {
"datatype" : self.datatype,
"xlabel" : self.xlabel,
"ylabel" : self.ylabel,
"xmin" : self.xmin,
"xmax" : self.xmax,
"parameters" : self.parameters,
"data" : self.data,
"commands" : self.commands
}
return output
def data_to_pandas(self):
"""Convert data to pandas DataFrame and set as member of .df
attribute."""
self.df = OrderedDict()
for name, data in self.data.items():
try:
df = DataFrame(
columns=['Scan'] + list(self.plot_format[name]['labels']))
for n, line in enumerate(data): # Each scan
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + list(self.plot_format[name]['labels']),
[n] + list(line))
)
), ignore_index = True
)
except (AttributeError, KeyError):
try:
df = DataFrame(
columns=['Scan'] + list(self.columns))
for n, line in enumerate(data): # Each scan
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + list(self.columns),
[n] + list(line))
)
), ignore_index = True
)
except AttributeError as e: # Fallback if no self.columns
df = DataFrame(
columns=['Scan'] + ["{}{}".format(name, n)
for n in range(len(data))]
)
for n, line in enumerate(data):
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + ["{}{}".format(name, n)
for n in range(len(data))],
[n] + list(line))
)
), ignore_index = True
)
self.df[name] = df
def get_info_text(self):
"""Return string of text to disply on Info tab."""
buf = "#Time: S{} E{}\n".format(self.time[0], self.time[1])
buf += "#Commands:\n"
for line in self.commands:
buf += '#{}\n'.format(line)
return buf
def get_save_strings(self):
"""Return dict of strings with experiment parameters and data."""
buf = {}
buf['params'] = self.get_info_text()
buf.update(
{exp : df.to_csv(sep='\t', encoding='utf-8')
for exp, df in self.df.items()}
)
return buf
class PlotBox(object):
"""Contains data plot and associated methods."""
def __init__(self, plots=None):
"""Initializes plots. self.box should be reparented."""
self.name = "Main"
self.continuous_refresh = True
self.scan_refresh = False
if plots is not None:
self.plotnames = plots
else:
self.plotnames = []
self.subplots = {}
self.figure = Figure()
# self.figure.subplots_adjust(left=0.07, bottom=0.07,
# right=0.96, top=0.96)
self.setup()
self.format_plots() # Should be overriden by subclass
self.figure.set_tight_layout(True)
self.canvas = FigureCanvas(self.figure)
self.canvas.set_vexpand(True)
self.box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.box.pack_start(self.canvas, expand=True, fill=True, padding=0)
def setup(self):
self.plot_format = {
'current_voltage': {'xlabel': "Voltage (mV)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Should be overriden by subclass
"""
# Calculate size of grid needed
if len(self.plotnames) > 1:
gs = gridspec.GridSpec(int(ceil(len(self.plotnames)/2.)),2)
else:
gs = gridspec.GridSpec(1,1)
for n, i in enumerate(self.plotnames):
self.subplots[i] = self.figure.add_subplot(gs[n])
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([], [])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
def clearall(self):
"""Remove all lines on plot. """
for name, plot in self.subplots.items():
for line in reversed(plot.lines):
line.remove()
self.addline()
def clearline(self, subplot, line_number):
"""Remove a single line.
Arguments:
subplot -- key in self.subplots
line_number -- line number in subplot
"""
self.subplots[subplot].lines[line_number].remove()
def addline(self):
"""Add a new line to plot. (initialized with dummy data)))"""
for subplot in self.subplots.values():
subplot.plot([], [])
def updateline(self, Experiment, line_number):
"""Update a line specified with new data.
Arguments:
Experiment -- Experiment instance
line_number -- line number to update
"""
for subplot in Experiment.data:
while True:
try:
self.subplots[subplot].lines[line_number].set_xdata(
Experiment.data[subplot][line_number][0])
self.subplots[subplot].lines[line_number].set_ydata(
Experiment.data[subplot][line_number][1])
except IndexError:
self.addline()
except KeyError:
pass
else:
break
# logger.warning("Tried to set line %s that doesn't exist.", line_number)
def setlims(self, plot, xlims=None, ylims=None):
"""Sets x and y limits.
"""
if xlims is not None:
self.subplots[plot].set_xlim(xlims)
if ylims is not None:
self.subplots[plot].set_ylim(ylims)
self.figure.canvas.draw()
def redraw(self):
"""Autoscale and refresh the plot."""
for name, plot in self.subplots.items():
plot.relim()
plot.autoscale(True, axis = 'y')
self.figure.canvas.draw()
return True
\ No newline at end of file
import time
import struct
from .experiment_template import Experiment
from ..dstat import state
class OCPExp(Experiment):
"""Open circuit potential measumement in statusbar."""
id = 'ocp'
def __init__(self):
self.re_voltage_scale = state.board_instance.re_voltage_scale
self.databytes = 8
self.commands = ["EA", "EP0 0 "]
self.commands[0] += "2 " # input buffer
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
def data_handler(self, data_input):
"""Overrides Experiment method to only send ADC values."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return voltage/5.592405e6*self.re_voltage_scale
class PMTIdle(Experiment):
"""PMT idle mode."""
id = "pmt_idle"
def __init__(self):
self.databytes = 8
self.commands = ["EA", "EM"]
self.commands[0] += "2 " # input buffer
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
import time
import struct
from .experiment_template import PlotBox, Experiment
class LSVExp(Experiment):
"""Linear Scan Voltammetry experiment"""
id = 'lsv'
def setup(self):
self.plotlims['current_voltage']['xlims'] = tuple(
sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
super(LSVExp, self).setup()
self.datatype = "linearData"
self.datalength = 2
self.databytes = 6 # uint16 + int32
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start'])-int(self.parameters['stop']))
self.commands += "E"
self.commands[2] += "L"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['slope'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
def get_progress(self):
try:
return 1 - (abs(self.stop_mv - self.data['current_voltage'][-1][0][-1])/self.max_mv)
except IndexError:
return 0
\ No newline at end of file
import time
import struct
from .experiment_template import PlotBox, Experiment
class PotBox(PlotBox):
def setup(self):
self.plot_format = {
'voltage_time': {'xlabel': "Time (s)",
'ylabel': "Voltage (V)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'voltage_time': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class PotExp(Experiment):
id = 'pot'
"""Potentiometry experiment"""
def setup(self):
self.plots.append(PotBox(['voltage_time']))
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'voltage_time' : [([],[])]}
self.columns = ['Time (s)', 'Voltage (V)']
self.plotlims = {
'voltage_time': {
'xlims': (0, int(self.parameters['time']))
}
}
self.plots[-1].setlims('voltage_time', **self.plotlims['voltage_time'])
self.total_time = int(self.parameters['time'])
self.commands += "E"
self.commands[2] += "P"
self.commands[2] += str(self.parameters['time'])
self.commands[2] += " 1 " # potentiometry mode
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000.,
voltage*self.re_voltage_scale*(1.5/8388607.)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['voltage_time'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['voltage_time'][line]):
item.append(data[i])
def get_progress(self):
try:
return self.data['voltage_time'][-1][0][-1]/self.total_time
except IndexError:
return 0
\ No newline at end of file
import time
import struct
from copy import deepcopy
from .experiment_template import PlotBox, Experiment
class SWVBox(PlotBox):
def setup(self):
self.plot_format = {
'swv': {'xlabel': "Voltage (mV)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'swv': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class SWVExp(Experiment):
"""Square Wave Voltammetry experiment"""
id = 'swv'
def setup(self):
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = {
'swv' : [([], [], [], [])]
} # voltage, current, forwards, reverse
self.line_data = ([], [], [], [])
self.datalength = 2 * self.parameters['scans']
self.databytes = 10
self.columns = ['Voltage (mV)', 'Net Current (A)',
'Forward Current (A)', 'Reverse Current (A)']
self.plotlims = {
'swv': {
'xlims': tuple(sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
}
}
plot = SWVBox()
plot.setlims('swv', **self.plotlims['swv'])
self.plots.append(plot)
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start']) - int(self.parameters['stop']))
self.scan_points = self.max_mv * 2 / float(self.parameters['step'])
self.commands += "E"
self.commands[2] += "S"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['step'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['pulse'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['freq'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['scans'])
self.commands[2] += " "
def data_handler(self, input_data):
"""Overrides Experiment method to calculate difference current"""
scan, data = input_data
# uint16 + int32
voltage, forward, reverse = struct.unpack('<Hll', data)
f_trim = forward+self.gain_trim
r_trim = reverse+self.gain_trim
return (scan, (
(voltage-32768)*3000./65536*self.re_voltage_scale,
(f_trim-r_trim)*(1.5/self.gain/8388607),
f_trim*(1.5/self.gain/8388607),
r_trim*(1.5/self.gain/8388607)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['swv'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['swv'][line]):
item.append(data[i])
def get_progress(self):
try:
if int(self.parameters['scans']) != 0:
scans_prog = (len(self.data['swv'])-1) / float(self.parameters['scans'])
scan_prog = (len(self.data['swv'][-1][0])-1) / self.scan_points / float(self.parameters['scans'])
prog = scans_prog + scan_prog
if prog > 1:
prog = 1
return prog
else:
return 1 - (abs(self.stop_mv - self.data['swv'][-1][0][-1])/self.max_mv)
except IndexError:
return 0
class DPVExp(SWVExp):
"""Diffential Pulse Voltammetry experiment."""
id = 'dpv'
def setup(self):
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = {
'swv' : [([], [], [], [])]
} # voltage, current, forwards, reverse
self.line_data = ([], [], [], [])
self.datalength = 2
self.databytes = 10
self.columns = ['Voltage (mV)', 'Net Current (A)',
'Forward Current (A)', 'Reverse Current (A)']
self.plotlims = {
'swv': {
'xlims': tuple(sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
}
}
plot = SWVBox()
plot.setlims('swv', **self.plotlims['swv'])
self.plots.append(plot)
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start']) - int(self.parameters['stop']))
self.commands += "E"
self.commands[2] += "D"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['step'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['pulse'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['period'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['width'])
self.commands[2] += " "
def get_progress(self):
try:
return 1 - (abs(self.stop_mv - self.data['swv'][-1][0][-1])/self.max_mv)
except IndexError:
return 0
\ No newline at end of file