Commit 8a49aa3b authored by Joanna Duda's avatar Joanna Duda Committed by Marian Dovgialo

Flake8 all for catalogue: signal processing

automatic changes

refs #31070

Flake8 all for catalogue: signal processing
    custom changes

    refs #31070

Doc fixes

refs #31518
parent 59c42201
"""Package providing utils for utils for reading and writing signal data,and data info and data taga, and blinks."""
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Module provides classes representing single blink and blink buffers."""
import collections
import numpy
......@@ -10,6 +9,8 @@ Sample = numpy.zeros
class BlinkEntry(object):
"""Class representing single blink."""
def __init__(self, blink: Blink, blink_count: int, blink_pos: int) -> None:
"""
Blink variable as a representation of the stimulus.
......@@ -24,6 +25,12 @@ class BlinkEntry(object):
class AutoBlinkBuffer(object):
"""
Class representing buffer which returns predefined epochs of signal around some blink event.
Ex. to use in event related potential paradigms.
"""
def __init__(self, from_blink, samples_count, num_of_channels, sampling, ret_func, copy_on_ret):
"""
After occurrence of the blink and getting a sufficient number of samples calls ret_func(blink, d).
......@@ -38,9 +45,8 @@ class AutoBlinkBuffer(object):
:param ret_func: function to call when return. Takes returned samples.
:param copy_on_ret: if true make deep copy of the buffer while processing
"""
assert(samples_count > 0)
assert(num_of_channels > 0)
assert (samples_count > 0)
assert (num_of_channels > 0)
self.ret_func = ret_func
self.ret_buf_len = samples_count # 1024
......@@ -62,19 +68,20 @@ class AutoBlinkBuffer(object):
self.blinks = collections.deque()
def clear(self):
"""Clear buffer."""
self.count = 0
self.is_full = False
self.buffer.clear()
self.times.clear()
self.clear_blinks()
self._clear_blinks()
def clear_blinks(self):
def _clear_blinks(self):
self.blinks_count = 0
self.blinks.clear()
def handle_blink(self, blink: Blink) -> None:
"""
Assignees index and position to the Blink and queues this Blink.
Method which assigns index and position to the Blink and queues this Blink.
:param blink: Blink (object with one field named timestamp)
:return:
......@@ -108,7 +115,7 @@ class AutoBlinkBuffer(object):
def handle_sample_packet(self, sample_packet):
"""
Supports SamplePacket in a manner consistent with AutoBlinkBuffer.
Method handles signal sample packets in a manner consistent with AutoBlinkBuffer.
:param sample_packet: signal as a SamplePacket class.
:return: None
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Module provides single class representing sample packet buffer."""
from obci.signal_processing.buffers import ring_buffer as rbn
class AutoRingBuffer(object):
"""Class representing sample packet buffer. When full calls ret_func."""
def __init__(self, from_sample, samples_count, every, num_of_channels, ret_func, copy_on_ret):
"""
When full, calls ret_func(d), where d is numpy array with <samples_count> samples from
<num_of_channels> channels. Between following returns skips <every> samples.
When full, calls ret_func(d), where d is numpy array with <samples_count>samples from <num_of_channels>channels.
Skips samples before next ret_func call.
:param from_sample: size of the buffer
:param samples_count: number of samples to return
......@@ -16,11 +19,10 @@ class AutoRingBuffer(object):
:param ret_func: function to call when return. Takes returned samples.
:param copy_on_ret: if true make deep copy of the buffer while processing
"""
assert(samples_count > 0)
assert(from_sample > 0)
assert(every > 0)
assert(num_of_channels > 0)
assert (samples_count > 0)
assert (from_sample > 0)
assert (every > 0)
assert (num_of_channels > 0)
self.every = every
self.ret_func = ret_func
......@@ -36,6 +38,7 @@ class AutoRingBuffer(object):
copy_on_ret)
def clear(self):
"""Clear buffer."""
self.count = 0
self.is_full = False
self.buffer.clear()
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Module provides single class representing buffer."""
import numpy
from obci.signal_processing.buffers import ring_buffer_base
class RingBuffer(ring_buffer_base.RingBufferBase):
"""Subclass 'RingBufferBase' representing buffer."""
def _get_normal(self, start, end):
return self.buffer[:, start:end]
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Module provides single class representing base buffer."""
import copy
class RingBufferBase(object):
"""Class representing base buffer."""
def __init__(self, size, number_of_channels, copy_on_ret):
"""Initialize buffer."""
self.size = int(size)
self.number_of_channels = int(number_of_channels)
self.copy_on_ret = bool(copy_on_ret)
self.clear()
def clear(self):
"""Clear buffer."""
self.is_full = False
self.index = 0
self._init_buffer()
def add(self, s):
"""Add sample to buffer."""
self._add(s)
if not self.is_full and self.index == self.size - 1:
self.is_full = True
self.index = (self.index + 1) % self.size
def get(self, start, length):
"""Get samples from buffer (start, start + length)."""
if not self.is_full:
d = self._get_normal(start, start + length)
else:
......
......@@ -18,9 +18,7 @@
#
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""Module defines a single method get_logger that returns logger with
set logging level. Change loggin.INFO lines to change logging level."""
"""Module defines a single method get_logger that returns logger with set logging level."""
import logging
......@@ -32,9 +30,11 @@ LEVELS = {'debug': logging.DEBUG,
def get_logger(p_name, p_level='info'):
"""Return logger with p_name as name. And logging level p_level.
p_level should be in (starting with the most talkactive):
'debug', 'info', 'warning', 'error', 'critical'."""
"""
Return logger with p_name as name. And logging level p_level.
p_level should be in (starting with the most talkactive): 'debug', 'info', 'warning', 'error', 'critical'.
"""
logger = logging.getLogger(p_name)
if len(logger.handlers) == 0:
# Some module migh be imported few times. In every get_logger call
......
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module provides single :class 'ReadManager' responsible for reading OpenBCI file format.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
import copy
import numpy
import os.path
from typing import Union
import numpy
from typing import Union
from . import logging as logger
from .signal import data_raw_write_proxy
......@@ -36,10 +39,12 @@ class ReadManager:
the function should return a value for 'new_param' request).
#. Call ``get_param('new_param')`` every time you want to get the param.
"""
def __init__(self,
p_info_source: Union[str, read_info_source.FileInfoSource],
p_data_source: Union[str, read_data_source.FileDataSource],
p_tags_source: Union[str, read_tags_source.FileTagsSource]):
"""Initialize FileInfoSource, FileDataSource and FileTagsSource."""
super().__init__()
if isinstance(p_info_source, str):
......@@ -62,12 +67,14 @@ class ReadManager:
self.tags_source = p_tags_source
def __deepcopy__(self, memo):
"""Construct a new object, next recursively, inserts copies into it."""
info_source = copy.deepcopy(self.info_source)
tags_source = copy.deepcopy(self.tags_source)
samples_source = copy.deepcopy(self.data_source)
return ReadManager(info_source, samples_source, tags_source)
def save_to_file(self, p_dir: str, p_name: str):
"""Save to file (tags, info, data)."""
path = os.path.join(p_dir, p_name)
tags = self.get_tags()
......@@ -110,23 +117,36 @@ class ReadManager:
raise Exception('Unrecognised unit type. Should be sample or second!. Abort!')
def get_all_samples(self):
"""Return an array of all samples."""
return self.get_samples(p_from=None, p_len=None, p_unit='sample')
def get_channel_samples(self, p_ch_name, p_from=None, p_len=None, p_unit='sample'):
"""Return an array of values for channel p_ch_name, or
raise ValueError exception if there is channel with that name.
p_unit can be 'sample' or 'second' and makes sense only if p_from and p_len is not none."""
"""
Return an array of values for channel p_ch_name.
Raise ValueError exception if there is no channel with that name.
p_unit can be 'sample' or 'second' and makes sense only if p_from and p_len is not none.
"""
ch_ind = self.get_param('channels_names').index(p_ch_name) # TODO error
return self.get_samples(p_from, p_len, p_unit)[ch_ind]
def get_channels_samples(self, p_ch_names, p_from=None, p_len=None, p_unit='sample'):
assert(len(p_ch_names) > 0)
"""Return an array of values for channel p_ch_names."""
assert (len(p_ch_names) > 0)
s = self.get_channel_samples(p_ch_names[0], p_from, p_len, p_unit)
for ch in p_ch_names[1:]:
s = numpy.vstack((s, self.get_channel_samples(ch, p_from, p_len, p_unit)))
return s
def set_samples(self, p_samples, p_channel_names, p_copy=False):
"""
Set new samples and channel names.
:param p_samples: 2 dimensional numpy array (n_channels x n_samples)
:param p_channel_names: list of strings, names of the channels. Should be the same length as p_samples.shape[0]
:param p_copy: True if you want to make a copy of the p_samples array
"""
try:
dim = p_samples.ndim
except AttributeError:
......@@ -152,35 +172,52 @@ class ReadManager:
return self.tags_source.get_tags(p_tag_type, p_from, p_len, p_func)
def set_tags(self, p_tags):
"""Set tags for :given type of tags."""
if self.tags_source is not None:
self.tags_source.set_tags(p_tags)
else:
self.tags_source = read_tags_source.MemoryTagsSource(p_tags)
def get_param(self, p_param_name):
"""Return parameter value for p_param_name.
Raise NoParameter exception if p_param_name
parameters was not found."""
"""
Return parameter value for p_param_name:str.
Raise NoParameter exception if p_param_name parameters was not found.
"""
return self.info_source.get_param(p_param_name)
def get_params(self):
"""Get params from info_source."""
return self.info_source.get_params()
def set_param(self, p_key, p_value):
"""Set param (p_key, p_value) in info_source."""
self.info_source.set_param(p_key, p_value)
def set_params(self, p_params):
"""Set params in info_source."""
self.info_source.update_params(p_params)
def reset_params(self):
"""Reset params in info source."""
self.info_source.reset_params()
def iter_tags(self, p_tag_type=None, p_from=None, p_len=None, p_func=None):
"""
Generator, iterates tag by tag (of given type).
:param p_tag_type: type of tag
:param p_from: start timestamp
:param p_len: number of tag
:param p_func: logic who to choose tag eg. tag['desc'][u'index'] == tag['desc'][u'target']
:return: tag or None
"""
if self.tags_source is None:
return
for tag in self.tags_source.get_tags(p_tag_type, p_from, p_len, p_func):
yield tag
def iter_samples(self):
"""Generator, iterates sample by sample."""
for s in self.data_source.iter_samples():
yield s
"""Package providing proxy for signal data and info."""
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module defines single :class: 'DataAsciiWriteProxy'.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from .data_generic_write_proxy import DataGenericWriteProxy
class DataAsciiWriteProxy(DataGenericWriteProxy):
"""Subclass write data in ASCII format to file."""
def __init__(self, p_file_path):
"""Open p_file_path file for writing."""
super().__init__(p_file_path, 'wt')
def data_received(self, packet: SamplePacket):
"""
Method gets and saves next sample of signal.
:param packet: 'SamplePacket'
"""
self._write_file(str(packet.samples.tolist()) + '\n')
self._number_of_samples += 1
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module providing generic temporary data buffer.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from obci.signal_processing.signal.data_raw_write_proxy import DataRawWriteProxy
......@@ -8,6 +11,7 @@ BUF_SIZE = 1024
class DataBufferedWriteProxy(DataRawWriteProxy):
"""Subclass of class 'DataRawWriteProxy', stores data in temp buffer."""
def __init__(self, p_file_path, p_append_ts=False, p_sample_type='FLOAT'):
"""Open p_file_path file for writing."""
......@@ -22,8 +26,7 @@ class DataBufferedWriteProxy(DataRawWriteProxy):
self._flush_buffer()
def finish_saving(self):
"""Close the file, return a tuple -
file`s name and number of samples."""
"""Close the file, return a tuple - file`s name and number of samples."""
self._flush_buffer()
return super().finish_saving()
......
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module providing :class 'DataGenericWriteProxy' representing data file.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
import abc
import numpy
......@@ -38,14 +41,15 @@ class DataGenericWriteProxy(metaclass=abc.ABCMeta):
@abc.abstractmethod
def data_received(self, packet: SamplePacket):
"""Should be implemented in subclass."""
pass
def set_first_sample_timestamp(self, timestamp):
"""Set first sample timestamp."""
self._first_sample_timestamp = timestamp
def finish_saving(self):
"""Close the file, return a tuple -
file`s name and number of samples."""
"""Close the file, return a tuple - file`s name and number of samples."""
self._file.flush()
self._file.close()
return self._file_path, self._number_of_samples
......
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module defines single :class 'DataRawWriteProxy'.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from obci.drivers.eeg.eeg_amplifier import SamplePacket
from .data_generic_write_proxy import DataGenericWriteProxy
from .signal_constants import SAMPLE_NUMPY_TYPES
class DataRawWriteProxy(DataGenericWriteProxy):
"""Subclass of class 'DataGenericWriteProxy', saves raw data into file."""
def __init__(self, p_file_path, p_append_ts=False, p_sample_type='FLOAT'):
"""Open p_file_path file for writing."""
......@@ -18,5 +22,6 @@ class DataRawWriteProxy(DataGenericWriteProxy):
raise Exception('Invalid sample type to write: ' + p_sample_type)
def data_received(self, packet: SamplePacket):
"""Write sample packet to file."""
self._write_file(self._bytes_from_packet(packet))
self._number_of_samples += 1
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module defines single :class 'DataReadProxy'.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
import os.path
import struct
......@@ -16,13 +19,22 @@ SAMPLE_STRUCT_TYPES = signal_constants.SAMPLE_STRUCT_TYPES
class DataReadProxy:
"""Class 'DataReadProxy' reads data from file."""
def __init__(self, p_file_path, sample_type='FLOAT'):
"""
Initialize class and start reading from file.
:param p_file_path: name of file containing data samples
:param sample_type: type of data
"""
self._file_path = p_file_path
self._sample_size = SAMPLE_SIZES[sample_type]
self._sample_struct_type = '<' + SAMPLE_STRUCT_TYPES[sample_type]
self.start_reading()
def start_reading(self):
"""Start reading from file."""
try:
self._data_file = open(self._file_path, 'rb')
except IOError as e:
......@@ -30,10 +42,12 @@ class DataReadProxy:
raise e
def finish_reading(self):
"""Finish reading from file."""
self._data_file.close()
def get_all_values(self, p_channels_num=1):
assert(p_channels_num > 0)
"""Get all values from channel (:param. p_channel)."""
assert (p_channels_num > 0)
self.finish_reading()
self.start_reading()
......@@ -60,8 +74,11 @@ class DataReadProxy:
return vals
def get_next_value(self):
"""Return next value from data file (as python float).
Close data file and raise NoNextValue exception if eof."""
"""
Return next value from data file (as python float).
Close data file and raise NoNextValue exception if eof.
"""
l_raw_data = self._data_file.read(self._sample_size)
try:
# TODO - by now it is assumed that error means eof..
......@@ -72,9 +89,11 @@ class DataReadProxy:
raise signal_exceptions.NoNextValue()
def get_next_values(self, p_num):
"""Return next p_num values from data file (as numpy array).
Close data file and raise NoNextValue exception if eof."""
"""
Return next p_num values from data file (as numpy array).
Close data file and raise NoNextValue exception if eof.
"""
# Read data from file
# LOGGER.debug("Before reading "+str(self._sample_size*p_num)+" samples.
# CURRENT POSITION/8 = "+str(self._data_file.tell()/8))
......@@ -102,8 +121,7 @@ class DataReadProxy:
def goto_value(self, p_value_no):
"""
Set the engine, so that nex 'get_next_value' call will return
value number p_value_no+1.
Set the engine, so that nex 'get_next_value' call will return value number p_value_no+1.
Eg. if p_value_no == 0, calling get_next_value will return first value.
if p_value_no == 11, calling get_next_value will return 12-th value.
......
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module defines single :method 'get_proxy'.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
from .data_ascii_write_proxy import DataAsciiWriteProxy
from .data_buffered_write_proxy import DataBufferedWriteProxy
from .data_raw_write_proxy import DataRawWriteProxy
......@@ -10,6 +13,15 @@ LOGGER = logger.get_logger('data_write_proxy', 'info')
def get_proxy(file_path, append_ts=False, use_own_buffer=False, format='FLOAT'):
"""
Return data write proxy.
:param file_path: name of file containing data samples
:param append_ts: append sample timestamp
:param use_own_buffer: if False create buffer: DataRawWriteProxy else: pass own buffer
:param format: data format
:return: proxy or DataAsciiWriteProxy(file_path)
"""
if format.lower() == 'ascii':
return DataAsciiWriteProxy(file_path)
cls = DataBufferedWriteProxy if use_own_buffer else DataRawWriteProxy
......
......@@ -16,9 +16,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author:
# Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
#
"""
Module providing generic info file proxy.
Author:
Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
import xml.dom.minidom
from . import signal_exceptions
......@@ -67,7 +70,7 @@ TAGS_DEFINITIONS = {
'export_date':
('simple', ['exportDate'])
}
"""
'''
For every tag we have entry in format
::
......@@ -75,19 +78,18 @@ For every tag we have entry in format
'tag_universal_name':
('list' or 'simple', # tag type
[one element for 'simple', two elements for 'list']) # list of tag translations
"""
'''
class OpenBciDocument(xml.dom.minidom.Document):
"""Abstract class for future development, used in proxies."""
pass
class GenericInfoFileWriteProxy:
"""
A class that is responsible for implementing logic of OpenBCI
signal parameters storage in info file.
A class that is responsible for implementing logic of OpenBCI signal parameters storage in info file.
The file is supposed to be compatible with signalml 2.0. By now it isn`t :)
......@@ -99,10 +101,11 @@ class GenericInfoFileWriteProxy:
Public interface:
* :meth:`finish_saving`
:meth:`finish_saving`
"""
def __init__(self, p_file_path):
"""A class representing info file."""
super().__init__()
self._file_path = p_file_path
# TODO works in windows and linux on path with spaces?
......@@ -115,8 +118,7 @@ class GenericInfoFileWriteProxy:
return OpenBciDocument()
def set_attributes(self, p_attrs_dict):
"""For every pair key-> value in p_attrs_dict create tag.
The type of tag depends on self._tags_control."""
"""For every pair key-> value in p_attrs_dict create tag. The type of tag depends on self._tags_control."""
for i_key, i_value in p_attrs_dict.items():
self._set_tag(i_key, i_value)
......@@ -126,9 +128,9 @@ class GenericInfoFileWriteProxy:
Arguments:
* ``p_file_name`` - a name of to-be-created info file
* ``p_dir_path`` - a dir-path where p_file_name is to be created
* ``p_signal_params`` - a dictionary of all signal parameters that should be stored in info file.
:arg ``p_file_name`` : a name of to-be-created info file
:arg ``p_dir_path`` : a dir-path where p_file_name is to be created
:arg ``p_signal_params`` : a dictionary of all signal parameters that should be stored in info file.
What is the logics flow of analysing parameters?
......@@ -159,25 +161,28 @@ class GenericInfoFileWriteProxy:
return self._file_path
def _set_remaining_tags(self):
"""Set all default (hardcoded) tags and other tags as now we
we have all needed data."""
"""Set all default (hardcoded) tags and other tags as now we we have all needed data."""
self.set_attributes({
'byte_order': 'LITTLE_ENDIAN',
})
def _create_xml_root(self):
"""Create root xml element and add standard parameters:
'sample_type' (double by now)
'file' (data file`s name).
"""
Create root xml element and add standard parameters.
:param'sample_type' (double by now)
:param'file' (data file`s name)
"""
self._xml_root = self._xml_factory.createElement('OpenBciDataFormat')
# this is going to be an in-memory representation of xml info file
self._xml_factory.appendChild(self._xml_root)
def _set_tag(self, p_tag_name, p_tag_params):
"""For given tag name and tag parameters create in-memory
representation of xml tag. Tag type is defined in self._tags_control
so use it to determine specific action."""
"""
For given tag name and tag parameters create in-memory representation of xml tag.
Tag type is defined in self._tags_control so use it to determine specific action.
"""
# first get a tupe (function, list_of_function_params)
l_ctr = self._tags_controls[p_tag_name]
l_std_params = list(l_ctr['params'])
......@@ -188,7 +193,9 @@ class GenericInfoFileWriteProxy:
l_ctr['function'](*l_std_params)
def _set_simple_tag(self, p_tag_name, p_tag_value):
"""A generic method for adding an xml element with
"""
A generic method for adding an xml element.
- tag name: 'param',
- id: 'p_tag_name',
- value: p_tag_value.
......@@ -199,7 +206,9 @@ class GenericInfoFileWriteProxy:
def _set_list_tag(
self, p_tag_name, p_subtag_name, p_tag_values):
"""Ad xml tag like:
"""
Add xml tag.
<p_tag_name>
<p_subtag_name>p_tag_values[0]</p_subtag_name>
<p_subtag_name>p_tag_values[1]</p_subtag_name>
......@@ -214,7 +223,9 @@ class GenericInfoFileWriteProxy:
self._xml_root.appendChild(l_xml_list_root)
def _create_xml_text_element(self, p_tag_name, p_text_value):
"""A generic method for adding an xml text element with
"""
A generic method for adding an xml text element.
- tag name: 'p_tag_name',
- value: p_text_value.
- id: 'p_id_value' if different from ''
......@@ -224,8 +235,7 @@ class GenericInfoFileWriteProxy:
return l_xml_element
def _create_tags_controls(self):
"""Define tags control functions for every recognisable parameter.
See self.__init__ for more details."""
"""Define tags control functions for every recognisable parameter. See self.__init__ for more details."""
self._tags_controls = {}
for i_tag_name, i_tag_def in TAGS_DEFINITIONS.items():
if i_tag_def[0] == 'simple':
......@@ -238,6 +248,8 @@ class GenericInfoFileWriteProxy:
class GenericInfoFileReadProxy:
"""Class reading info from xml file."""
def __init__(self, p_file_path):
"""Info file reader."""
self._file_path = p_file_path
......@@ -267,6 +279,11 @@ class GenericInfoFileReadProxy:
self._xml_doc = xml.dom.minidom.parse(p_info_file)
def get_params(self):
"""
Return all parameters for TAGS_DEFINITIONS.
Raise NoParameter exception if p_param_name parameters was not found.