Commits (8)
......@@ -43,7 +43,7 @@ def get_config():
cfg.style = "pep440"
cfg.tag_prefix = ""
cfg.parentdir_prefix = "''"
cfg.versionfile_source = "obci/_version.py"
cfg.versionfile_source = "obci_readmanager/_version.py"
cfg.verbose = False
return cfg
......
......@@ -5,7 +5,7 @@
"""Mixin for readmanager to create MNE.Raw objects."""
import json
from typing import Union
from copy import copy
from copy import copy, deepcopy
import numpy
from ..signal.signal_constants import sample_type_from_numpy
......@@ -30,11 +30,19 @@ def add_stim_chnl(raw):
stim_data = numpy.zeros((1, len(raw.times)))
info = mne.create_info(['OBCI_STIM'], raw.info['sfreq'], ['stim'])
stim_raw = mne.io.RawArray(stim_data, info)
raw.add_channels([stim_raw], force_update_info=True)
raw.add_channels([stim_raw], force_update_info=False)
def _description_from_tag(tag):
tag_desc = copy(tag)
tag_custom_describtion = tag_desc['desc']
for key in tag_custom_describtion:
try:
value = tag_custom_describtion[key]
if ';' in value:
tag_custom_describtion[key] = value.replace(';', ':')
except TypeError:
pass
tag_desc.pop('start_timestamp')
tag_desc.pop('end_timestamp')
return json.dumps(tag_desc)
......@@ -47,9 +55,6 @@ def tags_from_mne_annotations(ans):
Returns tags (list of dicts).
"""
tags = []
orig_time = ans.orig_time
if orig_time is None:
orig_time = 0
for onset, duration, desc in zip(ans.onset, ans.duration, ans.description):
# try to load annotations, as they would be exported by ReadManager
# if there is no our annotations reformat them to tags
......@@ -59,8 +64,8 @@ def tags_from_mne_annotations(ans):
except (json.decoder.JSONDecodeError, AssertionError):
# MNE created not in OBCI
tag = {'name': desc, 'desc': {}, 'channels': ''}
tag['start_timestamp'] = onset - orig_time
tag['end_timestamp'] = onset + duration - orig_time
tag['start_timestamp'] = onset
tag['end_timestamp'] = onset + duration
tags.append(tag)
return tags
......@@ -132,9 +137,10 @@ class ReadManagerMNEMixin:
def from_mne(cls, mne_raw):
"""Read Raw mne object to ReadManager."""
assert isinstance(mne_raw, mne.io.BaseRaw)
mne_raw.drop_channels(['OBCI_STIM'])
data = mne_raw.get_data() * 1e6 # mne keeps signals in Volts
return cls._rm_from_mne_data(data, mne_raw.info, mne_raw.annotations)
mne_raw_copy = deepcopy(mne_raw)
mne_raw_copy.drop_channels(['OBCI_STIM'])
data = mne_raw_copy.get_data() * 1e6 # mne keeps signals in Volts
return cls._rm_from_mne_data(data, mne_raw_copy.info, mne_raw_copy.annotations)
@classmethod
def _rm_from_mne_data(cls, data, info, annotations=None):
......@@ -178,7 +184,7 @@ class ReadManagerMNEMixin:
except ValueError:
meas_date_us = 0
meas_date = numpy.array([meas_date_s, meas_date_us], dtype=numpy.int32)
meas_date = (meas_date_s, meas_date_us)
info['meas_date'] = meas_date
def mne_annotations(self):
......@@ -225,5 +231,5 @@ class ReadManagerMNEMixin:
add_stim_chnl(raw)
ans = self.mne_annotations()
events, _ = self.mne_events()
raw.annotations = ans
raw.set_annotations(ans)
raw.add_events(events)
......@@ -13,8 +13,14 @@ test_requirements = [
'pytest-catchlog>=1.2.2',
'flaky>=3.3.0',
'nose>=1.3.7',
'mne==0.14.1'
'mne~=0.17.0',
'scipy',
'matplotlib',
]
install_requires = ['mne~=0.17.0',
]
needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv)
pytest_runner_requirement = ['pytest-runner>=2.9']
......@@ -44,15 +50,10 @@ setup(
packages=find_packages(exclude=['scripts', ]),
include_package_data=True,
exclude_package_data={'': ['.gitignore', '.gitlab-ci.yml']},
install_requires=[],
install_requires=install_requires,
tests_require=test_requirements,
setup_requires=setup_requires,
# entry_points={
# 'console_scripts': [
# 'obci_readmanager = obci_readmanager.cmd.run_readmanager_preset:run',
# ],
# },
extras_require={
'test': pytest_runner_requirement + test_requirements,
},
)
'test': pytest_runner_requirement + test_requirements,
},
)
......@@ -3,10 +3,13 @@ import os
import math
from obci_readmanager.signal_processing.balance.wii_preprocessing import wii_filter_signal, wii_downsample_signal, wii_cut_fragments
from obci_readmanager.signal_processing.balance.wii_analysis import (wii_COP_path, wii_max_sway_AP_MP, wii_mean_COP_sway_AP_ML,
wii_RMS_AP_ML, wii_confidence_ellipse_area, wii_mean_velocity,
wii_get_percentages_values)
from obci_readmanager.signal_processing.balance.wii_preprocessing import wii_filter_signal, wii_downsample_signal, \
wii_cut_fragments
from obci_readmanager.signal_processing.balance.wii_analysis import (wii_COP_path, wii_max_sway_AP_MP,
wii_mean_COP_sway_AP_ML,
wii_RMS_AP_ML, wii_confidence_ellipse_area,
wii_mean_velocity,
wii_get_percentages_values)
from obci_readmanager.signal_processing.balance.wii_read_manager import WBBReadManager
import matplotlib.pyplot as py
......@@ -35,7 +38,6 @@ def test_wii_analysis(plot=False):
def test_wbb_tutorial():
# inicjalizacja klasy WBBReadManager - utwórz obiekt podając na wejściu ścieżki do
# odpowiednich plików :
pth = __file__
......@@ -225,5 +227,6 @@ def test_wbb_tutorial_calculations():
assert math.isclose(bottom_right, 23.487261146496824)
assert math.isclose(bottom_left, 18.232484076433121)
if __name__ == '__main__':
test_wii_analysis(plot=True)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
"""
>>> import numpy
>>> import time
import time
>>> from obci_readmanager.signal_processing.buffers import auto_blink_buffer as R
import numpy
>>> per, ch, sm = (4, 3, 20)
from obci_readmanager.signal_processing.buffers import auto_blink_buffer as R
from obci_readmanager.signal_processing.signal.data_generic_write_proxy import SamplePacket
>>> ts_step = 1/float(sm)
try:
from .test_auto_ring_buffer import ArrayCallback
except (SystemError, ImportError):
from test_auto_ring_buffer import ArrayCallback
>>> f = print_bufs
>>> b = R.AutoBlinkBuffer(0, 5, ch, sm, f, False)
class BlinkArrayCallback(ArrayCallback):
def __call__(self, blink, arr):
super(BlinkArrayCallback, self).__call__(arr.copy())
>>> t = time.time()
>>> ts = numpy.linspace(t, t + 200 * ts_step, 200)
def test_auto_blink_buffer():
per, ch, sm = (4, 3, 20)
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[0 * per], ts_step))#1-4
ts_step = 1 / float(sm)
callback = BlinkArrayCallback()
>>> b.handle_blink(B(ts[1 * per]))
AutoBlinkBuffer - Got blink before buffer is full. Ignore!
b = R.AutoBlinkBuffer(0, 5, ch, sm, callback, False)
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[1 * per + 1] ,ts_step))#5-8
t = time.time()
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[2 * per + 1], ts_step))#9-12
ts = numpy.linspace(t, t + 200 * ts_step, 200)
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[3 * per + 1], ts_step))#13-16
b.handle_sample_packet(get_sample_packet(per, ch, ts[0 * per], ts_step)) # 1-4
>>> b.handle_blink(B(ts[4 * per + 1]))
b.handle_blink(B(ts[1 * per]))
# AutoBlinkBuffer - Got blink before buffer is full. Ignore!
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[4 * per + 2], ts_step)) #17-20
b.handle_sample_packet(get_sample_packet(per, ch, ts[1 * per + 1], ts_step)) # 5-8
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[5 * per + 2], ts_step))#21-24
FUNC
[[ 18. 19. 20. 21. 22.]
[ 180. 190. 200. 210. 220.]
[ 1800. 1900. 2000. 2100. 2200.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[2 * per + 1], ts_step)) # 9-12
b.handle_sample_packet(get_sample_packet(per, ch, ts[3 * per + 1], ts_step)) # 13-16
>>> b.handle_blink(B(ts[6 * per + 2]))
b.handle_blink(B(ts[4 * per + 1]))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[6 * per + 3], ts_step)) #25-28
b.handle_sample_packet(get_sample_packet(per, ch, ts[4 * per + 2], ts_step)) # 17-20
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[7 * per + 3], ts_step))# 29-32
FUNC
[[ 26. 27. 28. 29. 30.]
[ 260. 270. 280. 290. 300.]
[ 2600. 2700. 2800. 2900. 3000.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[5 * per + 2], ts_step)) # 21-24
callback.assert_called([[18., 19., 20., 21., 22.],
[180., 190., 200., 210., 220.],
[1800., 1900., 2000., 2100., 2200.]])
>>> b.handle_blink(B(ts[8 * per + 3]))
b.handle_blink(B(ts[6 * per + 2]))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[8 * per + 4], ts_step)) #33-36
b.handle_sample_packet(get_sample_packet(per, ch, ts[6 * per + 3], ts_step)) # 25-28
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[9 * per + 4], ts_step)) #37-40
FUNC
[[ 34. 35. 36. 37. 38.]
[ 340. 350. 360. 370. 380.]
[ 3400. 3500. 3600. 3700. 3800.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[7 * per + 3], ts_step)) # 29-32
callback.assert_called([[26., 27., 28., 29., 30.],
[260., 270., 280., 290., 300.],
[2600., 2700., 2800., 2900., 3000.]])
b.handle_blink(B(ts[8 * per + 3]))
>>> zero_count()
b.handle_sample_packet(get_sample_packet(per, ch, ts[8 * per + 4], ts_step)) # 33-36
>>> b = R.AutoBlinkBuffer(0, 10, ch, sm, f, False)
b.handle_sample_packet(get_sample_packet(per, ch, ts[9 * per + 4], ts_step)) # 37-40
callback.assert_called(
[[34., 35., 36., 37., 38.],
[340., 350., 360., 370., 380.],
[3400., 3500., 3600., 3700., 3800.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[10 * per + 4], ts_step))#1-4
zero_count()
>>> b.handle_blink(B(ts[11 * per + 4]))
AutoBlinkBuffer - Got blink before buffer is full. Ignore!
b = R.AutoBlinkBuffer(0, 10, ch, sm, callback, False)
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[11 * per + 5], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[10 * per + 4], ts_step)) # 1-4
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[12 * per + 5], ts_step))
b.handle_blink(B(ts[11 * per + 4]))
# AutoBlinkBuffer - Got blink before buffer is full. Ignore!
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[13 * per + 5], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[11 * per + 5], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[14 * per + 5], ts_step)) #17-20
b.handle_sample_packet(get_sample_packet(per, ch, ts[12 * per + 5], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[15 * per + 5], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[13 * per + 5], ts_step))
>>> b.handle_blink(B(ts[16 * per + 5]))
b.handle_sample_packet(get_sample_packet(per, ch, ts[14 * per + 5], ts_step)) # 17-20
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[16 * per + 6], ts_step)) #25-28
b.handle_sample_packet(get_sample_packet(per, ch, ts[15 * per + 5], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[17 * per + 6], ts_step))
b.handle_blink(B(ts[16 * per + 5]))
>>> b.handle_blink(B(ts[18 * per + 6]))
b.handle_sample_packet(get_sample_packet(per, ch, ts[16 * per + 6], ts_step)) # 25-28
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[18 * per + 7], ts_step)) #33-36
FUNC
[[ 26. 27. 28. 29. 30. 31. 32. 33. 34. 35.]
[ 260. 270. 280. 290. 300. 310. 320. 330. 340. 350.]
[ 2600. 2700. 2800. 2900. 3000. 3100. 3200. 3300. 3400. 3500.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[17 * per + 6], ts_step))
b.handle_blink(B(ts[18 * per + 6]))
>>> b.handle_blink(B(ts[19 * per + 7]))
b.handle_sample_packet(get_sample_packet(per, ch, ts[18 * per + 7], ts_step)) # 33-36
callback.assert_called(
[[26., 27., 28., 29., 30., 31., 32., 33., 34., 35.],
[260., 270., 280., 290., 300., 310., 320., 330., 340., 350.],
[2600., 2700., 2800., 2900., 3000., 3100., 3200., 3300., 3400., 3500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[19 * per + 8], ts_step))
b.handle_blink(B(ts[19 * per + 7]))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[20 * per + 8], ts_step))
FUNC
[[ 34. 35. 36. 37. 38. 39. 40. 41. 42. 43.]
[ 340. 350. 360. 370. 380. 390. 400. 410. 420. 430.]
[ 3400. 3500. 3600. 3700. 3800. 3900. 4000. 4100. 4200. 4300.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[19 * per + 8], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[21 * per + 8], ts_step))
FUNC
[[ 38. 39. 40. 41. 42. 43. 44. 45. 46. 47.]
[ 380. 390. 400. 410. 420. 430. 440. 450. 460. 470.]
[ 3800. 3900. 4000. 4100. 4200. 4300. 4400. 4500. 4600. 4700.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[20 * per + 8], ts_step))
callback.assert_called(
[[34., 35., 36., 37., 38., 39., 40., 41., 42., 43.],
[340., 350., 360., 370., 380., 390., 400., 410., 420., 430.],
[3400., 3500., 3600., 3700., 3800., 3900., 4000., 4100., 4200., 4300.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[22 * per + 8], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[21 * per + 8], ts_step))
callback.assert_called(
[[38., 39., 40., 41., 42., 43., 44., 45., 46., 47.],
[380., 390., 400., 410., 420., 430., 440., 450., 460., 470.],
[3800., 3900., 4000., 4100., 4200., 4300., 4400., 4500., 4600., 4700.]])
b.handle_sample_packet(get_sample_packet(per, ch, ts[22 * per + 8], ts_step))
>>> zero_count()
zero_count()
>>> b = R.AutoBlinkBuffer(0, 10, ch, sm, f, False)
b = R.AutoBlinkBuffer(0, 10, ch, sm, callback, False)
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[23 * per + 8], ts_step))#1-4
b.handle_sample_packet(get_sample_packet(per, ch, ts[23 * per + 8], ts_step)) # 1-4
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[24 * per + 8], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[24 * per + 8], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[25 * per + 8], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[25 * per + 8], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[26 * per + 8], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[26 * per + 8], ts_step))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[27 * per + 8], ts_step)) #17-20
b.handle_sample_packet(get_sample_packet(per, ch, ts[27 * per + 8], ts_step)) # 17-20
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[28 * per + 8], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[28 * per + 8], ts_step))
>>> b.handle_blink(B(ts[29 * per + 2]))
b.handle_blink(B(ts[29 * per + 2]))
>>> b.handle_blink(B(ts[29 * per + 3]))
b.handle_blink(B(ts[29 * per + 3]))
>>> b.handle_blink(B(ts[29 * per + 4]))
b.handle_blink(B(ts[29 * per + 4]))
>>> b.handle_blink(B(ts[29 * per + 5]))
b.handle_blink(B(ts[29 * per + 5]))
>>> b.handle_blink(B(ts[29 * per + 6]))
b.handle_blink(B(ts[29 * per + 6]))
>>> b.handle_blink(B(ts[29 * per + 7]))
b.handle_blink(B(ts[29 * per + 7]))
>>> b.handle_blink(B(ts[29 * per + 8]))
b.handle_blink(B(ts[29 * per + 8]))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[29 * per + 10], ts_step)) #25-28
FUNC
[[ 19. 20. 21. 22. 23. 24. 25. 26. 27. 28.]
[ 190. 200. 210. 220. 230. 240. 250. 260. 270. 280.]
[ 1900. 2000. 2100. 2200. 2300. 2400. 2500. 2600. 2700. 2800.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[29 * per + 10], ts_step)) # 25-28
callback.assert_called(
[[19., 20., 21., 22., 23., 24., 25., 26., 27., 28.],
[190., 200., 210., 220., 230., 240., 250., 260., 270., 280.],
[1900., 2000., 2100., 2200., 2300., 2400., 2500., 2600., 2700., 2800.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[30 * per + 10], ts_step))
FUNC
[[ 20. 21. 22. 23. 24. 25. 26. 27. 28. 29.]
[ 200. 210. 220. 230. 240. 250. 260. 270. 280. 290.]
[ 2000. 2100. 2200. 2300. 2400. 2500. 2600. 2700. 2800. 2900.]]
FUNC
[[ 21. 22. 23. 24. 25. 26. 27. 28. 29. 30.]
[ 210. 220. 230. 240. 250. 260. 270. 280. 290. 300.]
[ 2100. 2200. 2300. 2400. 2500. 2600. 2700. 2800. 2900. 3000.]]
FUNC
[[ 22. 23. 24. 25. 26. 27. 28. 29. 30. 31.]
[ 220. 230. 240. 250. 260. 270. 280. 290. 300. 310.]
[ 2200. 2300. 2400. 2500. 2600. 2700. 2800. 2900. 3000. 3100.]]
FUNC
[[ 23. 24. 25. 26. 27. 28. 29. 30. 31. 32.]
[ 230. 240. 250. 260. 270. 280. 290. 300. 310. 320.]
[ 2300. 2400. 2500. 2600. 2700. 2800. 2900. 3000. 3100. 3200.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[30 * per + 10], ts_step))
callback.assert_called(
[[20., 21., 22., 23., 24., 25., 26., 27., 28., 29.],
[200., 210., 220., 230., 240., 250., 260., 270., 280., 290.],
[2000., 2100., 2200., 2300., 2400., 2500., 2600., 2700., 2800., 2900.]])
callback.assert_called(
[[21., 22., 23., 24., 25., 26., 27., 28., 29., 30.],
[210., 220., 230., 240., 250., 260., 270., 280., 290., 300.],
[2100., 2200., 2300., 2400., 2500., 2600., 2700., 2800., 2900., 3000.]])
callback.assert_called(
[[22., 23., 24., 25., 26., 27., 28., 29., 30., 31.],
[220., 230., 240., 250., 260., 270., 280., 290., 300., 310.],
[2200., 2300., 2400., 2500., 2600., 2700., 2800., 2900., 3000., 3100.]])
callback.assert_called(
[[23., 24., 25., 26., 27., 28., 29., 30., 31., 32.],
[230., 240., 250., 260., 270., 280., 290., 300., 310., 320.],
[2300., 2400., 2500., 2600., 2700., 2800., 2900., 3000., 3100., 3200.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[31 * per + 10], ts_step)) #33-36
FUNC
[[ 25. 26. 27. 28. 29. 30. 31. 32. 33. 34.]
[ 250. 260. 270. 280. 290. 300. 310. 320. 330. 340.]
[ 2500. 2600. 2700. 2800. 2900. 3000. 3100. 3200. 3300. 3400.]]
FUNC
[[ 26. 27. 28. 29. 30. 31. 32. 33. 34. 35.]
[ 260. 270. 280. 290. 300. 310. 320. 330. 340. 350.]
[ 2600. 2700. 2800. 2900. 3000. 3100. 3200. 3300. 3400. 3500.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[31 * per + 10], ts_step)) # 33-36
callback.assert_called(
[[25., 26., 27., 28., 29., 30., 31., 32., 33., 34.],
[250., 260., 270., 280., 290., 300., 310., 320., 330., 340.],
[2500., 2600., 2700., 2800., 2900., 3000., 3100., 3200., 3300., 3400.]])
callback.assert_called(
[[26., 27., 28., 29., 30., 31., 32., 33., 34., 35.],
[260., 270., 280., 290., 300., 310., 320., 330., 340., 350.],
[2600., 2700., 2800., 2900., 3000., 3100., 3200., 3300., 3400., 3500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[32 * per + 10], ts_step)) #33-36
b.handle_sample_packet(get_sample_packet(per, ch, ts[32 * per + 10], ts_step)) # 33-36
>>> b.handle_blink(B(ts[33 * per + 10]))
b.handle_blink(B(ts[33 * per + 10]))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[33 * per + 11], ts_step))
b.handle_sample_packet(get_sample_packet(per, ch, ts[33 * per + 11], ts_step))
>>> b.handle_blink(B(ts[34 * per + 11]))
b.handle_blink(B(ts[34 * per + 11]))
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[34 * per + 12], ts_step))#41-44
b.handle_sample_packet(get_sample_packet(per, ch, ts[34 * per + 12], ts_step)) # 41-44
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[35 * per + 12], ts_step))
FUNC
[[ 42. 43. 44. 45. 46. 47. 48. 49. 50. 51.]
[ 420. 430. 440. 450. 460. 470. 480. 490. 500. 510.]
[ 4200. 4300. 4400. 4500. 4600. 4700. 4800. 4900. 5000. 5100.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[35 * per + 12], ts_step))
callback.assert_called(
[[42., 43., 44., 45., 46., 47., 48., 49., 50., 51.],
[420., 430., 440., 450., 460., 470., 480., 490., 500., 510.],
[4200., 4300., 4400., 4500., 4600., 4700., 4800., 4900., 5000., 5100.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch, ts[36 * per + 12], ts_step))
FUNC
[[ 46. 47. 48. 49. 50. 51. 52. 53. 54. 55.]
[ 460. 470. 480. 490. 500. 510. 520. 530. 540. 550.]
[ 4600. 4700. 4800. 4900. 5000. 5100. 5200. 5300. 5400. 5500.]]
b.handle_sample_packet(get_sample_packet(per, ch, ts[36 * per + 12], ts_step))
callback.assert_called(
[[46., 47., 48., 49., 50., 51., 52., 53., 54., 55.],
[460., 470., 480., 490., 500., 510., 520., 530., 540., 550.],
[4600., 4700., 4800., 4900., 5000., 5100., 5200., 5300., 5400., 5500.]])
"""
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
COUNT = 0
......@@ -246,23 +248,6 @@ def get_sample_packet(per, ch_num, ts_start, ts_step):
for i in range(per):
COUNT += 1
for j in range(ch_num):
samples[i][j] = (10**j) * COUNT
samples[i][j] = (10 ** j) * COUNT
timestamps[i] = ts_start + ts_step * i
return SamplePacket(samples, timestamps)
def print_bufs(blink, bufs):
print("FUNC")
print(bufs)
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
if __name__ == '__main__':
run()
......@@ -2,94 +2,107 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
from unittest.mock import Mock
"""
>>> from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
>>> per, ch = (4, 10)
>>> f = print_bufs
>>> b = R.AutoRingBuffer(10, 5, 5, ch, f, False)
>>> v1 = get_sample_packet(per, ch, 1)
>>> v2 = get_sample_packet(per, ch, 10)
>>> v3 = get_sample_packet(per, ch, 100)
>>> v4 = get_sample_packet(per, ch, 1000)
>>> b.handle_sample_packet(v1)
>>> b.handle_sample_packet(v2)
>>> b.handle_sample_packet(v3)
FUNC
[[ 1. 2. 3. 4. 10.]
[ 2. 3. 4. 5. 20.]
[ 3. 4. 5. 6. 30.]
[ 4. 5. 6. 7. 40.]
[ 5. 6. 7. 8. 50.]
[ 6. 7. 8. 9. 60.]
[ 7. 8. 9. 10. 70.]
[ 8. 9. 10. 11. 80.]
[ 9. 10. 11. 12. 90.]
[ 10. 11. 12. 13. 100.]]
>>> b.handle_sample_packet(v4)
FUNC
[[ 11. 12. 13. 100. 101.]
[ 21. 22. 23. 200. 201.]
[ 31. 32. 33. 300. 301.]
[ 41. 42. 43. 400. 401.]
[ 51. 52. 53. 500. 501.]
[ 61. 62. 63. 600. 601.]
[ 71. 72. 73. 700. 701.]
[ 81. 82. 83. 800. 801.]
[ 91. 92. 93. 900. 901.]
[ 101. 102. 103. 1000. 1001.]]
>>> b = R.AutoRingBuffer(10, 3, 5, ch, f, False)
>>> b.handle_sample_packet(v1)
>>> b.handle_sample_packet(v2)
>>> b.handle_sample_packet(v3)
FUNC
[[ 1. 2. 3.]
[ 2. 3. 4.]
[ 3. 4. 5.]
[ 4. 5. 6.]
[ 5. 6. 7.]
[ 6. 7. 8.]
[ 7. 8. 9.]
[ 8. 9. 10.]
[ 9. 10. 11.]
[ 10. 11. 12.]]
>>> b.handle_sample_packet(v4)
FUNC
[[ 11. 12. 13.]
[ 21. 22. 23.]
[ 31. 32. 33.]
[ 41. 42. 43.]
[ 51. 52. 53.]
[ 61. 62. 63.]
[ 71. 72. 73.]
[ 81. 82. 83.]
[ 91. 92. 93.]
[ 101. 102. 103.]]
"""
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
class ArrayCallback(Mock):
def __init__(self, *args, **kwargs):
super(ArrayCallback, self).__init__(*args, **kwargs)
self.current_check = 0
def assert_called(self, arr2):
assert self.current_check < self.call_count
arr1 = numpy.array(self.call_args_list[self.current_check][0][0])
arr2 = numpy.array(arr2)
assert numpy.array_equal(arr1, arr2)
self.current_check += 1
def __call__(self, arr):
super(ArrayCallback, self).__call__(arr.copy())
def test_auto_ring_buffer():
from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
per, ch = (4, 10)
callback = ArrayCallback()
b = R.AutoRingBuffer(10, 5, 5, ch, callback, False)
v1 = get_sample_packet(per, ch, 1)
v2 = get_sample_packet(per, ch, 10)
v3 = get_sample_packet(per, ch, 100)
v4 = get_sample_packet(per, ch, 1000)
b.handle_sample_packet(v1)
b.handle_sample_packet(v2)
b.handle_sample_packet(v3)
callback.assert_called(
[[1., 2., 3., 4., 10.],
[2., 3., 4., 5., 20.],
[3., 4., 5., 6., 30.],
[4., 5., 6., 7., 40.],
[5., 6., 7., 8., 50.],
[6., 7., 8., 9., 60.],
[7., 8., 9., 10., 70.],
[8., 9., 10., 11., 80.],
[9., 10., 11., 12., 90.],
[10., 11., 12., 13., 100.]])
b.handle_sample_packet(v4)
callback.assert_called(
[[11., 12., 13., 100., 101.],
[21., 22., 23., 200., 201.],
[31., 32., 33., 300., 301.],
[41., 42., 43., 400., 401.],
[51., 52., 53., 500., 501.],
[61., 62., 63., 600., 601.],
[71., 72., 73., 700., 701.],
[81., 82., 83., 800., 801.],
[91., 92., 93., 900., 901.],
[101., 102., 103., 1000., 1001.]])
b = R.AutoRingBuffer(10, 3, 5, ch, callback, False)
b.handle_sample_packet(v1)
b.handle_sample_packet(v2)
b.handle_sample_packet(v3)
callback.assert_called(
[[1., 2., 3.],
[2., 3., 4.],
[3., 4., 5.],
[4., 5., 6.],
[5., 6., 7.],
[6., 7., 8.],
[7., 8., 9.],
[8., 9., 10.],
[9., 10., 11.],
[10., 11., 12.]])
b.handle_sample_packet(v4)
callback.assert_called(
[[11., 12., 13.],
[21., 22., 23.],
[31., 32., 33.],
[41., 42., 43.],
[51., 52., 53.],
[61., 62., 63.],
[71., 72., 73.],
[81., 82., 83.],
[91., 92., 93.],
[101., 102., 103.]])
def get_sample_packet(per, ch, mult):
"""
......@@ -116,18 +129,28 @@ def get_sample_packet(per, ch, mult):
return SamplePacket(packet, timestamps)
def print_bufs(bufs):
print("FUNC")
print(bufs)
def get_sample_packet(per, ch, mult):
"""
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
:param per: number of samples
:param ch: number of channels
:param mult: multiplication parameter
:return: signal as tuple with two numpy arrays
(numpy.array( [ 1485342990.17, 1485342990.28, 1485342990.42, ...]),
numpy.array([
[ 1, 10, 100, ...],
[ 2, 20, 200, ...],
[ 3, 30, 300, ...],
[ 4, 400, 400, ...]
])
)
"""
timestamps = numpy.zeros(per)
packet = numpy.zeros([per, ch])
for i in range(per):
for j in range(ch):
packet[i][j] = float(j + 1) * mult + i
timestamps[i] = 10.0
return SamplePacket(packet, timestamps)
if __name__ == '__main__':
run()
......@@ -3,144 +3,145 @@
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
"""
>>> from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
>>> per, ch = (4, 3)
>>> f = print_bufs
>>> b = R.AutoRingBuffer(5, 5, 5, ch, f, False)
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 1. 2. 3. 4. 5.]
[ 10. 20. 30. 40. 50.]
[ 100. 200. 300. 400. 500.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 6. 7. 8. 9. 10.]
[ 60. 70. 80. 90. 100.]
[ 600. 700. 800. 900. 1000.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 11. 12. 13. 14. 15.]
[ 110. 120. 130. 140. 150.]
[ 1100. 1200. 1300. 1400. 1500.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 16. 17. 18. 19. 20.]
[ 160. 170. 180. 190. 200.]
[ 1600. 1700. 1800. 1900. 2000.]]
>>> b = R.AutoRingBuffer(10, 5, 3, ch, f, False)
>>> zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 3. 4. 5. 6. 7.]
[ 30. 40. 50. 60. 70.]
[ 300. 400. 500. 600. 700.]]
COUNT = 0
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 6. 7. 8. 9. 10.]
[ 60. 70. 80. 90. 100.]
[ 600. 700. 800. 900. 1000.]]
try:
from .test_auto_ring_buffer import ArrayCallback
except (SystemError, ImportError):
from test_auto_ring_buffer import ArrayCallback
def test_auto_ring_buffer2():
from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
per, ch = (4, 3)
callback = ArrayCallback()
b = R.AutoRingBuffer(5, 5, 5, ch, callback, False)
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[1., 2., 3., 4., 5.],
[10., 20., 30., 40., 50.],
[100., 200., 300., 400., 500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 9. 10. 11. 12. 13.]
[ 90. 100. 110. 120. 130.]
[ 900. 1000. 1100. 1200. 1300.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[6., 7., 8., 9., 10.],
[60., 70., 80., 90., 100.],
[600., 700., 800., 900., 1000.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 12. 13. 14. 15. 16.]
[ 120. 130. 140. 150. 160.]
[ 1200. 1300. 1400. 1500. 1600.]]
FUNC
[[ 15. 16. 17. 18. 19.]
[ 150. 160. 170. 180. 190.]
[ 1500. 1600. 1700. 1800. 1900.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[11., 12., 13., 14., 15.],
[110., 120., 130., 140., 150.],
[1100., 1200., 1300., 1400., 1500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 18. 19. 20. 21. 22.]
[ 180. 190. 200. 210. 220.]
[ 1800. 1900. 2000. 2100. 2200.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[16., 17., 18., 19., 20.],
[160., 170., 180., 190., 200.],
[1600., 1700., 1800., 1900., 2000.]])
b = R.AutoRingBuffer(10, 5, 3, ch, callback, False)
>>> b = R.AutoRingBuffer(10, 10, 2, ch, f, False)
zero_count()
>>> zero_count()
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[3., 4., 5., 6., 7.],
[30., 40., 50., 60., 70.],
[300., 400., 500., 600., 700.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
[ 10. 20. 30. 40. 50. 60. 70. 80. 90. 100.]
[ 100. 200. 300. 400. 500. 600. 700. 800. 900. 1000.]]
FUNC
[[ 3. 4. 5. 6. 7. 8. 9. 10. 11. 12.]
[ 30. 40. 50. 60. 70. 80. 90. 100. 110. 120.]
[ 300. 400. 500. 600. 700. 800. 900. 1000. 1100. 1200.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[6., 7., 8., 9., 10.],
[60., 70., 80., 90., 100.],
[600., 700., 800., 900., 1000.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 5. 6. 7. 8. 9. 10. 11. 12. 13. 14.]
[ 50. 60. 70. 80. 90. 100. 110. 120. 130. 140.]
[ 500. 600. 700. 800. 900. 1000. 1100. 1200. 1300. 1400.]]
FUNC
[[ 7. 8. 9. 10. 11. 12. 13. 14. 15. 16.]
[ 70. 80. 90. 100. 110. 120. 130. 140. 150. 160.]
[ 700. 800. 900. 1000. 1100. 1200. 1300. 1400. 1500. 1600.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[9., 10., 11., 12., 13.],
[90., 100., 110., 120., 130.],
[900., 1000., 1100., 1200., 1300.]])
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[12., 13., 14., 15., 16.],
[120., 130., 140., 150., 160.],
[1200., 1300., 1400., 1500., 1600.]])
callback.assert_called(
[[15., 16., 17., 18., 19.],
[150., 160., 170., 180., 190.],
[1500., 1600., 1700., 1800., 1900.]])
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[18., 19., 20., 21., 22.],
[180., 190., 200., 210., 220.],
[1800., 1900., 2000., 2100., 2200.]])
>>> b = R.AutoRingBuffer(10, 3, 8, ch, f, False)
b = R.AutoRingBuffer(10, 10, 2, ch, callback, False)
>>> zero_count()
zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[1., 2., 3., 4., 5., 6., 7., 8., 9., 10.],
[10., 20., 30., 40., 50., 60., 70., 80., 90., 100.],
[100., 200., 300., 400., 500., 600., 700., 800., 900., 1000.]])
callback.assert_called(
[[3., 4., 5., 6., 7., 8., 9., 10., 11., 12.],
[30., 40., 50., 60., 70., 80., 90., 100., 110., 120.],
[300., 400., 500., 600., 700., 800., 900., 1000., 1100., 1200.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 7. 8. 9.]
[ 70. 80. 90.]
[ 700. 800. 900.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[5., 6., 7., 8., 9., 10., 11., 12., 13., 14.],
[50., 60., 70., 80., 90., 100., 110., 120., 130., 140.],
[500., 600., 700., 800., 900., 1000., 1100., 1200., 1300., 1400.]])
callback.assert_called(
[[7., 8., 9., 10., 11., 12., 13., 14., 15., 16.],
[70., 80., 90., 100., 110., 120., 130., 140., 150., 160.],
[700., 800., 900., 1000., 1100., 1200., 1300., 1400., 1500., 1600.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b = R.AutoRingBuffer(10, 3, 8, ch, callback, False)
zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 15. 16. 17.]
[ 150. 160. 170.]
[ 1500. 1600. 1700.]]
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[7., 8., 9.],
[70., 80., 90.],
[700., 800., 900.]])
"""
b.handle_sample_packet(get_sample_packet(per, ch))
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[15., 16., 17.],
[150., 160., 170.],
[1500., 1600., 1700.]])
COUNT = 0
......@@ -171,23 +172,7 @@ def get_sample_packet(per, ch):
for i in range(per):
COUNT += 1
for j in range(ch):
samples[i][j] = 10**j * COUNT
samples[i][j] = 10 ** j * COUNT
timestamps[i] = 10.0
return SamplePacket(samples, timestamps)
def print_bufs(bufs):
print("FUNC")
print(bufs)
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
if __name__ == '__main__':
run()
......@@ -2,58 +2,56 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
import numpy as np
"""
>>> from obci_readmanager.signal_processing.buffers import ring_buffer as R
>>> r = R.RingBuffer(10, 5, False)
>>> for s in [get_sample(i, 5) for i in range(12)]: r.add(s)
from obci_readmanager.signal_processing.buffers.ring_buffer import RingBuffer
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
>>> vals = r.get(3, 3)
>>> vals
array([[ 5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[ 5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[ 5.00000000e+02, 6.00000000e+02, 7.00000000e+02],
[ 5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[ 5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
def test_ring_buffer():
r = RingBuffer(10, 5, False)
for s in [get_sample(i, 5) for i in range(12)]:
r.add(s)
>>> vals[2, 1] = 1234.0
def assert_equal(arr1, arr2):
assert np.array_equal(np.array(arr1), np.array(arr2))
>>> vals
array([[ 5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[ 5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[ 5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[ 5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[ 5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
vals = r.get(3, 3)
>>> r.get(3, 3)
array([[ 5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[ 5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[ 5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[ 5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[ 5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
assert_equal(vals,
[[5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[5.00000000e+02, 6.00000000e+02, 7.00000000e+02],
[5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
vals[2, 1] = 1234.0
>>> r = R.RingBuffer(10, 5, True)
assert_equal(vals, [[5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
>>> for s in [get_sample(i, 5) for i in range(19)]: r.add(s)
assert_equal(r.get(3, 3), [[5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
>>> vals = r.get(2, 2)
r = RingBuffer(10, 5, True)
>>> vals
array([[ 1.10000000e+01, 1.20000000e+01],
[ 1.10000000e+02, 1.20000000e+02],
[ 1.10000000e+03, 1.20000000e+03],
[ 1.10000000e+04, 1.20000000e+04],
[ 1.10000000e+05, 1.20000000e+05]])
for s in [get_sample(i, 5) for i in range(19)]:
r.add(s)
"""
vals = r.get(2, 2)
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
assert_equal(vals, [[1.10000000e+01, 1.20000000e+01],
[1.10000000e+02, 1.20000000e+02],
[1.10000000e+03, 1.20000000e+03],
[1.10000000e+04, 1.20000000e+04],
[1.10000000e+05, 1.20000000e+05]])
def get_sample(v, ch_num, per=1):
......@@ -64,8 +62,8 @@ def get_sample(v, ch_num, per=1):
:return: one sample as numpy array
numpy.array( [ 1, 10, 100, ...] )
"""
sample = numpy.zeros([per, ch_num])
timestamp = numpy.zeros(per)
sample = np.zeros([per, ch_num])
timestamp = np.zeros(per)
mult = 1
for i in range(ch_num):
......@@ -73,15 +71,3 @@ def get_sample(v, ch_num, per=1):
mult *= 10
timestamp[0] = 10.0
return SamplePacket(sample, timestamp).samples[0]
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
if __name__ == '__main__':
run()
<?xml version="1.0" encoding="UTF-8"?>
<tagFile formatVersion="1.0">
<paging blocks_per_page="5" page_size="20.0" />
<tagData>
<tags>
<tag channelNumber="-1" length="1.0046250820159912" name="trigger" position="0.36085605621337891">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.9283778667449951" name="trigger" position="1.3654811382293701">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.5430161952972412" name="trigger" position="3.2938590049743652">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.7950088977813721" name="trigger" position="4.8368752002716064">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.6539919376373291" name="trigger" position="6.6318840980529785">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.6060090065002441" name="trigger" position="8.2858760356903076">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.7079939842224121" name="trigger" position="9.8918850421905518">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.5549831390380859" name="trigger" position="11.599879026412964">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.5930259227752686" name="trigger" position="13.15486216545105">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.0129780769348145" name="trigger" position="14.747888088226318">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7710130214691162" name="trigger" position="15.760866165161133">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6369898319244385" name="trigger" position="17.531879186630249">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8520011901855469" name="trigger" position="19.168869018554688">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6960029602050781" name="trigger" position="21.020870208740234">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8960099220275879" name="trigger" position="22.716873168945312">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.799976110458374" name="trigger" position="24.6128830909729">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9080259799957275" name="trigger" position="26.412859201431274">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.0279920101165771" name="trigger" position="28.320885181427002">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.3869879245758057" name="trigger" position="29.348877191543579">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.1030158996582031" name="trigger" position="30.735865116119385">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.6669991016387939" name="trigger" position="31.838881015777588">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.0890018939971924" name="trigger" position="33.505880117416382">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9470040798187256" name="trigger" position="34.594882011413574">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.1589961051940918" name="trigger" position="36.5418860912323">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.0949859619140625" name="trigger" position="37.700882196426392">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.389995813369751" name="trigger" position="38.795868158340454">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7900252342224121" name="trigger" position="40.185863971710205">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6329929828643799" name="trigger" position="41.975889205932617">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.0870018005371094" name="trigger" position="43.608882188796997">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.217008113861084" name="trigger" position="44.695883989334106">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7229928970336914" name="trigger" position="45.91289210319519">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.7080001831054688" name="trigger" position="47.635885000228882">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.3120009899139404" name="trigger" position="49.343885183334351">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.8303978443145752" name="trigger" position="50.655886173248291">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.077592134475708" name="trigger" position="52.486284017562866">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.9700009822845459" name="trigger" position="53.563876152038574">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8099980354309082" name="trigger" position="55.53387713432312">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.9740610122680664" name="trigger" position="57.343875169754028">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7009468078613281" name="trigger" position="59.317936182022095">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.2930071353912354" name="trigger" position="61.018882989883423">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.4469950199127197" name="trigger" position="62.311890125274658">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.8379850387573242" name="trigger" position="63.758885145187378">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9000129699707031" name="trigger" position="65.596870183944702">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.3319909572601318" name="trigger" position="67.496883153915405">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8179960250854492" name="trigger" position="68.828874111175537">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.2369940280914307" name="trigger" position="70.646870136260986">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.4687058925628662" name="trigger" position="71.883864164352417">
<value>1</value>
</tag>
<tag channelNumber="-1" length="2.0003170967102051" name="trigger" position="73.352570056915283">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9999959468841553" name="trigger" position="75.352887153625488">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6439969539642334" name="trigger" position="77.352883100509644">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.0243129730224609" name="trigger" position="78.996880054473877">
<value>1</value>
</tag>
</tags>
</tagData>
</tagFile>
......@@ -6,6 +6,8 @@ import os
import mne
import numpy
import pytest
from obci_readmanager.signal_processing.read_manager import ReadManager
from obci_readmanager.signal_processing import smart_tags_manager as mgr
......@@ -13,6 +15,16 @@ from obci_readmanager.signal_processing.tags import smart_tag_definition as df
from test_smart_tags_manager import fabricate_data_file, fabricate_info_file
def test_mne_semicolon():
pth = os.path.abspath(__file__)
fname = os.path.join(pth[:-len(os.path.basename(pth))], 'data', 'data.obci')
read_manager = ReadManager(fname + '.info',
fname + '.dat',
fname + '_semicolon.tags')
read_manager.get_mne_raw()
def test_mne_compat():
"""Minimal test which just reads file and checks if mne can do something with it"""
pth = os.path.abspath(__file__)
......@@ -70,14 +82,13 @@ def test_mne_and_back(tmpdir):
file3 = os.path.join(str(tmpdir), 'test_rm_mne')
rm3 = ReadManager(file3 + '.obci.xml',
file3 + '.obci.raw',
file3 + '.obci.tag',)
file3 + '.obci.tag', )
_assert_rms_similar(rm2, rm3)
_assert_tags_ok(read_manager, rm3)
def test_mne_epochs(tmpdir):
d = df.SmartTagDurationDefinition(start_tag_name='trigger', start_offset=-0.15,
end_offset=0, duration=1.0)