Commit e631606c authored by Jerzy Grynczewski's avatar Jerzy Grynczewski

adding buffers

refs #30431
parent a028e2fc
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import collections
import numpy
from obci.configs.variables_pb2 import Blink as BlinkProto
from obci.signal_processing.buffers import ring_buffer as rbn
Sample = numpy.zeros
class Blink(object):
def __init__(self, blink: BlinkProto, blink_count: int, blink_pos: int) -> None:
"""
BlinkProtobuf variable as a representation of the stimulus.
:param blink: protobuf variable Blink (object with one field named timestamp)
:param blink_count: index of the stimulus
:param blink_pos: position of the stimulus in relation to the indexes of the samples in the buffer.
"""
self.blink = blink
self.count = blink_count
self.position = blink_pos
class AutoBlinkBuffer(object):
def __init__(self, from_blink, samples_count, num_of_channels, sampling, ret_func, copy_on_ret):
"""
After occurrence of the blink and getting a sufficient number of samples calls ret_func(blink, d),
where blink is a protobuf Blink variable and d is a corresponding numpy array with
<samples_count> samples from <num_of_channels> channels.
:param from_blink: blink from which start collecting samples
:param samples_count: number of samples to return
:param num_of_channels: number of channels to return
:param sampling: sampling frequency
:param ret_func: function to call when return. Takes returned samples.
:param copy_on_ret: if true make deep copy of the buffer while processing
"""
assert(samples_count > 0)
assert(num_of_channels > 0)
self.ret_func = ret_func
self.ret_buf_len = samples_count # 1024
self.blink_from = from_blink # 128
self.sampling = sampling # float(1024)
self.curr_blink = None
self.curr_blink_ts = None
self.count = 0
self.blinks_count = 0
self.is_full = False
self.whole_buf_len = (self.ret_buf_len + abs(self.blink_from)) * 2
self.buffer = rbn.RingBuffer(self.whole_buf_len, num_of_channels,
copy_on_ret)
self.times = rbn.RingBuffer(self.whole_buf_len,
1, copy_on_ret)
self.times_sample = Sample(1)
self.blinks = collections.deque()
def clear(self):
self.count = 0
self.is_full = False
self.buffer.clear()
self.times.clear()
self.clear_blinks()
def clear_blinks(self):
self.blinks_count = 0
self.blinks.clear()
def handle_blink(self, blink: BlinkProto) -> None:
"""
Assignes index and position to the Blink and queues this Blink.
:param blink: protobuf variable Blink (object with one field named timestamp)
:return:
"""
if not self.is_full:
print("AutoBlinkBuffer - Got blink before buffer is full. Ignore!")
return
blink_ts = blink.timestamp + (self.blink_from / self.sampling)
blink_pos = self._get_times_index(blink.timestamp)
if blink_pos < 0:
return
elif blink_pos >= self.whole_buf_len:
# nie ma jeszcze nawet pierwszej probki
blink_count = self.ret_buf_len + int(
self.sampling * (blink_ts - self._get_times_last()))
blink_pos = self.whole_buf_len - self.ret_buf_len
else:
# gdzies w srodku
blink_count = self.ret_buf_len - (self.whole_buf_len - blink_pos)
if blink_count < 0:
blink_count = 0
blink_pos -= blink_count
if not len(self.blinks) == 0:
blink_count -= self.blinks_count # last.count #get_last_blink()
b = Blink(blink, blink_count, blink_pos)
self.blinks.append(b)
self.blinks_count += blink_count
# [10, 15, 18, 22]
def handle_sample_packet(self, sample_packet):
"""
Supports SamplePacket in a manner consistent with AutoBlinkBuffer.
:param sample_packet: signal as a SamplePacket class.
:return: None
"""
for idx in range(len(sample_packet.ts)):
self._handle_sample(sample_packet.samples[idx], sample_packet.ts[idx])
def _handle_sample(self, s, t):
self.buffer.add(s)
self.times_sample[0] = t
self.times.add(self.times_sample)
self.count += 1
if not self.is_full:
self.is_full = (self.count == self.whole_buf_len)
else:
if not len(self.blinks) == 0:
curr = self.blinks[0]
curr.count -= 1
self.blinks_count -= 1
if curr.count <= 0:
curr = self.blinks.popleft()
d = self.buffer.get(curr.position, self.ret_buf_len)
self.ret_func(curr.blink, d)
def _get_times_index(self, value):
if self.is_full:
last = self.whole_buf_len
else:
last = self.count
vect = self.times.get(0, last)[0]
ret = -1
for i, v in enumerate(vect):
if value < v:
return ret
ret = i
return self.whole_buf_len
def _get_times_last(self):
if self.is_full:
last = self.whole_buf_len
else:
last = self.count
return self.times.get(0, last)[0][last - 1]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from obci.signal_processing.buffers import ring_buffer as rbn
class AutoRingBuffer(object):
def __init__(self, from_sample, samples_count, every, num_of_channels, ret_func, copy_on_ret):
"""
When full, calls ret_func(d), where d is numpy array with <samples_count> samples from
<num_of_channels> channels. Between following returns skips <every> samples.
:param from_sample: size of the buffer
:param samples_count: number of samples to return
:param every: interval (number of samples to skip) before next return
:param num_of_channels: number of channels in the returned signal
:param ret_func: function to call when return. Takes returned samples.
:param copy_on_ret: if true make deep copy of the buffer while processing
"""
assert(samples_count > 0)
assert(from_sample > 0)
assert(every > 0)
assert(num_of_channels > 0)
self.every = every
self.ret_func = ret_func
self.whole_buf_len = from_sample
self.ret_buf_len = samples_count
self.count = 0
self.is_full = False
self.buffer = rbn.RingBuffer(from_sample,
num_of_channels,
copy_on_ret)
def clear(self):
self.count = 0
self.is_full = False
self.buffer.clear()
def handle_sample_packet(self, sample_packet):
"""
Supports packets in a manner consistent with AutoBlinkBuffer
:param sample_packet: signal as SamplePackage class
:return: None
"""
for idx in range(len(sample_packet.ts)):
self._handle_sample(sample_packet.samples[idx])
def _handle_sample(self, s):
self.buffer.add(s)
self.count += 1
if not self.is_full:
if self.count == self.whole_buf_len:
self.is_full = True
self.count %= self.every
if self.count == 0:
self.count = self.every
if self.is_full and self.count == self.every:
d = self.buffer.get(0, self.ret_buf_len)
self.ret_func(d)
self.count = 0
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy
from obci.signal_processing.buffers import ring_buffer_base
class RingBuffer(ring_buffer_base.RingBufferBase):
def _get_normal(self, start, end):
return self.buffer[:, start:end]
def _get_concat(self, start, end):
return numpy.concatenate((self.buffer[:, start:],
self.buffer[:, :end]),
axis=1)
def _add(self, s):
for i in range(self.number_of_channels):
self.buffer[i, self.index] = s[i]
def _init_buffer(self):
self.buffer = numpy.zeros((self.number_of_channels,
self.size), dtype='float')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy
class RingBufferBase(object):
def __init__(self, size, number_of_channels, copy_on_ret):
self.size = int(size)
self.number_of_channels = int(number_of_channels)
self.copy_on_ret = bool(copy_on_ret)
self.clear()
def clear(self):
self.is_full = False
self.index = 0
self._init_buffer()
def add(self, s):
self._add(s)
if not self.is_full and self.index == self.size - 1:
self.is_full = True
self.index = (self.index + 1) % self.size
def get(self, start, length):
if not self.is_full:
d = self._get_normal(start, start + length)
else:
if self.index + start + length <= self.size:
d = self._get_normal(self.index + start, self.index + start + length)
elif self.index + start >= self.size:
ind = (self.index + start) % self.size
d = self._get_normal(ind, ind + length)
else:
d = self._get_concat(self.index + start,
length - (self.size - (self.index + start)))
if self.copy_on_ret:
return copy.deepcopy(d)
else:
return d
def _get_normal(self, start, end):
raise Exception("To be implemented!")
def _get_concat(self, start, end):
raise Exception("To be implemented!")
def _add(self, s):
raise Exception("To be implemented!")
def _init_buffer(self):
raise Exception("To be implemented!")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment