import os
import sys

import openai
import pandas as pd
import tiktoken
import typer
from openai.embeddings_utils import get_embedding
from tree_sitter import Language, Parser
from typing_extensions import Annotated

openai.api_key = os.getenv('END_OF_WORLD')


def _collate_blocks(code_blocks, collate_types=('import', 'assign')):
    """Merge runs of consecutive blocks that share a collatable code type.

    Blocks whose ``code_type`` is in ``collate_types`` are folded into one
    block per run (sources joined by a newline, line span widened, ``chars``
    summed plus one per joining newline). All other blocks pass through
    unchanged. Input dicts are not mutated; merged blocks are copies.

    Returns:
        A new list of block dicts in the original order.
    """
    merged = []
    pending = None
    for block in code_blocks:
        if block['code_type'] not in collate_types:
            # A non-collatable block ends any run in progress.
            if pending is not None:
                merged.append(pending)
                pending = None
            merged.append(block)
            continue

        if pending is not None and pending['code_type'] == block['code_type']:
            pending['source'] += f"\n{block['source']}"
            pending['start_line'] = min(pending['start_line'], block['start_line'])
            pending['end_line'] = max(pending['end_line'], block['end_line'])
            pending['chars'] += block['chars'] + 1  # +1 for the joining newline
        else:
            # Either the first collatable block or a switch of type.
            if pending is not None:
                merged.append(pending)
            pending = dict(block)

    # Flush the last run; captures of collatable types come last in the
    # block list, so without this they would be dropped entirely.
    if pending is not None:
        merged.append(pending)
    return merged


class TS_Setup_Helper:
    """Tree-sitter grammars, queries and parsing helpers for the setup process.

    Attributes (all set in ``__init__``):
        parser: one ``Parser`` instance, re-targeted per file by ``parse_file``.
        ts_object_path: path of the compiled grammar library that was passed in.
        ext_map: file extension (without dot) -> ``Language``.
        qmap: ``Language`` -> list of query S-expressions to capture.
    """

    parser: Parser
    ts_object_path: str
    ext_map: dict
    qmap: dict

    def __init__(self, ts_object_path):
        """Load every supported grammar from ``ts_object_path``."""
        self.ts_object_path = ts_object_path

        self.BASH_LANGUAGE = Language(ts_object_path, 'bash')
        self.C_LANGUAGE = Language(ts_object_path, 'c')
        self.CPP_LANGUAGE = Language(ts_object_path, 'cpp')
        self.GO_LANGUAGE = Language(ts_object_path, 'go')
        self.HS_LANGUAGE = Language(ts_object_path, 'haskell')
        self.JS_LANGUAGE = Language(ts_object_path, 'javascript')
        self.PY_LANGUAGE = Language(ts_object_path, 'python')
        self.RS_LANGUAGE = Language(ts_object_path, 'rust')

        self.parser = Parser()

        self.ext_map = {
            'sh': self.BASH_LANGUAGE,
            'c': self.C_LANGUAGE,
            'h': self.C_LANGUAGE,
            'cpp': self.CPP_LANGUAGE,
            'cxx': self.CPP_LANGUAGE,
            'hxx': self.CPP_LANGUAGE,
            'hpp': self.CPP_LANGUAGE,
            'go': self.GO_LANGUAGE,
            'hs': self.HS_LANGUAGE,
            'js': self.JS_LANGUAGE,
            'py': self.PY_LANGUAGE,
            'rs': self.RS_LANGUAGE,
        }

        # The capture name after '@' becomes the block's 'code_type'.
        # NOTE: no queries are registered for Haskell, so .hs files are
        # parsed but currently yield no code blocks.
        self.qmap = {
            self.BASH_LANGUAGE: ["""(function_definition) @function""",
                                 """(variable_assignment) @assign"""],
            self.C_LANGUAGE: ["""(function_definition) @function""",
                              """(preproc_include) @import"""],
            self.CPP_LANGUAGE: ["""(function_definition) @function""",
                                """(preproc_include) @import"""],
            self.GO_LANGUAGE: ["""(function_declaration) @function""",
                               """(method_declaration) @method"""],
            self.JS_LANGUAGE: ["""[(function) (function_declaration)] @function"""],
            self.PY_LANGUAGE: ["""(function_definition) @function""",
                               """[(import_statement) (import_from_statement)] @import"""],
            self.RS_LANGUAGE: ["""(function_item) @function""",
                               """(use_declaration) @import"""],
        }

    def ts_query(self, lang, tree, sexp):
        """Run the query ``sexp`` for ``lang`` against the root of ``tree``.

        Returns whatever ``Query.captures`` returns; callers in this class
        treat each item as a ``(node, capture_name)`` pair.
        """
        query = lang.query(sexp)
        return query.captures(tree.root_node)

    def ts_get_all_code_blocks(self, code_blocks, file_path, lang, tree, code):
        """Use treesitter to get all code blocks.

        Runs every query registered for ``lang`` and appends one dict per
        capture to ``code_blocks`` (mutated in place; nothing is returned).

        Args:
            code_blocks: list that receives the block dicts.
            file_path: path string stored verbatim in each block.
            lang: tree-sitter ``Language`` used to look up queries.
            tree: parsed tree for ``code``.
            code: the file contents as UTF-8 ``bytes`` (sliced by byte offset).
        """
        # Languages without registered queries contribute no captures
        # instead of raising on a missing qmap entry.
        for query in self.qmap.get(lang, []):
            for node, capture_name in self.ts_query(lang, tree, query):
                code_blocks.append({
                    'code_type': capture_name,
                    'source': code[node.start_byte:node.end_byte].decode('utf-8'),
                    'start_line': node.start_point[0],
                    'end_line': node.end_point[0],
                    'chars': node.end_byte - node.start_byte,
                    'file_path': file_path,
                })

    def parse_file(self, file_path):
        """Take a source code file and return its code blocks as a DataFrame.

        Args:
            file_path: a pair ``(path_to_open, path_to_record)``; the first
                item is read from disk, the second is stored in each block's
                ``file_path`` field.

        Returns:
            ``pd.DataFrame`` with one row per block (consecutive ``import`` /
            ``assign`` captures are merged into a single row). Empty when no
            captures were found.

        Raises:
            NotImplementedError: the file extension has no grammar in
                ``ext_map``.
        """
        # Decode explicitly as UTF-8 and encode once; tree-sitter byte
        # offsets are then consistent with the slices taken from `code`.
        with open(file_path[0], 'r', encoding='utf-8') as f:
            code = f.read().encode('utf-8')

        extension = os.path.splitext(file_path[0])[1].lstrip(".")
        lang = self.ext_map.get(extension)
        if lang is None:
            raise NotImplementedError(f"The file extension .{extension} is not implemented ({file_path[0]})")

        self.parser.set_language(lang)
        tree = self.parser.parse(code)

        code_blocks = []
        self.ts_get_all_code_blocks(code_blocks, file_path[1], lang, tree, code)

        return pd.DataFrame(_collate_blocks(code_blocks))


def get_files_to_parse(root_path, files_extensions_to_parse, dirs_to_ignore) -> list:
    """Get all source file paths under ``root_path`` as a list.

    Hidden files and directories (names starting with '.') are skipped, as
    are directories whose name appears in ``dirs_to_ignore`` (exact match).

    Args:
        root_path: directory to walk.
        files_extensions_to_parse: extensions (without dot) to keep.
        dirs_to_ignore: directory names that must not be descended into.

    Returns:
        List of ``(full_path, path_relative_to_root_path)`` tuples.
    """
    wanted_extensions = set(files_extensions_to_parse)
    ignored_dirs = set(dirs_to_ignore)

    files_to_parse = []
    for root, dirs, files in os.walk(root_path):
        # Prune in place so os.walk does not descend into skipped dirs.
        dirs[:] = [d for d in dirs
                   if not d.startswith('.') and d not in ignored_dirs]

        for name in files:
            if name.startswith('.'):
                continue
            if os.path.splitext(name)[-1].lstrip(".") not in wanted_extensions:
                continue
            full = os.path.join(root, name)
            files_to_parse.append((full, os.path.relpath(full, root_path)))

    return files_to_parse


def generate_summary(prompt):
    """Ask gpt-3.5-turbo for a succinct summary of the code in ``prompt``.

    Returns the literal string "too long to summarize." without calling the
    API when ``prompt`` encodes to more than 3000 tokens; otherwise returns
    the content of the first completion choice.
    """
    enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
    if len(enc.encode(prompt)) > 3000:
        return "too long to summarize."

    prompt = prompt + '\nSummarize the above code (be succinct): '

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7,
        max_tokens=300,
        top_p=1.0,
        frequency_penalty=0.0,
        presence_penalty=0.0,
        stop=["\"\"\""]
    )

    return response["choices"][0]["message"]["content"]


# create blob. the blob just contains the file path and the source code.
def blobify(pandaSeries):
    """Return the text blob for one row: its file path, then its source."""
    return f"file path: {pandaSeries['file_path']}\n {pandaSeries['source']}"


def estimate_cost(df, skip_summary: bool):
    """Print an estimated API cost for ``df.blob`` and ask for confirmation.

    Args:
        df: frame with a ``blob`` column of strings.
        skip_summary: when true, only the embedding cost is estimated.

    Exits the process with status 0 unless the user types exactly "yes".
    """
    enc = tiktoken.encoding_for_model("text-embedding-ada-002")
    print(f'found {len(df.blob)} fns')

    token_count = sum(len(enc.encode(s)) for s in df.blob)
    embed_cost = (token_count / 1000) * 0.0001  # Ada v2
    print(f"it will cost ~${embed_cost:.6f} to generate embeddings")

    if not skip_summary:
        enc = tiktoken.encoding_for_model("gpt-3.5-turbo")
        token_count = sum(len(enc.encode(s)) for s in df.blob)
        # Prompt tokens plus a flat 500-token allowance per blob.
        # NOTE(review): 0.0015 / 0.002 look like per-1K-token prices - confirm
        # against current pricing.
        summary_cost = ((token_count / 1000) * 0.0015) + (len(df.blob) * (500/1000) * 0.002)
        print(f"it will cost ~${summary_cost:.6f} to generate summaries (see --skip-summary)")
        # The total only exists when a summary cost was computed; keeping it
        # inside this branch avoids referencing an undefined summary_cost.
        print(f"which is ~${embed_cost + summary_cost:.6f} total.")

    if input("\nType yes to continue or anything else to quit: ") != "yes":
        sys.exit(0)


# The docstring of `setup` is kept to one line because typer shows it as the
# command description in --help.
def setup(
    filepath: Annotated[str, typer.Argument(help="path to repo")],
    output_csv_filepath: Annotated[str, typer.Argument(help="filepath for csv output")],
    ignorefile: Annotated[str, typer.Option(help="Path to text file containing dirnames to ignore. One name per line.")] = None,
    skip_summary: Annotated[bool, typer.Option(help="Do not produce summaries for each function (to save cost).")] = False
):
    """Extract code blocks from a repo, summarize and embed them, write a CSV."""
    dirs_to_ignore = []
    if ignorefile is not None:
        # Best effort: an unreadable ignore file is reported and whatever was
        # read so far is used.
        try:
            with open(ignorefile) as file:
                for line in file:
                    line = line.strip()
                    if line:  # blank lines name no directory
                        dirs_to_ignore.append(line)
        except (OSError, UnicodeError):
            print(f"IO error while procesing {ignorefile}", file=sys.stderr)

    ts_helper = TS_Setup_Helper('./ts-languages.so')

    files = get_files_to_parse(
        filepath,
        list(ts_helper.ext_map.keys()),
        dirs_to_ignore)
    if not files:
        print("didn't find any files to parse", file=sys.stderr)
        sys.exit(1)

    # Concatenate once instead of growing the frame file by file. The
    # per-file row index is deliberately left as-is so CSV output keeps its
    # existing shape.
    code_df = pd.concat([ts_helper.parse_file(file) for file in files])
    if code_df.empty:
        print("didn't find any code blocks to parse", file=sys.stderr)
        sys.exit(1)

    code_df["blob"] = code_df.apply(blobify, axis=1)
    # Intermediate dump of the parsed blocks, written to the working directory.
    code_df.to_csv('rust_with_blob.csv')

    estimate_cost(code_df, skip_summary)

    if not skip_summary:
        print('generating summary')
        code_df["summary"] = code_df.blob.apply(generate_summary)
        print('done with summaries')
    else:
        code_df["summary"] = "no summary. --skip-summary"

    print('generating embeddings')
    embedding_model = "text-embedding-ada-002"
    # Pass a plain callable so apply returns a Series (one embedding per row)
    # rather than a one-column DataFrame.
    code_df["embedding_summary"] = code_df.summary.apply(
        lambda x: get_embedding(x, engine=embedding_model)
    )
    print('done with embeddings')

    code_df.to_csv(output_csv_filepath)

    sys.exit(0)


if __name__ == "__main__":
    typer.run(setup)
    #setup('YOUR_PATH_HERE', ['ignore', 'dirs', 'here'])
    #setup("../../openpilot/", "./ope.csv",
    #      ['tests', 'vendor', 'unix', 'test', 'debug', 'ui', 'third_party', 'tools', 'system']
    #      )