Coverage for src / sdynpy / signal_processing / sdynpy_buffer.py: 18%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 16:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Created on Wed Jul 23 12:34:27 2025 

4 

5@author: dprohe 

6""" 

7import numpy as np 

8 

9class CircularBufferWithOverlap: 

10 def __init__(self, buffer_size, block_size, overlap_size, dtype = 'float', data_shape = ()): 

11 """ 

12 Initialize the circular buffer. 

13  

14 Parameters: 

15 - buffer_size: Total size of the circular buffer. 

16 - block_size: Number of samples written in each block. 

17 - overlap_size: Number of samples from the previous block to include in the read. 

18 """ 

19 self.buffer_size = buffer_size 

20 self.block_size = block_size 

21 self.overlap_size = overlap_size 

22 self.buffer = np.zeros(tuple(data_shape) + (buffer_size,), dtype=dtype) # Initialize buffer with zeros 

23 self.buffer_read = np.ones((buffer_size,),dtype=bool) 

24 self.write_index = 0 # Index where the next block will be written 

25 self.read_index = 0 # Index where the next block will be read from 

26 

27 def report_buffer_state(self): 

28 read_samples = np.sum(self.buffer_read) 

29 write_samples = self.buffer_size - read_samples 

30 print(f'{read_samples} of {self.buffer_size} have been read') 

31 print(f'{write_samples} of {self.buffer_size} have been written but not read') 

32 

33 def write_get_data(self,data, read_remaining = False): 

34 """ 

35 Writes a block of data and then returns a block if available 

36 

37 Parameters: 

38 - data: Array to write to the buffer. 

39 """ 

40 self.write(data) 

41 try: 

42 return self.read(read_remaining) 

43 except ValueError: 

44 return None 

45 

46 def write(self, data): 

47 """ 

48 Write a block of data to the circular buffer. 

49  

50 Parameters: 

51 - data: Array to write to the buffer. 

52 """ 

53 # Compute the end index for the write operation 

54 indices = np.arange(self.write_index,self.write_index+data.shape[-1]+self.overlap_size) % self.buffer_size 

55 

56 if np.any(~self.buffer_read[indices]): 

57 raise ValueError('Overwriting data on buffer that has not been read. Read data before writing again.') 

58 

59 self.buffer[...,indices[:None if self.overlap_size == 0 else -self.overlap_size]] = data 

60 self.buffer_read[indices[:None if self.overlap_size == 0 else -self.overlap_size]] = False 

61 

62 # Update the write index 

63 self.write_index = (self.write_index + data.shape[-1]) % self.buffer_size 

64 

65 # print(self.buffer) 

66 # print(self.buffer_read) 

67 

68 def read(self, read_remaining = False): 

69 indices = np.arange(self.read_index - self.overlap_size, self.read_index + self.block_size) % self.buffer_size 

70 if read_remaining: 

71 # Pick out just the indices that are ok to read 

72 # print('Reading Remaining:') 

73 # print(f"{indices.copy()=}") 

74 indices = np.concatenate((indices[:self.overlap_size],indices[self.overlap_size:][~self.buffer_read[indices[self.overlap_size:]]])) 

75 # print(f"{indices.copy()=}") 

76 if np.any(self.buffer_read[indices[self.overlap_size:]]): 

77 raise ValueError('Data would be read multiple times. Write data before reading again.') 

78 return_data = self.buffer[...,indices] 

79 self.buffer_read[indices[self.overlap_size:]] = True 

80 self.read_index = (self.read_index + (return_data.shape[-1]-self.overlap_size)) % self.buffer_size 

81 # print(self.buffer) 

82 # print(self.buffer_read) 

83 return return_data