Commits on Source (5)
include RELEASE-VERSION
include version.py
include setup.py
include main.py
include paver-minilib.zip
include LICENSE
include CHANGELOG
include README.markdown
include core/utils/RELEASE-VERSION
recursive-include dstat_interface *
recursive-exclude dstat_interface *.pyc
recursive-exclude dstat_interface *~
recursive_exclude core last_params.yml
recursive-exclude . .DS_Store
\ No newline at end of file
name: dstat
channels:
- conda-forge
- mdryden
- defaults
dependencies:
- ca-certificates=2018.4.16=0
- certifi=2018.4.16=py27_0
- nb_conda_kernels=2.1.0=py27_0
- openssl=1.0.2o=0
- appnope=0.1.0=py27_0
- backports=1.0=py27_0
- backports_abc=0.5=py27_0
- bleach=1.5.0=py27_0
- configparser=3.5.0=py27_0
- curl=7.54.1=0
- cycler=0.10.0=py27_0
- decorator=4.1.2=py27_0
- entrypoints=0.2.3=py27_0
- enum34=1.1.6=py27_0
- expat=2.1.0=0
- freetype=2.5.5=2
- funcsigs=1.0.2=py27hb9f6266_0
- functools32=3.2.3.2=py27_0
- get_terminal_size=1.0.0=py27_0
- gettext=0.19.8=1
- git=2.11.1=0
- html5lib=0.9999999=py27_0
- icu=54.1=0
- intel-openmp=2018.0.0=h8158457_8
- ipykernel=4.6.1=py27_0
- ipython=5.3.0=py27_0
- ipython-notebook=4.0.4=py27_0
- ipython_genutils=0.2.0=py27_0
- jbig=2.1=0
- jinja2=2.9.6=py27_0
- jpeg=9b=0
- jsonschema=2.6.0=py27_0
- jupyter_client=5.1.0=py27_0
- jupyter_core=4.3.0=py27_0
- krb5=1.13.2=0
- libcxx=4.0.1=h579ed51_0
- libcxxabi=4.0.1=hebd6815_0
- libffi=3.2.1=1
- libgfortran=3.0.1=h93005f0_2
- libiconv=1.14=0
- libpng=1.6.30=1
- libssh2=1.8.0=0
- libtiff=4.0.6=3
- llvmlite=0.21.0=py27hac8ee23_0
- markupsafe=1.0=py27_0
- matplotlib=2.0.2=np113py27_0
- mistune=0.7.4=py27_0
- mkl=2018.0.1=hfbd8650_4
- nbconvert=5.2.1=py27_0
- nbformat=4.4.0=py27_0
- notebook=5.0.0=py27_0
- numba=0.36.2=np113py27h7c931aa_0
- numpy=1.13.3=py27h62f9060_0
- pandas=0.20.3=py27_0
- pandocfilters=1.4.2=py27_0
- path.py=10.3.1=py27_0
- pathlib2=2.3.0=py27_0
- patsy=0.4.1=py27_0
- pcre=8.39=1
- pexpect=4.2.1=py27_0
- pickleshare=0.7.4=py27_0
- pip=9.0.1=py27_1
- prompt_toolkit=1.0.15=py27_0
- ptyprocess=0.5.2=py27_0
- pygments=2.2.0=py27_0
- pyparsing=2.2.0=py27_0
- pyqt=5.6.0=py27_2
- python=2.7.13=0
- python-dateutil=2.6.1=py27_0
- pytz=2017.2=py27_0
- pyyaml=3.12=py27_0
- pyzmq=16.0.2=py27_0
- qt=5.6.2=2
- readline=6.2=2
- scandir=1.5=py27_0
- scipy=1.0.0=py27h793f721_0
- seaborn=0.8=py27_0
- setuptools=36.4.0=py27_0
- simplegeneric=0.8.1=py27_1
- singledispatch=3.4.0.3=py27_0
- sip=4.18=py27_0
- six=1.10.0=py27_0
- sqlite=3.13.0=0
- ssl_match_hostname=3.5.0.1=py27_0
- statsmodels=0.8.0=np113py27_0
- subprocess32=3.2.7=py27_0
- terminado=0.6=py27_0
- testpath=0.3.1=py27_0
- tk=8.5.18=0
- tornado=4.5.2=py27_0
- traitlets=4.3.2=py27_0
- wcwidth=0.1.7=py27_0
- wheel=0.29.0=py27_0
- xz=5.2.3=0
- yaml=0.1.6=0
- zlib=1.2.11=0
- adwaita-icon-theme=3.24.0=1
- arrow=0.10.0=py27_0
- at-spi2-atk=2.24.1=2
- at-spi2-core=2.24.1=2
- atk=2.24.0=3
- cairo-gobject=1.14.8=8
- dbus-client=1.10.18=0
- dfu-programmer=0.7.2=2
- dstat-interface=1.4.6=py27_0
- dstat-interface-deps=1.0=0
- gdk-pixbuf=2.36.6=2
- glib=2.52.2=5
- gobject-introspection=1.52.1=2
- gtk3=3.22.15=4
- harfbuzz=1.4.6=3
- libepoxy=1.4.2=5
- libusb=1.0.21=0
- pango=1.40.6=2
- pixman=0.34.0=1
- py2cairo=1.10.0=py27_0
- pygobject3=3.24.2=py27_3
- pyserial=3.3=py27_0
- zmq-plugin=0.2.post14=py27_0
- pip:
- backports.shutil-get-terminal-size==1.0.0
- backports.shutil-which==3.5.1
- backports.ssl-match-hostname==3.5.0.1
- chardet==3.0.4
- colorama==0.3.9
- idna==2.6
- paver==1.2.4
- pygobject==3.24.1
- requests==2.18.4
- urllib3==1.22
- vmprof==0.4.10
from pathlib import Path
from single_source import get_version
__version__ = get_version(__name__, Path(__file__).parent.parent)
#!/usr/bin/env python
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Functions for analyzing data.
"""
import logging
import os
from numpy import mean, trapz
logger = logging.getLogger(__name__)
mod_dir = os.path.dirname(os.path.abspath(__file__))
class AnalysisOptions(object):
"""Analysis options window."""
def __init__(self, builder):
self.builder = builder
self.builder.add_from_file(
os.path.join(mod_dir, 'interface/analysis_options.glade'))
self.builder.connect_signals(self)
self.window = self.builder.get_object('analysis_dialog')
self.stats_button = self.builder.get_object('stats_button')
self.stats_start = self.builder.get_object('stats_start_spin')
self.stats_start_button = self.builder.get_object('stats_start_button')
self.stats_stop = self.builder.get_object('stats_stop_spin')
self.stats_stop_button = self.builder.get_object('stats_stop_button')
self.stats_button.connect('toggled',
self.on_button_toggled_hide,
[self.stats_stop,
self.stats_stop_button,
self.stats_start,
self.stats_start_button
]
)
self._params = {'stats_true':False,
'stats_start_true':False,
'stats_stop':0,
'stats_stop_true':False,
'stats_start':0
}
def show(self):
"""Show options window."""
self.window.run()
self.window.hide()
def on_button_toggled_hide(self, control, widgets):
"""Hide unchecked fields"""
active = control.get_active()
for widget in widgets:
widget.set_sensitive(active)
@property
def params(self):
"""Getter for analysis params"""
self._params['stats_true'] = self.stats_button.get_active()
self._params['stats_start_true'] = self.stats_start_button.get_active()
self._params['stats_start'] = self.stats_start.get_value()
self._params['stats_stop_true'] = self.stats_stop_button.get_active()
self._params['stats_stop'] = self.stats_stop.get_value()
return self._params
@params.setter
def params(self, params):
for key in self._params:
if key in params:
self._params[key] = params[key]
self.stats_button.set_active(self._params['stats_true'])
self.stats_start_button.set_active(self._params['stats_start_true'])
self.stats_start.set_value(self._params['stats_start'])
self.stats_stop_button.set_active(self._params['stats_stop_true'])
self.stats_stop.set_value(self._params['stats_stop'])
def do_analysis(experiment):
"""Takes an experiment class instance and runs selected analysis."""
experiment.analysis = {}
if experiment.parameters['stats_true']:
if (experiment.parameters['stats_start_true'] or
experiment.parameters['stats_stop_true']):
if experiment.parameters['stats_start_true']:
start = experiment.parameters['stats_start']
else:
start = min(experiment.data['data'][0][0])
if experiment.parameters['stats_stop_true']:
stop = experiment.parameters['stats_stop']
else:
stop = min(experiment.data['data'][0][0])
data = _data_slice(experiment.data['data'],
start,
stop
)
else:
data = experiment.data['data']
experiment.analysis.update(_summary_stats(data))
try:
x, y = experiment.data['ft'][0]
experiment.analysis['FT Integral'] = _integrateSpectrum(
x,
y,
float(experiment.parameters['sync_freq']),
float(experiment.parameters['fft_int'])
)
except KeyError:
pass
def _data_slice(data, start, stop):
"""Accepts data (as list of tuples of lists) and returns copy of data
between start and stop (in whatever x-axis units for the experiment type).
"""
output = []
for scan in range(len(data)):
t = []
for i in range(len(data[scan])):
t.append([])
output.append(tuple(t))
for i in range(len(data[scan][0])): # x-axis column
if data[scan][0][i] >= start or data[scan][0][i] <= stop:
for d in range(len(output[scan])):
output[scan][d].append(data[scan][d][i])
return output
def _summary_stats(data):
"""Takes data and returns summary statistics of first y variable as dict of
name, (scan, values).
"""
stats = {'min':[],'max':[], 'mean':[]}
for scan in range(len(data)):
stats['min'].append(
(scan, min(data[scan][1]))
)
stats['max'].append(
(scan, max(data[scan][1]))
)
stats['mean'].append(
(scan, mean(data[scan][1]))
)
return stats
def _integrateSpectrum(x, y, target, bandwidth):
"""
Returns integral of range of bandwidth centered on target.
"""
j = 0
k = len(x)
for i in range(len(x)):
if x[i] >= target-bandwidth/2:
j = i
break
for i in range(j,len(x)):
if x[i] >= target+bandwidth/2:
k = i
break
return [(0, trapz(y=y[j:k], x=x[j:k]))]
\ No newline at end of file
import logging
import pickle
from math import ceil
import pynng
import trio
from dstat_interface.core.dstat.comm import SerialDevices, dstat_connect
from dstat_interface.core.dstat.comm import state as dstat_state
from dstat_interface.core.experiments.experiment_container import ExperimentContainer
from dstat_interface.core.experiments.lsv import LSVExperimentContainer, CVExperimentContainer
from dstat_interface.core.experiments.swv import SWVExperimentContainer, DPVExperimentContainer
from dstat_interface.core.experiments.chronoamp import CAExperimentContainer
from dstat_interface.core.tasks import ExperimentBaseTasks
from dstat_interface.utils import runtime_dir
logger = logging.getLogger(__name__)
e = [LSVExperimentContainer, CVExperimentContainer, SWVExperimentContainer, DPVExperimentContainer, CAExperimentContainer]
experiments: dict[str, type[ExperimentContainer]] = {exp.experiment_id: exp for exp in e}
class DaemonExperimentTasks(ExperimentBaseTasks):
def __init__(self, exp_con: ExperimentContainer, socket: pynng.Pub0, socket_int: pynng.Bus0):
super().__init__(exp_con)
self.socket = socket
self.socket_int = socket_int
self.data_index = 0
self.tasks += [self.send_data, self.check_interrupts]
async def update_progress(self, cancel_scope: trio.CancelScope):
while True:
scan = self.exp_con.progress_scan
last_progress = 0
while True:
await trio.sleep(0.2)
try:
progress = ceil(self.exp_con.get_progress())
# End of Scan
except StopIteration:
await self.socket.asend(f'progress: {scan}, 100'.encode('utf-8'))
break
if progress > last_progress:
await self.socket.asend(f'progress: {scan}, {progress}'.encode('utf-8'))
last_progress = progress
if progress >= 100:
break
async def send_data(self, cancel_scope: trio.CancelScope):
while True:
await trio.sleep(0.2)
new_index = len(self.exp_con.handler_instance.data['timestamp'])
if new_index == 0:
continue
data = {key: value[self.data_index:new_index] for key, value in self.exp_con.handler_instance.data.items()}
await self.socket.asend(b'data:' + pickle.dumps(data))
self.data_index = new_index
if self.exp_con.handler_instance.done:
cancel_scope.cancel()
return
async def check_interrupts(self, cancel_scope: trio.CancelScope):
while True:
msg = await self.socket_int.arecv()
if msg == b'ABORT':
dstat_state.ser.stop_exp()
class DStatDaemon(object):
def __init__(self, listen_addr_pub=None, listen_addr_ctrl=None, listen_addr_interrupt=None):
if listen_addr_pub is None:
self.address_pub = 'ipc://' + runtime_dir() + '/dstat-interfaced-pub'
if listen_addr_ctrl is None:
self.address_ctrl = 'ipc://' + runtime_dir() + '/dstat-interfaced-ctrl'
if listen_addr_interrupt is None:
self.address_int = 'ipc://' + runtime_dir() + '/dstat-interfaced-int'
self.socket_pub = pynng.Pub0(listen=self.address_pub)
self.socket_ctrl = pynng.Rep0(listen=self.address_ctrl)
self.socket_interrupt = pynng.Bus0(listen=self.address_int)
self.device_list = SerialDevices()
self.connected = False
self.command_map = {'fetch_devices': self.fetch_devices,
'connect_dstat': self.connect_dstat,
'disconnect_dstat': self.disconnect_dstat,
'fetch_experiments': self.fetch_experiments,
'run_experiment': self.run_experiment}
async def fetch_devices(self, payload: dict = None) -> dict:
self.device_list.refresh()
return {'ports': self.device_list.ports}
async def connect_dstat(self, payload: dict = None) -> dict:
if not self.connected:
self.connected = dstat_connect(payload['port'])
msg = f'DStat connected: {self.connected}'
else:
msg = "DStat already connected"
return {'connected': self.connected,
'dstat_version': dstat_state.dstat_version,
'firmware_version': dstat_state.firmware_version,
'board': dstat_state.board_instance,
'msg': msg}
async def disconnect_dstat(self, payload: dict = None):
if self.connected:
dstat_state.ser.close()
self.connected = False
return {'connected': False, 'msg': 'DStat disconnected'}
else:
return {'connected': False, 'msg': 'DStat already disconnected'}
async def fetch_experiments(self, payload: dict = None):
return {'experiments': list(experiments.values())}
async def run_experiment(self, exp_dict: dict):
exp = exp_dict['experiment_id']
del exp_dict['experiment_id']
experiment_container = experiments[exp](exp_dict, mux=dstat_state.board_instance.channels)
dstat_state.ser.start_exp(experiment_container.get_proc())
experiment_container.start_handler(dstat_state.ser)
tasks = DaemonExperimentTasks(experiment_container, self.socket_pub, self.socket_interrupt)
await tasks.loop()
return {'status': 'done'}
async def run(self):
while True:
command: str
payload: dict
command, payload = pickle.loads(await self.socket_ctrl.arecv())
await self.socket_ctrl.asend(pickle.dumps(await (self.command_map[command](payload))))
def close(self):
self.socket_pub.close()
self.socket_ctrl.close()
def __del__(self):
self.close()
async def main():
daemon = DStatDaemon()
async with trio.open_nursery() as nursery:
nursery.start_soon(daemon.run)
if __name__ == '__main__':
trio.run(main)
import argparse
import logging
import pathlib
import sys
from math import ceil
from tqdm import tqdm
import pandas as pd
import trio
from dstat_interface.core.tasks import ExperimentBaseTasks
from . import state
from .dstat.comm import SerialDevices, dstat_connect, read_settings
from .experiments.experiment_container import ExperimentContainer
from .experiments.lsv import LSVExperimentContainer, CVExperimentContainer
from .experiments.swv import SWVExperimentContainer, DPVExperimentContainer
from .dstat.state import DStatState
# Setup Logging
logger = logging.getLogger(__name__)
logging_choices = {'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR}
experiments = [LSVExperimentContainer, CVExperimentContainer, SWVExperimentContainer, DPVExperimentContainer]
dstat_state = DStatState()
class CLIExperimentTasks(ExperimentBaseTasks):
async def update_progress(self, cancel_scope):
scan = 0
while True:
if scan == self.exp_con.progress_iters-1: # Inner loop broken when already at last scan (iters 0-indexed)
break
scan = self.exp_con.progress_scan
last_progress = 0
with tqdm(total=100, unit='%', dynamic_ncols=True, desc=f'Scan {scan+1}',
bar_format='{l_bar}{bar}[{elapsed}]') as pbar:
while True:
try:
progress = ceil(self.exp_con.get_progress())
except StopIteration:
pbar.update(100 - last_progress)
break
pbar.update(progress-last_progress)
last_progress = progress
if progress >= 100:
break
await trio.sleep(0.2)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--log-level', choices=logging_choices, default="INFO")
parser.add_argument('--experiment-path', type=str, default=None)
parser.add_argument('-s', '--simulator', action='store_true')
experiment_subparsers = parser.add_subparsers(title="Experiments:", required=True)
for exp in experiments:
experiment = exp({})
exp_parser = experiment_subparsers.add_parser(name=experiment.experiment_id)
exp_parser.set_defaults(experiment=exp)
arguments = experiment.param_input
for i in arguments:
kind = arguments[i]
try:
default = experiment.defaults[i]
except KeyError:
default = None
try:
doc = experiment.param_docs[i]
except KeyError:
doc = ''
if kind is bool:
exp_parser.add_argument(f'--{i}', action=f'store_{not experiment.defaults[i]}'.lower(), help=doc)
elif isinstance(kind, list):
if default is not None:
exp_parser.add_argument(f'--{i}', choices=kind, default=default, help=doc)
else:
exp_parser.add_argument(i, metavar=i, choices=kind, help=doc)
else:
if default is not None:
exp_parser.add_argument(f'--{i}', type=kind, default=default, help=doc)
else:
exp_parser.add_argument(i, type=kind, help=doc)
args = parser.parse_args()
if args.experiment_path is not None:
state.experiment_folder_location = pathlib.Path(args.experiment_path).expanduser() / state.experiment_name
# Make sure paths exist
pathlib.Path(state.experiment_folder_location).mkdir(parents=True, exist_ok=True)
pathlib.Path(state.app_dirs.user_config_dir).mkdir(parents=True, exist_ok=True)
# Logging
logfile = state.experiment_folder_location / f'{state.experiment_name}.log'
root_logger = logging.getLogger()
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
log_handlers = [logging.StreamHandler(), logging.FileHandler(logfile)]
log_formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: [%(name)s] %(message)s',
datefmt='%H:%M:%S',
)
for handler in log_handlers:
handler.setFormatter(log_formatter)
root_logger.addHandler(handler)
root_logger.setLevel(logging_choices[args.log_level])
if args.simulator:
dstat_connect('simulator')
else:
ports = SerialDevices()
dstat_connect(ports.ports[0])
logger.info('DStat ver: %s Firmware ver: %s', dstat_state.dstat_version, dstat_state.firmware_version)
read_settings()
exp: ExperimentContainer = args.experiment(vars(args)) # Convert namespace to dict
dstat_state.ser.start_exp(exp.get_proc())
exp.start_handler(dstat_state.ser)
tasks = CLIExperimentTasks(exp)
trio.run(tasks.loop)
data = pd.DataFrame(exp.handler_instance.data)
data.iloc[1:].to_csv(state.experiment_folder_location / 'data.csv')
dstat_state.ser.close()
sys.exit()
# -*- mode: python -*-
a = Analysis(['./main.py'],
pathex=['/Users/mdryden/src/dstat-interface2/dstatInterface'],
hiddenimports=[],
hookspath=None,
runtime_hooks=None)
pyz = PYZ(a.pure)
exe = EXE(pyz,
a.scripts,
exclude_binaries=True,
name='DStat',
debug=False,
strip=None,
upx=True,
console=False )
coll = COLLECT(exe,
a.binaries,
a.zipfiles,
a.datas,
strip=None,
upx=True,
name='DStat')
app = BUNDLE(coll,
name='DStat.app',
icon=None)
from . import comm
from . import dfu
from . import state
\ No newline at end of file
......@@ -19,23 +19,53 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import, print_function, unicode_literals
import sys
import inspect
import logging
from abc import ABCMeta, abstractmethod
from typing import Union, Type
from pkg_resources import parse_version, parse_requirements
from packaging.version import parse as parse_version
from packaging.requirements import Requirement
logger = logging.getLogger(__name__)
class BaseBoard(object):
class BaseBoard(object, metaclass=ABCMeta):
pcb_version = 'x.x.x'
booster = False
mux = False
channels = 1
def __init__(self):
self.max_freq = 5000
self.max_scans = 255
self.max_time = 65535
self.mv_max = 1500
self.mv_min = -1500
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
self.gain_labels = ["Bypass", "100 Ω (15 mA FS)", "3 kΩ (500 µA FS)",
"30 kΩ (50 µA FS)", "300 kΩ (5 µA FS)",
"3 MΩ (500 nA FS)", "30 MΩ (50 nA FS)",
"100 MΩ (15 nA FS)"
]
self.adc_rate_labels = ['2.5', '5', '10', '15', '25', '30', '50', '60', '100', '500', '1000', '2000', '3750',
'7500', '15000', '30000']
self.adc_rate_codes = [3, 19, 35, 51, 67, 83, 99, 114, 130, 146, 161, 176, 192, 208, 224, 240]
self.adc_rate_default_index = 7
self.adc_pga = [1, 2, 4, 8, 16, 32, 64]
self.adc_pga_labels = [f'{pga}X' for pga in self.adc_pga]
self.adc_pga_codes = list(range(7))
self.adc_pga_default_index = 1
self.gain_trim = [None, 'r100_trim', 'r3k_trim',
'r30k_trim', 'r300k_trim', 'r3M_trim',
'r30M_trim', 'r100M_trim']
self.gain_settings = len(self.gain)
self.gain_default_index = 2
self.re_voltage_scale = 1
self.setup()
assert len(self.gain) == self.gain_settings
......@@ -43,6 +73,7 @@ class BaseBoard(object):
if self.gain_trim is not None:
assert len(self.gain_trim) == self.gain_settings
@abstractmethod
def setup(self):
"""Override in subclasses to provide correct numbers"""
self.gain = [1, 1e2, 3e3, 3e4, 3e5, 3e6, 3e7, 1e8]
......@@ -114,6 +145,11 @@ class V1_2Board(BaseBoard):
self.re_voltage_scale = 1
class V1_2BoardMUX(V1_2Board):
mux = True
channels = 16
def __get_all_subclasses(cls):
all_subclasses = []
......@@ -124,17 +160,17 @@ def __get_all_subclasses(cls):
return all_subclasses
def find_board(version, booster=False):
def find_board(version, booster=False, mux=False) -> Union[None, Type[BaseBoard]]:
"""Returns highest compatible board class or None if none available."""
boards = __get_all_subclasses(BaseBoard)
candidates = []
for board in boards:
req = parse_requirements('dstat~={}'.format(board.pcb_version)).next()
if board.booster == booster and version in req:
req = Requirement('dstat~={}'.format(board.pcb_version))
if board.booster == booster and board.mux == mux and str(version) in req.specifier:
candidates.append(board)
try:
picked = sorted(candidates,
key=lambda board: parse_version(board.pcb_version))[-1]
key=lambda b: parse_version(b.pcb_version))[-1]
logger.info("Picked %s", picked)
return picked
except IndexError:
......
This diff is collapsed.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import subprocess
import sys
import os
import time
import logging
from tempfile import mkdtemp
from zipfile import ZipFile
if sys.version_info >= (3,):
import urllib.request as urllib2
import urllib.parse as urlparse
else:
import urllib2
import urlparse
logger = logging.getLogger(__name__)
try:
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
except ImportError:
print("ERR: GTK not available")
sys.exit(1)
import serial
from . import state
from .comm import dstat_logger, exp_logger
fwurl = "http://microfluidics.utoronto.ca/gitlab/api/v4/projects/4/jobs/artifacts/master/download?job=1.2.3&private_token=zkgSx1FaaTP7yLyFKkX6"
class FWDialog(object):
def __init__(self, parent, connect, stop_callback, disconnect_callback, signal='activate'):
self.parent = parent
self.stop = stop_callback
self.disconnect = disconnect_callback
connect.connect(signal, self.activate)
def activate(self, widget=None, data=None):
for name, result in assert_deps().items():
if result is not True:
logger.error("Can't use firmware update module.")
self.missing_deps()
return
self.stop() # Stop OCP
version_result, master = test_firmware_version()
if version_result is False:
self.git_error()
return
if version_result == 'latest':
message = "Your firmware is already up to date."
secondary = "Click yes to reflash firmware anyways."
elif version_result == 'devel':
message = "Your firmware is not on the master branch."
secondary = "You may have a development version. " +\
"Click yes to reflash firmware anyways."
elif version_result == 'old':
message = "Your firmware is out of date."
secondary = "Click yes to flash the latest firmware."
dialog = Gtk.MessageDialog(self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.YES_NO, message)
dialog.format_secondary_text(secondary)
dialog.get_content_area().add(
Gtk.Label(
label="Installed version: {}".format(state.firmware_version)))
dialog.get_content_area().add(
Gtk.Label(label="Latest version: {}".format(master)))
dialog.show_all()
response = dialog.run()
if response == Gtk.ResponseType.YES:
try:
download_fw()
except:
self.dl_error()
return
dstat_enter_dfu()
self.dfu_notice()
self.disconnect()
try:
dfu_program()
except:
self.dfu_error()
dialog.destroy()
else:
dialog.destroy()
def missing_deps(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Missing Dependencies")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def git_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Git Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dl_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Download Error")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def dfu_notice(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.INFO,
Gtk.ButtonsType.OK, "Note about DFU")
dialog.format_secondary_text("Click OK once the DStat has connected in "
+ "DFU mode. Windows doesn't seem to like the automatic reboot. "
+ "Try holding down the reset button while plugging the "
+ 'USB port in (No LEDs should be lit), then click OK. Make sure '
+ 'the DFU driver from the dfu-programmer directory is installed.')
dialog.run()
dialog.destroy()
def dfu_error(self):
dialog = Gtk.MessageDialog(
self.parent, 0, Gtk.MessageType.ERROR,
Gtk.ButtonsType.OK, "Could not update over DFU")
dialog.format_secondary_text('Check console for more info.')
dialog.connect('response', self.destroy)
dialog.show()
def destroy(self, widget=None, data=None):
widget.destroy()
def assert_deps():
deps = {'git' : 'git --version',
'dfu-programmer' : 'dfu-programmer --version'}
result = {}
for key, command in deps.items():
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
result[key] = True
except subprocess.CalledProcessError:
logger.warning("{} is not available.".format(key))
result[key] = False
return result
def download_fw(): # from https://stackoverflow.com/a/16518224
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
u = urllib2.urlopen(fwurl)
scheme, netloc, path, query, fragment = urlparse.urlsplit(fwurl)
filename = os.path.basename(path)
if not filename:
filename = 'downloaded.file'
with open(filename, 'wb') as f:
meta = u.info()
meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
meta_length = meta_func("Content-Length")
file_size = None
if meta_length:
file_size = int(meta_length[0])
logger.info("Downloading: {0} Bytes: {1}".format(fwurl, file_size))
file_size_dl = 0
block_sz = 8192
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
status = "{0:16}".format(file_size_dl)
if file_size:
status += " [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
status += chr(13)
logger.info(status)
with ZipFile(filename, mode='r') as z:
fw_path = z.extract('dstat-firmware.hex')
return fw_path
def test_firmware_version(current=None):
if current is None:
current = state.firmware_version
temp_dir = mkdtemp()
logger.info("Temporary directory: {}".format(temp_dir))
os.chdir(temp_dir) # Go to temporary directory
command = "git clone http://microfluidics.utoronto.ca/gitlab/dstat/dstat-firmware.git"
logger.info('Cloning master.')
try:
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.error("git failed with error code {}".format(e.returncode))
logger.error("Output: {}".format(e.output))
return False, None
logger.info(output)
os.chdir("./dstat-firmware")
command = "git rev-parse --short master"
master = subprocess.check_output(command.split(), stderr=subprocess.STDOUT)
logger.info("Current master commit: {}".format(master))
command = "git merge-base --is-ancestor master {}".format(current)
test = subprocess.call(command.split())
if test == 0: # already newest
logger.info('Firmware is latest available.')
return 'latest', master
elif test == 1: # old version
logger.info('Firmware is out of date.')
return 'old', master
elif test == 128: # newer or different branch
logger.info('Firmware is not on the master branch.')
return 'devel', master
else:
logger.error('Unexpected git error. Git exited {}'.format(test))
return False, None
def dfu_program(path='./dstat-firmware.hex'):
"""Tries to program DStat over USB with DFU with hex file at path."""
try:
command = "dfu-programmer atxmega256a3u erase"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u flash {}".format(path)
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
command = "dfu-programmer atxmega256a3u launch"
output = subprocess.check_output(command.split(),
stderr=subprocess.STDOUT)
logger.info("%s\n%s", command, output)
except subprocess.CalledProcessError as e:
logger.error("{} failed with output:".format(" ".join(e.cmd)))
logger.error(e.output)
raise
def dstat_enter_dfu():
"""Tries to contact DStat and get version. Stores version in state.
If no response, returns False, otherwise True.
Arguments:
ser_port -- address of serial port to use
"""
exp = DFUMode()
state.ser.start_exp(exp)
while True:
result = state.ser.get_proc(block=True)
if result in ('SERIAL_ERROR', 'DONE'):
break
logger.info(result)
# state.ser.disconnect()
time.sleep(.1)
return True
class DFUMode(object):
def __init__(self):
pass
def run(self, ser, ctrl_pipe, data_pipe):
"""Tries to contact DStat and get version. Returns a tuple of
(major, minor). If no response, returns empty tuple.
Arguments:
ser_port -- address of serial port to use
"""
status = None
try:
ser.write(b'!2\n')
exp_logger.info('!2')
for i in range(10):
if ser.readline().rstrip() == b"@ACK 2":
dstat_logger.info('@ACK 2')
ser.write(b'SF\n')
exp_logger.info('SF')
status = "DONE"
time.sleep(5)
break
else:
time.sleep(.5)
ser.reset_input_buffer()
ser.write(b'!2\n')
exp_logger.info('!2')
time.sleep(.1)
except UnboundLocalError as e:
status = "SERIAL_ERROR"
except serial.SerialException as e:
logger.error('SerialException: %s', e)
status = "SERIAL_ERROR"
finally:
return status
if __name__ == "__main__":
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter(
fmt='%(asctime)s %(levelname)s: [%(name)s] %(message)s',
datefmt='%H:%M:%S'
)
log_handler.setFormatter(log_formatter)
logger.setLevel(level=logging.INFO)
logger.addHandler(log_handler)
dstat_enter_dfu()
time.sleep(2)
dfu_program(sys.argv[1])
\ No newline at end of file
from datetime import datetime
import logging
import struct
from typing import Callable, Union, Sequence, Tuple
from .comm import SerialConnection
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
class ExperimentHandler(object):
def __init__(self, ser: SerialConnection, data_cols: Sequence[str] = ('voltage', 'current'),
data_format: str = 'vA'):
self.ser = ser
self.data = {'timestamp': [], 'scan': []}
self.data |= {i: [] for i in data_cols}
self.data_cols = data_cols
self.data_convert = [self.get_data_converter(i) for i in data_format]
struct_formats = ['H' if i.islower() else 'l' for i in data_format]
self.struct = struct.Struct(f'<{"".join(struct_formats)}')
self.done = False
self.adc_trim = 0
self.adc_gain = 3000
self.adc_pga = 2
self.first_skipped = False
def get_data_converter(self, fmt: str) -> Union[Callable, None]:
converters = {'v': self.dac_to_volts, 'A': self.adc_to_amps, 'm': self.ms_to_s}
try:
return converters[fmt]
except KeyError:
return None
@staticmethod
def dac_to_volts(dac: int) -> float:
return (dac - 32768) * 3000. / 65536
def adc_to_amps(self, adc: int) -> float:
return (adc + self.adc_trim) * (1.5 / self.adc_gain / 8388607) / (self.adc_pga / 2)
@staticmethod
def ms_to_s(ms: int) -> float:
return ms/1000.
def experiment_running_data(self):
"""Receive data from experiment process and add to
current_exp.data['data].
"""
try:
incoming = self.ser.get_data()
if incoming is not None:
if not self.first_skipped:
self.first_skipped = True
return True
self.data_handler(incoming)
return True
except EOFError as err:
logger.error(err)
return False
except IOError as err:
logger.error(err)
return False
def get_all_data(self):
"""
Processes remaining data in queue
"""
while True:
incoming = self.ser.get_data()
if incoming is not None:
self.data_handler(incoming)
else:
self.done = True
return
def data_handler(self, data_input: Tuple[datetime, int, bytes]):
date, scan, data = data_input
unpacked = self.struct.unpack(data)
self.data['scan'].append(scan)
self.data['timestamp'].append(date)
for n, i in enumerate(unpacked):
try:
self.data[self.data_cols[n]].append(self.data_convert[n](i))
except TypeError: # If no converter
self.data[self.data_cols[n]].append(i)
def experiment_running_proc(self):
"""Receive proc signals from experiment process."""
try:
proc_buffer = self.ser.get_proc()
if proc_buffer is not None:
if proc_buffer in ["DONE", "SERIAL_ERROR", "ABORT"]:
if proc_buffer == "SERIAL_ERROR":
self.ser.close()
else:
logger.warning("Unrecognized experiment return code: %s",
proc_buffer)
return False
return True
except EOFError as err:
logger.warning("EOFError: %s", err)
return False
except IOError as err:
logger.warning("IOError: %s", err)
return False
import logging
import time
from abc import ABC, abstractmethod
from multiprocessing.connection import Connection
from typing import Union
from functools import singledispatchmethod
import serial
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
class BaseExperimentProcess(ABC):
def __init__(self, *args, **kwargs):
self.data_bytes = 0
self.serial: Union[None, serial.Serial] = None
self.ctrl_pipe: Union[None, Connection] = None
self.data_pipe: Union[None, Connection] = None
self.commands = []
@singledispatchmethod
def parse_command_string(self, cmd_str, params: dict):
raise NotImplementedError("cmd_str has unrecognized type %s", type(cmd_str))
@parse_command_string.register
def _(self, cmd_str: str, params: dict):
self.commands += [cmd_str.format(**params)]
@parse_command_string.register
def _(self, cmd_str: tuple, params: dict):
cmd: str
data: tuple
cmd, data = cmd_str
cmd = cmd.format(**params)
data_parsed = []
for i in data:
data_parsed += [str(i) for i in params[i]]
self.commands += [(cmd, data_parsed)]
def write_command(self, cmd, params=None, retry=5):
"""Write command to serial with optional number of retries."""
def get_reply(retries=3):
while True:
reply = self.serial.readline().rstrip().decode('ascii')
print(reply)
if reply.startswith('#'):
dstat_logger.info(reply)
elif reply == "":
retries -= 1
if retries <= 0:
raise serial.SerialException()
else:
return reply
n = len(cmd)
if params is not None:
n_params = len(params)
for _ in range(retry):
tries = 5
while True:
time.sleep(0.2)
self.serial.reset_input_buffer()
self.serial.write(f'!{n}\n'.encode('ascii'))
time.sleep(.1)
try:
reply = get_reply()
except serial.SerialException:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != f"@ACK {n}":
logger.warning("Expected ACK got: {}".format(reply))
continue
tries = 5
while True:
self.serial.write(f'{cmd}\n'.encode('ascii'))
print(cmd)
try:
reply = get_reply()
except serial.SerialException:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != f"@RCV {n}":
logger.warning("Expected RCV got: {}".format(reply))
continue
if params is None:
return True
tries = 5
while True:
try:
reply = get_reply()
except serial.SerialException:
if tries <= 0:
continue
tries -= 1
pass
else:
break
if reply != f"@RQP {n_params}":
logger.warning("Expected RQP %s got: %s", n_params, reply)
continue
for i in params:
tries = 5
while tries:
self.serial.write(f"{i} ".encode('ascii'))
try:
reply = get_reply()
if reply == f"@RCVC {i}":
break
elif reply == "@RCVE":
continue
except serial.SerialException:
tries -= 1
continue
else:
logger.error('Communication failure')
return False
return True
return False
def run(self, ser: serial.Serial, ctrl_pipe: Connection, data_pipe: Connection):
"""Execute experiment. Connects and sends handshake signal to DStat
then sends self.commands. Don't call directly as a process in Windows,
use run_wrapper instead.
"""
self.serial = ser
self.ctrl_pipe = ctrl_pipe
self.data_pipe = data_pipe
exp_logger.info('Experiment running')
status = 'DONE'
try:
for i in self.commands:
status = 'DONE'
if isinstance(i, str):
logger.info('Command: %s', i)
print(f'command: {i}')
if not self.write_command(i):
status = 'ABORT'
break
else:
cmd, data = i
print(f'command: {i}')
logger.info("Command: %s", cmd)
if not self.write_command(cmd, params=data):
status = 'ABORT'
break
if not self.serial_handler():
status = 'ABORT'
break
time.sleep(0.25)
except serial.SerialException:
status = 'SERIAL_ERROR'
finally:
while self.ctrl_pipe.poll():
self.ctrl_pipe.recv()
return status
@abstractmethod
def serial_handler(self):
pass
class ExperimentProcess(BaseExperimentProcess):
def __init__(self):
"""Adds commands for gain and ADC."""
super(ExperimentProcess, self).__init__()
self.datapoint = 0
self.scan = 0
self.time = 0
def serial_handler(self):
"""Handles incoming serial transmissions from DStat. Returns False
if stop button pressed and sends abort signal to instrument. Sends
data to self.data_pipe as result of self.data_handler).
"""
scan = 0
start = None
try:
while True:
if self.ctrl_pipe.poll():
ctrl = self.ctrl_pipe.recv()
logger.debug("serial_handler: %s", ctrl)
if ctrl == "DISCONNECT":
self.serial.write(b'a')
self.serial.reset_input_buffer()
logger.info("serial_handler: ABORT pressed!")
time.sleep(.3)
return False
elif ctrl == 'a':
self.serial.write(b'a')
for line in self.serial:
if self.ctrl_pipe.poll():
if self.ctrl_pipe.recv() == 'a':
self.serial.write(b'a')
if line.startswith(b'B'):
if not start:
start = time.perf_counter()
data = (time.perf_counter()-start, scan, self.serial.read(size=self.data_bytes))
self.data_pipe.send(data)
elif line.lstrip().startswith(b'S'):
scan += 1
elif line.lstrip().startswith(b"#"):
dstat_logger.info(line.strip().decode('ascii'))
elif line.lstrip().startswith(b"@DONE"):
dstat_logger.debug(line.strip().decode('ascii'))
return True
except serial.SerialException:
return False
from copy import copy
from io import IOBase
from math import ceil, floor
import logging
import struct
from time import sleep
import re
from typing import Callable, Union
import numpy as np
logger = logging.getLogger("dstat.simulator")
digit_pattern = re.compile(rb'(\d+)')
class Simulator(object):
default_settings = {'max5443_offset': 0,
'tcs_enabled': 1,
'tcs_clear_threshold': 10000,
'r100_trim': 0,
'r3k_trim': 0,
'r30k_trim': 0,
'r300k_trim': 0,
'r3M_trim': 0,
'r30M_trim': 0,
'r100M_trim': 0,
'eis_cal1': 0,
'eis_cal2': 0,
'dac_units_true': 1}
def __init__(self):
self.current_state = self.main
self.next_state: Union[None, Callable] = None
self.output: bytes = b''
self.input_str: bytes = b''
self.cmd_length: int = 0
self.command_str: bytes = b''
self.waiting_for_params: int = 0
self.extra_params = []
self.current_cmd: bytes = b''
self.current_params: list = []
self.settings_dict = copy(Simulator.default_settings)
def _get_input(self, size=1) -> bytes:
if size > len(self.input_str):
logger.error("Not enough characters in input buffer")
raise IndexError("Not enough characters in input buffer")
string = self.input_str[:size]
self.input_str = self.input_str[size:]
return string
def _get_command(self):
string = self.command_str[:1]
self.command_str = self.command_str[1:]
return string
def _get_params(self, n):
params = self.command_str.split(None, n)
if len(params) > n:
self.input_str = params.pop()
elif len(params) < n:
raise IndexError("Not enough characters in command buffer")
return params
def input(self, string):
logger.debug("input: {}".format(string))
self.input_str = string
while len(self.input_str) > 0:
self.current_state()
def main(self):
char = self._get_input()
if char == b'!':
try:
match = re.match(digit_pattern, self.input_str)
cmd_length = match.group(1)
except AttributeError:
logger.error("No character count received")
self.input_str = b''
return
self.cmd_length = int(cmd_length)
self.input_str = self.input_str[len(cmd_length) + 1:]
self.output += f'@ACK {self.cmd_length}\n'.encode('ascii')
if self.cmd_length == 0:
self.output += b'@RCV 0\n'
return
self.current_state = self.command
if self.input_str:
self.current_state()
def command(self):
def restart():
logger.info("USB restart received")
return
def version():
logger.info("Version Check")
self.output += f'V1.2.3-{0xfffffff:d}\n'.encode('ascii')
return
command_map = {b'E': self.experiment,
b'S': self.settings,
b'R': restart,
b'V': version}
self.command_str = self._get_input(self.cmd_length + 1)
self.output += f'@RCV {self.cmd_length}\n'.encode('ascii')
char = self.command_str[:1]
self.command_str = self.command_str[1:-1] # Remove delimiter
try:
if command_map[char]():
self.current_state = self.input_params
return
except KeyError:
logger.warning("Unrecognized command {}".format(char))
self.output += f"#ERR: Command {char} not recognized\n".encode('ascii')
if self.current_state.__name__ != self.abort_wait.__name__:
self.output += b"@DONE\n"
self.current_state = self.main
else:
logger.info("abort_wait state")
logger.info("Command {} Finished".format(char))
def abort_wait(self):
logger.info("abort_wait() called")
self.current_state = self.abort_wait
if self._get_input() == b'a':
logger.info("Abort signal received")
self.output += b"@DONE\n"
self.current_state = self.main
def input_params(self):
param = self._get_params(1)
self.extra_params.append(param)
self.output += f'@RCVC {param}\n'.encode('ascii')
self.waiting_for_params -= 1
if self.waiting_for_params == 0:
self.next_state()
def settings(self):
def reset():
logger.info("Settings reset")
self.settings_dict = self.default_settings.copy()
def firmware():
logger.info("Firmware update mode")
def read():
self.output += b"S"
for key, value in self.settings_dict.items():
self.output += f"{key}.{value}:".encode('ascii')
self.output = self.output.rstrip(b':')
self.output += b"\n"
def write():
params = self._get_params(10)
for n, i in enumerate(self.settings_dict):
self.settings_dict[i] = int(params[n])
settings_map = {b'D': reset,
b'F': firmware,
b'R': read,
b'W': write
}
char = self.command_str[:1]
self.command_str = self.command_str[1:]
try:
settings_map[char]()
except KeyError:
logger.warning("Settings control %s not found", char)
def experiment(self):
def ads1255():
params = self._get_params(3)
logger.info("ADS1255 params: %s", params)
self.output += f'#A: {params[0]} {params[1]} {params[2]}\n'.encode('ascii')
def gain():
params = self._get_params(2)
logger.info("IV gain: %s", params)
self.output += f'#G: {params[0]} {params[1]}\n'.encode('ascii')
def lsv():
params = self._get_params(7)
start = int(params[-3])
stop = int(params[-2])
slope = int(params[-1])
logger.info("LSV params: %s", params)
for i in np.arange(start, stop, slope//10):
self.output += b"B\n"
self.output += struct.pack('<Hl', i, 500 * (i-32698))
self.output += b"\n"
def cv():
params = self._get_params(9)
start = int(params[-3])
v1 = int(params[-5])
v2 = int(params[-4])
scans = int(params[-2])
slope = int(params[-1])
logger.info("CV params: %s", params)
for scan in range(scans):
for i in np.arange(start, v1, slope//100):
self.output += b"B\n"
self.output += struct.pack('<Hl', i, 500 * (i-32698) + 100 * scan)
self.output += b"\n"
for i in np.arange(v1, v2, slope//100):
self.output += b"B\n"
self.output += struct.pack('<Hl', i, 500 * (i - 32698) + 100 * scan)
self.output += b"\n"
for i in np.arange(v2, start, slope//100):
self.output += b"B\n"
self.output += struct.pack('<Hl', i, 500 * (i - 32698) + 100 * scan)
self.output += b"\n"
self.output += b"S\n"
self.output += b"D\n"
def swv():
params = self._get_params(10)
start = int(params[-6])
stop = int(params[-5])
step = int(int(params[-4])*3000/65536.)
scans = int(params[-1])
logger.info("SWV params: %s", params)
if scans < 1:
scans = 1
for scan in range(scans):
for i in np.arange(start, stop, step):
self.output += b"B\n"
self.output += struct.pack('<Hll', i, 500 * (i - 32698) + 100 * scan, 0)
self.output += b"\n"
for i in np.arange(stop, start, step):
self.output += b"B\n"
self.output += struct.pack('<Hll', i, 500 * (i - 32698) + 100 * scan, 0)
self.output += b"\n"
self.output += b"S\n"
self.output += b"D\n"
def dpv():
params = self._get_params(10)
start = int(params[-6])
stop = int(params[-5])
logger.info("DPV params: %s", params)
for i in np.arange(start, stop, 1):
self.output += b"B\n"
self.output += struct.pack('<Hll', i, 500 * (i - 32698), 0)
self.output += b"\n"
self.output += b"D\n"
def pmt():
logger.info("PMT Idle mode entered")
self.abort_wait()
def pot():
params = self._get_params(2)
seconds = int(params[0])
logger.info("POT params: %s", params)
if seconds == 0:
self.abort_wait()
else:
for i in range(seconds):
self.output += b"B\n"
self.output += struct.pack('<Hll', i, 0, 100)
def ca():
params = self._get_params(2)
steps = int(params[0])
tcs = int(params[1])
self.current_params = [steps, tcs]
self.waiting_for_params = steps * 2
self.output += f"@RQP {steps * 2}\n".encode("ascii")
self.next_state = self.experiment
return True
def ca_params(steps, tcs):
times = [int(i) for i in self.extra_params[steps:]]
seconds = sum(times)
for i in range(seconds):
logger.info(i)
self.output += b"B\n"
self.output += struct.pack('<HHl', i, 0, 100 * i)
self.output += b"\n"
sleep(1)
self.extra_params = []
def sync():
params = self._get_params(1)
logger.info('Shutter Sync %s Hz', params[0])
def shut_off():
logger.info('Shutter Sync Off')
def shut_close():
logger.info('Shutter closed')
def shut_open():
logger.info('Shutter open')
experiment_map = {b'A': ads1255,
b'G': gain,
b'L': lsv,
b'C': cv,
b'S': swv,
b'D': dpv,
b'M': pmt,
b'P': pot,
b'R': ca,
b'Z': sync,
b'z': shut_off,
b'1': shut_close,
b'2': shut_open
}
experiment_post_params_map = {b'R': ca_params}
if self.next_state:
experiment_post_params_map[self.current_cmd](*self.current_params)
self.current_params = []
self.next_state = None
else:
char = self._get_command()
try:
return experiment_map[char]()
except KeyError:
logger.warning('Unrecognized exp command %s', char)
self.output += f'#ERR: Command {char} not recognized\n'.encode('ascii')
class SerialSim(IOBase):
def __init__(self, *args, **kwargs):
self.sim = Simulator()
self.is_open = True
def open(self):
self.is_open = True
def close(self):
self.is_open = False
self.reset_input_buffer()
def write(self, string: bytes):
self.sim.input(string)
def read(self, size=1) -> bytes:
output = self.sim.output[0:size]
self.sim.output = self.sim.output[size:]
return output
def reset_input_buffer(self):
self.sim.output = b""
def next(self) -> bytes:
if self.sim.output == b"":
raise StopIteration
return self.readline()
def readline(self, size=-1) -> bytes:
if len(self.sim.output) == 0:
return b""
if size > 0:
output, sep, remain = self.sim.output[:size].partition(b'\n')
else:
output, sep, remain = self.sim.output.partition(b'\n')
if sep != b'\n':
return b""
self.sim.output = self.sim.output[len(output) + 1:]
return output + b'\n'
def __iter__(self):
return self
def _update_dtr_state(self):
pass
from collections import OrderedDict
def reset():
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
settings = OrderedDict()
ser = None
dstat_version = None
firmware_version = None
board_instance = None
\ No newline at end of file
from __future__ import annotations
from typing import Union, TYPE_CHECKING
from dstat_interface.utils import Singleton
if TYPE_CHECKING:
from pkg_resources._vendor.packaging import version
from dstat_interface.dstat.comm import SerialConnection
from dstat_interface.dstat.boards import BaseBoard
class DStatState(object, metaclass=Singleton):
def __init__(self):
self.settings = {}
self.ser: Union[None, SerialConnection] = None
self.dstat_version: Union[None, version.Version] = None
self.firmware_version: Union[None, str] = None
self.board_instance: Union[None, BaseBoard] = None
def reset(self):
self.settings = {}
self.ser = None
self.dstat_version = None
self.firmware_version = None
self.board_instance = None
import pytest
from pkg_resources import parse_version
from dstat_interface.dstat.comm import dstat_connect, read_settings, write_settings
from dstat_interface.dstat import simulator, state
class TestDStat:
@pytest.yield_fixture(autouse=True)
def dstat_connect_simulator(self):
dstat_connect('simulator')
yield
state.ser.close()
def test_dstat_connect(self):
assert state.dstat_version == parse_version('1.2.3') and state.firmware_version == 'fffffff'
def test_read_settings(self):
read_settings()
assert state.settings == simulator.Simulator.default_settings
def test_write_settings(self):
new_settings = dict([('max5443_offset', 200),
('tcs_enabled', 0),
('tcs_clear_threshold', 19000),
('r100_trim', 23),
('r3k_trim', 14),
('r30k_trim', 56),
('r300k_trim', 3),
('r3M_trim', 2),
('r30M_trim', -34),
('r100M_trim', -60)
('eis_cal1', 0),
('eis_cal2', 0),
('dac_units_true', 1)
])
state.settings = new_settings
write_settings()
state.settings = {}
read_settings()
assert state.settings == new_settings
import logging
from multiprocessing.connection import Connection
from typing import Union
from serial import Serial, SerialException
from .experiment_process import BaseExperimentProcess
from dstat_interface.core.dstat.state import DStatState
logger = logging.getLogger(__name__)
dstat_logger = logging.getLogger("{}.DSTAT".format(__name__))
exp_logger = logging.getLogger("{}.Experiment".format(__name__))
state = DStatState()
def abs_mv_to_dac(mv: float) -> int:
dac = int(mv / state.board_instance.re_voltage_scale * (65536. / 3000) + 32768)
assert 0 <= dac <= 65535
return dac
def rel_mv_to_dac(mv: float) -> int:
dac = int(mv / state.board_instance.re_voltage_scale * (65536. / 3000))
assert 0 < dac <= 65535 # DStat params that take relative mv don't accept 0
return dac
def param_test_uint16(u: int) -> int:
assert 0 <= u <= 65535
return int(u)
def param_test_non_zero_uint16(u: int) -> int:
assert 0 < u <= 65535
return int(u)
def param_test_uint8(u: int) -> int:
assert 0 < u <= 255
return int(u)
def param_test_non_zero_uint8(u: int) -> int:
assert 0 < u <= 255
return int(u)
class VersionCheckProcess(BaseExperimentProcess):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.commands += ['V', 'X']
def serial_handler(self):
try:
while True:
for line in self.serial:
if line.lstrip().startswith(b"#"):
dstat_logger.info(line.strip().decode('ascii'))
elif line.lstrip().startswith(b"@DONE"):
dstat_logger.debug(line.strip().decode('ascii'))
return True
elif line.startswith(b'V'):
ver_str = line.lstrip(b'V').decode('ascii')
pcb, sep, firmware = ver_str.strip().rpartition('-')
if pcb == "":
pcb = firmware
logger.info('Your firmware does not support PCB version detection.')
self.data_pipe.send(('V', (pcb, False)))
else:
logger.info(f"Firmware Version: {hex(int(firmware)).lstrip('0x')}")
self.data_pipe.send(('V', (pcb, hex(int(firmware)).lstrip('0x'))))
logger.info(f'PCB Version: {pcb}')
elif line.startswith(b'X'):
mux_str = line.lstrip(b'X').decode('ascii')
self.data_pipe.send(('X', int(mux_str)))
except SerialException:
return False
class SettingsProcess(BaseExperimentProcess):
def __init__(self, task, settings: Union[None, dict[str, str]] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.task = task
self.settings = settings
self.serial: Union[None, Serial] = None
self.ctrl_pipe: Union[None, Connection] = None
self.data_pipe: Union[None, Connection] = None
if self.task == 'w':
write_buffer = [str(i) for i in self.settings.values()]
to_write = ' '.join(write_buffer) + ' '
logger.debug('to_write = %s', to_write)
self.commands += ['SW' + to_write]
elif self.task == 'r':
self.commands += ['SR']
def serial_handler(self):
ser_logger = logging.getLogger(f'{__name__}._serial_process')
try:
while True:
for line in self.serial:
if line.lstrip().startswith(b"#"):
ser_logger.info(line.strip().decode('ascii'))
elif line.lstrip().startswith(b"@DONE"):
ser_logger.debug(line.strip().decode('ascii'))
return True
if line.lstrip().startswith(b'S'):
input_line = line.lstrip()[1:].decode('ascii')
parted = input_line.rstrip().split(':')
settings = {key: int(value) for key, value in [i.split('.') for i in parted]}
self.data_pipe.send(settings)
except SerialException:
return False
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DStat Interface - An interface for the open hardware DStat potentiostat
# Copyright (C) 2014 Michael D. M. Dryden -
# Wheeler Microfluidics Laboratory <http://microfluidics.utoronto.ca>
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class Error(Exception):
"""Copies Exception class"""
pass
class InputError(Error):
"""Exception raised for errors in the input. Extends Error class.
Attributes:
expr -- input expression in which the error occurred
msg -- error message
"""
def __init__(self, expr, msg):
self.expr = expr
self.msg = msg
class VarError(Error):
"""Exception raised for internal variable errors. Extends Error class.
Attributes:
var -- var in which the error occurred
msg -- error message
"""
def __init__(self, var, msg):
self.var = var
self.msg = msg
\ No newline at end of file
# __all__ = []
#
# import pkgutil
# import inspect
# from . import cal, chronoamp, cv
#
#
# for loader, name, is_pkg in pkgutil.walk_packages(__path__):
# print loader, name, is_pkg
# module = loader.find_module(name).load_module(name)
#
# for name, value in inspect.getmembers(module):
# if name.startswith('__'):
# continue
#
# globals()[name] = value
# __all__.append(name)
\ No newline at end of file