...
 
Commits (8)
......@@ -43,7 +43,7 @@ def get_config():
cfg.style = "pep440"
cfg.tag_prefix = ""
cfg.parentdir_prefix = "''"
cfg.versionfile_source = "obci/_version.py"
cfg.versionfile_source = "obci_readmanager/_version.py"
cfg.verbose = False
return cfg
......
......@@ -5,7 +5,7 @@
"""Mixin for readmanager to create MNE.Raw objects."""
import json
from typing import Union
from copy import copy
from copy import copy, deepcopy
import numpy
from ..signal.signal_constants import sample_type_from_numpy
......@@ -30,11 +30,19 @@ def add_stim_chnl(raw):
stim_data = numpy.zeros((1, len(raw.times)))
info = mne.create_info(['OBCI_STIM'], raw.info['sfreq'], ['stim'])
stim_raw = mne.io.RawArray(stim_data, info)
raw.add_channels([stim_raw], force_update_info=True)
raw.add_channels([stim_raw], force_update_info=False)
def _description_from_tag(tag):
tag_desc = copy(tag)
tag_custom_describtion = tag_desc['desc']
for key in tag_custom_describtion:
try:
value = tag_custom_describtion[key]
if ';' in value:
tag_custom_describtion[key] = value.replace(';', ':')
except TypeError:
pass
tag_desc.pop('start_timestamp')
tag_desc.pop('end_timestamp')
return json.dumps(tag_desc)
......@@ -47,9 +55,6 @@ def tags_from_mne_annotations(ans):
Returns tags (list of dicts).
"""
tags = []
orig_time = ans.orig_time
if orig_time is None:
orig_time = 0
for onset, duration, desc in zip(ans.onset, ans.duration, ans.description):
# try to load annotations, as they would be exported by ReadManager
# if there is no our annotations reformat them to tags
......@@ -59,8 +64,8 @@ def tags_from_mne_annotations(ans):
except (json.decoder.JSONDecodeError, AssertionError):
# MNE created not in OBCI
tag = {'name': desc, 'desc': {}, 'channels': ''}
tag['start_timestamp'] = onset - orig_time
tag['end_timestamp'] = onset + duration - orig_time
tag['start_timestamp'] = onset
tag['end_timestamp'] = onset + duration
tags.append(tag)
return tags
......@@ -132,9 +137,10 @@ class ReadManagerMNEMixin:
def from_mne(cls, mne_raw):
"""Read Raw mne object to ReadManager."""
assert isinstance(mne_raw, mne.io.BaseRaw)
mne_raw.drop_channels(['OBCI_STIM'])
data = mne_raw.get_data() * 1e6 # mne keeps signals in Volts
return cls._rm_from_mne_data(data, mne_raw.info, mne_raw.annotations)
mne_raw_copy = deepcopy(mne_raw)
mne_raw_copy.drop_channels(['OBCI_STIM'])
data = mne_raw_copy.get_data() * 1e6 # mne keeps signals in Volts
return cls._rm_from_mne_data(data, mne_raw_copy.info, mne_raw_copy.annotations)
@classmethod
def _rm_from_mne_data(cls, data, info, annotations=None):
......@@ -178,7 +184,7 @@ class ReadManagerMNEMixin:
except ValueError:
meas_date_us = 0
meas_date = numpy.array([meas_date_s, meas_date_us], dtype=numpy.int32)
meas_date = (meas_date_s, meas_date_us)
info['meas_date'] = meas_date
def mne_annotations(self):
......@@ -225,5 +231,5 @@ class ReadManagerMNEMixin:
add_stim_chnl(raw)
ans = self.mne_annotations()
events, _ = self.mne_events()
raw.annotations = ans
raw.set_annotations(ans)
raw.add_events(events)
......@@ -13,8 +13,14 @@ test_requirements = [
'pytest-catchlog>=1.2.2',
'flaky>=3.3.0',
'nose>=1.3.7',
'mne==0.14.1'
'mne~=0.17.0',
'scipy',
'matplotlib',
]
install_requires = ['mne~=0.17.0',
]
needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv)
pytest_runner_requirement = ['pytest-runner>=2.9']
......@@ -44,15 +50,10 @@ setup(
packages=find_packages(exclude=['scripts', ]),
include_package_data=True,
exclude_package_data={'': ['.gitignore', '.gitlab-ci.yml']},
install_requires=[],
install_requires=install_requires,
tests_require=test_requirements,
setup_requires=setup_requires,
# entry_points={
# 'console_scripts': [
# 'obci_readmanager = obci_readmanager.cmd.run_readmanager_preset:run',
# ],
# },
extras_require={
'test': pytest_runner_requirement + test_requirements,
},
)
'test': pytest_runner_requirement + test_requirements,
},
)
......@@ -3,10 +3,13 @@ import os
import math
from obci_readmanager.signal_processing.balance.wii_preprocessing import wii_filter_signal, wii_downsample_signal, wii_cut_fragments
from obci_readmanager.signal_processing.balance.wii_analysis import (wii_COP_path, wii_max_sway_AP_MP, wii_mean_COP_sway_AP_ML,
wii_RMS_AP_ML, wii_confidence_ellipse_area, wii_mean_velocity,
wii_get_percentages_values)
from obci_readmanager.signal_processing.balance.wii_preprocessing import wii_filter_signal, wii_downsample_signal, \
wii_cut_fragments
from obci_readmanager.signal_processing.balance.wii_analysis import (wii_COP_path, wii_max_sway_AP_MP,
wii_mean_COP_sway_AP_ML,
wii_RMS_AP_ML, wii_confidence_ellipse_area,
wii_mean_velocity,
wii_get_percentages_values)
from obci_readmanager.signal_processing.balance.wii_read_manager import WBBReadManager
import matplotlib.pyplot as py
......@@ -35,7 +38,6 @@ def test_wii_analysis(plot=False):
def test_wbb_tutorial():
# inicjalizacja klasy WBBReadManager - utwórz obiekt podając na wejściu ścieżki do
# odpowiednich plików :
pth = __file__
......@@ -225,5 +227,6 @@ def test_wbb_tutorial_calculations():
assert math.isclose(bottom_right, 23.487261146496824)
assert math.isclose(bottom_left, 18.232484076433121)
if __name__ == '__main__':
test_wii_analysis(plot=True)
......@@ -2,94 +2,107 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
from unittest.mock import Mock
"""
>>> from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
>>> per, ch = (4, 10)
>>> f = print_bufs
>>> b = R.AutoRingBuffer(10, 5, 5, ch, f, False)
>>> v1 = get_sample_packet(per, ch, 1)
>>> v2 = get_sample_packet(per, ch, 10)
>>> v3 = get_sample_packet(per, ch, 100)
>>> v4 = get_sample_packet(per, ch, 1000)
>>> b.handle_sample_packet(v1)
>>> b.handle_sample_packet(v2)
>>> b.handle_sample_packet(v3)
FUNC
[[ 1. 2. 3. 4. 10.]
[ 2. 3. 4. 5. 20.]
[ 3. 4. 5. 6. 30.]
[ 4. 5. 6. 7. 40.]
[ 5. 6. 7. 8. 50.]
[ 6. 7. 8. 9. 60.]
[ 7. 8. 9. 10. 70.]
[ 8. 9. 10. 11. 80.]
[ 9. 10. 11. 12. 90.]
[ 10. 11. 12. 13. 100.]]
>>> b.handle_sample_packet(v4)
FUNC
[[ 11. 12. 13. 100. 101.]
[ 21. 22. 23. 200. 201.]
[ 31. 32. 33. 300. 301.]
[ 41. 42. 43. 400. 401.]
[ 51. 52. 53. 500. 501.]
[ 61. 62. 63. 600. 601.]
[ 71. 72. 73. 700. 701.]
[ 81. 82. 83. 800. 801.]
[ 91. 92. 93. 900. 901.]
[ 101. 102. 103. 1000. 1001.]]
>>> b = R.AutoRingBuffer(10, 3, 5, ch, f, False)
>>> b.handle_sample_packet(v1)
>>> b.handle_sample_packet(v2)
>>> b.handle_sample_packet(v3)
FUNC
[[ 1. 2. 3.]
[ 2. 3. 4.]
[ 3. 4. 5.]
[ 4. 5. 6.]
[ 5. 6. 7.]
[ 6. 7. 8.]
[ 7. 8. 9.]
[ 8. 9. 10.]
[ 9. 10. 11.]
[ 10. 11. 12.]]
>>> b.handle_sample_packet(v4)
FUNC
[[ 11. 12. 13.]
[ 21. 22. 23.]
[ 31. 32. 33.]
[ 41. 42. 43.]
[ 51. 52. 53.]
[ 61. 62. 63.]
[ 71. 72. 73.]
[ 81. 82. 83.]
[ 91. 92. 93.]
[ 101. 102. 103.]]
"""
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
class ArrayCallback(Mock):
def __init__(self, *args, **kwargs):
super(ArrayCallback, self).__init__(*args, **kwargs)
self.current_check = 0
def assert_called(self, arr2):
assert self.current_check < self.call_count
arr1 = numpy.array(self.call_args_list[self.current_check][0][0])
arr2 = numpy.array(arr2)
assert numpy.array_equal(arr1, arr2)
self.current_check += 1
def __call__(self, arr):
super(ArrayCallback, self).__call__(arr.copy())
def test_auto_ring_buffer():
from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
per, ch = (4, 10)
callback = ArrayCallback()
b = R.AutoRingBuffer(10, 5, 5, ch, callback, False)
v1 = get_sample_packet(per, ch, 1)
v2 = get_sample_packet(per, ch, 10)
v3 = get_sample_packet(per, ch, 100)
v4 = get_sample_packet(per, ch, 1000)
b.handle_sample_packet(v1)
b.handle_sample_packet(v2)
b.handle_sample_packet(v3)
callback.assert_called(
[[1., 2., 3., 4., 10.],
[2., 3., 4., 5., 20.],
[3., 4., 5., 6., 30.],
[4., 5., 6., 7., 40.],
[5., 6., 7., 8., 50.],
[6., 7., 8., 9., 60.],
[7., 8., 9., 10., 70.],
[8., 9., 10., 11., 80.],
[9., 10., 11., 12., 90.],
[10., 11., 12., 13., 100.]])
b.handle_sample_packet(v4)
callback.assert_called(
[[11., 12., 13., 100., 101.],
[21., 22., 23., 200., 201.],
[31., 32., 33., 300., 301.],
[41., 42., 43., 400., 401.],
[51., 52., 53., 500., 501.],
[61., 62., 63., 600., 601.],
[71., 72., 73., 700., 701.],
[81., 82., 83., 800., 801.],
[91., 92., 93., 900., 901.],
[101., 102., 103., 1000., 1001.]])
b = R.AutoRingBuffer(10, 3, 5, ch, callback, False)
b.handle_sample_packet(v1)
b.handle_sample_packet(v2)
b.handle_sample_packet(v3)
callback.assert_called(
[[1., 2., 3.],
[2., 3., 4.],
[3., 4., 5.],
[4., 5., 6.],
[5., 6., 7.],
[6., 7., 8.],
[7., 8., 9.],
[8., 9., 10.],
[9., 10., 11.],
[10., 11., 12.]])
b.handle_sample_packet(v4)
callback.assert_called(
[[11., 12., 13.],
[21., 22., 23.],
[31., 32., 33.],
[41., 42., 43.],
[51., 52., 53.],
[61., 62., 63.],
[71., 72., 73.],
[81., 82., 83.],
[91., 92., 93.],
[101., 102., 103.]])
def get_sample_packet(per, ch, mult):
"""
......@@ -116,18 +129,28 @@ def get_sample_packet(per, ch, mult):
return SamplePacket(packet, timestamps)
def print_bufs(bufs):
print("FUNC")
print(bufs)
def get_sample_packet(per, ch, mult):
"""
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
:param per: number of samples
:param ch: number of channels
:param mult: multiplication parameter
:return: signal as tuple with two numpy arrays
(numpy.array( [ 1485342990.17, 1485342990.28, 1485342990.42, ...]),
numpy.array([
[ 1, 10, 100, ...],
[ 2, 20, 200, ...],
[ 3, 30, 300, ...],
[ 4, 400, 400, ...]
])
)
"""
timestamps = numpy.zeros(per)
packet = numpy.zeros([per, ch])
for i in range(per):
for j in range(ch):
packet[i][j] = float(j + 1) * mult + i
timestamps[i] = 10.0
return SamplePacket(packet, timestamps)
if __name__ == '__main__':
run()
......@@ -3,144 +3,145 @@
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
"""
>>> from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
>>> per, ch = (4, 3)
>>> f = print_bufs
>>> b = R.AutoRingBuffer(5, 5, 5, ch, f, False)
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 1. 2. 3. 4. 5.]
[ 10. 20. 30. 40. 50.]
[ 100. 200. 300. 400. 500.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 6. 7. 8. 9. 10.]
[ 60. 70. 80. 90. 100.]
[ 600. 700. 800. 900. 1000.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 11. 12. 13. 14. 15.]
[ 110. 120. 130. 140. 150.]
[ 1100. 1200. 1300. 1400. 1500.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 16. 17. 18. 19. 20.]
[ 160. 170. 180. 190. 200.]
[ 1600. 1700. 1800. 1900. 2000.]]
>>> b = R.AutoRingBuffer(10, 5, 3, ch, f, False)
>>> zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 3. 4. 5. 6. 7.]
[ 30. 40. 50. 60. 70.]
[ 300. 400. 500. 600. 700.]]
COUNT = 0
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 6. 7. 8. 9. 10.]
[ 60. 70. 80. 90. 100.]
[ 600. 700. 800. 900. 1000.]]
try:
from .test_auto_ring_buffer import ArrayCallback
except (SystemError, ImportError):
from test_auto_ring_buffer import ArrayCallback
def test_auto_ring_buffer2():
from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
per, ch = (4, 3)
callback = ArrayCallback()
b = R.AutoRingBuffer(5, 5, 5, ch, callback, False)
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[1., 2., 3., 4., 5.],
[10., 20., 30., 40., 50.],
[100., 200., 300., 400., 500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 9. 10. 11. 12. 13.]
[ 90. 100. 110. 120. 130.]
[ 900. 1000. 1100. 1200. 1300.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[6., 7., 8., 9., 10.],
[60., 70., 80., 90., 100.],
[600., 700., 800., 900., 1000.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 12. 13. 14. 15. 16.]
[ 120. 130. 140. 150. 160.]
[ 1200. 1300. 1400. 1500. 1600.]]
FUNC
[[ 15. 16. 17. 18. 19.]
[ 150. 160. 170. 180. 190.]
[ 1500. 1600. 1700. 1800. 1900.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[11., 12., 13., 14., 15.],
[110., 120., 130., 140., 150.],
[1100., 1200., 1300., 1400., 1500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 18. 19. 20. 21. 22.]
[ 180. 190. 200. 210. 220.]
[ 1800. 1900. 2000. 2100. 2200.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[16., 17., 18., 19., 20.],
[160., 170., 180., 190., 200.],
[1600., 1700., 1800., 1900., 2000.]])
b = R.AutoRingBuffer(10, 5, 3, ch, callback, False)
>>> b = R.AutoRingBuffer(10, 10, 2, ch, f, False)
zero_count()
>>> zero_count()
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[3., 4., 5., 6., 7.],
[30., 40., 50., 60., 70.],
[300., 400., 500., 600., 700.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
[ 10. 20. 30. 40. 50. 60. 70. 80. 90. 100.]
[ 100. 200. 300. 400. 500. 600. 700. 800. 900. 1000.]]
FUNC
[[ 3. 4. 5. 6. 7. 8. 9. 10. 11. 12.]
[ 30. 40. 50. 60. 70. 80. 90. 100. 110. 120.]
[ 300. 400. 500. 600. 700. 800. 900. 1000. 1100. 1200.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[6., 7., 8., 9., 10.],
[60., 70., 80., 90., 100.],
[600., 700., 800., 900., 1000.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 5. 6. 7. 8. 9. 10. 11. 12. 13. 14.]
[ 50. 60. 70. 80. 90. 100. 110. 120. 130. 140.]
[ 500. 600. 700. 800. 900. 1000. 1100. 1200. 1300. 1400.]]
FUNC
[[ 7. 8. 9. 10. 11. 12. 13. 14. 15. 16.]
[ 70. 80. 90. 100. 110. 120. 130. 140. 150. 160.]
[ 700. 800. 900. 1000. 1100. 1200. 1300. 1400. 1500. 1600.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[9., 10., 11., 12., 13.],
[90., 100., 110., 120., 130.],
[900., 1000., 1100., 1200., 1300.]])
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[12., 13., 14., 15., 16.],
[120., 130., 140., 150., 160.],
[1200., 1300., 1400., 1500., 1600.]])
callback.assert_called(
[[15., 16., 17., 18., 19.],
[150., 160., 170., 180., 190.],
[1500., 1600., 1700., 1800., 1900.]])
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[18., 19., 20., 21., 22.],
[180., 190., 200., 210., 220.],
[1800., 1900., 2000., 2100., 2200.]])
>>> b = R.AutoRingBuffer(10, 3, 8, ch, f, False)
b = R.AutoRingBuffer(10, 10, 2, ch, callback, False)
>>> zero_count()
zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[1., 2., 3., 4., 5., 6., 7., 8., 9., 10.],
[10., 20., 30., 40., 50., 60., 70., 80., 90., 100.],
[100., 200., 300., 400., 500., 600., 700., 800., 900., 1000.]])
callback.assert_called(
[[3., 4., 5., 6., 7., 8., 9., 10., 11., 12.],
[30., 40., 50., 60., 70., 80., 90., 100., 110., 120.],
[300., 400., 500., 600., 700., 800., 900., 1000., 1100., 1200.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 7. 8. 9.]
[ 70. 80. 90.]
[ 700. 800. 900.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[5., 6., 7., 8., 9., 10., 11., 12., 13., 14.],
[50., 60., 70., 80., 90., 100., 110., 120., 130., 140.],
[500., 600., 700., 800., 900., 1000., 1100., 1200., 1300., 1400.]])
callback.assert_called(
[[7., 8., 9., 10., 11., 12., 13., 14., 15., 16.],
[70., 80., 90., 100., 110., 120., 130., 140., 150., 160.],
[700., 800., 900., 1000., 1100., 1200., 1300., 1400., 1500., 1600.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch))
b = R.AutoRingBuffer(10, 3, 8, ch, callback, False)
zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 15. 16. 17.]
[ 150. 160. 170.]
[ 1500. 1600. 1700.]]
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[7., 8., 9.],
[70., 80., 90.],
[700., 800., 900.]])
"""
b.handle_sample_packet(get_sample_packet(per, ch))
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[15., 16., 17.],
[150., 160., 170.],
[1500., 1600., 1700.]])
COUNT = 0
......@@ -171,23 +172,7 @@ def get_sample_packet(per, ch):
for i in range(per):
COUNT += 1
for j in range(ch):
samples[i][j] = 10**j * COUNT
samples[i][j] = 10 ** j * COUNT
timestamps[i] = 10.0
return SamplePacket(samples, timestamps)
def print_bufs(bufs):
print("FUNC")
print(bufs)
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
if __name__ == '__main__':
run()
......@@ -2,58 +2,56 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
import numpy as np
"""
>>> from obci_readmanager.signal_processing.buffers import ring_buffer as R
>>> r = R.RingBuffer(10, 5, False)
>>> for s in [get_sample(i, 5) for i in range(12)]: r.add(s)
from obci_readmanager.signal_processing.buffers.ring_buffer import RingBuffer
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
>>> vals = r.get(3, 3)
>>> vals
array([[ 5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[ 5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[ 5.00000000e+02, 6.00000000e+02, 7.00000000e+02],
[ 5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[ 5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
def test_ring_buffer():
r = RingBuffer(10, 5, False)
for s in [get_sample(i, 5) for i in range(12)]:
r.add(s)
>>> vals[2, 1] = 1234.0
def assert_equal(arr1, arr2):
assert np.array_equal(np.array(arr1), np.array(arr2))
>>> vals
array([[ 5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[ 5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[ 5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[ 5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[ 5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
vals = r.get(3, 3)
>>> r.get(3, 3)
array([[ 5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[ 5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[ 5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[ 5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[ 5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
assert_equal(vals,
[[5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[5.00000000e+02, 6.00000000e+02, 7.00000000e+02],
[5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
vals[2, 1] = 1234.0
>>> r = R.RingBuffer(10, 5, True)
assert_equal(vals, [[5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
>>> for s in [get_sample(i, 5) for i in range(19)]: r.add(s)
assert_equal(r.get(3, 3), [[5.00000000e+00, 6.00000000e+00, 7.00000000e+00],
[5.00000000e+01, 6.00000000e+01, 7.00000000e+01],
[5.00000000e+02, 1.23400000e+03, 7.00000000e+02],
[5.00000000e+03, 6.00000000e+03, 7.00000000e+03],
[5.00000000e+04, 6.00000000e+04, 7.00000000e+04]])
>>> vals = r.get(2, 2)
r = RingBuffer(10, 5, True)
>>> vals
array([[ 1.10000000e+01, 1.20000000e+01],
[ 1.10000000e+02, 1.20000000e+02],
[ 1.10000000e+03, 1.20000000e+03],
[ 1.10000000e+04, 1.20000000e+04],
[ 1.10000000e+05, 1.20000000e+05]])
for s in [get_sample(i, 5) for i in range(19)]:
r.add(s)
"""
vals = r.get(2, 2)
import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
assert_equal(vals, [[1.10000000e+01, 1.20000000e+01],
[1.10000000e+02, 1.20000000e+02],
[1.10000000e+03, 1.20000000e+03],
[1.10000000e+04, 1.20000000e+04],
[1.10000000e+05, 1.20000000e+05]])
def get_sample(v, ch_num, per=1):
......@@ -64,8 +62,8 @@ def get_sample(v, ch_num, per=1):
:return: one sample as numpy array
numpy.array( [ 1, 10, 100, ...] )
"""
sample = numpy.zeros([per, ch_num])
timestamp = numpy.zeros(per)
sample = np.zeros([per, ch_num])
timestamp = np.zeros(per)
mult = 1
for i in range(ch_num):
......@@ -73,15 +71,3 @@ def get_sample(v, ch_num, per=1):
mult *= 10
timestamp[0] = 10.0
return SamplePacket(sample, timestamp).samples[0]
def run():
import sys
import doctest
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
if __name__ == '__main__':
run()
<?xml version="1.0" encoding="UTF-8"?>
<tagFile formatVersion="1.0">
<paging blocks_per_page="5" page_size="20.0" />
<tagData>
<tags>
<tag channelNumber="-1" length="1.0046250820159912" name="trigger" position="0.36085605621337891">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.9283778667449951" name="trigger" position="1.3654811382293701">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.5430161952972412" name="trigger" position="3.2938590049743652">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.7950088977813721" name="trigger" position="4.8368752002716064">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.6539919376373291" name="trigger" position="6.6318840980529785">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.6060090065002441" name="trigger" position="8.2858760356903076">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.7079939842224121" name="trigger" position="9.8918850421905518">
<value>1</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.5549831390380859" name="trigger" position="11.599879026412964">
<value>0</value>
<semicolon_test>1;2;3</semicolon_test>
</tag>
<tag channelNumber="-1" length="1.5930259227752686" name="trigger" position="13.15486216545105">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.0129780769348145" name="trigger" position="14.747888088226318">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7710130214691162" name="trigger" position="15.760866165161133">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6369898319244385" name="trigger" position="17.531879186630249">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8520011901855469" name="trigger" position="19.168869018554688">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6960029602050781" name="trigger" position="21.020870208740234">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8960099220275879" name="trigger" position="22.716873168945312">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.799976110458374" name="trigger" position="24.6128830909729">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9080259799957275" name="trigger" position="26.412859201431274">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.0279920101165771" name="trigger" position="28.320885181427002">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.3869879245758057" name="trigger" position="29.348877191543579">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.1030158996582031" name="trigger" position="30.735865116119385">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.6669991016387939" name="trigger" position="31.838881015777588">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.0890018939971924" name="trigger" position="33.505880117416382">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9470040798187256" name="trigger" position="34.594882011413574">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.1589961051940918" name="trigger" position="36.5418860912323">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.0949859619140625" name="trigger" position="37.700882196426392">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.389995813369751" name="trigger" position="38.795868158340454">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7900252342224121" name="trigger" position="40.185863971710205">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6329929828643799" name="trigger" position="41.975889205932617">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.0870018005371094" name="trigger" position="43.608882188796997">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.217008113861084" name="trigger" position="44.695883989334106">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7229928970336914" name="trigger" position="45.91289210319519">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.7080001831054688" name="trigger" position="47.635885000228882">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.3120009899139404" name="trigger" position="49.343885183334351">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.8303978443145752" name="trigger" position="50.655886173248291">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.077592134475708" name="trigger" position="52.486284017562866">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.9700009822845459" name="trigger" position="53.563876152038574">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8099980354309082" name="trigger" position="55.53387713432312">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.9740610122680664" name="trigger" position="57.343875169754028">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.7009468078613281" name="trigger" position="59.317936182022095">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.2930071353912354" name="trigger" position="61.018882989883423">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.4469950199127197" name="trigger" position="62.311890125274658">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.8379850387573242" name="trigger" position="63.758885145187378">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9000129699707031" name="trigger" position="65.596870183944702">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.3319909572601318" name="trigger" position="67.496883153915405">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.8179960250854492" name="trigger" position="68.828874111175537">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.2369940280914307" name="trigger" position="70.646870136260986">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.4687058925628662" name="trigger" position="71.883864164352417">
<value>1</value>
</tag>
<tag channelNumber="-1" length="2.0003170967102051" name="trigger" position="73.352570056915283">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.9999959468841553" name="trigger" position="75.352887153625488">
<value>1</value>
</tag>
<tag channelNumber="-1" length="1.6439969539642334" name="trigger" position="77.352883100509644">
<value>0</value>
</tag>
<tag channelNumber="-1" length="1.0243129730224609" name="trigger" position="78.996880054473877">
<value>1</value>
</tag>
</tags>
</tagData>
</tagFile>
......@@ -6,6 +6,8 @@ import os
import mne
import numpy
import pytest
from obci_readmanager.signal_processing.read_manager import ReadManager
from obci_readmanager.signal_processing import smart_tags_manager as mgr
......@@ -13,6 +15,16 @@ from obci_readmanager.signal_processing.tags import smart_tag_definition as df
from test_smart_tags_manager import fabricate_data_file, fabricate_info_file
def test_mne_semicolon():
pth = os.path.abspath(__file__)
fname = os.path.join(pth[:-len(os.path.basename(pth))], 'data', 'data.obci')
read_manager = ReadManager(fname + '.info',
fname + '.dat',
fname + '_semicolon.tags')
read_manager.get_mne_raw()
def test_mne_compat():
"""Minimal test which just reads file and checks if mne can do something with it"""
pth = os.path.abspath(__file__)
......@@ -70,14 +82,13 @@ def test_mne_and_back(tmpdir):
file3 = os.path.join(str(tmpdir), 'test_rm_mne')
rm3 = ReadManager(file3 + '.obci.xml',
file3 + '.obci.raw',
file3 + '.obci.tag',)
file3 + '.obci.tag', )
_assert_rms_similar(rm2, rm3)
_assert_tags_ok(read_manager, rm3)
def test_mne_epochs(tmpdir):
d = df.SmartTagDurationDefinition(start_tag_name='trigger', start_offset=-0.15,
end_offset=0, duration=1.0)
pth = __file__
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.
import time
"""
>>> from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
import numpy as np
import pytest
>>> from obci_readmanager.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
from obci_readmanager.signal_processing.signal.data_generic_write_proxy import SamplePacket
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
from obci_readmanager.signal_processing.signal.signal_exceptions import NoNextValue
>>> import os.path, os, time, numpy
>>> # PREPARE SOME SAMPLE FILE *************************************************
def test_read_data_source(tmpdir):
# PREPARE SOME SAMPLE FILE *************************************************
f_name = str(tmpdir.join('./tescik.obci.dat'))
px = DataRawWriteProxy(f_name)
>>> px = DataRawWriteProxy('./tescik.obci.dat')
def write_single_value(value):
packet = SamplePacket(ts=np.array([time.time()]), samples=np.array([[value]]))
px.data_received(packet)
>>> def write_single_value(value):
... packet = SamplePacket(ts=numpy.array([time.time()]), samples=numpy.array([[value]]))
... px.data_received(packet)
write_single_value(1.2)
write_single_value(0.0023)
write_single_value(-123.456)
write_single_value(3.3)
write_single_value(5.0)
write_single_value(0.0)
assert px.finish_saving() == (f_name, 6)
>>> write_single_value(1.2)
from obci_readmanager.signal_processing.signal import read_data_source as s
>>> write_single_value(0.0023)
# TEST MEMORY DATA SOURCE **************************************************
>>> write_single_value(-123.456)
py = s.MemoryDataSource(np.zeros((2, 3)))
>>> write_single_value(3.3)
py.set_sample(0, [1.0, 2.0])
py.set_sample(1, [3.0, 4.0])
py.set_sample(2, [5.0, 6.0])
>>> write_single_value(5.0)
def assert_equal(arr1, arr2):
assert np.array_equal(np.array(arr1), np.array(arr2))
>>> write_single_value(0.0)
assert_equal([i for i in py.iter_samples()],
[np.array([1., 2.]), np.array([3., 4.]), np.array([5., 6.])])
>>> nic = px.finish_saving()
assert_equal([i for i in py.iter_samples()],
[np.array([1., 2.]), np.array([3., 4.]), np.array([5., 6.])])
>>> f = './tescik.obci.dat'
assert_equal(py.get_samples(0, 1), np.array([[1.], [2.]]))
>>> from obci_readmanager.signal_processing.signal import read_data_source as s
with pytest.raises(IndexError):
py.set_sample(3, [3.0, 4.0])
>>> # TEST MEMORY DATA SOURCE **************************************************
with pytest.raises(ValueError):
py.set_sample(2, [1.0, 2.0, 3.0])
>>> import numpy
# TEST FILE DATA SOURCE ****************************************************
>>> import numpy as np
py = s.FileDataSource(f_name, 2)
>>> py = s.MemoryDataSource(numpy.zeros((2,3)))
assert_equal(py.get_samples(0, 0), np.empty((2, 0), dtype=np.float64))
>>> py.set_sample(0, [1.0, 2.0])
assert_equal(np.abs(np.array([[1.20000000e+00, -1.23456000e+02],
[2.30000000e-03, 3.30000000e+00]]) - py.get_samples(0, 2)) < 0.001,
[[True, True], [True, True]])
>>> py.set_sample(1, [3.0, 4.0])
with pytest.raises(NoNextValue):
py.get_samples(0, 10)
assert_equal(np.abs(np.array([[1.20000000e+00, -1.23456000e+02, 5.00000000e+00],
[2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples(0, 3)) < 0.001,
[[True, True, True],
[True, True, True]], )
with pytest.raises(NoNextValue):
py.get_samples(1, 3)
>>> py.set_sample(2, [5.0, 6.0])
assert_equal(np.abs(np.array([[1.20000000e+00, -1.23456000e+02, 5.00000000e+00],
[2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001,
[[True, True, True],
[True, True, True]])
>>> [i for i in py.iter_samples()]
[array([ 1., 2.]), array([ 3., 4.]), array([ 5., 6.])]
py = s.FileDataSource(f_name, 2)
>>> [i for i in py.iter_samples()]
[array([ 1., 2.]), array([ 3., 4.]), array([ 5., 6.])]
from numpy import array
>>> py.get_samples(0, 1)
array([[ 1.],
[ 2.]])
assert_equal(np.abs(array([[1.20000000e+00, -1.23456000e+02, 5.00000000e+00],
[2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001,
[[True, True, True],
[True, True, True]])
>>> py.set_sample(3, [3.0, 4.0])
Traceback (most recent call last):
...
IndexError: index 3 is out of bounds for axis 1 with size 3
py = s.FileDataSource(f_name, 2)
>>> py.set_sample(2, [1.0, 2.0, 3.0])
Traceback (most recent call last):
...
ValueError: cannot copy sequence with size 3 to array axis with dimension 2
assert_equal(np.abs(array([[1.20000000e+00, -1.23456000e+02, 5.00000000e+00],
[2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001,
[[True, True, True],
[True, True, True]])
assert_equal(np.abs(array([[1.20000000e+00, -1.23456000e+02, 5.00000000e+00],
[2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001,
[[True, True, True],
[True, True, True]])
>>> # TEST FILE DATA SOURCE ****************************************************
assert_equal([max(abs(i - y)) < 0.0001 for i, y in zip(py.iter_samples(),
[array([1.2, 0.0023]), array([-123.456, 3.3]),
array([5., 0.])])],
[True, True, True])
>>> py = s.FileDataSource(f, 2)
py = s.FileDataSource(f_name, 2)
>>> py.get_samples(0, 0)
array([], shape=(2, 0), dtype=float64)
assert [max(abs(i - y)) < 0.0001 for i, y in zip(py.iter_samples(),
[array([1.2, 0.0023]), array([-123.456, 3.3]),
array([5., 0.])])] == \
[True, True, True]
>>> np.abs(np.array([[ 1.20000000e+00, -1.23456000e+02],\
[ 2.30000000e-03, 3.30000000e+00]]) - py.get_samples(0, 2)) < 0.001
array([[ True, True],
[ True, True]], dtype=bool)
>>> py.get_samples(0, 10)
Traceback (most recent call last):
...
obci_readmanager.signal_processing.signal.signal_exceptions.NoNextValue
>>> np.abs(np.array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples(0, 3)) < 0.001
array([[ True, True, True],
[ True, True, True]], dtype=bool)
>>> py.get_samples(1, 3)
Traceback (most recent call last):
...
obci_readmanager.signal_processing.signal.signal_exceptions.NoNextValue
>>> np.abs(np.array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001
array([[ True, True, True],
[ True, True, True]], dtype=bool)
>>> py = s.FileDataSource(f, 2)
>>> from numpy import array
>>> np.abs(array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001
array([[ True, True, True],
[ True, True, True]], dtype=bool)
>>> py = s.FileDataSource(f, 2)
>>> np.abs(array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001
array([[ True, True, True],
[ True, True, True]], dtype=bool)
>>> np.abs(array([[ 1.20000000e+00, -1.23456000e+02, 5.00000000e+00],\
[ 2.30000000e-03, 3.30000000e+00, 0.00000000e+00]]) - py.get_samples()) < 0.001
array([[ True, True, True],
[ True, True, True]], dtype=bool)
>>> [max(abs(i-y))<0.0001 for i,y in zip(py.iter_samples(),\
[array([ 1.2 , 0.0023]), array([-123.456, 3.3 ]), array([ 5., 0.])])]
[True, True, True]
>>> py = s.FileDataSource(f, 2)
>>> [max(abs(i-y))<0.0001 for i,y in zip(py.iter_samples(),\
[array([ 1.2 , 0.0023]), array([-123.456, 3.3 ]), array([ 5., 0.])])]
[True, True, True]
>>> [max(abs(i-y))<0.0001 for i,y in zip(py.iter_samples(),\
[array([ 1.2 , 0.0023]), array([-123.456, 3.3 ]), array([ 5., 0.])])]
[True, True, True]
>>> os.remove(f)
"""
def run():
import doctest
import sys
res = doctest.testmod(sys.modules[__name__])
if res.failed == 0:
print("All tests succeeded!")
if __name__ == '__main__':
run()
assert [max(abs(i - y)) < 0.0001 for i, y in zip(py.iter_samples(),
[array([1.2, 0.0023]), array([-123.456, 3.3]),
array([5., 0.])])] == \
[True, True, True]
......@@ -11,7 +11,8 @@ import numpy
from obci_readmanager.signal_processing import smart_tags_manager as mgr
from obci_readmanager.signal_processing.tags import smart_tag_definition as df
from obci_readmanager.signal_processing.smart_tags_manager import SmartTagsManager
from obci_readmanager.signal_processing.tags.smart_tag_definition import SmartTagDurationDefinition, SmartTagEndTagDefinition
from obci_readmanager.signal_processing.tags.smart_tag_definition import SmartTagDurationDefinition, \
SmartTagEndTagDefinition
from obci_readmanager.signal_processing.read_manager import ReadManager
......@@ -123,13 +124,13 @@ def test_legacy_doctest():
tss_tags = [t.get_start_tag() for t in tss]
assert tss_tags ==\
[{'channels': '', 'start_timestamp': 6.0999999999999996, 'desc': {u'y': u'456', u'x': u'123', u'z': u'789'},
'name': u'B', 'end_timestamp': 6.5}, {'channels': '', 'start_timestamp': 7.0,
'desc': {u'y': u'456', u'x': u'123', u'z': u'789'}, 'name': u'B',
'end_timestamp': 7.0},
{'channels': '', 'start_timestamp': 7.5, 'desc': {u'y': u'456', u'x': u'123', u'z': u'789'},
'name': u'B', 'end_timestamp': 8.6999999999999993}]
assert tss_tags == \
[{'channels': '', 'start_timestamp': 6.0999999999999996, 'desc': {u'y': u'456', u'x': u'123', u'z': u'789'},
'name': u'B', 'end_timestamp': 6.5}, {'channels': '', 'start_timestamp': 7.0,
'desc': {u'y': u'456', u'x': u'123', u'z': u'789'}, 'name': u'B',
'end_timestamp': 7.0},
{'channels': '', 'start_timestamp': 7.5, 'desc': {u'y': u'456', u'x': u'123', u'z': u'789'},
'name': u'B', 'end_timestamp': 8.6999999999999993}]
assert tss[0].get_all_samples()[0][0] == 780.0
......@@ -144,11 +145,11 @@ def test_legacy_doctest():
tss_tags = [t.get_start_tag() for t in tss]
assert tss_tags ==\
[{'channels': '', 'start_timestamp': 10.199999999999999, 'desc': {u'y': u'456', u'x': u'123', u'z': u'789'},
'name': u'B', 'end_timestamp': 11.0}, {'channels': '', 'start_timestamp': 12.0,
'desc': {u'y': u'456', u'x': u'123', u'z': u'789'}, 'name': u'B',
'end_timestamp': 12.5}]
assert tss_tags == \
[{'channels': '', 'start_timestamp': 10.199999999999999, 'desc': {u'y': u'456', u'x': u'123', u'z': u'789'},
'name': u'B', 'end_timestamp': 11.0}, {'channels': '', 'start_timestamp': 12.0,
'desc': {u'y': u'456', u'x': u'123', u'z': u'789'}, 'name': u'B',
'end_timestamp': 12.5}]
for fl in glob.glob('tescik*'):
os.remove(fl)
......