#!/usr/bin/env python3
# Very hacky quick check for Shai-Hulud in GitLab repos.
# Needs GITLAB_URL and GITLAB_PAT set in the environment;
# GITLAB_GROUP, GIT_TMP_ROOT, REPORT_PATH and REPORT_FILE are optional.
# Also use the CSV provided by
# https://github.com/wiz-sec-public/wiz-research-iocs/blob/main/reports/shai-hulud-2-packages.csv
# and create a patternfile.
# Patternfile creation:
# curl https://raw.githubusercontent.com/wiz-sec-public/wiz-research-iocs/refs/heads/main/reports/shai-hulud-2-packages.csv > sha1-hulud-2-packages.csv
# tail -n +2 sha1-hulud-2-packages.csv | awk -F ',' '{print $1}' > patternfile
# pip install GitPython requests
# You need to have ripgrep installed too:
# apt-get install ripgrep

import os
import subprocess
import json
import csv
from git import Repo
from requests import Session
from pathlib import Path


class GitlabConnector:
    """Thin wrapper around a requests Session authenticated with a GitLab PAT."""

    def __init__(self):
        self.url = os.environ.get('GITLAB_URL')
        self.pat = os.environ.get('GITLAB_PAT')
        self.session = Session()
        self.session.headers.update(
            {
                'PRIVATE-TOKEN': self.pat,
                'Content-Type': 'application/json'
            }
        )

    def query(self, path):
        url = f"{self.url}/{path}"
        return self.session.get(url)

    def get(self, url):
        return self.session.get(url)


class Report:
    def __init__(self):
        self.findings = []

    def results(self):
        print(self.findings)

    def write_report(self, filename):
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=';', dialect='unix',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
            for row in self.findings:
                writer.writerow(row)


def get_all_projects(next_link=None, group_id=None, prev_result=None):
    # A mutable default argument ([]) would be shared across top-level calls,
    # so the accumulator is created here instead.
    if prev_result is None:
        prev_result = []
    base_path = 'api/v4'
    url_params = ["include_subgroups=true", "per_page=50", "search_namespaces=true",
                  "owned=false", "order_by=id", "sort=asc"]
    if group_id:
        base_path += f"/groups/{group_id}"
    else:
        url_params.append("pagination=keyset")
    if not next_link:
        result = session.query(f"{base_path}/projects?{'&'.join(url_params)}")
    else:
        result = session.get(next_link)
    # The URL of the next page is advertised in the Link response header.
    next_page = None
    if result.headers.get('Link'):
        for link in result.headers['Link'].split(', '):
            parts = link.split('; ')
            if parts[1].split('=')[1] == '"next"':
                next_page = parts[0].replace('<', '').replace('>', '')
                break
    prev_result += [{'id': i['id'],
                     'http_url_to_repo': i['http_url_to_repo'],
                     'ssh_url_to_repo': i['ssh_url_to_repo'],
                     'web_url': i['web_url']} for i in result.json()]
    # I know, recursing once per page is not nice.. but I'm in a hurry
    if next_page:
        get_all_projects(next_link=next_page, group_id=group_id, prev_result=prev_result)
    return prev_result
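
# Illustrative only: the Link header parsed above follows RFC 8288; with keyset
# pagination GitLab sends something roughly like this (host and params are
# placeholders, not taken from a real response):
#   Link: <https://gitlab.example.com/api/v4/projects?id_after=42&per_page=50>; rel="next"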


def clone_repo_with_http(repo_url=None):
    repo_host_path = repo_url.split('://')[1]
    repo_http_scheme = repo_url.split('://')[0]
    repo_credentials = f"token:{session.pat}"
    repo_remote = f"{repo_http_scheme}://{repo_credentials}@{repo_host_path}"
    # str.rstrip('.git') strips a *set of characters*, not the suffix (it would
    # also eat a trailing 'g', 'i', 't' or '.'), hence removesuffix (Python 3.9+).
    repo_name = repo_host_path.removesuffix('.git').replace('/', '_')
    repo_path = f"{git_tmp_root}/{repo_name}"
    # Reuse a checkout left over from a previous run.
    if os.path.isdir(repo_path) and os.listdir(repo_path):
        return repo_path
    print(f"Processing Repository {repo_host_path}")
    try:
        repo = Repo.clone_from(repo_remote, repo_path)
        repo.close()
    except Exception:
        print(f"Can't clone {repo_url}")
    return repo_path


def scan_repo(path=None, repo=None):
    scan_matches = []
    ripgrep_cmd = [
        "rg",
        "--json",
        "-i",
        "-f",
        "patternfile",
        path
    ]
    try:
        scan_result = subprocess.run(ripgrep_cmd, capture_output=True, text=True)
    except Exception:
        print(f"Failed to run ripgrep for {path}")
        return []
    # rg --json emits one JSON object per line; only "match" records matter here.
    scan_out_lines = list(filter(None, scan_result.stdout.split('\n')))
    for line in scan_out_lines:
        line_data = json.loads(line)
        if line_data.get("type") == "match":
            scan_matches += [{
                'repo': repo,
                'full_path': line_data["data"]["path"]["text"],
                'path': line_data["data"]["path"]["text"].replace(path, '').lstrip('/').rstrip(),
                'line_number': line_data["data"]["line_number"],
                'matches': line_data["data"]["submatches"]
            }]
    return scan_matches


def evaluate_findings(findings=None):
    finding_results = []
    # Only hits in package manifests/lockfiles (package.json,
    # package-lock.json, ...) are treated as potential matches.
    for finding in findings or []:
        filename = finding['full_path'].split('/')[-1]
        if filename.startswith("package"):
            print(f"Found potential match - {finding['path']} - {finding['matches'][0]['match']['text']}")
            detail = check_line_in_file(file=finding['full_path'],
                                        line_number=finding['line_number'])
            finding_results += [[finding['repo'], finding['path'], finding['line_number'],
                                 detail.lstrip(), finding['matches'][0]['match']['text']]]
    return finding_results


def check_line_in_file(file=None, line_number=None):
    with open(file) as fp:
        for i, line in enumerate(fp, 1):
            if i == line_number:
                print(line)
                # Strip commas so the matched line stays a single CSV field.
                return line.rstrip().replace(',', '')
    return ''


def check_repos():
    repos = get_all_projects(group_id=os.environ.get('GITLAB_GROUP'))
    print(f"Found {len(repos)} Repositories..")
    for repo in repos:
        scan_path = clone_repo_with_http(repo['http_url_to_repo'])
        findings = scan_repo(scan_path, repo['web_url'])
        if findings:
            print("Evaluating matches")
            finding_results = evaluate_findings(findings=findings)
            if finding_results:
                report.findings += finding_results
        # Drop the clone again so the temp dir doesn't fill up.
        subprocess.run(["rm", "-rf", scan_path])


git_tmp_root = os.environ.get('GIT_TMP_ROOT', '/tmp/hulud_check')
report_path = os.environ.get('REPORT_PATH', '/tmp/hulud_check_reports')
report_file = os.environ.get('REPORT_FILE', 'report.csv')
Path(git_tmp_root).mkdir(parents=True, exist_ok=True)
Path(report_path).mkdir(parents=True, exist_ok=True)

session = GitlabConnector()
report = Report()
check_repos()
report.results()
report.write_report(f"{report_path}/{report_file}")
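
# Illustrative only: a report row is semicolon-separated ('|' is the quote
# character); the values below are made-up placeholders, not real IOCs:
#   https://gitlab.example.com/group/app;package.json;17;"some-flagged-package": "^1.0.0";some-flagged-package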