Security for Bitbucket Data Center Cookbook
The following recipes are provided for informational purposes only and any code here is provided on an “as-is” basis. The reader assumes all risk of any use of the code in this Cookbook. THERE IS NO WARRANTY ON THIS SOFTWARE, EXPRESS OR IMPLIED, AND ALL WARRANTIES ARE EXPRESSLY DISCLAIMED HEREIN, INCLUDING THE IMPLIED WARRANTY OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
Copy reviewed false positives from one repository to another
Introduction
Copying reviewed false positives in bulk might be desirable if a repository has been completely cloned from one place to another.
In this recipe you will learn:
How to interact with the Security for Bitbucket REST API to export reviewed false positives
How to interact with the Security for Bitbucket REST API to create new reviewed false positives
Steps
Create a new Python project using your preferred dependency management system. For example, you could create a python3 virtual environment:
$ virtualenv security-for-bitbucket-clone-reviewed
# ...
$ source security-for-bitbucket-clone-reviewed/bin/activate
Install the following required dependencies:
click
,atlassian-python-api
, andstrictyaml
(for the YAML configuration).
$ pip install click atlassian-python-api strictyaml
Create the following configuration file in
/etc/soteri/security-for-bitbucket-clone-reviewed.yaml
.
Relying on OS permissions to protect credentials on disk may not be approved by your organizational security policy - in which case, you will need to find an alternative way to provide your Bitbucket credentials.
bitbucket:
url: https://my-bitbucket-instance/bitbucket
username: username-of-utility
password: agent-password
Create the following utility script:
#!/usr/bin/env python3
import sys
import click
from atlassian import Bitbucket
from strictyaml import load
def initialize_config(config_filename):
with open(config_filename, 'w') as file:
file.write("""
bitbucket:
url: URL
username: USERNAME
password: PASSWORD
""")
def get_reviewed_findings_csv(bitbucket, from_project, from_repo):
resp = bitbucket.get(
f'/rest/security/latest/export/reviewed/projects/{from_project}/repos/{from_repo}',
advanced_mode=True,
headers={ "Accept": "*/*"}
)
return resp.text
def trim_quotes(string: str):
string = string.strip()
if string.startswith('"') and string.endswith('"'):
return string[1:-1]
return string
def process_csv(csv):
headers = None
for line in csv.split('\n'):
if line.strip() == "":
continue
if headers is None:
headers = [trim_quotes(item) for item in line.split(',')]
else:
items = [trim_quotes(item) for item in line.split(',')]
finding = dict(zip(headers, items))
yield finding
def review_finding(bitbucket: Bitbucket, to_project, to_repo, finding):
bitbucket.post(f'/rest/security/latest/review-lines/projects/{to_project}/repos/{to_repo}/create', json={
"matchText": finding['match text'],
"ruleName": finding['rule name']
})
@click.command()
@click.option("--config-filename", default='/etc/soteri/security-for-bitbucket-clone-reviewed.yaml',
help="YAML configuration filename")
@click.option("--dry-run", is_flag=True, help="Don't make any changes")
@click.option("--init", is_flag=True, help="Generate a YAML config")
@click.option("--from-project", help="Origin project")
@click.option("--from-repo", help="Origin repository")
@click.option("--from-csv", help="File containing CSV with reviewed findings")
@click.option("--to-project", help="Destination project")
@click.option("--to-repo", help="Destination repository")
def cli(config_filename, dry_run, init, from_project, from_repo, from_csv, to_project, to_repo):
if init:
initialize_config(config_filename)
sys.exit(0)
with open(config_filename) as file:
config = load(file.read())
bitbucket = Bitbucket(**config['bitbucket'].data)
if from_csv is None:
csv = get_reviewed_findings_csv(bitbucket, from_project, from_repo)
else:
with open(from_csv) as file:
csv = file.read()
for finding in process_csv(csv):
if dry_run:
click.echo(f"Would have reviewed a finding with rule name {finding['rule name']}")
else:
review_finding(bitbucket, to_project, to_repo, finding)
click.echo(f"Reviewed finding with rule name {finding['rule name']}")
if __name__ == '__main__':
cli()
Test the utility script in a dry run:
$ ./security-for-bitbucket-clone-reviewed.py --dry-run --from-project DEV --from-repo rep_1 \
--to-project DEV --to-repo rep_2
Would have reviewed a finding with rule name hello
Remove the
--dry-run
flag and use the tool as designed.
The tool also supports a --from-csv
flag in place of --from-project
and --from-repo
, so you can export reviewed findings from the Security for Bitbucket UI, and even edit the CSV before running it through the tool.
Create a Prometheus Exporter in Python for Security for Bitbucket Metrics
Introduction
Metrics are an important part of any enterprise ops workflow. It’s especially important for SecOps teams to be able to track in near-real-time the security status of their infrastructure.
Prometheus exporters can be consumed not just by Prometheus but by other telemetry aggregation products such as Splunk.
In this recipe, you will learn:
How to create an exporter for your Security for Bitbucket metrics
How to use the Security for Bitbucket REST API to compute metrics
How to integrate your exporter with your ops workflow
Steps
Create a new Python project using your preferred dependency management system. For example, you could create a python3 virtual environment:
$ virtualenv security-for-bitbucket-exporter-env
# ...
$ source security-for-bitbucket-exporter-env/bin/activate
Install the following required dependencies:
prometheus-client
,atlassian-python-api
, andstrictyaml
(for Prometheus-flavored configuration).
$ pip install prometheus-client atlassian-python-api strictyaml
Create the following configuration file in
/etc/prometheus/soteri-bitbucket-exporter.yaml
.
Relying on OS permissions to protect credentials on disk may not be approved by your organizational security policy - in which case, you will need to find an alternative way to provide your Bitbucket credentials.
bitbucket:
url: https://my-bitbucket-instance/bitbucket
username: username-of-exporter
password: agent-password
polling_seconds: 600 # 10 minutes
port: 8000
Create the following exporter script:
security-for-bitbucket-exporter.py
#!/usr/bin/env python3
# dependencies
from prometheus_client import start_http_server, Gauge
from strictyaml import load
from atlassian import Bitbucket
# builtins
from time import sleep
from datetime import datetime
from functools import reduce
PROJECTS_SCANNED = Gauge("projects_scanned", "Number of projects scanned.")
REPOS_SCANNED = Gauge("repos_scanned", "Number of repositories scanned.")
SECURE_REPOS = Gauge("secure_repos", "Number of repositories with no findings.")
VULNERABLE_REPOS = Gauge("vulnerable_repos", "Number of repositories with findings.")
BITBUCKET = None
def stream_project_statuses():
url = '/rest/security/latest/status/projects'
limit = 100
start = 0
last_page = False
while not last_page:
response = BITBUCKET.get(url + f'?start={start}&limit={limit}')
last_page = response['lastPage']
start = response['nextPageRequestStart']
for status in response['data']:
yield status
def update_gauges():
try:
project_statuses = list(stream_project_statuses())
except requests.exceptions.ConnectionError:
print(f"{datetime.now()}: Bitbucket is down. Not updating metrics.")
return
PROJECTS_SCANNED.set(reduce(
lambda accumulator, next_status: accumulator+(1 if next_status['repositoriesNotScanned'] == 0 else 0),
project_statuses, 0
))
REPOS_SCANNED.set(reduce(
lambda accumulator, next_status:
accumulator + (next_status['repositoriesTotal'] - next_status['repositoriesNotScanned']),
project_statuses, 0
))
SECURE_REPOS.set(reduce(
lambda accumulator, next_status:
accumulator + next_status['repositoriesSecure'], project_statuses, 0
))
VULNERABLE_REPOS.set(reduce(
lambda accumulator, next_status:
accumulator + next_status['repositoriesVulnerable'], project_statuses, 0
))
print(f"{datetime.now()}: Metrics updated successfully.")
if __name__ == '__main__':
with open('/etc/prometheus/soteri-bitbucket-exporter.yaml') as config_file:
config = load(config_file.read())
BITBUCKET = Bitbucket(**config['bitbucket'].data)
start_http_server(int(config['port']))
while True:
update_gauges()
sleep(int(config['polling_seconds']))
Add the agent to your service initializer (e.g.:
systemd
).
/etc/systemd/system/security-for-bitbucket-exporter.service
[Unit]
Description=Security for Bitbucket Prometheus Exporter
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=exporter
ExecStart=/home/exporter/security-for-bitbucket-exporter-env/bin/python /home/exporter/security-for-bitbucket-exporter.py
[Install]
WantedBy=multi-user.target
And enable it - for example:
$ sudo systemctl enable security-for-bitbucket-exporter
curl localhost:8000
(or whatever port you’ve decided to run your agent on) to test:
$ curl localhost:8000
# HELP python_gc_objects_collected_total Objects collected during gc
# TYPE python_gc_objects_collected_total counter
python_gc_objects_collected_total{generation="0"} 262.0
python_gc_objects_collected_total{generation="1"} 13.0
python_gc_objects_collected_total{generation="2"} 0.0
# HELP python_gc_objects_uncollectable_total Uncollectable objects found during GC
# TYPE python_gc_objects_uncollectable_total counter
python_gc_objects_uncollectable_total{generation="0"} 0.0
python_gc_objects_uncollectable_total{generation="1"} 0.0
python_gc_objects_uncollectable_total{generation="2"} 0.0
# HELP python_gc_collections_total Number of times this generation was collected
# TYPE python_gc_collections_total counter
python_gc_collections_total{generation="0"} 96.0
python_gc_collections_total{generation="1"} 8.0
python_gc_collections_total{generation="2"} 0.0
# HELP python_info Python platform information
# TYPE python_info gauge
python_info{implementation="CPython",major="3",minor="12",patchlevel="0",version="3.12.0"} 1.0
# HELP projects_scanned Number of projects scanned.
# TYPE projects_scanned gauge
projects_scanned 3.0
# HELP repos_scanned Number of repositories scanned.
# TYPE repos_scanned gauge
repos_scanned 7.0
# HELP secure_repos Number of repositories with no findings.
# TYPE secure_repos gauge
secure_repos 6.0
# HELP vulnerable_repos Number of repositories with findings.
# TYPE vulnerable_repos gauge
vulnerable_repos 1.0