Skip to content
#!/usr/bin/env python
import gtk
class lsv:
def __init__(self):
self.builder = gtk.Builder()
self.builder.add_from_file('interface/lsv.glade')
self.builder.connect_signals(self)
self.clean_mV = self.builder.get_object('clean_mV')
self.clean_s = self.builder.get_object('clean_s')
self.dep_mV = self.builder.get_object('dep_mV')
self.dep_s = self.builder.get_object('dep_s')
self.start_entry = self.builder.get_object('start_entry')
self.stop_entry = self.builder.get_object('stop_entry')
self.slope_entry = self.builder.get_object('slope_entry')
\ No newline at end of file
#!/usr/bin/env python
import gtk
class pd:
def __init__(self):
self.builder = gtk.Builder()
self.builder.add_from_file('interface/pd.glade')
self.builder.connect_signals(self)
self.voltage_entry = self.builder.get_object('voltage_entry')
self.time_entry = self.builder.get_object('time_entry')
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gtk, io, os
import numpy as np
from datetime import datetime
def manSave(current_exp):
exp = current_exp
fcd = gtk.FileChooserDialog("Save...", None, gtk.FILE_CHOOSER_ACTION_SAVE, (gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL, gtk.STOCK_SAVE, gtk.RESPONSE_OK))
filters = [gtk.FileFilter()]
filters[0].set_name("NumPy binary (.npy)")
filters[0].add_pattern("*.npy")
filters.append(gtk.FileFilter())
filters[1].set_name("Space separated text (.txt)")
filters[1].add_pattern("*.txt")
fcd.set_do_overwrite_confirmation(True)
for i in filters:
fcd.add_filter(i)
response = fcd.run()
if response == gtk.RESPONSE_OK:
path = fcd.get_filename()
print "Selected filepath: %s" % path
filter_selection = fcd.get_filter().get_name()
if filter_selection.endswith("(.npy)"):
npy(exp, path)
elif filter_selection.endswith("(.txt)"):
text(exp, path)
fcd.destroy()
elif response == gtk.RESPONSE_CANCEL:
fcd.destroy()
def plotSave(plot):
fcd = gtk.FileChooserDialog("Save Plot…", None, gtk.FILE_CHOOSER_ACTION_SAVE, (gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL, gtk.STOCK_SAVE, gtk.RESPONSE_OK))
filters = [gtk.FileFilter()]
filters[0].set_name("Portable Document Format (.pdf)")
filters[0].add_pattern("*.pdf")
filters.append(gtk.FileFilter())
filters[1].set_name("Portable Network Graphics (.png)")
filters[1].add_pattern("*.png")
fcd.set_do_overwrite_confirmation(True)
for i in filters:
fcd.add_filter(i)
response = fcd.run()
if response == gtk.RESPONSE_OK:
path = fcd.get_filename()
print "Selected filepath: %s" % path
filter_selection = fcd.get_filter().get_name()
if filter_selection.endswith("(.pdf)"):
if not path.endswith(".pdf"):
path += ".pdf"
elif filter_selection.endswith("(.png)"):
if not path.endswith(".png"):
path += ".png"
plot.figure.savefig(path) #savefig determines format from file extension
fcd.destroy()
elif response == gtk.RESPONSE_CANCEL:
fcd.destroy()
def autoSave(current_exp, dir_button, name, expnumber):
if name == "":
name = "file"
path = dir_button.get_filename()
path += '/'
path += name
path += str(expnumber)
text(current_exp, path, auto=True)
def autoPlot(plot, dir_button, name, expnumber):
if name == "":
name = "file"
path = dir_button.get_filename()
path += '/'
path += name
path += str(expnumber)
if path.endswith(".pdf"):
path = path.rstrip(".pdf")
j = 1
while os.path.exists("".join([path, ".pdf"])):
if j > 1:
path = path[:-len(str(j))]
path += str(j)
j += 1
path += ".pdf"
plot.figure.savefig(path)
def npy(exp, path, auto=False):
if path.endswith(".npy"):
path = path.rstrip(".npy")
data = np.array(exp.data)
if auto == True:
j = 1
while os.path.exists("".join([path, ".npy"])):
if j > 1:
path = path[:-len(str(j))]
path += str(j)
j += 1
np.save(path, data)
def text(exp, path, auto=False):
if path.endswith(".txt"):
path = path.rstrip(".txt")
if auto == True:
j = 1
while os.path.exists("".join([path, ".txt"])):
if j > 1:
path = path[:-len(str(j))]
path += str(j)
j += 1
path += ".txt"
file = open(path, 'w')
time = datetime.now()
data = np.array(exp.data)
header = "".join(['#', time.isoformat(), "\n#"])
for i in exp.commands:
header += i
file.write("".join([header, '\n']))
for col in zip(*exp.data):
for row in col:
file.write(str(row)+ " ")
file.write('\n')
file.close()
#!/usr/bin/env python
import gtk
class swv:
def __init__(self):
self.builder = gtk.Builder()
self.builder.add_from_file('interface/swv.glade')
self.builder.connect_signals(self)
self.clean_mV = self.builder.get_object('clean_mV')
self.clean_s = self.builder.get_object('clean_s')
self.dep_mV = self.builder.get_object('dep_mV')
self.dep_s = self.builder.get_object('dep_s')
self.start_entry = self.builder.get_object('start_entry')
self.stop_entry = self.builder.get_object('stop_entry')
self.step_entry = self.builder.get_object('step_entry')
self.pulse_entry = self.builder.get_object('pulse_entry')
self.freq_entry = self.builder.get_object('freq_entry')
self.cyclic_checkbutton = self.builder.get_object('cyclic_checkbutton')
self.scans_entry = self.builder.get_object('scans_entry')
\ No newline at end of file
This diff is collapsed.
import zmq
import zmq.error
#signals
CONREQ = "0"
CONREP = "1"
STARTEXP = "10"
EXPFINISHED = "11"
INVAL_CMD = "99"
#States
RECV = 0
SEND = 1
class microdropConnection:
def __init__(self, port=6789):
self.port = port
self.connected = False
self.state = RECV
self.ctx = zmq.Context()
self.soc = zmq.Socket(self.ctx, zmq.REP)
self.soc.bind("".join(['tcp://*:',str(self.port)]))
def listen(self):
if self.state == SEND:
print "WAR: Microdrop Connection state invalid, resetting..."
self.reset()
self.__init__(self.port)
try:
message = self.soc.recv(flags=zmq.NOBLOCK, copy=True)
self.state = SEND
return (True, message)
except zmq.Again:
return (False, "")
def reply(self, data):
if self.state == RECV:
print "WAR: Microdrop Connection state invalid, resetting..."
self.reset()
self.__init__(self.port)
return False
self.state = RECV
self.soc.send(data)
return True
def reset(self):
self.soc.unbind("".join(['tcp://*:',str(self.port)]))
del self.soc
del self.ctx
#!/usr/bin/env python
import sys, serial, io
import numpy as np
import matplotlib
import gtk
from time import sleep
from collections import deque
from matplotlib import pyplot as plt
# class that holds analog data for N samples
class AnalogData:
# constr
def __init__(self):
self.ax = []
self.ay = []
self.first = 1
# add data
def add(self, data):
if self.first == 1:
self.first = 0
return
assert(len(data) == 2)
self.ax.append(data[0])
self.ay.append(data[1])
# clear data
def clear(self):
self.first = 1
self.ax = []
self.ay = []
# plot class
class AnalogPlot:
# constr
def __init__(self, analogData):
self.i = 0
# set plot to animated
plt.ion() #interactive mode on
plt.autoscale(True,True,True)
self.line = plt.plot(analogData.ax,analogData.ay)
# update plot
def update(self, analogData):
if self.i < 5:
self.i += 1
return
plt.setp(self.line,xdata=analogData.ax, ydata=analogData.ay)
ax = plt.gca()
# recompute the ax.dataLim
ax.relim()
# update ax.viewLim using the new dataLim
ax.autoscale_view()
plt.draw()
self.i=0
# main() function
def main():
# # expects 1 arg - serial port string
# if(len(sys.argv) != 2):
# print 'Example usage: python showdata.py "/dev/tty.usbmodem411"'
# exit(1)
#strPort = '/dev/tty.usbserial-A7006Yqh'
#strPort = sys.argv[1];
strPort = '/dev/cu.usbmodem12...E1'
# open serial port
ser = serial.Serial(strPort, 1024000,timeout=2)
sio = io.TextIOWrapper(io.BufferedRWPair(ser,ser,buffer_size=1),
newline = '\n',
line_buffering = True)
ser.write("ck")
# plot parameters
digiData = AnalogData()
digiPlot = AnalogPlot(digiData)
try:
while True:
output = raw_input('Commands:')
ser.flushInput() #clear input buffer
digiData.clear() #clear old data
ser.write(output)
print output
while True:
for line in ser:
print line
if line.lstrip().startswith("no"):
ser.flushInput()
break
if not (line.isspace() or line.lstrip().startswith('#')):
#print line
data = [float(val) for val in line.split()]
if(len(data) == 2):
digiData.add(data)
digiPlot.update(digiData)
break
# if not line.lstrip().startswith('#'):
# data = [float(val) for val in line.split()]
## if(len(data) == 2):
## analogData.add(data)
## analogPlot.update(analogData)
# block.append(line)
# print line
# print block
except KeyboardInterrupt:
print 'exiting'
# close serial
ser.flush()
ser.close()
# call main
if __name__ == '__main__':
main()
\ No newline at end of file
#!/usr/bin/env python
"""
show how to add a matplotlib FigureCanvasGTK or FigureCanvasGTKAgg widget to a
gtk.Window
"""
import gtk
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import numpy as np
#from matplotlib.backends.backend_gtkcairo import FigureCanvasGTKCairo as FigureCanvas
#from matplotlib.backends.backend_gtkcairo import NavigationToolbar2Cairo as NavigationToolbar
from matplotlib.backends.backend_gtkagg import FigureCanvasGTKAgg as FigureCanvas
from matplotlib.backends.backend_gtkagg import NavigationToolbar2GTKAgg as NavigationToolbar
class plotbox:
def __init__(self, plotwindow_instance):
self.figure = Figure()
self.figure.subplots_adjust(left=0.07, bottom=0.07, right=0.96, top=0.96)
self.axe1 = self.figure.add_subplot(111)
self.lines = self.axe1.plot([0,1], [0,1])
self.axe1.ticklabel_format(style='sci', scilimits=(0,3), useOffset=False, axis='y')
self.canvas = FigureCanvas(self.figure)
self.win = gtk.Window()
self.vbox = gtk.VBox()
self.win.add(self.vbox)
self.vbox.pack_start(self.canvas)
self.toolbar = NavigationToolbar(self.canvas, self.win)
self.vbox.pack_start(self.toolbar, False, False)
self.vbox.reparent(plotwindow_instance)
def clearall(self):
for i in self.lines:
i.remove()
self.lines = self.axe1.plot([0,1], [0,1])
def clearline(self, line_number):
self.lines[line_number].remove()
self.lines.pop(line_number)
def addline(self):
self.lines.append(self.axe1.plot([0,1], [0,1])[0])
def updateline(self, Experiment, line_number):
divisor = len(Experiment.data[1+line_number*2]) // 2000 + 1 #limits display to 2000 data points per line
self.lines[line_number].set_ydata(Experiment.data[1+line_number*2][1::divisor])
self.lines[line_number].set_xdata(Experiment.data[line_number*2][1::divisor])
def changetype(self, Experiment):
self.axe1.set_xlabel(Experiment.xlabel)
self.axe1.set_ylabel(Experiment.ylabel)
self.axe1.set_xlim(Experiment.xmin, Experiment.xmax)
self.figure.canvas.draw()
def redraw(self):
self.axe1.relim()
self.axe1.autoscale(True, axis = 'y')
self.figure.canvas.draw()
return True
from distutils.core import setup
from Cython.Build import cythonize
setup(
ext_modules = cythonize("*.pyx")
)
\ No newline at end of file
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Functions for analyzing data.
"""
import logging
import os
from numpy import mean, trapz
logger = logging.getLogger(__name__)
mod_dir = os.path.dirname(os.path.abspath(__file__))
class AnalysisOptions(object):
"""Analysis options window."""
def __init__(self, builder):
self.builder = builder
self.builder.add_from_file(
os.path.join(mod_dir, 'interface/analysis_options.glade'))
self.builder.connect_signals(self)
self.window = self.builder.get_object('analysis_dialog')
self.stats_button = self.builder.get_object('stats_button')
self.stats_start = self.builder.get_object('stats_start_spin')
self.stats_start_button = self.builder.get_object('stats_start_button')
self.stats_stop = self.builder.get_object('stats_stop_spin')
self.stats_stop_button = self.builder.get_object('stats_stop_button')
self.stats_button.connect('toggled',
self.on_button_toggled_hide,
[self.stats_stop,
self.stats_stop_button,
self.stats_start,
self.stats_start_button
]
)
self._params = {'stats_true':False,
'stats_start_true':False,
'stats_stop':0,
'stats_stop_true':False,
'stats_start':0
}
def show(self):
"""Show options window."""
self.window.run()
self.window.hide()
def on_button_toggled_hide(self, control, widgets):
"""Hide unchecked fields"""
active = control.get_active()
for widget in widgets:
widget.set_sensitive(active)
@property
def params(self):
"""Getter for analysis params"""
self._params['stats_true'] = self.stats_button.get_active()
self._params['stats_start_true'] = self.stats_start_button.get_active()
self._params['stats_start'] = self.stats_start.get_value()
self._params['stats_stop_true'] = self.stats_stop_button.get_active()
self._params['stats_stop'] = self.stats_stop.get_value()
return self._params
@params.setter
def params(self, params):
for key in self._params:
if key in params:
self._params[key] = params[key]
self.stats_button.set_active(self._params['stats_true'])
self.stats_start_button.set_active(self._params['stats_start_true'])
self.stats_start.set_value(self._params['stats_start'])
self.stats_stop_button.set_active(self._params['stats_stop_true'])
self.stats_stop.set_value(self._params['stats_stop'])
def do_analysis(experiment):
"""Takes an experiment class instance and runs selected analysis."""
experiment.analysis = {}
if experiment.parameters['stats_true']:
if (experiment.parameters['stats_start_true'] or
experiment.parameters['stats_stop_true']):
if experiment.parameters['stats_start_true']:
start = experiment.parameters['stats_start']
else:
start = min(experiment.data['data'][0][0])
if experiment.parameters['stats_stop_true']:
stop = experiment.parameters['stats_stop']
else:
stop = min(experiment.data['data'][0][0])
data = _data_slice(experiment.data['data'],
start,
stop
)
else:
data = experiment.data['data']
experiment.analysis.update(_summary_stats(data))
try:
x, y = experiment.data['ft'][0]
experiment.analysis['FT Integral'] = _integrateSpectrum(
x,
y,
float(experiment.parameters['sync_freq']),
float(experiment.parameters['fft_int'])
)
except KeyError:
pass
def _data_slice(data, start, stop):
"""Accepts data (as list of tuples of lists) and returns copy of data
between start and stop (in whatever x-axis units for the experiment type).
"""
output = []
for scan in range(len(data)):
t = []
for i in range(len(data[scan])):
t.append([])
output.append(tuple(t))
for i in range(len(data[scan][0])): # x-axis column
if data[scan][0][i] >= start or data[scan][0][i] <= stop:
for d in range(len(output[scan])):
output[scan][d].append(data[scan][d][i])
return output
def _summary_stats(data):
"""Takes data and returns summary statistics of first y variable as dict of
name, (scan, values).
"""
stats = {'min':[],'max':[], 'mean':[]}
for scan in range(len(data)):
stats['min'].append(
(scan, min(data[scan][1]))
)
stats['max'].append(
(scan, max(data[scan][1]))
)
stats['mean'].append(
(scan, mean(data[scan][1]))
)
return stats
def _integrateSpectrum(x, y, target, bandwidth):
"""
Returns integral of range of bandwidth centered on target.
"""
j = 0
k = len(x)
for i in range(len(x)):
if x[i] >= target-bandwidth/2:
j = i
break
for i in range(j,len(x)):
if x[i] >= target+bandwidth/2:
k = i
break
return [(0, trapz(y=y[j:k], x=x[j:k]))]
\ No newline at end of file
# -*- mode: python -*- # -*- mode: python -*-
a = Analysis(['interface_test.py'], a = Analysis(['./main.py'],
pathex=['/Users/mdryden/src/dstat-interface2/dstatInterface'],
hiddenimports=[], hiddenimports=[],
hookspath=None, hookspath=None,
runtime_hooks=None) runtime_hooks=None)
glade_tree = Tree('./interface', prefix = 'interface', excludes=['*.py','*.pyc'])
drivers_tree = Tree('./drivers', prefix = 'drivers')
pyz = PYZ(a.pure) pyz = PYZ(a.pure)
exe = EXE(pyz, exe = EXE(pyz,
a.scripts, a.scripts,
exclude_binaries=True, exclude_binaries=True,
name='interface_test.exe', name='DStat',
debug=False, debug=False,
strip=None, strip=None,
upx=True, upx=True,
console=True ) console=False )
coll = COLLECT(exe, coll = COLLECT(exe,
drivers_tree,
glade_tree,
a.binaries, a.binaries,
a.zipfiles, a.zipfiles,
a.datas, a.datas,
strip=None, strip=None,
upx=True, upx=True,
name='interface_test') name='DStat')
app = BUNDLE(coll,
name='DStat.app',
icon=None)
from . import comm
from . import dfu
from . import state
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
import sys
import inspect
import logging
from pkg_resources import parse_version, parse_requirements
logger = logging.getLogger(__name__)
class BaseBoard(object):
pcb_version = 'x.x.x'
booster = False
def __init__(self):
self.max_freq = 5000
self.max_scans = 255
self.max_time = 65535
self.setup()
assert len(self.gain) == self.gain_settings
assert len(self.gain_labels) == self.gain_settings
if self.gain_trim is not None:
assert len(self.gain_trim) == self.gain_settings
def setup(self):
"""Override in subclasses to provide correct numbers"""
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def test_mv(self, mv):
"""Return true if voltage in mV is in range."""
dac = float(mv)*self.re_voltage_scale/(3000./65536) + 32768
if 0 <= dac <= 65535:
return True
else:
return False
def test_freq(self, hz):
"""Return true if frequency in Hz is in range."""
return 0 < float(hz) < self.max_freq
def test_scans(self, n):
"""Return true if number of scans is valid."""
return 0 < int(n) < self.max_scans
def test_s(self, s):
"""Return true if time in integer seconds is valid."""
return 0 < int(s) < self.max_time
class V1_1Board(BaseBoard):
pcb_version = '1.1'
def setup(self):
self.gain = [1e2, 3e2, 3e3, 3e4, 3e5, 3e6, 3e7, 5e8]
self.gain_labels = [None, "300 Ω (5 mA FS)",
"3 kΩ (500 µA FS)", "30 kΩ (50 µA FS)",
"300 kΩ (5 µA FS)", "3 MΩ (500 nA FS)",
"30 MΩ (50 nA FS)", "500 MΩ (3 nA FS)"
]
self.gain_trim = None
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
class V1_2Board(BaseBoard):
pcb_version = '1.2'
def setup(self):
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def __get_all_subclasses(cls):
all_subclasses = []
for subclass in cls.__subclasses__():
all_subclasses.append(subclass)
all_subclasses.extend(__get_all_subclasses(subclass))
return all_subclasses
def find_board(version, booster=False):
"""Returns highest compatible board class or None if none available."""
boards = __get_all_subclasses(BaseBoard)
candidates = []
for board in boards:
req = parse_requirements('dstat~={}'.format(board.pcb_version)).next()
if board.booster == booster and version in req:
candidates.append(board)
try:
picked = sorted(candidates,
key=lambda board: parse_version(board.pcb_version))[-1]
logger.info("Picked %s", picked)
return picked
except IndexError:
logger.warning("No matching board definition for ver: %s.", version)
return None
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import multiprocessing as mp
from collections import OrderedDict
import logging
from pkg_resources import parse_version
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print "ERR: GTK not available"
sys.exit(1)
import serial
from serial.tools import list_ports
from ..errors import InputError, VarError
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
from . import state
class AlreadyConnectedError(Exception):
def __init__(self):
super(AlreadyConnectedError, self).__init__(self,
"Serial instance already connected.")
class NotConnectedError(Exception):
def __init__(self):
super(NotConnectedError, self).__init__(self,
"Serial instance not connected.")
class ConnectionError(Exception):
def __init__(self):
super(ConnectionError, self).__init__(self,
"Could not connect.")
class TransmitError(Exception):
def __init__(self):
super(TransmitError, self).__init__(self,
"No reply received.")
def _serial_process(ser_port, proc_pipe, ctrl_pipe, data_pipe):
ser_logger = logging.getLogger("{}._serial_process".format(__name__))
connected = False
for i in range(5):
time.sleep(1) # Give OS time to enumerate
try:
ser = serial.Serial(ser_port, timeout=1)
# ser = serial.Serial(ser_port, timeout=1)
ser_logger.info("Connecting")
time.sleep(.5)
connected = True
except serial.SerialException:
pass
if connected is True:
break
try:
if ser.isOpen() is False:
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
except UnboundLocalError: # ser doesn't exist
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
ser.write('!0 ')
for i in range(10):
if ser.readline().rstrip()=="@ACK 0":
if ser.readline().rstrip()=="@RCV 0":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!0 ')
time.sleep(.1)
while True:
# These can only be called when no experiment is running
if ctrl_pipe.poll():
ctrl_buffer = ctrl_pipe.recv()
if ctrl_buffer in ('a', "DISCONNECT"):
proc_pipe.send("ABORT")
try:
ser.write('a')
except serial.SerialException:
return 0
ser_logger.info("ABORT")
if ctrl_buffer == "DISCONNECT":
ser_logger.info("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
proc_pipe.send("DISCONNECT")
return 0
else:
ser.write(ctrl_buffer)
elif proc_pipe.poll():
while ctrl_pipe.poll():
ctrl_pipe.recv()
try:
return_code = proc_pipe.recv().run(ser, ctrl_pipe, data_pipe)
except serial.SerialException:
proc_pipe.send("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
return 0
ser_logger.info('Return code: %s', str(return_code))
proc_pipe.send(return_code)
else:
time.sleep(.1)
class SerialConnection(GObject.Object):
__gsignals__ = {
'connected': (GObject.SIGNAL_RUN_FIRST, None, ()),
'disconnected': (GObject.SIGNAL_RUN_FIRST, None, ())
}
def __init__(self):
super(SerialConnection, self).__init__()
self.connected = False
def connect(self, ser_port):
if self.connected is False:
self.proc_pipe_p, self.proc_pipe_c = mp.Pipe(duplex=True)
self.ctrl_pipe_p, self.ctrl_pipe_c = mp.Pipe(duplex=True)
self.data_pipe_p, self.data_pipe_c = mp.Pipe(duplex=True)
self.proc = mp.Process(target=_serial_process, args=(ser_port,
self.proc_pipe_c, self.ctrl_pipe_c,
self.data_pipe_c))
self.proc.start()
time.sleep(2)
if self.proc.is_alive() is False:
raise ConnectionError()
return False
self.connected = True
self.emit('connected')
return True
else:
raise AlreadyConnectedError()
return False
def assert_connected(self):
if self.connected is False:
raise NotConnectedError()
def start_exp(self, exp):
self.assert_connected()
self.proc_pipe_p.send(exp)
def stop_exp(self):
self.send_ctrl('a')
def get_proc(self, block=False):
self.assert_connected()
if block is True:
return self.proc_pipe_p.recv()
else:
if self.proc_pipe_p.poll() is True:
return self.proc_pipe_p.recv()
else:
return None
def get_ctrl(self, block=False):
self.assert_connected()
if block is True:
return self.ctrl_pipe_p.recv()
else:
if self.ctrl_pipe_p.poll() is True:
return self.ctrl_pipe_p.recv()
else:
return None
def get_data(self, block=False):
self.assert_connected()
if block is True:
return self.data_pipe_p.recv()
else:
if self.data_pipe_p.poll() is True:
return self.data_pipe_p.recv()
else:
return None
def flush_data(self):
self.assert_connected()
while self.data_pipe_p.poll() is True:
self.data_pipe_p.recv()
def send_ctrl(self, ctrl):
self.assert_connected()
self.ctrl_pipe_p.send(ctrl)
def disconnect(self):
logger.info("Disconnecting")
self.send_ctrl('DISCONNECT')
self.proc.join()
self.emit('disconnected')
self.connected = False
class VersionCheck(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
try:
ser.reset_input_buffer()
ser.write('!1\n')
for i in range(10):
if ser.readline().rstrip()=="@ACK 1":
ser.write('V\n')
if ser.readline().rstrip()=="@RCV 1":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!1\n')
time.sleep(.1)
for line in ser:
dstat_logger.info(line.decode('utf-8'))
if line.startswith('V'):
input = line.lstrip('V')
elif line.startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
pcb, sep, firmware = input.strip().rpartition('-')
if pcb == "":
pcb = firmware
firmware = False
logger.info("Your firmware does not support version detection.")
data_pipe.send((pcb, False))
else:
logger.info(
"Firmware Version: {}".format(
hex(int(firmware)).lstrip('0x')
)
)
data_pipe.send((
pcb,
hex(int(firmware)).lstrip('0x')
))
logger.info(
"PCB Version: {}".format(pcb)
)
status = "DONE"
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
def version_check(ser_port):
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
state.ser = SerialConnection()
state.ser.connect(ser_port)
state.ser.start_exp(VersionCheck())
result = state.ser.get_proc(block=True)
if result == "SERIAL_ERROR":
state.dstat_version = None
state.firmware_version = None
return False
else:
buffer = state.ser.get_data(block=True)
version, state.firmware_version = buffer
state.dstat_version = parse_version(version)
logger.debug("version_check done")
time.sleep(.1)
return True
class Settings(object):
def __init__(self, task, settings=None):
self.task = task
self.settings = settings
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
self.ser = ser
if 'w' in self.task:
self.write()
if 'r' in self.task:
data_pipe.send(self.read())
status = "DONE"
return status
def read(self):
settings = OrderedDict()
self.ser.reset_input_buffer()
self.ser.write('!2\n')
for i in range(10):
if self.ser.readline().rstrip()=="@ACK 2":
self.ser.write('SR\n')
if self.ser.readline().rstrip()=="@RCV 2":
break
else:
time.sleep(.5)
self.ser.reset_input_buffer()
self.ser.write('!2\n')
time.sleep(.1)
for line in self.ser:
if line.lstrip().startswith('S'):
input = line.lstrip().lstrip('S')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.ser.reset_input_buffer()
break
parted = input.rstrip().split(':')
for i in range(len(parted)):
settings[parted[i].split('.')[0]] = [i, parted[i].split('.')[1]]
return settings
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries = 3):
while True:
reply = self.ser.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.ser.reset_input_buffer()
self.ser.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.ser.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Expected RQP got: {}".format(reply))
continue
tries = 5
for i in params:
while True:
self.ser.write(i + " ")
try:
reply = get_reply()
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def write(self):
write_buffer = range(len(self.settings))
for i in self.settings: # make sure settings are in right order
write_buffer[self.settings[i][0]] = self.settings[i][1]
to_write = " ".join(write_buffer) + " "
n = len(to_write)
logger.debug("to_write = %s", to_write)
if not self.write_command('SW' + to_write):
logger.error("Could not write command.")
def read_settings():
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
state.ser.flush_data()
state.ser.start_exp(Settings(task='r'))
state.settings = state.ser.get_data(block=True)
logger.info("Read settings from DStat")
logger.debug("read_settings: %s", state.ser.get_proc(block=True))
return
def write_settings():
"""Tries to write settings to DStat from global settings var.
"""
logger.debug("Settings to write: %s", state.settings)
state.ser.flush_data()
state.ser.start_exp(Settings(task='w', settings=state.settings))
logger.info("Wrote settings to DStat")
logger.debug("write_settings: %s", state.ser.get_proc(block=True))
return
class LightSensor:
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
ser.reset_input_buffer()
ser.write('!')
while not ser.read()=="@":
self.ser.reset_input_buffer()
ser.write('!')
ser.write('T')
for line in ser:
if line.lstrip().startswith('T'):
input = line.lstrip().lstrip('T')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
parted = input.rstrip().split('.')
data_pipe.send(parted[0])
status = "DONE"
return status
def read_light_sensor():
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
state.ser.flush_data()
state.ser.start_exp(LightSensor())
logger.debug("read_light_sensor: %s", state.ser.get_proc(block=True))
return state.ser.get_data(block=True)
class SerialDevices(object):
"""Retrieves and stores list of serial devices in self.ports"""
def __init__(self):
self.ports = []
self.refresh()
def refresh(self):
"""Refreshes list of ports."""
try:
self.ports, _, _ = zip(*list_ports.grep("DSTAT"))
except ValueError:
self.ports = []
logger.error("No serial ports found")
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import subprocess
import sys
import os
import time
import logging
from tempfile import mkdtemp
from zipfile import ZipFile
if sys.version_info >= (3,):
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2
import urlparse
logger = logging.getLogger(__name__)
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
import serial
from . import state
from .comm import dstat_logger, exp_logger
fwurl = "http://microfluidics.utoronto.ca/gitlab/api/v4/projects/4/jobs/artifacts/master/download?job=1.2.3&private_token=zkgSx1FaaTP7yLyFKkX6"
class FWDialog(object):
def __init__(self, parent, connect, stop_callback, disconnect_callback, signal='activate'):
self.parent = parent
self.stop = stop_callback
self.disconnect = disconnect_callback
connect.connect(signal, self.activate)
def activate(self, widget=None, data=None):
for name, result in assert_deps().items():
if result is not True:
logger.error("Can't use firmware update module.")
self.missing_deps()
return
self.stop() # Stop OCP
version_result, master = test_firmware_version()
if version_result is False:
self.git_error()
return
if version_result == 'latest':
message = "Your firmware is already up to date."
secondary = "Click yes to reflash firmware anyways."
elif version_result == 'devel':
message = "Your firmware is not on the master branch."
secondary = "You may have a development version. " +\
"Click yes to reflash firmware anyways."
elif version_result == 'old':
message = "Your firmware is out of date."
secondary = "Click yes to flash the latest firmware."
dialog = Gtk.MessageDialog(self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.YES_NO, message)
dialog.format_secondary_text(secondary)
dialog.get_content_area().add(
Gtk.Label(
label="Installed version: {}".format(state.firmware_version)))
dialog.get_content_area().add(
Gtk.Label(label="Latest version: {}".format(master)))
dialog.show_all()
response = dialog.run()
if response == Gtk.ResponseType.YES:
try:
download_fw()
except:
self.dl_error()
return
dstat_enter_dfu()
self.dfu_notice()
self.disconnect()
try:
dfu_program()
except:
self.dfu_error()
dialog.destroy()
else:
dialog.destroy()
def missing_deps(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Missing Dependencies")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def git_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Git Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dl_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Download Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dfu_notice(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.OK, "Note about DFU")
dialog.format_secondary_text("Click OK once the DStat has connected in "
+ "DFU mode. Windows doesn't seem to like the automatic reboot. "
+ "Try holding down the reset button while plugging the "
+ 'USB port in (No LEDs should be lit), then click OK. Make sure '
+ 'the DFU driver from the dfu-programmer directory is installed.')
dialog.run()
dialog.destroy()
def dfu_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Could not update over DFU")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def destroy(self, widget=None, data=None):
widget.destroy()
def assert_deps():
deps = {'git' : 'git --version',
'dfu-programmer' : 'dfu-programmer --version'}
result = {}
for key, command in deps.items():
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
result[key] = True
except subprocess.CalledProcessError:
logger.warning("{} is not available.".format(key))
result[key] = False
return result
def download_fw(): # from https://stackoverflow.com/a/16518224
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
u = urllib2.urlopen(fwurl)
scheme, netloc, path, query, fragment = urlparse.urlsplit(fwurl)
filename = os.path.basename(path)
if not filename:
filename = 'downloaded.file'
with open(filename, 'wb') as f:
meta = u.info()
meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
meta_length = meta_func("Content-Length")
file_size = None
if meta_length:
file_size = int(meta_length[0])
logger.info("Downloading: {0} Bytes: {1}".format(fwurl, file_size))
file_size_dl = 0
block_sz = 8192
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
status = "{0:16}".format(file_size_dl)
if file_size:
status += " [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
status += chr(13)
logger.info(status)
with ZipFile(filename, mode='r') as z:
fw_path = z.extract('dstat-firmware.hex')
return fw_path
def test_firmware_version(current=None):
if current is None:
current = state.firmware_version
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
command = "git clone http://microfluidics.utoronto.ca/gitlab/dstat/dstat-firmware.git"
logger.info('Cloning master.')
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.error("git failed with error code {}".format(e.returncode))
logger.error("Output: {}".format(e.output))
return False, None
logger.info(output)
os.chdir("./dstat-firmware")
command = "git rev-parse --short master"
master = subprocess.check_output(command.split(), stderr=subprocess.STDOUT)
logger.info("Current master commit: {}".format(master))
command = "git merge-base --is-ancestor master {}".format(current)
test = subprocess.call(command.split())
if test == 0: # already newest
logger.info('Firmware is latest available.')
return 'latest', master
elif test == 1: # old version
logger.info('Firmware is out of date.')
return 'old', master
elif test == 128: # newer or different branch
logger.info('Firmware is not on the master branch.')
return 'devel', master
else:
logger.error('Unexpected git error. Git exited {}'.format(test))
return False, None
def dfu_program(path='./dstat-firmware.hex'):
"""Tries to program DStat over USB with DFU with hex file at path."""
try:
command = "dfu-programmer atxmega256a3u erase"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u flash {}".format(path)
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u launch"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
except subprocess.CalledProcessError as e:
logger.error("{} failed with output:".format(" ".join(e.cmd)))
logger.error(e.output)
raise
def dstat_enter_dfu():
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
exp = DFUMode()
state.ser.start_exp(exp)
while True:
result = state.ser.get_proc(block=True)
if result in ('SERIAL_ERROR', 'DONE'):
break
logger.info(result)
# state.ser.disconnect()
time.sleep(.1)
return True
class DFUMode(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
status = None
try:
ser.write(b'!2\n')
exp_logger.info('!2')
for i in range(10):
if ser.readline().rstrip() == b"@ACK 2":
dstat_logger.info('@ACK 2')
ser.write(b'SF\n')
exp_logger.info('SF')
status = "DONE"
time.sleep(5)
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write(b'!2\n')
exp_logger.info('!2')
time.sleep(.1)
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except serial.SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
if __name__ == "__main__":
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: [%(name)s] %(message)s',
datefmt='%H:%M:%S'
)
log_handler.setFormatter(log_formatter)
logger.setLevel(level=logging.INFO)
logger.addHandler(log_handler)
dstat_enter_dfu()
time.sleep(2)
dfu_program(sys.argv[1])
\ No newline at end of file
from collections import OrderedDict
def reset():
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
\ No newline at end of file