Skip to content

Commit

Permalink
added an asynchronous event loop to try to fix TTC by offloading things
Browse files Browse the repository at this point in the history
  • Loading branch information
Frix-x committed Feb 9, 2025
1 parent d7e6d02 commit b8bf488
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 144 deletions.
4 changes: 2 additions & 2 deletions shaketune/commands/axes_map_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
SEGMENT_LENGTH = 30 # mm


def axes_map_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
async def axes_map_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
z_height = gcmd.get_float('Z_HEIGHT', default=20.0)
speed = gcmd.get_float('SPEED', default=80.0, minval=20.0)
accel = gcmd.get_int('ACCEL', default=1500, minval=100)
Expand Down Expand Up @@ -113,6 +113,6 @@ def axes_map_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
ConsoleOutput.print('This may take some time (1-3min)')
creator = st_process.get_graph_creator()
creator.configure(accel, SEGMENT_LENGTH)
measurements_manager.wait_for_data_transfers(printer.get_reactor())
await measurements_manager.wait_for_data_transfers()
st_process.run(measurements_manager)
st_process.wait_for_completion()
7 changes: 2 additions & 5 deletions shaketune/commands/axes_shaper_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from ..shaketune_process import ShakeTuneProcess


def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
async def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
printer = config.get_printer()
toolhead = printer.lookup_object('toolhead')
res_tester = printer.lookup_object('resonance_tester')
Expand Down Expand Up @@ -70,9 +70,6 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
z = z_height
point = (x, y, z)

toolhead.manual_move(point, feedrate_travel)
toolhead.dwell(0.5)

# set the needed acceleration values for the test
toolhead_info = toolhead.get_status(systime)
old_accel = toolhead_info['max_accel']
Expand Down Expand Up @@ -121,7 +118,7 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
# And finally generate the graph for each measured axis
ConsoleOutput.print(f'{config["axis"].upper()} axis frequency profile generation...')
ConsoleOutput.print('This may take some time (1-3min)')
measurements_manager.wait_for_data_transfers(printer.get_reactor())
await measurements_manager.wait_for_data_transfers()
st_process.get_graph_creator().configure(scv, max_sm, test_params, max_scale)
st_process.run(measurements_manager)
st_process.wait_for_completion()
Expand Down
7 changes: 2 additions & 5 deletions shaketune/commands/compare_belts_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from ..shaketune_process import ShakeTuneProcess


def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
async def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
printer = config.get_printer()
toolhead = printer.lookup_object('toolhead')
res_tester = printer.lookup_object('resonance_tester')
Expand Down Expand Up @@ -83,9 +83,6 @@ def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
z = z_height
point = (x, y, z)

toolhead.manual_move(point, feedrate_travel)
toolhead.dwell(0.5)

# set the needed acceleration values for the test
toolhead_info = toolhead.get_status(systime)
old_accel = toolhead_info['max_accel']
Expand Down Expand Up @@ -134,7 +131,7 @@ def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
# Run post-processing
ConsoleOutput.print('Belts comparative frequency profile generation...')
ConsoleOutput.print('This may take some time (1-3min)')
measurements_manager.wait_for_data_transfers(printer.get_reactor())
await measurements_manager.wait_for_data_transfers()
st_process.get_graph_creator().configure(motors_config_parser.kinematics, test_params, max_scale)
st_process.run(measurements_manager)
st_process.wait_for_completion()
4 changes: 2 additions & 2 deletions shaketune/commands/create_vibrations_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
MIN_SPEED = 2 # mm/s


def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> None:
async def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> None:
size = gcmd.get_float('SIZE', default=100.0, minval=50.0)
z_height = gcmd.get_float('Z_HEIGHT', default=20.0)
max_speed = gcmd.get_float('MAX_SPEED', default=200.0, minval=10.0)
Expand Down Expand Up @@ -140,7 +140,7 @@ def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> Non
# For this command, we need to wait for the data transfers after finishing each of
# the measurements as there is a high probability to have a lot of measurements in
# the measurement manager and that chunks are written into the /tmp filesystem folder
measurements_manager.wait_for_data_transfers(printer.get_reactor())
await measurements_manager.wait_for_data_transfers()

toolhead.dwell(0.3)
toolhead.wait_moves()
Expand Down
4 changes: 2 additions & 2 deletions shaketune/commands/excitate_axis_at_freq.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ..shaketune_process import ShakeTuneProcess


def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None:
async def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None:
create_graph = gcmd.get_int('CREATE_GRAPH', default=0, minval=0, maxval=1) == 1
freq = gcmd.get_int('FREQUENCY', default=25, minval=1)
duration = gcmd.get_int('DURATION', default=30, minval=1)
Expand Down Expand Up @@ -108,6 +108,6 @@ def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None:

creator = st_process.get_graph_creator()
creator.configure(freq, duration, accel_per_hz)
measurements_manager.wait_for_data_transfers(printer.get_reactor())
await measurements_manager.wait_for_data_transfers()
st_process.run(measurements_manager)
st_process.wait_for_completion()
147 changes: 63 additions & 84 deletions shaketune/helpers/accelerometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
# compressed format (.stdata) or from the legacy Klipper CSV files.


import asyncio
import os
import pickle
import time
import uuid
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Tuple, TypedDict

Expand All @@ -27,8 +28,6 @@
Sample = Tuple[float, float, float, float]
SamplesList = List[Sample]

CHUNK_SIZE = 15 # Maximum number of measurements to keep in memory at once


class Measurement(TypedDict):
name: str
Expand All @@ -42,100 +41,92 @@ def __init__(self, chunk_size: int):
self._uuid = str(uuid.uuid4())[:8]
self._temp_dir = Path(f'/tmp/shaketune_{self._uuid}')
self._temp_dir.mkdir(parents=True, exist_ok=True)
# List of chunk filenames (each chunk is a .stchunk file)
self._chunk_files = []
self._write_processes = []
# List of async tasks that are currently saving chunks to disk
self._save_tasks = []
self.executor = ThreadPoolExecutor(max_workers=1)

def clear_measurements(self, keep_last: bool = False):
self.measurements = [self.measurements[-1]] if keep_last else []
def clear_measurements(self, keep_current: bool = False):
self.measurements = [self.measurements[-1]] if keep_current else []

def append_samples_to_last_measurement(self, additional_samples: SamplesList):
def append_samples_to_current_measurement(self, additional_samples: SamplesList):
try:
self.measurements[-1]['samples'].extend(additional_samples)
except IndexError as err:
raise ValueError('no measurements available to append samples to.') from err
raise ValueError('no measurements available to append samples to...') from err

def add_measurement(self, name: str, samples: SamplesList = None):
samples = samples if samples is not None else []
self.measurements.append({'name': name, 'samples': samples})
self.measurements.append({'name': name, 'samples': samples if samples is not None else []})
if len(self.measurements) > self._chunk_size:
self._save_chunk()

# Save the measurements to the chunk file. We keep the last measurement
# in memory to be able to append new samples to it later if needed
def _save_chunk(self):
# Save the measurements to the chunk file. We keep the last measurement
# in memory to be able to append new samples to it later if needed
# Create a copy of measurements to be saved to disk and schedule the save task asynchronously
chunk_measurements = self.measurements[:-1].copy()
chunk_filename = self._temp_dir / f'{self._uuid}_{len(self._chunk_files)}.stchunk'
process = Process(target=self._save_to_file, args=(chunk_filename, self.measurements[:-1].copy()))
process.daemon = False
process.start()
self._write_processes.append(process)
task = asyncio.create_task(self._async_save_to_file(chunk_filename, chunk_measurements))
self._save_tasks.append(task)
self._chunk_files.append(chunk_filename)
self.clear_measurements(keep_last=True)

# Asynchronously offload the saving of the chunk to the thread pool executor
async def _async_save_to_file(self, filename: Path, measurements: List[Measurement]):
loop = asyncio.get_running_loop()
await loop.run_in_executor(self.executor, self._save_to_file, filename, measurements)

# Blocking file save function, but started asynchronously
def _save_to_file(self, filename: Path, measurements: List[Measurement]):
try:
with open(filename, 'wb') as f:
cctx = zstd.ZstdCompressor(level=3)
with cctx.stream_writer(f) as compressor:
pickle.dump(measurements, compressor)
except Exception as e:
ConsoleOutput.print(f'Warning: unable to save the data to {filename}: {e}')

def save_stdata(self, filename: Path):
process = Process(target=self._reassemble_chunks, args=(filename,))
process.daemon = False
process.start()
self._write_processes.append(process)
task = asyncio.create_task(self._async_reassemble_chunks(filename))
self._save_tasks.append(task)

# Asynchronously offload the reassembly of the chunks to the thread pool executor
async def _async_reassemble_chunks(self, filename: Path):
loop = asyncio.get_running_loop()
await loop.run_in_executor(self.executor, self._reassemble_chunks, filename)

# Blocking file reassembly function, but started asynchronously
def _reassemble_chunks(self, filename: Path):
try:
os.nice(19)
except Exception:
pass # Ignore errors as it's not critical
try:
all_measurements = []
for chunk_file in self._chunk_files:
chunk_measurements = self._load_measurements_from_file(chunk_file)
all_measurements.extend(chunk_measurements)
os.remove(chunk_file) # Remove the chunk file after reading

# Include any remaining measurements in memory
if self.measurements:
all_measurements.extend(self.measurements)

# Save all measurements to the final .stdata file
# Save all measurements to the final .stdata file and clean up
self._save_to_file(filename, all_measurements)

# Clean up
self.clear_measurements()
self._chunk_files = []
except Exception as e:
ConsoleOutput.print(f'Warning: unable to assemble chunks into {filename}: {e}')

def _save_to_file(self, filename: Path, measurements: List[Measurement]):
try:
os.nice(19)
except Exception:
pass # Ignore errors as it's not critical
try:
with open(filename, 'wb') as f:
cctx = zstd.ZstdCompressor(level=3)
with cctx.stream_writer(f) as compressor:
pickle.dump(measurements, compressor)
except Exception as e:
ConsoleOutput.print(f'Warning: unable to save the data to {filename}: {e}')

def wait_for_data_transfers(self, k_reactor, timeout: int = 30):
if not self._write_processes:
# Wait for all scheduled asynchronous file writes to complete
async def wait_for_data_transfers(self, timeout: int = 30):
if not self._save_tasks:
return # No file write is pending

eventtime = k_reactor.monotonic()
endtime = eventtime + timeout
complete = False

while eventtime < endtime:
eventtime = k_reactor.pause(eventtime + 0.05)
if all(not p.is_alive() for p in self._write_processes):
complete = True
break

if not complete:
try:
await asyncio.wait_for(asyncio.gather(*self._save_tasks), timeout)
except asyncio.TimeoutError as err:
raise TimeoutError(
'Shake&Tune was unable to write the accelerometer data on the filesystem. '
'Shake&Tune was unable to write the accelerometer data on the filesystem within timeout. '
'This might be due to a slow, busy or full SD card.'
)

self._write_processes = []
) from err
finally:
self._save_tasks.clear()

def _load_measurements_from_file(self, filename: Path) -> List[Measurement]:
try:
Expand All @@ -154,7 +145,6 @@ def get_measurements(self) -> List[Measurement]:
chunk_measurements = self._load_measurements_from_file(chunk_file)
all_measurements.extend(chunk_measurements)
all_measurements.extend(self.measurements) # Include any remaining measurements in memory

return all_measurements

def load_from_stdata(self, filename: Path) -> List[Measurement]:
Expand Down Expand Up @@ -197,14 +187,12 @@ def load_from_csvs(self, klipper_CSVs: List[Path]) -> List[Measurement]:
'It will be ignored by Shake&Tune!'
)
continue

# Add the parsed klipper raw accelerometer data to Shake&Tune measurements object
samples = [tuple(row) for row in data]
self.add_measurement(name=logname.stem, samples=samples)
except Exception as err:
ConsoleOutput.print(f'Error while reading {logname}: {err}. It will be ignored by Shake&Tune!')
continue

return self.measurements

def __del__(self):
Expand Down Expand Up @@ -237,39 +225,33 @@ def find_axis_accelerometer(printer, axis: str = 'xy'):
return None

def start_recording(self, measurements_manager: MeasurementsManager, name: str = None, append_time: bool = True):
if self._bg_client is None:
self._bg_client = self._k_accelerometer.start_internal_client()

timestamp = time.strftime('%Y%m%d_%H%M%S')
if name is None:
name = timestamp
elif append_time:
name += f'_{timestamp}'

if not name.replace('-', '').replace('_', '').isalnum():
raise ValueError('invalid measurement name!')

self._measurements_manager = measurements_manager
self._measurements_manager.add_measurement(name=name)
else:
if self._bg_client is not None:
raise ValueError('Recording already started!')
self._bg_client = self._k_accelerometer.start_internal_client()
timestamp = time.strftime('%Y%m%d_%H%M%S')
if name is None:
name = timestamp
elif append_time:
name += f'_{timestamp}'
if not name.replace('-', '').replace('_', '').isalnum():
raise ValueError('invalid measurement name!')
self._measurements_manager = measurements_manager
self._measurements_manager.add_measurement(name=name)

def stop_recording(self) -> MeasurementsManager:
if self._bg_client is None:
ConsoleOutput.print('Warning: no recording to stop!')
return None

# Register a callback in Klipper's reactor to finish the measurements and get the
# samples when Klipper is ready to process them (and without blocking its main thread)
self._k_reactor.register_callback(self._finish_and_get_samples)

return self._measurements_manager

def _finish_and_get_samples(self, bg_client):
try:
self._bg_client.finish_measurements()
samples = self._bg_client.samples or self._bg_client.get_samples()
self._measurements_manager.append_samples_to_last_measurement(samples)
self._measurements_manager.append_samples_to_current_measurement(samples)
self._samples_ready = True
except Exception as e:
ConsoleOutput.print(f'Error during accelerometer data retrieval: {e}')
Expand All @@ -280,18 +262,15 @@ def _finish_and_get_samples(self, bg_client):
def wait_for_samples(self, timeout: int = 60):
eventtime = self._k_reactor.monotonic()
endtime = eventtime + timeout

while eventtime < endtime:
eventtime = self._k_reactor.pause(eventtime + 0.05)
if self._samples_ready:
break
if self._sample_error:
raise self._sample_error

if not self._samples_ready:
raise TimeoutError(
'Shake&Tune was unable to retrieve accelerometer data in time. '
'This might be due to slow hardware or a busy system.'
)

self._samples_ready = False
Loading

0 comments on commit b8bf488

Please sign in to comment.