data_read_proxy.py 4.58 KB
Newer Older
1 2 3 4
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.

5 6 7 8 9 10
"""
Module defines a single class, :class:`DataReadProxy`.

Author:
    Mateusz Kruszyński <mateusz.kruszynski@gmail.com>
"""
11 12 13 14 15 16 17 18 19
import os.path
import struct

import numpy as np

from . import signal_constants
from . import signal_exceptions
from . import signal_logging as logger

20
# Module-wide logger obtained from signal_logging, registered under the name
# 'data_read_proxy'; 'info' is passed as the second argument (presumably the
# log level -- confirm against signal_logging.get_logger).
LOGGER = logger.get_logger('data_read_proxy', 'info')
21 22 23 24
# Local aliases for the lookup tables defined in signal_constants. Both are
# indexed by a sample-type name such as 'FLOAT'; SAMPLE_SIZES is used below as
# a byte count per sample and SAMPLE_STRUCT_TYPES as a struct format character.
SAMPLE_SIZES = signal_constants.SAMPLE_SIZES
SAMPLE_STRUCT_TYPES = signal_constants.SAMPLE_STRUCT_TYPES


25
class DataReadProxy:
    """
    Read fixed-size binary samples from a data file.

    Samples are stored back to back, little-endian, each occupying
    ``SAMPLE_SIZES[sample_type]`` bytes.  The proxy keeps one file handle
    open between calls, so sequential reads (:meth:`get_next_value`,
    :meth:`get_next_values`) continue from the current position, which can
    be moved with :meth:`goto_value`.

    The proxy may also be used as a context manager; the file is closed
    when the ``with`` block exits.
    """

    def __init__(self, p_file_path, sample_type='FLOAT'):
        """
        Initialize class and start reading from file.

        :param p_file_path: name of file containing data samples
        :param sample_type: type of data; must be a key of ``SAMPLE_SIZES``
            and ``SAMPLE_STRUCT_TYPES``
        :raises KeyError: if ``sample_type`` is not a known sample type
        :raises IOError: if the file cannot be opened
        """
        self._file_path = p_file_path
        self._sample_size = SAMPLE_SIZES[sample_type]
        # '<' forces little-endian, standard-size interpretation both for
        # ``struct`` and when the string is used as a numpy dtype.
        self._sample_struct_type = '<' + SAMPLE_STRUCT_TYPES[sample_type]
        self._data_file = None
        self.start_reading()

    def __enter__(self):
        """Return the proxy itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the data file; exceptions from the block are not suppressed."""
        self.finish_reading()

    def start_reading(self):
        """
        Start reading from file.

        (Re)opens the file in binary mode, positioned at its beginning.
        A handle left open by an earlier call is closed first so that
        reopening does not leak file descriptors.

        :raises IOError: if the file cannot be opened
        """
        self.finish_reading()
        try:
            self._data_file = open(self._file_path, 'rb')
        except IOError:
            LOGGER.error("An error occurred while opening the data file!")
            # Bare ``raise`` re-raises the active exception unchanged.
            raise

    def finish_reading(self):
        """
        Finish reading from file.

        Safe to call repeatedly, and also when the file was never opened.
        """
        # getattr: the attribute is missing when __init__ was bypassed.
        data_file = getattr(self, '_data_file', None)
        if data_file is not None:
            # The closed handle is kept (not reset to None) so that a read
            # after closing still fails with the usual "I/O operation on
            # closed file" ValueError.
            data_file.close()

    def get_all_values(self, p_channels_num=1):
        """
        Get all values from the file, split into channels.

        The file is treated as a sequence of frames, each holding one sample
        per channel (channel-interleaved).  Reading always starts from the
        beginning of the file, regardless of the current position; afterwards
        the position is just past the last complete frame that was read.

        :param p_channels_num: number of interleaved channels, must be > 0
        :return: float64 array of shape ``(p_channels_num, frames)``
        :raises ValueError: if ``p_channels_num`` is not positive
        """
        if p_channels_num <= 0:
            raise ValueError(
                "p_channels_num must be positive, got {!r}".format(p_channels_num))

        # Reopening rewinds the file and also works after finish_reading().
        self.start_reading()

        frame_size = self._sample_size * p_channels_num
        ch_len, leftover = divmod(os.path.getsize(self._file_path), frame_size)
        if leftover:
            LOGGER.info("Remaining %s byte(s) do not form a complete frame. "
                        "Should be 0.", leftover)

        vals = np.zeros((p_channels_num, ch_len))
        if ch_len == 0:
            return vals

        # One bulk read instead of one np.fromfile call per frame.
        samples = np.fromfile(self._data_file,
                              dtype=self._sample_struct_type,
                              count=ch_len * p_channels_num)
        # If the file shrank after its size was taken, fewer samples come
        # back; fill only the complete frames and leave the rest as zeros.
        frames = samples.size // p_channels_num
        vals[:, :frames] = (
            samples[:frames * p_channels_num].reshape(frames, p_channels_num).T)
        return vals

    def get_next_value(self):
        """
        Return next value from data file (as python float).

        The file stays open when the end is reached; a trailing fragment
        shorter than one sample is treated like end of file.

        :raises NoNextValue: if a complete sample cannot be read
        """
        l_raw_data = self._data_file.read(self._sample_size)
        try:
            return struct.unpack(self._sample_struct_type, l_raw_data)[0]
        except struct.error:
            if l_raw_data:
                # Not a clean EOF: the file ends with a truncated sample.
                LOGGER.info("Remained sample of %s length (should be zero) "
                            "as required length was %s. No next value.",
                            len(l_raw_data), self._sample_size)
            raise signal_exceptions.NoNextValue()

    def get_next_values(self, p_num):
        """
        Return next p_num values from data file (as numpy array).

        The file stays open when the end is reached.  If fewer than
        ``p_num`` samples remain, the bytes that were read are discarded.

        :param p_num: number of consecutive samples to read
        :return: float64 array holding ``p_num`` values
        :raises NoNextValue: if fewer than ``p_num`` complete samples remain
        """
        bytes_needed = self._sample_size * p_num
        l_raw_data = self._data_file.read(bytes_needed)

        if len(l_raw_data) != bytes_needed:
            # Either nothing was read (clean EOF) or only a tail shorter
            # than the request was left in the file.
            LOGGER.info("Remained sample of %s length (should be zero) as required length was %s. No next value.",
                        len(l_raw_data), bytes_needed)
            raise signal_exceptions.NoNextValue()

        return np.frombuffer(l_raw_data, dtype=self._sample_struct_type).astype('float64')

    def goto_value(self, p_value_no):
        """
        Set the engine, so that next 'get_next_value' call will return value number p_value_no+1.

        Eg. if p_value_no == 0, calling get_next_value will return first value.
        if p_value_no == 11, calling get_next_value will return 12-th value.

        :param p_value_no: zero-based index of the sample to position at
        """
        self._data_file.seek(p_value_no * self._sample_size)