Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Commits on Source (174)
Showing
with 1793 additions and 92 deletions
......@@ -13,3 +13,5 @@ setup.py
*.egg-info
last_params
last_params.yml
\.idea/
File moved
Version 1.0.2
-Improved logging system: Log messages now print showing where they came from.
-Implemented gobject IO callbacks for experiments:
Process will not continuously poll for new data from serial process anymore.
-Stop button works again
-Buttons in Photodiode and Calibration modules remain insensitive until ready.
Version 1.0.2a
-Hotfix #12: Restored measurement ability on Windows
-Minor logging changes
-Automatically enable TCS on DStat when measure light sensor button clicked.
Version 1.4.6
-Fixed data output for SWV/DPV forward/reverse current
-Working progress bars
-Refactored a lot of plotting code
-Calibration should work properly now
Version 1.0.3
-Fixed #14: Added support for PMT idle mode
-Reduced CPU usage when running OCP by reducing polling frequency
Version 1.4.5
-Made board definitions modular
-Fix several bugs with experiment parameters
-Uses DAC unit based parameters now (**REQUIRES dstat-firmware@9e4a9f or higher**)
-Change package import structure again (main must always be run as module now)
-Workaround for weird Gtk+3 redrawing bug on Windows
Version 1.0.4
-Adds support for synchronous electromechanical shutter detection (added in dstat/dstat-firmware@29d4c86)
-New version string system
Version 1.4.4
-Make connection code more robust
-Execute button disabled until DStat is ready to start
-Supports new firmware version strings added in dstat-firmware@c5f9701
-Experimental firmware upgrade tool (see DStat Menu)
-Fix many bugs
Version 1.0.5
-Bugfix for saving error introduced by new logging system
Version 1.0.6
-Automatically integrates shutter FFT peak and saves to data file
-Adds option to offset start of FFT to avoid PMT startup delay
Version 1.4.3
-Fix another critical bug with Windows multiprocessing
-Allow normal exit even if DStat was never connected
-Store last parameters in user folder
Version 1.4.2
-Refactor to fix critical bug preventing running packaged versions.
Version 1.4.1
-Fixed voltage axis orientation for LSV, CV, SWV, and DPV (Thanks to Dan Bizzotto @ UBC)
-Tweaked paver files to make version detection work without git.
Version 1.4
-Switched to GTK+3
-Support new DStat communications protocol (requires dstat-firmware>fe50c38)
-Many behind-the-scenes changes to improve code readability and make adding new experiment types easier
-Documented new Anaconda packages
Version 1.3.3
-Bugfix #24: Remove ZODB support until it can be fixed for latest ZODB
Version 1.3.2
-Improves initial connection reliability
Version 1.3.1
-Fixed electrochem modes broken when database added
-Make metadata keys optional.
Version 1.0.7
-Fixed a few bugs for systems without git
-Implements mean crossing detection instead of windowing for shutter FFT
Version 1.3
-Fixed a bug related to calibration mode
-Added ZODB data storage
-Integrated with zmq_plugin
Version 1.2
-Old Microdrop interface depreciated
-New zmq_plugin based interface
-Internal changes to save functionality and plot storage.
Version 1.1.3
-Changed internal storage of experiment data
-Added Analysis options:
-FFT integral moved there
-Basic statistics
Version 1.1.2
-Fixed more critical bugs from refactoring
Version 1.1.1
-Fixed critical bug that made PGA setting change with Gain resistor
Version 1.1
-Plot will be prettier if seaborn is installed
-Fixed bug in shutter FFT display
......@@ -36,15 +75,35 @@ Version 1.1
-Adds requirement for yaml
-Parameters automatically saved and loaded from last session
-Can manually save and load parameter files
Version 1.0.7
-Fixed a few bugs for systems without git
-Implements mean crossing detection instead of windowing for shutter FFT
Version 1.1.1
-Fixed critical bug that made PGA setting change with Gain resistor
Version 1.1.2
-Fixed more critical bugs from refactoring
Version 1.0.6
-Automatically integrates shutter FFT peak and saves to data file
-Adds option to offset start of FFT to avoid PMT startup delay
Version 1.1.3
-Changed internal storage of experiment data
-Added Analysis options:
-FFT integral moved there
-Basic statistics
\ No newline at end of file
Version 1.0.5
-Bugfix for saving error introduced by new logging system
Version 1.0.4
-Adds support for synchronous electromechanical shutter detection (added in dstat/dstat-firmware@29d4c86)
-New version string system
Version 1.0.3
-Fixed #14: Added support for PMT idle mode
-Reduced CPU usage when running OCP by reducing polling frequency
Version 1.0.2a
-Hotfix #12: Restored measurement ability on Windows
-Minor logging changes
-Automatically enable TCS on DStat when measure light sensor button clicked.
Version 1.0.2
-Improved logging system: Log messages now print showing where they came from.
-Implemented gobject IO callbacks for experiments:
Process will not continuously poll for new data from serial process anymore.
-Stop button works again
-Buttons in Photodiode and Calibration modules remain insensitive until ready.
include RELEASE-VERSION
include version.py
include setup.py
include main.py
include paver-minilib.zip
include LICENSE
include CHANGELOG
include README.markdown
include core/utils/RELEASE-VERSION
recursive-include dstat_interface *
recursive-exclude dstat_interface *.pyc
recursive-exclude dstat_interface *~
recursive_exclude core last_params.yml
recursive-exclude . .DS_Store
\ No newline at end of file
##### _DStat is described in detail in [Dryden MDM, Wheeler AR (2015) DStat: A Versatile, Open-Source Potentiostat for Electroanalysis and Integration. PLoS ONE 10(10): e0140349. doi: 10.1371/journal.pone.0140349](http://journals.plos.org/plosone/article?id=10.1371/journal.pone.0140349) If you use this information in published work, please cite accordingly._
---
## Python 2.7 is now discontinued and gtk has always been a pain for cross-platform use, so I am in the process of writing a new interface in Python 3 and Qt that I hope will be working before too long.
This is the documentation for the DStat interface software.
The DStat interface is written primarily in Python and runs on Linux, Mac, and Windows.
It is the main method for running experiments on the DStat, controlling experimental parameters and collecting and plotting data.
......@@ -9,68 +11,123 @@ It currently has no abilities for analyzing recorded data or opening previously
## Table of Contents:
1. [Installation](#Installation)
1. [Prerequisites](#Prerequisites)
1. [MacOS](#macos)
1. [Using Anaconda (Recommended)](#using-anaconda-recommended)
2. [Old Homebrew Instructions](#old-homebrew-instructions)
1. [Linux](#linux)
2. [Windows](#windows)
3. [Upgrading](#upgrading)
2. [Getting Started](#Getting-Started)
# Introduction
The DStat interface is written primarily in Python and runs on Linux, Mac, and Windows.
It is the main method for running experiments on the DStat, controlling experimental parameters and collecting and plotting data.
It currently has no abilities for analyzing recorded data or opening previously saved data files, but data is saved in a simple text format or numpy-compatible binary format and plots can be saved as images.
# Installation
Unfortunately, due to the python packages used, dstat-interface is difficult to make into a single self-contained package, so for the time being, the simplest way to run it is to install a python distribution. dstat-interface itself, therefore, requires no installation and can be run from any directory by executing `/dstat-interface/main.py` with python.
It currently has no abilities for analyzing recorded data or opening previously saved data files, but data is saved in a simple text format and plots can be saved as images.
## Prerequisites
# Installation
Python and related packages needed: (versions listed are tested, older versions may still work)
dstat-interface has moved to gtk+3 and we now recommend Anaconda/Miniconda for installation.
* Python (2.7.9)
* matplotlib (1.4.3—compiled with gtk backend)
* numpy (1.9.2)
* py2cairo (1.10.0)
* pyserial (2.7)
* pygtk (2.24.0)
* pygobject (2.28.6)
* XQuartz (2.7.7)
* zeromq (4.0.5) and pyzmq (14.6.0)
* pyyaml (3.11)
## MacOS
### Using Anaconda (Recommended)
Optional:
1. [Install Miniconda](https://repo.continuum.io/miniconda/Miniconda2-latest-MacOSX-x86_64.sh) It doesn't matter if you pick Python 2.7 or 3.5—this just sets Miniconda's default Python. (Skip if Miniconda or Anaconda are already installed)
2. Download the [conda env file](conda-env.yml).
3. In the terminal, create the dstat environment (replacing </path/to/conda-env.yml> with the actual path to the file on your computer):
````conda env create -f </path/to/conda-env.yml>````
* seaborn (0.7.0)—Makes prettier plots if available
3. Then to run dstat-interface:
````source activate dstat
python -m dstat_interface.main````
### Mac OS X
#### Old Homebrew Instructions
The easiest way to get most of the necessary requirements to run dstat-interface is using [Homebrew](http://brew.sh):
brew tap homebrew/python
brew update
brew install python pygtk pygobject py2cairo zeromq
brew install matplotlib --with-pygtk
```shell
brew tap homebrew/python
brew update
brew install python gobject-introspection gtk+3 pygobject3 py2cairo scipy zeromq
brew install matplotlib --with-pygtk
```
Be patient on the last step—matplotlib needs to be compiled and may take 2 or 3 minutes.
Make sure you're using brew-installed python, not OS X's default python. `which python` should point to `/usr/local/bin/python` not `/usr/bin/python`. Type `brew doctor` for more information if you are having issues.
The interface runs in X11 using the GTK+ toolkit, so [XQuartz](http://xquartz.macosforge.org/landing/) needs to be installed.
The final requirements, can be installed using python's pip system:
pip install pyserial pyzmq pyyaml seaborn
pip install pandas pyserial pyzmq pyyaml seaborn zmq-plugin
### Linux
Linux prerequisite installation is similar to that of Mac OS X, only using your distribution's native package manager rather than Homebrew, and X11 will likely be installed already. Some distributions may not have packages available for installing matplotlib or numpy, in which case, they should be installed using pip.
## Linux
Linux prerequisite installation is similar to that of MacOS with Homebrew, only using your distribution's native package manager rather than Homebrew, and X11 will likely be installed already. Some distributions may not have packages available for installing matplotlib or numpy, in which case, they should be installed using pip.
The final requirements, can be installed using python's pip system:
These instructions were tested on Ubuntu 17.04:
````shell
sudo apt-get install gobject-introspection python-gobject python-pip
pip install dstat-interface
````
You will need to add your user to the `dialout` group to access virtual serial ports (replace <user> with your username):
```shell
sudo usermod -a -G dialout <user>
```
## Windows
The following terminal commands will result in a full installation of dstat-interface and its requirements, assuming [64-bit Miniconda][1] is installed:
```shell
conda create -n dstat -c mdryden python=2 dstat-interface
activate dstat
```
To finish the installation, GTK+3 and its Python bindings must be installed:
1. Download the latest all-in-one installer from [here][2].
2. When the installer prompts for the path to your Python distribution, navigate to your Miniconda/Anaconda envs folder and choose the dstat folder. If you chose to install Miniconda for your user only, the envs folder is found in `$USER/Miniconda2/envs` (for Miniconda) or `$USER/Anaconda/envs` (for Anaconda), where `$USER` is your user directory. If you installed Miniconda for all users, the Miniconda2/Anaconda folder will be in the root of your C: drive.
3. When the installer asks which modules to install, choose GTK.
4. Finish the installer.
We are installing in a separate environment to keep a clean system.
`activate dstat` will enter the environment (must be done whenever a new terminal is opened),
and `deactivate` will return to the root environment.
Therefore, to run dstat-interface from our environment, we must first activate it (if not already done) before launching it:
```shell
activate dstat
python -m dstat_interface.main
```
[1]: https://repo.continuum.io/miniconda/Miniconda2-latest-Windows-x86_64.exe
[2]: https://sourceforge.net/projects/pygobjectwin32/
## Upgrading
Anaconda builds can be upgraded to the latest version by issuing this command (from an activated conda environment):
```shell
conda upgrade -c mdryden dstat-interface # For MacOS, be sure to upgrade dstat-interface-deps as well
```
pip install pyserial pyzmq pyyaml seaborn
pip installs can be upgraded similarly:
### Windows
While it is possible to install a bare python distribution and install the required prerequisites separately, [Python(x,y)](https://code.google.com/p/pythonxy/wiki/Downloads) has a python 2.7 distribution that already contains most of the necessary packages. However, pyserial is not installed in the recommended install so it should be manually selected or the full install done instead (tested with 2.7.9.0).
```shell
pip install --upgrade dstat-interface
```
The newest versions of Python(x,y) are also missing PyGTK, so it should be installed from [here](http://ftp.gnome.org/pub/GNOME/binaries/win32/pygtk/2.24/pygtk-all-in-one-2.24.2.win32-py2.7.msi) once Python(x,y) is installed. Matplotlib should then be reinstalled to get gtk support from [here](https://downloads.sourceforge.net/project/matplotlib/matplotlib/matplotlib-1.4.3/windows/matplotlib-1.4.3.win32-py2.7.exe).
You can also run development builds directly from a cloned git repository (from an activated conda environment):
## Experimental pip install
Tagged git versions are uploaded to [PiPy](https://pypi.python.org/pypi/dstat-interface) regularly, and thus dstat-interface can be installed using the command `pip install dstat-interface`, which will attempt to automatically install matplotlib, numpy, pyserial, and pyzmq. (N.B. matplotlib does not install well with pip on Mac and should be manually installed with Homebrew as described above)
This is still experimental as dstat-interface cannot be launched as a module for compatibility with multiprocessing on Windows and pygtk and pygobject must be installed manually as described above.
```shell
cd ~/src/dstat-interface/dstat_interface # Replace with path to dstat_interface folder inside repository
python -m main
```
# Getting started
## Interface overview
......@@ -133,4 +190,4 @@ If the connection failed, unplug the DStat and try again.
3. Set an appropriate potentiostat gain.
4. Click Execute.
![experiment](images/3.png)
\ No newline at end of file
![experiment](images/3.png)
name: dstat
channels:
- conda-forge
- mdryden
- defaults
dependencies:
- ca-certificates=2018.4.16=0
- certifi=2018.4.16=py27_0
- nb_conda_kernels=2.1.0=py27_0
- openssl=1.0.2o=0
- appnope=0.1.0=py27_0
- backports=1.0=py27_0
- backports_abc=0.5=py27_0
- bleach=1.5.0=py27_0
- configparser=3.5.0=py27_0
- curl=7.54.1=0
- cycler=0.10.0=py27_0
- decorator=4.1.2=py27_0
- entrypoints=0.2.3=py27_0
- enum34=1.1.6=py27_0
- expat=2.1.0=0
- freetype=2.5.5=2
- funcsigs=1.0.2=py27hb9f6266_0
- functools32=3.2.3.2=py27_0
- get_terminal_size=1.0.0=py27_0
- gettext=0.19.8=1
- git=2.11.1=0
- html5lib=0.9999999=py27_0
- icu=54.1=0
- intel-openmp=2018.0.0=h8158457_8
- ipykernel=4.6.1=py27_0
- ipython=5.3.0=py27_0
- ipython-notebook=4.0.4=py27_0
- ipython_genutils=0.2.0=py27_0
- jbig=2.1=0
- jinja2=2.9.6=py27_0
- jpeg=9b=0
- jsonschema=2.6.0=py27_0
- jupyter_client=5.1.0=py27_0
- jupyter_core=4.3.0=py27_0
- krb5=1.13.2=0
- libcxx=4.0.1=h579ed51_0
- libcxxabi=4.0.1=hebd6815_0
- libffi=3.2.1=1
- libgfortran=3.0.1=h93005f0_2
- libiconv=1.14=0
- libpng=1.6.30=1
- libssh2=1.8.0=0
- libtiff=4.0.6=3
- llvmlite=0.21.0=py27hac8ee23_0
- markupsafe=1.0=py27_0
- matplotlib=2.0.2=np113py27_0
- mistune=0.7.4=py27_0
- mkl=2018.0.1=hfbd8650_4
- nbconvert=5.2.1=py27_0
- nbformat=4.4.0=py27_0
- notebook=5.0.0=py27_0
- numba=0.36.2=np113py27h7c931aa_0
- numpy=1.13.3=py27h62f9060_0
- pandas=0.20.3=py27_0
- pandocfilters=1.4.2=py27_0
- path.py=10.3.1=py27_0
- pathlib2=2.3.0=py27_0
- patsy=0.4.1=py27_0
- pcre=8.39=1
- pexpect=4.2.1=py27_0
- pickleshare=0.7.4=py27_0
- pip=9.0.1=py27_1
- prompt_toolkit=1.0.15=py27_0
- ptyprocess=0.5.2=py27_0
- pygments=2.2.0=py27_0
- pyparsing=2.2.0=py27_0
- pyqt=5.6.0=py27_2
- python=2.7.13=0
- python-dateutil=2.6.1=py27_0
- pytz=2017.2=py27_0
- pyyaml=3.12=py27_0
- pyzmq=16.0.2=py27_0
- qt=5.6.2=2
- readline=6.2=2
- scandir=1.5=py27_0
- scipy=1.0.0=py27h793f721_0
- seaborn=0.8=py27_0
- setuptools=36.4.0=py27_0
- simplegeneric=0.8.1=py27_1
- singledispatch=3.4.0.3=py27_0
- sip=4.18=py27_0
- six=1.10.0=py27_0
- sqlite=3.13.0=0
- ssl_match_hostname=3.5.0.1=py27_0
- statsmodels=0.8.0=np113py27_0
- subprocess32=3.2.7=py27_0
- terminado=0.6=py27_0
- testpath=0.3.1=py27_0
- tk=8.5.18=0
- tornado=4.5.2=py27_0
- traitlets=4.3.2=py27_0
- wcwidth=0.1.7=py27_0
- wheel=0.29.0=py27_0
- xz=5.2.3=0
- yaml=0.1.6=0
- zlib=1.2.11=0
- adwaita-icon-theme=3.24.0=1
- arrow=0.10.0=py27_0
- at-spi2-atk=2.24.1=2
- at-spi2-core=2.24.1=2
- atk=2.24.0=3
- cairo-gobject=1.14.8=8
- dbus-client=1.10.18=0
- dfu-programmer=0.7.2=2
- dstat-interface=1.4.6=py27_0
- dstat-interface-deps=1.0=0
- gdk-pixbuf=2.36.6=2
- glib=2.52.2=5
- gobject-introspection=1.52.1=2
- gtk3=3.22.15=4
- harfbuzz=1.4.6=3
- libepoxy=1.4.2=5
- libusb=1.0.21=0
- pango=1.40.6=2
- pixman=0.34.0=1
- py2cairo=1.10.0=py27_0
- pygobject3=3.24.2=py27_3
- pyserial=3.3=py27_0
- zmq-plugin=0.2.post14=py27_0
- pip:
- backports.shutil-get-terminal-size==1.0.0
- backports.shutil-which==3.5.1
- backports.ssl-match-hostname==3.5.0.1
- chardet==3.0.4
- colorama==0.3.9
- idna==2.6
- paver==1.2.4
- pygobject==3.24.1
- requests==2.18.4
- urllib3==1.22
- vmprof==0.4.10
......@@ -21,18 +21,19 @@
Functions for analyzing data.
"""
import logging
import os
import pygtk
import gtk
from numpy import mean, trapz
logger = logging.getLogger('dstat.analysis')
logger = logging.getLogger(__name__)
mod_dir = os.path.dirname(os.path.abspath(__file__))
class AnalysisOptions(object):
"""Analysis options window."""
def __init__(self, builder):
self.builder = builder
self.builder.add_from_file('interface/analysis_options.glade')
self.builder.add_from_file(
os.path.join(mod_dir, 'interface/analysis_options.glade'))
self.builder.connect_signals(self)
self.window = self.builder.get_object('analysis_dialog')
......@@ -85,11 +86,9 @@ class AnalysisOptions(object):
@params.setter
def params(self, params):
try:
for key in self._params:
for key in self._params:
if key in params:
self._params[key] = params[key]
except KeyError as e:
logger.warning("Missing parameter - %s" % e)
self.stats_button.set_active(self._params['stats_true'])
self.stats_start_button.set_active(self._params['stats_start_true'])
......
from . import comm
from . import dfu
from . import state
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
import sys
import inspect
import logging
from pkg_resources import parse_version, parse_requirements
logger = logging.getLogger(__name__)
class BaseBoard(object):
pcb_version = 'x.x.x'
booster = False
def __init__(self):
self.max_freq = 5000
self.max_scans = 255
self.max_time = 65535
self.setup()
assert len(self.gain) == self.gain_settings
assert len(self.gain_labels) == self.gain_settings
if self.gain_trim is not None:
assert len(self.gain_trim) == self.gain_settings
def setup(self):
"""Override in subclasses to provide correct numbers"""
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def test_mv(self, mv):
"""Return true if voltage in mV is in range."""
dac = float(mv)*self.re_voltage_scale/(3000./65536) + 32768
if 0 <= dac <= 65535:
return True
else:
return False
def test_freq(self, hz):
"""Return true if frequency in Hz is in range."""
return 0 < float(hz) < self.max_freq
def test_scans(self, n):
"""Return true if number of scans is valid."""
return 0 < int(n) < self.max_scans
def test_s(self, s):
"""Return true if time in integer seconds is valid."""
return 0 < int(s) < self.max_time
class V1_1Board(BaseBoard):
pcb_version = '1.1'
def setup(self):
self.gain = [1e2, 3e2, 3e3, 3e4, 3e5, 3e6, 3e7, 5e8]
self.gain_labels = [None, "300 Ω (5 mA FS)",
"3 kΩ (500 µA FS)", "30 kΩ (50 µA FS)",
"300 kΩ (5 µA FS)", "3 MΩ (500 nA FS)",
"30 MΩ (50 nA FS)", "500 MΩ (3 nA FS)"
]
self.gain_trim = None
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
class V1_2Board(BaseBoard):
pcb_version = '1.2'
def setup(self):
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
def __get_all_subclasses(cls):
all_subclasses = []
for subclass in cls.__subclasses__():
all_subclasses.append(subclass)
all_subclasses.extend(__get_all_subclasses(subclass))
return all_subclasses
def find_board(version, booster=False):
"""Returns highest compatible board class or None if none available."""
boards = __get_all_subclasses(BaseBoard)
candidates = []
for board in boards:
req = parse_requirements('dstat~={}'.format(board.pcb_version)).next()
if board.booster == booster and version in req:
candidates.append(board)
try:
picked = sorted(candidates,
key=lambda board: parse_version(board.pcb_version))[-1]
logger.info("Picked %s", picked)
return picked
except IndexError:
logger.warning("No matching board definition for ver: %s.", version)
return None
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import multiprocessing as mp
from collections import OrderedDict
import logging
from pkg_resources import parse_version
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, GObject
except ImportError:
print "ERR: GTK not available"
sys.exit(1)
import serial
from serial.tools import list_ports
from ..errors import InputError, VarError
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
from . import state
class AlreadyConnectedError(Exception):
def __init__(self):
super(AlreadyConnectedError, self).__init__(self,
"Serial instance already connected.")
class NotConnectedError(Exception):
def __init__(self):
super(NotConnectedError, self).__init__(self,
"Serial instance not connected.")
class ConnectionError(Exception):
def __init__(self):
super(ConnectionError, self).__init__(self,
"Could not connect.")
class TransmitError(Exception):
def __init__(self):
super(TransmitError, self).__init__(self,
"No reply received.")
def _serial_process(ser_port, proc_pipe, ctrl_pipe, data_pipe):
ser_logger = logging.getLogger("{}._serial_process".format(__name__))
connected = False
for i in range(5):
time.sleep(1) # Give OS time to enumerate
try:
ser = serial.Serial(ser_port, timeout=1)
# ser = serial.Serial(ser_port, timeout=1)
ser_logger.info("Connecting")
time.sleep(.5)
connected = True
except serial.SerialException:
pass
if connected is True:
break
try:
if ser.isOpen() is False:
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
except UnboundLocalError: # ser doesn't exist
ser_logger.info("Connection Error")
proc_pipe.send("SERIAL_ERROR")
return 1
ser.write('!0 ')
for i in range(10):
if ser.readline().rstrip()=="@ACK 0":
if ser.readline().rstrip()=="@RCV 0":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!0 ')
time.sleep(.1)
while True:
# These can only be called when no experiment is running
if ctrl_pipe.poll():
ctrl_buffer = ctrl_pipe.recv()
if ctrl_buffer in ('a', "DISCONNECT"):
proc_pipe.send("ABORT")
try:
ser.write('a')
except serial.SerialException:
return 0
ser_logger.info("ABORT")
if ctrl_buffer == "DISCONNECT":
ser_logger.info("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
proc_pipe.send("DISCONNECT")
return 0
else:
ser.write(ctrl_buffer)
elif proc_pipe.poll():
while ctrl_pipe.poll():
ctrl_pipe.recv()
try:
return_code = proc_pipe.recv().run(ser, ctrl_pipe, data_pipe)
except serial.SerialException:
proc_pipe.send("DISCONNECT")
ser.rts = False
ser._update_dtr_state() # Need DTR update on Windows
ser.close()
return 0
ser_logger.info('Return code: %s', str(return_code))
proc_pipe.send(return_code)
else:
time.sleep(.1)
class SerialConnection(GObject.Object):
__gsignals__ = {
'connected': (GObject.SIGNAL_RUN_FIRST, None, ()),
'disconnected': (GObject.SIGNAL_RUN_FIRST, None, ())
}
def __init__(self):
super(SerialConnection, self).__init__()
self.connected = False
def connect(self, ser_port):
if self.connected is False:
self.proc_pipe_p, self.proc_pipe_c = mp.Pipe(duplex=True)
self.ctrl_pipe_p, self.ctrl_pipe_c = mp.Pipe(duplex=True)
self.data_pipe_p, self.data_pipe_c = mp.Pipe(duplex=True)
self.proc = mp.Process(target=_serial_process, args=(ser_port,
self.proc_pipe_c, self.ctrl_pipe_c,
self.data_pipe_c))
self.proc.start()
time.sleep(2)
if self.proc.is_alive() is False:
raise ConnectionError()
return False
self.connected = True
self.emit('connected')
return True
else:
raise AlreadyConnectedError()
return False
def assert_connected(self):
if self.connected is False:
raise NotConnectedError()
def start_exp(self, exp):
self.assert_connected()
self.proc_pipe_p.send(exp)
def stop_exp(self):
self.send_ctrl('a')
def get_proc(self, block=False):
self.assert_connected()
if block is True:
return self.proc_pipe_p.recv()
else:
if self.proc_pipe_p.poll() is True:
return self.proc_pipe_p.recv()
else:
return None
def get_ctrl(self, block=False):
self.assert_connected()
if block is True:
return self.ctrl_pipe_p.recv()
else:
if self.ctrl_pipe_p.poll() is True:
return self.ctrl_pipe_p.recv()
else:
return None
def get_data(self, block=False):
self.assert_connected()
if block is True:
return self.data_pipe_p.recv()
else:
if self.data_pipe_p.poll() is True:
return self.data_pipe_p.recv()
else:
return None
def flush_data(self):
self.assert_connected()
while self.data_pipe_p.poll() is True:
self.data_pipe_p.recv()
def send_ctrl(self, ctrl):
self.assert_connected()
self.ctrl_pipe_p.send(ctrl)
def disconnect(self):
logger.info("Disconnecting")
self.send_ctrl('DISCONNECT')
self.proc.join()
self.emit('disconnected')
self.connected = False
class VersionCheck(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
try:
ser.reset_input_buffer()
ser.write('!1\n')
for i in range(10):
if ser.readline().rstrip()=="@ACK 1":
ser.write('V\n')
if ser.readline().rstrip()=="@RCV 1":
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write('!1\n')
time.sleep(.1)
for line in ser:
dstat_logger.info(line.decode('utf-8'))
if line.startswith('V'):
input = line.lstrip('V')
elif line.startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
pcb, sep, firmware = input.strip().rpartition('-')
if pcb == "":
pcb = firmware
firmware = False
logger.info("Your firmware does not support version detection.")
data_pipe.send((pcb, False))
else:
logger.info(
"Firmware Version: {}".format(
hex(int(firmware)).lstrip('0x')
)
)
data_pipe.send((
pcb,
hex(int(firmware)).lstrip('0x')
))
logger.info(
"PCB Version: {}".format(pcb)
)
status = "DONE"
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
def version_check(ser_port):
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
state.ser = SerialConnection()
state.ser.connect(ser_port)
state.ser.start_exp(VersionCheck())
result = state.ser.get_proc(block=True)
if result == "SERIAL_ERROR":
state.dstat_version = None
state.firmware_version = None
return False
else:
buffer = state.ser.get_data(block=True)
version, state.firmware_version = buffer
state.dstat_version = parse_version(version)
logger.debug("version_check done")
time.sleep(.1)
return True
class Settings(object):
def __init__(self, task, settings=None):
self.task = task
self.settings = settings
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
self.ser = ser
if 'w' in self.task:
self.write()
if 'r' in self.task:
data_pipe.send(self.read())
status = "DONE"
return status
def read(self):
settings = OrderedDict()
self.ser.reset_input_buffer()
self.ser.write('!2\n')
for i in range(10):
if self.ser.readline().rstrip()=="@ACK 2":
self.ser.write('SR\n')
if self.ser.readline().rstrip()=="@RCV 2":
break
else:
time.sleep(.5)
self.ser.reset_input_buffer()
self.ser.write('!2\n')
time.sleep(.1)
for line in self.ser:
if line.lstrip().startswith('S'):
input = line.lstrip().lstrip('S')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.ser.reset_input_buffer()
break
parted = input.rstrip().split(':')
for i in range(len(parted)):
settings[parted[i].split('.')[0]] = [i, parted[i].split('.')[1]]
return settings
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries = 3):
while True:
reply = self.ser.readline().rstrip()
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise TransmitError
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.ser.reset_input_buffer()
self.ser.write('!{}\n'.format(n))
time.sleep(.1)
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@ACK {}".format(n):
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.ser.write('{}\n'.format(cmd))
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RCV {}".format(n):
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != "@RQP {}".format(n_params):
logger.warning("Expected RQP got: {}".format(reply))
continue
tries = 5
for i in params:
while True:
self.ser.write(i + " ")
try:
reply = get_reply()
if reply == "@RCVC {}".format(i):
break
except TransmitError:
if tries <= 0:
continue
tries -= 1
pass
else:
break
return True
return False
def write(self):
write_buffer = range(len(self.settings))
for i in self.settings: # make sure settings are in right order
write_buffer[self.settings[i][0]] = self.settings[i][1]
to_write = " ".join(write_buffer) + " "
n = len(to_write)
logger.debug("to_write = %s", to_write)
if not self.write_command('SW' + to_write):
logger.error("Could not write command.")
def read_settings():
"""Tries to contact DStat and get settings. Returns dict of
settings.
"""
state.ser.flush_data()
state.ser.start_exp(Settings(task='r'))
state.settings = state.ser.get_data(block=True)
logger.info("Read settings from DStat")
logger.debug("read_settings: %s", state.ser.get_proc(block=True))
return
def write_settings():
"""Tries to write settings to DStat from global settings var.
"""
logger.debug("Settings to write: %s", state.settings)
state.ser.flush_data()
state.ser.start_exp(Settings(task='w', settings=state.settings))
logger.info("Wrote settings to DStat")
logger.debug("write_settings: %s", state.ser.get_proc(block=True))
return
class LightSensor:
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
ser.reset_input_buffer()
ser.write('!')
while not ser.read()=="@":
self.ser.reset_input_buffer()
ser.write('!')
ser.write('T')
for line in ser:
if line.lstrip().startswith('T'):
input = line.lstrip().lstrip('T')
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
ser.reset_input_buffer()
break
parted = input.rstrip().split('.')
data_pipe.send(parted[0])
status = "DONE"
return status
def read_light_sensor():
"""Tries to contact DStat and get light sensor reading. Returns uint of
light sensor clear channel.
"""
state.ser.flush_data()
state.ser.start_exp(LightSensor())
logger.debug("read_light_sensor: %s", state.ser.get_proc(block=True))
return state.ser.get_data(block=True)
class SerialDevices(object):
"""Retrieves and stores list of serial devices in self.ports"""
def __init__(self):
self.ports = []
self.refresh()
def refresh(self):
"""Refreshes list of ports."""
try:
self.ports, _, _ = zip(*list_ports.grep("DSTAT"))
except ValueError:
self.ports = []
logger.error("No serial ports found")
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import subprocess
import sys
import os
import time
import logging
from tempfile import mkdtemp
from zipfile import ZipFile
if sys.version_info >= (3,):
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2
import urlparse
logger = logging.getLogger(__name__)
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
import serial
from . import state
from .comm import dstat_logger, exp_logger
fwurl = "http://microfluidics.utoronto.ca/gitlab/api/v4/projects/4/jobs/artifacts/master/download?job=1.2.3&private_token=zkgSx1FaaTP7yLyFKkX6"
class FWDialog(object):
def __init__(self, parent, connect, stop_callback, disconnect_callback, signal='activate'):
self.parent = parent
self.stop = stop_callback
self.disconnect = disconnect_callback
connect.connect(signal, self.activate)
def activate(self, widget=None, data=None):
for name, result in assert_deps().items():
if result is not True:
logger.error("Can't use firmware update module.")
self.missing_deps()
return
self.stop() # Stop OCP
version_result, master = test_firmware_version()
if version_result is False:
self.git_error()
return
if version_result == 'latest':
message = "Your firmware is already up to date."
secondary = "Click yes to reflash firmware anyways."
elif version_result == 'devel':
message = "Your firmware is not on the master branch."
secondary = "You may have a development version. " +\
"Click yes to reflash firmware anyways."
elif version_result == 'old':
message = "Your firmware is out of date."
secondary = "Click yes to flash the latest firmware."
dialog = Gtk.MessageDialog(self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.YES_NO, message)
dialog.format_secondary_text(secondary)
dialog.get_content_area().add(
Gtk.Label(
label="Installed version: {}".format(state.firmware_version)))
dialog.get_content_area().add(
Gtk.Label(label="Latest version: {}".format(master)))
dialog.show_all()
response = dialog.run()
if response == Gtk.ResponseType.YES:
try:
download_fw()
except:
self.dl_error()
return
dstat_enter_dfu()
self.dfu_notice()
self.disconnect()
try:
dfu_program()
except:
self.dfu_error()
dialog.destroy()
else:
dialog.destroy()
def missing_deps(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Missing Dependencies")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def git_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Git Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dl_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Download Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dfu_notice(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.OK, "Note about DFU")
dialog.format_secondary_text("Click OK once the DStat has connected in "
+ "DFU mode. Windows doesn't seem to like the automatic reboot. "
+ "Try holding down the reset button while plugging the "
+ 'USB port in (No LEDs should be lit), then click OK. Make sure '
+ 'the DFU driver from the dfu-programmer directory is installed.')
dialog.run()
dialog.destroy()
def dfu_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Could not update over DFU")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def destroy(self, widget=None, data=None):
widget.destroy()
def assert_deps():
deps = {'git' : 'git --version',
'dfu-programmer' : 'dfu-programmer --version'}
result = {}
for key, command in deps.items():
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
result[key] = True
except subprocess.CalledProcessError:
logger.warning("{} is not available.".format(key))
result[key] = False
return result
def download_fw(): # from https://stackoverflow.com/a/16518224
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
u = urllib2.urlopen(fwurl)
scheme, netloc, path, query, fragment = urlparse.urlsplit(fwurl)
filename = os.path.basename(path)
if not filename:
filename = 'downloaded.file'
with open(filename, 'wb') as f:
meta = u.info()
meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
meta_length = meta_func("Content-Length")
file_size = None
if meta_length:
file_size = int(meta_length[0])
logger.info("Downloading: {0} Bytes: {1}".format(fwurl, file_size))
file_size_dl = 0
block_sz = 8192
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
status = "{0:16}".format(file_size_dl)
if file_size:
status += " [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
status += chr(13)
logger.info(status)
with ZipFile(filename, mode='r') as z:
fw_path = z.extract('dstat-firmware.hex')
return fw_path
def test_firmware_version(current=None):
if current is None:
current = state.firmware_version
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
command = "git clone http://microfluidics.utoronto.ca/gitlab/dstat/dstat-firmware.git"
logger.info('Cloning master.')
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.error("git failed with error code {}".format(e.returncode))
logger.error("Output: {}".format(e.output))
return False, None
logger.info(output)
os.chdir("./dstat-firmware")
command = "git rev-parse --short master"
master = subprocess.check_output(command.split(), stderr=subprocess.STDOUT)
logger.info("Current master commit: {}".format(master))
command = "git merge-base --is-ancestor master {}".format(current)
test = subprocess.call(command.split())
if test == 0: # already newest
logger.info('Firmware is latest available.')
return 'latest', master
elif test == 1: # old version
logger.info('Firmware is out of date.')
return 'old', master
elif test == 128: # newer or different branch
logger.info('Firmware is not on the master branch.')
return 'devel', master
else:
logger.error('Unexpected git error. Git exited {}'.format(test))
return False, None
def dfu_program(path='./dstat-firmware.hex'):
"""Tries to program DStat over USB with DFU with hex file at path."""
try:
command = "dfu-programmer atxmega256a3u erase"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u flash {}".format(path)
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u launch"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
except subprocess.CalledProcessError as e:
logger.error("{} failed with output:".format(" ".join(e.cmd)))
logger.error(e.output)
raise
def dstat_enter_dfu():
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
exp = DFUMode()
state.ser.start_exp(exp)
while True:
result = state.ser.get_proc(block=True)
if result in ('SERIAL_ERROR', 'DONE'):
break
logger.info(result)
# state.ser.disconnect()
time.sleep(.1)
return True
class DFUMode(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
status = None
try:
ser.write(b'!2\n')
exp_logger.info('!2')
for i in range(10):
if ser.readline().rstrip() == b"@ACK 2":
dstat_logger.info('@ACK 2')
ser.write(b'SF\n')
exp_logger.info('SF')
status = "DONE"
time.sleep(5)
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write(b'!2\n')
exp_logger.info('!2')
time.sleep(.1)
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except serial.SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
if __name__ == "__main__":
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: [%(name)s] %(message)s',
datefmt='%H:%M:%S'
)
log_handler.setFormatter(log_formatter)
logger.setLevel(level=logging.INFO)
logger.addHandler(log_handler)
dstat_enter_dfu()
time.sleep(2)
dfu_program(sys.argv[1])
\ No newline at end of file
from collections import OrderedDict
def reset():
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
\ No newline at end of file
......@@ -44,14 +44,4 @@ class VarError(Error):
def __init__(self, var, msg):
self.var = var
self.msg = msg
class ErrorLogger(object):
def __init__(self, sender="dstat-interface", level=('ERR', 'WAR', 'INFO')):
self.sender = str(sender)
self.level = level
self.levels = ('ERR', 'WAR', 'INFO', 'DBG')
def error(self, msg, level):
if level in self.level:
print "[%s (%s)] %s" % (self.sender, level, str(msg))
\ No newline at end of file
self.msg = msg
\ No newline at end of file
# __all__ = []
#
# import pkgutil
# import inspect
# from . import cal, chronoamp, cv
#
#
# for loader, name, is_pkg in pkgutil.walk_packages(__path__):
# print loader, name, is_pkg
# module = loader.find_module(name).load_module(name)
#
# for name, value in inspect.getmembers(module):
# if name.startswith('__'):
# continue
#
# globals()[name] = value
# __all__.append(name)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2017 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import struct
import logging
logger = logging.getLogger(__name__)
import serial
from ..errors import InputError, VarError
from ..dstat import state
from ..experiments.experiment_template import Experiment, dstat_logger
def measure_offset(time):
gain_trim_table = [None, 'r100_trim', 'r3k_trim', 'r30k_trim', 'r300k_trim',
'r3M_trim', 'r30M_trim', 'r100M_trim']
parameters = {}
parameters['time'] = time
gain_offset = {}
for i in range(1,8):
parameters['gain'] = i
state.ser.start_exp(CALExp(parameters))
logger.info("measure_offset: %s", state.ser.get_proc(block=True))
gain_offset[gain_trim_table[i]] = state.ser.get_data(block=True)
return gain_offset
class CALExp(Experiment):
id = 'cal'
"""Offset calibration experiment"""
def __init__(self, parameters):
self.parameters = parameters
self.databytes = 8
self.scan = 0
self.data = []
self.commands = ["EA2 3 1 ", "EG"]
self.commands[1] += str(self.parameters['gain'])
self.commands[1] += " "
self.commands[1] += "0 "
self.commands.append(
("ER1 0", ["32768", str(self.parameters['time'])])
)
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
try:
while True:
if self.ctrl_pipe.poll():
input = self.ctrl_pipe.recv()
logger.debug("serial_handler: %s", input)
if input == ('a' or "DISCONNECT"):
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
for line in self.serial:
if self.ctrl_pipe.poll():
if self.ctrl_pipe.recv() == 'a':
self.serial.write('a')
logger.info("serial_handler: ABORT pressed!")
return False
if line.startswith('B'):
self.data.append(self.data_handler(
self.serial.read(size=self.databytes)))
elif line.lstrip().startswith("#"):
dstat_logger.info(line.lstrip().rstrip())
elif line.lstrip().startswith("@DONE"):
dstat_logger.debug(line.lstrip().rstrip())
self.serial.flushInput()
self.experiment_done()
return True
except serial.SerialException:
return False
def data_handler(self, data):
"""Takes data_input as tuple -- (scan, data).
Returns:
current
"""
seconds, milliseconds, current = struct.unpack('<HHl', data)
return current
def experiment_done(self):
"""Averages data points
"""
try:
sum = 0
self.data[0] = 0 # Skip first point
except IndexError:
return
for i in self.data:
sum += i
sum /= len(self.data)
if (sum > 32767):
sum = 32767
elif (sum < -32768):
sum = -32768
self.data_pipe.send(sum)
\ No newline at end of file
import time
import struct
import numpy as np
import serial
from ..interface.plot import mean, plotSpectrum, findBounds
from .experiment_template import PlotBox, Experiment, exp_logger
class ChronoampBox(PlotBox):
def setup(self):
self.plot_format = {
'current_time': {'xlabel': "Time (s)",
'ylabel': "Current (A)"
}
}
def format_plots(self):
"""
Creates and formats subplots needed. Overrides superclass.
"""
self.subplots = {'current_time': self.figure.add_subplot(111)}
for key, subplot in self.subplots.items():
subplot.ticklabel_format(style='sci', scilimits=(0, 3),
useOffset=False, axis='y')
subplot.plot([],[])
subplot.set_xlabel(self.plot_format[key]['xlabel'])
subplot.set_ylabel(self.plot_format[key]['ylabel'])
class Chronoamp(Experiment):
id = 'cae'
"""Chronoamperometry experiment"""
def setup(self):
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.total_time = sum(self.parameters['time'])
self.plotlims = {'current_time': {'xlims': (0, self.total_time)}
}
self.commands.append(
("ER" + str(len(self.parameters['potential'])) + " 0 ", [])
)
for i in self.parameters['potential']:
self.commands[-1][1].append(str(int(i*(65536./3000)+32768)))
for i in self.parameters['time']:
self.commands[-1][1].append(str(i))
plot = ChronoampBox(['current_time'])
plot.setlims('current_time', **self.plotlims['current_time'])
self.plots.append(plot)
def data_handler(self, data_input):
"""Overrides Experiment method to not convert x axis to mV."""
scan, data = data_input
# 2*uint16 + int32
seconds, milliseconds, current = struct.unpack('<HHl', data)
return (scan, (
seconds+milliseconds/1000.,
(current+self.gain_trim)*(1.5/self.gain/8388607)
)
)
def store_data(self, incoming, newline):
"""Stores data in data attribute. Should not be called from subprocess.
Can be overriden for custom experiments."""
line, data = incoming
if newline is True:
self.data['current_time'].append(deepcopy(self.line_data))
for i, item in enumerate(self.data['current_time'][line]):
item.append(data[i])
def get_progress(self):
try:
return self.data['current_time'][-1][0][-1]/self.total_time
except IndexError:
return 0
class PDExp(Chronoamp):
"""Photodiode/PMT experiment"""
id = 'pde'
def setup(self):
self.plots.append(ChronoampBox('current_time'))
self.datatype = "linearData"
self.datalength = 2
self.databytes = 8
self.data = {'current_time' : [([],[])]}
self.columns = ['Time (s)', 'Current (A)']
self.plot_format = {
'current_time' : {
'labels' : self.columns,
'xlims' : (0, int(self.parameters['time']))
}
}
self.total_time = int(self.parameters['time'])
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("EZ")
self.commands[-1] += str(self.parameters['sync_freq'])
self.commands[-1] += " "
else:
self.commands.append(("E2", []))
command = "ER1 "
params = []
if self.parameters['interlock_true']:
command += "1 "
else:
command += "0 "
if self.parameters['voltage'] == 0: # Special case where V=0
params.append("65535")
else:
params.append(str(int(
65535-(self.parameters['voltage']*(65536./3000))))
)
params.append(str(self.parameters['time']))
self.commands.append((command, params))
if self.parameters['shutter_true']:
if self.parameters['sync_true']:
self.commands.append("Ez")
else:
self.commands.append("E1")
class FT_Box(PlotBox):
def updateline(self, Experiment, line_number):
def search_value(data, target):
for i in range(len(data)):
if data[i] > target:
return i
y = Experiment.data['data'][line_number][1]
x = Experiment.data['data'][line_number][0]
freq = Experiment.parameters['adc_rate_hz']
i = search_value(x, float(Experiment.parameters['fft_start']))
y1 = y[i:]
x1 = x[i:]
avg = mean(y1)
min_index, max_index = findBounds(y1)
y1[min_index] = avg
y1[max_index] = avg
f, Y = plotSpectrum(y1[min_index:max_index],freq)
self.axe1.lines[line_number].set_ydata(Y)
self.axe1.lines[line_number].set_xdata(f)
Experiment.data['ft'] = [(f, Y)]
def changetype(self, Experiment):
"""Change plot type. Set axis labels and x bounds to those stored
in the Experiment instance. Stores class instance in Experiment.
"""
self.axe1.set_xlabel("Freq (Hz)")
self.axe1.set_ylabel("|Y| (A/Hz)")
self.axe1.set_xlim(0, Experiment.parameters['adc_rate_hz']/2)
Experiment.plots['ft'] = self
self.figure.canvas.draw()
\ No newline at end of file