"""Flask server for browsing and semantically searching a summarized codebase.

The server is started with the path to a CSV file.  The code below reads the
columns ``file_path``, ``source``, ``summary``, ``blob`` and
``embedding_summary`` from it.
"""
from collections import defaultdict
import os
import sys
from pathlib import PurePosixPath
import base64

from flask import Flask, request, jsonify, render_template
import pandas as pd
import openai
from openai.embeddings_utils import get_embedding, cosine_similarity

openai.api_key = os.getenv('OPENAI_KEY')

if len(sys.argv) != 2:
    print("USAGE: python server.py PATH_TO_CSV")
    print("wrong number of arguments", file=sys.stderr)
    sys.exit(1)

try:
    # The pd.eval converter is needed to read the embedding column back in
    # correctly from CSV (presumably it is stored as a stringified list).
    # NOTE(review): pd.eval evaluates the cell contents as an expression, so
    # only load CSV files from a trusted source.
    df = pd.read_csv(sys.argv[1], converters={"embedding_summary": pd.eval})
except Exception as exc:
    # `Exception` rather than a bare `except`, so Ctrl-C and SystemExit are
    # not swallowed; the cause is reported instead of being hidden.
    print(f"Problem opening {sys.argv[1]}: {exc}", file=sys.stderr)
    sys.exit(1)

app = Flask(
    __name__,
    template_folder="./frontend",
    static_folder="./frontend",
    static_url_path="",
)


def search_code(df, query, n=4):
    """Return the ``n`` rows of ``df`` most similar to ``query``.

    The query is embedded with ``text-embedding-ada-002`` and compared by
    cosine similarity against each row's ``embedding_summary``.

    The result is a new frame carrying an extra ``similarity`` column, sorted
    from most to least similar.  ``df`` itself is left untouched: it is shared
    between requests, so writing the per-query score onto it would let
    concurrent requests overwrite each other's scores.
    """
    query_embedding = get_embedding(query, engine="text-embedding-ada-002")
    scored = df.assign(
        similarity=df.embedding_summary.apply(
            lambda emb: cosine_similarity(emb, query_embedding)
        )
    )
    return scored.sort_values("similarity", ascending=False).head(n)


def generate_answer(question):
    """Ask the completion model ``question`` using the best-matching code as context.

    The summaries and blobs of the (up to) three most similar rows are placed
    in front of the question.  Returns the text of the first completion choice.
    """
    # Iterate over the rows actually returned instead of a fixed range(3), so
    # a CSV with fewer than three rows no longer raises IndexError.
    results = search_code(df, question, n=3)
    context = "".join(
        row["summary"] + "\n" + row["blob"] + "\n"
        for _, row in results.iterrows()
    )
    prompt = (
        context
        + "\n"
        + "Answer the following question using the code context given above, and show an example with 'Example'\nQ: "
        + question
        + "\nA: "
    )
    response = openai.Completion.create(
        model="text-davinci-003",
        # model="code-davinci-002",
        prompt=prompt,
        temperature=0.7,
        max_tokens=1000,
        top_p=1.0,
        frequency_penalty=0.0,
        presence_penalty=0.0,
        stop=["\"\"\""],
    )
    return response["choices"][0]["text"]


def add_to_tree(tree: dict, path: str):
    """Insert ``path`` into the nested directory ``tree`` in place.

    Each node is a dict with a ``"filetype"`` of ``"file"`` (last path
    component) or ``"dir"`` (every other component); ``"dir"`` nodes also have
    a ``"children"`` dict of the same shape.  Existing nodes are never
    modified.
    """
    parts = PurePosixPath(path).parts
    last_index = len(parts) - 1
    current = tree
    for i, part in enumerate(parts):
        if part not in current:
            if i == last_index:
                current[part] = {"filetype": "file"}
            else:
                current[part] = {"filetype": "dir", "children": {}}
        # An existing "file" node has no children; the fallback dict is a
        # throwaway, so anything nested below a file is not recorded.
        current = current[part].get("children", {})


def create_directory_tree(df):
    """Build the nested directory tree from the unique ``file_path`` values of ``df``."""
    directory_tree = {}
    for path in df['file_path'].unique():
        add_to_tree(directory_tree, path)
    return directory_tree


def get_outermost_item(dirstructure):
    """Return the first top-level name in ``dirstructure``.

    Raises ``IndexError`` when the tree is empty.
    """
    return list(dirstructure.keys())[0]


def get_kids_of_root(dirstructure):
    """Return the child names of the first top-level entry of ``dirstructure``.

    An entry without children (a file) yields an empty key view instead of
    raising ``AttributeError``.  Raises ``IndexError`` when the tree is empty.
    """
    return list(dirstructure.values())[0].get("children", {}).keys()


def check_path(path, dirstructure):
    """Look ``path`` up in ``dirstructure``.

    Returns a ``(found, filetype, kids)`` tuple:

    * ``(True, "dir", [child names])`` for a directory,
    * ``(True, "file", None)`` for a file,
    * ``(False, None, None)`` when the path is empty or does not exist.
    """
    components = PurePosixPath(path).parts
    if not components:
        return False, None, None

    children = dirstructure
    node = None
    for part in components:
        # Stop at the first missing component.  `children` is None once the
        # walk has reached a file, so "file.py/extra" is not found either.
        if children is None or part not in children:
            return False, None, None
        node = children[part]
        children = node.get("children", {}) if node["filetype"] == "dir" else None

    ftype = node["filetype"]
    kids = list(children.keys()) if ftype == "dir" else None
    return True, str(ftype), kids


def _encode_path(path):
    """Return ``path`` as URL-safe base64 text, the form used in ``?path=`` links."""
    return base64.urlsafe_b64encode(bytes(path, 'utf-8')).decode('utf-8')


def _file_rows(file_path):
    """Return ``[(source, None), summary]`` rows for every ``df`` row of ``file_path``."""
    rows = df[df["file_path"] == file_path]
    return [
        [(source, None), summary]
        for source, summary in zip(list(rows["source"]), list(rows["summary"]))
    ]


def _folder_rows(names, parent=None):
    """Return ``[(name, encoded link), ""]`` rows for the entries of a folder.

    With ``parent`` given, each link points at ``parent/name``; joining through
    ``PurePosixPath`` keeps a parent of ``/`` from producing ``//name``.
    """
    rows = []
    for name in names:
        target = name if parent is None else str(PurePosixPath(parent) / name)
        rows.append([(name, _encode_path(target)), ""])
    return rows


def _root_listing(dirstructure):
    """Return ``(loctype, text, current)`` for a request without a ``path``."""
    if not dirstructure:
        # Nothing to show: render an empty folder rather than raising IndexError.
        return "folder", [], 'root directory'

    rootname = get_outermost_item(dirstructure)
    if check_path(rootname, dirstructure)[1] == "file":
        if len(dirstructure) == 1:
            # The whole data set is a single file: show it directly.
            return "file", _file_rows(rootname), rootname
        return "folder", _folder_rows(list(dirstructure.keys())), 'root directory'

    kids = get_kids_of_root(dirstructure)
    return "folder", _folder_rows(kids, parent=rootname), 'root directory'


@app.route('/')
def home():
    """Render the file browser for the location given by the ``path`` query argument.

    ``path`` is URL-safe base64 of a UTF-8 path.  Without it the root of the
    tree is shown.  A value that cannot be decoded, or that names no entry in
    the tree, renders the "path not found" row.
    """
    req_path = request.args.get('path')
    dirstructure = create_directory_tree(df)

    if req_path is None:
        loctype, text, curr = _root_listing(dirstructure)
    else:
        try:
            path_decode = base64.urlsafe_b64decode(req_path).decode('utf-8')
        except ValueError:
            # binascii.Error (bad base64) and UnicodeDecodeError are both
            # ValueError subclasses.
            path_decode = None

        if path_decode is None:
            curr = req_path
            found, ftype, kids = False, None, None
        else:
            curr = path_decode
            found, ftype, kids = check_path(path_decode, dirstructure)

        if not found:
            text = [["error", "path not found"]]
            loctype = "file"
        elif ftype == "file":
            loctype = "file"
            text = _file_rows(path_decode)
        else:
            loctype = "folder"
            text = _folder_rows(kids, parent=path_decode)

    res = {
        'loctype': loctype,
        'text': text,
        'current': curr
    }
    return render_template('index.html', payload=res)


@app.route('/answer')
def answer():
    """Return, as JSON, the ``blob`` and ``summary`` of the rows best matching ``q``."""
    q = request.args.get('q', '').strip()
    if not q:
        # Nothing to search for; skip the embedding request entirely.
        return jsonify([])
    a = search_code(df, q)
    res = [{'blob': x['blob'], 'summary': x['summary']} for x in a.to_dict('records')]
    return jsonify(res)


@app.route('/explain')
def explain():
    """Return, as JSON, the model's answer to the question ``q``."""
    q = request.args.get('q', '').strip()
    if not q:
        # Nothing to ask; skip the embedding and completion requests.
        return jsonify("")
    a = generate_answer(q)
    return jsonify(a)


if __name__ == '__main__':
    # NOTE(review): debug=True enables Flask's debug mode; confirm this entry
    # point is only used for local development.
    app.run(port=8080, debug=True)