#!/usr/bin/env python3
"""Scan every reachable GitLab repository for patterns with ripgrep.

Clones each project over HTTP (authenticated with a personal access token),
runs ripgrep with a pattern file (and optional file globs) over the checkout,
and writes all matches into a semicolon-separated CSV report.

Environment variables:
    GITLAB_URL       base URL of the GitLab instance (required)
    GITLAB_PAT       personal access token (required)
    GITLAB_GROUP_ID  restrict the scan to one group (optional)
    GIT_TMP_ROOT     clone directory      (default /tmp/repo_check)
    REPORT_PATH      report directory     (default /tmp/check_reports)
    REPORT_FILE      report file name     (default report.csv)
    PATTERN_FILE     ripgrep pattern file (default patternfile)
    GLOB_FILE        file with ripgrep globs, one per line (default globfile)
"""
# pip install GitPython requests
# You need to have ripgrep installed too: apt-get install ripgrep

import csv
import json
import os
import shutil
import subprocess
import sys
from pathlib import Path

from git import Repo
from requests import Session


class GitlabConnector:
    """Thin wrapper around a requests Session authenticated with a GitLab PAT."""

    def __init__(self):
        self.url = os.environ.get('GITLAB_URL')
        self.pat = os.environ.get('GITLAB_PAT')
        self.session = Session()
        self.session.headers.update({
            'PRIVATE-TOKEN': self.pat,
            'Content-Type': 'application/json',
        })

    def query(self, path):
        """GET a path relative to the GitLab base URL."""
        return self.session.get(f"{self.url}/{path}")

    def get(self, url):
        """GET an absolute URL (used for pagination 'next' links)."""
        return self.session.get(url)


class Report:
    """Collects finding rows and writes them out as a CSV report."""

    def __init__(self):
        # Each entry is one CSV row: [repo, path, line_number, detail, match].
        self.findings = []

    def results(self):
        """Dump all collected findings to stdout."""
        print(self.findings)

    def write_report(self, filename):
        """Write the findings as semicolon-separated CSV to *filename*."""
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=';', dialect='unix',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
            writer.writerows(self.findings)


class GitlabRepositories:
    """Enumerates groups/projects of a GitLab instance and clones repositories."""

    def __init__(self):
        if not os.environ.get('GITLAB_URL'):
            print("Environment variable GITLAB_URL not specified")
            sys.exit(1)
        if not os.environ.get('GITLAB_PAT'):
            print("Environment variable GITLAB_PAT not specified")
            sys.exit(1)
        self.projects = []
        self.groups = []
        self.base_path = '/api/v4'
        self.session = GitlabConnector()
        # Optionally restrict the scan to a single group.
        if os.environ.get('GITLAB_GROUP_ID'):
            self.groups.append(os.environ.get('GITLAB_GROUP_ID'))

    def parse_pagination(self, result_headers):
        """Return the rel="next" URL from a Link header, or False on the last page."""
        if not result_headers.get('Link'):
            return False
        for link in result_headers['Link'].split(', '):
            parts = link.split('; ')
            rel = parts[1].split('=')[1]
            if rel == '"next"':
                return parts[0].replace('<', '').replace('>', '')
        return False

    def get_groups(self, next_link=None):
        """Collect all visible group ids, following pagination recursively."""
        if next_link:
            result = self.session.get(next_link)
        else:
            result = self.session.query(f"{self.base_path}/groups")
        if not result:
            print("No groups found or permissions not sufficient.")
            return
        self.groups += [i['id'] for i in result.json()]
        next_link = self.parse_pagination(result.headers)
        if next_link:
            self.get_groups(next_link=next_link)

    def get_projects_by_group(self, next_link=None, group_id=None):
        """Collect all projects of *group_id* (incl. subgroups), following pagination."""
        url_params = ["include_subgroups=true", "per_page=20",
                      "search_namespaces=true", "owned=false",
                      "order_by=id", "sort=asc"]
        group_path = f"{self.base_path}/groups/{group_id}"
        if next_link:
            result = self.session.get(next_link)
        else:
            result = self.session.query(f"{group_path}/projects?{'&'.join(url_params)}")
        if not result:
            print(f"No projects in group {group_id} found or permissions not sufficient.")
            return
        self.projects += [{'id': i['id'],
                           'http_url_to_repo': i['http_url_to_repo'],
                           'ssh_url_to_repo': i['ssh_url_to_repo'],
                           'web_url': i['web_url']}
                          for i in result.json()]
        next_link = self.parse_pagination(result.headers)
        if next_link:
            self.get_projects_by_group(next_link=next_link, group_id=group_id)

    def get_projects(self):
        """Populate self.projects from all groups (or the single configured group)."""
        print("Getting GitLab Projects")
        # When no group was specified via GITLAB_GROUP_ID, enumerate all groups.
        if not self.groups:
            self.get_groups()
        for group_id in self.groups:
            print(f"Getting Projects for Group {group_id}")
            self.get_projects_by_group(group_id=group_id)
        print(len(self.projects))

    def clone_repo(self, repo_url=None):
        """Clone *repo_url* into git_tmp_root using the PAT; return the checkout path.

        Reuses an existing non-empty checkout. On clone failure the (possibly
        empty) target path is still returned, matching the original best-effort
        behaviour.
        """
        repo_http_scheme, repo_host_path = repo_url.split('://', 1)
        repo_remote = f"{repo_http_scheme}://token:{self.session.pat}@{repo_host_path}"
        repo_name = repo_host_path.replace('/', '_')
        # Fix: rstrip('.git') strips ANY trailing '.', 'g', 'i', 't' characters
        # (e.g. 'testing.git' -> 'testin'); remove the exact suffix instead.
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        repo_path = f"{git_tmp_root}/{repo_name}"
        if os.path.isdir(repo_path) and os.listdir(repo_path):
            return repo_path
        print(f"Processing Repository {repo_host_path}")
        try:
            repo = Repo.clone_from(repo_remote, repo_path)
            repo.close()
        except Exception:
            # Best-effort: report and fall through, returning the target path.
            print(f"Cant clone {repo_url}")
        return repo_path


def parse_fileglob():
    """Read glob patterns (one per line) from GLOB_FILE.

    Returns a flat list of ripgrep arguments: ["--glob", pat, "--glob", pat, ...],
    or [] when the glob file does not exist (ripgrep then searches all files).
    """
    glob_file = Path(os.environ.get('GLOB_FILE', 'globfile'))
    if not glob_file.is_file():
        print(f"Specified glob file {glob_file} not found. Searching all files.")
        return []
    ret_val = []
    with open(glob_file, 'r') as file:
        for line in file:
            ret_val.append("--glob")
            ret_val.append(line.rstrip())
    print(ret_val)
    return ret_val


def scan_repo(path=None, repo=None):
    """Run ripgrep (JSON output) over *path*; return a list of match dicts.

    Each dict carries the repo web URL, absolute and repo-relative file path,
    the 1-based line number and ripgrep's submatch details.
    """
    ripgrep_cmd = ["rg", "--json", "-i", "-f", pattern_file] + fileglob + [path]
    try:
        scan_result = subprocess.run(ripgrep_cmd, capture_output=True, text=True)
    except OSError:
        # rg binary missing or not executable.
        print(f"Failed to run ripgrep for {path}")
        return []
    scan_matches = []
    for line in filter(None, scan_result.stdout.split('\n')):
        line_data = json.loads(line)
        if line_data.get("type") != "match":
            continue
        full_path = line_data["data"]["path"]["text"]
        scan_matches.append({
            'repo': repo,
            'full_path': full_path,
            # Strip the checkout prefix to get a repo-relative path.
            'path': full_path.replace(path, '').lstrip('/').rstrip(),
            'line_number': line_data["data"]["line_number"],
            'matches': line_data["data"]["submatches"],
        })
    return scan_matches


def evaluate_findings(findings=None):
    """Turn ripgrep match dicts into CSV rows, re-reading each matched line.

    Returns a list of rows: [repo, path, line_number, line_detail, match_text].
    """
    if not findings:
        return []
    finding_results = []
    for finding in findings:
        match_text = finding['matches'][0]['match']['text']
        print(f"Found potential match - {finding['path']} - {match_text}")
        detail = check_line_in_file(file=finding['full_path'],
                                    line_number=finding['line_number'])
        finding_results.append([finding['repo'], finding['path'],
                                finding['line_number'], detail.lstrip(),
                                match_text])
    return finding_results


def check_line_in_file(file=None, line_number=None):
    """Return the 1-based *line_number* of *file*, stripped and with commas removed.

    Returns '' when the line number is past the end of the file (the original
    returned None here, which crashed the caller's .lstrip()).
    """
    with open(file) as fp:
        for i, line in enumerate(fp, 1):
            if i == line_number:
                print(line)
                return line.rstrip().replace(',', '')
    return ''


def check_repos():
    """Clone and scan every reachable project, accumulating rows in the report."""
    gl = GitlabRepositories()
    gl.get_projects()
    print(f"Found {len(gl.projects)} Repositories..")
    for repo in gl.projects:
        scan_path = gl.clone_repo(repo['http_url_to_repo'])
        findings = scan_repo(scan_path, repo['web_url'])
        if findings:
            print("Evaluating matches")
            finding_results = evaluate_findings(findings=findings)
            if finding_results:
                report.findings += finding_results
        # Remove the checkout; shutil.rmtree replaces the 'rm -rf' subprocess.
        shutil.rmtree(scan_path, ignore_errors=True)


git_tmp_root = os.environ.get('GIT_TMP_ROOT', '/tmp/repo_check')
report_path = os.environ.get('REPORT_PATH', '/tmp/check_reports')
report_file = os.environ.get('REPORT_FILE', 'report.csv')
pattern_file = os.environ.get('PATTERN_FILE', 'patternfile')
fileglob = parse_fileglob()
report = Report()

if __name__ == "__main__":
    Path(git_tmp_root).mkdir(parents=True, exist_ok=True)
    Path(report_path).mkdir(parents=True, exist_ok=True)
    check_repos()
    report.results()
    report.write_report(f"{report_path}/{report_file}")