| import json |
| import logging |
|
|
| from flask import abort, request |
| from flask_restful import Resource, marshal_with, reqparse |
| from werkzeug.exceptions import Forbidden, InternalServerError, NotFound |
|
|
| import services |
| from controllers.console import api |
| from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync |
| from controllers.console.app.wraps import get_app_model |
| from controllers.console.wraps import account_initialization_required, setup_required |
| from core.app.apps.base_app_queue_manager import AppQueueManager |
| from core.app.entities.app_invoke_entities import InvokeFrom |
| from factories import variable_factory |
| from fields.workflow_fields import workflow_fields |
| from fields.workflow_run_fields import workflow_run_node_execution_fields |
| from libs import helper |
| from libs.helper import TimestampField, uuid_value |
| from libs.login import current_user, login_required |
| from models import App |
| from models.model import AppMode |
| from services.app_dsl_service import AppDslService |
| from services.app_generate_service import AppGenerateService |
| from services.errors.app import WorkflowHashNotEqualError |
| from services.workflow_service import WorkflowService |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class DraftWorkflowApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| @marshal_with(workflow_fields) |
| def get(self, app_model: App): |
| """ |
| Get draft workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| |
| workflow_service = WorkflowService() |
| workflow = workflow_service.get_draft_workflow(app_model=app_model) |
|
|
| if not workflow: |
| raise DraftWorkflowNotExist() |
|
|
| |
| return workflow |
|
|
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| def post(self, app_model: App): |
| """ |
| Sync draft workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| content_type = request.headers.get("Content-Type", "") |
|
|
| if "application/json" in content_type: |
| parser = reqparse.RequestParser() |
| parser.add_argument("graph", type=dict, required=True, nullable=False, location="json") |
| parser.add_argument("features", type=dict, required=True, nullable=False, location="json") |
| parser.add_argument("hash", type=str, required=False, location="json") |
| |
| parser.add_argument("environment_variables", type=list, required=False, location="json") |
| parser.add_argument("conversation_variables", type=list, required=False, location="json") |
| args = parser.parse_args() |
| elif "text/plain" in content_type: |
| try: |
| data = json.loads(request.data.decode("utf-8")) |
| if "graph" not in data or "features" not in data: |
| raise ValueError("graph or features not found in data") |
|
|
| if not isinstance(data.get("graph"), dict) or not isinstance(data.get("features"), dict): |
| raise ValueError("graph or features is not a dict") |
|
|
| args = { |
| "graph": data.get("graph"), |
| "features": data.get("features"), |
| "hash": data.get("hash"), |
| "environment_variables": data.get("environment_variables"), |
| "conversation_variables": data.get("conversation_variables"), |
| } |
| except json.JSONDecodeError: |
| return {"message": "Invalid JSON data"}, 400 |
| else: |
| abort(415) |
|
|
| workflow_service = WorkflowService() |
|
|
| try: |
| environment_variables_list = args.get("environment_variables") or [] |
| environment_variables = [ |
| variable_factory.build_variable_from_mapping(obj) for obj in environment_variables_list |
| ] |
| conversation_variables_list = args.get("conversation_variables") or [] |
| conversation_variables = [ |
| variable_factory.build_variable_from_mapping(obj) for obj in conversation_variables_list |
| ] |
| workflow = workflow_service.sync_draft_workflow( |
| app_model=app_model, |
| graph=args["graph"], |
| features=args["features"], |
| unique_hash=args.get("hash"), |
| account=current_user, |
| environment_variables=environment_variables, |
| conversation_variables=conversation_variables, |
| ) |
| except WorkflowHashNotEqualError: |
| raise DraftWorkflowNotSync() |
|
|
| return { |
| "result": "success", |
| "hash": workflow.unique_hash, |
| "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at), |
| } |
|
|
|
|
| class DraftWorkflowImportApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| @marshal_with(workflow_fields) |
| def post(self, app_model: App): |
| """ |
| Import draft workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("data", type=str, required=True, nullable=False, location="json") |
| args = parser.parse_args() |
|
|
| workflow = AppDslService.import_and_overwrite_workflow( |
| app_model=app_model, data=args["data"], account=current_user |
| ) |
|
|
| return workflow |
|
|
|
|
| class AdvancedChatDraftWorkflowRunApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT]) |
| def post(self, app_model: App): |
| """ |
| Run draft workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("inputs", type=dict, location="json") |
| parser.add_argument("query", type=str, required=True, location="json", default="") |
| parser.add_argument("files", type=list, location="json") |
| parser.add_argument("conversation_id", type=uuid_value, location="json") |
| parser.add_argument("parent_message_id", type=uuid_value, required=False, location="json") |
|
|
| args = parser.parse_args() |
|
|
| try: |
| response = AppGenerateService.generate( |
| app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.DEBUGGER, streaming=True |
| ) |
|
|
| return helper.compact_generate_response(response) |
| except services.errors.conversation.ConversationNotExistsError: |
| raise NotFound("Conversation Not Exists.") |
| except services.errors.conversation.ConversationCompletedError: |
| raise ConversationCompletedError() |
| except ValueError as e: |
| raise e |
| except Exception as e: |
| logging.exception("internal server error.") |
| raise InternalServerError() |
|
|
|
|
| class AdvancedChatDraftRunIterationNodeApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT]) |
| def post(self, app_model: App, node_id: str): |
| """ |
| Run draft workflow iteration node |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("inputs", type=dict, location="json") |
| args = parser.parse_args() |
|
|
| try: |
| response = AppGenerateService.generate_single_iteration( |
| app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True |
| ) |
|
|
| return helper.compact_generate_response(response) |
| except services.errors.conversation.ConversationNotExistsError: |
| raise NotFound("Conversation Not Exists.") |
| except services.errors.conversation.ConversationCompletedError: |
| raise ConversationCompletedError() |
| except ValueError as e: |
| raise e |
| except Exception as e: |
| logging.exception("internal server error.") |
| raise InternalServerError() |
|
|
|
|
| class WorkflowDraftRunIterationNodeApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.WORKFLOW]) |
| def post(self, app_model: App, node_id: str): |
| """ |
| Run draft workflow iteration node |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("inputs", type=dict, location="json") |
| args = parser.parse_args() |
|
|
| try: |
| response = AppGenerateService.generate_single_iteration( |
| app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True |
| ) |
|
|
| return helper.compact_generate_response(response) |
| except services.errors.conversation.ConversationNotExistsError: |
| raise NotFound("Conversation Not Exists.") |
| except services.errors.conversation.ConversationCompletedError: |
| raise ConversationCompletedError() |
| except ValueError as e: |
| raise e |
| except Exception as e: |
| logging.exception("internal server error.") |
| raise InternalServerError() |
|
|
|
|
| class DraftWorkflowRunApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.WORKFLOW]) |
| def post(self, app_model: App): |
| """ |
| Run draft workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json") |
| parser.add_argument("files", type=list, required=False, location="json") |
| args = parser.parse_args() |
|
|
| response = AppGenerateService.generate( |
| app_model=app_model, |
| user=current_user, |
| args=args, |
| invoke_from=InvokeFrom.DEBUGGER, |
| streaming=True, |
| ) |
|
|
| return helper.compact_generate_response(response) |
|
|
|
|
| class WorkflowTaskStopApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| def post(self, app_model: App, task_id: str): |
| """ |
| Stop workflow task |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id) |
|
|
| return {"result": "success"} |
|
|
|
|
| class DraftWorkflowNodeRunApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| @marshal_with(workflow_run_node_execution_fields) |
| def post(self, app_model: App, node_id: str): |
| """ |
| Run draft workflow node |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json") |
| args = parser.parse_args() |
|
|
| workflow_service = WorkflowService() |
| workflow_node_execution = workflow_service.run_draft_workflow_node( |
| app_model=app_model, node_id=node_id, user_inputs=args.get("inputs"), account=current_user |
| ) |
|
|
| return workflow_node_execution |
|
|
|
|
| class PublishedWorkflowApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| @marshal_with(workflow_fields) |
| def get(self, app_model: App): |
| """ |
| Get published workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| |
| workflow_service = WorkflowService() |
| workflow = workflow_service.get_published_workflow(app_model=app_model) |
|
|
| |
| return workflow |
|
|
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| def post(self, app_model: App): |
| """ |
| Publish workflow |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| workflow_service = WorkflowService() |
| workflow = workflow_service.publish_workflow(app_model=app_model, account=current_user) |
|
|
| return {"result": "success", "created_at": TimestampField().format(workflow.created_at)} |
|
|
|
|
| class DefaultBlockConfigsApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| def get(self, app_model: App): |
| """ |
| Get default block config |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| |
| workflow_service = WorkflowService() |
| return workflow_service.get_default_block_configs() |
|
|
|
|
| class DefaultBlockConfigApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) |
| def get(self, app_model: App, block_type: str): |
| """ |
| Get default block config |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| parser = reqparse.RequestParser() |
| parser.add_argument("q", type=str, location="args") |
| args = parser.parse_args() |
|
|
| filters = None |
| if args.get("q"): |
| try: |
| filters = json.loads(args.get("q")) |
| except json.JSONDecodeError: |
| raise ValueError("Invalid filters") |
|
|
| |
| workflow_service = WorkflowService() |
| return workflow_service.get_default_block_config(node_type=block_type, filters=filters) |
|
|
|
|
| class ConvertToWorkflowApi(Resource): |
| @setup_required |
| @login_required |
| @account_initialization_required |
| @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION]) |
| def post(self, app_model: App): |
| """ |
| Convert basic mode of chatbot app to workflow mode |
| Convert expert mode of chatbot app to workflow mode |
| Convert Completion App to Workflow App |
| """ |
| |
| if not current_user.is_editor: |
| raise Forbidden() |
|
|
| if request.data: |
| parser = reqparse.RequestParser() |
| parser.add_argument("name", type=str, required=False, nullable=True, location="json") |
| parser.add_argument("icon_type", type=str, required=False, nullable=True, location="json") |
| parser.add_argument("icon", type=str, required=False, nullable=True, location="json") |
| parser.add_argument("icon_background", type=str, required=False, nullable=True, location="json") |
| args = parser.parse_args() |
| else: |
| args = {} |
|
|
| |
| workflow_service = WorkflowService() |
| new_app_model = workflow_service.convert_to_workflow(app_model=app_model, account=current_user, args=args) |
|
|
| |
| return { |
| "new_app_id": new_app_model.id, |
| } |
|
|
|
|
| api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft") |
| api.add_resource(DraftWorkflowImportApi, "/apps/<uuid:app_id>/workflows/draft/import") |
| api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run") |
| api.add_resource(DraftWorkflowRunApi, "/apps/<uuid:app_id>/workflows/draft/run") |
| api.add_resource(WorkflowTaskStopApi, "/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop") |
| api.add_resource(DraftWorkflowNodeRunApi, "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run") |
| api.add_resource( |
| AdvancedChatDraftRunIterationNodeApi, |
| "/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run", |
| ) |
| api.add_resource( |
| WorkflowDraftRunIterationNodeApi, "/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run" |
| ) |
| api.add_resource(PublishedWorkflowApi, "/apps/<uuid:app_id>/workflows/publish") |
| api.add_resource(DefaultBlockConfigsApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs") |
| api.add_resource( |
| DefaultBlockConfigApi, "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>" |
| ) |
| api.add_resource(ConvertToWorkflowApi, "/apps/<uuid:app_id>/convert-to-workflow") |
|
|