260 lines
8.0 KiB
Python
Executable File
260 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# pip install GitPython requests
|
|
# You need to have ripgrep installed too
|
|
# apt-get install ripgrep
|
|
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import csv
|
|
from git import Repo
|
|
from requests import Session
|
|
from pathlib import Path
|
|
|
|
class GitlabConnector:
    """Thin wrapper around a requests Session pre-configured for the GitLab API."""

    def __init__(self):
        # Connection settings come from the environment; presence is
        # validated by the caller (GitlabRepositories) before use.
        self.url = os.environ.get('GITLAB_URL')
        self.pat = os.environ.get('GITLAB_PAT')

        self.session = Session()
        self.session.headers.update({
            'PRIVATE-TOKEN': self.pat,
            'Content-Type': 'application/json',
        })

    def query(self, path):
        """GET a path relative to the configured GitLab base URL."""
        return self.session.get(f"{self.url}/{path}")

    def get(self, url):
        """GET an absolute URL (e.g. a pagination 'next' link)."""
        return self.session.get(url)
|
|
|
|
class Report:
    """Collects finding rows and serializes them to a CSV report file."""

    def __init__(self):
        # Each entry is one row destined for the CSV report.
        self.findings = []

    def results(self):
        """Dump the accumulated findings to stdout."""
        print(self.findings)

    def write_report(self, filename):
        """Write all findings to *filename* as ';'-separated CSV."""
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter=';', dialect='unix',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
            writer.writerows(self.findings)
|
|
|
|
class GitlabRepositories():
    """Discovers GitLab groups and projects via the REST API and clones repos.

    Requires GITLAB_URL and GITLAB_PAT in the environment; optionally
    GITLAB_GROUP_ID to restrict the scan to a single group subtree.
    """

    def __init__(self):
        # Fail fast when the mandatory connection settings are missing.
        if not os.environ.get('GITLAB_URL'):
            print("Environment variable GITLAB_URL not specified")
            exit(1)

        if not os.environ.get('GITLAB_PAT'):
            print("Environment variable GITLAB_PAT not specified")
            exit(1)

        self.projects = []
        self.groups = []
        self.base_path = '/api/v4'
        self.session = GitlabConnector()

        # An explicit group id skips the group-discovery step in get_projects().
        if os.environ.get('GITLAB_GROUP_ID'):
            self.groups.append(os.environ.get('GITLAB_GROUP_ID'))

    def parse_pagination(self, result_headers):
        """Return the 'next' page URL from an RFC 5988 Link header, or False.

        GitLab paginates list endpoints; the header looks like
        '<url>; rel="next", <url>; rel="last"'.
        """
        ret_val = False
        if not result_headers.get('Link'):
            return ret_val

        for link in result_headers['Link'].split(', '):
            parts = link.split('; ')
            rel = parts[1].split('=')[1]
            if rel == '"next"':
                ret_val = parts[0].replace('<', '').replace('>', '')
                break

        return ret_val

    def get_groups(self, next_link=None):
        """Collect all visible group ids into self.groups, following pagination.

        Iterative (the original recursed per page, which can exhaust the
        recursion limit on instances with many groups).
        """
        while True:
            if not next_link:
                result = self.session.query(f"{self.base_path}/groups")
            else:
                result = self.session.get(next_link)

            # A falsy Response means a 4xx/5xx status code.
            if not result:
                print("No groups found or permissions not sufficient.")
                return

            self.groups += [i['id'] for i in result.json()]
            next_link = self.parse_pagination(result.headers)
            if not next_link:
                return

    def get_projects_by_group(self, next_link=None, group_id=None):
        """Collect project metadata for *group_id* (including subgroups)."""
        url_params = ["include_subgroups=true", "per_page=20", "search_namespaces=true", "owned=false", "order_by=id", "sort=asc"]
        group_path = f"{self.base_path}/groups/{group_id}"

        while True:
            if not next_link:
                result = self.session.query(f"{group_path}/projects?{'&'.join(url_params)}")
            else:
                result = self.session.get(next_link)

            if not result:
                print(f"No projects in group {group_id} found or permissions not sufficient.")
                return

            self.projects += [{'id': i['id'], 'http_url_to_repo': i['http_url_to_repo'], 'ssh_url_to_repo': i['ssh_url_to_repo'], 'web_url': i['web_url']} for i in result.json()]
            next_link = self.parse_pagination(result.headers)
            if not next_link:
                return

    def get_projects(self):
        """Populate self.projects for every known group, discovering groups first if needed."""
        print("Getting GitLab Projects")
        # When no GITLAB_GROUP_ID was given, enumerate all visible groups.
        if not self.groups:
            self.get_groups()

        for group_id in self.groups:
            print(f"Getting Projects for Group {group_id}")
            self.get_projects_by_group(group_id=group_id)

        print(len(self.projects))

    def clone_repo(self, repo_url=None):
        """Clone *repo_url* into git_tmp_root, authenticating with the PAT.

        Returns the local checkout path. An existing non-empty checkout is
        reused; on clone failure the (possibly empty) path is still returned.
        """
        repo_http_scheme, repo_host_path = repo_url.split('://', 1)
        repo_credentials = f"token:{self.session.pat}"
        repo_remote = f"{repo_http_scheme}://{repo_credentials}@{repo_host_path}"

        repo_name = repo_host_path.replace('/', '_')
        # Strip the '.git' suffix exactly. The original rstrip('.git')
        # stripped the character set {'.', 'g', 'i', 't'} and mangled
        # names such as 'logging' or 'audit'.
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-4]
        repo_path = f"{git_tmp_root}/{repo_name}"

        if os.path.isdir(repo_path) and os.listdir(repo_path):
            return repo_path

        print(f"Processing Repository {repo_host_path}")
        try:
            repo = Repo.clone_from(repo_remote, repo_path)
            repo.close()
        except Exception:  # GitPython raises GitCommandError and friends
            print(f"Cant clone {repo_url}")
            return repo_path
        return repo_path
|
|
|
|
def parse_fileglob():
    """Build ripgrep '--glob' arguments from the file named by GLOB_FILE.

    Each non-empty line of the glob file becomes a '--glob <pattern>' pair.
    Returns an empty list (meaning: search all files) when the glob file
    does not exist. Blank lines are skipped — the original emitted an
    empty '--glob' argument for them, which breaks the ripgrep invocation.
    """
    glob_file = Path(os.environ.get('GLOB_FILE', 'globfile'))
    ret_val = []

    if not glob_file.is_file():
        print(f"Specified glob file {glob_file} not found. Searching all files.")
        return []

    with open(glob_file, 'r') as file:
        for line in file:
            pattern = line.rstrip()
            if not pattern:
                continue  # skip blank lines instead of emitting an empty glob
            ret_val.append("--glob")
            ret_val.append(pattern)

    print(ret_val)
    return ret_val
|
|
|
|
def scan_repo(path=None, repo=None):
    """Run ripgrep over *path* using the module-level pattern file and globs.

    *repo* is an identifier (the project's web URL) carried into each result.
    Returns a list of match dicts with keys: repo, full_path, path (relative
    to the checkout), line_number, and the raw ripgrep submatches.
    """
    ripgrep_cmd = [
        "rg",
        "--json",            # one JSON object per output line
        "-i",                # case-insensitive matching
        "-f", pattern_file,  # patterns read from the module-level pattern file
    ]
    ripgrep_cmd = ripgrep_cmd + fileglob
    ripgrep_cmd.append(path)

    try:
        scan_result = subprocess.run(ripgrep_cmd, capture_output=True, text=True)
    except OSError:
        # e.g. ripgrep is not installed; let the caller continue with other repos.
        print(f"Failed to run ripgrep for {path}")
        return []

    scan_matches = []
    # ripgrep --json emits begin/match/end/summary records; keep only matches.
    for line in filter(None, scan_result.stdout.split('\n')):
        line_data = json.loads(line)
        if line_data.get("type") != "match":
            continue
        match_path = line_data["data"]["path"]["text"]
        scan_matches.append({
            'repo': repo,
            'full_path': match_path,
            # Path relative to the checkout root, for readable reporting.
            'path': match_path.replace(path, '').lstrip('/').rstrip(),
            'line_number': line_data["data"]["line_number"],
            'matches': line_data["data"]["submatches"],
        })
    return scan_matches
|
|
|
|
def evaluate_findings(findings=None):
    """Turn raw scan matches into report rows.

    Each row is [repo web url, relative path, line number, full line text
    (leading whitespace and commas stripped), first matched text].
    The default is None rather than a mutable [] to avoid the shared
    mutable-default pitfall; an empty/None input yields [].
    """
    if not findings:
        return []

    finding_results = []
    for finding in findings:
        matched_text = finding['matches'][0]['match']['text']
        print(f"Found potential match - {finding['path']} - {matched_text}")
        # Re-read the offending line from disk for the report's context column.
        detail = check_line_in_file(file=finding['full_path'],
                                    line_number=finding['line_number'])
        finding_results.append([finding['repo'], finding['path'],
                                finding['line_number'], detail.lstrip(),
                                matched_text])
    return finding_results
|
|
|
|
def check_line_in_file(file=None, line_number=None):
    """Return line *line_number* (1-based) of *file*, with commas removed.

    Commas are stripped so the text cannot disturb downstream CSV handling.
    Returns '' when the file has fewer lines than *line_number* — the
    original implicitly returned None, which crashed the caller's
    detail.lstrip().
    """
    with open(file) as fp:
        for i, line in enumerate(fp, 1):
            if i == line_number:
                print(line)
                return line.rstrip().replace(',', '')
    return ''
|
|
|
|
def check_repos():
    """Clone every discovered project, scan it, collect findings, clean up.

    Appends evaluated finding rows to the module-level `report`.
    """
    import shutil  # local import: only needed here for checkout cleanup

    gl = GitlabRepositories()
    gl.get_projects()
    print(f"Found {len(gl.projects)} Repositories..")

    for repo in gl.projects:
        scan_path = gl.clone_repo(repo['http_url_to_repo'])
        findings = scan_repo(scan_path, repo['web_url'])
        if findings:
            print("Evaluating matches")
            finding_results = evaluate_findings(findings=findings)
            if finding_results:
                report.findings += finding_results
        # Delete the checkout so disk usage stays bounded across many repos.
        # shutil.rmtree replaces the original `rm -rf` subprocess: portable
        # and no extra process; ignore_errors matches rm -f's tolerance.
        shutil.rmtree(scan_path, ignore_errors=True)
|
|
|
|
|
|
# --- Script configuration, overridable via environment variables ---
# Where repositories are cloned to (deleted again after scanning).
git_tmp_root = os.environ.get('GIT_TMP_ROOT', '/tmp/repo_check')
# Directory and file name for the CSV report.
report_path = os.environ.get('REPORT_PATH', '/tmp/check_reports')
report_file = os.environ.get('REPORT_FILE', 'report.csv')
# Pattern file handed to ripgrep via -f.
pattern_file = os.environ.get('PATTERN_FILE', 'patternfile')

# Ripgrep '--glob' arguments; read as a module global by scan_repo().
fileglob = parse_fileglob()

# Ensure working directories exist before cloning/reporting.
Path(git_tmp_root).mkdir(parents=True, exist_ok=True)
Path(report_path).mkdir(parents=True, exist_ok=True)

# Global report collector, filled by check_repos().
report = Report()

check_repos()
report.results()
report.write_report(f"{report_path}/{report_file}")
|