From 7435e423776c7b35b9c6c9bebba25a44691554bf Mon Sep 17 00:00:00 2001
From: Nate Buttke
Date: Mon, 14 Aug 2023 20:35:45 -0700
Subject: huge fix to path handling. added clean cli, cost estimate, ignorefile.

---
 setup.py | 200 ++++++++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 128 insertions(+), 72 deletions(-)

diff --git a/setup.py b/setup.py
index 8ff5027..9efb57c 100644
--- a/setup.py
+++ b/setup.py
@@ -1,18 +1,19 @@
-from collections import defaultdict
 import os
 import sys
 import pandas as pd
-import openai 
+import openai
 import tiktoken
-from openai.embeddings_utils import get_embedding, cosine_similarity
-
+from openai.embeddings_utils import get_embedding
 from tree_sitter import Language, Parser
-
-SOURCE_DIR = './'
+from typing_extensions import Annotated
+import typer
 
 openai.api_key = os.getenv('END_OF_WORLD')
 
 class TS_Setup_Helper:
+    """
+    Tree sitter functions and data for the setup process
+    """
     parser: Parser
     ts_obj_path: str
     ext_map: dict
@@ -50,7 +51,8 @@ class TS_Setup_Helper:
             self.CPP_LANGUAGE: ["""(function_definition) @function""", """(preproc_include) @import"""],
             self.GO_LANGUAGE: ["""(function_declaration) @function""", """(method_declaration) @method"""],
             self.JS_LANGUAGE: ["""[(function) (function_declaration)] @function"""],
-            self.PY_LANGUAGE: ["""(function_definition) @function""", """[(import_statement) (import_from_statement)] @import"""],
+            self.PY_LANGUAGE: ["""(function_definition) @function""",
+                               """[(import_statement) (import_from_statement)] @import"""],
             self.RS_LANGUAGE: ["""(function_item) @function""", """(use_declaration) @import"""]
         }
 
@@ -61,11 +63,8 @@ class TS_Setup_Helper:
 
     def ts_get_all_code_blocks(self, code_blocks, file_path, lang, tree, code):
         """Use treesitter to get all code blocks"""
-        # TODO need way to switch between declaration and definition ..
-        # e.g. golang does not have function definitions according to treesitter
         results = [ ]
         for query in self.qmap.get(lang):
-            print(query)
             results += self.ts_query(lang, tree, query)
 
         # TODO something like list comprehension here?
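For reference, the query strings in qmap above ("(function_definition) @function" and friends) run through py-tree-sitter's capture API. A minimal sketch of that flow, assuming ts-languages.so has already been compiled with Language.build_library() and using the same pre-0.22 py-tree-sitter API the patch targets (where captures() returns (node, capture_name) pairs):

    from tree_sitter import Language, Parser

    PY_LANGUAGE = Language('./ts-languages.so', 'python')

    parser = Parser()
    parser.set_language(PY_LANGUAGE)

    code = b"import os\n\ndef greet(name):\n    return 'hi ' + name\n"
    tree = parser.parse(code)

    # same query string as the Python entry in qmap
    query = PY_LANGUAGE.query("(function_definition) @function")
    for node, capture_name in query.captures(tree.root_node):
        # start_byte/end_byte slice the matched block back out of the source
        print(capture_name, code[node.start_byte:node.end_byte].decode("utf8"))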
@@ -81,22 +80,21 @@ class TS_Setup_Helper:
         code_blocks.append(return_dict)
 
     def parse_file(self, file_path):
-        print('parse')
         """take source code file and return pd dataframe"""
         # read file
-        with open(file_path, 'r') as f:
+        with open(file_path[0], 'r') as f:
             code = f.read()
 
         # Tree-Sitter
-        extension = os.path.splitext(file_path)[1].lstrip(".")
+        extension = os.path.splitext(file_path[0])[1].lstrip(".")
         lang = self.ext_map.get(extension)
         if lang is None:
-            raise NotImplementedError(f"The file extension .{extension} is not implemented")
+            raise NotImplementedError(f"The file extension .{extension} is not implemented ({file_path[0]})")
 
         self.parser.set_language(lang)
         tree = self.parser.parse(bytes(code, "utf8"))
         code_blocks = []
-        self.ts_get_all_code_blocks(code_blocks, file_path, lang, tree, bytes(code, "utf8"))
+        self.ts_get_all_code_blocks(code_blocks, file_path[1], lang, tree, bytes(code, "utf8"))
 
         collate_types = ['import', 'assign']
         tempblock = None
@@ -123,72 +121,130 @@ class TS_Setup_Helper:
 
         return df
 
-def get_files_to_parse(root_path, files_extensions_to_parse, dirs_to_ignore=['tests', 'vendor', 'unix']) -> list:
+def get_files_to_parse(root_path, files_extensions_to_parse, dirs_to_ignore) -> list:
     """get all source file paths as list."""
     files_to_parse = []
     for root, dirs, files in os.walk(root_path):
-        # there is probably a better way to do this
+        # there may be a better way to do this
         # https://stackoverflow.com/questions/13454164/os-walk-without-hidden-folders
-        files = [f for f in files if not f[0] == '.']
+        files = [
+            f for f in files if (not f[0] == '.')
+            and (os.path.splitext(f)[-1].lstrip(".") in files_extensions_to_parse)
+        ]
         dirs[:] = [d for d in dirs if (not d[0] == '.') and (set(d.split()).isdisjoint(dirs_to_ignore))]
 
         for name in files:
-            #if (dirfix(root).rsplit("/", 1)[-1] in dirs_to_ignore) or (name in dirs_to_ignore) or (name.rsplit('.')[-1] not in files_extensions_to_parse):
-            if (name.rsplit('.')[-1] not in files_extensions_to_parse):
-                continue
-            temp_path = os.path.join(root, name)
-            files_to_parse.append(temp_path)
+            full = os.path.join(root, name)
+            rel_dir = os.path.relpath(root, root_path)
+            rel_filepath = os.path.join(rel_dir, name)
+            if rel_filepath.startswith("./"):
+                rel_filepath = rel_filepath[len("./"):]
+            files_to_parse.append((full, rel_filepath))
 
     return files_to_parse
 
 
 def generate_summary(prompt):
-    enc = tiktoken.encoding_for_model("text-davinci-003")
-    if (len(enc.encode(prompt)) > 2500):
-        return "too long to summarize."
-
-    prompt = prompt + '\nSummarize the above code: '
-    response = openai.Completion.create(
-        model="text-davinci-003",
-        prompt=prompt,
-        temperature=0.7,
-        max_tokens=1024,
-        top_p=1.0,
-        frequency_penalty=0.0,
-        presence_penalty=0.0,
-        stop=["\"\"\""]
-    )
-    return response["choices"][0]["text"]
-
-
-# nate function to create blob. the blob just contains the file path and the source code.
+    enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
+    if (len(enc.encode(prompt)) > 3000):
+        return "too long to summarize."
+
+    prompt = prompt + '\nSummarize the above code: '
+
+    # response = openai.ChatCompletion.create(
+    #     model="gpt-3.5-turbo",
+    #     messages=[{"role": "user", "content": prompt}],
+    #     temperature=0.7,
+    #     max_tokens=1024,
+    #     top_p=1.0,
+    #     frequency_penalty=0.0,
+    #     presence_penalty=0.0,
+    #     stop=["\"\"\""]
+    # )
+
+    #return response["choices"][0]["message"]["content"]
+    return 'herro. this is a test summary'
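The path handling this patch fixes is easiest to see with concrete values: get_files_to_parse() now returns (full, relative) tuples, and parse_file() opens file_path[0] while recording file_path[1]. A small illustration with made-up paths:

    import os

    root_path = "./myrepo"                      # repo root passed on the command line
    root = "./myrepo/src/util"                  # a directory os.walk() is visiting
    name = "strings.py"

    full = os.path.join(root, name)             # ./myrepo/src/util/strings.py (opened)
    rel_dir = os.path.relpath(root, root_path)  # src/util
    rel_filepath = os.path.join(rel_dir, name)  # src/util/strings.py (stored)

    # at the repo root itself, relpath() yields '.', which is what the
    # startswith("./") strip above cleans up:
    print(os.path.join(os.path.relpath(root_path, root_path), name))  # ./strings.py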
+
+# create blob. the blob just contains the file path and the source code.
 def blobify(pandaSeries):
     return f"file path: {pandaSeries['file_path']}\n {pandaSeries['source']}"
 
-
-### doing stuff!!
-ts_helper = TS_Setup_Helper('./ts-languages.so')
-
-code_df = pd.DataFrame()
-#files = get_files_to_parse("../../dirserver/src/dirserver/", ts_helper.ext_map.keys(), dirs_to_ignore=['tests', 'vendor', 'unix']):
-
-files = get_files_to_parse("./rs", ts_helper.ext_map.keys())
-if len(files) == 0:
-    print("didn't find any files to parse", file=sys.stderr)
-    exit(1)
-for file in files:
-    code_df = pd.concat([code_df, ts_helper.parse_file(file)])
-
-code_df["blob"] = code_df.apply(lambda x: blobify(x),axis=1)
-print(type(code_df))
-print(code_df)
-
-code_df.to_csv('rust_with_blob.csv')
-
-print('startng to generate summary')
-code_df["summary"] = code_df.blob.apply(lambda x: generate_summary(x))
-print('done with generate summary')
-
-print('generating embeddings')
-embedding_model = "text-embedding-ada-002"
-code_df["embedding_summary"] = code_df.summary.apply([lambda x: get_embedding(x, engine=embedding_model)])
-print('done with generating embeddings')
-
-code_df.to_csv('test_with_summary_and_embeddings.csv')
+def estimate_cost(df, skip_summary: bool):
+    enc = tiktoken.encoding_for_model("text-embedding-ada-002")
+    print(f'found {len(df.blob)} fns')
+    token_count = 0
+    for s in df.blob:
+        token_count += len(enc.encode(s))
+    embed_cost = (token_count / 1000) * 0.0001  # Ada v2
+    print(f"it will cost ~${embed_cost:.6f} to generate embeddings")
+
+    if not skip_summary:
+        enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
+        token_count = 0
+        for s in df.blob:
+            token_count += len(enc.encode(s))
+        summary_cost = ((token_count / 1000) * 0.0015) + (len(df.blob) * (500/1000) * 0.002)
+        print(f"it will cost ~${summary_cost:.6f} to generate summaries (see --skip-summary)")
+        print(f"which is ~${embed_cost + summary_cost:.6f} total.")
+
+    if input("\nType yes to continue or anything else to quit: ") != "yes":
+        sys.exit(0)
+    return
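As a sanity check of estimate_cost()'s arithmetic, here is the same formula evaluated with made-up numbers (the per-1K-token rates are the mid-2023 prices hard-coded above, not current ones, and 500 is the patch's guess at output tokens per summary):

    n_blobs = 200
    total_tokens = n_blobs * 300                   # pretend each blob is ~300 tokens

    embed_cost = (total_tokens / 1000) * 0.0001    # ada-002 embeddings:   $0.006
    input_cost = (total_tokens / 1000) * 0.0015    # gpt-3.5-turbo input:  $0.09
    output_cost = n_blobs * (500 / 1000) * 0.002   # gpt-3.5-turbo output: $0.20
    print(f"~${embed_cost + input_cost + output_cost:.4f} total")  # ~$0.2960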
+
+
+def setup(
+    filepath: Annotated[str, typer.Argument(help="path to repo")],
+    output_csv_filepath: Annotated[str, typer.Argument(help="filepath for csv output")],
+    ignorefile: Annotated[str, typer.Option(help="Path to text file containing dirnames to ignore. One name per line.")] = None,
+    skip_summary: Annotated[bool, typer.Option(help="Do not produce summaries for each function (to save cost).")] = False
+    ):
+
+    dirs_to_ignore = []
+    if ignorefile != None:
+        #https://stackoverflow.com/questions/3925614/how-do-you-read-a-file-into-a-list-in-python
+        try:
+            with open(ignorefile) as file:
+                for line in file:
+                    line = line.strip()
+                    dirs_to_ignore.append(line)
+        except:
+            print(f"IO error while processing {ignorefile}", file=sys.stderr)
+
+    ts_helper = TS_Setup_Helper('./ts-languages.so')
+    code_df = pd.DataFrame()
+
+    files = get_files_to_parse(filepath, list(ts_helper.ext_map.keys()), dirs_to_ignore)
+
+    if len(files) == 0:
+        print("didn't find any files to parse", file=sys.stderr)
+        sys.exit(1)
+    for file in files:
+        #print(file)
+        code_df = pd.concat([code_df, ts_helper.parse_file(file)])
+
+    code_df["blob"] = code_df.apply(lambda x: blobify(x), axis=1)
+
+    code_df.to_csv('rust_with_blob.csv')
+
+    estimate_cost(code_df, skip_summary)
+
+    if not skip_summary:
+        print('generating summary')
+        code_df["summary"] = code_df.blob.apply(lambda x: generate_summary(x))
+        print('done with summaries')
+    else:
+        code_df["summary"] = "no summary. --skip-summary"
+
+    print('generating embeddings')
+    embedding_model = "text-embedding-ada-002"
+    #code_df["embedding_summary"] = code_df.summary.apply(
+    #    [lambda x: get_embedding(x, engine=embedding_model)]
+    #    )
+    print('done with embeddings')
+    code_df.to_csv(output_csv_filepath)
+
+    sys.exit(0)
+
+if __name__ == "__main__":
+    typer.run(setup)
+    #setup('YOUR_PATH_HERE', ['ignore', 'dirs', 'here'])
+    #setup("../../openpilot/", "./ope.csv",
+    #      ['tests', 'vendor', 'unix', 'test', 'debug', 'ui', 'third_party', 'tools', 'system']
+    #      )
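The typer wiring above is the script's whole CLI surface. A stripped-down, runnable sketch of the same pattern, reusing the patch's argument names but with a stub body:

    from typing_extensions import Annotated
    import typer

    def setup(
        filepath: Annotated[str, typer.Argument(help="path to repo")],
        output_csv_filepath: Annotated[str, typer.Argument(help="filepath for csv output")],
        skip_summary: Annotated[bool, typer.Option(help="Do not produce summaries.")] = False,
    ):
        # stub: the real setup() parses, prices, summarizes, and embeds
        print(filepath, output_csv_filepath, skip_summary)

    if __name__ == "__main__":
        typer.run(setup)

    # invoked roughly as:
    #   python setup.py ./my-repo ./out.csv --skip-summary
    #   python setup.py --help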