diff --git a/kai/reactive_codeplanner/task_manager/task_manager.py b/kai/reactive_codeplanner/task_manager/task_manager.py index 28fe5032..5c29df1c 100644 --- a/kai/reactive_codeplanner/task_manager/task_manager.py +++ b/kai/reactive_codeplanner/task_manager/task_manager.py @@ -82,11 +82,6 @@ def set_seed_tasks(self, *tasks: Task) -> None: # Now add them back to the queue so they aren't mistakenly detected as children self.priority_queue.push(task) - msg = "
Got some pending tasks" - msg += f"" - msg += "
" - chatter.get().chat_markdown(msg) - for task in tasks: task.priority = 0 self.priority_queue.push(task) @@ -97,10 +92,6 @@ def execute_task(self, task: Task) -> TaskResult: agent = self.get_agent_for_task(task) logger.info("Agent selected for task: %s", agent) - chatter.get().chat_simple( - f"Using agent {str(agent.__class__.__name__)} to execute task {str(task)}" - ) - result: TaskResult try: result = agent.execute_task(self.rcm, task) @@ -192,14 +183,6 @@ def run_validator( "Validator %s found errors: %s", validator, result.errors ) - msg = "
" - msg += f"Found {len(result.errors)} tasks from validator {str(validator)}" - msg += "
" - chatter.get().chat_markdown(msg) - except Exception: logger.exception( "Exception occurred while processing validator %s:", validator @@ -246,7 +229,7 @@ def get_next_task( if max_depth == 0: self.handle_depth_0_task_after_processing(task) else: - self.handle_new_tasks_after_processing(task) + self.handle_new_tasks_after_processing(task, max_depth) def initialize_priority_queue(self) -> None: logger.info("Initializing task stacks.") @@ -264,9 +247,12 @@ def handle_depth_0_task_after_processing(self, task: Task) -> None: logger.info( "Handling depth 0 task, assuming fix has applied for task: %s", task ) + chatter.get().chat_simple(f"completed task: {task}") self.priority_queue.remove(task) - def handle_new_tasks_after_processing(self, task: Task) -> None: + def handle_new_tasks_after_processing( + self, task: Task, max_depth: Optional[int] + ) -> None: logger.info("Handling new tasks after processing task: %s", task) self._validators_are_stale = True unprocessed_new_tasks = set(self.run_validators()) @@ -296,6 +282,9 @@ def handle_new_tasks_after_processing(self, task: Task) -> None: logger.info( "Task %s resolved indirectly and removed from queue.", resolved_task ) + chatter.get().chat_simple( + f"Resolved task {resolved_task} indirectly while fixing {task}." + ) # Check if the current task is still unprocessed (or similar) similar_tasks = [ @@ -305,11 +294,11 @@ def handle_new_tasks_after_processing(self, task: Task) -> None: if similar_tasks: for t in similar_tasks: unprocessed_new_tasks.remove(t) - logger.debug("Task %s still unsolved after execution.", task) self.handle_ignored_task(task) else: self.processed_tasks.add(task) logger.debug("Task %s processed successfully.", task) + chatter.get().chat_simple(f"Resolved task {task}.") new_child_tasks = unprocessed_new_tasks - tasks_in_queue # We want the higher priority things at the end of the list, so when we append and pop we get the highest priority @@ -321,9 +310,21 @@ def handle_new_tasks_after_processing(self, task: Task) -> None: ) child_task.parent = task child_task.depth = task.depth + 1 + + # On retry, we need to see if these child tasks exist. + if child_task in task.children: + continue + for child in task.children: + if self.is_similar_to_task(child, child_task): + continue + task.children.append(child_task) if not self.should_skip_task(child_task): self.priority_queue.push(child_task) + if max_depth is not None and child_task.depth <= max_depth: + chatter.get().chat_simple( + f"Found new task {child_task} to solve while fixing task {task}." + ) except ValueError: logger.exception("Error adding child task") @@ -370,11 +371,17 @@ def handle_ignored_task(self, task: Task) -> None: task.retry_count, ) self.priority_queue.push(task) + chatter.get().chat_simple( + f"Task {task} was not resolved. Retrying... ({task.retry_count}/{task.max_retries})" + ) else: self.ignored_tasks.append(task) logger.warning( "Task %s exceeded max retries and added to ignored tasks.", task ) + chatter.get().chat_simple( + f"Task {task} was not resolved. Ignoring... ({task.retry_count}/{task.max_retries})" + ) def stop(self) -> None: logger.info("Stopping TaskManager.") diff --git a/kai/rpc_server/server.py b/kai/rpc_server/server.py index e84989d5..7153194a 100644 --- a/kai/rpc_server/server.py +++ b/kai/rpc_server/server.py @@ -551,8 +551,6 @@ def get_codeplan_agent_solution( validation_error.incidents = [] for i in violation_incidents: - if i.line_number < 0: - continue validation_error.incidents.append(Incident(**i.model_dump())) if len(validation_error.incidents) > 0: @@ -588,9 +586,6 @@ def get_codeplan_agent_solution( chatter.get().chat_simple(f"Executing task {task.__class__.__name__}.") - pre_task_solved_tasks = app.task_manager.processed_tasks - pre_task_ignored_tasks = set(app.task_manager.ignored_tasks) - result = app.task_manager.execute_task(task) app.log.debug(f"Task {task.__class__.__name__}, result: {result}") @@ -610,48 +605,6 @@ def get_codeplan_agent_solution( app.log.debug(result) - if seed_tasks: - # If we have seed tasks, we are fixing a set of issues, - # Lets only focus on this when showing the queue. - all_tasks = app.task_manager.priority_queue.all_tasks() - tasks = app.task_manager.priority_queue.task_stacks.get(0) - if tasks is not None: - app.log.debug("QUEUE_STATE_SEED_TASK: START") - for task in tasks: - lines = app.task_manager.priority_queue._stringify_tasks( - task, 0, set(), all_tasks - ) - for line in lines: - app.log.debug(f"QUEUE_STATE_SEED_TASK: {line}") - app.log.debug("QUEUE_STATE_SEED_TASK: END") - app.log.debug("QUEUE_STATE_SEED_TASKS: SUCCESSFUL_TASKS: START") - for task in app.task_manager.processed_tasks - pre_task_solved_tasks: - app.log.debug(f"QUEUE_STATE_SEED_TASKS: SUCCESSFUL_TASKS: {task}") - app.log.debug("QUEUE_STATE_SEED_TASKS: SUCCESSFUL_TASKS: END") - app.log.debug("QUEUE_STATE_SEED_TASKS: IGNORED_TASKS: START") - for task in ( - set(app.task_manager.ignored_tasks) - pre_task_ignored_tasks - ): - app.log.debug(f"QUEUE_STATE_SEED_TASKS: SUCCESSFUL_TASKS: {task}") - app.log.debug("QUEUE_STATE_SEED_TASKS: IGNORED_TASKS: END") - else: - app.log.debug("QUEUE_STATE: START") - try: - queue_state = str(app.task_manager.priority_queue) - for line in queue_state.splitlines(): - app.log.debug(f"QUEUE_STATE: {line}") - except Exception as e: - app.log.error(f"QUEUE_STATE: {e}") - app.log.debug("QUEUE_STATE: END") - app.log.debug("QUEUE_STATE: SUCCESSFUL_TASKS: START") - for task in app.task_manager.processed_tasks: - app.log.debug(f"QUEUE_STATE: SUCCESSFUL_TASKS: {task}") - app.log.debug("QUEUE_STATE: SUCCESSFUL_TASKS: END") - app.log.debug("QUEUE_STATE: IGNORED_TASKS: START") - for task in app.task_manager.ignored_tasks: - app.log.debug(f"QUEUE_STATE: IGNORED_TASKS: {task}") - app.log.debug("QUEUE_STATE: IGNORED_TASKS: END") - # after we have completed all the tasks, we should show what has been # accomplished for this particular solution app.log.debug("QUEUE_STATE_END_OF_CODE_PLAN: SUCCESSFUL TASKS: START")