Commit f1d4926a authored by Marian Dovgialo's avatar Marian Dovgialo

#45153 Merge branch 'development' into 'master'

Releasing 1.1.3

See merge request !20
parents c009e152 262aef09
Pipeline #17044 passed with stages
in 1 minute and 16 seconds
...@@ -43,7 +43,7 @@ def get_config(): ...@@ -43,7 +43,7 @@ def get_config():
cfg.style = "pep440" cfg.style = "pep440"
cfg.tag_prefix = "" cfg.tag_prefix = ""
cfg.parentdir_prefix = "''" cfg.parentdir_prefix = "''"
cfg.versionfile_source = "obci/_version.py" cfg.versionfile_source = "obci_readmanager/_version.py"
cfg.verbose = False cfg.verbose = False
return cfg return cfg
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
"""Mixin for readmanager to create MNE.Raw objects.""" """Mixin for readmanager to create MNE.Raw objects."""
import json import json
from typing import Union from typing import Union
from copy import copy from copy import copy, deepcopy
import numpy import numpy
from ..signal.signal_constants import sample_type_from_numpy from ..signal.signal_constants import sample_type_from_numpy
...@@ -30,11 +30,19 @@ def add_stim_chnl(raw): ...@@ -30,11 +30,19 @@ def add_stim_chnl(raw):
stim_data = numpy.zeros((1, len(raw.times))) stim_data = numpy.zeros((1, len(raw.times)))
info = mne.create_info(['OBCI_STIM'], raw.info['sfreq'], ['stim']) info = mne.create_info(['OBCI_STIM'], raw.info['sfreq'], ['stim'])
stim_raw = mne.io.RawArray(stim_data, info) stim_raw = mne.io.RawArray(stim_data, info)
raw.add_channels([stim_raw], force_update_info=True) raw.add_channels([stim_raw], force_update_info=False)
def _description_from_tag(tag): def _description_from_tag(tag):
tag_desc = copy(tag) tag_desc = copy(tag)
tag_custom_describtion = tag_desc['desc']
for key in tag_custom_describtion:
try:
value = tag_custom_describtion[key]
if ';' in value:
tag_custom_describtion[key] = value.replace(';', ':')
except TypeError:
pass
tag_desc.pop('start_timestamp') tag_desc.pop('start_timestamp')
tag_desc.pop('end_timestamp') tag_desc.pop('end_timestamp')
return json.dumps(tag_desc) return json.dumps(tag_desc)
...@@ -47,9 +55,6 @@ def tags_from_mne_annotations(ans): ...@@ -47,9 +55,6 @@ def tags_from_mne_annotations(ans):
Returns tags (list of dicts). Returns tags (list of dicts).
""" """
tags = [] tags = []
orig_time = ans.orig_time
if orig_time is None:
orig_time = 0
for onset, duration, desc in zip(ans.onset, ans.duration, ans.description): for onset, duration, desc in zip(ans.onset, ans.duration, ans.description):
# try to load annotations, as they would be exported by ReadManager # try to load annotations, as they would be exported by ReadManager
# if there is no our annotations reformat them to tags # if there is no our annotations reformat them to tags
...@@ -59,8 +64,8 @@ def tags_from_mne_annotations(ans): ...@@ -59,8 +64,8 @@ def tags_from_mne_annotations(ans):
except (json.decoder.JSONDecodeError, AssertionError): except (json.decoder.JSONDecodeError, AssertionError):
# MNE created not in OBCI # MNE created not in OBCI
tag = {'name': desc, 'desc': {}, 'channels': ''} tag = {'name': desc, 'desc': {}, 'channels': ''}
tag['start_timestamp'] = onset - orig_time tag['start_timestamp'] = onset
tag['end_timestamp'] = onset + duration - orig_time tag['end_timestamp'] = onset + duration
tags.append(tag) tags.append(tag)
return tags return tags
...@@ -132,9 +137,10 @@ class ReadManagerMNEMixin: ...@@ -132,9 +137,10 @@ class ReadManagerMNEMixin:
def from_mne(cls, mne_raw): def from_mne(cls, mne_raw):
"""Read Raw mne object to ReadManager.""" """Read Raw mne object to ReadManager."""
assert isinstance(mne_raw, mne.io.BaseRaw) assert isinstance(mne_raw, mne.io.BaseRaw)
mne_raw.drop_channels(['OBCI_STIM']) mne_raw_copy = deepcopy(mne_raw)
data = mne_raw.get_data() * 1e6 # mne keeps signals in Volts mne_raw_copy.drop_channels(['OBCI_STIM'])
return cls._rm_from_mne_data(data, mne_raw.info, mne_raw.annotations) data = mne_raw_copy.get_data() * 1e6 # mne keeps signals in Volts
return cls._rm_from_mne_data(data, mne_raw_copy.info, mne_raw_copy.annotations)
@classmethod @classmethod
def _rm_from_mne_data(cls, data, info, annotations=None): def _rm_from_mne_data(cls, data, info, annotations=None):
...@@ -178,7 +184,7 @@ class ReadManagerMNEMixin: ...@@ -178,7 +184,7 @@ class ReadManagerMNEMixin:
except ValueError: except ValueError:
meas_date_us = 0 meas_date_us = 0
meas_date = numpy.array([meas_date_s, meas_date_us], dtype=numpy.int32) meas_date = (meas_date_s, meas_date_us)
info['meas_date'] = meas_date info['meas_date'] = meas_date
def mne_annotations(self): def mne_annotations(self):
...@@ -225,5 +231,5 @@ class ReadManagerMNEMixin: ...@@ -225,5 +231,5 @@ class ReadManagerMNEMixin:
add_stim_chnl(raw) add_stim_chnl(raw)
ans = self.mne_annotations() ans = self.mne_annotations()
events, _ = self.mne_events() events, _ = self.mne_events()
raw.annotations = ans raw.set_annotations(ans)
raw.add_events(events) raw.add_events(events)
...@@ -13,8 +13,14 @@ test_requirements = [ ...@@ -13,8 +13,14 @@ test_requirements = [
'pytest-catchlog>=1.2.2', 'pytest-catchlog>=1.2.2',
'flaky>=3.3.0', 'flaky>=3.3.0',
'nose>=1.3.7', 'nose>=1.3.7',
'mne==0.14.1' 'mne~=0.17.0',
'scipy',
'matplotlib',
] ]
install_requires = ['mne~=0.17.0',
]
needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv) needs_pytest = {'pytest', 'test', 'ptr'}.intersection(sys.argv)
pytest_runner_requirement = ['pytest-runner>=2.9'] pytest_runner_requirement = ['pytest-runner>=2.9']
...@@ -44,15 +50,10 @@ setup( ...@@ -44,15 +50,10 @@ setup(
packages=find_packages(exclude=['scripts', ]), packages=find_packages(exclude=['scripts', ]),
include_package_data=True, include_package_data=True,
exclude_package_data={'': ['.gitignore', '.gitlab-ci.yml']}, exclude_package_data={'': ['.gitignore', '.gitlab-ci.yml']},
install_requires=[], install_requires=install_requires,
tests_require=test_requirements, tests_require=test_requirements,
setup_requires=setup_requires, setup_requires=setup_requires,
# entry_points={
# 'console_scripts': [
# 'obci_readmanager = obci_readmanager.cmd.run_readmanager_preset:run',
# ],
# },
extras_require={ extras_require={
'test': pytest_runner_requirement + test_requirements, 'test': pytest_runner_requirement + test_requirements,
}, },
) )
...@@ -3,10 +3,13 @@ import os ...@@ -3,10 +3,13 @@ import os
import math import math
from obci_readmanager.signal_processing.balance.wii_preprocessing import wii_filter_signal, wii_downsample_signal, wii_cut_fragments from obci_readmanager.signal_processing.balance.wii_preprocessing import wii_filter_signal, wii_downsample_signal, \
from obci_readmanager.signal_processing.balance.wii_analysis import (wii_COP_path, wii_max_sway_AP_MP, wii_mean_COP_sway_AP_ML, wii_cut_fragments
wii_RMS_AP_ML, wii_confidence_ellipse_area, wii_mean_velocity, from obci_readmanager.signal_processing.balance.wii_analysis import (wii_COP_path, wii_max_sway_AP_MP,
wii_get_percentages_values) wii_mean_COP_sway_AP_ML,
wii_RMS_AP_ML, wii_confidence_ellipse_area,
wii_mean_velocity,
wii_get_percentages_values)
from obci_readmanager.signal_processing.balance.wii_read_manager import WBBReadManager from obci_readmanager.signal_processing.balance.wii_read_manager import WBBReadManager
import matplotlib.pyplot as py import matplotlib.pyplot as py
...@@ -35,7 +38,6 @@ def test_wii_analysis(plot=False): ...@@ -35,7 +38,6 @@ def test_wii_analysis(plot=False):
def test_wbb_tutorial(): def test_wbb_tutorial():
# inicjalizacja klasy WBBReadManager - utwórz obiekt podając na wejściu ścieżki do # inicjalizacja klasy WBBReadManager - utwórz obiekt podając na wejściu ścieżki do
# odpowiednich plików : # odpowiednich plików :
pth = __file__ pth = __file__
...@@ -225,5 +227,6 @@ def test_wbb_tutorial_calculations(): ...@@ -225,5 +227,6 @@ def test_wbb_tutorial_calculations():
assert math.isclose(bottom_right, 23.487261146496824) assert math.isclose(bottom_right, 23.487261146496824)
assert math.isclose(bottom_left, 18.232484076433121) assert math.isclose(bottom_left, 18.232484076433121)
if __name__ == '__main__': if __name__ == '__main__':
test_wii_analysis(plot=True) test_wii_analysis(plot=True)
...@@ -2,94 +2,107 @@ ...@@ -2,94 +2,107 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl> # Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved. # All rights reserved.
from unittest.mock import Mock
"""
>>> from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
>>> per, ch = (4, 10)
>>> f = print_bufs
>>> b = R.AutoRingBuffer(10, 5, 5, ch, f, False)
>>> v1 = get_sample_packet(per, ch, 1)
>>> v2 = get_sample_packet(per, ch, 10)
>>> v3 = get_sample_packet(per, ch, 100)
>>> v4 = get_sample_packet(per, ch, 1000)
>>> b.handle_sample_packet(v1)
>>> b.handle_sample_packet(v2)
>>> b.handle_sample_packet(v3)
FUNC
[[ 1. 2. 3. 4. 10.]
[ 2. 3. 4. 5. 20.]
[ 3. 4. 5. 6. 30.]
[ 4. 5. 6. 7. 40.]
[ 5. 6. 7. 8. 50.]
[ 6. 7. 8. 9. 60.]
[ 7. 8. 9. 10. 70.]
[ 8. 9. 10. 11. 80.]
[ 9. 10. 11. 12. 90.]
[ 10. 11. 12. 13. 100.]]
>>> b.handle_sample_packet(v4)
FUNC
[[ 11. 12. 13. 100. 101.]
[ 21. 22. 23. 200. 201.]
[ 31. 32. 33. 300. 301.]
[ 41. 42. 43. 400. 401.]
[ 51. 52. 53. 500. 501.]
[ 61. 62. 63. 600. 601.]
[ 71. 72. 73. 700. 701.]
[ 81. 82. 83. 800. 801.]
[ 91. 92. 93. 900. 901.]
[ 101. 102. 103. 1000. 1001.]]
>>> b = R.AutoRingBuffer(10, 3, 5, ch, f, False)
>>> b.handle_sample_packet(v1)
>>> b.handle_sample_packet(v2)
>>> b.handle_sample_packet(v3)
FUNC
[[ 1. 2. 3.]
[ 2. 3. 4.]
[ 3. 4. 5.]
[ 4. 5. 6.]
[ 5. 6. 7.]
[ 6. 7. 8.]
[ 7. 8. 9.]
[ 8. 9. 10.]
[ 9. 10. 11.]
[ 10. 11. 12.]]
>>> b.handle_sample_packet(v4)
FUNC
[[ 11. 12. 13.]
[ 21. 22. 23.]
[ 31. 32. 33.]
[ 41. 42. 43.]
[ 51. 52. 53.]
[ 61. 62. 63.]
[ 71. 72. 73.]
[ 81. 82. 83.]
[ 91. 92. 93.]
[ 101. 102. 103.]]
"""
import numpy import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
class ArrayCallback(Mock):
def __init__(self, *args, **kwargs):
super(ArrayCallback, self).__init__(*args, **kwargs)
self.current_check = 0
def assert_called(self, arr2):
assert self.current_check < self.call_count
arr1 = numpy.array(self.call_args_list[self.current_check][0][0])
arr2 = numpy.array(arr2)
assert numpy.array_equal(arr1, arr2)
self.current_check += 1
def __call__(self, arr):
super(ArrayCallback, self).__call__(arr.copy())
def test_auto_ring_buffer():
from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
per, ch = (4, 10)
callback = ArrayCallback()
b = R.AutoRingBuffer(10, 5, 5, ch, callback, False)
v1 = get_sample_packet(per, ch, 1)
v2 = get_sample_packet(per, ch, 10)
v3 = get_sample_packet(per, ch, 100)
v4 = get_sample_packet(per, ch, 1000)
b.handle_sample_packet(v1)
b.handle_sample_packet(v2)
b.handle_sample_packet(v3)
callback.assert_called(
[[1., 2., 3., 4., 10.],
[2., 3., 4., 5., 20.],
[3., 4., 5., 6., 30.],
[4., 5., 6., 7., 40.],
[5., 6., 7., 8., 50.],
[6., 7., 8., 9., 60.],
[7., 8., 9., 10., 70.],
[8., 9., 10., 11., 80.],
[9., 10., 11., 12., 90.],
[10., 11., 12., 13., 100.]])
b.handle_sample_packet(v4)
callback.assert_called(
[[11., 12., 13., 100., 101.],
[21., 22., 23., 200., 201.],
[31., 32., 33., 300., 301.],
[41., 42., 43., 400., 401.],
[51., 52., 53., 500., 501.],
[61., 62., 63., 600., 601.],
[71., 72., 73., 700., 701.],
[81., 82., 83., 800., 801.],
[91., 92., 93., 900., 901.],
[101., 102., 103., 1000., 1001.]])
b = R.AutoRingBuffer(10, 3, 5, ch, callback, False)
b.handle_sample_packet(v1)
b.handle_sample_packet(v2)
b.handle_sample_packet(v3)
callback.assert_called(
[[1., 2., 3.],
[2., 3., 4.],
[3., 4., 5.],
[4., 5., 6.],
[5., 6., 7.],
[6., 7., 8.],
[7., 8., 9.],
[8., 9., 10.],
[9., 10., 11.],
[10., 11., 12.]])
b.handle_sample_packet(v4)
callback.assert_called(
[[11., 12., 13.],
[21., 22., 23.],
[31., 32., 33.],
[41., 42., 43.],
[51., 52., 53.],
[61., 62., 63.],
[71., 72., 73.],
[81., 82., 83.],
[91., 92., 93.],
[101., 102., 103.]])
def get_sample_packet(per, ch, mult): def get_sample_packet(per, ch, mult):
""" """
...@@ -116,18 +129,28 @@ def get_sample_packet(per, ch, mult): ...@@ -116,18 +129,28 @@ def get_sample_packet(per, ch, mult):
return SamplePacket(packet, timestamps) return SamplePacket(packet, timestamps)
def print_bufs(bufs): def get_sample_packet(per, ch, mult):
print("FUNC") """
print(bufs)
def run(): :param per: number of samples
import sys :param ch: number of channels
import doctest :param mult: multiplication parameter
res = doctest.testmod(sys.modules[__name__]) :return: signal as tuple with two numpy arrays
if res.failed == 0: (numpy.array( [ 1485342990.17, 1485342990.28, 1485342990.42, ...]),
print("All tests succeeded!") numpy.array([
[ 1, 10, 100, ...],
[ 2, 20, 200, ...],
[ 3, 30, 300, ...],
[ 4, 400, 400, ...]
])
)
"""
timestamps = numpy.zeros(per)
packet = numpy.zeros([per, ch])
for i in range(per):
for j in range(ch):
packet[i][j] = float(j + 1) * mult + i
timestamps[i] = 10.0
return SamplePacket(packet, timestamps)
if __name__ == '__main__':
run()
...@@ -3,144 +3,145 @@ ...@@ -3,144 +3,145 @@
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl> # Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved. # All rights reserved.
"""
>>> from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
>>> per, ch = (4, 3) import numpy
from obci_readmanager.signal_processing.signal.data_raw_write_proxy import SamplePacket
>>> f = print_bufs
>>> b = R.AutoRingBuffer(5, 5, 5, ch, f, False)
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 1. 2. 3. 4. 5.]
[ 10. 20. 30. 40. 50.]
[ 100. 200. 300. 400. 500.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 6. 7. 8. 9. 10.]
[ 60. 70. 80. 90. 100.]
[ 600. 700. 800. 900. 1000.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 11. 12. 13. 14. 15.]
[ 110. 120. 130. 140. 150.]
[ 1100. 1200. 1300. 1400. 1500.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch))
FUNC
[[ 16. 17. 18. 19. 20.]
[ 160. 170. 180. 190. 200.]
[ 1600. 1700. 1800. 1900. 2000.]]
>>> b = R.AutoRingBuffer(10, 5, 3, ch, f, False)
>>> zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch)) COUNT = 0
FUNC
[[ 3. 4. 5. 6. 7.]
[ 30. 40. 50. 60. 70.]
[ 300. 400. 500. 600. 700.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch)) try:
FUNC from .test_auto_ring_buffer import ArrayCallback
[[ 6. 7. 8. 9. 10.] except (SystemError, ImportError):
[ 60. 70. 80. 90. 100.] from test_auto_ring_buffer import ArrayCallback
[ 600. 700. 800. 900. 1000.]]
def test_auto_ring_buffer2():
from obci_readmanager.signal_processing.buffers import auto_ring_buffer as R
per, ch = (4, 3)
callback = ArrayCallback()
b = R.AutoRingBuffer(5, 5, 5, ch, callback, False)
b.handle_sample_packet(get_sample_packet(per, ch))
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[1., 2., 3., 4., 5.],
[10., 20., 30., 40., 50.],
[100., 200., 300., 400., 500.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
FUNC callback.assert_called(
[[ 9. 10. 11. 12. 13.] [[6., 7., 8., 9., 10.],
[ 90. 100. 110. 120. 130.] [60., 70., 80., 90., 100.],
[ 900. 1000. 1100. 1200. 1300.]] [600., 700., 800., 900., 1000.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
FUNC callback.assert_called(
[[ 12. 13. 14. 15. 16.] [[11., 12., 13., 14., 15.],
[ 120. 130. 140. 150. 160.] [110., 120., 130., 140., 150.],
[ 1200. 1300. 1400. 1500. 1600.]] [1100., 1200., 1300., 1400., 1500.]])
FUNC
[[ 15. 16. 17. 18. 19.]
[ 150. 160. 170. 180. 190.]
[ 1500. 1600. 1700. 1800. 1900.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
FUNC callback.assert_called(
[[ 18. 19. 20. 21. 22.] [[16., 17., 18., 19., 20.],
[ 180. 190. 200. 210. 220.] [160., 170., 180., 190., 200.],
[ 1800. 1900. 2000. 2100. 2200.]] [1600., 1700., 1800., 1900., 2000.]])
b = R.AutoRingBuffer(10, 5, 3, ch, callback, False)
>>> b = R.AutoRingBuffer(10, 10, 2, ch, f, False) zero_count()
>>> zero_count() b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[3., 4., 5., 6., 7.],
[30., 40., 50., 60., 70.],
[300., 400., 500., 600., 700.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
FUNC callback.assert_called(
[[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.] [[6., 7., 8., 9., 10.],
[ 10. 20. 30. 40. 50. 60. 70. 80. 90. 100.] [60., 70., 80., 90., 100.],
[ 100. 200. 300. 400. 500. 600. 700. 800. 900. 1000.]] [600., 700., 800., 900., 1000.]])
FUNC
[[ 3. 4. 5. 6. 7. 8. 9. 10. 11. 12.]
[ 30. 40. 50. 60. 70. 80. 90. 100. 110. 120.]
[ 300. 400. 500. 600. 700. 800. 900. 1000. 1100. 1200.]]
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
FUNC callback.assert_called(
[[ 5. 6. 7. 8. 9. 10. 11. 12. 13. 14.] [[9., 10., 11., 12., 13.],
[ 50. 60. 70. 80. 90. 100. 110. 120. 130. 140.] [90., 100., 110., 120., 130.],
[ 500. 600. 700. 800. 900. 1000. 1100. 1200. 1300. 1400.]] [900., 1000., 1100., 1200., 1300.]])
FUNC
[[ 7. 8. 9. 10. 11. 12. 13. 14. 15. 16.]
[ 70. 80. 90. 100. 110. 120. 130. 140. 150. 160.]
[ 700. 800. 900. 1000. 1100. 1200. 1300. 1400. 1500. 1600.]]
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[12., 13., 14., 15., 16.],
[120., 130., 140., 150., 160.],
[1200., 1300., 1400., 1500., 1600.]])
callback.assert_called(
[[15., 16., 17., 18., 19.],
[150., 160., 170., 180., 190.],
[1500., 1600., 1700., 1800., 1900.]])
b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[18., 19., 20., 21., 22.],
[180., 190., 200., 210., 220.],
[1800., 1900., 2000., 2100., 2200.]])
>>> b = R.AutoRingBuffer(10, 3, 8, ch, f, False) b = R.AutoRingBuffer(10, 10, 2, ch, callback, False)
>>> zero_count() zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
callback.assert_called(
[[1., 2., 3., 4., 5., 6., 7., 8., 9., 10.],
[10., 20., 30., 40., 50., 60., 70., 80., 90., 100.],
[100., 200., 300., 400., 500., 600., 700., 800., 900., 1000.]])
callback.assert_called(
[[3., 4., 5., 6., 7., 8., 9., 10., 11., 12.],
[30., 40., 50., 60., 70., 80., 90., 100., 110., 120.],
[300., 400., 500., 600., 700., 800., 900., 1000., 1100., 1200.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))
FUNC callback.assert_called(
[[ 7. 8. 9.] [[5., 6., 7., 8., 9., 10., 11., 12., 13., 14.],
[ 70. 80. 90.] [50., 60., 70., 80., 90., 100., 110., 120., 130., 140.],
[ 700. 800. 900.]] [500., 600., 700., 800., 900., 1000., 1100., 1200., 1300., 1400.]])
callback.assert_called(
[[7., 8., 9., 10., 11., 12., 13., 14., 15., 16.],
[70., 80., 90., 100., 110., 120., 130., 140., 150., 160.],
[700., 800., 900., 1000., 1100., 1200., 1300., 1400., 1500., 1600.]])
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b = R.AutoRingBuffer(10, 3, 8, ch, callback, False)
zero_count()
>>> b.handle_sample_packet(get_sample_packet(per, ch)) b.handle_sample_packet(get_sample_packet(per, ch))