Skip to content
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
from ..dstat import state
import logging
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
logger = logging.getLogger(__name__)
class BaseLoop(GObject.GObject):
__gsignals__ = {
b'experiment_done': (GObject.SIGNAL_RUN_FIRST, None, tuple()),
b'progress_update': (GObject.SIGNAL_RUN_FIRST, None, (float,))
}
def __init__(self, experiment, callbacks=None):
GObject.GObject.__init__(self)
self.line = None
self.lastdataline = 0
self.current_exp = experiment
self.experiment_proc = None
for signal, cb in callbacks.items():
try:
self.connect(signal, cb)
except TypeError:
logger.warning("Invalid signal %s", signal)
def run(self):
self.experiment_proc = [
GObject.idle_add(self.experiment_running_data),
GObject.idle_add(self.experiment_running_proc),
GObject.timeout_add(100, self.update_progress)
]
def experiment_running_data(self):
"""Receive data from experiment process and add to
current_exp.data['data].
Run in GTK main loop.
Returns:
True -- when experiment is continuing to keep function in GTK's queue.
False -- when experiment process signals EOFError or IOError to remove
function from GTK's queue.
"""
try:
incoming = state.ser.get_data()
while incoming is not None:
try:
self.line = incoming[0]
if self.line > self.lastdataline:
newline = True
try:
logger.info("running scan_process()")
self.current_exp.scan_process(self.lastdataline)
except AttributeError:
pass
self.lastdataline = self.line
else:
newline = False
self.current_exp.store_data(incoming, newline)
except TypeError:
pass
incoming = state.ser.get_data()
return True
except EOFError as err:
logger.error(err)
self.experiment_done()
return False
except IOError as err:
logger.error(err)
self.experiment_done()
return False
def experiment_running_proc(self):
"""Receive proc signals from experiment process.
Run in GTK main loop.
Returns:
True -- when experiment is continuing to keep function in GTK's queue.
False -- when experiment process signals EOFError or IOError to remove
function from GTK's queue.
"""
try:
ctrl_buffer = state.ser.get_ctrl()
try:
if ctrl_buffer is not None:
self.current_exp.ctrl_loop(ctrl_buffer)
except AttributeError:
pass
proc_buffer = state.ser.get_proc()
if proc_buffer is not None:
if proc_buffer in ["DONE", "SERIAL_ERROR", "ABORT"]:
self.experiment_done()
if proc_buffer == "SERIAL_ERROR":
self.on_serial_disconnect_clicked()
else:
logger.warning("Unrecognized experiment return code: %s",
proc_buffer)
return False
return True
except EOFError as err:
logger.warning("EOFError: %s", err)
self.experiment_done()
return False
except IOError as err:
logger.warning("IOError: %s", err)
self.experiment_done()
return False
def experiment_done(self):
logger.info("Experiment done")
for proc in self.experiment_proc:
GObject.source_remove(proc)
self.current_exp.scan_process(self.lastdataline)
self.current_exp.experiment_done()
self.emit("experiment_done")
def update_progress(self):
try:
progress = self.current_exp.get_progress()
except AttributeError:
progress = -1
self.emit("progress_update", progress)
return True
class PlotLoop(BaseLoop):
def experiment_running_plot(self, force_refresh=False):
"""Plot all data in current_exp.data.
Run in GTK main loop. Always returns True so must be manually
removed from GTK's queue.
"""
if self.line is None:
return True
for plot in self.current_exp.plots:
if (plot.scan_refresh and self.line > self.lastdataline):
while self.line > self.lastline:
# make sure all of last line is added
plot.updateline(self.current_exp, self.lastdataline)
self.lastdataline += 1
plot.updateline(self.current_exp, self.line)
plot.redraw()
else:
while self.line > self.lastdataline:
# make sure all of last line is added
plot.updateline(self.current_exp, self.lastdataline)
self.lastdataline += 1
plot.updateline(self.current_exp, self.line)
if plot.continuous_refresh is True or force_refresh is True:
plot.redraw()
return True
def run(self):
super(PlotLoop, self).run()
self.experiment_proc.append(
GObject.timeout_add(200, self.experiment_running_plot)
)
def experiment_done(self):
logger.info("Experiment done")
for proc in self.experiment_proc:
GObject.source_remove(proc)
self.current_exp.scan_process(self.lastdataline)
self.current_exp.experiment_done()
self.experiment_running_plot(force_refresh=True)
self.emit("experiment_done")
\ No newline at end of file
......@@ -19,11 +19,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import struct
from datetime import datetime
import time
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime
from math import ceil
import time
try:
import gi
......@@ -33,39 +33,45 @@ except ImportError:
print "ERR: GTK not available"
sys.exit(1)
from matplotlib.figure import Figure
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
from matplotlib.backends.backend_gtk3agg \
import FigureCanvasGTK3Agg as FigureCanvas
from pandas import DataFrame
try:
import seaborn as sns
sns.set(context='paper', style='darkgrid')
except ImportError:
pass
import serial
logger = logging.getLogger("dstat.comm")
dstat_logger = logging.getLogger("dstat.comm.DSTAT")
exp_logger = logging.getLogger("dstat.comm.Experiment")
from ..dstat import state, comm
from ..dstat.comm import TransmitError
from . import experiment_loops
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(comm.__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
from errors import InputError, VarError
import state
class Experiment(object):
class Experiment(GObject.Object):
"""Store and acquire a potentiostat experiment. Meant to be subclassed
to by different experiment types and not used instanced directly. Subclass
must instantiate self.plotbox as the PlotBox class to use and define id as
a class attribute.
"""
id = None
Loops = experiment_loops.PlotLoop
__gsignals__ = {
b'exp_ready': (GObject.SIGNAL_RUN_FIRST, None, ()),
b'exp_done': (GObject.SIGNAL_RUN_FIRST, None, ()),
}
def __init__(self, parameters):
"""Adds commands for gain and ADC."""
super(Experiment, self).__init__()
self.current_command = None
self.parameters = parameters
self.databytes = 8
self.datapoint = 0
......@@ -73,25 +79,18 @@ class Experiment(object):
self.time = 0
self.plots = []
major, minor = state.dstat_version
self.re_voltage_scale = state.board_instance.re_voltage_scale
if major >= 1:
if minor == 1:
self.__gaintable = [1e2, 3e2, 3e3, 3e4, 3e5, 3e6, 3e7, 5e8]
elif minor >= 2:
self.__gaintable = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.__gain_trim_table = ['r100_trim', 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
else:
raise VarError(parameters['version'], "Invalid version parameter.")
self.gain = state.board_instance.gain[int(self.parameters['gain'])]
self.gain = self.__gaintable[int(self.parameters['gain'])]
try:
self.gain_trim = int(
state.settings[
self.__gain_trim_table[int(self.parameters['gain'])]
state.board_instance.gain_trim[int(self.parameters['gain'])]
][1]
)
except AttributeError:
logger.debug("No gain trim table.")
self.commands = ["EA", "EG"]
......@@ -104,30 +103,40 @@ class Experiment(object):
self.commands[1] += "{p[gain]} {p[short_true]:d} ".format(
p=self.parameters)
self.plotlims = {'current_voltage' : {'xlims' : (0, 1)}
}
self.setup()
self.time = [datetime.utcnow()]
def setup_loops(self, callbacks):
self.loops = self.__class__.Loops(self, callbacks)
self.loops.run()
def setup(self):
self.data = OrderedDict(current_voltage=[([], [])])
self.columns = ['Voltage (mV)', 'Current (A)']
self.plot_format = {
'current_voltage' : {'labels' : self.columns,
'xlims' : (0, 1)
}
}
# list of scans, tuple of dimensions, list of data
self.line_data = ([], [])
self.plots.append(PlotBox(['current_voltage']))
plot = PlotBox(['current_voltage'])
plot.setlims('current_voltage', **self.plotlims['current_voltage'])
self.plots.append(plot)
def write_command(self, cmd, params=None, retry=10):
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply():
def get_reply(retries=3):
while True:
reply = self.serial.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
return get_reply()
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
......@@ -135,50 +144,87 @@ class Experiment(object):
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.serial.reset_input_buffer()
self.serial.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Invalid response: {}".format(reply))
logger.warning("Expected ACK got: {}".format(reply))
continue
self.serial.write('{}\n'.format(cmd))
tries = 5
while True:
self.serial.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Invalid response: {}".format(reply))
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
break
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Invalid response: {}".format(reply))
logger.warning("Expected RQP got: {}".format(reply))
continue
self.serial.write(" ".join(params) + " ")
tries = 5
for i in params:
while True:
self.serial.write(i + " ")
try:
reply = get_reply()
if reply != "@RCP {}".format(n_params):
logger.warning("Invalid response: {}".format(reply))
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def run(self, ser, ctrl_pipe, data_pipe):
"""Execute experiment. Connects and sends handshake signal to DStat
then sends self.commands. Don't call directly as a process in Windows,
use run_wrapper instead.
then sends self.commands.
"""
self.serial = ser
self.ctrl_pipe = ctrl_pipe
......@@ -188,6 +234,7 @@ class Experiment(object):
try:
for i in self.commands:
self.current_command = i
status = "DONE"
if isinstance(i, (str, unicode)):
logger.info("Command: %s", i)
......@@ -225,11 +272,11 @@ class Experiment(object):
data to self.data_pipe as result of self.data_handler).
"""
scan = 0
try:
while True:
def check_ctrl():
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.debug("serial_handler: %s", input)
logger.info("serial_handler: %s", input)
if input == "DISCONNECT":
self.serial.write('a')
self.serial.reset_input_buffer()
......@@ -238,11 +285,14 @@ class Experiment(object):
return False
elif input == 'a':
self.serial.write('a')
else:
self.serial.write(input)
try:
while True:
check_ctrl()
for line in self.serial:
if self.ctrl_pipe.poll():
if self.ctrl_pipe.recv() == 'a':
self.serial.write('a')
check_ctrl()
if line.startswith('B'):
data = self.data_handler(
......@@ -269,7 +319,6 @@ class Experiment(object):
except serial.SerialException:
return False
def data_handler(self, data_input):
"""Takes data_input as tuple -- (scan, data).
Returns:
......@@ -278,8 +327,8 @@ class Experiment(object):
scan, data = data_input
voltage, current = struct.unpack('<Hl', data) #uint16 + int32
return (scan, (
(voltage-32768)*3000./65536,
(current+self.gain_trim)*(1.5/self.gain/8388607)
(voltage-32768)*(3000./65536)*self.re_voltage_scale,
(current+self.gain_trim)*(1.5/8388607)/self.gain
)
)
......@@ -299,13 +348,16 @@ class Experiment(object):
in subclass.
"""
try:
if self.datapoint == 0:
if self.datapoint <= 1:
return None
except AttributeError: # Datapoint counting is optional
pass
return data
def scan_process(self, line):
pass
def experiment_done(self):
"""Runs when experiment is finished (all data acquired)"""
self.data_to_pandas()
......@@ -332,11 +384,45 @@ class Experiment(object):
self.df = OrderedDict()
for name, data in self.data.items():
df = DataFrame(columns=['Scan'] + self.columns)
try:
df = DataFrame(
columns=['Scan'] + list(self.plot_format[name]['labels']))
for n, line in enumerate(data): # Each scan
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + list(self.plot_format[name]['labels']),
[n] + list(line))
)
), ignore_index = True
)
except (AttributeError, KeyError):
try:
df = DataFrame(
columns=['Scan'] + list(self.columns))
for n, line in enumerate(data): # Each scan
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + list(self.columns),
[n] + list(line))
)
), ignore_index = True
)
except AttributeError as e: # Fallback if no self.columns
df = DataFrame(
columns=['Scan'] + ["{}{}".format(name, n)
for n in range(len(data))]
)
for n, line in enumerate(data):
df = df.append(DataFrame(
OrderedDict(zip(['Scan'] + self.columns,
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + ["{}{}".format(name, n)
for n in range(len(data))],
[n] + list(line))
)
), ignore_index = True
......@@ -365,20 +451,26 @@ class Experiment(object):
return buf
class PlotBox(object):
"""Contains data plot and associated methods."""
def __init__(self, plots):
def __init__(self, plots=None):
"""Initializes plots. self.box should be reparented."""
self.name = "Main"
self.continuous_refresh = True
self.scan_refresh = False
if plots is not None:
self.plotnames = plots
else:
self.plotnames = []
self.subplots = {}
self.figure = Figure()
# self.figure.subplots_adjust(left=0.07, bottom=0.07,
# right=0.96, top=0.96)
self.setup()
self.format_plots() # Should be overriden by subclass
self.figure.set_tight_layout(True)
......@@ -389,6 +481,13 @@ class PlotBox(object):
self.box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.box.pack_start(self.canvas, expand=True, fill=True, padding=0)
def setup(self):
self.plot_format = {
'current_voltage': {'xlabel': "Voltage (mV)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Should be overriden by subclass
......@@ -402,10 +501,12 @@ class PlotBox(object):
for n, i in enumerate(self.plotnames):
self.subplots[i] = self.figure.add_subplot(gs[n])
for subplot in self.subplots.values():
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([], [])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
def clearall(self):
"""Remove all lines on plot. """
......@@ -436,25 +537,27 @@ class PlotBox(object):
line_number -- line number to update
"""
for subplot in Experiment.data:
while True:
try:
self.subplots[subplot].lines[line_number].set_xdata(
Experiment.data[subplot][line_number][0])
self.subplots[subplot].lines[line_number].set_ydata(
Experiment.data[subplot][line_number][1])
except IndexError:
self.addline()
except KeyError:
pass
else:
break
# logger.warning("Tried to set line %s that doesn't exist.", line_number)
def changetype(self, Experiment):
"""Change plot type. Set axis labels and x bounds to those stored
in the Experiment instance. Stores class instance in Experiment.
def setlims(self, plot, xlims=None, ylims=None):
"""Sets x and y limits.
"""
for name, subplot in self.subplots.items():
subplot.set_xlabel(Experiment.plot_format[name]['labels'][0])
subplot.set_ylabel(Experiment.plot_format[name]['labels'][1])
subplot.set_xlim(Experiment.plot_format[name]['xlims'])
for name, subplot in Experiment.plot_format.items():
self.subplots[name].set_xlabel(subplot['labels'][0])
self.subplots[name].set_ylabel(subplot['labels'][1])
self.subplots[name].set_xlim(subplot['xlims'])
if xlims is not None:
self.subplots[plot].set_xlim(xlims)
if ylims is not None:
self.subplots[plot].set_ylim(ylims)
self.figure.canvas.draw()
......
import time
import struct
from experiments.experiment_template import Experiment
from .experiment_template import Experiment
from ..dstat import state
class OCPExp(Experiment):
"""Open circuit potential measumement in statusbar."""
id = 'ocp'
def __init__(self):
self.re_voltage_scale = state.board_instance.re_voltage_scale
self.databytes = 8
self.commands = ["EA", "EP0 0 "]
......@@ -15,17 +18,18 @@ class OCPExp(Experiment):
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
def data_handler(self, data_input):
"""Overrides Experiment method to only send ADC values."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return (voltage/5.592405e6)
return voltage/5.592405e6*self.re_voltage_scale
class PMTIdle(Experiment):
"""Open circuit potential measumement in statusbar."""
"""PMT idle mode."""
id = "pmt_idle"
def __init__(self):
self.databytes = 8
......
import time
import struct
from .experiment_template import PlotBox, Experiment
class LSVExp(Experiment):
"""Linear Scan Voltammetry experiment"""
id = 'lsv'
def setup(self):
self.plotlims['current_voltage']['xlims'] = tuple(
sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
super(LSVExp, self).setup()
self.datatype = "linearData"
self.datalength = 2
self.databytes = 6 # uint16 + int32
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start'])-int(self.parameters['stop']))
self.commands += "E"
self.commands[2] += "L"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['slope'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
def get_progress(self):
try:
return 1 - (abs(self.stop_mv - self.data['current_voltage'][-1][0][-1])/self.max_mv)
except IndexError:
return 0
\ No newline at end of file
import time
import struct
from experiments.experiment_template import PlotBox, Experiment
from .experiment_template import PlotBox, Experiment
class PotBox(PlotBox):
def setup(self):
self.plot_format = {
'voltage_time': {'xlabel': "Time (s)",
'ylabel': "Voltage (V)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
......@@ -14,6 +22,9 @@ class PotBox(PlotBox):
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class PotExp(Experiment):
id = 'pot'
......@@ -26,12 +37,14 @@ class PotExp(Experiment):
self.databytes = 8
self.data = {'voltage_time' : [([],[])]}
self.columns = ['Time (s)', 'Voltage (V)']
self.plot_format = {
self.plotlims = {
'voltage_time': {
'labels' : self.columns,
'xlims': (0, int(self.parameters['time']))
}
}
self.plots[-1].setlims('voltage_time', **self.plotlims['voltage_time'])
self.total_time = int(self.parameters['time'])
self.commands += "E"
self.commands[2] += "P"
......@@ -44,7 +57,8 @@ class PotExp(Experiment):
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000., voltage*(1.5/8388607.)
seconds+milliseconds/1000.,
voltage*self.re_voltage_scale*(1.5/8388607.)
)
)
......@@ -58,3 +72,9 @@ class PotExp(Experiment):
for i, item in enumerate(self.data['voltage_time'][line]):
item.append(data[i])
def get_progress(self):
try:
return self.data['voltage_time'][-1][0][-1]/self.total_time
except IndexError:
return 0
\ No newline at end of file
......@@ -2,27 +2,35 @@ import time
import struct
from copy import deepcopy
from experiments.experiment_template import PlotBox, Experiment
from .experiment_template import PlotBox, Experiment
class SWVBox(PlotBox):
def setup(self):
self.plot_format = {
'swv': {'xlabel': "Voltage (mV)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'swv': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class SWVExp(Experiment):
"""Square Wave Voltammetry experiment"""
id = 'swv'
def setup(self):
self.plots.append(SWVBox(['swv']))
def setup(self):
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
......@@ -34,34 +42,65 @@ class SWVExp(Experiment):
self.databytes = 10
self.columns = ['Voltage (mV)', 'Net Current (A)',
'Forward Current (A)', 'Reverse Current (A)']
self.plot_format = {
'swv' : {'labels' : ('Voltage (mV)',
'Current (A)'
),
'xlims' : (int(self.parameters['start']),
self.plotlims = {
'swv': {
'xlims': tuple(sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
}
}
plot = SWVBox()
plot.setlims('swv', **self.plotlims['swv'])
self.plots.append(plot)
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start']) - int(self.parameters['stop']))
self.scan_points = self.max_mv * 2 / float(self.parameters['step'])
self.commands += "E"
self.commands[2] += "S"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(int(self.parameters['clean_mV'])*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(int(int(self.parameters['dep_mV'])*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(self.parameters['start'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['stop'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['step'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['pulse'])
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['step'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['pulse'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['freq'])
self.commands[2] += " "
......@@ -77,7 +116,7 @@ class SWVExp(Experiment):
r_trim = reverse+self.gain_trim
return (scan, (
(voltage-32768)*3000./65536,
(voltage-32768)*3000./65536*self.re_voltage_scale,
(f_trim-r_trim)*(1.5/self.gain/8388607),
f_trim*(1.5/self.gain/8388607),
r_trim*(1.5/self.gain/8388607)
......@@ -95,12 +134,25 @@ class SWVExp(Experiment):
for i, item in enumerate(self.data['swv'][line]):
item.append(data[i])
def get_progress(self):
try:
if int(self.parameters['scans']) != 0:
scans_prog = (len(self.data['swv'])-1) / float(self.parameters['scans'])
scan_prog = (len(self.data['swv'][-1][0])-1) / self.scan_points / float(self.parameters['scans'])
prog = scans_prog + scan_prog
if prog > 1:
prog = 1
return prog
else:
return 1 - (abs(self.stop_mv - self.data['swv'][-1][0][-1])/self.max_mv)
except IndexError:
return 0
class DPVExp(SWVExp):
"""Diffential Pulse Voltammetry experiment."""
id = 'dpv'
def setup(self):
self.plots.append(SWVBox(['swv']))
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
......@@ -112,36 +164,73 @@ class DPVExp(SWVExp):
self.databytes = 10
self.columns = ['Voltage (mV)', 'Net Current (A)',
'Forward Current (A)', 'Reverse Current (A)']
self.plot_format = {
'swv' : {'labels' : ('Voltage (mV)',
'Current (A)'
),
'xlims' : (int(self.parameters['start']),
self.plotlims = {
'swv': {
'xlims': tuple(sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
}
}
plot = SWVBox()
plot.setlims('swv', **self.plotlims['swv'])
self.plots.append(plot)
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start']) - int(self.parameters['stop']))
self.commands += "E"
self.commands[2] += "D"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(int(self.parameters['clean_mV'])*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(int(int(self.parameters['dep_mV'])*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(self.parameters['start'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['stop'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['step'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['pulse'])
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['step'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['pulse'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['period'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['width'])
self.commands[2] += " "
def get_progress(self):
try:
return 1 - (abs(self.stop_mv - self.data['swv'][-1][0][-1])/self.max_mv)
except IndexError:
return 0
\ No newline at end of file
......@@ -17,6 +17,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os.path
from pkg_resources import parse_version
try:
import gi
......@@ -26,31 +28,15 @@ except ImportError:
print "ERR: GTK not available"
sys.exit(1)
from errors import InputError, VarError
v1_1_gain = [(0, "100 Ω (15 mA FS)", "0"),
(1, "300 Ω (5 mA FS)", "1"),
(2, "3 kΩ (500 µA FS)", "2"),
(3, "30 kΩ (50 µA FS)", "3"),
(4, "300 kΩ (5 µA FS)", "4"),
(5, "3 MΩ (500 nA FS)", "5"),
(6, "30 MΩ (50 nA FS)", "6"),
(7, "500 MΩ (3 nA FS)", "7")]
v1_2_gain = [(0, "Bypass", "0"),
(1, "100 Ω (15 mA FS)", "1"),
(2, "3 kΩ (500 µA FS)", "2"),
(3, "30 kΩ (50 µA FS)", "3"),
(4, "300 kΩ (5 µA FS)", "4"),
(5, "3 MΩ (500 nA FS)", "5"),
(6, "30 MΩ (50 nA FS)", "6"),
(7, "100 MΩ (15 nA FS)", "7")]
from ..errors import InputError, VarError
from ..dstat import state
mod_dir = os.path.dirname(os.path.abspath(__file__))
class adc_pot(object):
def __init__(self):
self.builder = Gtk.Builder()
self.builder.add_from_file('interface/adc_pot.glade')
self.builder.add_from_file(os.path.join(mod_dir,'adc_pot.glade'))
self.builder.connect_signals(self)
self.cell = Gtk.CellRendererText()
......@@ -81,7 +67,7 @@ class adc_pot(object):
self.gain_liststore = self.builder.get_object('gain_liststore')
self.ui['gain_index'].pack_start(self.cell, True)
self.ui['gain_index'].add_attribute(self.cell, 'text', 1)
self.ui['gain_index'].set_active(2)
# self.ui['gain_index'].set_active(2)
self._params = {}
......@@ -135,15 +121,19 @@ class adc_pot(object):
for i in self.ui:
self.ui[i].set_active(self._params[i])
def set_version(self, version):
def set_version(self, boost=None):
""" Sets menus for DStat version. """
try:
if self.version == state.board_instance:
return
except AttributeError:
pass
self.version = state.board_instance
self.gain_liststore.clear()
if version[0] == 1:
if version[1] == 1:
for i in v1_1_gain:
self.gain_liststore.append(str(i))
elif version[1] >= 2:
for i in v1_2_gain:
self.gain_liststore.append(i)
for n, i in enumerate(self.version.gain_labels):
self.gain_liststore.append((n, i, str(n)))
self.ui['gain_index'].set_active(self.version.gain_default_index)
\ No newline at end of file
from __future__ import division, absolute_import, print_function, unicode_literals
import logging
logger = logging.getLogger("dstat.interface.data_view")
logger = logging.getLogger(__name__)
from collections import OrderedDict
......
......@@ -30,7 +30,7 @@ except ImportError:
print "ERR: GTK not available"
sys.exit(1)
logger = logging.getLogger('dstat.interface.db')
logger = logging.getLogger(__name__)
class DB_Window(GObject.GObject):
__gsignals__ = {
......
......@@ -30,13 +30,16 @@ except ImportError:
print("ERR: GTK not available")
sys.exit(1)
import dstat_comm
import state
import experiments as exp
import experiments.cal as cal
from ..dstat import comm, state
from ..experiments import (cal, chronoamp, cv, experiment_template,
idle, lsv, pot, swv)
import __main__
from errors import InputError, VarError
logger = logging.getLogger("dstat.interface.exp_int")
from ..errors import InputError, VarError
logger = logging.getLogger(__name__)
mod_dir = os.path.dirname(os.path.abspath(__file__))
class ExpInterface(GObject.Object):
"""Generic experiment interface class. Should be subclassed to implement
......@@ -59,13 +62,16 @@ class ExpInterface(GObject.Object):
self.entry = {} # to be used only for str parameters
self._params = None
def get_experiment(self, parameters):
return self.__class__.experiment(parameters)
def _fill_params(self):
self._params = dict.fromkeys(self.entry.keys())
@property
def params(self):
"""Dict of parameters"""
if self._params == None:
if self._params is None:
self._fill_params()
self._get_params()
return self._params
......@@ -77,7 +83,7 @@ class ExpInterface(GObject.Object):
@params.setter
def params(self, params):
if self._params == None:
if self._params is None:
self._fill_params()
for i in self._params:
try:
......@@ -98,6 +104,7 @@ class ExpInterface(GObject.Object):
def on_done_utility(self, data=None):
self.emit('done_utility')
class Chronoamp(ExpInterface):
"""Experiment class for chronoamperometry. Extends ExpInterface class to
support treeview neeeded for CA.
......@@ -108,10 +115,10 @@ class Chronoamp(ExpInterface):
get_params(self)
"""
id = 'cae'
experiment = exp.Chronoamp
experiment = chronoamp.Chronoamp
def __init__(self):
"""Extends superclass method to support treeview."""
super(Chronoamp, self).__init__('interface/chronoamp.glade')
super(Chronoamp, self).__init__(os.path.join(mod_dir, 'chronoamp.glade'))
self.name = "Chronoamperometry"
......@@ -144,9 +151,9 @@ class Chronoamp(ExpInterface):
self.builder.get_object('potential_entry').get_text())
time = int(self.builder.get_object('time_entry').get_text())
if (potential > 1499 or potential < -1500):
if not state.board_instance.test_mv(potential):
raise ValueError("Potential out of range")
if (time < 1 or time > 65535):
if not state.board_instance.test_s(time):
raise ValueError("Time out of range")
self.model.append([potential, time])
......@@ -187,10 +194,10 @@ class Chronoamp(ExpInterface):
class LSV(ExpInterface):
"""Experiment class for LSV."""
id = 'lsv'
experiment = exp.LSVExp
experiment = lsv.LSVExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(LSV, self).__init__('interface/lsv.glade')
super(LSV, self).__init__(os.path.join(mod_dir, 'lsv.glade'))
self.name = "Linear Sweep Voltammetry"
self.entry['clean_mV'] = self.builder.get_object('clean_mV')
......@@ -204,10 +211,10 @@ class LSV(ExpInterface):
class CV(ExpInterface):
"""Experiment class for CV."""
id = 'cve'
experiment = exp.CVExp
experiment = cv.CVExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(CV, self).__init__('interface/cv.glade')
super(CV, self).__init__(os.path.join(mod_dir, 'cv.glade'))
self.name = "Cyclic Voltammetry"
self.entry['clean_mV'] = self.builder.get_object('clean_mV')
......@@ -223,10 +230,10 @@ class CV(ExpInterface):
class SWV(ExpInterface):
"""Experiment class for SWV."""
id = 'swv'
experiment = exp.SWVExp
experiment = swv.SWVExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(SWV, self).__init__('interface/swv.glade')
super(SWV, self).__init__(os.path.join(mod_dir, 'swv.glade'))
self.name = "Square Wave Voltammetry"
self.entry['clean_mV'] = self.builder.get_object('clean_mV')
......@@ -262,10 +269,10 @@ class SWV(ExpInterface):
class DPV(ExpInterface):
"""Experiment class for DPV."""
id = 'dpv'
experiment = exp.DPVExp
experiment = swv.DPVExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(DPV, self).__init__('interface/dpv.glade')
super(DPV, self).__init__(os.path.join(mod_dir, 'dpv.glade'))
self.name = "Differential Pulse Voltammetry"
......@@ -297,10 +304,10 @@ class DPV(ExpInterface):
class PD(ExpInterface):
"""Experiment class for PD."""
id = 'pde'
experiment = exp.PDExp
experiment = chronoamp.PDExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(PD, self).__init__('interface/pd.glade')
super(PD, self).__init__(os.path.join(mod_dir, 'pd.glade'))
self.name = "Photodiode/PMT"
......@@ -363,9 +370,9 @@ class PD(ExpInterface):
try:
self.builder.get_object('light_label').set_text(str(
dstat_comm.read_light_sensor()))
dstat_comm.read_settings()
comm.read_settings()
state.settings['tcs_enabled'][1] = '1' # Make sure TCS enabled
dstat_comm.write_settings()
comm.write_settings()
self.builder.get_object('threshold_entry').set_text(str(
state.settings['tcs_clear_threshold'][1]))
......@@ -384,8 +391,8 @@ class PD(ExpInterface):
try:
state.settings['tcs_clear_threshold'][1] = self.builder.get_object(
'threshold_entry').get_text()
dstat_comm.write_settings()
dstat_comm.read_settings()
comm.write_settings()
comm.read_settings()
self.builder.get_object('threshold_entry').set_text(
str(state.settings['tcs_clear_threshold'][1]))
__main__.MAIN.start_ocp()
......@@ -405,10 +412,10 @@ class PD(ExpInterface):
class POT(ExpInterface):
"""Experiment class for Potentiometry."""
id = 'pot'
experiment = exp.PotExp
experiment = pot.PotExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(POT, self).__init__('interface/potexp.glade')
super(POT, self).__init__(os.path.join(mod_dir, 'potexp.glade'))
self.name = "Potentiometry"
self.entry['time'] = self.builder.get_object('time_entry')
......@@ -416,10 +423,10 @@ class POT(ExpInterface):
class CAL(ExpInterface):
"""Experiment class for Calibrating gain."""
id = 'cal'
experiment = exp.CALExp
experiment = cal.CALExp
def __init__(self):
"""Adds entry listings to superclass's self.entry dict"""
super(CAL, self).__init__('interface/calib.glade')
super(CAL, self).__init__(os.path.join(mod_dir, 'calib.glade'))
self.name = "Calilbration"
self.entry['time'] = self.builder.get_object('time_entry')
......@@ -442,7 +449,7 @@ class CAL(ExpInterface):
try:
__main__.MAIN.on_pot_stop_clicked()
__main__.MAIN.stop_ocp()
dstat_comm.read_settings()
comm.read_settings()
self.entry['R100'].set_text(str(
state.settings['r100_trim'][1]))
......@@ -479,7 +486,7 @@ class CAL(ExpInterface):
state.settings['r3M_trim'][1] = self.entry['R3M'].get_text()
state.settings['r30M_trim'][1] = self.entry['R30M'].get_text()
state.settings['r100M_trim'][1] = self.entry['R100M'].get_text()
dstat_comm.write_settings()
comm.write_settings()
__main__.MAIN.start_ocp()
......@@ -487,7 +494,7 @@ class CAL(ExpInterface):
GObject.timeout_add(700, restore_buttons, self.buttons)
def on_measure_button_clicked(self, data=None):
if (int(self.entry['time'].get_text()) <= 0 or int(self.entry['time'].get_text()) > 65535):
if int(self.entry['time'].get_text()) <= 0 or int(self.entry['time'].get_text()) > 65535:
logger.error("ERR: Time out of range")
return
......@@ -524,6 +531,7 @@ class CAL(ExpInterface):
GObject.timeout_add(700, restore_buttons, self.buttons)
__main__.MAIN.spinner.stop()
def restore_buttons(buttons):
""" Should be called with GObject callback """
for i in buttons:
......