#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import logging
logger = logging.getLogger(__name__)
import serial
from ..errors import InputError, VarError
from ..dstat import state
from ..experiments.experiment_template import Experiment, dstat_logger
def measure_offset(time):
gain_trim_table = [None, 'r100_trim', 'r3k_trim', 'r30k_trim', 'r300k_trim',
'r3M_trim', 'r30M_trim', 'r100M_trim']
parameters = {}
parameters['time'] = time
gain_offset = {}
for i in range(1,8):
parameters['gain'] = i
state.ser.start_exp(CALExp(parameters))
logger.info("measure_offset: %s", state.ser.get_proc(block=True))
gain_offset[gain_trim_table[i]] = state.ser.get_data(block=True)
return gain_offset
class CALExp(Experiment):
id = 'cal'
"""Offset calibration experiment"""
def __init__(self, parameters):
self.parameters = parameters
self.databytes = 8
self.scan = 0
self.data = []
self.commands = ["EA2 3 1 ", "EG"]
self.commands[1] += str(self.parameters['gain'])
self.commands[1] += " "
self.commands[1] += "0 "
self.commands.append(
("ER1 0", ["32768", str(self.parameters['time'])])
)
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
try:
while True:
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.debug("serial_handler: %s", input)
if input == ('a' or "DISCONNECT"):
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
for line in self.serial:
if self.ctrl_pipe.poll():
if self.ctrl_pipe.recv() == 'a':
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
if line.startswith('B'):
self.data.append(self.data_handler(
self.serial.read(size=self.databytes)))
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.serial.flushInput()
self.experiment_done()
return True
except serial.SerialException:
return False
def data_handler(self, data):
"""Takes data_input as tuple -- (scan, data).
Returns:
current
"""
seconds, milliseconds, current = struct.unpack('<HHl', data)
return current
def experiment_done(self):
"""Averages data points
"""
try:
sum = 0
self.data[0] = 0 # Skip first point
except IndexError:
return
for i in self.data:
sum += i
sum /= len(self.data)
if (sum > 32767):
sum = 32767
elif (sum < -32768):
sum = -32768
self.data_pipe.send(sum)
\ No newline at end of file
import time
import struct
import numpy as np
import serial
from ..interface.plot import mean, plotSpectrum, findBounds
from .experiment_template import PlotBox, Experiment, exp_logger
class ChronoampBox(PlotBox):
def setup(self):
self.plot_format = {
'current_time': {'xlabel': "Time (s)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'current_time': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class Chronoamp(Experiment):
id = 'cae'
"""Chronoamperometry experiment"""
def setup(self):
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.total_time = sum(self.parameters['time'])
self.plotlims = {'current_time': {'xlims': (0, self.total_time)}
}
self.commands.append(
("ER" + str(len(self.parameters['potential'])) + " 0 ", [])
)
for i in self.parameters['potential']:
self.commands[-1][1].append(str(int(i*(65536./3000)+32768)))
for i in self.parameters['time']:
self.commands[-1][1].append(str(i))
plot = ChronoampBox(['current_time'])
plot.setlims('current_time', **self.plotlims['current_time'])
self.plots.append(plot)
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, current = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000.,
(current+self.gain_trim)*(1.5/self.gain/8388607)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['current_time'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['current_time'][line]):
item.append(data[i])
from datetime import datetime
from typing import Any
from ..dstat.experiment_handler import ExperimentHandler
from ..dstat.experiment_process import ExperimentProcess
from ..dstat.utility import abs_mv_to_dac, rel_mv_to_dac, param_test_uint16, param_test_non_zero_uint8
from ..experiments.experiment_container import ExperimentContainer
class CAExperimentHandler(ExperimentHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.data['time'] = []
def data_handler(self, data_input: tuple[datetime, int, bytes]):
unpacked: tuple[int, int, int]
date, scan, data = data_input
unpacked = self.struct.unpack(data)
self.data['scan'].append(scan)
self.data['timestamp'].append(date)
for n, i in enumerate(unpacked):
try:
self.data[self.data_cols[n]].append(self.data_convert[n](i))
except TypeError: # If no converter
self.data[self.data_cols[n]].append(i)
self.data['time'].append(self.data['time_s'][-1] + self.data['time_ms'][-1])
class CAExperimentContainer(ExperimentContainer):
experiment_id = 'ca'
display_name = 'Chronoamperometry'
process = ExperimentProcess
handler = CAExperimentHandler
data_bytes = 0
def __init__(self, params: dict[str, Any], mux: int = 1):
super().__init__(params, mux=mux)
self.data_cols = ['time_s', 'time_ms', 'current']
self.data_format = 'smA'
self.cmd_str += [('ER{n_steps} 0 ', ('voltages', 'times'))]
self.param_input |= {'times': list, 'voltages': list}
self.param_input_display_names |= {'times': 'Time (s)', 'voltages': 'Voltage (mV)'}
self.param_tables = {'Steps:': ['times', 'voltages']}
self.param_input_limits |= {'times': (0, 'time_max'), 'voltages': ('mv_min', 'mv_max'), }
self.param_converters |= {'times': lambda x: [param_test_uint16(i) for i in x],
'voltages': lambda x: [abs_mv_to_dac(i) for i in x], }
self.defaults |= {'times': 0, 'voltages': 0}
if params:
self.progress_end = sum(self.params['times'])
self.params['n_steps'] = len(self.params['times'])
else:
self.progress_max = 0
self.progress_iters = 1
self.data_bytes = self.calculate_data_bytes()
def get_progress(self):
def get_progress(self) -> float:
if self.handler_instance.done:
return 100
try:
return self.data['current_time'][-1][0][-1]/self.total_time
return (1 - abs(
self.progress_end - self.handler_instance.data['time_s'][-1]) / self.progress_end) * 100
except IndexError:
return 0
class PDExp(Chronoamp):
"""Photodiode/PMT experiment"""
id = 'pde'
def setup(self):
self.plots.append(ChronoampBox('current_time'))
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.plot_format = {
'current_time' : {
'labels' : self.columns,
'xlims' : (0, int(self.parameters['time']))
}
}
self.total_time = int(self.parameters['time'])
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("EZ")
self.commands[-1] += str(self.parameters['sync_freq'])
self.commands[-1] += " "
else:
self.commands.append(("E2", []))
command = "ER1 "
params = []
if self.parameters['interlock_true']:
command += "1 "
else:
command += "0 "
if self.parameters['voltage'] == 0: # Special case where V=0
params.append("65535")
else:
params.append(str(int(
65535-(self.parameters['voltage']*(65536./3000))))
)
params.append(str(self.parameters['time']))
self.commands.append((command, params))
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("Ez")
else:
self.commands.append("E1")
class FT_Box(PlotBox):
def updateline(self, Experiment, line_number):
def search_value(data, target):
for i in range(len(data)):
if data[i] > target:
return i
y = Experiment.data['data'][line_number][1]
x = Experiment.data['data'][line_number][0]
freq = Experiment.parameters['adc_rate_hz']
i = search_value(x, float(Experiment.parameters['fft_start']))
y1 = y[i:]
x1 = x[i:]
avg = mean(y1)
min_index, max_index = findBounds(y1)
y1[min_index] = avg
y1[max_index] = avg
f, Y = plotSpectrum(y1[min_index:max_index],freq)
self.axe1.lines[line_number].set_ydata(Y)
self.axe1.lines[line_number].set_xdata(f)
Experiment.data['ft'] = [(f, Y)]
def changetype(self, Experiment):
"""Change plot type. Set axis labels and x bounds to those stored
in the Experiment instance. Stores class instance in Experiment.
"""
self.axe1.set_xlabel("Freq (Hz)")
self.axe1.set_ylabel("|Y| (A/Hz)")
self.axe1.set_xlim(0, Experiment.parameters['adc_rate_hz']/2)
Experiment.plots['ft'] = self
self.figure.canvas.draw()
\ No newline at end of file
import time
import struct
from .experiment_template import PlotBox, Experiment
class CVExp(Experiment):
id = 'cve'
"""Cyclic Voltammetry experiment"""
def setup(self):
self.plotlims['current_voltage']['xlims'] = tuple(
sorted((int(self.parameters['v1']), int(self.parameters['v2'])))
)
super(CVExp, self).setup()
self.datatype = "CVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.datalength = 2 * self.parameters['scans'] # x and y for each scan
self.databytes = 6 # uint16 + int32
self.commands += "E"
self.commands[2] += "C"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['v1'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['v2'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['scans'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['slope'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
def get_progress(self):
return (len(self.data['current_voltage'])-1) / float(self.parameters['scans'])
import logging
import struct
from abc import ABC, abstractmethod, ABCMeta
from typing import Type, Callable, Union, Any, List
from ..dstat.comm import SerialConnection
from ..dstat.experiment_handler import ExperimentHandler
from ..dstat.experiment_process import BaseExperimentProcess
from ..dstat.state import DStatState
from ..dstat.boards import V1_2Board
logger = logging.getLogger(__name__)
state = DStatState()
board = V1_2Board()
class ExperimentContainer(ABC, metaclass=ABCMeta):
dstat_params = {'buffer_false': bool, 'adc_rate': str,
'adc_pga': str, 'gain': int,
'short_true': bool, 'mux_channel': int}
@abstractmethod
def __init__(self, params: dict[str, Any], mux: int = 1):
self.mux = mux
self.data_cols: Union[None, list[str]] = None
self.data_format: str = ''
self.cmd_str: list[str] = ["EA{buffer} {adc_rate:x} {adc_pga:x} ",
"EG{gain} {short_true:d} "]
self.adc_rates = {label: code for label, code
in zip(board.adc_rate_labels, board.adc_rate_codes)}
self.adc_pga = {label: code for label, code
in zip(board.adc_pga_labels, board.adc_pga_codes)}
self.param_input: dict[str, Union[Callable, List]] = {'buffer_false': bool,
'adc_rate': list(self.adc_rates.keys()),
'adc_pga': list(self.adc_pga.keys()),
'gain': int,
'short_true': bool,
}
self.param_input_limits: dict[str, tuple[Union[str, int], Union[str, int]]] = {}
self.param_input_display_names: dict[str, str] = {}
self.param_tables: dict[str, list[str]] = {}
self.param_docs: dict[str, str] = {'buffer_false': 'Disable ADC buffer',
'adc_rate': 'ADC sample rate',
'adc_pga': 'ADC PGA value',
'gain': 'IV converter gain',
'short_true': 'Short RE and CE internally'}
self.param_converters: dict[str, Callable] = {}
self.defaults: dict = {'buffer_false': False, 'short_true': False, 'adc_rate': '60', 'adc_pga': '2',
'gain': 3000}
self.params = params
self.progress_max = 0
self.progress_iters = 0
self.progress_scan = 0
self.handler_instance: Union[None, ExperimentHandler] = None
self.commands: List[str] = []
self.calculate_data_bytes()
if mux > 1:
self.cmd_str += ['EX{mux_channel} ']
self.param_input['mux_channel'] = list(range(mux))
self.param_docs['mux_channel'] = 'Multiplexer Channel'
self.defaults['mux_channel'] = 0
@property
@abstractmethod
def experiment_id(self) -> Type[str]:
raise NotImplementedError
@property
@abstractmethod
def display_name(self) -> Type[str]:
raise NotImplementedError
@property
@abstractmethod
def data_bytes(self) -> Type[ExperimentHandler]:
raise NotImplementedError
@property
@abstractmethod
def handler(self) -> Type[ExperimentHandler]:
raise NotImplementedError
@property
@abstractmethod
def process(self) -> Type[BaseExperimentProcess]:
raise NotImplementedError
@property
def plots(self) -> dict[str, dict[str, str]]:
return {'current': {'x': 'voltage', 'y': 'current'}}
def calculate_data_bytes(self) -> int:
struct_formats = ['H' if i.islower() else 'l' for i in self.data_format]
return struct.Struct(f'<{"".join(struct_formats)}').size
def add_missing_from_defaults(self):
params = self.params
self.params = self.defaults.copy()
self.params |= params
self.commands = []
for i in self.cmd_str:
try:
cmd, params = i
param_list = []
for param in params:
param_list += self.params[param]
self.commands += [(cmd.format(**self.params), param_list)]
except ValueError:
self.commands += [i.format(**self.params)]
def parse_params(self):
self.params |= {key: self.param_converters[key](value) for key, value in self.params.items()
if key in self.param_converters}
self.params['adc_rate'] = self.adc_rates[self.params['adc_rate']]
self.params['adc_pga'] = self.adc_pga[self.params['adc_pga']]
self.params['gain'] = state.board_instance.gain.index(self.params['gain'])
if self.params['buffer_false']:
self.params['buffer'] = '0'
else:
self.params['buffer'] = '2'
logger.info(self.params)
def get_proc(self) -> BaseExperimentProcess:
proc = self.process()
self.parse_params()
for i in self.cmd_str:
proc.parse_command_string(i, self.params)
proc.data_bytes = self.data_bytes
return proc
def start_handler(self, ser: SerialConnection):
self.handler_instance = self.handler(ser=ser, data_cols=self.data_cols, data_format=self.data_format)
self.handler_instance.adc_gain = state.board_instance.gain[self.params['gain']]
self.handler_instance.adc_pga = state.board_instance.adc_pga[
state.board_instance.adc_pga_codes.index(self.params['adc_pga'])]
@abstractmethod
def get_progress(self) -> float:
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
from ..dstat import state
import logging
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
logger = logging.getLogger(__name__)
class BaseLoop(GObject.GObject):
__gsignals__ = {
b'experiment_done': (GObject.SIGNAL_RUN_FIRST, None, tuple()),
b'progress_update': (GObject.SIGNAL_RUN_FIRST, None, (float,))
}
def __init__(self, experiment, callbacks=None):
GObject.GObject.__init__(self)
self.line = None
self.lastdataline = 0
self.current_exp = experiment
self.experiment_proc = None
for signal, cb in callbacks.items():
try:
self.connect(signal, cb)
except TypeError:
logger.warning("Invalid signal %s", signal)
def run(self):
self.experiment_proc = [
GObject.idle_add(self.experiment_running_data),
GObject.idle_add(self.experiment_running_proc),
GObject.timeout_add(100, self.update_progress)
]
def experiment_running_data(self):
"""Receive data from experiment process and add to
current_exp.data['data].
Run in GTK main loop.
Returns:
True -- when experiment is continuing to keep function in GTK's queue.
False -- when experiment process signals EOFError or IOError to remove
function from GTK's queue.
"""
try:
incoming = state.ser.get_data()
while incoming is not None:
try:
self.line = incoming[0]
if self.line > self.lastdataline:
newline = True
try:
logger.info("running scan_process()")
self.current_exp.scan_process(self.lastdataline)
except AttributeError:
pass
self.lastdataline = self.line
else:
newline = False
self.current_exp.store_data(incoming, newline)
except TypeError:
pass
incoming = state.ser.get_data()
return True
except EOFError as err:
logger.error(err)
self.experiment_done()
return False
except IOError as err:
logger.error(err)
self.experiment_done()
return False
def experiment_running_proc(self):
"""Receive proc signals from experiment process.
Run in GTK main loop.
Returns:
True -- when experiment is continuing to keep function in GTK's queue.
False -- when experiment process signals EOFError or IOError to remove
function from GTK's queue.
"""
try:
ctrl_buffer = state.ser.get_ctrl()
try:
if ctrl_buffer is not None:
self.current_exp.ctrl_loop(ctrl_buffer)
except AttributeError:
pass
proc_buffer = state.ser.get_proc()
if proc_buffer is not None:
if proc_buffer in ["DONE", "SERIAL_ERROR", "ABORT"]:
self.experiment_done()
if proc_buffer == "SERIAL_ERROR":
self.on_serial_disconnect_clicked()
else:
logger.warning("Unrecognized experiment return code: %s",
proc_buffer)
return False
return True
except EOFError as err:
logger.warning("EOFError: %s", err)
self.experiment_done()
return False
except IOError as err:
logger.warning("IOError: %s", err)
self.experiment_done()
return False
def experiment_done(self):
logger.info("Experiment done")
for proc in self.experiment_proc:
GObject.source_remove(proc)
self.current_exp.scan_process(self.lastdataline)
self.current_exp.experiment_done()
self.emit("experiment_done")
def update_progress(self):
try:
progress = self.current_exp.get_progress()
except AttributeError:
progress = -1
self.emit("progress_update", progress)
return True
class PlotLoop(BaseLoop):
def experiment_running_plot(self, force_refresh=False):
"""Plot all data in current_exp.data.
Run in GTK main loop. Always returns True so must be manually
removed from GTK's queue.
"""
if self.line is None:
return True
for plot in self.current_exp.plots:
if (plot.scan_refresh and self.line > self.lastdataline):
while self.line > self.lastline:
# make sure all of last line is added
plot.updateline(self.current_exp, self.lastdataline)
self.lastdataline += 1
plot.updateline(self.current_exp, self.line)
plot.redraw()
else:
while self.line > self.lastdataline:
# make sure all of last line is added
plot.updateline(self.current_exp, self.lastdataline)
self.lastdataline += 1
plot.updateline(self.current_exp, self.line)
if plot.continuous_refresh is True or force_refresh is True:
plot.redraw()
return True
def run(self):
super(PlotLoop, self).run()
self.experiment_proc.append(
GObject.timeout_add(200, self.experiment_running_plot)
)
def experiment_done(self):
logger.info("Experiment done")
for proc in self.experiment_proc:
GObject.source_remove(proc)
self.current_exp.scan_process(self.lastdataline)
self.current_exp.experiment_done()
self.experiment_running_plot(force_refresh=True)
self.emit("experiment_done")
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import struct
import time
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime
from math import ceil
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print "ERR: GTK not available"
sys.exit(1)
from matplotlib.figure import Figure
import matplotlib.gridspec as gridspec
from matplotlib.backends.backend_gtk3agg \
import FigureCanvasGTK3Agg as FigureCanvas
from pandas import DataFrame
import seaborn as sns
sns.set(context='paper', style='darkgrid')
import serial
from ..dstat import state, comm
from ..dstat.comm import TransmitError
from . import experiment_loops
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(comm.__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
class Experiment(GObject.Object):
"""Store and acquire a potentiostat experiment. Meant to be subclassed
to by different experiment types and not used instanced directly. Subclass
must instantiate self.plotbox as the PlotBox class to use and define id as
a class attribute.
"""
id = None
Loops = experiment_loops.PlotLoop
__gsignals__ = {
b'exp_ready': (GObject.SIGNAL_RUN_FIRST, None, ()),
b'exp_done': (GObject.SIGNAL_RUN_FIRST, None, ()),
}
def __init__(self, parameters):
"""Adds commands for gain and ADC."""
super(Experiment, self).__init__()
self.current_command = None
self.parameters = parameters
self.databytes = 8
self.datapoint = 0
self.scan = 0
self.time = 0
self.plots = []
self.re_voltage_scale = state.board_instance.re_voltage_scale
self.gain = state.board_instance.gain[int(self.parameters['gain'])]
try:
self.gain_trim = int(
state.settings[
state.board_instance.gain_trim[int(self.parameters['gain'])]
][1]
)
except AttributeError:
logger.debug("No gain trim table.")
self.commands = ["EA", "EG"]
if self.parameters['buffer_true']:
self.commands[0] += "2"
else:
self.commands[0] += "0"
self.commands[0] += " {p[adc_rate]} {p[adc_pga]} ".format(
p=self.parameters)
self.commands[1] += "{p[gain]} {p[short_true]:d} ".format(
p=self.parameters)
self.plotlims = {'current_voltage' : {'xlims' : (0, 1)}
}
self.setup()
self.time = [datetime.utcnow()]
def setup_loops(self, callbacks):
self.loops = self.__class__.Loops(self, callbacks)
self.loops.run()
def setup(self):
self.data = OrderedDict(current_voltage=[([], [])])
self.columns = ['Voltage (mV)', 'Current (A)']
# list of scans, tuple of dimensions, list of data
self.line_data = ([], [])
plot = PlotBox(['current_voltage'])
plot.setlims('current_voltage', **self.plotlims['current_voltage'])
self.plots.append(plot)
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries=3):
while True:
reply = self.serial.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.serial.reset_input_buffer()
self.serial.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.serial.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
break
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Expected RQP got: {}".format(reply))
continue
tries = 5
for i in params:
while True:
self.serial.write(i + " ")
try:
reply = get_reply()
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def run(self, ser, ctrl_pipe, data_pipe):
"""Execute experiment. Connects and sends handshake signal to DStat
then sends self.commands.
"""
self.serial = ser
self.ctrl_pipe = ctrl_pipe
self.data_pipe = data_pipe
exp_logger.info("Experiment running")
try:
for i in self.commands:
self.current_command = i
status = "DONE"
if isinstance(i, (str, unicode)):
logger.info("Command: %s", i)
if not self.write_command(i):
status = "ABORT"
break
else:
cmd = i[0]
data = i[1]
logger.info("Command: {}".format(cmd))
if not self.write_command(cmd, params=data):
status = "ABORT"
break
if not self.serial_handler():
status = "ABORT"
break
time.sleep(0.5)
except serial.SerialException:
status = "SERIAL_ERROR"
finally:
while self.ctrl_pipe.poll():
self.ctrl_pipe.recv()
return status
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
scan = 0
def check_ctrl():
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.info("serial_handler: %s", input)
if input == "DISCONNECT":
self.serial.write('a')
self.serial.reset_input_buffer()
logger.info("serial_handler: ABORT pressed!")
time.sleep(.3)
return False
elif input == 'a':
self.serial.write('a')
else:
self.serial.write(input)
try:
while True:
check_ctrl()
for line in self.serial:
check_ctrl()
if line.startswith('B'):
data = self.data_handler(
(scan, self.serial.read(size=self.databytes)))
data = self.data_postprocessing(data)
if data is not None:
self.data_pipe.send(data)
try:
self.datapoint += 1
except AttributeError: #Datapoint counting is optional
pass
elif line.lstrip().startswith('S'):
scan += 1
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
time.sleep(.3)
return True
except serial.SerialException:
return False
def data_handler(self, data_input):
"""Takes data_input as tuple -- (scan, data).
Returns:
(scan number, (voltage, current)) -- voltage in mV, current in A
"""
scan, data = data_input
voltage, current = struct.unpack('<Hl', data) #uint16 + int32
return (scan, (
(voltage-32768)*(3000./65536)*self.re_voltage_scale,
(current+self.gain_trim)*(1.5/8388607)/self.gain
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['current_voltage'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['current_voltage'][line]):
item.append(data[i])
def data_postprocessing(self, data):
"""Discards first data point (usually gitched) by default, can be overridden
in subclass.
"""
try:
if self.datapoint <= 1:
return None
except AttributeError: # Datapoint counting is optional
pass
return data
def scan_process(self, line):
pass
def experiment_done(self):
"""Runs when experiment is finished (all data acquired)"""
self.data_to_pandas()
self.time += [datetime.utcnow()]
def export(self):
"""Return a dict containing data for saving."""
output = {
"datatype" : self.datatype,
"xlabel" : self.xlabel,
"ylabel" : self.ylabel,
"xmin" : self.xmin,
"xmax" : self.xmax,
"parameters" : self.parameters,
"data" : self.data,
"commands" : self.commands
}
return output
def data_to_pandas(self):
"""Convert data to pandas DataFrame and set as member of .df
attribute."""
self.df = OrderedDict()
for name, data in self.data.items():
try:
df = DataFrame(
columns=['Scan'] + list(self.plot_format[name]['labels']))
for n, line in enumerate(data): # Each scan
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + list(self.plot_format[name]['labels']),
[n] + list(line))
)
), ignore_index = True
)
except (AttributeError, KeyError):
try:
df = DataFrame(
columns=['Scan'] + list(self.columns))
for n, line in enumerate(data): # Each scan
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + list(self.columns),
[n] + list(line))
)
), ignore_index = True
)
except AttributeError as e: # Fallback if no self.columns
df = DataFrame(
columns=['Scan'] + ["{}{}".format(name, n)
for n in range(len(data))]
)
for n, line in enumerate(data):
df = df.append(
DataFrame(
OrderedDict(zip(
['Scan'] + ["{}{}".format(name, n)
for n in range(len(data))],
[n] + list(line))
)
), ignore_index = True
)
self.df[name] = df
def get_info_text(self):
"""Return string of text to disply on Info tab."""
buf = "#Time: S{} E{}\n".format(self.time[0], self.time[1])
buf += "#Commands:\n"
for line in self.commands:
buf += '#{}\n'.format(line)
return buf
def get_save_strings(self):
"""Return dict of strings with experiment parameters and data."""
buf = {}
buf['params'] = self.get_info_text()
buf.update(
{exp : df.to_csv(sep='\t', encoding='utf-8')
for exp, df in self.df.items()}
)
return buf
class PlotBox(object):
"""Contains data plot and associated methods."""
def __init__(self, plots=None):
"""Initializes plots. self.box should be reparented."""
self.name = "Main"
self.continuous_refresh = True
self.scan_refresh = False
if plots is not None:
self.plotnames = plots
else:
self.plotnames = []
self.subplots = {}
self.figure = Figure()
# self.figure.subplots_adjust(left=0.07, bottom=0.07,
# right=0.96, top=0.96)
self.setup()
self.format_plots() # Should be overriden by subclass
self.figure.set_tight_layout(True)
self.canvas = FigureCanvas(self.figure)
self.canvas.set_vexpand(True)
self.box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.box.pack_start(self.canvas, expand=True, fill=True, padding=0)
def setup(self):
self.plot_format = {
'current_voltage': {'xlabel': "Voltage (mV)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Should be overriden by subclass
"""
# Calculate size of grid needed
if len(self.plotnames) > 1:
gs = gridspec.GridSpec(int(ceil(len(self.plotnames)/2.)),2)
else:
gs = gridspec.GridSpec(1,1)
for n, i in enumerate(self.plotnames):
self.subplots[i] = self.figure.add_subplot(gs[n])
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([], [])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
def clearall(self):
"""Remove all lines on plot. """
for name, plot in self.subplots.items():
for line in reversed(plot.lines):
line.remove()
self.addline()
def clearline(self, subplot, line_number):
"""Remove a single line.
Arguments:
subplot -- key in self.subplots
line_number -- line number in subplot
"""
self.subplots[subplot].lines[line_number].remove()
def addline(self):
"""Add a new line to plot. (initialized with dummy data)))"""
for subplot in self.subplots.values():
subplot.plot([], [])
def updateline(self, Experiment, line_number):
"""Update a line specified with new data.
Arguments:
Experiment -- Experiment instance
line_number -- line number to update
"""
for subplot in Experiment.data:
while True:
try:
self.subplots[subplot].lines[line_number].set_xdata(
Experiment.data[subplot][line_number][0])
self.subplots[subplot].lines[line_number].set_ydata(
Experiment.data[subplot][line_number][1])
except IndexError:
self.addline()
except KeyError:
pass
else:
break
# logger.warning("Tried to set line %s that doesn't exist.", line_number)
def setlims(self, plot, xlims=None, ylims=None):
"""Sets x and y limits.
"""
if xlims is not None:
self.subplots[plot].set_xlim(xlims)
if ylims is not None:
self.subplots[plot].set_ylim(ylims)
self.figure.canvas.draw()
def redraw(self):
"""Autoscale and refresh the plot."""
for name, plot in self.subplots.items():
plot.relim()
plot.autoscale(True, axis = 'y')
self.figure.canvas.draw()
return True
\ No newline at end of file
import time
import struct
from .experiment_template import Experiment
from ..dstat import state
class OCPExp(Experiment):
"""Open circuit potential measumement in statusbar."""
id = 'ocp'
def __init__(self):
self.re_voltage_scale = state.board_instance.re_voltage_scale
self.databytes = 8
self.commands = ["EA", "EP0 0 "]
self.commands[0] += "2 " # input buffer
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
def data_handler(self, data_input):
"""Overrides Experiment method to only send ADC values."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return voltage/5.592405e6*self.re_voltage_scale
class PMTIdle(Experiment):
"""PMT idle mode."""
id = "pmt_idle"
def __init__(self):
self.databytes = 8
self.commands = ["EA", "EM"]
self.commands[0] += "2 " # input buffer
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
import time
import struct
from .experiment_template import PlotBox, Experiment
class LSVExp(Experiment):
"""Linear Scan Voltammetry experiment"""
id = 'lsv'
def setup(self):
self.plotlims['current_voltage']['xlims'] = tuple(
sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
super(LSVExp, self).setup()
self.datatype = "linearData"
self.datalength = 2
self.databytes = 6 # uint16 + int32
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start'])-int(self.parameters['stop']))
self.commands += "E"
self.commands[2] += "L"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['slope'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
from typing import Any
from ..dstat.experiment_handler import ExperimentHandler
from ..dstat.experiment_process import ExperimentProcess
from ..dstat.utility import abs_mv_to_dac, rel_mv_to_dac, param_test_uint16, param_test_non_zero_uint8
from ..experiments.experiment_container import ExperimentContainer
class LSVExperimentContainer(ExperimentContainer):
experiment_id = 'lsv'
display_name = 'Linear Scan Voltammetry'
process = ExperimentProcess
handler = ExperimentHandler
data_bytes = 0
def __init__(self, params: dict[str, Any], mux: int = 1):
super().__init__(params, mux=mux)
self.data_cols = ['voltage', 'current']
self.data_format = 'vA'
self.cmd_str += ['EL{clean_s} {dep_s} {clean_mv} {dep_mv} {start} {stop} {slope} ']
self.param_input |= {'clean_s': int, 'dep_s': int, 'clean_mv': int, 'dep_mv': int,
'start': int, 'stop': int, 'slope': int}
self.param_input_display_names |= {'clean_s': 't_clean (s)', 'dep_s': 't_dep (s)',
'clean_mv': 'V_clean (mV)', 'dep_mv': 'V_dep (mV)',
'start': 'V_start (mV)', 'stop': 'V_stop (mV)', 'slope': 'Slope (mV/s)', }
self.param_input_limits |= {'clean_s': (0, 'time_max'), 'dep_s': (0, 'time_max'),
'clean_mv': ('mv_min', 'mv_max'), 'dep_mv': ('mv_min', 'mv_max'),
'start': ('mv_min', 'mv_max'), 'stop': ('mv_min', 'mv_max'),
'slope': (1, 5000)}
self.param_converters |= {'clean_s': param_test_uint16, 'dep_s': param_test_uint16,
'clean_mv': abs_mv_to_dac, 'dep_mv': abs_mv_to_dac,
'start': abs_mv_to_dac, 'stop': abs_mv_to_dac, 'slope': rel_mv_to_dac}
self.defaults |= {'clean_s': 0, 'dep_s': 0, 'clean_mv': 0, 'dep_mv': 0}
if params:
self.progress_max = abs(self.params['stop'] - self.params['start'])
self.progress_end = self.params['stop']
else:
self.progress_max = 0
self.progress_iters = 1
self.data_bytes = self.calculate_data_bytes()
def get_progress(self) -> float:
if self.handler_instance.done:
return 100
try:
return (1 - abs(self.progress_end - self.handler_instance.data['voltage'][-1]) / self.progress_max) * 100
except IndexError:
return 0
class CVExperimentContainer(ExperimentContainer):
experiment_id = 'cv'
display_name = 'Cyclic Voltammetry'
process = ExperimentProcess
handler = ExperimentHandler
plots = {'current': {'x': 'voltage', 'y': 'current', 'hue': 'scan'}}
data_bytes = 0
def __init__(self, params: dict[str, Any], mux: int = 1):
super().__init__(params, mux=mux)
self.data_cols = ['voltage', 'current']
self.data_format = 'vA'
self.cmd_str += ['EC{clean_s} {dep_s} {clean_mv} {dep_mv} {v1} {v2} {start} {scans} {slope} ']
self.param_input |= {'clean_s': int, 'dep_s': int, 'clean_mv': int, 'dep_mv': int, 'v1': int, 'v2': int,
'start': int, 'scans': int, 'slope': int}
self.param_input_display_names |= {'clean_s': 't_clean (s)', 'dep_s': 't_dep (s)',
'clean_mv': 'V_clean (mV)', 'dep_mv': 'V_dep (mV)',
'start': 'V_start (mV)', 'v1': 'V_1 (mV)', 'v2': 'V_2 (mV)',
'scans': 'Scans', 'slope': 'Slope (mV/s)'}
self.param_input_limits |= {'clean_s': (0, 'time_max'), 'dep_s': (0, 'time_max'),
'clean_mv': ('mv_min', 'mv_max'), 'dep_mv': ('mv_min', 'mv_max'),
'start': ('mv_min', 'mv_max'), 'v1': ('mv_min', 'mv_max'),
'v2': ('mv_min', 'mv_max'),
'scans': (1, 'scans_max'), 'slope': (1, 5000)}
self.param_converters |= {'clean_s': param_test_uint16, 'dep_s': param_test_uint16,
'clean_mv': abs_mv_to_dac, 'dep_mv': abs_mv_to_dac,
'v1': abs_mv_to_dac, 'v2': abs_mv_to_dac, 'start': abs_mv_to_dac,
'scans': param_test_non_zero_uint8,
'slope': rel_mv_to_dac}
self.defaults |= {'clean_s': 0, 'dep_s': 0, 'clean_mv': 0, 'dep_mv': 0}
if params:
self.progress_max = 2 * abs(self.params['v1'] - self.params['v2'])
self.progress_start = self.params['start']
self.progress_v1 = self.params['v1']
self.progress_v2 = self.params['v2']
self.progress_iters = self.params['scans']
self.progress_scan = 0
self.progress_lastmv = self.params['start']
else:
self.progress_max = 0
self.data_bytes = self.calculate_data_bytes()
def get_progress(self):
try:
return 1 - (abs(self.stop_mv - self.data['current_voltage'][-1][0][-1])/self.max_mv)
if self.handler_instance.done:
return 100
if self.handler_instance.data['scan'][-1] > self.progress_scan:
self.progress_scan = self.handler_instance.data['scan'][-1]
self.progress_lastmv = self.progress_start
raise StopIteration
current_mv = self.handler_instance.data['voltage'][-1]
# if moving towards v1
if abs(self.progress_v1 - self.progress_lastmv) > abs(self.progress_v1 - current_mv):
# Between v2 and start v1-mv and start-mv have same sign
if (self.progress_v1 - current_mv > 0) == (self.progress_start - current_mv > 0):
progress = 100 * (1 - abs(self.progress_start - current_mv) / self.progress_max)
else:
progress = 100 * (.25 - abs(self.progress_v1 - current_mv) / self.progress_max)
else:
progress = 100 * (.75 - abs(self.progress_v2 - current_mv) / self.progress_max)
self.progress_lastmv = current_mv
return progress
except IndexError:
return 0
\ No newline at end of file
return 0
import time
import struct
from .experiment_template import PlotBox, Experiment
class PotBox(PlotBox):
def setup(self):
self.plot_format = {
'voltage_time': {'xlabel': "Time (s)",
'ylabel': "Voltage (V)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'voltage_time': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class PotExp(Experiment):
id = 'pot'
"""Potentiometry experiment"""
def setup(self):
self.plots.append(PotBox(['voltage_time']))
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'voltage_time' : [([],[])]}
self.columns = ['Time (s)', 'Voltage (V)']
self.plotlims = {
'voltage_time': {
'xlims': (0, int(self.parameters['time']))
}
}
self.plots[-1].setlims('voltage_time', **self.plotlims['voltage_time'])
self.total_time = int(self.parameters['time'])
self.commands += "E"
self.commands[2] += "P"
self.commands[2] += str(self.parameters['time'])
self.commands[2] += " 1 " # potentiometry mode
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000.,
voltage*self.re_voltage_scale*(1.5/8388607.)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['voltage_time'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['voltage_time'][line]):
item.append(data[i])
def get_progress(self):
try:
return self.data['voltage_time'][-1][0][-1]/self.total_time
except IndexError:
return 0
\ No newline at end of file
import time
import struct
from copy import deepcopy
from .experiment_template import PlotBox, Experiment
class SWVBox(PlotBox):
def setup(self):
self.plot_format = {
'swv': {'xlabel': "Voltage (mV)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'swv': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class SWVExp(Experiment):
"""Square Wave Voltammetry experiment"""
id = 'swv'
def setup(self):
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = {
'swv' : [([], [], [], [])]
} # voltage, current, forwards, reverse
self.line_data = ([], [], [], [])
self.datalength = 2 * self.parameters['scans']
self.databytes = 10
self.columns = ['Voltage (mV)', 'Net Current (A)',
'Forward Current (A)', 'Reverse Current (A)']
self.plotlims = {
'swv': {
'xlims': tuple(sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
}
}
plot = SWVBox()
plot.setlims('swv', **self.plotlims['swv'])
self.plots.append(plot)
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start']) - int(self.parameters['stop']))
self.scan_points = self.max_mv * 2 / float(self.parameters['step'])
self.commands += "E"
self.commands[2] += "S"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['step'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['pulse'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['freq'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['scans'])
self.commands[2] += " "
def data_handler(self, input_data):
"""Overrides Experiment method to calculate difference current"""
scan, data = input_data
# uint16 + int32
voltage, forward, reverse = struct.unpack('<Hll', data)
f_trim = forward+self.gain_trim
r_trim = reverse+self.gain_trim
return (scan, (
(voltage-32768)*3000./65536*self.re_voltage_scale,
(f_trim-r_trim)*(1.5/self.gain/8388607),
f_trim*(1.5/self.gain/8388607),
r_trim*(1.5/self.gain/8388607)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['swv'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['swv'][line]):
item.append(data[i])
from datetime import datetime
from typing import Any, Tuple
from ..dstat.experiment_handler import ExperimentHandler
from ..dstat.experiment_process import ExperimentProcess
from ..dstat.utility import abs_mv_to_dac, rel_mv_to_dac, param_test_non_zero_uint16, \
param_test_uint16, param_test_uint8
from ..experiments.experiment_container import ExperimentContainer
class SWVExperimentHandler(ExperimentHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.data['current'] = []
def data_handler(self, data_input: Tuple[datetime, int, bytes]):
unpacked: Tuple[int, int, int]
date, scan, data = data_input
unpacked = self.struct.unpack(data)
self.data['scan'].append(scan)
self.data['timestamp'].append(date)
for n, i in enumerate(unpacked):
try:
self.data[self.data_cols[n]].append(self.data_convert[n](i))
except TypeError: # If no converter
self.data[self.data_cols[n]].append(i)
self.data['current'].append(self.adc_to_amps(unpacked[1] - unpacked[2])) # forward - reverse
class SWVExperimentContainer(ExperimentContainer):
experiment_id = 'swv'
display_name = 'Square Wave Voltammetry'
process = ExperimentProcess
handler = SWVExperimentHandler
plots = {'current': {'x': 'voltage', 'y': 'current', 'hue': 'scan'}}
data_bytes = 0
def __init__(self, params: dict[str, Any], mux: int = 1):
super().__init__(params, mux=mux)
self.data_cols = ['voltage', 'forward', 'reverse']
self.data_format = 'vAA'
self.cmd_str += ['ES{clean_s} {dep_s} {clean_mv} {dep_mv} {start} {stop} {step} {pulse} {freq} {scans} ']
self.param_input |= {'clean_s': int, 'dep_s': int, 'clean_mv': int, 'dep_mv': int,
'start': int, 'stop': int, 'step': int, 'pulse': int, 'freq': int,
'scans': int}
self.param_input_display_names |= {'clean_s': 't_clean (s)', 'dep_s': 't_dep (s)',
'clean_mv': 'V_clean (mV)', 'dep_mv': 'V_dep (mV)',
'start': 'V_start (mV)', 'stop': 'V_stop (mV)', 'step': 'V_step (mV)',
'pulse': 'V_pulse (mV)', 'freq': 'f (Hz)',
'scans': 'Scans'}
self.param_input_limits |= {'clean_s': (0, 'time_max'), 'dep_s': (0, 'time_max'),
'clean_mv': ('mv_min', 'mv_max'), 'dep_mv': ('mv_min', 'mv_max'),
'start': ('mv_min', 'mv_max'), 'stop': ('mv_min', 'mv_max'),
'step': (1, 1000), 'pulse': (1, 1000), 'freq': (1, 'freq_max'),
'scans': (0, 'scans_max')}
self.param_converters |= {'clean_s': param_test_uint16, 'dep_s': param_test_uint16,
'clean_mv': abs_mv_to_dac, 'dep_mv': abs_mv_to_dac,
'start': abs_mv_to_dac, 'stop': abs_mv_to_dac, 'step': rel_mv_to_dac,
'pulse': rel_mv_to_dac, 'freq': param_test_non_zero_uint16,
'scans': param_test_uint8}
self.defaults |= {'clean_s': 0, 'dep_s': 0, 'clean_mv': 0, 'dep_mv': 0}
if params:
if self.params['scans'] > 0:
self.progress_max = 2 * abs(self.params['stop'] - self.params['start'])
else:
self.progress_max = abs(self.params['stop'] - self.params['start'])
self.progress_start = self.params['start']
self.progress_stop = self.params['stop']
self.progress_iters = self.params['scans']
self.progress_scan = 0
self.progress_lastmv = self.params['start']
else:
self.progress_max = 0
self.data_bytes = self.calculate_data_bytes()
def get_progress(self):
if self.handler_instance.done:
return 100
try:
if int(self.parameters['scans']) != 0:
scans_prog = (len(self.data['swv'])-1) / float(self.parameters['scans'])
scan_prog = (len(self.data['swv'][-1][0])-1) / self.scan_points / float(self.parameters['scans'])
prog = scans_prog + scan_prog
if prog > 1:
prog = 1
return prog
if self.params['scans'] > 0:
if self.handler_instance.data['scan'][-1] > self.progress_scan:
self.progress_scan = self.handler_instance.data['scan'][-1]
self.progress_lastmv = self.progress_start
raise StopIteration
current_mv = self.handler_instance.data['voltage'][-1]
if abs(self.progress_stop - self.progress_lastmv) > abs(self.progress_stop - current_mv):
progress = 100 * (.5 - abs(self.progress_stop - current_mv) / self.progress_max)
else:
progress = 100 * (1 - abs(self.progress_start - current_mv) / self.progress_max)
self.progress_lastmv = current_mv
return progress
else:
return 1 - (abs(self.stop_mv - self.data['swv'][-1][0][-1])/self.max_mv)
return (1 - abs(
self.progress_stop - self.handler_instance.data['voltage'][-1]) / self.progress_max) * 100
except IndexError:
return 0
class DPVExp(SWVExp):
"""Diffential Pulse Voltammetry experiment."""
id = 'dpv'
def setup(self):
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = {
'swv' : [([], [], [], [])]
} # voltage, current, forwards, reverse
self.line_data = ([], [], [], [])
self.datalength = 2
self.databytes = 10
self.columns = ['Voltage (mV)', 'Net Current (A)',
'Forward Current (A)', 'Reverse Current (A)']
self.plotlims = {
'swv': {
'xlims': tuple(sorted(
(int(self.parameters['start']),
int(self.parameters['stop']))
)
)
}
}
plot = SWVBox()
plot.setlims('swv', **self.plotlims['swv'])
self.plots.append(plot)
self.stop_mv = int(self.parameters['stop'])
self.max_mv = abs(int(self.parameters['start']) - int(self.parameters['stop']))
self.commands += "E"
self.commands[2] += "D"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['stop'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['step'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['pulse'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['period'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['width'])
self.commands[2] += " "
class DPVExperimentContainer(ExperimentContainer):
experiment_id = 'dpv'
display_name = 'Differential Pulse Voltammetry'
process = ExperimentProcess
handler = SWVExperimentHandler
data_bytes = 0
def __init__(self, params: dict[str, Any], mux: int = 1):
super().__init__(params, mux=mux)
self.data_cols = ['voltage', 'forward', 'reverse']
self.data_format = 'vAA'
self.cmd_str += ['ED{clean_s} {dep_s} {clean_mv} {dep_mv} {start} {stop} {step} {pulse} {period} {width} ']
self.param_input |= {'clean_s': int, 'dep_s': int, 'clean_mv': int, 'dep_mv': int,
'start': int, 'stop': int, 'step': int, 'pulse': int, 'period': int, 'width': int}
self.param_input_display_names |= {'clean_s': 't_clean (s)', 'dep_s': 't_dep (s)',
'clean_mv': 'V_clean (mV)', 'dep_mv': 'V_dep (mV)',
'start': 'V_start (mV)', 'stop': 'V_stop (mV)', 'step': 'V_step (mV)',
'pulse': 'V_pulse (mV)', 'period': 'Pulse Period (ms)',
'width': 'Pulse Width (ms)'}
self.param_input_limits |= {'clean_s': (0, 'time_max'), 'dep_s': (0, 'time_max'),
'clean_mv': ('mv_min', 'mv_max'), 'dep_mv': ('mv_min', 'mv_max'),
'start': ('mv_min', 'mv_max'), 'stop': ('mv_min', 'mv_max'),
'step': (1, 1000), 'pulse': (1, 1000), 'period': (1, 1000),
'width': (1, 1000)}
self.param_converters |= {'clean_s': param_test_uint16, 'dep_s': param_test_uint16,
'clean_mv': abs_mv_to_dac, 'dep_mv': abs_mv_to_dac,
'start': abs_mv_to_dac, 'stop': abs_mv_to_dac, 'step': rel_mv_to_dac,
'pulse': rel_mv_to_dac, 'period': param_test_uint16,
'width': param_test_non_zero_uint16}
self.defaults |= {'clean_s': 0, 'dep_s': 0, 'clean_mv': 0, 'dep_mv': 0}
if params:
self.progress_max = abs(self.params['stop'] - self.params['start'])
self.progress_start = self.params['start']
self.progress_stop = self.params['stop']
self.progress_scan = 0
self.progress_iters = 1
else:
self.progress_max = 0
self.data_bytes = self.calculate_data_bytes()
def get_progress(self):
if self.handler_instance.done:
return 100
try:
return 1 - (abs(self.stop_mv - self.data['swv'][-1][0][-1])/self.max_mv)
return (1 - abs(
self.progress_stop - self.handler_instance.data['voltage'][-1]) / self.progress_max) * 100
except IndexError:
return 0
\ No newline at end of file
return 0
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.