from collections import defaultdict
import os
import sys

import pandas as pd
import openai
import tiktoken
from openai.embeddings_utils import get_embedding, cosine_similarity
from tree_sitter import Language, Parser

SOURCE_DIR = './'
openai.api_key = os.getenv('END_OF_WORLD')


class TS_Setup_Helper:
    """Wrap a compiled tree-sitter language bundle and extract code blocks.

    The helper loads a fixed set of grammars from one shared object, maps
    file extensions to those grammars, and holds the tree-sitter queries
    used to pull out functions, imports and assignments per language.
    """

    parser: Parser
    ts_object_path: str
    ext_map: dict
    qmap: dict

    def __init__(self, ts_object_path):
        """Load every supported grammar from the shared object at *ts_object_path*."""
        self.ts_object_path = ts_object_path
        self.BASH_LANGUAGE = Language(ts_object_path, 'bash')
        self.C_LANGUAGE = Language(ts_object_path, 'c')
        self.CPP_LANGUAGE = Language(ts_object_path, 'cpp')
        self.GO_LANGUAGE = Language(ts_object_path, 'go')
        self.HS_LANGUAGE = Language(ts_object_path, 'haskell')
        self.JS_LANGUAGE = Language(ts_object_path, 'javascript')
        self.PY_LANGUAGE = Language(ts_object_path, 'python')
        self.RS_LANGUAGE = Language(ts_object_path, 'rust')
        self.parser = Parser()

        # File extension (without the dot) -> grammar used to parse it.
        self.ext_map = {
            'sh': self.BASH_LANGUAGE,
            'c': self.C_LANGUAGE,
            'h': self.C_LANGUAGE,
            'cpp': self.CPP_LANGUAGE,
            'cxx': self.CPP_LANGUAGE,
            'hxx': self.CPP_LANGUAGE,
            'hpp': self.CPP_LANGUAGE,
            'go': self.GO_LANGUAGE,
            'hs': self.HS_LANGUAGE,
            'js': self.JS_LANGUAGE,
            'py': self.PY_LANGUAGE,
            'rs': self.RS_LANGUAGE,
        }

        # Grammar -> list of queries. The capture name (after '@') becomes the
        # 'code_type' of each extracted block. Haskell has no queries yet, so
        # Haskell files parse but yield no blocks.
        self.qmap = {
            self.BASH_LANGUAGE: ["""(function_definition) @function""",
                                 """(variable_assignment) @assign"""],
            self.C_LANGUAGE: ["""(function_definition) @function""",
                              """(preproc_include) @import"""],
            self.CPP_LANGUAGE: ["""(function_definition) @function""",
                                """(preproc_include) @import"""],
            self.GO_LANGUAGE: ["""(function_declaration) @function""",
                               """(method_declaration) @method"""],
            self.JS_LANGUAGE: ["""[(function) (function_declaration)] @function"""],
            self.PY_LANGUAGE: ["""(function_definition) @function""",
                               """[(import_statement) (import_from_statement)] @import"""],
            self.RS_LANGUAGE: ["""(function_item) @function""",
                               """(use_declaration) @import"""],
        }

    def ts_query(self, lang, tree, sexp):
        """Run the query *sexp* for *lang* on *tree* and return its captures."""
        query = lang.query(sexp)
        return query.captures(tree.root_node)

    def ts_get_all_code_blocks(self, code_blocks, file_path, lang, tree, code):
        """Use treesitter to get all code blocks.

        Runs every query registered for *lang* against *tree* and appends one
        dict per capture to *code_blocks* (modified in place). *code* is the
        source as bytes; captured byte ranges are sliced from it and decoded
        as UTF-8. A language with no registered queries contributes nothing.
        """
        # TODO need way to switch between declaration and definition ..
        # e.g. golang does not have function definitions according to treesitter
        results = []
        # Default to an empty list: some languages in ext_map have no queries.
        for query in self.qmap.get(lang, []):
            print(query)
            results += self.ts_query(lang, tree, query)

        # Each capture is a (node, capture_name) pair.
        for node, capture_name in results:
            code_blocks.append({
                'code_type': capture_name,
                'source': code[node.start_byte:node.end_byte].decode('utf-8'),
                'start_line': node.start_point[0],
                'end_line': node.end_point[0],
                'chars': node.end_byte - node.start_byte,
                'file_path': file_path,
            })

    @staticmethod
    def _collate_blocks(code_blocks, collate_types=('import', 'assign')):
        """Merge consecutive blocks of the same collatable type into one block.

        Blocks whose 'code_type' is not in *collate_types* pass through
        unchanged. A run of same-typed collatable blocks is joined with
        newlines, with its line span and character count widened to match.
        """
        merged = []
        pending = None
        for block in code_blocks:
            if block['code_type'] not in collate_types:
                if pending is not None:
                    merged.append(pending)
                    pending = None
                merged.append(block)
            elif pending is None:
                pending = dict(block)
            elif pending['code_type'] == block['code_type']:
                pending['source'] += f"\n{block['source']}"
                pending['start_line'] = min(pending['start_line'], block['start_line'])
                pending['end_line'] = max(pending['end_line'], block['end_line'])
                # +1 accounts for the newline inserted between the sources.
                pending['chars'] += (block['chars'] + 1)
            else:
                merged.append(pending)
                pending = dict(block)
        # Flush the final run; collatable captures usually come last because
        # queries are executed function-first.
        if pending is not None:
            merged.append(pending)
        return merged

    def parse_file(self, file_path):
        """take source code file and return pd dataframe

        The language is chosen from the file extension via ``ext_map``.
        Raises NotImplementedError when the extension is not mapped.
        """
        print('parse')
        # Tree-Sitter
        extension = os.path.splitext(file_path)[1].lstrip(".")
        lang = self.ext_map.get(extension)
        if lang is None:
            raise NotImplementedError(f"The file extension .{extension} is not implemented")

        # read file; encode once so parser offsets and slices share one buffer
        with open(file_path, 'r', encoding='utf-8') as f:
            code = f.read()
        code_bytes = bytes(code, "utf8")

        self.parser.set_language(lang)
        tree = self.parser.parse(code_bytes)

        code_blocks = []
        self.ts_get_all_code_blocks(code_blocks, file_path, lang, tree, code_bytes)
        return pd.DataFrame(self._collate_blocks(code_blocks))


def get_files_to_parse(root_path, files_extensions_to_parse,
                       dirs_to_ignore=('tests', 'vendor', 'unix')) -> list:
    """get all source file paths as list.

    Walks *root_path* recursively, skipping hidden files and directories and
    any directory matched by *dirs_to_ignore*, and returns the paths of files
    whose extension (without the dot) is in *files_extensions_to_parse*.
    """
    files_to_parse = []
    for root, dirs, files in os.walk(root_path):
        # Prune in place so os.walk does not descend into skipped directories.
        # A directory is skipped when any whitespace-separated word of its
        # name is in dirs_to_ignore (an exact-name match for ordinary names).
        dirs[:] = [
            d for d in dirs
            if not d.startswith('.') and set(d.split()).isdisjoint(dirs_to_ignore)
        ]
        for name in files:
            if name.startswith('.'):
                continue
            # Same extension rule as TS_Setup_Helper.parse_file, so every
            # returned path is one parse_file can map to a language.
            extension = os.path.splitext(name)[1].lstrip('.')
            if extension not in files_extensions_to_parse:
                continue
            files_to_parse.append(os.path.join(root, name))
    return files_to_parse


def generate_summary(prompt):
    """Ask the completion model to summarize the code in *prompt*.

    Returns the fixed string "too long to summarize." without calling the
    API when the prompt encodes to more than 2500 tokens.
    """
    enc = tiktoken.encoding_for_model("text-davinci-003")
    if len(enc.encode(prompt)) > 2500:
        return "too long to summarize."

    prompt = prompt + '\nSummarize the above code: '
    response = openai.Completion.create(
        model="text-davinci-003",
        prompt=prompt,
        temperature=0.7,
        max_tokens=1024,
        top_p=1.0,
        frequency_penalty=0.0,
        presence_penalty=0.0,
        stop=["\"\"\""],
    )
    return response["choices"][0]["text"]


# nate function to create blob. the blob just contains the file path and the source code.
def blobify(pandaSeries):
    """Return the file path and source of one row as a single text blob."""
    return f"file path: {pandaSeries['file_path']}\n {pandaSeries['source']}"


### doing stuff!!
# Driver: parse every supported source file under ./rs, write the extracted
# blocks to CSV, then add a model-generated summary and an embedding of that
# summary for each block and write the result to a second CSV.
ts_helper = TS_Setup_Helper('./ts-languages.so')

#files = get_files_to_parse("../../dirserver/src/dirserver/", ts_helper.ext_map.keys(), dirs_to_ignore=['tests', 'vendor', 'unix'])
files = get_files_to_parse("./rs", ts_helper.ext_map.keys())
if not files:
    print("didn't find any files to parse", file=sys.stderr)
    sys.exit(1)

# Concatenate once instead of growing the frame inside the loop, and
# renumber rows so index labels are unique across files.
code_df = pd.concat(
    [ts_helper.parse_file(file) for file in files],
    ignore_index=True,
)
if code_df.empty:
    # The column assignments below need at least one extracted block.
    print("didn't find any code blocks to process", file=sys.stderr)
    sys.exit(1)

code_df["blob"] = code_df.apply(blobify, axis=1)
print(type(code_df))
print(code_df)
code_df.to_csv('rust_with_blob.csv')

print('startng to generate summary')
code_df["summary"] = code_df["blob"].apply(generate_summary)
print('done with generate summary')

print('generating embeddings')
embedding_model = "text-embedding-ada-002"
# Pass the callable itself: a list of callables makes Series.apply return a
# DataFrame rather than a Series.
code_df["embedding_summary"] = code_df["summary"].apply(
    lambda x: get_embedding(x, engine=embedding_model)
)
print('done with generating embeddings')
code_df.to_csv('test_with_summary_and_embeddings.csv')