Commit 47b6a39f authored by Maciej Pawlisz's avatar Maciej Pawlisz Committed by Joanna Duda

Docstyle fix

refs #33651
parent 6b9e0daa
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.5.2 (/usr/bin/python3.5)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/tmp_repo.iml" filepath="$PROJECT_DIR$/.idea/tmp_repo.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
This diff is collapsed.
recursive-include obci_readmanager *
include versioneer.py
include obci_readmanager/_version.py
prune .eggs
OBCI readmanager
readmanager support module.
"""Module provides classes representing single blink and blink buffers."""
import collections
import numpy
from obci.signal_processing.buffers import ring_buffer as rbn
Sample = numpy.zeros
class Blink:
""":class:'Blink' initializes packet of a blink."""
def __init__(self, timestamp, index):
"""Initialize packet of a blink."""
self.timestamp = timestamp
self.index = index
class BlinkEntry(object):
"""Class representing single blink."""
def __init__(self, blink: Blink, blink_count: int, blink_pos: int) -> None:
"""
Blink variable as a representation of the stimulus.
:param blink: variable Blink (object with one field named timestamp)
:param blink_count: index of the stimulus
:param blink_pos: position of the stimulus in relation to the indexes of the samples in the buffer.
"""
self.blink = blink
self.count = blink_count
self.position = blink_pos
class AutoBlinkBuffer(object):
"""
Class representing buffer which returns predefined epochs of signal around some blink event.
Ex. to use in event related potential paradigms.
"""
def __init__(self, from_blink, samples_count, num_of_channels, sampling, ret_func, copy_on_ret):
"""
After occurrence of the blink and getting a sufficient number of samples calls ret_func(blink, d).
Blink is a Blink variable and d is a corresponding numpy array with
<samples_count> samples from <num_of_channels> channels.
:param from_blink: blink from which start collecting samples
:param samples_count: number of samples to return
:param num_of_channels: number of channels to return
:param sampling: sampling frequency
:param ret_func: function to call when return. Takes returned samples.
:param copy_on_ret: if true make deep copy of the buffer while processing
"""
assert (samples_count > 0)
assert (num_of_channels > 0)
self.ret_func = ret_func
self.ret_buf_len = samples_count # 1024
self.blink_from = from_blink # 128
self.sampling = sampling # float(1024)
self.curr_blink = None
self.curr_blink_ts = None
self.count = 0
self.blinks_count = 0
self.is_full = False
self.whole_buf_len = (self.ret_buf_len + abs(self.blink_from)) * 2
self.buffer = rbn.RingBuffer(self.whole_buf_len, num_of_channels,
copy_on_ret)
self.times = rbn.RingBuffer(self.whole_buf_len,
1, copy_on_ret)
self.times_sample = Sample(1)
self.blinks = collections.deque()
def clear(self):
"""Clear buffer."""
self.count = 0
self.is_full = False
self.buffer.clear()
self.times.clear()
self._clear_blinks()
def _clear_blinks(self):
self.blinks_count = 0
self.blinks.clear()
def handle_blink(self, blink: Blink) -> None:
"""
Method which assigns index and position to the Blink and queues this Blink.
:param blink: Blink (object with one field named timestamp)
:return:
"""
if not self.is_full:
print("AutoBlinkBuffer - Got blink before buffer is full. Ignore!")
return
blink_ts = blink.timestamp + (self.blink_from / self.sampling)
blink_pos = self._get_times_index(blink.timestamp)
if blink_pos < 0:
return
elif blink_pos >= self.whole_buf_len:
# nie ma jeszcze nawet pierwszej probki
blink_count = self.ret_buf_len + int(
self.sampling * (blink_ts - self._get_times_last()))
blink_pos = self.whole_buf_len - self.ret_buf_len
else:
# gdzies w srodku
blink_count = self.ret_buf_len - (self.whole_buf_len - blink_pos)
if blink_count < 0:
blink_count = 0
blink_pos -= blink_count
if not len(self.blinks) == 0:
blink_count -= self.blinks_count # last.count #get_last_blink()
b = BlinkEntry(blink, blink_count, blink_pos)
self.blinks.append(b)
self.blinks_count += blink_count
# [10, 15, 18, 22]
def handle_sample_packet(self, sample_packet):
"""
Method handles signal sample packets in a manner consistent with AutoBlinkBuffer.
:param sample_packet: signal as a SamplePacket class.
:return: None
"""
for idx in range(len(sample_packet.ts)):
self._handle_sample(sample_packet.samples[idx], sample_packet.ts[idx])
def _handle_sample(self, s, t):
self.buffer.add(s)
self.times_sample[0] = t
self.times.add(self.times_sample)
self.count += 1
if not self.is_full:
self.is_full = (self.count == self.whole_buf_len)
else:
if not len(self.blinks) == 0:
curr = self.blinks[0]
curr.count -= 1
self.blinks_count -= 1
if curr.count <= 0:
curr = self.blinks.popleft()
d = self.buffer.get(curr.position, self.ret_buf_len)
self.ret_func(curr.blink, d)
def _get_times_index(self, value):
if self.is_full:
last = self.whole_buf_len
else:
last = self.count
vect = self.times.get(0, last)[0]
ret = -1
for i, v in enumerate(vect):
if value < v:
return ret
ret = i
return self.whole_buf_len
def _get_times_last(self):
if self.is_full:
last = self.whole_buf_len
else:
last = self.count
return self.times.get(0, last)[0][last - 1]
"""Module provides single class representing sample packet buffer."""
from obci.signal_processing.buffers import ring_buffer as rbn
class AutoRingBuffer(object):
"""Class representing sample packet buffer. When full calls ret_func."""
def __init__(self, from_sample, samples_count, every, num_of_channels, ret_func, copy_on_ret):
"""
When full, calls ret_func(d), where d is numpy array with <samples_count>samples from <num_of_channels>channels.
Skips samples before next ret_func call.
:param from_sample: size of the buffer
:param samples_count: number of samples to return
:param every: interval (number of samples to skip) before next return
:param num_of_channels: number of channels in the returned signal
:param ret_func: function to call when return. Takes returned samples.
:param copy_on_ret: if true make deep copy of the buffer while processing
"""
assert (samples_count > 0)
assert (from_sample > 0)
assert (every > 0)
assert (num_of_channels > 0)
self.every = every
self.ret_func = ret_func
self.whole_buf_len = from_sample
self.ret_buf_len = samples_count
self.count = 0
self.is_full = False
self.buffer = rbn.RingBuffer(from_sample,
num_of_channels,
copy_on_ret)
def clear(self):
"""Clear buffer."""
self.count = 0
self.is_full = False
self.buffer.clear()
def handle_sample_packet(self, sample_packet):
"""
Handler supports packets in a manner consistent with AutoBlinkBuffer.
:param sample_packet: signal as SamplePackage class
:return: None
"""
for idx in range(len(sample_packet.ts)):
self._handle_sample(sample_packet.samples[idx])
def _handle_sample(self, s):
self.buffer.add(s)
self.count += 1
if not self.is_full:
if self.count == self.whole_buf_len:
self.is_full = True
self.count %= self.every
if self.count == 0:
self.count = self.every
if self.is_full and self.count == self.every:
d = self.buffer.get(0, self.ret_buf_len)
self.ret_func(d)
self.count = 0
"""Module provides single class representing buffer."""
import numpy
from obci.signal_processing.buffers import ring_buffer_base
class RingBuffer(ring_buffer_base.RingBufferBase):
"""Subclass 'RingBufferBase' representing buffer."""
def _get_normal(self, start, end):
return self.buffer[:, start:end]
def _get_concat(self, start, end):
return numpy.concatenate((self.buffer[:, start:],
self.buffer[:, :end]),
axis=1)
def _add(self, s):
for i in range(self.number_of_channels):
self.buffer[i, self.index] = s[i]
def _init_buffer(self):
self.buffer = numpy.zeros((self.number_of_channels,
self.size), dtype='float')
"""Module provides single class representing base buffer."""
import copy
class RingBufferBase(object):
"""Class representing base buffer."""
def __init__(self, size, number_of_channels, copy_on_ret):
"""Initialize buffer."""
self.size = int(size)
self.number_of_channels = int(number_of_channels)
self.copy_on_ret = bool(copy_on_ret)
self.clear()
def clear(self):
"""Clear buffer."""
self.is_full = False
self.index = 0
self._init_buffer()
def add(self, s):
"""Add sample to buffer."""
self._add(s)
if not self.is_full and self.index == self.size - 1:
self.is_full = True
self.index = (self.index + 1) % self.size
def get(self, start, length):
"""Get samples from buffer (start, start + length)."""
if not self.is_full:
d = self._get_normal(start, start + length)
else:
if self.index + start + length <= self.size:
d = self._get_normal(self.index + start, self.index + start + length)
elif self.index + start >= self.size:
ind = (self.index + start) % self.size
d = self._get_normal(ind, ind + length)
else:
d = self._get_concat(self.index + start,
length - (self.size - (self.index + start)))
if self.copy_on_ret:
return copy.deepcopy(d)
else:
return d
def _get_normal(self, start, end):
raise Exception("To be implemented!")
def _get_concat(self, start, end):
raise Exception("To be implemented!")
def _add(self, s):
raise Exception("To be implemented!")
def _init_buffer(self):
raise Exception("To be implemented!")
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
from ._version import get_versions
__revision__ = get_versions()['full-revisionid']
del get_versions
from .debug import install_debug_handler
install_debug_handler()
del install_debug_handler
This diff is collapsed.
"""Recommended tools for OpenBCI debugging.
* PyCharm
* ``install_debug_handler()`` defined in this file
To connect to existing processes with PyCharm or Pyrasite
you need to run is you have a kernel biult with
CONFIG_SECURITY_YAMA option:
``echo 0 | sudo tee /proc/sys/kernel/yama/ptrace_scope``
and install ``python3-dbg`` package.
"""
def install_debug_handler() -> None:
"""Install debug handler.
Break into a Python console upon:
* ``SIGUSR1`` (Linux), use: ``kill -SIGUSR1 [pid]`` command
* ``SIGBREAK`` (Windows: CTRL+Pause/Break)
"""
def debug_signal_handler(signal, frame):
del signal
del frame
# NOTE: Following debuggers were tested and don't
# work with Python3 based OpenBCI, so don't
# waste your time on them.
# - RPDB2/Winpdb
# - Pyrasite
try:
import pudb
pudb.set_trace()
except Exception as ex:
pass
try:
import code
code.interact()
except Exception as ex:
print("%r, returning to normal program flow" % ex)
# TODO: ...
# try:
# import pdb
# pdb.set_trace()
# except Exception:
# pass
try:
import signal
signal.signal(
vars(signal).get('SIGBREAK') or vars(signal).get('SIGUSR1'),
debug_signal_handler
)
except ValueError:
# Typically: ValueError: signal only works in main thread
pass
"""Peers for readmanager."""
--index-url https://pypi.python.org/simple/
-e .
#!/usr/bin/env bash
set -e
mkdir dist
python3 setup.py --command-packages=stdeb.command bdist_deb
mv ./deb_dist/*.deb ./dist/
#!/usr/bin/env bash
set -e
#needs variables:
#DEB_SRV
#DEB_USER
#SSHPASSV
#RELEASE
DEBUID=$(uuidgen)
sshpass -p $SSHPASS ssh -o StrictHostKeyChecking=no $DEB_USER@$DEB_SRV "mkdir -p -v ~/incoming/$RELEASE/$DEBUID/"
sshpass -p $SSHPASS scp -o StrictHostKeyChecking=no -v ./dist/* $DEB_USER@$DEB_SRV:~/incoming/$RELEASE/$DEBUID/
sshpass -p $SSHPASS ssh -o StrictHostKeyChecking=no $DEB_USER@$DEB_SRV "~/trigger_rebuild.sh $DEBUID $RELEASE"
#!/bin/bash
flake8 --enable-extensions=D --select D obci_readmanager/
#!/usr/bin/env bash
set -e
apt-get -qq -y install git debhelper sed equivs chrpath python3-all python3-pip
# for some unexplained reason stdeb also need python-dev package installed
apt-get -qq -y install python-dev python3-all-dev
# for building packages
pip3 install stdeb pyqt-distutils
#!/usr/bin/env bash
set -e
apt-get -qq -y install python3-pip
#!/usr/bin/env bash
set -e
apt-get -qq -y install sshpass openssh-client uuid-runtime
#!/usr/bin/env bash
set -e
apt-get -qq -y install gnupg debsigs
#!/usr/bin/env bash
set -e
# GPG_KEY - enviroment variable - private key in base64
echo "$GPG_KEY" | base64 -d | gpg --allow-secret-key-import --import -
cd dist
debsigs --sign=origin python3-obci-readmanager*.deb
#!/usr/bin/env bash
set -e
# used by setup.py to get OpenBCI version
apt-get -qq -y install git
# basic Python 3 environment
apt-get -qq -y install python3-pip python3-setuptools
\ No newline at end of file
[versioneer]
VCS = git
style = pep440
versionfile_source = obci_readmanager/_version.py
versionfile_build = obci_readmanager/_version.py
tag_prefix = ''
parentdir_prefix = ''
[flake8]
exclude = .eggs,
.git,
_env,
__pycache__,
*_pb2.py,
*_rc.py,
obci_readmanager/__init__.py,
versioneer.py,
_version.py
doctests = True
ignore = D
max-line-length = 120
max-complexity = 11
\ No newline at end of file
import sys
from setuptools import setup, find_packages
import versioneer
test_requirements = [
'pytest>=3.0',
'pytest-cov>=2.3.1',
'pytest-timeout>=1.0',
'pytest-catchlog>=1.2.2',
'flaky>=3.3.0',
'nose>=1.3.7',
]
needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv)
pytest_runner_requirement = ['pytest-runner>=2.9']
setup_requires = pytest_runner_requirement if needs_pytest else []
setup(
name='obci-readmanager',
version=versioneer.get_version(),
cmdclass=versioneer.get_cmdclass(),
zip_safe=False,
author='BrainTech',
author_email='admin@braintech.pl',
description='OpenBCI 2 readmanager support module',
packages=find_packages(exclude=['scripts', ]),
include_package_data=True,
exclude_package_data={'': ['.gitignore', '.gitlab-ci.yml']},
install_requires=[],
tests_require=test_requirements,
setup_requires=setup_requires,
entry_points={
'console_scripts': [
'obci_readmanager = obci_readmanager.cmd.run_readmanager_preset:run',
],
},
)
[DEFAULT]
Depends3: python3-obci
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment