Skip to content
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class Error(Exception):
"""Copies Exception class"""
pass
class InputError(Error):
"""Exception raised for errors in the input. Extends Error class.
Attributes:
expr -- input expression in which the error occurred
msg -- error message
"""
def __init__(self, expr, msg):
self.expr = expr
self.msg = msg
class VarError(Error):
"""Exception raised for internal variable errors. Extends Error class.
Attributes:
var -- var in which the error occurred
msg -- error message
"""
def __init__(self, var, msg):
self.var = var
self.msg = msg
\ No newline at end of file
# __all__ = []
#
# import pkgutil
# import inspect
# from . import cal, chronoamp, cv
#
#
# for loader, name, is_pkg in pkgutil.walk_packages(__path__):
# print loader, name, is_pkg
# module = loader.find_module(name).load_module(name)
#
# for name, value in inspect.getmembers(module):
# if name.startswith('__'):
# continue
#
# globals()[name] = value
# __all__.append(name)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import logging
logger = logging.getLogger(__name__)
import serial
from ..errors import InputError, VarError
from ..dstat import state
from ..experiments.experiment_template import Experiment, dstat_logger
def measure_offset(time):
gain_trim_table = [None, 'r100_trim', 'r3k_trim', 'r30k_trim', 'r300k_trim',
'r3M_trim', 'r30M_trim', 'r100M_trim']
parameters = {}
parameters['time'] = time
gain_offset = {}
for i in range(1,8):
parameters['gain'] = i
state.ser.start_exp(CALExp(parameters))
logger.info("measure_offset: %s", state.ser.get_proc(block=True))
gain_offset[gain_trim_table[i]] = state.ser.get_data(block=True)
return gain_offset
class CALExp(Experiment):
id = 'cal'
"""Offset calibration experiment"""
def __init__(self, parameters):
self.parameters = parameters
self.databytes = 8
self.scan = 0
self.data = []
self.commands = ["EA2 3 1 ", "EG"]
self.commands[1] += str(self.parameters['gain'])
self.commands[1] += " "
self.commands[1] += "0 "
self.commands.append(
("ER1 0", ["32768", str(self.parameters['time'])])
)
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
try:
while True:
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.debug("serial_handler: %s", input)
if input == ('a' or "DISCONNECT"):
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
for line in self.serial:
if self.ctrl_pipe.poll():
if self.ctrl_pipe.recv() == 'a':
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
if line.startswith('B'):
self.data.append(self.data_handler(
self.serial.read(size=self.databytes)))
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.serial.flushInput()
self.experiment_done()
return True
except serial.SerialException:
return False
def data_handler(self, data):
"""Takes data_input as tuple -- (scan, data).
Returns:
current
"""
seconds, milliseconds, current = struct.unpack('<HHl', data)
return current
def experiment_done(self):
"""Averages data points
"""
try:
sum = 0
self.data[0] = 0 # Skip first point
except IndexError:
return
for i in self.data:
sum += i
sum /= len(self.data)
if (sum > 32767):
sum = 32767
elif (sum < -32768):
sum = -32768
self.data_pipe.send(sum)
\ No newline at end of file
import time
import struct
import numpy as np
import serial
from ..interface.plot import mean, plotSpectrum, findBounds
from .experiment_template import PlotBox, Experiment, exp_logger
class ChronoampBox(PlotBox):
def setup(self):
self.plot_format = {
'current_time': {'xlabel': "Time (s)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'current_time': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class Chronoamp(Experiment):
id = 'cae'
"""Chronoamperometry experiment"""
def setup(self):
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.total_time = sum(self.parameters['time'])
self.plotlims = {'current_time': {'xlims': (0, self.total_time)}
}
self.commands.append(
("ER" + str(len(self.parameters['potential'])) + " 0 ", [])
)
for i in self.parameters['potential']:
self.commands[-1][1].append(str(int(i*(65536./3000)+32768)))
for i in self.parameters['time']:
self.commands[-1][1].append(str(i))
plot = ChronoampBox(['current_time'])
plot.setlims('current_time', **self.plotlims['current_time'])
self.plots.append(plot)
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, current = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000.,
(current+self.gain_trim)*(1.5/self.gain/8388607)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['current_time'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['current_time'][line]):
item.append(data[i])
def get_progress(self):
try:
return self.data['current_time'][-1][0][-1]/self.total_time
except IndexError:
return 0
class PDExp(Chronoamp):
"""Photodiode/PMT experiment"""
id = 'pde'
def setup(self):
self.plots.append(ChronoampBox('current_time'))
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.plot_format = {
'current_time' : {
'labels' : self.columns,
'xlims' : (0, int(self.parameters['time']))
}
}
self.total_time = int(self.parameters['time'])
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("EZ")
self.commands[-1] += str(self.parameters['sync_freq'])
self.commands[-1] += " "
else:
self.commands.append(("E2", []))
command = "ER1 "
params = []
if self.parameters['interlock_true']:
command += "1 "
else:
command += "0 "
if self.parameters['voltage'] == 0: # Special case where V=0
params.append("65535")
else:
params.append(str(int(
65535-(self.parameters['voltage']*(65536./3000))))
)
params.append(str(self.parameters['time']))
self.commands.append((command, params))
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("Ez")
else:
self.commands.append("E1")
class FT_Box(PlotBox):
def updateline(self, Experiment, line_number):
def search_value(data, target):
for i in range(len(data)):
if data[i] > target:
return i
y = Experiment.data['data'][line_number][1]
x = Experiment.data['data'][line_number][0]
freq = Experiment.parameters['adc_rate_hz']
i = search_value(x, float(Experiment.parameters['fft_start']))
y1 = y[i:]
x1 = x[i:]
avg = mean(y1)
min_index, max_index = findBounds(y1)
y1[min_index] = avg
y1[max_index] = avg
f, Y = plotSpectrum(y1[min_index:max_index],freq)
self.axe1.lines[line_number].set_ydata(Y)
self.axe1.lines[line_number].set_xdata(f)
Experiment.data['ft'] = [(f, Y)]
def changetype(self, Experiment):
"""Change plot type. Set axis labels and x bounds to those stored
in the Experiment instance. Stores class instance in Experiment.
"""
self.axe1.set_xlabel("Freq (Hz)")
self.axe1.set_ylabel("|Y| (A/Hz)")
self.axe1.set_xlim(0, Experiment.parameters['adc_rate_hz']/2)
Experiment.plots['ft'] = self
self.figure.canvas.draw()
\ No newline at end of file
import time
import struct
from .experiment_template import PlotBox, Experiment
class CVExp(Experiment):
id = 'cve'
"""Cyclic Voltammetry experiment"""
def setup(self):
self.plotlims['current_voltage']['xlims'] = tuple(
sorted((int(self.parameters['v1']), int(self.parameters['v2'])))
)
super(CVExp, self).setup()
self.datatype = "CVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.datalength = 2 * self.parameters['scans'] # x and y for each scan
self.databytes = 6 # uint16 + int32
self.commands += "E"
self.commands[2] += "C"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['clean_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['dep_mV'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['v1'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['v2'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['start'])/
self.re_voltage_scale*
(65536./3000)+32768
))
self.commands[2] += " "
self.commands[2] += str(self.parameters['scans'])
self.commands[2] += " "
self.commands[2] += str(int(
int(self.parameters['slope'])/
self.re_voltage_scale*
(65536./3000)
))
self.commands[2] += " "
def get_progress(self):
return (len(self.data['current_voltage'])-1) / float(self.parameters['scans'])
This diff is collapsed.
This diff is collapsed.
import time
import struct
from .experiment_template import Experiment
from ..dstat import state
class OCPExp(Experiment):
"""Open circuit potential measumement in statusbar."""
id = 'ocp'
def __init__(self):
self.re_voltage_scale = state.board_instance.re_voltage_scale
self.databytes = 8
self.commands = ["EA", "EP0 0 "]
self.commands[0] += "2 " # input buffer
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
def data_handler(self, data_input):
"""Overrides Experiment method to only send ADC values."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, voltage = struct.unpack('<HHl', data)
return voltage/5.592405e6*self.re_voltage_scale
class PMTIdle(Experiment):
"""PMT idle mode."""
id = "pmt_idle"
def __init__(self):
self.databytes = 8
self.commands = ["EA", "EM"]
self.commands[0] += "2 " # input buffer
self.commands[0] += "3 " # 2.5 Hz sample rate
self.commands[0] += "1 " # 2x PGA
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.