Coverage for src / sdynpy / signal_processing / sdynpy_buffer.py: 18%
38 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
1# -*- coding: utf-8 -*-
2"""
3Created on Wed Jul 23 12:34:27 2025
5@author: dprohe
6"""
7import numpy as np
class CircularBufferWithOverlap:
    """
    Circular sample buffer whose reads include samples from the previous read.

    Samples are stored along the last axis of ``buffer``.  A boolean mask,
    ``buffer_read``, tracks which slots may be overwritten: a slot is ``True``
    when it has been read (or has never been written) and ``False`` when it
    holds data that has been written but not yet read.
    """

    def __init__(self, buffer_size, block_size, overlap_size, dtype='float', data_shape=()):
        """
        Initialize the circular buffer.

        Parameters:
        - buffer_size: Total size of the circular buffer (length of the last axis).
        - block_size: Number of new (previously unread) samples returned by each read.
        - overlap_size: Number of samples from the previous block to include in the read.
        - dtype: NumPy dtype of the stored samples.
        - data_shape: Shape of the leading (non-sample) axes of the buffer.
        """
        self.buffer_size = buffer_size
        self.block_size = block_size
        self.overlap_size = overlap_size
        # Sample axis is last so that writes/reads index with ``[..., indices]``
        self.buffer = np.zeros(tuple(data_shape) + (buffer_size,), dtype=dtype)
        # Every slot starts out "read", i.e. free to be written
        self.buffer_read = np.ones((buffer_size,), dtype=bool)
        self.write_index = 0  # Index where the next block will be written
        self.read_index = 0  # Index where the next block will be read from

    def report_buffer_state(self):
        """
        Print how many slots are free (read) and how many hold unread data.

        Slots that have never been written are counted as read.
        """
        read_samples = np.sum(self.buffer_read)
        write_samples = self.buffer_size - read_samples
        print(f'{read_samples} of {self.buffer_size} have been read')
        print(f'{write_samples} of {self.buffer_size} have been written but not read')

    def write_get_data(self, data, read_remaining=False):
        """
        Writes a block of data and then returns a block if available

        Parameters:
        - data: Array to write to the buffer.
        - read_remaining: Passed through to ``read``.

        Returns:
        - The array returned by ``read``, or None if ``read`` raised a
          ValueError because a full block is not yet available.

        Raises:
        - ValueError: If the write itself fails (see ``write``).
        """
        # The write is deliberately outside the try block so that only a
        # failed read is turned into None; write errors still propagate.
        self.write(data)
        try:
            return self.read(read_remaining)
        except ValueError:
            return None

    def write(self, data):
        """
        Write a block of data to the circular buffer.

        Parameters:
        - data: Array (or array-like) to write to the buffer.  Its last axis
          is the sample axis.

        Raises:
        - ValueError: If the block plus the overlap region does not fit in
          the buffer, or if the write would overwrite data that has not been
          read.
        """
        data = np.asarray(data)
        num_samples = data.shape[-1]

        # If the write plus the protected overlap region is longer than the
        # buffer, the wrapped indices contain duplicates and the write would
        # silently overwrite its own samples or the overlap history that the
        # next read needs.
        if num_samples + self.overlap_size > self.buffer_size:
            raise ValueError(
                f'Cannot write {num_samples} samples with an overlap of '
                f'{self.overlap_size} into a buffer of size {self.buffer_size}.')

        # Slots touched by the write, plus overlap_size slots beyond it that
        # must also be free before the write is allowed
        indices = np.arange(
            self.write_index,
            self.write_index + num_samples + self.overlap_size) % self.buffer_size

        if not np.all(self.buffer_read[indices]):
            raise ValueError('Overwriting data on buffer that has not been read. Read data before writing again.')

        # Only the first num_samples slots actually receive data
        write_indices = indices[:num_samples]
        self.buffer[..., write_indices] = data
        self.buffer_read[write_indices] = False

        # Update the write index
        self.write_index = (self.write_index + num_samples) % self.buffer_size

    def read(self, read_remaining=False):
        """
        Read the next block from the circular buffer.

        The returned array starts with ``overlap_size`` samples immediately
        preceding the read position, followed by the new samples.

        Parameters:
        - read_remaining: If False, exactly ``block_size`` new samples are
          required.  If True, only the unread samples within the next
          ``block_size`` slots are returned after the overlap samples.

        Returns:
        - Array of samples taken along the last axis of the buffer.

        Raises:
        - ValueError: If any of the new samples to be returned has already
          been read (or was never written).
        """
        indices = np.arange(
            self.read_index - self.overlap_size,
            self.read_index + self.block_size) % self.buffer_size
        overlap_indices = indices[:self.overlap_size]
        fresh_indices = indices[self.overlap_size:]

        if read_remaining:
            # Pick out just the indices that are ok to read
            fresh_indices = fresh_indices[~self.buffer_read[fresh_indices]]
            indices = np.concatenate((overlap_indices, fresh_indices))

        if np.any(self.buffer_read[fresh_indices]):
            raise ValueError('Data would be read multiple times. Write data before reading again.')

        return_data = self.buffer[..., indices]
        self.buffer_read[fresh_indices] = True
        # Advance only by the number of new samples; the overlap is re-read
        self.read_index = (self.read_index + fresh_indices.size) % self.buffer_size
        return return_data