Commit a6d70d31 authored by Piotr Różański's avatar Piotr Różański Committed by Joanna Duda

Move RM and wiiRM to new repo

parent 2bba0650
*~
*.bak
*.pyc
*#*
*.#*
*.o
*.cmd
*.orig
*.so
/obci/control/gui/resources/*ui.py
/obci/control/gui/resources/*_rc.py
__pycache__
obci.egg-info
build
dist
deb_dist
.eggs
.cache
.coverage
openbci.pyproj
openbci.sln
.vs/
.idea/
coverage_html/
docs/_build/
docs/apidoc/
<?xml version="1.0" encoding="UTF-8"?>
rsion="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/tmp_repo.iml" filepath="$PROJECT_DIR$/.idea/tmp_repo.iml" />
</modules>
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
\ No newline at end of file
This diff is collapsed.
"""This module contains single :class'WBBReadManager' for wii balance board."""
import numpy as np
from obci_readmanager.signal_processing import read_manager
class WBBReadManager():
"""Read_manager for wii balance board."""
def __init__(self, info_source, data_source, tags_source):
"""A class responsible for reading OpenBCI file format for wii balance board."""
super(WBBReadManager, self).__init__()
try:
self.mgr = read_manager.ReadManager(info_source,
data_source,
tags_source)
except IOError as e:
raise Exception("\n[ERROR]\t{}".format(e))
def get_raw_signal(self):
"""Return raw sensor data (TopRight, TopLeft, BottomRight, BottomLeft)."""
top_left = self.mgr.get_channel_samples('tl')
top_right = self.mgr.get_channel_samples('tr')
bottom_right = self.mgr.get_channel_samples('br')
bottom_left = self.mgr.get_channel_samples('bl')
return top_left, top_right, bottom_right, bottom_left
def get_x(self):
"""Return COPx computed from raw sensor data and adds 'x' channel to ReadManager object."""
top_left, top_right, bottom_right, bottom_left = self.get_raw_signal()
x, y = self.get_x_y(top_left, top_right, bottom_right, bottom_left)
samples = self.mgr.get_samples()
chann_names = self.mgr.get_param('channels_names')
self.mgr.set_samples(np.vstack((samples,x)), chann_names + [u'x'])
chann_off = self.mgr.get_param('channels_offsets')
self.mgr.set_param('channels_offsets',chann_off + [u'0.0'])
chann_gain = self.mgr.get_param('channels_gains')
self.mgr.set_param('channels_gains',chann_gain + [u'1.0'])
return x
def get_y(self):
"""Return COPy computed from raw sensor data and adds 'y' channel to ReadManager object."""
top_left, top_right, bottom_right, bottom_left = self.get_raw_signal()
x, y = self.get_x_y(top_left, top_right, bottom_right, bottom_left)
samples = self.mgr.get_samples()
chann_names = self.mgr.get_param('channels_names')
self.mgr.set_samples(np.vstack((samples,y)), chann_names + [u'y'])
chann_off = self.mgr.get_param('channels_offsets')
self.mgr.set_param('channels_offsets',chann_off + [u'0.0'])
chann_gain = self.mgr.get_param('channels_gains')
self.mgr.set_param('channels_gains',chann_gain + [u'1.0'])
return y
def get_x_y(self, tl, tr, br, bl):
"""For given wii sensor values.
- tl - top left
- tr - top right
- br - bottom right
- bl - bottom left
in int or float format, return x, y in floats.
"""
sum_mass = tl + tr + br + bl
x = (((tr + br) - (tl + bl)) / sum_mass)
y = (((tr + tl) - (br + bl)) / sum_mass)
return x, y
def get_timestamps(self):
"""Return timestamps channel."""
return self.mgr.get_channel_samples('TSS')
\ No newline at end of file
......@@ -4,7 +4,7 @@ Module defines single :class: 'DataAsciiWriteProxy'.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from obci_readmanager.signal_processing.utils_manager import SamplePacket
from .data_generic_write_proxy import DataGenericWriteProxy
......
......@@ -4,8 +4,8 @@ Module providing generic temporary data buffer.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from obci.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
from obci_readmanager.signal_processing.utils_manager import SamplePacket
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
BUF_SIZE = 1024
......
......@@ -7,7 +7,7 @@ Author:
import abc
import numpy
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from obci_readmanager.signal_processing.utils_manager import SamplePacket
from . import signal_logging as logger
LOGGER = logger.get_logger('data_generic_write_proxy', 'info')
......
......@@ -4,7 +4,7 @@ Module defines single :class 'DataRawWriteProxy'.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from obci_readmanager.signal_processing.utils_manager import SamplePacket
from .data_generic_write_proxy import DataGenericWriteProxy
from .signal_constants import SAMPLE_NUMPY_TYPES
......
"""This module contains single :class 'SamplePacket'."""
import numpy
class SamplePacket:
"""Class storing samples in packet."""
def __init__(self, samples: numpy.ndarray, ts: numpy.ndarray):
"""
Initialize packet of samples.
:param samples: numpy 2D array of size (sample_count, channel_count)
:param ts: numpy 1D array of size sample_count
"""
if len(samples.shape) != 2 or len(ts.shape) != 1 or samples.shape[0] != ts.shape[0]:
raise Exception("invalid data dimensions: samples~{} ts~{}".format(samples.shape, ts.shape))
self._samples = samples
self._ts = ts
@property
def samples(self):
"""Param samples numpy 2D array of size (sample_count, channel_count)."""
return self._samples
@property
def ts(self):
"""Param ts is a numpy 1D array of size sample_count."""
return self._ts
@property
def sample_count(self):
"""Param sample_count is a number of samples in packet."""
return self._samples.shape[0]
@property
def channel_count(self):
"""Param channel_count is a number of channels."""
return self._samples.shape[1]
def __eq__(self, other):
"""Compare two samples."""
if (self.samples == other.samples).all() and (self.ts == other.ts).all():
return True
else:
return False
......@@ -19,4 +19,18 @@ exclude = .eggs,
doctests = True
ignore = D
max-line-length = 120
max-complexity = 11
\ No newline at end of file
max-complexity = 11
[tool:pytest]
addopts = -vvvv
-rwfxsE
--showlocals
# --full-trace
--doctest-modules
--doctest-ignore-import-error
--cov=obci_readmanager
--cov-report=term:skip-covered
--cov-report=html:coverage_html
--durations=15
--ignore=docs
--ignore=setup.py
\ No newline at end of file
......@@ -3,27 +3,30 @@
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
>>> from obci.signal_processing.signal.data_read_proxy import DataReadProxy
>>> from obci_readmanager.signal_processing.utils_manager import SamplePacket
>>> from obci_readmanager.signal_processing.signal.data_read_proxy import DataReadProxy
>>> from obci.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
>>> from obci_readmanager.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
>>> import os.path, os
>>> import os.path, os, time, numpy
>>> px = DataRawWriteProxy('./tescik.obci.dat')
>>> px.set_data_len(1, 1)
>>> def write_single_value(value):
... packet = SamplePacket(ts=numpy.array([time.time()]), samples=numpy.array([[value]]))
... px.data_received(packet)
>>> px.data_received(1.2)
>>> write_single_value(1.2)
>>> px.data_received(0.0023)
>>> write_single_value(0.0023)
>>> px.data_received(-123.456)
>>> write_single_value(-123.456)
>>> px.data_received(3.3)
>>> write_single_value(3.3)
>>> px.data_received(5.0)
>>> write_single_value(5.0)
>>> nic = px.finish_saving()
......@@ -55,7 +58,7 @@ True
>>> py.get_next_value()
Traceback (most recent call last):
...
obci.signal_processing.signal.signal_exceptions.NoNextValue
obci_readmanager.signal_processing.signal.signal_exceptions.NoNextValue
>>> py.finish_reading()
......@@ -69,7 +72,7 @@ True
>>> py.get_next_values(3)
Traceback (most recent call last):
...
obci.signal_processing.signal.signal_exceptions.NoNextValue
obci_readmanager.signal_processing.signal.signal_exceptions.NoNextValue
>>> #warning here
>>> py.finish_reading()
......
......@@ -5,9 +5,9 @@
#
"""
>>> from obci.signal_processing.tags import tags_file_writer as p
>>> from obci_readmanager.signal_processing.tags import tags_file_writer as p
>>> from obci.signal_processing.tags import tags_file_reader as t
>>> from obci_readmanager.signal_processing.tags import tags_file_reader as t
>>> px = p.TagsFileWriter('./tescik.obci.tags')
......
......@@ -6,7 +6,7 @@
"""
>>> import os
>>> from obci.signal_processing.signal import info_file_proxy as p
>>> from obci_readmanager.signal_processing.signal import info_file_proxy as p
>>> px = p.InfoFileWriteProxy('./tescik.obci.svarog.info')
......
......@@ -5,31 +5,37 @@
#
"""
>>> from obci.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
>>> from obci_readmanager.signal_processing.utils_manager import SamplePacket
>>> import os.path, os
>>> from obci_readmanager.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
>>> import os.path, os, time, numpy
>>> # PREPARE SOME SAMPLE FILE *************************************************
>>> px = DataRawWriteProxy('./tescik.obci.dat')
>>> px.data_received(1.2)
>>> def write_single_value(value):
... packet = SamplePacket(ts=numpy.array([time.time()]), samples=numpy.array([[value]]))
... px.data_received(packet)
>>> write_single_value(1.2)
>>> px.data_received(0.0023)
>>> write_single_value(0.0023)
>>> px.data_received(-123.456)
>>> write_single_value(-123.456)
>>> px.data_received(3.3)
>>> write_single_value(3.3)
>>> px.data_received(5.0)
>>> write_single_value(5.0)
>>> px.data_received(0.0)
>>> write_single_value(0.0)
>>> nic = px.finish_saving()
>>> f = './tescik.obci.dat'
>>> from obci.signal_processing.signal import read_data_source as s
>>> from obci_readmanager.signal_processing.signal import read_data_source as s
>>> # TEST MEMORY DATA SOURCE **************************************************
......@@ -81,7 +87,7 @@ array([[ True, True],
>>> py.get_samples(0, 10)
Traceback (most recent call last):
...
obci.signal_processing.signal.signal_exceptions.NoNextValue
obci_readmanager.signal_processing.signal.signal_exceptions.NoNextValue
>>> np.abs(np.array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples(0, 3)) < 0.001
......@@ -92,7 +98,7 @@ array([[ True, True, True],
>>> py.get_samples(1, 3)
Traceback (most recent call last):
...
obci.signal_processing.signal.signal_exceptions.NoNextValue
obci_readmanager.signal_processing.signal.signal_exceptions.NoNextValue
>>> np.abs(np.array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001
......
......@@ -5,7 +5,7 @@
#
"""
>>> from obci.signal_processing import read_manager
>>> from obci_readmanager.signal_processing import read_manager
>>> import os, os.path, sys
......@@ -56,7 +56,7 @@
>>> mgr.get_param('im_not_there')
Traceback (most recent call last):
...
obci.signal_processing.signal.signal_exceptions.NoParameter:\
obci_readmanager.signal_processing.signal.signal_exceptions.NoParameter:\
No parameter 'im_not_there' was found in info source!
>>> import numpy as np
......
......@@ -22,7 +22,7 @@
>>> tags.append({'start_timestamp':1008.0, 'end_timestamp':1009.0, 'name': 'nic3', 'channels':'A B C',\
'desc': {'x':12345, 'y':45678, 'z': 789}})
>>> from obci.signal_processing.tags import read_tags_source
>>> from obci_readmanager.signal_processing.tags import read_tags_source
>>> s = read_tags_source.MemoryTagsSource(tags)
......
......@@ -3,7 +3,7 @@
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
>>> from obci.signal_processing.tags import smart_tag_definition as d
>>> from obci_readmanager.signal_processing.tags import smart_tag_definition as d
>>> x = d.SmartTagDurationDefinition(start_tag_name='x', start_offset=1, end_offset=0, duration=21)
>>> print(x.__dict__['start_tag_name'])
......
......@@ -7,9 +7,9 @@
"""
>>> import os, os.path
>>> from obci.signal_processing import smart_tags_manager as mgr
>>> from obci_readmanager.signal_processing import smart_tags_manager as mgr
>>> from obci.signal_processing.tags import smart_tag_definition as df
>>> from obci_readmanager.signal_processing.tags import smart_tag_definition as df
>>> d = df.SmartTagDurationDefinition(start_tag_name='trigger', start_offset=0, end_offset=0, duration=1.0)
......@@ -59,7 +59,7 @@ True
>>> # Another test
>>> from obci.signal_processing.tags import tags_file_writer as p
>>> from obci_readmanager.signal_processing.tags import tags_file_writer as p
>>> px = p.TagsFileWriter('./tescik.obci.tags')
......@@ -229,7 +229,7 @@ def fabricate_data_file(f, ch=23):
def fabricate_info_file(f, ch=23):
from obci.signal_processing.signal import info_file_proxy
from obci_readmanager.signal_processing.signal import info_file_proxy
p = info_file_proxy.InfoFileWriteProxy(f)
l_signal_params = {}
l_freq = '128.0'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment