-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload_to_defectdojo.py
146 lines (129 loc) · 4.86 KB
/
upload_to_defectdojo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import requests
# DefectDojo settings
DEFECTDOJO_URL = os.getenv("DEFECTDOJO_URL") # Fetch from environment variables
API_KEY = os.getenv("DEFECTDOJO_API_KEY") # Fetch from environment variables
print(f"API_KEY: {API_KEY}") # Debugging output
PRODUCT_NAME = "Automated Product" # Name of the product to use or create
PRODUCT_TYPE_NAME = "Automated Product Type" # Name of the product type to use or create
ENGAGEMENT_NAME = "Automated Engagement"
SCAN_TYPE = "Anchore Grype" # Scan type matching the tool used
SCAN_FILE_PATH = os.getenv("SCAN_FILE_PATH")
# Headers
HEADERS = {
"Authorization": f"Token {API_KEY}",
}
try:
with open(SCAN_FILE_PATH, "r") as file:
file_contents = file.read()
print(f"Contents of the scan file:\n{file_contents}")
except Exception as e:
print(f"Error reading file {SCAN_FILE_PATH}: {e}")
def get_or_create_product_type():
"""
Ensures a product type exists and returns its ID.
"""
# Check for existing product type
response = requests.get(f"{DEFECTDOJO_URL}/api/v2/product_types/", headers=HEADERS)
if response.status_code == 200:
product_types = response.json()["results"]
for product_type in product_types:
if product_type["name"] == PRODUCT_TYPE_NAME:
print(f"Found product type: {PRODUCT_TYPE_NAME} with ID {product_type['id']}")
return product_type["id"]
# Create product type if not found
print(f"Product type {PRODUCT_TYPE_NAME} not found. Creating it.")
url = f"{DEFECTDOJO_URL}/api/v2/product_types/"
data = {"name": PRODUCT_TYPE_NAME, "description": "Created for automated scans"}
response = requests.post(url, headers=HEADERS, json=data)
if response.status_code == 201:
product_type_id = response.json()["id"]
print(f"Product type created with ID: {product_type_id}")
return product_type_id
else:
print("Failed to create product type.")
print(response.json())
return None
def get_or_create_product():
"""
Ensures a product exists, creating it if necessary, and returns its ID.
"""
# Get or create the product type
product_type_id = get_or_create_product_type()
if not product_type_id:
return None
# Check for existing product
response = requests.get(f"{DEFECTDOJO_URL}/api/v2/products/", headers=HEADERS)
if response.status_code == 200:
products = response.json()["results"]
for product in products:
if product["name"] == PRODUCT_NAME:
print(f"Found product: {PRODUCT_NAME} with ID {product['id']}")
return product["id"]
# Create product if not found
print(f"Product {PRODUCT_NAME} not found. Creating it.")
url = f"{DEFECTDOJO_URL}/api/v2/products/"
data = {
"name": PRODUCT_NAME,
"description": "Created for automated scans",
"prod_type": product_type_id, # Include the required product type ID
}
response = requests.post(url, headers=HEADERS, json=data)
if response.status_code == 201:
product_id = response.json()["id"]
print(f"Product created with ID: {product_id}")
return product_id
else:
print("Failed to create product.")
print(response.json())
return None
def create_engagement(product_id):
"""
Creates an engagement in DefectDojo and returns its ID.
"""
url = f"{DEFECTDOJO_URL}/api/v2/engagements/"
data = {
"name": ENGAGEMENT_NAME,
"product": product_id,
"target_start": "2024-01-01",
"target_end": "2024-12-31",
"status": "In Progress",
"engagement_type": "CI/CD",
}
response = requests.post(url, headers=HEADERS, json=data)
if response.status_code == 201:
engagement_id = response.json()["id"]
print(f"Engagement created with ID: {engagement_id}")
return engagement_id
else:
print("Failed to create engagement.")
print(response.json())
return None
def upload_scan(engagement_id):
"""
Uploads the scan result to DefectDojo.
"""
url = f"{DEFECTDOJO_URL}/api/v2/import-scan/"
with open(SCAN_FILE_PATH, "rb") as scan_file:
files = {"file": scan_file}
data = {
"engagement": engagement_id,
"scan_type": SCAN_TYPE,
"active": True,
"verified": True,
}
response = requests.post(url, headers=HEADERS, files=files, data=data)
if response.status_code == 201:
print("Scan successfully uploaded to DefectDojo.")
else:
print("Failed to upload scan.")
print(response.json())
# Main workflow
def main():
product_id = get_or_create_product()
if product_id:
engagement_id = create_engagement(product_id)
if engagement_id:
upload_scan(engagement_id)
if __name__ == "__main__":
main()