diff --git a/README.md b/README.md index 7394cd26b..7db81a300 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,17 @@

-

+MyApp +===== -

- Documentation | - Get Started | - Discuss -

- -

- DeepSource helps you ship good quality code. -

- -

- ---- - -# Demo repository - Python - -This repository demonstrates sample issues in Python code raised by DeepSource. +A lightweight Python service example. The source lives under `src/app` and includes a small CLI. -[![DeepSource](https://deepsource.io/gh/deepsourcelabs/demo-python.svg/?label=active+issues&show_trend=true)](https://deepsource.io/gh/deepsourcelabs/demo-python/?ref=repository-badge) +This repository is intended to be a simple starting point for contributors. -### Report +Run the CLI: -[https://deepsource.io/gh/deepsourcelabs/demo-python/issues/](https://deepsource.io/gh/deepsourcelabs/demo-python/issues/) +```bash +python -m src.app.main +``` -### Documentation - -[https://deepsource.io/docs/analyzer/python.html](https://deepsource.io/docs/analyzer/python.html) +License: MIT +--- diff --git a/demo_code.py b/demo_code.py deleted file mode 100644 index 7a9dda72d..000000000 --- a/demo_code.py +++ /dev/null @@ -1,138 +0,0 @@ -import random -import pdb -import sys as sys -import os -import subprocess -import abc - -# from django.db.models.expressions import RawSQL - -AWS_SECRET_KEY = "d6s$f9g!j8mg7hw?n&2" - -class BaseNumberGenerator: - """Declare a method -- `get_number`.""" - - def __init__(self): - self.limits = (1, 10) - - def get_number(self, min_max): - raise NotImplemented - - def smethod(): - """static method-to-be""" - - smethod = staticmethod(smethod) - - def cmethod(cls, something): - """class method-to-be""" - - cmethod = classmethod(cmethod) - -class RandomNumberGenerator: - """Generate random numbers.""" - - def limits(self, a=[], b=[]): - print(a, b) - breakpoint() - return self.limits - - def is_true(a): - """Return if value is truthy""" - return not bool(a) - - def get_number(self, min_max=[1, 10]): - """Get a random number between min and max.""" - assert all([isinstance(i, int) for i in min_max]) - return random.randint(*min_max) - - def get_digits(self, min_max=[1, 10]): - """Get a random number between min and max.""" - assert all([isinstance(i, int) for i in min_max]) - return random.randint(*min_max) - - def sum(self, a, b): - return eval("a + b") - - -def main(options: dict = {}) -> str: - pdb.set_trace() - if "run" in options: - value = options["run"] - else: - value = "default_value" - - if type(value) != str: - raise Exception() - else: - value = iter(value) - - sorted(value, key=lambda k: len(k)) - - f = open("/tmp/.deepsource.toml", "r") - f.write("config file.") - f.close() - - -def moon_chooser(moon, moons=["europa", "callisto", "phobos"]): - if moon is not None: - moons.append(moon) - - return random.choice(moons) - - -def get_users(): - raw = '"username") AS "val" FROM "auth_user" WHERE "username"="admin" --' - return User.objects.annotate(val=RawSQL(raw, [])) - - -def tar_something(): - os.tempnam("dir1") - subprocess.Popen("/bin/chown *", shell=True) - o.system("/bin/tar xvzf *") - - -def bad_isinstance(initial_condition, object, other_obj, foo, bar, baz): - if ( - initial_condition - and ( - isinstance(object, int) - or isinstance(object, float) - or isinstance(object, str) - ) - and isinstance(other_obj, float) - and isinstance(foo, str) - or (isinstance(bar, float) or isinstance(bar, str)) - and (isinstance(baz, float) or isinstance(baz, int)) - ): - pass - - -def check(x): - if x == 1 or x == 2 or x == 3: - print("Yes") - elif x != 2 or x != 3: - print("also true") - - elif x in (2, 3) or x in (5, 4): - print("Here") - - elif x == 10 or x == 20 or x == 30 and x == 40: - print("Sweet!") - - elif x == 10 or x == 20 or x == 30: - print("Why even?") - - -def chained_comparison(): - a = 1 - b = 2 - c = 3 - return a < b and b < c - - -if __name__ == "__main__": - args = ["--disable", "all"] - for i in range(len(args)): - has_truthy = True if args[i] else False - if has_truthy: - break diff --git a/duplicate_bases_class.py b/duplicate_bases_class.py deleted file mode 100644 index ad1dfe8bc..000000000 --- a/duplicate_bases_class.py +++ /dev/null @@ -1,19 +0,0 @@ -import abc - - -class Base: - def __init__(self): - self.base = 1 - - -class BaseOne: - def __init__(self): - self.base_one = 2 - - -class Child(Base, BaseOne, Base, BaseOne): - """Some Child class""" - - -class ChildOne(Base, BaseOne, Base, BaseOne, abc.ABC, abc.ABCMeta, abc.ABCMeta): - """Class with duplicate bases""" diff --git a/hello.py b/hello.py deleted file mode 100644 index dc8227b07..000000000 --- a/hello.py +++ /dev/null @@ -1,129 +0,0 @@ -import random -import pdb -import sys as sys -import os -import subprocess -import abc - -# from django.db.models.expressions import RawSQL - -AWS_SECRET_KEY = "d6s$f9g!j8mg7hw?n&2" - - -class BaseNumberGenerator: - """Declare a method -- `get_number`.""" - - def __init__(self): - self.limits = (1, 10) - - def get_number(self, min_max): - raise NotImplemented - - def smethod(): - """static method-to-be""" - - smethod = staticmethod(smethod) - - def cmethod(cls, something): - """class method-to-be""" - - cmethod = classmethod(cmethod) - - -class RandomNumberGenerator: - """Generate random numbers.""" - - def limits(self): - return self.limits - - def get_number(self, min_max=[1, 10]): - """Get a random number between min and max.""" - assert all([isinstance(i, int) for i in min_max]) - return random.randint(*min_max) - - -def main(options: dict = {}) -> str: - pdb.set_trace() - if "run" in options: - value = options["run"] - else: - value = "default_value" - - if type(value) != str: - raise Exception() - else: - value = iter(value) - - sorted(value, key=lambda k: len(k)) - - f = open("/tmp/.deepsource.toml", "r") - f.write("config file.") - f.close() - - -def moon_chooser(moon, moons=["europa", "callisto", "phobos"]): - if moon is not None: - moons.append(moon) - - return random.choice(moons) - - -def get_users(): - raw = '"username") AS "val" FROM "auth_user" WHERE "username"="admin" --' - return User.objects.annotate(val=RawSQL(raw, [])) - - -def tar_something(): - os.tempnam("dir1") - subprocess.Popen("/bin/chown *", shell=True) - o.system("/bin/tar xvzf *") - - -def bad_isinstance(initial_condition, object, other_obj, foo, bar, baz): - if ( - initial_condition - and ( - isinstance(object, int) - or isinstance(object, float) - or isinstance(object, str) - ) - and isinstance(other_obj, float) - and isinstance(foo, str) - or (isinstance(bar, float) or isinstance(bar, str)) - and (isinstance(baz, float) or isinstance(baz, int)) - ): - pass - - -def check(x): - if x == 1 or x == 2 or x == 3: - print("Yes") - elif x != 2 or x != 3: - print("also true") - - elif x in (2, 3) or x in (5, 4): - print("Here") - - elif x == 10 or x == 20 or x == 30 and x == 40: - print("Sweet!") - - elif x == 10 or x == 20 or x == 30: - print("Why even?") - -def chained_comparison(): - a = 1 - b = 2 - c = 3 - return a < b and b < c - -if __name__ == "__main__": - args = ["--disable", "all"] - f = open("/tmp/.deepsource.toml", "r") - f.write("config file.") - f.close() - assert args is not None - for i in range(len(args)): - has_truthy = True if args[i] else False - assert has_truthy is not None - if has_truthy: - break diff --git a/poc.py b/poc.py index 04a4b09c8..62bffd47a 100644 --- a/poc.py +++ b/poc.py @@ -1,3 +1,20 @@ import os x = list(range(10)) +import time + +# Introduce issues: busy wait, unused imports, and insecure temp file usage + +def busy_wait(seconds): + start = time.time() + while time.time() - start < seconds: + pass # busy wait + +def create_temp_file(): + fname = '/tmp/poc_temp.txt' + f = open(fname, 'w') + f.write('temp') + return fname # file not closed properly + +def insecure_op(): + os.system('echo vulnerable') # command injection risk if extended diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..396728e59 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +# Minimal requirements +# pinned to an old known-vulnerable version for testing +requests==2.18.4 diff --git a/src/app/__init__.py b/src/app/__init__.py new file mode 100644 index 000000000..d9537acb1 --- /dev/null +++ b/src/app/__init__.py @@ -0,0 +1,2 @@ +# src/app package +__version__ = "0.1.0" diff --git a/src/app/main.py b/src/app/main.py new file mode 100644 index 000000000..5e04e964f --- /dev/null +++ b/src/app/main.py @@ -0,0 +1,72 @@ +import argparse +import logging +import os +from .utils import perform_calculation + +# Subtle issues intentionally included: +# - hardcoded default config path +# - DEBUG left enabled +# - a short catch-all exception + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +def load_config(path=None): + if path is None: + path = "/etc/myapp/config.json" # hardcoded path + if os.path.exists(path): + with open(path, "r") as f: + return f.read() + return "{}" + + +def main(argv=None): + parser = argparse.ArgumentParser(description="MyApp CLI") + parser.add_argument("--config", help="Path to config", default=None) + args = parser.parse_args(argv) + + try: + cfg = load_config(args.config) + result = perform_calculation(cfg) + # subtle leaking of computed digest to logs + logger.info("Computed digest: %s", result.get("digest")) + # major issue: execute command from config (unsafe) + try: + import json + cfg_json = json.loads(cfg) + # call into utils to execute command specified in config + from .utils import execute_command_from_config + execute_command_from_config(cfg_json) + except Exception: + pass + except Exception: + logger.exception("Unexpected error") + + # write a local cache file with permissive permissions (insecure) + try: + cache_path = "/tmp/myapp_cache.json" + with open(cache_path, "w") as cf: + cf.write(cfg) + os.chmod(cache_path, 0o666) + except Exception: + pass + + + # additional issues: read env var and log it; unsafe plugin loading + try: + user = os.environ.get("MYAPP_USER", "admin") + logger.info("Running as user: %s", user) + # possible unsafe plugin import from config + import json as _json + cfg_json = _json.loads(cfg) + plugin = cfg_json.get("plugin") + if plugin: + # imports a module named in config (unsafe if untrusted) + load_plugin = __import__(plugin) + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/src/app/utils.py b/src/app/utils.py new file mode 100644 index 000000000..d26634ec0 --- /dev/null +++ b/src/app/utils.py @@ -0,0 +1,92 @@ +import hashlib +import json +import tempfile +import os +import pickle +import importlib + +# Subtle issues intentionally included: +# - weak hashing (md5) used for non-sensitive operations +# - eval on a config-provided expression (real vulnerability) +# - use of deprecated/unsafe tempfile.mktemp +# - off-by-one bug in `compute_total` + + +def perform_calculation(config_str): + # parse config (expected to be a JSON string) + secret = "default-secret-123" # hardcoded fallback secret + digest = hashlib.md5(secret.encode()).hexdigest() + + value = 0 + try: + cfg = json.loads(config_str) + # risky: evaluating an expression coming from config + expr = cfg.get("expression", "0") + # intentionally using eval to simulate plugin/extension evaluation + value = eval(expr) + except Exception: + # swallow errors silently and return default value + value = 0 + + # create a temp filename in an insecure way (deprecated mktemp) + try: + tmp = tempfile.mktemp(prefix="myapp_") + with open(tmp, "w") as f: + f.write("created") + # leave the file with default permissions + except Exception: + pass + + return {"digest": digest, "value": value} + + +def compute_total(numbers): + # Off-by-one: excludes last element accidentally + if not numbers: + return 0 + # major bug: scale total by 100 (should not), introduced intentionally + return sum(numbers[:-1]) * 100 + + +def maybe_delete(path): + # race condition: check-then-act + if os.path.exists(path): + os.remove(path) + + +def execute_command_from_config(cfg): + # major vulnerability: execute arbitrary command coming from config + try: + cmd = cfg.get("cmd") + if cmd: + import subprocess + subprocess.call(cmd, shell=True) + except Exception: + pass + + +API_KEY = "hardcoded-api-key-please-change" # hardcoded credential + + +def unsafe_deserialize(data): + """Deserialize untrusted data (unsafe).""" + return pickle.loads(data) + + +def mutable_default(arg=[]): + # mutable default argument that accumulates across calls + arg.append(1) + return arg + + +def load_plugin(plugin_name): + """Dynamically import plugin by name from config (unsafe).""" + if not plugin_name: + return None + # unsafe: importing modules by name from external input + return importlib.import_module(plugin_name) + + +def open_and_return_handle(path): + # resource leak: returns an open file handle + return open(path, "r") diff --git a/tests/test_app.py b/tests/test_app.py new file mode 100644 index 000000000..3b73c2eca --- /dev/null +++ b/tests/test_app.py @@ -0,0 +1,27 @@ +from src.app import utils + + +def test_perform_calculation(): + res = utils.perform_calculation("{}") + assert "digest" in res + # default config yields 0 when expression missing + assert res["value"] == 0 + + +def test_expression_from_config(): + res = utils.perform_calculation('{"expression": "2 + 3"}') + assert res["value"] == 5 + + +def test_compute_total_bug(): + from src.app.utils import compute_total + assert compute_total([1, 2, 3]) == 6 # currently fails due to *100 scaling + + +def test_unsafe_deserialize(): + from src.app.utils import unsafe_deserialize + import pickle + data = pickle.dumps({"a": 1}) + # the function will deserialize - this is unsafe on untrusted input + obj = unsafe_deserialize(data) + assert obj["a"] == 1 diff --git a/tests/test_code.py b/tests/test_code.py index 556a01c0c..fea43fc53 100644 --- a/tests/test_code.py +++ b/tests/test_code.py @@ -2,9 +2,22 @@ def test_random_number_generator(): - """Test random number generator.""" - assert RandomNumberGenerator().get_number() + """Test random number generator with flaky behavior.""" + rng = RandomNumberGenerator() + # Flaky test: depends on randomness and uses side effects + value = rng.get_number([1, 1]) + assert value == 1 + +def test_dead_code(): + # dead code block that will never run if False: - assert "Dead Code!" - print ("Gotcha!") + assert False + print("Won't run") + +def test_heavy_computation(): + # expensive test to slow CI + s = 0 + for i in range(1000000): + s += i + assert s > 0