# RecurrentGPT — Gradio demo for interactive, human-in-the-loop long-text
# generation. (Removed "Spaces: Runtime error" residue captured from the
# Hugging Face Spaces page header; it was not part of the program.)
| import gradio as gr | |
| import random | |
| from recurrentgpt import RecurrentGPT | |
| from human_simulator import Human | |
| from sentence_transformers import SentenceTransformer | |
| from utils import get_init, parse_instructions | |
| import re | |
| ################################################################################################################################################################### | |
| ################################################################################################################################################################### | |
| # TODO: add user database | |
| # from urllib.parse import quote_plus | |
| # from pymongo import MongoClient | |
| # uri = "mongodb://%s:%s@%s" % (quote_plus("xxx"), | |
| # quote_plus("xxx"), "localhost") | |
| # client = MongoClient(uri, maxPoolSize=None) | |
| # db = client.recurrentGPT_db | |
| # log = db.log | |
| ################################################################################################################################################################### | |
| ################################################################################################################################################################### | |
# Per-session state, keyed by a cookie-derived id (see init/step).
# Each value is a dict holding "start_input_to_human" and "init_paragraphs",
# plus "writer" (RecurrentGPT) and "human" (Human) after the first step.
# NOTE(review): unbounded — entries are never evicted; a real deployment
# needs the TODO'd user database or a TTL cache.
_CACHE = {}
# TODO : try with different embedding models
## Build the semantic search model
# Sentence embedder shared by Human and RecurrentGPT for long-term-memory
# retrieval (presumably cosine-similarity search — confirm in those classes).
embedder = SentenceTransformer('multi-qa-mpnet-base-cos-v1')
# TODO: make a prompt class
def init_prompt(text_type, description, structure, units, tone):
    """Return the bootstrap prompt that asks the LLM to start a new text.

    Args:
        text_type: kind of text to write (e.g. "novel", "Linkedin post").
        description: optional topic; an empty string omits the topic clause.
        structure: name of the structural unit (e.g. "chapters", "paragraphs").
        units: how many structural units to plan for.
        tone: writing-style adjective inserted into the prompt.

    Returns:
        The full prompt string consumed by utils.get_init().
    """
    # BUG FIX: the old code prepended " about " to `description` while the
    # template *also* said "about {description}", producing "about about X"
    # (and a dangling "about" when description was empty). Build the optional
    # topic clause once and keep the raw description for the outline sentence.
    topic_clause = f" about {description}" if description else ""
    return f"""
Please write a {text_type}{topic_clause} with {units} {structure}. Follow the format below precisely:
Begin with the title of the {text_type}.
Next, write an outline for the first {structure}. The outline should describe the {description} and the beginning of the {text_type}.
Write the first three paragraphs with their indication of the {text_type} based on your outline. Write in a {tone} style and take your time to set the context.
Write a summary that captures the key information of the three paragraphs.
Finally, write three different instructions for what to write next, each containing around five sentences. Each instruction should present a possible, interesting continuation of the {text_type}.
The output format should follow these guidelines:
Name: <name of the {text_type}>
Outline: <outline for the first {structure}>
Paragraph 1: <content for paragraph 1>
Paragraph 2: <content for paragraph 2>
Paragraph 3: <content for paragraph 3>
Summary: <content of summary>
Instruction 1: <content for instruction 1>
Instruction 2: <content for instruction 2>
Instruction 3: <content for instruction 3>
Make sure to be precise and follow the output format strictly.
"""
def init(text_type, description, request: gr.Request):
    """Bootstrap a new generation session for the requesting browser.

    Makes one LLM call via get_init() to obtain the title, outline, first
    three paragraphs, a summary and three candidate instructions, then caches
    the parsed result in _CACHE under a cookie-derived session id.

    Returns 6 values matching btn_init's outputs: short memory, long memory,
    the written paragraphs, and the three instruction candidates.
    """
    # BUG FIX: was `if tex_type == ""` — NameError on every call.
    if text_type == "":
        text_type = "Linkedin post"
    # BUG FIX: structure/units/tone were referenced below but never defined
    # anywhere in the file (NameError). They are not exposed in the UI, so
    # give them sensible defaults here; callers are unchanged.
    structure = "paragraphs"
    units = "three"
    tone = "engaging"
    global _CACHE
    # Session id = cookie string truncated before the Google-Analytics cookie.
    cookie = request.headers['cookie']
    cookie = cookie.split('; _gat_gtag')[0]
    # prepare first init
    init_paragraphs = get_init(text=init_prompt(text_type, description, structure, units, tone))
    # print(init_paragraphs)
    start_input_to_human = {
        'output_paragraph': init_paragraphs['Paragraph 3'],
        'input_paragraph': '\n\n'.join([init_paragraphs['Paragraph 1'], init_paragraphs['Paragraph 2']]),
        'output_memory': init_paragraphs['Summary'],
        "output_instruction": [init_paragraphs['Instruction 1'], init_paragraphs['Instruction 2'], init_paragraphs['Instruction 3']]
    }
    _CACHE[cookie] = {"start_input_to_human": start_input_to_human,
                      "init_paragraphs": init_paragraphs}
    # NOTE(review): key is lowercase 'name' while the others are capitalized —
    # presumably matches utils.get_init's parser; confirm there.
    written_paras = f"""Title: {init_paragraphs['name']}
Outline: {init_paragraphs['Outline']}
Paragraphs:
{start_input_to_human['input_paragraph']}"""
    long_memory = parse_instructions([init_paragraphs['Paragraph 1'], init_paragraphs['Paragraph 2']])
    # short memory, long memory, current written paragraphs, 3 next instructions
    return start_input_to_human['output_memory'], long_memory, written_paras, init_paragraphs['Instruction 1'], init_paragraphs['Instruction 2'], init_paragraphs['Instruction 3']
def step(short_memory, long_memory, instruction1, instruction2, instruction3, current_paras, request: gr.Request, ):
    """Run one auto-generation step for the "Auto-Generation" tab.

    On the first step, builds the Human and RecurrentGPT agents from the
    cached init output; afterwards, feeds the user's (possibly edited) short
    memory and a randomly chosen instruction through Human then RecurrentGPT.

    Returns 7 values matching btn_step's outputs: short memory, long memory,
    written paragraphs, the revised instruction, and 3 next instructions.
    """
    if current_paras == "":
        # BUG FIX: must return one value per wired Gradio output component —
        # btn_step.click lists 7 outputs, but this returned only 6 empties,
        # making Gradio error out whenever the paragraphs box was empty.
        return "", "", "", "", "", "", ""
    global _CACHE
    # print(list(_CACHE.keys()))
    # print(request.headers.get('cookie'))
    # Same cookie-derived session id as in init().
    cookie = request.headers['cookie']
    cookie = cookie.split('; _gat_gtag')[0]
    cache = _CACHE[cookie]
    if "writer" not in cache:
        # First step for this session: build both agents from the init cache.
        start_input_to_human = cache["start_input_to_human"]
        # Honor any user edits to the three instruction textboxes.
        start_input_to_human['output_instruction'] = [
            instruction1, instruction2, instruction3]
        init_paragraphs = cache["init_paragraphs"]
        human = Human(input=start_input_to_human,
                      memory=None, embedder=embedder)
        human.step()
        start_short_memory = init_paragraphs['Summary']
        writer_start_input = human.output
        # Init writerGPT
        writer = RecurrentGPT(input=writer_start_input, short_memory=start_short_memory, long_memory=[
            init_paragraphs['Paragraph 1'], init_paragraphs['Paragraph 2']], memory_index=None, embedder=embedder)
        cache["writer"] = writer
        cache["human"] = human
        writer.step()
    else:
        human = cache["human"]
        writer = cache["writer"]
        output = writer.output
        # Honor any user edits to the short-term-memory textbox.
        output['output_memory'] = short_memory
        # Randomly select one instruction out of three ("auto" mode).
        instruction_index = random.randint(0, 2)
        output['output_instruction'] = [instruction1, instruction2, instruction3][instruction_index]
        human.input = output
        human.step()
        writer.input = human.output
        writer.step()
    # Wrap each long-memory entry in a list (legacy shape for the commented-out
    # Dataframe widget; the Textbox just stringifies it).
    long_memory = [[v] for v in writer.long_memory]
    # short memory, long memory, current written paragraphs, 3 next instructions
    return writer.output['output_memory'], long_memory, current_paras + '\n\n' + writer.output['input_paragraph'], human.output['output_instruction'], *writer.output['output_instruction']
def controled_step(short_memory, long_memory, selected_instruction, current_paras, request: gr.Request, ):
    """Advance one step in the "Human-in-the-Loop" tab.

    Uses the human-selected (and possibly edited) instruction instead of a
    random pick. Returns 6 values matching btn_step's outputs: short memory,
    long memory, written paragraphs, and the three next instructions.
    """
    if current_paras == "":
        return "", "", "", "", "", ""
    global _CACHE
    # print(list(_CACHE.keys()))
    # print(request.headers.get('cookie'))
    session_key = request.headers['cookie'].split('; _gat_gtag')[0]
    session = _CACHE[session_key]
    if "writer" in session:
        # Subsequent steps: reuse the cached agents, injecting the user's
        # edited short memory and chosen instruction.
        human = session["human"]
        writer = session["writer"]
        output = writer.output
        output['output_memory'] = short_memory
        output['output_instruction'] = selected_instruction
        human.input = output
        human.step()
        writer.input = human.output
        writer.step()
    else:
        # First step for this session: build Human + RecurrentGPT from the
        # init cache, seeded with the selected instruction.
        human_seed = session["start_input_to_human"]
        human_seed['output_instruction'] = selected_instruction
        paragraphs = session["init_paragraphs"]
        human = Human(input=human_seed,
                      memory=None, embedder=embedder)
        human.step()
        # Init writerGPT
        writer = RecurrentGPT(
            input=human.output,
            short_memory=paragraphs['Summary'],
            long_memory=[paragraphs['Paragraph 1'], paragraphs['Paragraph 2']],
            memory_index=None,
            embedder=embedder)
        session["writer"] = writer
        session["human"] = human
        writer.step()
    # short memory, long memory, current written paragraphs, 3 next instructions
    return writer.output['output_memory'], parse_instructions(writer.long_memory), current_paras + '\n\n' + writer.output['input_paragraph'], *writer.output['output_instruction']
# SelectData is a subclass of EventData
def on_select(instruction1, instruction2, instruction3, evt: gr.SelectData):
    """Map the radio choice ("Instruction N") to the matching instruction text."""
    candidates = (instruction1, instruction2, instruction3)
    choice_number = int(evt.value.replace("Instruction ", ""))
    return candidates[choice_number - 1]
# ---------------------------------------------------------------------------
# Gradio UI: two tabs sharing the same init() backend.
#   * "Auto-Generation": step() picks one of the model's own candidate
#     instructions at random each step.
#   * "Human-in-the-Loop": controled_step() uses the instruction the user
#     selected via the radio (on_select) and possibly edited.
# NOTE(review): gr.Box and queue(concurrency_count=...) are Gradio 3.x APIs;
# confirm the pinned gradio version before upgrading.
# ---------------------------------------------------------------------------
with gr.Blocks(title="RecurrentGPT", css="footer {visibility: hidden}", theme='sudeepshouche/minimalist') as demo:
    gr.Markdown(
        """
# RecurrentGPT
Interactive Generation of (Arbitrarily) Long Texts with Human-in-the-Loop
""")
    with gr.Tab("Auto-Generation"):
        with gr.Row():
            with gr.Column():
                with gr.Box():
                    with gr.Row():
                        with gr.Column(scale=1, min_width=200):
                            novel_type = gr.Textbox(
                                label="Novel Type", placeholder="e.g. science fiction")
                        with gr.Column(scale=2, min_width=400):
                            description = gr.Textbox(label="Description")
                    btn_init = gr.Button(
                        "Init Novel Generation", variant="primary")
                    gr.Examples(["Science Fiction", "Romance", "Mystery", "Fantasy",
                                 "Historical", "Horror", "Thriller", "Western", "Young Adult", ], inputs=[novel_type])
                    written_paras = gr.Textbox(
                        label="Written Paragraphs (editable)", max_lines=21, lines=21)
            with gr.Column():
                with gr.Box():
                    gr.Markdown("### Memory Module\n")
                    short_memory = gr.Textbox(
                        label="Short-Term Memory (editable)", max_lines=3, lines=3)
                    long_memory = gr.Textbox(
                        label="Long-Term Memory (editable)", max_lines=6, lines=6)
                    # long_memory = gr.Dataframe(
                    #     # label="Long-Term Memory (editable)",
                    #     headers=["Long-Term Memory (editable)"],
                    #     datatype=["str"],
                    #     row_count=3,
                    #     max_rows=3,
                    #     col_count=(1, "fixed"),
                    #     type="array",
                    # )
                with gr.Box():
                    gr.Markdown("### Instruction Module\n")
                    with gr.Row():
                        instruction1 = gr.Textbox(
                            label="Instruction 1 (editable)", max_lines=4, lines=4)
                        instruction2 = gr.Textbox(
                            label="Instruction 2 (editable)", max_lines=4, lines=4)
                        instruction3 = gr.Textbox(
                            label="Instruction 3 (editable)", max_lines=4, lines=4)
                    # Read-only echo of the instruction the model revised last step.
                    selected_plan = gr.Textbox(
                        label="Revised Instruction (from last step)", max_lines=2, lines=2)
                btn_step = gr.Button("Next Step", variant="primary")
        # init() returns 6 values; step() must return 7 (one per output below).
        btn_init.click(init, inputs=[novel_type, description], outputs=[
            short_memory, long_memory, written_paras, instruction1, instruction2, instruction3])
        btn_step.click(step, inputs=[short_memory, long_memory, instruction1, instruction2, instruction3, written_paras], outputs=[
            short_memory, long_memory, written_paras, selected_plan, instruction1, instruction2, instruction3])
    with gr.Tab("Human-in-the-Loop"):
        # NOTE: widget variables below intentionally shadow the Auto-Generation
        # tab's — each tab wires its own components to its own click handlers.
        with gr.Row():
            with gr.Column():
                with gr.Box():
                    with gr.Row():
                        with gr.Column(scale=1, min_width=200):
                            novel_type = gr.Textbox(
                                label="Novel Type", placeholder="e.g. science fiction")
                        with gr.Column(scale=2, min_width=400):
                            description = gr.Textbox(label="Description")
                    btn_init = gr.Button(
                        "Init Novel Generation", variant="primary")
                    gr.Examples(["Science Fiction", "Romance", "Mystery", "Fantasy",
                                 "Historical", "Horror", "Thriller", "Western", "Young Adult", ], inputs=[novel_type])
                    written_paras = gr.Textbox(
                        label="Written Paragraphs (editable)", max_lines=23, lines=23)
            with gr.Column():
                with gr.Box():
                    gr.Markdown("### Memory Module\n")
                    short_memory = gr.Textbox(
                        label="Short-Term Memory (editable)", max_lines=3, lines=3)
                    long_memory = gr.Textbox(
                        label="Long-Term Memory (editable)", max_lines=6, lines=6)
                with gr.Box():
                    gr.Markdown("### Instruction Module\n")
                    with gr.Row():
                        # Candidates are read-only here; the user picks one via
                        # the radio and edits it in selected_instruction.
                        instruction1 = gr.Textbox(
                            label="Instruction 1", max_lines=3, lines=3, interactive=False)
                        instruction2 = gr.Textbox(
                            label="Instruction 2", max_lines=3, lines=3, interactive=False)
                        instruction3 = gr.Textbox(
                            label="Instruction 3", max_lines=3, lines=3, interactive=False)
                    with gr.Row():
                        with gr.Column(scale=1, min_width=100):
                            selected_plan = gr.Radio(["Instruction 1", "Instruction 2", "Instruction 3"], label="Instruction Selection",)
                            # info="Select the instruction you want to revise and use for the next step generation.")
                        with gr.Column(scale=3, min_width=300):
                            selected_instruction = gr.Textbox(
                                label="Selected Instruction (editable)", max_lines=5, lines=5)
                btn_step = gr.Button("Next Step", variant="primary")
        btn_init.click(init, inputs=[novel_type, description], outputs=[
            short_memory, long_memory, written_paras, instruction1, instruction2, instruction3])
        btn_step.click(controled_step, inputs=[short_memory, long_memory, selected_instruction, written_paras], outputs=[
            short_memory, long_memory, written_paras, instruction1, instruction2, instruction3])
        # Copy the chosen candidate into the editable box when the radio changes.
        selected_plan.select(on_select, inputs=[
            instruction1, instruction2, instruction3], outputs=[selected_instruction])

# Serialize requests: the backend agents are not thread-safe.
demo.queue(concurrency_count=1)
if __name__ == "__main__":
    demo.launch()