| import os |
| import time |
| from typing import Dict, Any |
| import subprocess |
| from flow_modules.Tachi67.InterpreterFlowModule import InterpreterAtomicFlow |
|
|
|
|
class ExecuteCodeAtomicFlow(InterpreterAtomicFlow):
    """Let the user edit a temporary code file, then run the code it contains.

    ``run`` opens the file in an external editor, waits for the editor process
    to exit, extracts the code section from the file, hands the input data to
    the interpreter machinery, and finally removes the temporary file.

    ``_process_input_data`` and ``_call`` are not defined in this class; they
    are expected to be provided by ``InterpreterAtomicFlow``.
    """

    def _prepare_code(self, input_data: Dict[str, Any]):
        """Extract the code section of the temp file into ``input_data["code"]``.

        The code section is every line after the first line that equals
        ``# Code:`` (ignoring surrounding whitespace) up to, but excluding, the
        first line that equals ``############``. If the start marker is never
        seen, the extracted code is the empty string.

        Args:
            input_data: Must contain ``"temp_code_file_location"``; the key
                ``"code"`` is written in place.
        """
        file_location = input_data["temp_code_file_location"]
        start_marker = "# Code:"
        end_marker = "############"
        code_started = False
        code_lines = []
        with open(file_location, 'r') as file:
            for line in file:
                stripped = line.strip()
                if stripped == start_marker:
                    # The marker line itself is not part of the code.
                    code_started = True
                    continue
                if stripped == end_marker:
                    break
                if code_started:
                    # Keep the raw line so indentation and newlines survive.
                    code_lines.append(line)
        input_data["code"] = "".join(code_lines)

    def _check_input(self, input_data: Dict[str, Any]):
        """Verify that the keys required by ``run`` are present.

        Raises:
            AssertionError: If ``"temp_code_file_location"`` or ``"language"``
                is missing from ``input_data``.
        """
        # Raise explicitly instead of using ``assert`` so the validation is
        # not stripped when Python runs with -O; the exception type and the
        # messages are kept for callers that already handle AssertionError.
        # NOTE(review): "language" is not read in this class; presumably it is
        # consumed by the inherited _process_input_data -- confirm.
        for key in ("temp_code_file_location", "language"):
            if key not in input_data:
                raise AssertionError(f"{key} not passed to ExecuteCodeAtomicFlow")

    def _delete_file(self, file_location):
        """Remove ``file_location``; a file that is already gone is ignored."""
        # Attempt the removal directly rather than checking existence first,
        # which avoids a race between the check and the removal.
        try:
            os.remove(file_location)
        except FileNotFoundError:
            pass

    def _open_file_and_wait_for_upd(self, file_location):
        """Open ``file_location`` in the editor and block until it exits.

        Launches the ``code`` command (presumably the VS Code CLI) with
        ``--wait``. The editor's exit status is ignored.
        """
        with subprocess.Popen(["code", "--wait", file_location]) as process:
            # Block on the child directly instead of polling once per second.
            process.wait()

    def run(
            self,
            input_data: Dict[str, Any]):
        """Let the user edit the temp file, execute its code, and clean up.

        Args:
            input_data: Must contain ``"temp_code_file_location"`` and
                ``"language"``. The key ``"code"`` is added in place.

        Returns:
            A dict with ``"interpreter_output"`` (the value returned by
            ``self._call()``) and ``"code_ran"`` (the extracted code).

        Raises:
            AssertionError: If a required key is missing from ``input_data``.
        """
        self._check_input(input_data)
        file_loc = input_data["temp_code_file_location"]
        self._open_file_and_wait_for_upd(file_loc)
        try:
            self._prepare_code(input_data)
            self._process_input_data(input_data)
            execution_output = self._call()
        finally:
            # Remove the temp file even when extraction or execution fails,
            # so a failed run does not leave it behind.
            self._delete_file(file_loc)
        response = {"interpreter_output": execution_output, "code_ran": input_data['code']}
        return response
|
|