Merged
4 changes: 4 additions & 0 deletions conftest.py
@@ -0,0 +1,4 @@
import sys
import os

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
Comment on lines +1 to +4
Contributor

🧹 Nitpick | 🔵 Trivial

Consider using an installable package instead of sys.path manipulation.

This sys.path.insert hack is fragile and can mask import issues. If the project adopts a pyproject.toml (or setup.py) with pip install -e ., pytest will discover all modules without path manipulation, and this file becomes unnecessary.

For the current project scope this is acceptable, but it won't scale well if the project grows or CI environments differ.

🤖 Prompt for AI Agents
In `@conftest.py` around lines 1 - 4, Remove the fragile sys.path manipulation in
conftest.py: delete the sys.path.insert(0,
os.path.dirname(os.path.abspath(__file__))) line and the associated imports
(sys, os), and instead make the project an installable package so pytest can
import modules normally (add a pyproject.toml or setup.py and use pip install -e
. in local/CI dev workflow); if immediate packaging is not possible, document a
temporary CI setup that installs the package (or sets PYTHONPATH) rather than
mutating sys.path at runtime.
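A minimal pyproject.toml along the lines the comment suggests would make the project installable; the package name and included packages below are illustrative, not taken from the PR:

```toml
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"

[project]
name = "minichain"        # hypothetical project name
version = "0.1.0"

[tool.setuptools.packages.find]
include = ["core*", "consensus*"]
```

With this in place, `pip install -e .` makes the packages importable in local and CI runs, and conftest.py can be deleted.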

3 changes: 3 additions & 0 deletions consensus/__init__.py
@@ -0,0 +1,3 @@
from .pow import mine_block, calculate_hash, MiningExceededError

__all__ = ["mine_block", "calculate_hash", "MiningExceededError"]
Binary file added consensus/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added consensus/__pycache__/pow.cpython-311.pyc
Binary file not shown.
73 changes: 73 additions & 0 deletions consensus/pow.py
@@ -0,0 +1,73 @@
import json
import time
from nacl.hash import sha256
from nacl.encoding import HexEncoder


class MiningExceededError(Exception):
"""Raised when max_nonce, timeout, or cancellation is exceeded during mining."""


def calculate_hash(block_dict):
"""Calculates SHA256 hash of a block header."""
block_string = json.dumps(block_dict, sort_keys=True).encode("utf-8")
return sha256(block_string, encoder=HexEncoder).decode("utf-8")


def mine_block(
block,
difficulty=4,
max_nonce=10_000_000,
timeout_seconds=None,
logger=None,
progress_callback=None
):
"""Mines a block using Proof-of-Work without mutating input block until success."""

target = "0" * difficulty
local_nonce = 0
start_time = time.time()

if logger:
logger.info(
"Mining block %s (Difficulty: %s)",
block.index,
difficulty,
)

while True:

# Enforce max_nonce limit before hashing
if local_nonce >= max_nonce:
if logger:
logger.warning("Max nonce exceeded during mining.")
raise MiningExceededError("Mining failed: max_nonce exceeded")

# Enforce timeout if specified
if timeout_seconds is not None and (time.time() - start_time) > timeout_seconds:
if logger:
logger.warning("Mining timeout exceeded.")
raise MiningExceededError("Mining failed: timeout exceeded")

# Temporarily set nonce for hashing only
block.nonce = local_nonce
block_hash = calculate_hash(block.to_header_dict())
Comment on lines +52 to +54
Contributor

⚠️ Potential issue | 🟡 Minor

Docstring claims no mutation until success, but block.nonce is set every iteration.

Line 25 says "without mutating input block until success," but line 53 sets block.nonce = local_nonce on every loop iteration (needed for to_header_dict()). If MiningExceededError is raised, the block is left with a stale nonce and no hash. Either fix the docstring or build the header dict without mutating the block.

Option A: Fix the docstring
-    """Mines a block using Proof-of-Work without mutating input block until success."""
+    """Mines a block using Proof-of-Work. Note: block.nonce is mutated during mining."""
Option B: Avoid mutating the block during the loop
-        # Temporarily set nonce for hashing only
-        block.nonce = local_nonce
-        block_hash = calculate_hash(block.to_header_dict())
+        header = block.to_header_dict()
+        header["nonce"] = local_nonce
+        block_hash = calculate_hash(header)
🤖 Prompt for AI Agents
In `@consensus/pow.py` around lines 52 - 54, The loop mutates block.nonce each
iteration but the docstring promises no mutation until success; instead of
assigning block.nonce = local_nonce, build a header dict for hashing without
changing the block: call block.to_header_dict() once (or each iteration) into a
local variable, set header['nonce'] = local_nonce, then pass that header to
calculate_hash(header); keep the block object untouched and only set block.nonce
when the mining is successful (or raise MiningExceededError without leaving a
stale nonce).
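Option B can be sanity-checked in isolation. This standalone sketch uses a hypothetical FakeBlock and stdlib hashlib in place of the project's nacl-based calculate_hash; it shows that hashing a copied header dict leaves the block untouched even if mining is aborted mid-loop:

```python
import hashlib
import json

class FakeBlock:
    """Illustrative stand-in for core.block.Block."""
    def __init__(self):
        self.nonce = 0

    def to_header_dict(self):
        # Returns a fresh dict each call, like Block.to_header_dict()
        return {"index": 1, "previous_hash": "0" * 64, "nonce": self.nonce}

def hash_header(header):
    return hashlib.sha256(json.dumps(header, sort_keys=True).encode()).hexdigest()

block = FakeBlock()
for local_nonce in range(1000):
    header = block.to_header_dict()   # hash a copy of the header...
    header["nonce"] = local_nonce     # ...with the candidate nonce applied
    hash_header(header)

assert block.nonce == 0  # the block itself was never mutated
```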


# Allow cancellation via progress callback (pass nonce explicitly)
if progress_callback:
should_continue = progress_callback(local_nonce, block_hash)
if should_continue is False:
if logger:
logger.info("Mining cancelled via progress_callback.")
raise MiningExceededError("Mining cancelled")

# Check difficulty target
if block_hash.startswith(target):
block.nonce = local_nonce
block.hash = block_hash
if logger:
logger.info("Success! Hash: %s", block_hash)
return block

# Increment nonce after attempt
local_nonce += 1
13 changes: 13 additions & 0 deletions core/__init__.py
@@ -0,0 +1,13 @@
from .block import Block
from .chain import Blockchain
from .transaction import Transaction
from .state import State
from .contract import ContractMachine

__all__ = [
"Block",
"Blockchain",
"Transaction",
"State",
"ContractMachine",
]
Binary file added core/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added core/__pycache__/block.cpython-311.pyc
Binary file not shown.
Binary file added core/__pycache__/chain.cpython-311.pyc
Binary file not shown.
Binary file added core/__pycache__/contract.cpython-311.pyc
Binary file not shown.
Binary file added core/__pycache__/state.cpython-311.pyc
Binary file not shown.
Binary file added core/__pycache__/transaction.cpython-311.pyc
Binary file not shown.
34 changes: 34 additions & 0 deletions core/block.py
@@ -0,0 +1,34 @@
import time

class Block:
def __init__(self, index, previous_hash, transactions, timestamp=None, difficulty=None):
self.index = index
self.previous_hash = previous_hash
self.transactions = transactions
self.timestamp = time.time() if timestamp is None else timestamp
self.nonce = 0
self.hash = None
self.difficulty = difficulty

def to_dict(self):
"""Full block data for serialization/transport."""
return {
"index": self.index,
"previous_hash": self.previous_hash,
"transactions": [tx.to_dict() for tx in self.transactions],
"timestamp": self.timestamp,
"difficulty": self.difficulty,
"nonce": self.nonce,
"hash": self.hash
}

def to_header_dict(self):
"""Data used for mining (consensus)."""
return {
"index": self.index,
"previous_hash": self.previous_hash,
"transactions": [tx.to_dict() for tx in self.transactions],
"timestamp": self.timestamp,
"difficulty": self.difficulty,
"nonce": self.nonce
}
73 changes: 73 additions & 0 deletions core/chain.py
@@ -0,0 +1,73 @@
from core.block import Block
from core.state import State
from consensus import calculate_hash
import logging

logger = logging.getLogger(__name__)


class Blockchain:
"""
Manages the blockchain, validates blocks, and commits state transitions.
"""

def __init__(self):
self.chain = []
self.state = State()
self._create_genesis_block()

def _create_genesis_block(self):
"""
Creates the genesis block with a fixed hash.
"""
genesis_block = Block(
index=0,
previous_hash="0",
transactions=[]
)
genesis_block.hash = "0" * 64
self.chain.append(genesis_block)

@property
def last_block(self):
"""
Returns the most recent block in the chain.
"""
return self.chain[-1]

def add_block(self, block):
"""
Validates and adds a block to the chain if all transactions succeed.
Uses a copied State to ensure atomic validation.
"""

# Check previous hash linkage
if block.previous_hash != self.last_block.hash:
logger.warning("Block %s rejected: Invalid previous hash %s != %s", block.index, block.previous_hash, self.last_block.hash)
return False

# Check index linkage
if block.index != self.last_block.index + 1:
logger.warning("Block %s rejected: Invalid index %s != %s", block.index, block.index, self.last_block.index + 1)
return False

# Verify block hash
if block.hash != calculate_hash(block.to_dict()):
logger.warning("Block %s rejected: Invalid hash %s", block.index, block.hash)
return False
Comment on lines +54 to +57
Contributor

⚠️ Potential issue | 🔴 Critical


Critical: Hash verification uses to_dict() instead of to_header_dict() — every block will be rejected.

Mining computes the block hash via calculate_hash(block.to_header_dict()) (consensus/pow.py, line 54), which excludes the hash field. However, validation here recomputes using calculate_hash(block.to_dict()) (line 55), which includes the hash field. Since to_dict() contains "hash": self.hash, the recomputed hash will never match the mined hash, causing add_block() to reject every valid mined block.

Additionally, the PoW difficulty target is not verified in add_block(). A block with a valid hash but insufficient difficulty (i.e., not matching "0" * difficulty) would incorrectly pass validation.

Fix: Use block.to_header_dict() for hash verification and add difficulty target validation:

Proposed fix
         # Verify block hash
-        if block.hash != calculate_hash(block.to_dict()):
+        computed_hash = calculate_hash(block.to_header_dict())
+        if block.hash != computed_hash:
             logger.warning("Block %s rejected: Invalid hash %s", block.index, block.hash)
             return False
+
+        # Verify PoW difficulty target
+        if block.difficulty and not block.hash.startswith("0" * block.difficulty):
+            logger.warning("Block %s rejected: Hash does not meet difficulty %s", block.index, block.difficulty)
+            return False
🤖 Prompt for AI Agents
In `@core/chain.py` around lines 54 - 57, The hash verification in add_block() is
wrong: replace the use of block.to_dict() with block.to_header_dict() when
calling calculate_hash so the recomputed hash excludes the block.hash field (use
calculate_hash(block.to_header_dict()) and compare to block.hash). Also add a
difficulty check in add_block(): verify the block.hash meets the PoW target
(e.g., startswith("0" * difficulty) or otherwise compare against the computed
target) using the current difficulty value before accepting the block; update
any related logging in add_block()/logger.warning to reflect whether the block
failed hash or difficulty verification.
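The always-reject behavior is easy to reproduce with plain dicts (stdlib hashlib standing in here for the project's nacl-based calculate_hash):

```python
import hashlib
import json

def calc(d):
    return hashlib.sha256(json.dumps(d, sort_keys=True).encode()).hexdigest()

# What mining hashes: the header, which has no "hash" field
header = {"index": 1, "previous_hash": "0" * 64, "nonce": 42}
mined_hash = calc(header)

# What to_dict() produces: the same fields plus the hash itself
full = dict(header, hash=mined_hash)

assert calc(full) != mined_hash    # recomputing over to_dict() can never match
assert calc(header) == mined_hash  # recomputing over the header dict does
```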


# Validate transactions on a temporary state copy
temp_state = self.state.copy()

for tx in block.transactions:
result = temp_state.validate_and_apply(tx)

# Reject block if any transaction fails
if result is False or result is None:
logger.warning("Block %s rejected: Transaction failed validation", block.index)
return False

# All transactions valid → commit state and append block
self.state = temp_state
self.chain.append(block)
return True
Comment on lines +38 to +73
Contributor

⚠️ Potential issue | 🟠 Major

Atomic validation is a good pattern, but the block's PoW hash is not verified.

add_block checks previous_hash linkage and validates transactions atomically — good. However, it does not verify that block.hash actually satisfies the difficulty target (i.e., the PoW). Any peer could submit a block with an arbitrary hash and it would be accepted as long as transactions are valid and linkage matches.

🛡️ Proposed fix — add PoW verification
 def add_block(self, block):
+    # Verify PoW
+    from consensus.pow import calculate_hash
+    computed_hash = calculate_hash(block.to_header_dict())
+    if computed_hash != block.hash:
+        return False
+    if block.difficulty and not block.hash.startswith("0" * block.difficulty):
+        return False
+
     # Check previous hash linkage
     if block.previous_hash != self.last_block.hash:
         return False
🤖 Prompt for AI Agents
In `@core/chain.py` around lines 34 - 57, The add_block method currently skips
proof-of-work verification; before validating transactions (i.e., before
creating temp_state or applying validate_and_apply), check that the block's PoW
is valid and meets the chain's difficulty target. Call the block hash
verification routine (e.g., a helper like Block.compute_hash() or an existing
validator such as self.hash_meets_difficulty(block.hash) /
self.get_difficulty()) and if it fails return False immediately. Insert this
check right after the previous_hash linkage check and before temp_state is
created so only blocks with a valid PoW proceed to transaction validation.
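The proposed difficulty check fits in a small helper; a sketch under the assumption, matching mine_block, that difficulty counts leading zero hex digits:

```python
def meets_difficulty(block_hash, difficulty):
    """True if the hash has the required number of leading zeros."""
    if not difficulty:  # None or 0 (e.g. genesis block): nothing to enforce
        return True
    return block_hash.startswith("0" * difficulty)

assert meets_difficulty("0000ab12", 4)
assert not meets_difficulty("000fab12", 4)
assert meets_difficulty("deadbeef", None)
```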

135 changes: 135 additions & 0 deletions core/contract.py
@@ -0,0 +1,135 @@
import logging
import multiprocessing
import ast
import sys

logger = logging.getLogger(__name__)

def _safe_exec_worker(code, globals_dict, context_dict, result_queue):
"""
Worker function to execute contract code in a separate process.
"""
try:
# Attempt to set resource limits (Unix only)
try:
import resource
# Limit CPU time (seconds) and memory (bytes) - example values
resource.setrlimit(resource.RLIMIT_CPU, (1, 1))
# resource.setrlimit(resource.RLIMIT_AS, (100 * 1024 * 1024, 100 * 1024 * 1024))
except ImportError as e:
logger.error(f"Resource limits not enforced: {e}")
raise RuntimeError(f"Resource limits not enforced: {e}")
Comment on lines +12 to +21
Contributor

⚠️ Potential issue | 🟠 Major

Contract execution always fails on Windows due to resource import.

When import resource fails (Windows), a RuntimeError is raised, caught by the outer except on Line 24, and put into the queue as an error. This means all contract executions fail on Windows with "Resource limits not enforced." Since the README/PR doesn't restrict to Unix-only, this is a significant platform limitation.

🐛 Proposed fix — degrade gracefully on non-Unix
         try:
             import resource
             resource.setrlimit(resource.RLIMIT_CPU, (1, 1))
-        except ImportError as e:
-            raise RuntimeError(f"Resource limits not enforced: {e}")
+        except ImportError:
+            pass  # Resource limits not available on this platform (e.g., Windows)
🧰 Tools
🪛 Ruff (0.15.0)

[warning] 19-19: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


[warning] 19-19: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@core/contract.py` around lines 11 - 19, The current try block that imports
resource and calls resource.setrlimit (the "resource import" block that raises a
RuntimeError on ImportError) causes contract execution to fail on Windows;
change it to degrade gracefully by catching ImportError without raising
RuntimeError — log or queue a warning/message that resource limits are not
enforced and continue execution, leaving the resource.setrlimit call skipped;
ensure the outer exception handler (which currently captures exceptions from
this block) no longer receives a RuntimeError from missing resource so contracts
run on non-Unix systems.


exec(code, globals_dict, context_dict)
# Return the updated storage
result_queue.put({"status": "success", "storage": context_dict.get("storage")})
except Exception as e:
result_queue.put({"status": "error", "error": str(e)})

class ContractMachine:
"""
A minimal execution environment for Python-based smart contracts.
WARNING: Still not production-safe. For educational use only.
"""

def __init__(self, state):
self.state = state

def execute(self, contract_address, sender_address, payload, amount):
"""
Executes the contract code associated with the contract_address.
"""

account = self.state.get_account(contract_address)
if not account:
return False
Comment on lines +43 to +45
Contributor

⚠️ Potential issue | 🟡 Minor

get_account always returns a truthy dict — this check is dead code.

State.get_account() lazily initializes and always returns a non-empty dict, so if not account is never True. The code falls through to if not code on Line 50, which is the actual guard. The side effect is that querying a non-existent contract address creates an empty account entry in state.

♻️ Proposed fix
         account = self.state.get_account(contract_address)
-        if not account:
-            return False
-
         code = account.get("code")
🤖 Prompt for AI Agents
In `@core/contract.py` around lines 41 - 43, The guard "if not account" is dead
because State.get_account() always returns a dict and causes a spurious empty
account to be created; update the contract existence check to avoid creating
accounts: call a non-mutating lookup (e.g., use
state.peek_account(contract_address) or state.get_account(contract_address,
create=False) if supported) to fetch the account without side effects, then
check the account's 'code' (the existing "if not code" check) and return False
when appropriate; remove the "if not account" branch and ensure
contract_address/account lookup uses the non-creating API so querying a missing
contract does not mutate state.


code = account.get("code")

# Defensive copy of storage to prevent direct mutation
storage = dict(account.get("storage", {}))
Comment on lines +49 to +50
Contributor

⚠️ Potential issue | 🟠 Major

Shallow copy doesn't protect nested mutable values in storage.

dict(...) only copies the top-level keys. If a prior execution stored nested dicts/lists (e.g., storage['data'] = {'key': 'val'}), those inner objects are still shared references. A failing contract can mutate them in-place before raising, bypassing the rollback intent.

🐛 Proposed fix — use deep copy
+import copy
+
 class ContractMachine:
-        storage = dict(account.get("storage", {}))
+        storage = copy.deepcopy(account.get("storage", {}))
🤖 Prompt for AI Agents
In `@core/contract.py` around lines 18 - 19, The current defensive copy uses
storage = dict(account.get("storage", {})), which only performs a shallow copy
and leaves nested mutable objects shared; change this to a deep copy by
importing deepcopy (from copy import deepcopy or import copy) and replacing that
line with storage = deepcopy(account.get("storage", {})) so nested dicts/lists
are fully copied and cannot be mutated by failing contract executions; update
imports in core/contract.py accordingly.
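The difference is quick to demonstrate with stdlib copy:

```python
import copy

# Shallow copy: the nested dict is still shared with the original
account = {"storage": {"data": {"key": "val"}}}
shallow = dict(account["storage"])
shallow["data"]["key"] = "mutated"
assert account["storage"]["data"]["key"] == "mutated"  # original changed

# Deep copy: nested objects are copied too
account = {"storage": {"data": {"key": "val"}}}
deep = copy.deepcopy(account["storage"])
deep["data"]["key"] = "mutated"
assert account["storage"]["data"]["key"] == "val"      # original intact
```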


if not code:
return False

# AST Validation to prevent introspection
if not self._validate_code_ast(code):
return False

# Restricted builtins (explicit allowlist)
safe_builtins = {
"True": True,
"False": False,
"None": None,
"range": range,
"len": len,
"min": min,
"max": max,
"abs": abs,
}
Comment on lines +60 to +69
Contributor

🧹 Nitpick | 🔵 Trivial

Safe builtins allowlist still permits certain bypasses.

True, False, None as string keys in the builtins dict are redundant — they're language keywords, not builtins lookups, in Python 3. More importantly, contracts can still access type, object, __class__ via literal introspection (e.g., ().__class__.__bases__[0].__subclasses__()) to escape. The warning on line 4 covers this, but consider also blocking attribute access on dunders if the project evolves beyond educational use.

🤖 Prompt for AI Agents
In `@core/contract.py` around lines 25 - 34, Remove the redundant string-key
entries "True", "False", and "None" from safe_builtins and tighten the sandbox
to prevent literal introspection escapes: ensure safe_builtins only exposes the
explicitly allowed callables (range, len, min, max, abs) and does not expose
type/object, and add an AST-level or bytecode-level check (e.g., a new
validate_no_dunder_access / AST NodeVisitor used before exec/eval) that rejects
any attribute access whose name starts with "__" (or any patterns like
__class__, __bases__, __subclasses__) to block constructs such as
().__class__.__bases__[0].__subclasses__(); keep the enforcement tied to the
code path that injects safe_builtins so callers of safe_builtins get the
stricter, minimal environment.
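The dunder check the comment asks for is essentially what the PR's _validate_code_ast already implements; a self-contained version of the same idea, showing it catches the literal-introspection escape:

```python
import ast

def has_dunder_access(code):
    """True if the source accesses any name or attribute starting with '__'."""
    tree = ast.parse(code)
    for node in ast.walk(tree):
        if isinstance(node, ast.Attribute) and node.attr.startswith("__"):
            return True
        if isinstance(node, ast.Name) and node.id.startswith("__"):
            return True
    return False

escape = "().__class__.__bases__[0].__subclasses__()"
assert has_dunder_access(escape)                                 # rejected
assert not has_dunder_access("storage['n'] = len(msg['data'])")  # allowed
```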


globals_for_exec = {
"__builtins__": safe_builtins
}

# Execution context (locals)
context = {
"storage": storage,
"msg": {
"sender": sender_address,
"value": amount,
"data": payload,
},
"print": print, # Explicitly allowed for debugging
}

try:
# Execute in a subprocess with timeout
queue = multiprocessing.Queue()
p = multiprocessing.Process(
target=_safe_exec_worker,
args=(code, globals_for_exec, context, queue)
)
p.start()
p.join(timeout=2) # 2 second timeout

if p.is_alive():
p.kill()
logger.error("Contract execution timed out")
return False

if queue.empty():
logger.error("Contract execution crashed without result")
return False

result = queue.get()
if result["status"] != "success":
logger.error(f"Contract Execution Failed: {result.get('error')}")
return False

# Commit updated storage only after successful execution
self.state.update_contract_storage(
contract_address,
result["storage"]
)

return True

except Exception as e:
logger.error("Contract Execution Failed", exc_info=True)
return False
Comment on lines +118 to +120
Contributor

🧹 Nitpick | 🔵 Trivial

Use logger.exception() and remove unused variable e.

Per Ruff F841 and G201, e is assigned but never used, and logger.error(..., exc_info=True) should be logger.exception(...).

♻️ Proposed fix
-        except Exception as e:
-            logger.error("Contract Execution Failed", exc_info=True)
+        except Exception:
+            logger.exception("Contract Execution Failed")
             return False
🧰 Tools
🪛 Ruff (0.15.0)

[error] 116-116: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


[warning] 117-117: Logging .exception(...) should be used instead of .error(..., exc_info=True)

(G201)

🤖 Prompt for AI Agents
In `@core/contract.py` around lines 116 - 118, In the exception handler inside
core.contract.py (the block using logger.error with exc_info=True), remove the
unused exception variable `e` and replace the logger call with
logger.exception("Contract Execution Failed") so the stack trace is logged
correctly; i.e., change the except clause from "except Exception as e:" to
"except Exception:" and call logger.exception(...) instead of logger.error(...,
exc_info=True).


def _validate_code_ast(self, code):
"""Reject code that uses double underscores or introspection."""
try:
tree = ast.parse(code)
for node in ast.walk(tree):
if isinstance(node, ast.Attribute) and node.attr.startswith("__"):
logger.warning("Rejected contract code with double-underscore attribute access.")
return False
if isinstance(node, ast.Name) and node.id.startswith("__"):
logger.warning("Rejected contract code with double-underscore name.")
return False
return True
except SyntaxError:
return False