Skip to content
Snippets Groups Projects
Unverified Commit 7a73a69f authored by Michael DM Dryden's avatar Michael DM Dryden
Browse files

Added docstrings, fixed code conventions

parent 540983cb
Branches
Tags
No related merge requests found
#!/usr/bin/env python
import serial, io, time, struct, sys, os
from types import *
import serial
from serial.tools import list_ports
import numpy as np
import time
import struct
import multiprocessing as mp
from Queue import Empty
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
"""Indirect caller for instance methods and multiprocessing.
Arguments:
instance -- instance to which the method belongs
name -- method to call
args -- passed to method
kwargs -- passed to method
"""
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class delayedSerial(serial.Serial): #overrides normal serial write so that characters are output individually with a slight delay
class delayedSerial(serial.Serial):
"""Extends Serial.write so that characters are output individually
with a slight delay
"""
def write(self, data):
for i in data:
serial.Serial.write(self, i)
time.sleep(.001)
class SerialDevices:
"""Retrieves and stores list of serial devices in self.ports"""
def __init__(self):
try:
self.ports, _, _ = zip(*list_ports.comports())
......@@ -29,22 +39,33 @@ class SerialDevices:
print "No serial ports found"
def refresh(self):
"""Refreshes list of ports."""
self.ports, _, _ = zip(*list_ports.comports())
class Experiment:
"""Store and acquire a potentiostat experiment. Meant to be subclassed
to by different experiment types and not used instanced directly.
"""
def run_wrapper(self, *argv):
self.p = mp.Process(target=call_it, args=(self, 'run', argv))
self.p.start()
"""Execute experiment indirectly using call_it to bypass lack of fork()
on Windows for multiprocessing.
"""
self.proc = mp.Process(target=call_it, args=(self, 'run', argv))
self.proc.start()
def __init__(self): #will always be overriden, but self.parameters, self.viewparameters, and self.databytes should be defined
pass
def __init__(self, parameters, main_pipe):
"""Must be overridden to define self.parameters, and self.databytes."""
self.parameters = parameters
self.main_pipe = main_pipe
self.databytes = 8
def init(self):
self.data_extra = [] #must be defined even when not needed
"""Adds commands for gain and ADC."""
self.data_extra = [] # must be defined even when not needed
self.__gaintable = [1e2, 3e2, 3e3, 3e4, 3e5, 3e6, 3e7, 5e8]
self.gain = self.__gaintable[int(self.parameters['gain'])]
self.commands = ["A","G"]
self.commands = ["A", "G"]
self.commands[0] += (self.parameters['adc_buffer'])
self.commands[0] += " "
......@@ -55,8 +76,15 @@ class Experiment:
self.commands[1] += (self.parameters['gain'])
self.commands[1] += " "
def run(self, strPort):
self.serial = delayedSerial(strPort, 1024000, timeout=1)
def run(self, ser_port):
"""Execute experiment. Connects and sends handshake signal to DStat
then sendsself.commands. Don't call directly as a process in Windows,
use run_wrapper instead.
Arguments:
ser_port -- address of serial port to use
"""
self.serial = delayedSerial(ser_port, 1024000, timeout=1)
self.serial.write("ck")
self.serial.flushInput()
......@@ -77,6 +105,10 @@ class Experiment:
self.main_pipe.close()
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.main_pipe as result of self.data_handler).
"""
scan = 0
while True:
if self.main_pipe.poll():
......@@ -87,7 +119,8 @@ class Experiment:
for line in self.serial:
if line.startswith('B'):
self.main_pipe.send(self.data_handler((scan, self.serial.read(size=self.databytes))))
self.main_pipe.send(self.data_handler(
(scan, self.serial.read(size=self.databytes))))
elif line.startswith('S'):
scan += 1
elif line.startswith("#"):
......@@ -98,22 +131,31 @@ class Experiment:
return True
def data_handler(self, input):
scan, data = input
def data_handler(self, data_input):
"""Takes data_input as tuple -- (scan, data).
Returns:
(scan number, [voltage, current]) -- voltage in mV, current in A
"""
scan, data = data_input
voltage, current = struct.unpack('<Hl', data) #uint16 + int32
return (scan, [(voltage-32768)*3000./65536, current*(1.5/self.gain/8388607)])
return (scan,
[(voltage-32768)*3000./65536, current*(1.5/self.gain/8388607)])
def data_postprocessing(self):
"""No data postprocessing done by default, can be overridden
in subclass.
"""
pass
class chronoamp(Experiment):
"""Chronoamperometry experiment"""
def __init__(self, parameters, main_pipe):
self.main_pipe = main_pipe
self.parameters = parameters
self.datatype = "linearData"
self.xlabel = "Time (s)"
self.ylabel = "Current (A)"
self.data = [[],[]]
self.data = [[], []]
self.datalength = 2
self.databytes = 8
self.xmin = 0
......@@ -122,7 +164,7 @@ class chronoamp(Experiment):
for i in self.parameters['time']:
self.xmax += int(i)
self.init() #need to call after xmin and xmax are set
self.init() # need to call after xmin and xmax are set
self.commands += "R"
self.commands[2] += str(len(self.parameters['potential']))
......@@ -134,12 +176,16 @@ class chronoamp(Experiment):
self.commands[2] += str(i)
self.commands[2] += " "
def data_handler(self, input): #overrides inherited method to not convert x axis
scan, data = input
seconds, milliseconds, current = struct.unpack('<HHl', data) #2*uint16 + int32
return (scan, [seconds+milliseconds/1000., current*(1.5/self.gain/8388607)])
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, current = struct.unpack('<HHl', data)
return (scan,
[seconds+milliseconds/1000., current*(1.5/self.gain/8388607)])
class lsv_exp(Experiment):
"""Linear Scan Voltammetry experiment"""
def __init__(self, parameters, send_pipe):
self.main_pipe = send_pipe
self.parameters = parameters
......@@ -147,22 +193,24 @@ class lsv_exp(Experiment):
self.datatype = "linearData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = [[],[]]
self.data = [[], []]
self.datalength = 2
self.databytes = 6 #uint16 + int32
self.databytes = 6 # uint16 + int32
self.xmin = self.parameters['start']
self.xmax = self.parameters['stop']
self.init() #need to call after xmin and xmax are set
self.init() # need to call after xmin and xmax are set
self.commands += "L"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['clean_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['clean_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['dep_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['dep_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(self.parameters['start'])
self.commands[2] += " "
......@@ -172,6 +220,7 @@ class lsv_exp(Experiment):
self.commands[2] += " "
class cv_exp(Experiment):
"""Cyclic Voltammetry experiment"""
def __init__(self, parameters, main_pipe):
self.main_pipe = main_pipe
self.parameters = parameters
......@@ -179,9 +228,9 @@ class cv_exp(Experiment):
self.datatype = "CVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = [[],[]] #Will have to alter data_handler to add new lists as needed
self.datalength = 2 * self.parameters['scans'] #x and y for each scan
self.databytes = 6 #uint16 + int32
self.data = [[], []]
self.datalength = 2 * self.parameters['scans'] # x and y for each scan
self.databytes = 6 # uint16 + int32
self.xmin = self.parameters['v1']
self.xmax = self.parameters['v2']
......@@ -192,9 +241,11 @@ class cv_exp(Experiment):
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['clean_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['clean_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['dep_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['dep_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(self.parameters['v1'])
self.commands[2] += " "
......@@ -206,13 +257,9 @@ class cv_exp(Experiment):
self.commands[2] += " "
self.commands[2] += str(self.parameters['slope'])
self.commands[2] += " "
def data_handler(self, input):
scan, data = input
voltage, current = struct.unpack('<Hl', data) #uint16 + int32
return (scan, [(voltage-32768)*3000./65536, current*(1.5/self.gain/8388607)])
class swv_exp(Experiment):
"""Square Wave Voltammetry experiment"""
def __init__(self, parameters, main_pipe):
self.main_pipe = main_pipe
self.parameters = parameters
......@@ -220,7 +267,7 @@ class swv_exp(Experiment):
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = [[],[]] #only difference stored here
self.data = [[], []] # only difference stored here
self.datalength = 2 * self.parameters['scans']
self.databytes = 10
......@@ -228,16 +275,20 @@ class swv_exp(Experiment):
self.xmax = self.parameters['stop']
self.init()
self.data_extra = [[],[]] #forward/reverse stored here - needs to be after self.init to keep from being redefined
# forward/reverse stored here - needs to be after
# self.init to keep from being redefined
self.data_extra = [[], []]
self.commands += "S"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['clean_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['clean_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['dep_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['dep_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(self.parameters['start'])
self.commands[2] += " "
......@@ -252,21 +303,28 @@ class swv_exp(Experiment):
self.commands[2] += str(self.parameters['scans'])
self.commands[2] += " "
def data_handler(self, input):
scan, data = input
voltage, forward, reverse = struct.unpack('<Hll', data) #uint16 + int32
return (scan, [(voltage-32768)*3000./65536, (forward-reverse)*(1.5/self.gain/8388607), forward*(1.5/self.gain/8388607), reverse*(1.5/self.gain/8388607)])
def data_handler(self, input_data):
"""Overrides Experiment method to calculate difference current"""
scan, data = input_data
# uint16 + int32
voltage, forward, reverse = struct.unpack('<Hll', data)
return (scan, [(voltage-32768)*3000./65536,
(forward-reverse)*(1.5/self.gain/8388607),
forward*(1.5/self.gain/8388607),
reverse*(1.5/self.gain/8388607)])
class dpv_exp(swv_exp):
"""Diffential Pulse Voltammetry experiment."""
def __init__(self, parameters, main_pipe):
"""Overrides swv_exp method"""
self.main_pipe = main_pipe
self.parameters = parameters
self.datatype = "SWVData"
self.xlabel = "Voltage (mV)"
self.ylabel = "Current (A)"
self.data = [[],[]] #only difference stored here
self.data = [[], []] # only difference stored here
self.datalength = 2
self.databytes = 10
......@@ -274,16 +332,20 @@ class dpv_exp(swv_exp):
self.xmax = self.parameters['stop']
self.init()
self.data_extra = [[],[]] #forward/reverse stored here - needs to be after self.init to keep from being redefined
# forward/reverse stored here - needs to be after self.init to
# keep from being redefined
self.data_extra = [[], []]
self.commands += "D"
self.commands[2] += str(self.parameters['clean_s'])
self.commands[2] += " "
self.commands[2] += str(self.parameters['dep_s'])
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['clean_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['clean_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(int(self.parameters['dep_mV']*(65536./3000)+32768))
self.commands[2] += str(int(self.parameters['dep_mV']*
(65536./3000)+32768))
self.commands[2] += " "
self.commands[2] += str(self.parameters['start'])
self.commands[2] += " "
......
......@@ -20,8 +20,10 @@ class chronoamp:
self.cell_renderer = gtk.CellRendererText()
self.treeview.insert_column_with_attributes(-1, "Time", self.cell_renderer, text=1).set_expand(True)
self.treeview.insert_column_with_attributes(-1, "Potential", self.cell_renderer, text=0).set_expand(True)
self.treeview.insert_column_with_attributes(-1, "Time",
self.cell_renderer, text=1).set_expand(True)
self.treeview.insert_column_with_attributes(-1, "Potential",
self.cell_renderer, text=0).set_expand(True)
self.treeviewselection = self.treeview.get_selection()
self.treeviewselection.set_mode(gtk.SELECTION_MULTIPLE)
......@@ -46,7 +48,8 @@ class chronoamp:
self.statusbar.push(0, str(e))
def on_remove_button_clicked(self, widget):
self.selected_rows = list(self.treeviewselection.get_selected_rows()[1]) #returns 2-tuple: treemodel, list of paths selected rows
# returns 2-tuple: treemodel, list of paths of selected rows
self.selected_rows = list(self.treeviewselection.get_selected_rows()[1])
self.referencelist = []
......
......@@ -7,7 +7,9 @@ from datetime import datetime
def manSave(current_exp):
exp = current_exp
fcd = gtk.FileChooserDialog("Save...", None, gtk.FILE_CHOOSER_ACTION_SAVE, (gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL, gtk.STOCK_SAVE, gtk.RESPONSE_OK))
fcd = gtk.FileChooserDialog("Save...", None, gtk.FILE_CHOOSER_ACTION_SAVE,
(gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL,
gtk.STOCK_SAVE, gtk.RESPONSE_OK))
filters = [gtk.FileFilter()]
filters[0].set_name("NumPy binary (.npy)")
......@@ -37,7 +39,10 @@ def manSave(current_exp):
fcd.destroy()
def plotSave(plot):
fcd = gtk.FileChooserDialog("Save Plot…", None, gtk.FILE_CHOOSER_ACTION_SAVE, (gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL, gtk.STOCK_SAVE, gtk.RESPONSE_OK))
fcd = gtk.FileChooserDialog("Save Plot…", None,
gtk.FILE_CHOOSER_ACTION_SAVE,
(gtk.STOCK_CANCEL, gtk.RESPONSE_CANCEL,
gtk.STOCK_SAVE, gtk.RESPONSE_OK))
filters = [gtk.FileFilter()]
filters[0].set_name("Portable Document Format (.pdf)")
......@@ -65,7 +70,7 @@ def plotSave(plot):
if not path.endswith(".png"):
path += ".png"
plot.figure.savefig(path) #savefig determines format from file extension
plot.figure.savefig(path) # determines format from file extension
fcd.destroy()
elif response == gtk.RESPONSE_CANCEL:
......
......@@ -15,17 +15,28 @@ RECV = 0
SEND = 1
class microdropConnection:
"""Manages microdrop connection over TCP with zmq"""
def __init__(self, port=6789):
"""Create zmq context and bind to port. Should be called manually
to reinitialize if reset is called.
Keyword arguments:
port -- the TCP to bind to on localhost
"""
self.port = port
self.connected = False
self.state = RECV
self.ctx = zmq.Context()
self.soc = zmq.Socket(self.ctx, zmq.REP)
self.soc.bind("".join(['tcp://*:',str(self.port)]))
self.soc.bind("".join(['tcp://*:', str(self.port)]))
def listen(self):
"""Perform non-blocking recv on zmq port. self.state must be RECV.
Returns a tuple:
[0] -- True if a message was received, False otherwise.
[1] -- The recieved message or "" if no message received.
"""
if self.state == SEND:
print "WAR: Microdrop Connection state invalid, resetting..."
self.reset()
......@@ -38,6 +49,11 @@ class microdropConnection:
return (False, "")
def reply(self, data):
"""Sends a reply on zmq port. self.state must be SEND.
Arguments:
data -- a str to be sent
"""
if self.state == RECV:
print "WAR: Microdrop Connection state invalid, resetting..."
self.reset()
......@@ -48,6 +64,7 @@ class microdropConnection:
return True
def reset(self):
self.soc.unbind("".join(['tcp://*:',str(self.port)]))
"""Reset zmq interface. Must call __init__ again to reinitialize."""
self.soc.unbind("".join(['tcp://*:', str(self.port)]))
del self.soc
del self.ctx
#!/usr/bin/env python
"""
show how to add a matplotlib FigureCanvasGTK or FigureCanvasGTKAgg widget to a
gtk.Window
"""
Creates data plot.
"""
import gtk
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import numpy as np
#from matplotlib.backends.backend_gtkcairo import FigureCanvasGTKCairo as FigureCanvas
#from matplotlib.backends.backend_gtkcairo import NavigationToolbar2Cairo as NavigationToolbar
from matplotlib.backends.backend_gtkagg import FigureCanvasGTKAgg as FigureCanvas
from matplotlib.backends.backend_gtkagg import NavigationToolbar2GTKAgg as NavigationToolbar
#from matplotlib.backends.backend_gtkcairo\
# import FigureCanvasGTKCairo as FigureCanvas
#from matplotlib.backends.backend_gtkcairo\
# import NavigationToolbar2Cairo as NavigationToolbar
from matplotlib.backends.backend_gtkagg \
import FigureCanvasGTKAgg as FigureCanvas
from matplotlib.backends.backend_gtkagg \
import NavigationToolbar2GTKAgg as NavigationToolbar
class plotbox:
"""Contains main data plot and associated methods."""
def __init__(self, plotwindow_instance):
"""Creates plot and moves it to a gtk container.
Arguments:
plotwindow_instance -- gtk container to hold plot.
"""
self.figure = Figure()
self.figure.subplots_adjust(left=0.07, bottom=0.07, right=0.96, top=0.96)
self.figure.subplots_adjust(left=0.07, bottom=0.07,
right=0.96, top=0.96)
self.axe1 = self.figure.add_subplot(111)
self.lines = self.axe1.plot([0,1], [0,1])
self.lines = self.axe1.plot([0, 1], [0, 1])
self.axe1.ticklabel_format(style='sci', scilimits=(0,3), useOffset=False, axis='y')
self.axe1.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
self.canvas = FigureCanvas(self.figure)
self.win = gtk.Window()
......@@ -33,24 +43,36 @@ class plotbox:
self.vbox.reparent(plotwindow_instance)
def clearall(self):
"""Remove all lines on plot. """
for i in self.lines:
i.remove()
self.lines = self.axe1.plot([0,1], [0,1])
self.lines = self.axe1.plot([0, 1], [0, 1])
def clearline(self, line_number):
"""Remove a line specified by line_number."""
self.lines[line_number].remove()
self.lines.pop(line_number)
def addline(self):
self.lines.append(self.axe1.plot([0,1], [0,1])[0])
"""Add a new line to plot. (initialized with dummy data)))"""
self.lines.append(self.axe1.plot([0, 1], [0, 1])[0])
def updateline(self, Experiment, line_number):
divisor = len(Experiment.data[1+line_number*2]) // 2000 + 1 #limits display to 2000 data points per line
"""Update a line specified by line_number with data stored in
the Experiment instance.
"""
# limits display to 2000 data points per line
divisor = len(Experiment.data[1+line_number*2]) // 2000 + 1
self.lines[line_number].set_ydata(Experiment.data[1+line_number*2][1::divisor])
self.lines[line_number].set_xdata(Experiment.data[line_number*2][1::divisor])
self.lines[line_number].set_ydata(
Experiment.data[1+line_number*2][1::divisor])
self.lines[line_number].set_xdata(
Experiment.data[line_number*2][1::divisor])
def changetype(self, Experiment):
"""Change plot type. Set axis labels and x bounds to those stored
in the Experiment instance.
"""
self.axe1.set_xlabel(Experiment.xlabel)
self.axe1.set_ylabel(Experiment.ylabel)
self.axe1.set_xlim(Experiment.xmin, Experiment.xmax)
......@@ -58,6 +80,7 @@ class plotbox:
self.figure.canvas.draw()
def redraw(self):
"""Autoscale and refresh the plot."""
self.axe1.relim()
self.axe1.autoscale(True, axis = 'y')
self.figure.canvas.draw()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment