Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 130 additions & 94 deletions tools/pynuttx/nxgdb/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import gdb

from . import utils
from .protocols.fs import File, Inode
from .protocols import fs as p
from .protocols.thread import Tcb

FSNODEFLAG_TYPE_MASK = utils.get_symbol_value("FSNODEFLAG_TYPE_MASK")
Expand Down Expand Up @@ -72,55 +72,91 @@ class InodeType(enum.Enum):
UNKNOWN = 12


def get_inode_name(inode: Inode):
if not inode:
return ""

def special_inode_name(inode):
global g_special_inodes
if not g_special_inodes:
g_special_inodes = {}
for special in (
"perf",
"timerfd",
"signalfd",
"dir",
"inotify",
"epoll",
"eventfd",
"sock",
):
value = utils.gdb_eval_or_none(f"g_{special}_inode")
if value:
g_special_inodes[special] = value.address
class Inode(utils.Value, p.Inode):
def __init__(self, obj: gdb.Value | utils.Value):
super().__init__(obj)

for name, value in g_special_inodes.items():
if value == inode:
return name
def __str__(self) -> str:
name = get_inode_name(self)
type = inode_gettype(self)
return f"{name}{'/' if type == InodeType.PSEUDODIR else ''}{int(self): #x} {type.name}"

return None
def __repr__(self) -> str:
return self.__str__()

if name := special_inode_name(inode):
return name
def details(self) -> str:
details = (
f"ino: {self.i_ino}, crefs: {self.i_crefs}"
f", flags: {self.i_flags}, private: {self.i_private}"
)

ptr = inode.i_name.cast(gdb.lookup_type("char").pointer())
return ptr.string()
if CONFIG_PSEUDOFS_FILE:
details += f", i_size: {self.i_size}"
if CONFIG_PSEUDOFS_ATTRIBUTES:
details += f", i_mode: {self.i_mode}, i_owner: {self.i_owner}, i_group: {self.i_group}"
details += f", i_atime: {self.i_atime}, i_mtime: {self.i_mtime}, i_ctime: {self.i_ctime}"
return details

@property
def nodetype(self) -> InodeType:
return inode_gettype(self)


def get_inode_name(inode: p.Inode):
try:
if not inode:
return ""

def special_inode_name(inode):
global g_special_inodes
if not g_special_inodes:
g_special_inodes = {}
for special in (
"perf",
"timerfd",
"signalfd",
"dir",
"inotify",
"epoll",
"eventfd",
"sock",
):
value = utils.gdb_eval_or_none(f"g_{special}_inode")
if value:
g_special_inodes[special] = value.address

for name, value in g_special_inodes.items():
if value == inode:
return name

return None

if name := special_inode_name(inode):
return name

ptr = inode.i_name.cast(gdb.lookup_type("char").pointer())
return ptr.string()
except gdb.MemoryError:
return "<MemoryError>"

def inode_getpath(inode: Inode):

def inode_getpath(inode: p.Inode):
"""get path fron inode"""
if not inode:
return ""
try:
if not inode:
return ""

name = get_inode_name(inode)
name = get_inode_name(inode)

if inode.i_parent:
return inode_getpath(inode.i_parent) + "/" + name
if inode.i_parent:
return inode_getpath(inode.i_parent) + "/" + name

return name
return name
except gdb.MemoryError:
return "<MemoryError>"


def inode_gettype(inode: Inode) -> InodeType:
def inode_gettype(inode: p.Inode) -> InodeType:
if not inode:
return InodeType.UNKNOWN

Expand Down Expand Up @@ -148,8 +184,8 @@ def get_file(tcb: Tcb, fd):
return fl_files[row][col]


def foreach_inode(root=None, path="") -> Generator[Tuple[Inode, str], None, None]:
node: Inode = root or utils.parse_and_eval("g_root_inode").i_child
def foreach_inode(root=None, path="") -> Generator[Tuple[p.Inode, str], None, None]:
node: p.Inode = root or utils.parse_and_eval("g_root_inode").i_child
while node:
newpath = path + "/" + get_inode_name(node)
yield node, newpath
Expand Down Expand Up @@ -184,7 +220,7 @@ def __init__(self):
super().__init__("fdinfo", gdb.COMMAND_DATA, gdb.COMPLETE_EXPRESSION)
self.total_fd_count = 0

def print_file_info(self, fd, file: File, formatter: str):
def print_file_info(self, fd, file: p.File, formatter: str):
backtrace_formatter = "{0:<5} {1:<36} {2}"

oflags = int(file.f_oflags)
Expand Down Expand Up @@ -305,7 +341,6 @@ class ForeachInode(gdb.Command):

def __init__(self):
super().__init__("foreach inode", gdb.COMMAND_USER)
self.level = 4096

def get_root_inode(self, addr_or_expr):
try:
Expand All @@ -321,78 +356,65 @@ def parse_arguments(self, argv):
"-L",
"--level",
type=int,
default=None,
default=4096,
help="Only render the tree to a specific depth",
)
parser.add_argument(
"--nodetype",
type=str,
choices=[e.name.lower() for e in InodeType],
default=None,
help="Only show the specific type of inode",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Show more information",
)
parser.add_argument(
"addr_or_expr",
type=str,
nargs="?",
default=None,
help="set the start inode to be tranversed",
)

try:
args = parser.parse_args(argv)
except SystemExit:
return None
return {
"level": args.level if args.level else 4096,
"root_inode": (
self.get_root_inode(args.addr_or_expr)
if args.addr_or_expr
else utils.gdb_eval_or_none("g_root_inode")
),
}

def print_inode_info(self, node: Inode, level, prefix):
if level > self.level:
return args

def print_inode_info(
self, node: Inode, level=1, prefix="", maxlevel=4096, type=None, verbose=False
):
if level > maxlevel:
return

while node:
if node.i_peer:
initial_indent = prefix + "├── "
subsequent_indent = prefix + "│ "
newprefix = prefix + "│ "
else:
initial_indent = prefix + "└── "
subsequent_indent = prefix + " "
newprefix = prefix + " "
gdb.write(
"%s [%s], %s, %s\n"
% (initial_indent, get_inode_name(node), node.i_ino, node)
)
gdb.write(
"%s i_crefs: %s, i_flags: %s, i_private: %s\n"
% (
subsequent_indent,
node.i_crefs,
node.i_flags,
node.i_private,
)
)
if CONFIG_PSEUDOFS_FILE:
gdb.write("%s i_size: %s\n" % (subsequent_indent, node.i_size))
if CONFIG_PSEUDOFS_ATTRIBUTES:
gdb.write(
"%s i_mode: %s, i_owner: %s, i_group: %s\n"
% (
subsequent_indent,
node.i_mode,
node.i_owner,
node.i_group,
)
)

if (
not type
or node.nodetype == type
or any(n.i_parent == node for n, _ in self.nodes)
):
gdb.write(
"%s i_atime: %s, i_mtime: %s, i_ctime: %s\n"
% (
subsequent_indent,
node.i_atime,
node.i_mtime,
node.i_ctime,
)
f"{initial_indent}{node} {node.details() if verbose else ''}\n"
)

if node.i_child:
self.print_inode_info(node.i_child, level + 1, newprefix)
node = node.i_peer
self.print_inode_info(
Inode(node.i_child), level + 1, newprefix, maxlevel, type, verbose
)
node = Inode(node.i_peer)

def diagnose(self, *args, **kwargs):
output = gdb.execute("foreach inode", to_string=True)
Expand All @@ -406,11 +428,25 @@ def diagnose(self, *args, **kwargs):
}

def invoke(self, args, from_tty):
arg = self.parse_arguments(args.split(" "))
if not arg:
args = self.parse_arguments(args.split(" "))
if not args:
return
self.level = arg["level"]
self.print_inode_info(arg["root_inode"], 1, "")

root = (
self.get_root_inode(args.addr_or_expr)
if args.addr_or_expr
else utils.gdb_eval_or_none("g_root_inode")
)

nodetype = InodeType[args.nodetype.upper()] if args.nodetype else None
if nodetype:
self.nodes = list(
filter(lambda x: inode_gettype(x[0]) == nodetype, foreach_inode(root))
)

self.print_inode_info(
Inode(root), maxlevel=args.level, type=nodetype, verbose=args.verbose
)


class InfoShmfs(gdb.Command):
Expand All @@ -422,7 +458,7 @@ def __init__(self):
self.total_size = 0
self.block_count = 0

def shm_filter(self, node: Inode, path):
def shm_filter(self, node: p.Inode, path):
if inode_gettype(node) != InodeType.SHM:
return

Expand Down
Loading