From 59e23df211d5ce20f9ee499c5f984283d8c271c0 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Tue, 3 Dec 2024 16:07:20 -0800 Subject: [PATCH 1/7] add imu structs --- mapillary_tools/imu.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 mapillary_tools/imu.py diff --git a/mapillary_tools/imu.py b/mapillary_tools/imu.py new file mode 100644 index 000000000..d49c4172b --- /dev/null +++ b/mapillary_tools/imu.py @@ -0,0 +1,25 @@ +import typing as T + + +# Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. +class GyroscopeData(T.NamedTuple): + time: float + x: float + y: float + z: float + + +# Accelerometer reading in meters/second^2 along XYZ axes of the camera. +class AccelerationData(T.NamedTuple): + time: float + x: float + y: float + z: float + + +# Ambient magnetic field. +class MagnetometerData(T.NamedTuple): + time: float + x: float + y: float + z: float From d05237f882cb3b37191db3334f58c03fd856e985 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Fri, 6 Dec 2024 13:09:41 -0800 Subject: [PATCH 2/7] add the extraction --- mapillary_tools/geotag/gpmf_parser.py | 118 +++++++++++++++++++++++++- tests/cli/gpmf_parser.py | 16 +++- 2 files changed, 129 insertions(+), 5 deletions(-) diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 7feaf7134..acd4b2034 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -4,7 +4,7 @@ import construct as C -from .. import geo +from .. import geo, imu from ..mp4 import mp4_sample_parser as sample_parser """ @@ -298,6 +298,74 @@ def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.PointWithF return sample_points +_Float3 = T.Tuple[float, float, float] + + +def _apply_orin(values: _Float3, orin: bytes) -> _Float3: + x, y, z = 0.0, 0.0, 0.0 + + for o, v in zip(orin, values): + axis = o.to_bytes() + if axis == b"X": + x = v + elif axis == b"Y": + y = v + elif axis == b"Z": + z = v + elif axis == b"x": + x = -v + elif axis == b"y": + y = -v + elif axis == b"z": + z = -v + + return (x, y, z) + + +def _extract_xyz_from_stream( + stream: T.Sequence[KLVDict], key: bytes +) -> T.Generator[_Float3, None, None]: + indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream} + + klv = indexed.get(key) + if klv is None: + return + + scal_klv = indexed.get(b"SCAL") + if scal_klv is None: + return + + try: + scal = scal_klv["data"][0][0] + except (TypeError, IndexError): + return + + if scal == 0: + return + + orin_klv = indexed.get(b"ORIN") + if orin_klv is None: + orin = b"ZXY" + else: + orin = orin_klv["data"][0] + + for values in klv["data"]: + x, y, z = _apply_orin(values, orin) + yield (x / scal, y / scal, z / scal) + + +def _find_first_xyz_stream(stream: T.Sequence[KLVDict], key: bytes): + sample_xyzs: T.List[T.Tuple[float, float, float]] = [] + + for klv in stream: + if klv["key"] == b"STRM": + sample_xyzs = list(_extract_xyz_from_stream(klv["data"], key)) + if sample_xyzs: + break + + return sample_xyzs + + def _extract_dvnm_from_samples( fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample] ) -> T.Dict[int, bytes]: @@ -326,6 +394,9 @@ def _extract_points_from_samples( ) -> T.List[geo.PointWithFix]: # To keep GPS points from different devices separated points_by_dvid: T.Dict[int, T.List[geo.PointWithFix]] = {} + accls_by_dvid: T.Dict[int, T.List[imu.AccelerationData]] = {} + gyros_by_dvid: T.Dict[int, T.List[imu.GyroscopeData]] = {} + magns_by_dvid: 
T.Dict[int, T.List[imu.MagnetometerData]] = {} for sample in samples: fp.seek(sample.raw_sample.offset, io.SEEK_SET) @@ -335,6 +406,8 @@ def _extract_points_from_samples( # iterate devices devices = (klv for klv in gpmf_sample_data if klv["key"] == b"DEVC") for device in devices: + device_id = _find_first_device_id(device["data"]) + sample_points = _find_first_gps_stream(device["data"]) if sample_points: # interpolate timestamps in between @@ -342,10 +415,51 @@ def _extract_points_from_samples( for idx, point in enumerate(sample_points): point.time = sample.exact_time + avg_timedelta * idx - device_id = _find_first_device_id(device["data"]) device_points = points_by_dvid.setdefault(device_id, []) device_points.extend(sample_points) + sample_xyzs = _find_first_xyz_stream(device["data"], b"ACCL") + if sample_xyzs: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_xyzs) + accls_by_dvid.setdefault(device_id, []).extend( + imu.AccelerationData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (x, y, z) in enumerate(sample_xyzs) + ) + + sample_xyzs = _find_first_xyz_stream(device["data"], b"GYRO") + if sample_xyzs: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_xyzs) + gyros_by_dvid.setdefault(device_id, []).extend( + imu.GyroscopeData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (x, y, z) in enumerate(sample_xyzs) + ) + + sample_xyzs = _find_first_xyz_stream(device["data"], b"MAGN") + if sample_xyzs: + # interpolate timestamps in between + avg_delta = sample.exact_timedelta / len(sample_xyzs) + magns_by_dvid.setdefault(device_id, []).extend( + imu.MagnetometerData( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (x, y, z) in enumerate(sample_xyzs) + ) + values = list(points_by_dvid.values()) return values[0] if values else [] diff --git a/tests/cli/gpmf_parser.py b/tests/cli/gpmf_parser.py index 0ad1ce390..2a69175ce 100644 --- a/tests/cli/gpmf_parser.py +++ b/tests/cli/gpmf_parser.py @@ -1,5 +1,6 @@ import argparse import datetime +import io import json import pathlib import typing as T @@ -10,6 +11,7 @@ import mapillary_tools.geotag.gpmf_parser as gpmf_parser import mapillary_tools.geotag.gps_filter as gps_filter +from mapillary_tools.mp4 import mp4_sample_parser import mapillary_tools.utils as utils @@ -113,13 +115,21 @@ def main(): parsed_args = _parse_args() features = [] - samples = [] + parsed_samples = [] gpx = gpxpy.gpx.GPX() def _process(path: pathlib.Path): if parsed_args.dump: with path.open("rb") as fp: - samples.extend(gpmf_parser.iterate_gpmd_sample_data(fp)) + parser = mp4_sample_parser.MovieBoxParser.parse_stream(fp) + for t in parser.extract_tracks(): + for sample in t.extract_samples(): + if gpmf_parser._is_gpmd_description(sample.description): + fp.seek(sample.raw_sample.offset, io.SEEK_SET) + data = fp.read(sample.raw_sample.size) + parsed_samples.append( + T.cast(T.Dict, gpmf_parser.GPMFSampleData.parse(data)) + ) elif parsed_args.geojson: features.extend(_convert_geojson(path)) else: @@ -129,7 +139,7 @@ def _process(path: pathlib.Path): _process(path) if parsed_args.dump: - for sample in samples: + for sample in parsed_samples: print(sample) else: if features: From e48b7e2b8bda9de7a31a1243a81bd98a67492e1c Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Sun, 8 Dec 2024 20:21:58 -0800 Subject: [PATCH 3/7] use orin/orio/mtrx to calibrate IMUs --- mapillary_tools/geotag/gpmf_parser.py | 218 
+++++++++++++++++--------- tests/cli/gpmf_parser.py | 22 +-- 2 files changed, 158 insertions(+), 82 deletions(-) diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index acd4b2034..b5f8ca29e 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -1,3 +1,4 @@ +import dataclasses import io import pathlib import typing as T @@ -5,7 +6,7 @@ import construct as C from .. import geo, imu -from ..mp4 import mp4_sample_parser as sample_parser +from ..mp4.mp4_sample_parser import TrackBoxParser, MovieBoxParser, Sample """ Parsing GPS from GPMF data format stored in GoPros. See the GPMF spec: https://github.com/gopro/gpmf-parser @@ -125,6 +126,14 @@ class KLVDict(T.TypedDict): GPMFSampleData = C.GreedyRange(KLV) +@dataclasses.dataclass +class TelemetryData: + gps: T.List[geo.PointWithFix] + accl: T.List[imu.AccelerationData] + gyro: T.List[imu.GyroscopeData] + magn: T.List[imu.MagnetometerData] + + # A GPS5 stream example: # key = b'STRM' type = b'\x00' structure_size = 1 repeat = 400 # data = ListContainer: @@ -298,33 +307,77 @@ def _find_first_gps_stream(stream: T.Sequence[KLVDict]) -> T.List[geo.PointWithF return sample_points -_Float3 = T.Tuple[float, float, float] +# a sensor matrix with only [1,0,0, 0,-1,0, 0,0,1], is just a form of non-calibrated sensor orientation +def _is_matrix_calibration(matrix: T.Sequence[float]) -> bool: + for v in matrix: + if v not in [0, -1, 1]: + return True + return False + + +def _build_matrix( + orin: bytes | T.Sequence[int], orio: bytes | T.Sequence[int] +) -> T.Sequence[float]: + matrix = [] + + # list(b'aA') == [97, 65] + lower_a, upper_A = 97, 65 + + for out_char in orin: + for in_char in orio: + if in_char == out_char: + matrix.append(1.0) + elif (in_char - lower_a) == (out_char - upper_A): + matrix.append(-1.0) + elif (in_char - upper_A) == (out_char - lower_a): + matrix.append(-1.0) + else: + matrix.append(0.0) + + return matrix + +def _apply_matrix( + matrix: T.Sequence[float], values: T.Sequence[float] +) -> T.Generator[float, None, None]: + size = len(values) + assert ( + len(matrix) == size * size + ), f"expecting a square matrix of size {size} x {size} but got {len(matrix)}" -def _apply_orin(values: _Float3, orin: bytes) -> _Float3: - x, y, z = 0.0, 0.0, 0.0 + for y in range(size): + row_start = y * size + yield sum(matrix[row_start + x] * values[x] for x in range(size)) - for o, v in zip(orin, values): - axis = o.to_bytes() - if axis == b"X": - x = v - elif axis == b"Y": - y = v - elif axis == b"Z": - z = v - elif axis == b"x": - x = -v - elif axis == b"y": - y = -v - elif axis == b"z": - z = -v - return (x, y, z) +def _flatten(nested): + if isinstance(nested, T.Sequence): + for sublist in nested: + yield from _flatten(sublist) + else: + yield nested + + +def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: + mtrx = klv.get(b"MTRX") + if mtrx is not None: + matrix = tuple(_flatten(mtrx["data"])) + if _is_matrix_calibration(matrix): + return matrix + + orin = klv.get(b"ORIN") + orio = klv.get(b"ORIO") + if orin is not None and orio is not None: + mtrx = _build_matrix(b"".join(orin["data"]), b"".join(orio["data"])) + return mtrx -def _extract_xyz_from_stream( + return None + + +def _scale_and_calibrate( stream: T.Sequence[KLVDict], key: bytes -) -> T.Generator[_Float3, None, None]: +) -> T.Generator[T.Tuple[float, ...], None, None]: indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream} klv = indexed.get(key) @@ -343,31 +396,29 @@ def 
_extract_xyz_from_stream( if scal == 0: return - orin_klv = indexed.get(b"ORIN") - if orin_klv is None: - orin = b"ZXY" - else: - orin = orin_klv["data"][0] + matrix = _get_matrix(indexed) for values in klv["data"]: - x, y, z = _apply_orin(values, orin) - yield (x / scal, y / scal, z / scal) + if matrix is None: + yield tuple(v / scal for v in values) + else: + yield tuple(v / scal for v in _apply_matrix(matrix, values)) -def _find_first_xyz_stream(stream: T.Sequence[KLVDict], key: bytes): - sample_xyzs: T.List[T.Tuple[float, float, float]] = [] +def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): + values: T.List[T.Sequence[float]] = [] for klv in stream: if klv["key"] == b"STRM": - sample_xyzs = list(_extract_xyz_from_stream(klv["data"], key)) - if sample_xyzs: + values = list(_scale_and_calibrate(klv["data"], key)) + if values: break - return sample_xyzs + return values def _extract_dvnm_from_samples( - fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample] + fp: T.BinaryIO, samples: T.Iterable[Sample] ) -> T.Dict[int, bytes]: dvnm_by_dvid: T.Dict[int, bytes] = {} @@ -390,8 +441,8 @@ def _extract_dvnm_from_samples( def _extract_points_from_samples( - fp: T.BinaryIO, samples: T.Iterable[sample_parser.Sample] -) -> T.List[geo.PointWithFix]: + fp: T.BinaryIO, samples: T.Iterable[Sample] +) -> TelemetryData: # To keep GPS points from different devices separated points_by_dvid: T.Dict[int, T.List[geo.PointWithFix]] = {} accls_by_dvid: T.Dict[int, T.List[imu.AccelerationData]] = {} @@ -418,10 +469,10 @@ def _extract_points_from_samples( device_points = points_by_dvid.setdefault(device_id, []) device_points.extend(sample_points) - sample_xyzs = _find_first_xyz_stream(device["data"], b"ACCL") - if sample_xyzs: + sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL") + if sample_accls: # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_xyzs) + avg_delta = sample.exact_timedelta / len(sample_accls) accls_by_dvid.setdefault(device_id, []).extend( imu.AccelerationData( time=sample.exact_time + avg_delta * idx, @@ -429,13 +480,13 @@ def _extract_points_from_samples( y=y, z=z, ) - for idx, (x, y, z) in enumerate(sample_xyzs) + for idx, (z, x, y, *_) in enumerate(sample_accls) ) - sample_xyzs = _find_first_xyz_stream(device["data"], b"GYRO") - if sample_xyzs: + sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO") + if sample_gyros: # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_xyzs) + avg_delta = sample.exact_timedelta / len(sample_gyros) gyros_by_dvid.setdefault(device_id, []).extend( imu.GyroscopeData( time=sample.exact_time + avg_delta * idx, @@ -443,13 +494,13 @@ def _extract_points_from_samples( y=y, z=z, ) - for idx, (x, y, z) in enumerate(sample_xyzs) + for idx, (z, x, y, *_) in enumerate(sample_gyros) ) - sample_xyzs = _find_first_xyz_stream(device["data"], b"MAGN") - if sample_xyzs: + sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN") + if sample_magns: # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_xyzs) + avg_delta = sample.exact_timedelta / len(sample_magns) magns_by_dvid.setdefault(device_id, []).extend( imu.MagnetometerData( time=sample.exact_time + avg_delta * idx, @@ -457,50 +508,73 @@ def _extract_points_from_samples( y=y, z=z, ) - for idx, (x, y, z) in enumerate(sample_xyzs) + for idx, (z, x, y, *_) in enumerate(sample_magns) ) - values = list(points_by_dvid.values()) - return values[0] if values 
else [] + return TelemetryData( + gps=list(points_by_dvid.values())[0] if points_by_dvid else [], + accl=list(accls_by_dvid.values())[0] if accls_by_dvid else [], + gyro=list(gyros_by_dvid.values())[0] if gyros_by_dvid else [], + magn=list(magns_by_dvid.values())[0] if magns_by_dvid else [], + ) def _is_gpmd_description(description: T.Dict) -> bool: return description["format"] == b"gpmd" -def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.PointWithFix]]: +def _contains_gpmd_description(track: TrackBoxParser) -> bool: + descriptions = track.extract_sample_descriptions() + return any(_is_gpmd_description(d) for d in descriptions) + + +def _extract_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, None]: + for sample in track.extract_samples(): + if _is_gpmd_description(sample.description): + yield sample + + +def extract_points(fp: T.BinaryIO) -> T.List[geo.PointWithFix]: """ Return a list of points (could be empty) if it is a valid GoPro video, otherwise None """ - points = None - moov = sample_parser.MovieBoxParser.parse_stream(fp) + moov = MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): - descriptions = track.extract_sample_descriptions() - if any(_is_gpmd_description(d) for d in descriptions): - gpmd_samples = ( - sample - for sample in track.extract_samples() - if _is_gpmd_description(sample.description) - ) - points = list(_extract_points_from_samples(fp, gpmd_samples)) + if _contains_gpmd_description(track): + gpmd_samples = _extract_gpmd_samples(track) + telemetry = _extract_points_from_samples(fp, gpmd_samples) # return the firstly found non-empty points - if points: - return points + if telemetry.gps: + return telemetry.gps + # points could be empty list or None here - return points + return [] + + +def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[TelemetryData]: + """ + Return the telemetry data from the first found GoPro GPMF track + """ + moov = MovieBoxParser.parse_stream(fp) + + for track in moov.extract_tracks(): + if _contains_gpmd_description(track): + gpmd_samples = _extract_gpmd_samples(track) + telemetry = _extract_points_from_samples(fp, gpmd_samples) + # return the firstly found non-empty points + if telemetry.gps: + return telemetry + + # points could be empty list or None here + return None def extract_all_device_names(fp: T.BinaryIO) -> T.Dict[int, bytes]: - moov = sample_parser.MovieBoxParser.parse_stream(fp) + moov = MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): - descriptions = track.extract_sample_descriptions() - if any(_is_gpmd_description(d) for d in descriptions): - gpmd_samples = ( - sample - for sample in track.extract_samples() - if _is_gpmd_description(sample.description) - ) + if _contains_gpmd_description(track): + gpmd_samples = _extract_gpmd_samples(track) device_names = _extract_dvnm_from_samples(fp, gpmd_samples) if device_names: return device_names diff --git a/tests/cli/gpmf_parser.py b/tests/cli/gpmf_parser.py index 2a69175ce..713388fb5 100644 --- a/tests/cli/gpmf_parser.py +++ b/tests/cli/gpmf_parser.py @@ -101,6 +101,17 @@ def _convert_geojson(path: pathlib.Path): return features +def _parse_samples(path: pathlib.Path) -> T.Generator[T.Dict, None, None]: + with path.open("rb") as fp: + parser = mp4_sample_parser.MovieBoxParser.parse_stream(fp) + for t in parser.extract_tracks(): + for sample in t.extract_samples(): + if gpmf_parser._is_gpmd_description(sample.description): + fp.seek(sample.raw_sample.offset, io.SEEK_SET) + data = fp.read(sample.raw_sample.size) + yield 
T.cast(T.Dict, gpmf_parser.GPMFSampleData.parse(data)) + + def _parse_args(): parser = argparse.ArgumentParser() parser.add_argument("path", nargs="+", help="Path to video file or directory") @@ -120,16 +131,7 @@ def main(): def _process(path: pathlib.Path): if parsed_args.dump: - with path.open("rb") as fp: - parser = mp4_sample_parser.MovieBoxParser.parse_stream(fp) - for t in parser.extract_tracks(): - for sample in t.extract_samples(): - if gpmf_parser._is_gpmd_description(sample.description): - fp.seek(sample.raw_sample.offset, io.SEEK_SET) - data = fp.read(sample.raw_sample.size) - parsed_samples.append( - T.cast(T.Dict, gpmf_parser.GPMFSampleData.parse(data)) - ) + parsed_samples.extend(_parse_samples(path)) elif parsed_args.geojson: features.extend(_convert_geojson(path)) else: From 62ada4345da15bcf40a916657f729ab3b9a90ae6 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 9 Dec 2024 14:54:55 -0800 Subject: [PATCH 4/7] refactor --- mapillary_tools/geotag/gpmf_parser.py | 37 ++++++++++++++++----------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index b5f8ca29e..b8c2a61d0 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -1,5 +1,6 @@ import dataclasses import io +import itertools import pathlib import typing as T @@ -377,7 +378,7 @@ def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: def _scale_and_calibrate( stream: T.Sequence[KLVDict], key: bytes -) -> T.Generator[T.Tuple[float, ...], None, None]: +) -> T.Generator[T.Sequence[float], None, None]: indexed: T.Dict[bytes, KLVDict] = {klv["key"]: klv for klv in stream} klv = indexed.get(key) @@ -385,24 +386,30 @@ def _scale_and_calibrate( return scal_klv = indexed.get(b"SCAL") - if scal_klv is None: - return - try: - scal = scal_klv["data"][0][0] - except (TypeError, IndexError): - return + if scal_klv is not None: + # replace 0s with 1s to avoid division by zero + scals = [s or 1 for s in _flatten(scal_klv["data"])] - if scal == 0: - return + if not scals: + scals = [1] + + if len(scals) == 1: + # infinite repeat + scales = itertools.repeat(scals[0]) + else: + scales = scals + + if klv["type"] == b"?": + complex_parser = C.Sequence(*[_type_mapping[t.to_bytes()][0] for t in gps_value_types]) matrix = _get_matrix(indexed) for values in klv["data"]: if matrix is None: - yield tuple(v / scal for v in values) + yield tuple(v / s for v, s in zip(values, scales)) else: - yield tuple(v / scal for v in _apply_matrix(matrix, values)) + yield tuple(v / s for v, s in zip(_apply_matrix(matrix, values), scales)) def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): @@ -528,7 +535,7 @@ def _contains_gpmd_description(track: TrackBoxParser) -> bool: return any(_is_gpmd_description(d) for d in descriptions) -def _extract_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, None]: +def _filter_gpmd_samples(track: TrackBoxParser) -> T.Generator[Sample, None, None]: for sample in track.extract_samples(): if _is_gpmd_description(sample.description): yield sample @@ -542,7 +549,7 @@ def extract_points(fp: T.BinaryIO) -> T.List[geo.PointWithFix]: moov = MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): if _contains_gpmd_description(track): - gpmd_samples = _extract_gpmd_samples(track) + gpmd_samples = _filter_gpmd_samples(track) telemetry = _extract_points_from_samples(fp, gpmd_samples) # return the firstly found non-empty points if 
telemetry.gps: @@ -560,7 +567,7 @@ def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[TelemetryData]: for track in moov.extract_tracks(): if _contains_gpmd_description(track): - gpmd_samples = _extract_gpmd_samples(track) + gpmd_samples = _filter_gpmd_samples(track) telemetry = _extract_points_from_samples(fp, gpmd_samples) # return the firstly found non-empty points if telemetry.gps: @@ -574,7 +581,7 @@ def extract_all_device_names(fp: T.BinaryIO) -> T.Dict[int, bytes]: moov = MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): if _contains_gpmd_description(track): - gpmd_samples = _extract_gpmd_samples(track) + gpmd_samples = _filter_gpmd_samples(track) device_names = _extract_dvnm_from_samples(fp, gpmd_samples) if device_names: return device_names From e96e6eb4b7a3804e9004f5d69428c179d8149420 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 9 Dec 2024 15:01:18 -0800 Subject: [PATCH 5/7] refactor --- mapillary_tools/geotag/gpmf_parser.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index b8c2a61d0..24f209ca1 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -351,18 +351,17 @@ def _apply_matrix( yield sum(matrix[row_start + x] * values[x] for x in range(size)) -def _flatten(nested): - if isinstance(nested, T.Sequence): - for sublist in nested: - yield from _flatten(sublist) - else: - yield nested +def _flatten(nested: T.Sequence[T.Sequence[float]]) -> list[float]: + output = [] + for row in nested: + output.extend(row) + return output def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: mtrx = klv.get(b"MTRX") if mtrx is not None: - matrix = tuple(_flatten(mtrx["data"])) + matrix = _flatten(mtrx["data"]) if _is_matrix_calibration(matrix): return matrix @@ -400,9 +399,6 @@ def _scale_and_calibrate( else: scales = scals - if klv["type"] == b"?": - complex_parser = C.Sequence(*[_type_mapping[t.to_bytes()][0] for t in gps_value_types]) - matrix = _get_matrix(indexed) for values in klv["data"]: From c8c8ed34fb182894ebad0beb1eca9ac7715bede6 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 9 Dec 2024 15:51:03 -0800 Subject: [PATCH 6/7] fix types --- mapillary_tools/geotag/gpmf_parser.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 24f209ca1..9650e17a8 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -352,7 +352,7 @@ def _apply_matrix( def _flatten(nested: T.Sequence[T.Sequence[float]]) -> list[float]: - output = [] + output: list[float] = [] for row in nested: output.extend(row) return output @@ -361,7 +361,7 @@ def _flatten(nested: T.Sequence[T.Sequence[float]]) -> list[float]: def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: mtrx = klv.get(b"MTRX") if mtrx is not None: - matrix = _flatten(mtrx["data"]) + matrix: T.Sequence[float] = _flatten(mtrx["data"]) if _is_matrix_calibration(matrix): return matrix @@ -369,8 +369,8 @@ def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: orio = klv.get(b"ORIO") if orin is not None and orio is not None: - mtrx = _build_matrix(b"".join(orin["data"]), b"".join(orio["data"])) - return mtrx + matrix = _build_matrix(b"".join(orin["data"]), b"".join(orio["data"])) + return matrix return None @@ -395,7 +395,7 @@ def _scale_and_calibrate( if len(scals) == 
1: # infinite repeat - scales = itertools.repeat(scals[0]) + scales: T.Iterable[float] = itertools.repeat(scals[0]) else: scales = scals From 1919d084ea5d9442e4a8cef657062b872fa1f8e2 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 9 Dec 2024 16:09:19 -0800 Subject: [PATCH 7/7] fix types --- mapillary_tools/geotag/gpmf_parser.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 9650e17a8..d86deed72 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -317,7 +317,7 @@ def _is_matrix_calibration(matrix: T.Sequence[float]) -> bool: def _build_matrix( - orin: bytes | T.Sequence[int], orio: bytes | T.Sequence[int] + orin: T.Union[bytes, T.Sequence[int]], orio: T.Union[bytes, T.Sequence[int]] ) -> T.Sequence[float]: matrix = [] @@ -351,14 +351,14 @@ def _apply_matrix( yield sum(matrix[row_start + x] * values[x] for x in range(size)) -def _flatten(nested: T.Sequence[T.Sequence[float]]) -> list[float]: - output: list[float] = [] +def _flatten(nested: T.Sequence[T.Sequence[float]]) -> T.List[float]: + output: T.List[float] = [] for row in nested: output.extend(row) return output -def _get_matrix(klv: dict[bytes, KLVDict]) -> T.Sequence[float] | None: +def _get_matrix(klv: T.Dict[bytes, KLVDict]) -> T.Optional[T.Sequence[float]]: mtrx = klv.get(b"MTRX") if mtrx is not None: matrix: T.Sequence[float] = _flatten(mtrx["data"])
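
A note on the ORIN/ORIO handling introduced in PATCH 3/7: when no usable MTRX calibration matrix is present, the parser builds a pure sign/permutation matrix from the ORIN (input axis order) and ORIO (output axis order) strings and applies it before dividing by SCAL. Below is a minimal standalone sketch of that idea, not part of the patches: the function names are illustrative, the SCAL divisor is made up, and the sign test is written as a case-insensitive letter comparison, which is equivalent to the (in_char - lower_a) == (out_char - upper_A) arithmetic in _build_matrix for the XYZ/xyz alphabet it operates on.

import typing as T


def build_orientation_matrix(orin: bytes, orio: bytes) -> T.List[float]:
    """Build a 3x3 sign/permutation matrix from GPMF ORIN/ORIO axis strings.

    A matching upper-case letter maps an axis straight through (+1); a case
    mismatch on the same letter flips its sign (-1), mirroring _build_matrix.
    """
    matrix: T.List[float] = []
    for out_char in orin:
        for in_char in orio:
            if in_char == out_char:
                matrix.append(1.0)
            elif chr(in_char).upper() == chr(out_char).upper():
                matrix.append(-1.0)
            else:
                matrix.append(0.0)
    return matrix


def apply_matrix(
    matrix: T.Sequence[float], values: T.Sequence[float]
) -> T.List[float]:
    """Row-by-row dot product, same shape of computation as _apply_matrix."""
    size = len(values)
    assert len(matrix) == size * size
    return [
        sum(matrix[row * size + col] * values[col] for col in range(size))
        for row in range(size)
    ]


if __name__ == "__main__":
    # ORIN=b"ZXY" with ORIO=b"XYZ" yields a pure permutation (no sign flips):
    # [0, 0, 1,
    #  1, 0, 0,
    #  0, 1, 0]
    matrix = build_orientation_matrix(b"ZXY", b"XYZ")

    raw = (94.0, -12.0, 3.0)  # one raw triple as stored in the stream
    scal = 10.0               # illustrative SCAL divisor, not a real GoPro value

    calibrated = [v / scal for v in apply_matrix(matrix, raw)]
    print(calibrated)  # [0.3, 9.4, -1.2]

    # A lower-case letter in ORIN negates that axis, e.g. b"zXY":
    # the first row becomes [0, 0, -1].
    print(build_orientation_matrix(b"zXY", b"XYZ"))

Because ORIN/ORIO can only permute and negate axes, a matrix built this way contains nothing but 0, 1 and -1, which is why _is_matrix_calibration treats any other value in MTRX as evidence of a genuine calibration matrix worth preferring. The calibrated triples still come out in GoPro's storage order, which is why _extract_points_from_samples unpacks each one as (z, x, y, *_) before building the imu.AccelerationData / GyroscopeData / MagnetometerData tuples.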