-
Notifications
You must be signed in to change notification settings - Fork 490
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Helper library to interface with ARVO docker containers
Summary: This adds: * A library to interact with ARVO containers * An example script to showcase usage eg. for a sample fuzzing crash: https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=1076&q=1076&can=2, we use the 'id' for interfacing: ``` repro = ArvoReproducer(1076) repro.container_exists() // Whether there exists a reproducer repro.verify() // Can we repro crash with oss-fuzz found crash input repro.repro_with_input(b"\x00\x00\x00\x00"): // Does the crash repro with given input repro.clean_up_image() // Clean up memory space ``` Reviewed By: dwjsong Differential Revision: D61420472 fbshipit-source-id: 47cd18e4d473cdce681ffc564d4f8f794ac070f4
- Loading branch information
1 parent
4da9ecc
commit 45b8dc7
Showing
2 changed files
with
152 additions
and
0 deletions.
There are no files selected for viewing
103 changes: 103 additions & 0 deletions
103
CybersecurityBenchmarks/datasets/canary_exploit/oss_fuzz/arvo_reproducer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
# Copyright (c) Meta Platforms, Inc. and affiliates. | ||
# | ||
# This source code is licensed under the MIT license found in the | ||
# LICENSE file in the root directory of this source tree. | ||
|
||
# pyre-strict | ||
|
||
|
||
import logging | ||
import subprocess | ||
import tempfile | ||
from typing import List, Optional | ||
|
||
import requests | ||
|
||
ARVO_DOCKER_REPOSITORY = "n132/arvo" | ||
|
||
|
||
class ArvoReproducer: | ||
def __init__(self, id: int) -> None: | ||
self.id = id | ||
self.tag = f"{self.id}-vul" | ||
|
||
def get_container_name(self) -> str: | ||
return f"{ARVO_DOCKER_REPOSITORY}:{self.tag}" | ||
|
||
def container_exists(self) -> bool: | ||
"""Check if the specified Docker container exists.""" | ||
url = f"https://registry.hub.docker.com/v2/repositories/{ARVO_DOCKER_REPOSITORY}/tags/{self.tag}/" | ||
response = requests.get(url) | ||
|
||
# Check if the image is found (status code 200 means it exists) | ||
return response.status_code == 200 | ||
|
||
def _run_docker( | ||
self, extra_args: Optional[List[str]] = None | ||
) -> subprocess.CompletedProcess[bytes]: | ||
cmd_args = [ | ||
"docker", | ||
"run", | ||
"-it", | ||
"--rm", # Remove the container after it exits | ||
] | ||
if extra_args is not None: | ||
cmd_args.extend(extra_args) | ||
cmd_args.extend( | ||
[ | ||
self.get_container_name(), | ||
"arvo", | ||
], | ||
) | ||
result = subprocess.run( | ||
cmd_args, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
) | ||
|
||
output = result.stdout.strip() | ||
error = result.stderr.strip() | ||
|
||
# Health check | ||
if b"INFO: Seed: " not in output: | ||
logging.error("Error while running docker") | ||
logging.error("stdout:") | ||
logging.error(output) | ||
logging.error("stderr:") | ||
logging.error(error) | ||
raise Exception("Error running docker") | ||
|
||
return result | ||
|
||
def verify(self) -> bool: | ||
"""Run the Docker container with fuzzer found crash input and capture the output and error code.""" | ||
result = self._run_docker() | ||
|
||
error_code = result.returncode | ||
return error_code != 0 | ||
|
||
def repro_with_input(self, input_data: bytes) -> bool: | ||
"""Run the Docker container, replace /tmp/poc with given input, and capture output and error code.""" | ||
# Create a temporary file for the input data | ||
with tempfile.NamedTemporaryFile() as tmp_file: | ||
tmp_file.write(input_data) | ||
tmp_file.flush() | ||
|
||
# Run the Docker container with the replaced /tmp/poc | ||
result = self._run_docker( | ||
[ | ||
"-v", | ||
f"{tmp_file.name}:/tmp/poc", | ||
] | ||
) | ||
|
||
error_code = result.returncode | ||
return error_code != 0 | ||
|
||
def clean_up_image(self) -> None: | ||
"""Clean up docker image associated with this issue""" | ||
subprocess.run( | ||
["docker", "image", "rmi", "-f", self.get_container_name()], | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
) |
49 changes: 49 additions & 0 deletions
49
CybersecurityBenchmarks/datasets/canary_exploit/oss_fuzz/arvo_tester.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# Copyright (c) Meta Platforms, Inc. and affiliates. | ||
# | ||
# This source code is licensed under the MIT license found in the | ||
# LICENSE file in the root directory of this source tree. | ||
|
||
# pyre-strict | ||
|
||
import base64 | ||
|
||
import unittest | ||
from typing import Dict, List, Tuple | ||
|
||
from .arvo_reproducer import ArvoReproducer | ||
|
||
vuln_test_cases: Dict[int, List[Tuple[bool, bytes]]] = { | ||
1076: [ | ||
(False, b"\x00\x00\x00\x00"), | ||
( | ||
True, | ||
base64.b64decode( | ||
""" | ||
PD94bWwgZW5jb2Rpbmc9Im5vIjxJU08tMDI0MTgtRU50DlJVPjxJQU46X1NVPhk8QU5ZU35+fn4g | ||
PEUgeG1sbnM9Ii8vLy8vLy8vL35+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+ | ||
ODg4ODg4ODg4ODg4ODg4ODg4Nzk5OTk5OTk5OS8vLyYjeDg4ODg4ODg4ODg4ODg4ODg4ODg4ODg4 | ||
ODg4ODg4ODg4ODg4ODg4ODg4ODg4ODg4ODg4ODg4ODg4ODg4ONLHx8fHfn5+fn5+fn5+fn5+fn5+ | ||
fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fn5+fg== | ||
""" | ||
), | ||
), | ||
] | ||
} | ||
|
||
|
||
class ArvoReproducerTest(unittest.TestCase): | ||
@unittest.skip("This test only works if docker is installed on the machine") | ||
def test_arvo_reproducer(self) -> None: | ||
for vuln, test_cases in vuln_test_cases.items(): | ||
repro = ArvoReproducer(vuln) | ||
self.assertTrue(repro.container_exists(), "Container does not exist") | ||
self.assertTrue(repro.verify(), "Fuzzer input does not crash") | ||
|
||
for expected_result, input in test_cases: | ||
self.assertEqual( | ||
repro.repro_with_input(input), | ||
expected_result, | ||
"Mismatch of expected result", | ||
) | ||
|
||
repro.clean_up_image() |