Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 1327 additions and 1091 deletions
#!/usr/bin/env python
import gtk
class lsv:
def __init__(self):
self.builder = gtk.Builder()
self.builder.add_from_file('interface/lsv.glade')
self.builder.connect_signals(self)
self.clean_mV = self.builder.get_object('clean_mV')
self.clean_s = self.builder.get_object('clean_s')
self.dep_mV = self.builder.get_object('dep_mV')
self.dep_s = self.builder.get_object('dep_s')
self.start_entry = self.builder.get_object('start_entry')
self.stop_entry = self.builder.get_object('stop_entry')
self.slope_entry = self.builder.get_object('slope_entry')
\ No newline at end of file
#!/usr/bin/env python
import gtk
class pd:
def __init__(self):
self.builder = gtk.Builder()
self.builder.add_from_file('interface/pd.glade')
self.builder.connect_signals(self)
self.voltage_entry = self.builder.get_object('voltage_entry')
self.time_entry = self.builder.get_object('time_entry')
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gtk, io, os
import numpy as np
from datetime import datetime
def manSave(current_exp):
exp = current_exp
fcd = gtk.FileChooserDialog("Save...", None, gtk.FILE_CHOOSER_ACTION_SAVE, (gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL, gtk.STOCK_SAVE, gtk.RESPONSE_OK))
filters = [gtk.FileFilter()]
filters[0].set_name("NumPy binary (.npy)")
filters[0].add_pattern("*.npy")
filters.append(gtk.FileFilter())
filters[1].set_name("Space separated text (.txt)")
filters[1].add_pattern("*.txt")
fcd.set_do_overwrite_confirmation(True)
for i in filters:
fcd.add_filter(i)
response = fcd.run()
if response == gtk.RESPONSE_OK:
path = fcd.get_filename()
print "Selected filepath: %s" % path
filter_selection = fcd.get_filter().get_name()
if filter_selection.endswith("(.npy)"):
npy(exp, path)
elif filter_selection.endswith("(.txt)"):
text(exp, path)
fcd.destroy()
elif response == gtk.RESPONSE_CANCEL:
fcd.destroy()
def plotSave(plot):
fcd = gtk.FileChooserDialog("Save Plot…", None, gtk.FILE_CHOOSER_ACTION_SAVE, (gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL, gtk.STOCK_SAVE, gtk.RESPONSE_OK))
filters = [gtk.FileFilter()]
filters[0].set_name("Portable Document Format (.pdf)")
filters[0].add_pattern("*.pdf")
filters.append(gtk.FileFilter())
filters[1].set_name("Portable Network Graphics (.png)")
filters[1].add_pattern("*.png")
fcd.set_do_overwrite_confirmation(True)
for i in filters:
fcd.add_filter(i)
response = fcd.run()
if response == gtk.RESPONSE_OK:
path = fcd.get_filename()
print "Selected filepath: %s" % path
filter_selection = fcd.get_filter().get_name()
if filter_selection.endswith("(.pdf)"):
if not path.endswith(".pdf"):
path += ".pdf"
elif filter_selection.endswith("(.png)"):
if not path.endswith(".png"):
path += ".png"
plot.figure.savefig(path) #savefig determines format from file extension
fcd.destroy()
elif response == gtk.RESPONSE_CANCEL:
fcd.destroy()
def autoSave(current_exp, dir_button, name, expnumber):
if name == "":
name = "file"
path = dir_button.get_filename()
path += '/'
path += name
path += str(expnumber)
text(current_exp, path, auto=True)
def autoPlot(plot, dir_button, name, expnumber):
if name == "":
name = "file"
path = dir_button.get_filename()
path += '/'
path += name
path += str(expnumber)
if path.endswith(".pdf"):
path = path.rstrip(".pdf")
j = 1
while os.path.exists("".join([path, ".pdf"])):
if j > 1:
path = path[:-len(str(j))]
path += str(j)
j += 1
path += ".pdf"
plot.figure.savefig(path)
def npy(exp, path, auto=False):
if path.endswith(".npy"):
path = path.rstrip(".npy")
data = np.array(exp.data)
if auto == True:
j = 1
while os.path.exists("".join([path, ".npy"])):
if j > 1:
path = path[:-len(str(j))]
path += str(j)
j += 1
np.save(path, data)
def text(exp, path, auto=False):
if path.endswith(".txt"):
path = path.rstrip(".txt")
if auto == True:
j = 1
while os.path.exists("".join([path, ".txt"])):
if j > 1:
path = path[:-len(str(j))]
path += str(j)
j += 1
path += ".txt"
file = open(path, 'w')
time = datetime.now()
data = np.array(exp.data)
header = "".join(['#', time.isoformat(), "\n#"])
for i in exp.commands:
header += i
file.write("".join([header, '\n']))
for col in zip(*exp.data):
for row in col:
file.write(str(row)+ " ")
file.write('\n')
file.close()
#!/usr/bin/env python
import gtk
class swv:
def __init__(self):
self.builder = gtk.Builder()
self.builder.add_from_file('interface/swv.glade')
self.builder.connect_signals(self)
self.clean_mV = self.builder.get_object('clean_mV')
self.clean_s = self.builder.get_object('clean_s')
self.dep_mV = self.builder.get_object('dep_mV')
self.dep_s = self.builder.get_object('dep_s')
self.start_entry = self.builder.get_object('start_entry')
self.stop_entry = self.builder.get_object('stop_entry')
self.step_entry = self.builder.get_object('step_entry')
self.pulse_entry = self.builder.get_object('pulse_entry')
self.freq_entry = self.builder.get_object('freq_entry')
self.cyclic_checkbutton = self.builder.get_object('cyclic_checkbutton')
self.scans_entry = self.builder.get_object('scans_entry')
\ No newline at end of file
This diff is collapsed.
import zmq
import zmq.error
#signals
CONREQ = "0"
CONREP = "1"
STARTEXP = "10"
EXPFINISHED = "11"
INVAL_CMD = "99"
#States
RECV = 0
SEND = 1
class microdropConnection:
def __init__(self, port=6789):
self.port = port
self.connected = False
self.state = RECV
self.ctx = zmq.Context()
self.soc = zmq.Socket(self.ctx, zmq.REP)
self.soc.bind("".join(['tcp://*:',str(self.port)]))
def listen(self):
if self.state == SEND:
print "WAR: Microdrop Connection state invalid, resetting..."
self.reset()
self.__init__(self.port)
try:
message = self.soc.recv(flags=zmq.NOBLOCK, copy=True)
self.state = SEND
return (True, message)
except zmq.Again:
return (False, "")
def reply(self, data):
if self.state == RECV:
print "WAR: Microdrop Connection state invalid, resetting..."
self.reset()
self.__init__(self.port)
return False
self.state = RECV
self.soc.send(data)
return True
def reset(self):
self.soc.unbind("".join(['tcp://*:',str(self.port)]))
del self.soc
del self.ctx
#!/usr/bin/env python
import sys, serial, io
import numpy as np
import matplotlib
import gtk
from time import sleep
from collections import deque
from matplotlib import pyplot as plt
# class that holds analog data for N samples
class AnalogData:
# constr
def __init__(self):
self.ax = []
self.ay = []
self.first = 1
# add data
def add(self, data):
if self.first == 1:
self.first = 0
return
assert(len(data) == 2)
self.ax.append(data[0])
self.ay.append(data[1])
# clear data
def clear(self):
self.first = 1
self.ax = []
self.ay = []
# plot class
class AnalogPlot:
# constr
def __init__(self, analogData):
self.i = 0
# set plot to animated
plt.ion() #interactive mode on
plt.autoscale(True,True,True)
self.line = plt.plot(analogData.ax,analogData.ay)
# update plot
def update(self, analogData):
if self.i < 5:
self.i += 1
return
plt.setp(self.line,xdata=analogData.ax, ydata=analogData.ay)
ax = plt.gca()
# recompute the ax.dataLim
ax.relim()
# update ax.viewLim using the new dataLim
ax.autoscale_view()
plt.draw()
self.i=0
# main() function
def main():
# # expects 1 arg - serial port string
# if(len(sys.argv) != 2):
# print 'Example usage: python showdata.py "/dev/tty.usbmodem411"'
# exit(1)
#strPort = '/dev/tty.usbserial-A7006Yqh'
#strPort = sys.argv[1];
strPort = '/dev/cu.usbmodem12...E1'
# open serial port
ser = serial.Serial(strPort, 1024000,timeout=2)
sio = io.TextIOWrapper(io.BufferedRWPair(ser,ser,buffer_size=1),
newline = '\n',
line_buffering = True)
ser.write("ck")
# plot parameters
digiData = AnalogData()
digiPlot = AnalogPlot(digiData)
try:
while True:
output = raw_input('Commands:')
ser.flushInput() #clear input buffer
digiData.clear() #clear old data
ser.write(output)
print output
while True:
for line in ser:
print line
if line.lstrip().startswith("no"):
ser.flushInput()
break
if not (line.isspace() or line.lstrip().startswith('#')):
#print line
data = [float(val) for val in line.split()]
if(len(data) == 2):
digiData.add(data)
digiPlot.update(digiData)
break
# if not line.lstrip().startswith('#'):
# data = [float(val) for val in line.split()]
## if(len(data) == 2):
## analogData.add(data)
## analogPlot.update(analogData)
# block.append(line)
# print line
# print block
except KeyboardInterrupt:
print 'exiting'
# close serial
ser.flush()
ser.close()
# call main
if __name__ == '__main__':
main()
\ No newline at end of file
#!/usr/bin/env python
"""
show how to add a matplotlib FigureCanvasGTK or FigureCanvasGTKAgg widget to a
gtk.Window
"""
import gtk
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import numpy as np
#from matplotlib.backends.backend_gtkcairo import FigureCanvasGTKCairo as FigureCanvas
#from matplotlib.backends.backend_gtkcairo import NavigationToolbar2Cairo as NavigationToolbar
from matplotlib.backends.backend_gtkagg import FigureCanvasGTKAgg as FigureCanvas
from matplotlib.backends.backend_gtkagg import NavigationToolbar2GTKAgg as NavigationToolbar
class plotbox:
def __init__(self, plotwindow_instance):
self.figure = Figure()
self.figure.subplots_adjust(left=0.07, bottom=0.07, right=0.96, top=0.96)
self.axe1 = self.figure.add_subplot(111)
self.lines = self.axe1.plot([0,1], [0,1])
self.axe1.ticklabel_format(style='sci', scilimits=(0,3), useOffset=False, axis='y')
self.canvas = FigureCanvas(self.figure)
self.win = gtk.Window()
self.vbox = gtk.VBox()
self.win.add(self.vbox)
self.vbox.pack_start(self.canvas)
self.toolbar = NavigationToolbar(self.canvas, self.win)
self.vbox.pack_start(self.toolbar, False, False)
self.vbox.reparent(plotwindow_instance)
def clearall(self):
for i in self.lines:
i.remove()
self.lines = self.axe1.plot([0,1], [0,1])
def clearline(self, line_number):
self.lines[line_number].remove()
self.lines.pop(line_number)
def addline(self):
self.lines.append(self.axe1.plot([0,1], [0,1])[0])
def updateline(self, Experiment, line_number):
divisor = len(Experiment.data[1+line_number*2]) // 2000 + 1 #limits display to 2000 data points per line
self.lines[line_number].set_ydata(Experiment.data[1+line_number*2][1::divisor])
self.lines[line_number].set_xdata(Experiment.data[line_number*2][1::divisor])
def changetype(self, Experiment):
self.axe1.set_xlabel(Experiment.xlabel)
self.axe1.set_ylabel(Experiment.ylabel)
self.axe1.set_xlim(Experiment.xmin, Experiment.xmax)
self.figure.canvas.draw()
def redraw(self):
self.axe1.relim()
self.axe1.autoscale(True, axis = 'y')
self.figure.canvas.draw()
return True
from distutils.core import setup
from Cython.Build import cythonize
setup(
ext_modules = cythonize("*.pyx")
)
\ No newline at end of file
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Functions for analyzing data.
"""
import logging
import os
from numpy import mean, trapz
logger = logging.getLogger(__name__)
mod_dir = os.path.dirname(os.path.abspath(__file__))
class AnalysisOptions(object):
"""Analysis options window."""
def __init__(self, builder):
self.builder = builder
self.builder.add_from_file(
os.path.join(mod_dir, 'interface/analysis_options.glade'))
self.builder.connect_signals(self)
self.window = self.builder.get_object('analysis_dialog')
self.stats_button = self.builder.get_object('stats_button')
self.stats_start = self.builder.get_object('stats_start_spin')
self.stats_start_button = self.builder.get_object('stats_start_button')
self.stats_stop = self.builder.get_object('stats_stop_spin')
self.stats_stop_button = self.builder.get_object('stats_stop_button')
self.stats_button.connect('toggled',
self.on_button_toggled_hide,
[self.stats_stop,
self.stats_stop_button,
self.stats_start,
self.stats_start_button
]
)
self._params = {'stats_true':False,
'stats_start_true':False,
'stats_stop':0,
'stats_stop_true':False,
'stats_start':0
}
def show(self):
"""Show options window."""
self.window.run()
self.window.hide()
def on_button_toggled_hide(self, control, widgets):
"""Hide unchecked fields"""
active = control.get_active()
for widget in widgets:
widget.set_sensitive(active)
@property
def params(self):
"""Getter for analysis params"""
self._params['stats_true'] = self.stats_button.get_active()
self._params['stats_start_true'] = self.stats_start_button.get_active()
self._params['stats_start'] = self.stats_start.get_value()
self._params['stats_stop_true'] = self.stats_stop_button.get_active()
self._params['stats_stop'] = self.stats_stop.get_value()
return self._params
@params.setter
def params(self, params):
for key in self._params:
if key in params:
self._params[key] = params[key]
self.stats_button.set_active(self._params['stats_true'])
self.stats_start_button.set_active(self._params['stats_start_true'])
self.stats_start.set_value(self._params['stats_start'])
self.stats_stop_button.set_active(self._params['stats_stop_true'])
self.stats_stop.set_value(self._params['stats_stop'])
def do_analysis(experiment):
"""Takes an experiment class instance and runs selected analysis."""
experiment.analysis = {}
if experiment.parameters['stats_true']:
if (experiment.parameters['stats_start_true'] or
experiment.parameters['stats_stop_true']):
if experiment.parameters['stats_start_true']:
start = experiment.parameters['stats_start']
else:
start = min(experiment.data['data'][0][0])
if experiment.parameters['stats_stop_true']:
stop = experiment.parameters['stats_stop']
else:
stop = min(experiment.data['data'][0][0])
data = _data_slice(experiment.data['data'],
start,
stop
)
else:
data = experiment.data['data']
experiment.analysis.update(_summary_stats(data))
try:
x, y = experiment.data['ft'][0]
experiment.analysis['FT Integral'] = _integrateSpectrum(
x,
y,
float(experiment.parameters['sync_freq']),
float(experiment.parameters['fft_int'])
)
except KeyError:
pass
def _data_slice(data, start, stop):
"""Accepts data (as list of tuples of lists) and returns copy of data
between start and stop (in whatever x-axis units for the experiment type).
"""
output = []
for scan in range(len(data)):
t = []
for i in range(len(data[scan])):
t.append([])
output.append(tuple(t))
for i in range(len(data[scan][0])): # x-axis column
if data[scan][0][i] >= start or data[scan][0][i] <= stop:
for d in range(len(output[scan])):
output[scan][d].append(data[scan][d][i])
return output
def _summary_stats(data):
"""Takes data and returns summary statistics of first y variable as dict of
name, (scan, values).
"""
stats = {'min':[],'max':[], 'mean':[]}
for scan in range(len(data)):
stats['min'].append(
(scan, min(data[scan][1]))
)
stats['max'].append(
(scan, max(data[scan][1]))
)
stats['mean'].append(
(scan, mean(data[scan][1]))
)
return stats
def _integrateSpectrum(x, y, target, bandwidth):
"""
Returns integral of range of bandwidth centered on target.
"""
j = 0
k = len(x)
for i in range(len(x)):
if x[i] >= target-bandwidth/2:
j = i
break
for i in range(j,len(x)):
if x[i] >= target+bandwidth/2:
k = i
break
return [(0, trapz(y=y[j:k], x=x[j:k]))]
\ No newline at end of file
python .\main.py
# -*- mode: python -*-
a = Analysis(['interface_test.py'],
a = Analysis(['./main.py'],
pathex=['/Users/mdryden/src/dstat-interface2/dstatInterface'],
hiddenimports=[],
hookspath=None,
runtime_hooks=None)
glade_tree = Tree('./interface', prefix = 'interface', excludes=['*.py','*.pyc'])
drivers_tree = Tree('./drivers', prefix = 'drivers')
pyz = PYZ(a.pure)
exe = EXE(pyz,
a.scripts,
exclude_binaries=True,
name='interface_test.exe',
name='DStat',
debug=False,
strip=None,
upx=True,
console=True )
console=False )
coll = COLLECT(exe,
drivers_tree,
glade_tree,
a.binaries,
a.zipfiles,
a.datas,
strip=None,
upx=True,
name='interface_test')
name='DStat')
app = BUNDLE(coll,
name='DStat.app',
icon=None)
from . import comm
from . import dfu
from . import state
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
import sys
import inspect
import logging
from pkg_resources import parse_version, parse_requirements
logger = logging.getLogger(__name__)
class BaseBoard(object):
pcb_version = 'x.x.x'
booster = False
def __init__(self):
self.max_freq = 5000
self.max_scans = 255
self.max_time = 65535
self.setup()
assert len(self.gain) == self.gain_settings
assert len(self.gain_labels) == self.gain_settings
if self.gain_trim is not None:
assert len(self.gain_trim) == self.gain_settings
def setup(self):
"""Override in subclasses to provide correct numbers"""
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def test_mv(self, mv):
"""Return true if voltage in mV is in range."""
dac = float(mv)*self.re_voltage_scale/(3000./65536) + 32768
if 0 <= dac <= 65535:
return True
else:
return False
def test_freq(self, hz):
"""Return true if frequency in Hz is in range."""
return 0 < float(hz) < self.max_freq
def test_scans(self, n):
"""Return true if number of scans is valid."""
return 0 < int(n) < self.max_scans
def test_s(self, s):
"""Return true if time in integer seconds is valid."""
return 0 < int(s) < self.max_time
class V1_1Board(BaseBoard):
pcb_version = '1.1'
def setup(self):
self.gain = [1e2, 3e2, 3e3, 3e4, 3e5, 3e6, 3e7, 5e8]
self.gain_labels = [None, "300 Ω (5 mA FS)",
"3 kΩ (500 µA FS)", "30 kΩ (50 µA FS)",
"300 kΩ (5 µA FS)", "3 MΩ (500 nA FS)",
"30 MΩ (50 nA FS)", "500 MΩ (3 nA FS)"
]
self.gain_trim = None
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
class V1_2Board(BaseBoard):
pcb_version = '1.2'
def setup(self):
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def __get_all_subclasses(cls):
all_subclasses = []
for subclass in cls.__subclasses__():
all_subclasses.append(subclass)
all_subclasses.extend(__get_all_subclasses(subclass))
return all_subclasses
def find_board(version, booster=False):
"""Returns highest compatible board class or None if none available."""
boards = __get_all_subclasses(BaseBoard)
candidates = []
for board in boards:
req = parse_requirements('dstat~={}'.format(board.pcb_version)).next()
if board.booster == booster and version in req:
candidates.append(board)
try:
picked = sorted(candidates,
key=lambda board: parse_version(board.pcb_version))[-1]
logger.info("Picked %s", picked)
return picked
except IndexError:
logger.warning("No matching board definition for ver: %s.", version)
return None
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import multiprocessing as mp
from collections import OrderedDict
import logging
from pkg_resources import parse_version
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print "ERR: GTK not available"
sys.exit(1)
import serial
from serial.tools import list_ports
from ..errors import InputError, VarError
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
from . import state
class AlreadyConnectedError(Exception):
def __init__(self):
super(AlreadyConnectedError, self).__init__(self,
"Serial instance already connected.")
class NotConnectedError(Exception):
def __init__(self):
super(NotConnectedError, self).__init__(self,
"Serial instance not connected.")
class ConnectionError(Exception):
def __init__(self):
super(ConnectionError, self).__init__(self,
"Could not connect.")
class TransmitError(Exception):
def __init__(self):
super(TransmitError, self).__init__(self,
"No reply received.")
def _serial_process(ser_port, proc_pipe, ctrl_pipe, data_pipe):
ser_logger = logging.getLogger("{}._serial_process".format(__name__))
connected = False
for i in range(5):
time.sleep(1) # Give OS time to enumerate
try:
ser = serial.Serial(ser_port, timeout=1)
# ser = serial.Serial(ser_port, timeout=1)
ser_logger.info("Connecting")
time.sleep(.5)
connected = True
except serial.SerialException:
pass
if connected is True:
break
try:
if ser.isOpen() is False:
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
except UnboundLocalError: # ser doesn't exist
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
ser.write('!0 ')
for i in range(10):
if ser.readline().rstrip()=="@ACK 0":
if ser.readline().rstrip()=="@RCV 0":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!0 ')
time.sleep(.1)
while True:
# These can only be called when no experiment is running
if ctrl_pipe.poll():
ctrl_buffer = ctrl_pipe.recv()
if ctrl_buffer in ('a', "DISCONNECT"):
proc_pipe.send("ABORT")
try:
ser.write('a')
except serial.SerialException:
return 0
ser_logger.info("ABORT")
if ctrl_buffer == "DISCONNECT":
ser_logger.info("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
proc_pipe.send("DISCONNECT")
return 0
else:
ser.write(ctrl_buffer)
elif proc_pipe.poll():
while ctrl_pipe.poll():
ctrl_pipe.recv()
try:
return_code = proc_pipe.recv().run(ser, ctrl_pipe, data_pipe)
except serial.SerialException:
proc_pipe.send("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
return 0
ser_logger.info('Return code: %s', str(return_code))
proc_pipe.send(return_code)
else:
time.sleep(.1)
class SerialConnection(GObject.Object):
__gsignals__ = {
'connected': (GObject.SIGNAL_RUN_FIRST, None, ()),
'disconnected': (GObject.SIGNAL_RUN_FIRST, None, ())
}
def __init__(self):
super(SerialConnection, self).__init__()
self.connected = False
def connect(self, ser_port):
if self.connected is False:
self.proc_pipe_p, self.proc_pipe_c = mp.Pipe(duplex=True)
self.ctrl_pipe_p, self.ctrl_pipe_c = mp.Pipe(duplex=True)
self.data_pipe_p, self.data_pipe_c = mp.Pipe(duplex=True)
self.proc = mp.Process(target=_serial_process, args=(ser_port,
self.proc_pipe_c, self.ctrl_pipe_c,
self.data_pipe_c))
self.proc.start()
time.sleep(2)
if self.proc.is_alive() is False:
raise ConnectionError()
return False
self.connected = True
self.emit('connected')
return True
else:
raise AlreadyConnectedError()
return False
def assert_connected(self):
if self.connected is False:
raise NotConnectedError()
def start_exp(self, exp):
self.assert_connected()
self.proc_pipe_p.send(exp)
def stop_exp(self):
self.send_ctrl('a')
def get_proc(self, block=False):
self.assert_connected()
if block is True:
return self.proc_pipe_p.recv()
else:
if self.proc_pipe_p.poll() is True:
return self.proc_pipe_p.recv()
else:
return None
def get_ctrl(self, block=False):
self.assert_connected()
if block is True:
return self.ctrl_pipe_p.recv()
else:
if self.ctrl_pipe_p.poll() is True:
return self.ctrl_pipe_p.recv()
else:
return None
def get_data(self, block=False):
self.assert_connected()
if block is True:
return self.data_pipe_p.recv()
else:
if self.data_pipe_p.poll() is True:
return self.data_pipe_p.recv()
else:
return None
def flush_data(self):
self.assert_connected()
while self.data_pipe_p.poll() is True:
self.data_pipe_p.recv()
def send_ctrl(self, ctrl):
self.assert_connected()
self.ctrl_pipe_p.send(ctrl)
def disconnect(self):
logger.info("Disconnecting")
self.send_ctrl('DISCONNECT')
self.proc.join()
self.emit('disconnected')
self.connected = False
class VersionCheck(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
try:
ser.reset_input_buffer()
ser.write('!1\n')
for i in range(10):
if ser.readline().rstrip()=="@ACK 1":
ser.write('V\n')
if ser.readline().rstrip()=="@RCV 1":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!1\n')
time.sleep(.1)
for line in ser:
dstat_logger.info(line.decode('utf-8'))
if line.startswith('V'):
input = line.lstrip('V')
elif line.startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
pcb, sep, firmware = input.strip().rpartition('-')
if pcb == "":
pcb = firmware
firmware = False
logger.info("Your firmware does not support version detection.")
data_pipe.send((pcb, False))
else:
logger.info(
"Firmware Version: {}".format(
hex(int(firmware)).lstrip('0x')
)
)
data_pipe.send((
pcb,
hex(int(firmware)).lstrip('0x')
))
logger.info(
"PCB Version: {}".format(pcb)
)
status = "DONE"
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
def version_check(ser_port):
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
state.ser = SerialConnection()
state.ser.connect(ser_port)
state.ser.start_exp(VersionCheck())
result = state.ser.get_proc(block=True)
if result == "SERIAL_ERROR":
state.dstat_version = None
state.firmware_version = None
return False
else:
buffer = state.ser.get_data(block=True)
version, state.firmware_version = buffer
state.dstat_version = parse_version(version)
logger.debug("version_check done")
time.sleep(.1)
return True
class Settings(object):
def __init__(self, task, settings=None):
self.task = task
self.settings = settings
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
self.ser = ser
if 'w' in self.task:
self.write()
if 'r' in self.task:
data_pipe.send(self.read())
status = "DONE"
return status
def read(self):
settings = OrderedDict()
self.ser.reset_input_buffer()
self.ser.write('!2\n')
for i in range(10):
if self.ser.readline().rstrip()=="@ACK 2":
self.ser.write('SR\n')
if self.ser.readline().rstrip()=="@RCV 2":
break
else:
time.sleep(.5)
self.ser.reset_input_buffer()
self.ser.write('!2\n')
time.sleep(.1)
for line in self.ser:
if line.lstrip().startswith('S'):
input = line.lstrip().lstrip('S')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.ser.reset_input_buffer()
break
parted = input.rstrip().split(':')
for i in range(len(parted)):
settings[parted[i].split('.')[0]] = [i, parted[i].split('.')[1]]
return settings
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries = 3):
while True:
reply = self.ser.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.ser.reset_input_buffer()
self.ser.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.ser.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Expected RQP got: {}".format(reply))
continue
tries = 5
for i in params:
while True:
self.ser.write(i + " ")
try:
reply = get_reply()
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def write(self):
write_buffer = range(len(self.settings))
for i in self.settings: # make sure settings are in right order
write_buffer[self.settings[i][0]] = self.settings[i][1]
to_write = " ".join(write_buffer) + " "
n = len(to_write)
logger.debug("to_write = %s", to_write)
if not self.write_command('SW' + to_write):
logger.error("Could not write command.")
def read_settings():
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
state.ser.flush_data()
state.ser.start_exp(Settings(task='r'))
state.settings = state.ser.get_data(block=True)
logger.info("Read settings from DStat")
logger.debug("read_settings: %s", state.ser.get_proc(block=True))
return
def write_settings():
"""Tries to write settings to DStat from global settings var.
"""
logger.debug("Settings to write: %s", state.settings)
state.ser.flush_data()
state.ser.start_exp(Settings(task='w', settings=state.settings))
logger.info("Wrote settings to DStat")
logger.debug("write_settings: %s", state.ser.get_proc(block=True))
return
class LightSensor:
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
ser.reset_input_buffer()
ser.write('!')
while not ser.read()=="@":
self.ser.reset_input_buffer()
ser.write('!')
ser.write('T')
for line in ser:
if line.lstrip().startswith('T'):
input = line.lstrip().lstrip('T')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
parted = input.rstrip().split('.')
data_pipe.send(parted[0])
status = "DONE"
return status
def read_light_sensor():
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
state.ser.flush_data()
state.ser.start_exp(LightSensor())
logger.debug("read_light_sensor: %s", state.ser.get_proc(block=True))
return state.ser.get_data(block=True)
class SerialDevices(object):
"""Retrieves and stores list of serial devices in self.ports"""
def __init__(self):
self.ports = []
self.refresh()
def refresh(self):
"""Refreshes list of ports."""
try:
self.ports, _, _ = zip(*list_ports.grep("DSTAT"))
except ValueError:
self.ports = []
logger.error("No serial ports found")
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import subprocess
import sys
import os
import time
import logging
from tempfile import mkdtemp
from zipfile import ZipFile
if sys.version_info >= (3,):
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2
import urlparse
logger = logging.getLogger(__name__)
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
import serial
from . import state
from .comm import dstat_logger, exp_logger
fwurl = "http://microfluidics.utoronto.ca/gitlab/api/v4/projects/4/jobs/artifacts/master/download?job=1.2.3&private_token=zkgSx1FaaTP7yLyFKkX6"
class FWDialog(object):
def __init__(self, parent, connect, stop_callback, disconnect_callback, signal='activate'):
self.parent = parent
self.stop = stop_callback
self.disconnect = disconnect_callback
connect.connect(signal, self.activate)
def activate(self, widget=None, data=None):
for name, result in assert_deps().items():
if result is not True:
logger.error("Can't use firmware update module.")
self.missing_deps()
return
self.stop() # Stop OCP
version_result, master = test_firmware_version()
if version_result is False:
self.git_error()
return
if version_result == 'latest':
message = "Your firmware is already up to date."
secondary = "Click yes to reflash firmware anyways."
elif version_result == 'devel':
message = "Your firmware is not on the master branch."
secondary = "You may have a development version. " +\
"Click yes to reflash firmware anyways."
elif version_result == 'old':
message = "Your firmware is out of date."
secondary = "Click yes to flash the latest firmware."
dialog = Gtk.MessageDialog(self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.YES_NO, message)
dialog.format_secondary_text(secondary)
dialog.get_content_area().add(
Gtk.Label(
label="Installed version: {}".format(state.firmware_version)))
dialog.get_content_area().add(
Gtk.Label(label="Latest version: {}".format(master)))
dialog.show_all()
response = dialog.run()
if response == Gtk.ResponseType.YES:
try:
download_fw()
except:
self.dl_error()
return
dstat_enter_dfu()
self.dfu_notice()
self.disconnect()
try:
dfu_program()
except:
self.dfu_error()
dialog.destroy()
else:
dialog.destroy()
def missing_deps(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Missing Dependencies")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def git_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Git Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dl_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Download Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dfu_notice(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.OK, "Note about DFU")
dialog.format_secondary_text("Click OK once the DStat has connected in "
+ "DFU mode. Windows doesn't seem to like the automatic reboot. "
+ "Try holding down the reset button while plugging the "
+ 'USB port in (No LEDs should be lit), then click OK. Make sure '
+ 'the DFU driver from the dfu-programmer directory is installed.')
dialog.run()
dialog.destroy()
def dfu_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Could not update over DFU")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def destroy(self, widget=None, data=None):
widget.destroy()
def assert_deps():
deps = {'git' : 'git --version',
'dfu-programmer' : 'dfu-programmer --version'}
result = {}
for key, command in deps.items():
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
result[key] = True
except subprocess.CalledProcessError:
logger.warning("{} is not available.".format(key))
result[key] = False
return result
def download_fw(): # from https://stackoverflow.com/a/16518224
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
u = urllib2.urlopen(fwurl)
scheme, netloc, path, query, fragment = urlparse.urlsplit(fwurl)
filename = os.path.basename(path)
if not filename:
filename = 'downloaded.file'
with open(filename, 'wb') as f:
meta = u.info()
meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
meta_length = meta_func("Content-Length")
file_size = None
if meta_length:
file_size = int(meta_length[0])
logger.info("Downloading: {0} Bytes: {1}".format(fwurl, file_size))
file_size_dl = 0
block_sz = 8192
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
status = "{0:16}".format(file_size_dl)
if file_size:
status += " [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
status += chr(13)
logger.info(status)
with ZipFile(filename, mode='r') as z:
fw_path = z.extract('dstat-firmware.hex')
return fw_path
def test_firmware_version(current=None):
if current is None:
current = state.firmware_version
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
command = "git clone http://microfluidics.utoronto.ca/gitlab/dstat/dstat-firmware.git"
logger.info('Cloning master.')
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.error("git failed with error code {}".format(e.returncode))
logger.error("Output: {}".format(e.output))
return False, None
logger.info(output)
os.chdir("./dstat-firmware")
command = "git rev-parse --short master"
master = subprocess.check_output(command.split(), stderr=subprocess.STDOUT)
logger.info("Current master commit: {}".format(master))
command = "git merge-base --is-ancestor master {}".format(current)
test = subprocess.call(command.split())
if test == 0: # already newest
logger.info('Firmware is latest available.')
return 'latest', master
elif test == 1: # old version
logger.info('Firmware is out of date.')
return 'old', master
elif test == 128: # newer or different branch
logger.info('Firmware is not on the master branch.')
return 'devel', master
else:
logger.error('Unexpected git error. Git exited {}'.format(test))
return False, None
def dfu_program(path='./dstat-firmware.hex'):
"""Tries to program DStat over USB with DFU with hex file at path."""
try:
command = "dfu-programmer atxmega256a3u erase"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u flash {}".format(path)
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u launch"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
except subprocess.CalledProcessError as e:
logger.error("{} failed with output:".format(" ".join(e.cmd)))
logger.error(e.output)
raise
def dstat_enter_dfu():
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
exp = DFUMode()
state.ser.start_exp(exp)
while True:
result = state.ser.get_proc(block=True)
if result in ('SERIAL_ERROR', 'DONE'):
break
logger.info(result)
# state.ser.disconnect()
time.sleep(.1)
return True
class DFUMode(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
status = None
try:
ser.write(b'!2\n')
exp_logger.info('!2')
for i in range(10):
if ser.readline().rstrip() == b"@ACK 2":
dstat_logger.info('@ACK 2')
ser.write(b'SF\n')
exp_logger.info('SF')
status = "DONE"
time.sleep(5)
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write(b'!2\n')
exp_logger.info('!2')
time.sleep(.1)
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except serial.SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
if __name__ == "__main__":
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: [%(name)s] %(message)s',
datefmt='%H:%M:%S'
)
log_handler.setFormatter(log_formatter)
logger.setLevel(level=logging.INFO)
logger.addHandler(log_handler)
dstat_enter_dfu()
time.sleep(2)
dfu_program(sys.argv[1])
\ No newline at end of file
from collections import OrderedDict
def reset():
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
\ No newline at end of file