diff --git a/CHANGELOG.md b/CHANGELOG.md index 316e10f46..d5f0d2e80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,13 +33,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - ENH: Exponential backoff decorator (fix #449) [#588](https://github.com/RocketPy-Team/RocketPy/pull/588) +- ENH: Function Validation Rework & Swap `np.searchsorted` to `bisect_left` [#582](https://github.com/RocketPy-Team/RocketPy/pull/582) - ENH: Add new stability margin properties to Flight class [#572](https://github.com/RocketPy-Team/RocketPy/pull/572) - ENH: adds `Function.remove_outliers` method [#554](https://github.com/RocketPy-Team/RocketPy/pull/554) ### Changed - BLD: Change setup.py to pyproject.toml [#589](https://github.com/RocketPy-Team/RocketPy/pull/589) -- DEP: delete deprecated rocketpy.tools.cached_property [#587](https://github.com/RocketPy-Team/RocketPy/pull/587) +- DEP: delete deprecated rocketpy.tools.cached_property [#587](https://github.com/RocketPy-Team/RocketPy/pull/587) +- ENH: Flight simulation speed up [#581] (https://github.com/RocketPy-Team/RocketPy/pull/581) - MNT: Modularize Rocket Draw [#580](https://github.com/RocketPy-Team/RocketPy/pull/580) - DOC: Improvements of Environment docstring phrasing [#565](https://github.com/RocketPy-Team/RocketPy/pull/565) - MNT: Refactor flight prints module [#579](https://github.com/RocketPy-Team/RocketPy/pull/579) diff --git a/rocketpy/environment/environment.py b/rocketpy/environment/environment.py index 3dbf5f1bc..1f50364bf 100644 --- a/rocketpy/environment/environment.py +++ b/rocketpy/environment/environment.py @@ -1551,24 +1551,26 @@ def process_custom_atmosphere( # Check maximum height of custom wind input if not callable(self.wind_velocity_x.source): max_expected_height = max(self.wind_velocity_x[-1, 0], max_expected_height) - if not callable(self.wind_velocity_y.source): - max_expected_height = max(self.wind_velocity_y[-1, 0], max_expected_height) - # Compute wind profile direction and heading - wind_heading = ( - lambda h: np.arctan2(self.wind_velocity_x(h), self.wind_velocity_y(h)) - * (180 / np.pi) - % 360 - ) + def wind_heading_func(h): + return ( + np.arctan2( + self.wind_velocity_x.get_value_opt(h), + self.wind_velocity_y.get_value_opt(h), + ) + * (180 / np.pi) + % 360 + ) + self.wind_heading = Function( - wind_heading, + wind_heading_func, inputs="Height Above Sea Level (m)", outputs="Wind Heading (Deg True)", interpolation="linear", ) def wind_direction(h): - return (wind_heading(h) - 180) % 360 + return (wind_heading_func(h) - 180) % 360 self.wind_direction = Function( wind_direction, @@ -1578,7 +1580,10 @@ def wind_direction(h): ) def wind_speed(h): - return np.sqrt(self.wind_velocity_x(h) ** 2 + self.wind_velocity_y(h) ** 2) + return np.sqrt( + self.wind_velocity_x.get_value_opt(h) ** 2 + + self.wind_velocity_y.get_value_opt(h) ** 2 + ) self.wind_speed = Function( wind_speed, @@ -3197,22 +3202,26 @@ def add_wind_gust(self, wind_gust_x, wind_gust_y): # Reset wind heading and velocity magnitude self.wind_heading = Function( lambda h: (180 / np.pi) - * np.arctan2(self.wind_velocity_x(h), self.wind_velocity_y(h)) + * np.arctan2( + self.wind_velocity_x.get_value_opt(h), + self.wind_velocity_y.get_value_opt(h), + ) % 360, "Height (m)", "Wind Heading (degrees)", extrapolation="constant", ) self.wind_speed = Function( - lambda h: (self.wind_velocity_x(h) ** 2 + self.wind_velocity_y(h) ** 2) + lambda h: ( + self.wind_velocity_x.get_value_opt(h) ** 2 + + self.wind_velocity_y.get_value_opt(h) ** 2 + ) ** 0.5, "Height (m)", "Wind Speed (m/s)", extrapolation="constant", ) - return None - def info(self): """Prints most important data and graphs available about the Environment. diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index eda903bcc..6dc1c764b 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -5,6 +5,7 @@ """ import warnings +from bisect import bisect_left from collections.abc import Iterable from copy import deepcopy from functools import cached_property @@ -16,6 +17,14 @@ from scipy import integrate, linalg, optimize NUMERICAL_TYPES = (float, int, complex, np.ndarray, np.integer, np.floating) +INTERPOLATION_TYPES = { + "linear": 0, + "polynomial": 1, + "akima": 2, + "spline": 3, + "shepard": 4, +} +EXTRAPOLATION_TYPES = {"zero": 0, "natural": 1, "constant": 2} class Function: @@ -106,28 +115,20 @@ def __init__( (II) Fields in CSV files may be enclosed in double quotes. If fields are not quoted, double quotes should not appear inside them. """ - if inputs is None: - inputs = ["Scalar"] - if outputs is None: - outputs = ["Scalar"] - - inputs, outputs, interpolation, extrapolation = self._check_user_input( - source, inputs, outputs, interpolation, extrapolation - ) - - # initialize variables to avoid errors when being called by other methods - self.get_value_opt = None - self.__polynomial_coefficients__ = None - self.__akima_coefficients__ = None - self.__spline_coefficients__ = None - - # store variables - self.set_inputs(inputs) - self.set_outputs(outputs) + # initialize parameters + self.source = source + self.__inputs__ = inputs + self.__outputs__ = outputs self.__interpolation__ = interpolation self.__extrapolation__ = extrapolation - self.set_source(source) - self.set_title(title) + self.title = title + self.__img_dim__ = 1 # always 1, here for backwards compatibility + + # args must be passed from self. + self.set_source(self.source) + self.set_inputs(self.__inputs__) + self.set_outputs(self.__outputs__) + self.set_title(self.title) # Define all set methods def set_inputs(self, inputs): @@ -142,8 +143,7 @@ def set_inputs(self, inputs): ------- self : Function """ - self.__inputs__ = [inputs] if isinstance(inputs, str) else list(inputs) - self.__dom_dim__ = len(self.__inputs__) + self.__inputs__ = self.__validate_inputs(inputs) return self def set_outputs(self, outputs): @@ -158,8 +158,7 @@ def set_outputs(self, outputs): ------- self : Function """ - self.__outputs__ = [outputs] if isinstance(outputs, str) else list(outputs) - self.__img_dim__ = len(self.__outputs__) + self.__outputs__ = self.__validate_outputs(outputs) return self def set_source(self, source): @@ -208,101 +207,45 @@ def set_source(self, source): self : Function Returns the Function instance. """ - *_, interpolation, extrapolation = self._check_user_input( - source, - self.__inputs__, - self.__outputs__, - self.__interpolation__, - self.__extrapolation__, - ) - # If the source is a Function - if isinstance(source, Function): - source = source.get_source() - # Import CSV if source is a string or Path and convert values to ndarray - if isinstance(source, (str, Path)): - with open(source, "r") as file: - try: - source = np.loadtxt(file, delimiter=",", dtype=float) - except ValueError: - # If an error occurs, headers are present - source = np.loadtxt(source, delimiter=",", dtype=float, skiprows=1) - except Exception as e: - raise ValueError( - "The source file is not a valid csv or txt file." - ) from e - - # Convert to ndarray if source is a list - if isinstance(source, (list, tuple)): - source = np.array(source, dtype=np.float64) - # Convert number source into vectorized lambda function - if isinstance(source, (int, float)): - temp = 1 * source - - def source_function(_): - return temp - - source = source_function + source = self.__validate_source(source) # Handle callable source or number source if callable(source): - # Set source - self.source = source - # Set get_value_opt self.get_value_opt = source + self.__interpolation__ = None + self.__extrapolation__ = None + # Set arguments name and domain dimensions parameters = signature(source).parameters self.__dom_dim__ = len(parameters) - if self.__inputs__ == ["Scalar"]: + if self.__inputs__ is None: self.__inputs__ = list(parameters) - # Set interpolation and extrapolation - self.__interpolation__ = None - self.__extrapolation__ = None + # Handle ndarray source else: - # Check to see if dimensions match incoming data set - new_total_dim = len(source[0, :]) - old_total_dim = self.__dom_dim__ + self.__img_dim__ + # Evaluate dimension + self.__dom_dim__ = source.shape[1] - 1 - # If they don't, update default values or throw error - if new_total_dim != old_total_dim: - # Update dimensions and inputs - self.__dom_dim__ = new_total_dim - 1 - self.__inputs__ = self.__dom_dim__ * self.__inputs__ - - # Do things if domDim is 1 + # set x and y. If Function is 2D, also set z if self.__dom_dim__ == 1: source = source[source[:, 0].argsort()] - self.x_array = source[:, 0] self.x_initial, self.x_final = self.x_array[0], self.x_array[-1] - self.y_array = source[:, 1] self.y_initial, self.y_final = self.y_array[0], self.y_array[-1] - - # Finally set data source as source - self.source = source - # Do things if function is multivariate - else: + self.get_value_opt = self.__get_value_opt_1d + elif self.__dom_dim__ > 1: self.x_array = source[:, 0] self.x_initial, self.x_final = self.x_array[0], self.x_array[-1] - self.y_array = source[:, 1] self.y_initial, self.y_final = self.y_array[0], self.y_array[-1] - self.z_array = source[:, 2] self.z_initial, self.z_final = self.z_array[0], self.z_array[-1] + self.get_value_opt = self.__get_value_opt_nd - # Finally set data source as source - self.source = source - # Update extrapolation method - if self.__extrapolation__ is None: - self.set_extrapolation(extrapolation) - # Set default interpolation for point source if it hasn't - if self.__interpolation__ is None: - self.set_interpolation(interpolation) - else: - # Updates interpolation coefficients - self.set_interpolation(self.__interpolation__) + self.source = source + self.set_interpolation(self.__interpolation__) + self.set_extrapolation(self.__extrapolation__) return self @cached_property @@ -342,21 +285,27 @@ def set_interpolation(self, method="spline"): ------- self : Function """ - # Set interpolation method - self.__interpolation__ = method + if not callable(self.source): + self.__interpolation__ = self.__validate_interpolation(method) + self.__update_interpolation_coefficients(self.__interpolation__) + self.__set_interpolation_func() + return self + + def __update_interpolation_coefficients(self, method): + """Update interpolation coefficients for the given method.""" # Spline, akima and polynomial need data processing # Shepard, and linear do not - if method == "spline": - self.__interpolate_spline__() - elif method == "polynomial": + if method == "polynomial": self.__interpolate_polynomial__() + self._coeffs = self.__polynomial_coefficients__ elif method == "akima": self.__interpolate_akima__() - - # Set get_value_opt - self.set_get_value_opt() - - return self + self._coeffs = self.__akima_coefficients__ + elif method == "spline" or method is None: + self.__interpolate_spline__() + self._coeffs = self.__spline_coefficients__ + else: + self._coeffs = [] def set_extrapolation(self, method="constant"): """Set extrapolation behavior of data set. @@ -375,137 +324,163 @@ def set_extrapolation(self, method="constant"): self : Function The Function object. """ - self.__extrapolation__ = method + if not callable(self.source): + self.__extrapolation__ = self.__validate_extrapolation(method) + self.__set_extrapolation_func() return self + def __set_interpolation_func(self): + """Defines interpolation function used by the Function. Each + interpolation method has its own function with exception of shepard, + which has its interpolation/extrapolation function defined in + ``Function.__interpolate_shepard__``. The function is stored in + the attribute _interpolation_func.""" + interpolation = INTERPOLATION_TYPES[self.__interpolation__] + if interpolation == 0: # linear + + def linear_interpolation(x, x_min, x_max, x_data, y_data, coeffs): + x_interval = bisect_left(x_data, x) + x_left = x_data[x_interval - 1] + y_left = y_data[x_interval - 1] + dx = float(x_data[x_interval] - x_left) + dy = float(y_data[x_interval] - y_left) + return (x - x_left) * (dy / dx) + y_left + + self._interpolation_func = linear_interpolation + + elif interpolation == 1: # polynomial + + def polynomial_interpolation(x, x_min, x_max, x_data, y_data, coeffs): + return np.sum(coeffs * x ** np.arange(len(coeffs))) + + self._interpolation_func = polynomial_interpolation + + elif interpolation == 2: # akima + + def akima_interpolation(x, x_min, x_max, x_data, y_data, coeffs): + x_interval = bisect_left(x_data, x) + x_interval = x_interval if x_interval != 0 else 1 + a = coeffs[4 * x_interval - 4 : 4 * x_interval] + return a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] + + self._interpolation_func = akima_interpolation + + elif interpolation == 3: # spline + + def spline_interpolation(x, x_min, x_max, x_data, y_data, coeffs): + x_interval = bisect_left(x_data, x) + x_interval = max(x_interval, 1) + a = coeffs[:, x_interval - 1] + x = x - x_data[x_interval - 1] + return a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] + + self._interpolation_func = spline_interpolation + + elif interpolation == 4: # shepard does not use interpolation function + self._interpolation_func = None + + def __set_extrapolation_func(self): + """Defines extrapolation function used by the Function. Each + extrapolation method has its own function. The function is stored in + the attribute _extrapolation_func.""" + interpolation = INTERPOLATION_TYPES[self.__interpolation__] + extrapolation = EXTRAPOLATION_TYPES[self.__extrapolation__] + + if interpolation == 4: # shepard does not use extrapolation function + self._extrapolation_func = None + + elif extrapolation == 0: # zero + + def zero_extrapolation(x, x_min, x_max, x_data, y_data, coeffs): + return 0 + + self._extrapolation_func = zero_extrapolation + elif extrapolation == 1: # natural + if interpolation == 0: # linear + + def natural_extrapolation(x, x_min, x_max, x_data, y_data, coeffs): + x_interval = 1 if x < x_min else -1 + x_left = x_data[x_interval - 1] + y_left = y_data[x_interval - 1] + dx = float(x_data[x_interval] - x_left) + dy = float(y_data[x_interval] - y_left) + return (x - x_left) * (dy / dx) + y_left + + elif interpolation == 1: # polynomial + + def natural_extrapolation(x, x_min, x_max, x_data, y_data, coeffs): + return np.sum(coeffs * x ** np.arange(len(coeffs))) + + elif interpolation == 2: # akima + + def natural_extrapolation(x, x_min, x_max, x_data, y_data, coeffs): + a = coeffs[:4] if x < x_min else coeffs[-4:] + return a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] + + elif interpolation == 3: # spline + + def natural_extrapolation(x, x_min, x_max, x_data, y_data, coeffs): + if x < x_min: + a = coeffs[:, 0] + x = x - x_data[0] + else: + a = coeffs[:, -1] + x = x - x_data[-2] + return a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] + + self._extrapolation_func = natural_extrapolation + elif extrapolation == 2: # constant + + def constant_extrapolation(x, x_min, x_max, x_data, y_data, coeffs): + return y_data[0] if x < x_min else y_data[-1] + + self._extrapolation_func = constant_extrapolation + def set_get_value_opt(self): - """Crates a method that evaluates interpolations rather quickly - when compared to other options available, such as just calling - the object instance or calling ``Function.get_value`` directly. See - ``Function.get_value_opt`` for documentation. + """Defines a method that evaluates interpolations. Returns ------- self : Function """ + if callable(self.source): + self.get_value_opt = self.source + elif self.__dom_dim__ == 1: + self.get_value_opt = self.__get_value_opt_1d + elif self.__dom_dim__ > 1: + self.get_value_opt = self.__get_value_opt_nd + return self + + def __get_value_opt_1d(self, x): + """Evaluate the Function at a single point x. This method is used + when the Function is 1-D. + + Parameters + ---------- + x : scalar + Value where the Function is to be evaluated. + + Returns + ------- + y : scalar + Value of the Function at the specified point. + """ # Retrieve general info x_data = self.x_array y_data = self.y_array x_min, x_max = self.x_initial, self.x_final - if self.__extrapolation__ == "zero": - extrapolation = 0 # Extrapolation is zero - elif self.__extrapolation__ == "natural": - extrapolation = 1 # Extrapolation is natural - elif self.__extrapolation__ == "constant": - extrapolation = 2 # Extrapolation is constant + coeffs = self._coeffs + if x_min <= x <= x_max: + y = self._interpolation_func(x, x_min, x_max, x_data, y_data, coeffs) else: - raise ValueError(f"Invalid extrapolation type {self.__extrapolation__}") + y = self._extrapolation_func(x, x_min, x_max, x_data, y_data, coeffs) + return y - # Crete method to interpolate this info for each interpolation type - if self.__interpolation__ == "spline": - coeffs = self.__spline_coefficients__ - - def get_value_opt(x): - x_interval = np.searchsorted(x_data, x) - # Interval found... interpolate... or extrapolate - if x_min <= x <= x_max: - # Interpolate - x_interval = x_interval if x_interval != 0 else 1 - a = coeffs[:, x_interval - 1] - x = x - x_data[x_interval - 1] - y = a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] - else: - # Extrapolate - if extrapolation == 0: # Extrapolation == zero - y = 0 - elif extrapolation == 1: # Extrapolation == natural - a = coeffs[:, 0] if x < x_min else coeffs[:, -1] - x = x - x_data[0] if x < x_min else x - x_data[-2] - y = a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] - else: # Extrapolation is set to constant - y = y_data[0] if x < x_min else y_data[-1] - return y - - elif self.__interpolation__ == "linear": - - def get_value_opt(x): - x_interval = np.searchsorted(x_data, x) - # Interval found... interpolate... or extrapolate - if x_min <= x <= x_max: - # Interpolate - dx = float(x_data[x_interval] - x_data[x_interval - 1]) - dy = float(y_data[x_interval] - y_data[x_interval - 1]) - y = (x - x_data[x_interval - 1]) * (dy / dx) + y_data[ - x_interval - 1 - ] - else: - # Extrapolate - if extrapolation == 0: # Extrapolation == zero - y = 0 - elif extrapolation == 1: # Extrapolation == natural - x_interval = 1 if x < x_min else -1 - dx = float(x_data[x_interval] - x_data[x_interval - 1]) - dy = float(y_data[x_interval] - y_data[x_interval - 1]) - y = (x - x_data[x_interval - 1]) * (dy / dx) + y_data[ - x_interval - 1 - ] - else: # Extrapolation is set to constant - y = y_data[0] if x < x_min else y_data[-1] - return y - - elif self.__interpolation__ == "akima": - coeffs = np.array(self.__akima_coefficients__) - - def get_value_opt(x): - x_interval = np.searchsorted(x_data, x) - # Interval found... interpolate... or extrapolate - if x_min <= x <= x_max: - # Interpolate - x_interval = x_interval if x_interval != 0 else 1 - a = coeffs[4 * x_interval - 4 : 4 * x_interval] - y = a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] - else: - # Extrapolate - if extrapolation == 0: # Extrapolation == zero - y = 0 - elif extrapolation == 1: # Extrapolation == natural - a = coeffs[:4] if x < x_min else coeffs[-4:] - y = a[3] * x**3 + a[2] * x**2 + a[1] * x + a[0] - else: # Extrapolation is set to constant - y = y_data[0] if x < x_min else y_data[-1] - return y - - elif self.__interpolation__ == "polynomial": - coeffs = self.__polynomial_coefficients__ - - def get_value_opt(x): - # Interpolate... or extrapolate - if x_min <= x <= x_max: - # Interpolate - y = 0 - for i, coef in enumerate(coeffs): - y += coef * (x**i) - else: - # Extrapolate - if extrapolation == 0: # Extrapolation == zero - y = 0 - elif extrapolation == 1: # Extrapolation == natural - y = 0 - for i, coef in enumerate(coeffs): - y += coef * (x**i) - else: # Extrapolation is set to constant - y = y_data[0] if x < x_min else y_data[-1] - return y - - elif self.__interpolation__ == "shepard": - # change the function's name to avoid mypy's error - def get_value_opt_multiple(*args): - return self.__interpolate_shepard__(args) - - get_value_opt = get_value_opt_multiple - - self.get_value_opt = get_value_opt - return self + def __get_value_opt_nd(self, *args): + """Evaluate the Function at a single point (x, y, z). This method is + used when the Function is N-D.""" + # always use shepard for N-D functions + return self.__interpolate_shepard__(args) def set_discrete( self, @@ -900,107 +875,20 @@ def get_value(self, *args): if all(isinstance(arg, Iterable) for arg in args): return [self.source(*arg) for arg in zip(*args)] - # Returns value for shepard interpolation - elif self.__interpolation__ == "shepard": - return self.__interpolate_shepard__(args) + elif self.__dom_dim__ > 1: # deals with nd functions and shepard interp + return self.get_value_opt(*args) - # Returns value for polynomial interpolation function type - elif self.__interpolation__ == "polynomial": - if isinstance(args[0], (int, float)): - args = [list(args)] - x = np.array(args[0]) - x_data = self.x_array - y_data = self.y_array - x_min, x_max = self.x_initial, self.x_final - coeffs = self.__polynomial_coefficients__ - matrix = np.zeros((len(args[0]), coeffs.shape[0])) - for i in range(coeffs.shape[0]): - matrix[:, i] = x**i - ans = matrix.dot(coeffs).tolist() - for i, _ in enumerate(x): - if not x_min <= x[i] <= x_max: - if self.__extrapolation__ == "constant": - ans[i] = y_data[0] if x[i] < x_min else y_data[-1] - elif self.__extrapolation__ == "zero": - ans[i] = 0 - return ans if len(ans) > 1 else ans[0] - # Returns value for spline, akima or linear interpolation function type - elif self.__interpolation__ in ["spline", "akima", "linear"]: + # Returns value for other interpolation type + else: # interpolation is "polynomial", "spline", "akima" or "linear" if isinstance(args[0], (int, float, complex, np.integer)): args = [list(args)] - x = list(args[0]) - x_data = self.x_array - y_data = self.y_array - x_intervals = np.searchsorted(x_data, x) - x_min, x_max = self.x_initial, self.x_final - if self.__interpolation__ == "spline": - coeffs = self.__spline_coefficients__ - for i, _ in enumerate(x): - if x[i] == x_min or x[i] == x_max: - x[i] = y_data[x_intervals[i]] - elif x_min < x[i] < x_max or (self.__extrapolation__ == "natural"): - if not x_min < x[i] < x_max: - a = coeffs[:, 0] if x[i] < x_min else coeffs[:, -1] - x[i] = ( - x[i] - x_data[0] if x[i] < x_min else x[i] - x_data[-2] - ) - else: - a = coeffs[:, x_intervals[i] - 1] - x[i] = x[i] - x_data[x_intervals[i] - 1] - x[i] = a[3] * x[i] ** 3 + a[2] * x[i] ** 2 + a[1] * x[i] + a[0] - else: - # Extrapolate - if self.__extrapolation__ == "zero": - x[i] = 0 - else: # Extrapolation is set to constant - x[i] = y_data[0] if x[i] < x_min else y_data[-1] - elif self.__interpolation__ == "linear": - for i, _ in enumerate(x): - # Interval found... interpolate... or extrapolate - inter = x_intervals[i] - if x_min <= x[i] <= x_max: - # Interpolate - dx = float(x_data[inter] - x_data[inter - 1]) - dy = float(y_data[inter] - y_data[inter - 1]) - x[i] = (x[i] - x_data[inter - 1]) * (dy / dx) + y_data[ - inter - 1 - ] - else: - # Extrapolate - if self.__extrapolation__ == "zero": # Extrapolation == zero - x[i] = 0 - elif ( - self.__extrapolation__ == "natural" - ): # Extrapolation == natural - inter = 1 if x[i] < x_min else -1 - dx = float(x_data[inter] - x_data[inter - 1]) - dy = float(y_data[inter] - y_data[inter - 1]) - x[i] = (x[i] - x_data[inter - 1]) * (dy / dx) + y_data[ - inter - 1 - ] - else: # Extrapolation is set to constant - x[i] = y_data[0] if x[i] < x_min else y_data[-1] - else: - coeffs = self.__akima_coefficients__ - for i, _ in enumerate(x): - if x[i] == x_min or x[i] == x_max: - x[i] = y_data[x_intervals[i]] - elif x_min < x[i] < x_max or (self.__extrapolation__ == "natural"): - if not x_min < x[i] < x_max: - a = coeffs[:4] if x[i] < x_min else coeffs[-4:] - else: - a = coeffs[4 * x_intervals[i] - 4 : 4 * x_intervals[i]] - x[i] = a[3] * x[i] ** 3 + a[2] * x[i] ** 2 + a[1] * x[i] + a[0] - else: - # Extrapolate - if self.__extrapolation__ == "zero": - x[i] = 0 - else: # Extrapolation is set to constant - x[i] = y_data[0] if x[i] < x_min else y_data[-1] - if isinstance(args[0], np.ndarray): - return np.array(x) - else: - return x if len(x) > 1 else x[0] + + x = list(args[0]) + x = list(map(self.get_value_opt, x)) + if isinstance(args[0], np.ndarray): + return np.array(x) + else: + return x if len(x) > 1 else x[0] def __getitem__(self, args): """Returns item of the Function source. If the source is not an array, @@ -1984,7 +1872,7 @@ def __add__(self, other): # Create new Function object return Function(source, inputs, outputs, interpolation, extrapolation) else: - return Function(lambda x: (self.get_value(x) + other(x))) + return Function(lambda x: (self.get_value_opt(x) + other(x))) # If other is Float except... except AttributeError: if isinstance(other, NUMERICAL_TYPES): @@ -2005,10 +1893,10 @@ def __add__(self, other): source, inputs, outputs, interpolation, extrapolation ) else: - return Function(lambda x: (self.get_value(x) + other)) + return Function(lambda x: (self.get_value_opt(x) + other)) # Or if it is just a callable elif callable(other): - return Function(lambda x: (self.get_value(x) + other(x))) + return Function(lambda x: (self.get_value_opt(x) + other(x))) def __radd__(self, other): """Sums 'other' and a Function object and returns a new Function @@ -2051,7 +1939,7 @@ def __sub__(self, other): try: return self + (-other) except TypeError: - return Function(lambda x: (self.get_value(x) - other(x))) + return Function(lambda x: (self.get_value_opt(x) - other(x))) def __rsub__(self, other): """Subtracts a Function object from 'other' and returns a new Function @@ -2091,54 +1979,40 @@ def __mul__(self, other): result : Function A Function object which gives the result of self(x)*other(x). """ - # If other is Function try... - try: - # Check if Function objects source is array or callable - # Check if Function objects have the same domain discretization - if ( - isinstance(other.source, np.ndarray) - and isinstance(self.source, np.ndarray) - and self.__dom_dim__ == other.__dom_dim__ - and np.array_equal(self.x_array, other.x_array) - ): - # Operate on grid values - ys = self.y_array * other.y_array - xs = self.x_array - source = np.concatenate(([xs], [ys])).transpose() - # Retrieve inputs, outputs and interpolation - inputs = self.__inputs__[:] - outputs = self.__outputs__[0] + "*" + other.__outputs__[0] - outputs = "(" + outputs + ")" - interpolation = self.__interpolation__ - extrapolation = self.__extrapolation__ - # Create new Function object - return Function(source, inputs, outputs, interpolation, extrapolation) - else: - return Function(lambda x: (self.get_value(x) * other(x))) - # If other is Float except... - except AttributeError: - if isinstance(other, NUMERICAL_TYPES): - # Check if Function object source is array or callable - if isinstance(self.source, np.ndarray): - # Operate on grid values - ys = self.y_array * other - xs = self.x_array - source = np.concatenate(([xs], [ys])).transpose() - # Retrieve inputs, outputs and interpolation - inputs = self.__inputs__[:] - outputs = self.__outputs__[0] + "*" + str(other) - outputs = "(" + outputs + ")" - interpolation = self.__interpolation__ - extrapolation = self.__extrapolation__ - # Create new Function object - return Function( - source, inputs, outputs, interpolation, extrapolation - ) - else: - return Function(lambda x: (self.get_value(x) * other)) - # Or if it is just a callable - elif callable(other): - return Function(lambda x: (self.get_value(x) * other(x))) + self_source_is_array = isinstance(self.source, np.ndarray) + other_source_is_array = ( + isinstance(other.source, np.ndarray) + if isinstance(other, Function) + else False + ) + inputs = self.__inputs__[:] + interp = self.__interpolation__ + extrap = self.__extrapolation__ + + if ( + self_source_is_array + and other_source_is_array + and np.array_equal(self.x_array, other.x_array) + ): + source = np.column_stack((self.x_array, self.y_array * other.y_array)) + outputs = f"({self.__outputs__[0]}*{other.__outputs__[0]})" + return Function(source, inputs, outputs, interp, extrap) + elif isinstance(other, NUMERICAL_TYPES): + if not self_source_is_array: + return Function(lambda x: (self.get_value_opt(x) * other), inputs) + source = np.column_stack((self.x_array, np.multiply(self.y_array, other))) + outputs = f"({self.__outputs__[0]}*{other})" + return Function( + source, + inputs, + outputs, + interp, + extrap, + ) + elif callable(other): + return Function(lambda x: (self.get_value_opt(x) * other(x)), inputs) + else: + raise TypeError("Unsupported type for multiplication") def __rmul__(self, other): """Multiplies 'other' by a Function object and returns a new Function @@ -2331,10 +2205,10 @@ def __pow__(self, other): source, inputs, outputs, interpolation, extrapolation ) else: - return Function(lambda x: (self.get_value(x) ** other)) + return Function(lambda x: (self.get_value_opt(x) ** other)) # Or if it is just a callable elif callable(other): - return Function(lambda x: (self.get_value(x) ** other(x))) + return Function(lambda x: (self.get_value_opt(x) ** other(x))) def __rpow__(self, other): """Raises 'other' to the power of a Function object and returns @@ -2367,10 +2241,10 @@ def __rpow__(self, other): # Create new Function object return Function(source, inputs, outputs, interpolation, extrapolation) else: - return Function(lambda x: (other ** self.get_value(x))) + return Function(lambda x: (other ** self.get_value_opt(x))) # Or if it is just a callable elif callable(other): - return Function(lambda x: (other(x) ** self.get_value(x))) + return Function(lambda x: (other(x) ** self.get_value_opt(x))) def __matmul__(self, other): """Operator @ as an alias for composition. Therefore, this @@ -2545,10 +2419,12 @@ def differentiate(self, x, dx=1e-6, order=1): Evaluated derivative. """ if order == 1: - return (self.get_value(x + dx) - self.get_value(x - dx)) / (2 * dx) + return (self.get_value_opt(x + dx) - self.get_value_opt(x - dx)) / (2 * dx) elif order == 2: return ( - self.get_value(x + dx) - 2 * self.get_value(x) + self.get_value(x - dx) + self.get_value_opt(x + dx) + - 2 * self.get_value_opt(x) + + self.get_value_opt(x - dx) ) / dx**2 def identity_function(self): @@ -2992,189 +2868,188 @@ def savetxt( file.write(header_line + newline) np.savetxt(file, data_points, fmt=fmt, delimiter=delimiter, newline=newline) - @staticmethod - def _check_user_input( - source, - inputs=None, - outputs=None, - interpolation=None, - extrapolation=None, - ): - """ - Validates and processes the user input parameters for creating or - modifying a Function object. This function ensures the inputs, outputs, - interpolation, and extrapolation parameters are compatible with the - given source. It converts the source to a numpy array if necessary, sets - default values and raises warnings or errors for incompatible or - ill-defined parameters. + # Input validators + def __validate_source(self, source): + """Used to validate the source parameter for creating a Function object. Parameters ---------- - source : list, np.ndarray, or callable - The source data or Function object. If a list or ndarray, it should - contain numeric data. If a Function, its inputs and outputs are - checked against the provided inputs and outputs. - inputs : list of str or None - The names of the input variables. If None, defaults are generated - based on the dimensionality of the source. - outputs : str or list of str - The name(s) of the output variable(s). If a list is provided, it - must have a single element. - interpolation : str or None - The method of interpolation to be used. For multidimensional sources - it defaults to 'shepard' if not provided. - extrapolation : str or None - The method of extrapolation to be used. For multidimensional sources - it defaults to 'natural' if not provided. + source : np.ndarray, callable, str, Path, Function, list + The source data of the Function object. This can be a numpy array, + a callable function, a string or Path object to a csv or txt file, + a Function object, or a list of numbers. Returns ------- - tuple - A tuple containing the processed inputs, outputs, interpolation, and - extrapolation parameters. + np.ndarray, callable + The validated source parameter. Raises ------ ValueError - If the dimensionality of the source does not match the combined - dimensions of inputs and outputs. If the outputs list has more than - one element. - TypeError - If the source is not a list, np.ndarray, Function object, str or - Path. - Warning - If inputs or outputs do not match for a Function source, or if - defaults are used for inputs, interpolation,and extrapolation for a - multidimensional source. + If the source is not a valid type or if the source is not a 2D array + or a callable function. + """ + if isinstance(source, Function): + return source.get_source() - Examples - -------- - >>> from rocketpy import Function - >>> source = np.array([(1, 1), (2, 4), (3, 9)]) - >>> inputs = "x" - >>> outputs = ["y"] - >>> interpolation = 'linear' - >>> extrapolation = 'zero' - >>> inputs, outputs, interpolation, extrapolation = Function._check_user_input( - ... source, inputs, outputs, interpolation, extrapolation - ... ) - >>> inputs - ['x'] - >>> outputs - ['y'] - >>> interpolation - 'linear' - >>> extrapolation - 'zero' - """ - # check output type and dimensions - if isinstance(outputs, str): - outputs = [outputs] - if isinstance(inputs, str): - inputs = [inputs] + if isinstance(source, (str, Path)): + # Read csv or txt files and create a numpy array + try: + source = np.loadtxt(source, delimiter=",", dtype=np.float64) + except ValueError: + with open(source, "r") as file: + header, *data = file.read().splitlines() + + header = [label.strip("'").strip('"') for label in header.split(",")] + source = np.loadtxt(data, delimiter=",", dtype=np.float64) + + if len(source[0]) == len(header): + if self.__inputs__ is None: + self.__inputs__ = header[:-1] + if self.__outputs__ is None: + self.__outputs__ = [header[-1]] + except Exception as e: + raise ValueError( + "Could not read the csv or txt file to create Function source." + ) from e + + if isinstance(source, list) or isinstance(source, np.ndarray): + # Triggers an error if source is not a list of numbers + source = np.array(source, dtype=np.float64) - if len(outputs) > 1: + # Checks if 2D array + if len(source.shape) != 2: + raise ValueError( + "Source must be a 2D array in the form [[x1, x2 ..., xn, y], ...]." + ) + return source + + if isinstance(source, (int, float)): + # Convert number source into vectorized lambda function + temp = 1 * source + + def source_function(_): + return temp + + return source_function + + # If source is a callable function + return source + + def __validate_inputs(self, inputs): + """Used to validate the inputs parameter for creating a Function object. + It sets a default value if it is not provided. + + Parameters + ---------- + inputs : list of str, None + The name(s) of the input variable(s). If None, defaults to "Scalar". + + Returns + ------- + list + The validated inputs parameter. + """ + if self.__dom_dim__ == 1: + if inputs is None: + return ["Scalar"] + if isinstance(inputs, str): + return [inputs] + if isinstance(inputs, (list, tuple)): + if len(inputs) == 1: + return inputs raise ValueError( - "Output must either be a string or have dimension 1, " - + f"it currently has dimension ({len(outputs)})." + "Inputs must be a string or a list of strings with " + "the length of the domain dimension." + ) + if self.__dom_dim__ > 1: + if inputs is None: + return [f"Input {i+1}" for i in range(self.__dom_dim__)] + if isinstance(inputs, list): + if len(inputs) == self.__dom_dim__ and all( + isinstance(i, str) for i in inputs + ): + return inputs + raise ValueError( + "Inputs must be a list of strings with " + "the length of the domain dimension." ) - # check source for data type - # if list or ndarray, check for dimensions, interpolation and extrapolation - if isinstance(source, Function): - source = source.get_source() - if isinstance(source, (list, np.ndarray, str, Path)): - # Deal with csv or txt - if isinstance(source, (str, Path)): - # Convert to numpy array - try: - source = np.loadtxt(source, delimiter=",", dtype=float) - except ValueError: - with open(source, "r") as file: - header, *data = file.read().splitlines() - - header = [ - label.strip("'").strip('"') for label in header.split(",") - ] - source = np.loadtxt(data, delimiter=",", dtype=float) - - if len(source[0]) == len(header): - if inputs == ["Scalar"]: - inputs = header[:-1] - if outputs == ["Scalar"]: - outputs = [header[-1]] - except Exception as e: - raise ValueError( - "The source file is not a valid csv or txt file." - ) from e - - else: - # this will also trigger an error if the source is not a list of - # numbers or if the array is not homogeneous - source = np.array(source, dtype=np.float64) - - # check dimensions - source_dim = source.shape[1] - - # check interpolation and extrapolation - - ## single dimension - if source_dim == 2: - # possible interpolation values: linear, polynomial, akima and spline - if interpolation is None: - interpolation = "spline" - elif interpolation.lower() not in [ - "spline", - "linear", - "polynomial", - "akima", - ]: - warnings.warn( - "Interpolation method for single dimensional functions was " - + f"set to 'spline', the {interpolation} method is not supported." - ) - interpolation = "spline" - - # possible extrapolation values: constant, natural, zero - if extrapolation is None: - extrapolation = "constant" - elif extrapolation.lower() not in ["constant", "natural", "zero"]: - warnings.warn( - "Extrapolation method for single dimensional functions was " - + f"set to 'constant', the {extrapolation} method is not supported." - ) - extrapolation = "constant" - - ## multiple dimensions - elif source_dim > 2: - # check for inputs and outputs - if inputs == ["Scalar"]: - inputs = [f"Input {i+1}" for i in range(source_dim - 1)] - - if interpolation not in [None, "shepard"]: - warnings.warn( - ( - "Interpolation method for multidimensional functions was" - "set to 'shepard', other methods are not supported yet." - ), - ) - interpolation = "shepard" + def __validate_outputs(self, outputs): + """Used to validate the outputs parameter for creating a Function object. + It sets a default value if it is not provided. - if extrapolation not in [None, "natural"]: - warnings.warn( - "Extrapolation method for multidimensional functions was set" - "to 'natural', other methods are not supported yet." - ) - extrapolation = "natural" + Parameters + ---------- + outputs : str, list of str, None + The name of the output variables. If None, defaults to "Scalar". - # check input dimensions - in_out_dim = len(inputs) + len(outputs) - if source_dim != in_out_dim: + Returns + ------- + list + The validated outputs parameter. + """ + if outputs is None: + return ["Scalar"] + if isinstance(outputs, str): + return [outputs] + if isinstance(outputs, (list, tuple)): + if len(outputs) > 1: raise ValueError( - f"Source dimension ({source_dim}) does not match input " - + f"and output dimension ({in_out_dim})." + "Output must either be a string or a list of strings with " + + f"one item. It currently has dimension ({len(outputs)})." + ) + return outputs + + def __validate_interpolation(self, interpolation): + if self.__dom_dim__ == 1: + # possible interpolation values: linear, polynomial, akima and spline + if interpolation is None: + interpolation = "spline" + elif interpolation.lower() not in [ + "spline", + "linear", + "polynomial", + "akima", + ]: + warnings.warn( + "Interpolation method set to 'spline' because the " + f"{interpolation} method is not supported." + ) + interpolation = "spline" + ## multiple dimensions + elif self.__dom_dim__ > 1: + if interpolation not in [None, "shepard"]: + warnings.warn( + ( + "Interpolation method set to 'shepard'. Only 'shepard' " + "interpolation is supported for multiple dimensions." + ), + ) + interpolation = "shepard" + return interpolation + + def __validate_extrapolation(self, extrapolation): + if self.__dom_dim__ == 1: + if extrapolation is None: + extrapolation = "constant" + elif extrapolation.lower() not in ["constant", "natural", "zero"]: + warnings.warn( + "Extrapolation method set to 'constant' because the " + f"{extrapolation} method is not supported." + ) + extrapolation = "constant" + + ## multiple dimensions + elif self.__dom_dim__ > 1: + if extrapolation not in [None, "natural"]: + warnings.warn( + "Extrapolation method set to 'natural'. Other methods " + "are not supported yet." ) - return inputs, outputs, interpolation, extrapolation + extrapolation = "natural" + return extrapolation class PiecewiseFunction(Function): @@ -3258,7 +3133,7 @@ def calc_output(func, inputs): """ output = np.zeros(len(inputs)) for j, value in enumerate(inputs): - output[j] = func.get_value(value) + output[j] = func.get_value_opt(value) return output input_data = [] diff --git a/rocketpy/plots/environment_analysis_plots.py b/rocketpy/plots/environment_analysis_plots.py index 9fbf68d95..26727aba9 100644 --- a/rocketpy/plots/environment_analysis_plots.py +++ b/rocketpy/plots/environment_analysis_plots.py @@ -1454,7 +1454,7 @@ def wind_speed_profile_grid(self, clear_range_limits=False): if self.env_analysis.forecast: forecast = self.env_analysis.forecast y = self.env_analysis.average_wind_speed_profile_by_hour[hour][1] - x = forecast[hour].wind_speed.get_value(y) * convert_units( + x = forecast[hour].wind_speed.get_value_opt(y) * convert_units( 1, "m/s", self.env_analysis.unit_system["wind_speed"] ) ax.plot(x, y, "b--") diff --git a/rocketpy/rocket/rocket.py b/rocketpy/rocket/rocket.py index c074afc8d..c65f1eca1 100644 --- a/rocketpy/rocket/rocket.py +++ b/rocketpy/rocket/rocket.py @@ -546,7 +546,11 @@ def evaluate_stability_margin(self): """ self.stability_margin.set_source( lambda mach, time: ( - (self.center_of_mass(time) - self.cp_position(mach)) / (2 * self.radius) + ( + self.center_of_mass.get_value_opt(time) + - self.cp_position.get_value_opt(mach) + ) + / (2 * self.radius) ) * self._csys ) @@ -564,7 +568,10 @@ def evaluate_static_margin(self): """ # Calculate static margin self.static_margin.set_source( - lambda time: (self.center_of_mass(time) - self.cp_position(0)) + lambda time: ( + self.center_of_mass.get_value_opt(time) + - self.cp_position.get_value_opt(0) + ) / (2 * self.radius) ) # Change sign if coordinate system is upside down diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index badbc31e4..b762c376f 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -12,7 +12,10 @@ from ..plots.flight_plots import _FlightPlots from ..prints.flight_prints import _FlightPrints from ..tools import ( + calculate_cubic_hermite_coefficients, find_closest, + find_root_linear_interpolation, + find_roots_cubic_function, quaternions_to_nutation, quaternions_to_precession, quaternions_to_spin, @@ -182,13 +185,13 @@ class Flight: Flight.function_evaluations : array List that stores number of derivative function evaluations during numerical integration in cumulative manner. - Flight.function_evaluations_per_time_step : array + Flight.function_evaluations_per_time_step : list List that stores number of derivative function evaluations per time step during numerical integration. Flight.time_steps : array List of time steps taking during numerical integration in seconds. - Flight.FlightPhases : Flight.FlightPhases + Flight.flight_phases : Flight.FlightPhases Stores and manages flight phases. Flight.wind_velocity_x : Function Wind velocity X (East) experienced by the rocket as a @@ -580,13 +583,7 @@ def __init__( ------- None """ - # Fetch helper classes and functions - FlightPhases = self.FlightPhases - TimeNodes = self.TimeNodes - time_iterator = self.time_iterator - - # Save rocket, parachutes, environment, maximum simulation time - # and termination events + # Save arguments self.env = environment self.rocket = rocket self.rail_length = rail_length @@ -608,38 +605,45 @@ def __init__( self.equations_of_motion = equations_of_motion # Flight initialization - self.__init_post_process_variables() self.__init_solution_monitors() self.__init_equations_of_motion() - - # Initialize prints and plots objects - self.prints = _FlightPrints(self) - self.plots = _FlightPlots(self) - - # Initialize solver monitors self.__init_solver_monitors() # Create known flight phases - self.FlightPhases = FlightPhases() - self.FlightPhases.add_phase( + self.flight_phases = self.FlightPhases() + self.flight_phases.add_phase( self.t_initial, self.initial_derivative, clear=False ) - self.FlightPhases.add_phase(self.max_time) + self.flight_phases.add_phase(self.max_time) # Simulate flight - for phase_index, phase in time_iterator(self.FlightPhases): - # print('\nCurrent Flight Phase List') - # print(self.FlightPhases) - # print('\n\tCurrent Flight Phase') - # print('\tIndex: ', phase_index, ' | Phase: ', phase) + self.__simulate(verbose) + + # Initialize prints and plots objects + self.prints = _FlightPrints(self) + self.plots = _FlightPlots(self) + + def __repr__(self): + return ( + f"" + ) + + def __simulate(self, verbose): + """Simulate the flight trajectory.""" + for phase_index, phase in self.time_iterator(self.flight_phases): # Determine maximum time for this flight phase - phase.time_bound = self.FlightPhases[phase_index + 1].t + phase.time_bound = self.flight_phases[phase_index + 1].t # Evaluate callbacks for callback in phase.callbacks: callback(self) - # Create solver for this flight phase + # Create solver for this flight phase # TODO: allow different integrators self.function_evaluations.append(0) phase.solver = integrate.LSODA( phase.derivative, @@ -651,46 +655,36 @@ def __init__( rtol=self.rtol, atol=self.atol, ) - # print('\n\tSolver Initialization Details') - # print('\t_initial Time: ', phase.t) - # print('\t_initial State: ', self.y_sol) - # print('\tTime Bound: ', phase.time_bound) - # print('\tMin Step: ', self.min_time_step) - # print('\tMax Step: ', self.max_time_step) - # print('\tTolerances: ', self.rtol, self.atol) # Initialize phase time nodes - phase.TimeNodes = TimeNodes() - # Add first time node to permanent list - phase.TimeNodes.add_node(phase.t, [], []) + phase.time_nodes = self.TimeNodes() + # Add first time node to the time_nodes list + phase.time_nodes.add_node(phase.t, [], []) # Add non-overshootable parachute time nodes if self.time_overshoot is False: - phase.TimeNodes.add_parachutes( + # TODO: move parachutes to controllers + phase.time_nodes.add_parachutes( self.parachutes, phase.t, phase.time_bound ) - phase.TimeNodes.add_controllers( + phase.time_nodes.add_controllers( self._controllers, phase.t, phase.time_bound ) - # Add lst time node to permanent list - phase.TimeNodes.add_node(phase.time_bound, [], []) - # Sort time nodes - phase.TimeNodes.sort() - # Merge equal time nodes - phase.TimeNodes.merge() + # Add last time node to the time_nodes list + phase.time_nodes.add_node(phase.time_bound, [], []) + # Organize time nodes with sort() and merge() + phase.time_nodes.sort() + phase.time_nodes.merge() # Clear triggers from first time node if necessary if phase.clear: - phase.TimeNodes[0].parachutes = [] - phase.TimeNodes[0].callbacks = [] - - # print('\n\tPhase Time Nodes') - # print('\tTime Nodes Length: ', str(len(phase.TimeNodes)), ' | Time Nodes Preview: ', phase.TimeNodes[0:3]) + phase.time_nodes[0].parachutes = [] + phase.time_nodes[0].callbacks = [] # Iterate through time nodes - for node_index, node in time_iterator(phase.TimeNodes): - # print('\n\t\tCurrent Time Node') - # print('\t\tIndex: ', node_index, ' | Time Node: ', node) + for node_index, node in self.time_iterator(phase.time_nodes): # Determine time bound for this time node - node.time_bound = phase.TimeNodes[node_index + 1].t + node.time_bound = phase.time_nodes[node_index + 1].t + # NOTE: Setting the time bound and status for the phase solver, + # and updating its internal state for the next integration step. phase.solver.t_bound = node.time_bound phase.solver._lsoda_solver._integrator.rwork[0] = phase.solver.t_bound phase.solver._lsoda_solver._integrator.call_args[4] = ( @@ -699,6 +693,7 @@ def __init__( phase.solver.status = "running" # Feed required parachute and discrete controller triggers + # TODO: parachutes should be moved to controllers for callback in node.callbacks: callback(self) @@ -707,28 +702,21 @@ def __init__( for parachute in node.parachutes: # Calculate and save pressure signal - pressure = self.env.pressure.get_value_opt(self.y_sol[2]) - parachute.clean_pressure_signal.append([node.t, pressure]) - # Calculate and save noise - noise = parachute.noise_function() - parachute.noise_signal.append([node.t, noise]) - parachute.noisy_pressure_signal.append([node.t, pressure + noise]) - # Gets height above ground level considering noise - hAGL = ( - self.env.barometric_height(pressure + noise) - - self.env.elevation + noisy_pressure, height_above_ground_level = ( + self.__calculate_and_save_pressure_signals( + parachute, node.t, self.y_sol[2] + ) ) - if parachute.triggerfunc(pressure + noise, hAGL, self.y_sol): - # print('\nEVENT DETECTED') - # print('Parachute Triggered') - # print('Name: ', parachute.name, ' | Lag: ', parachute.lag) + if parachute.triggerfunc( + noisy_pressure, height_above_ground_level, self.y_sol + ): # Remove parachute from flight parachutes self.parachutes.remove(parachute) # Create flight phase for time after detection and before inflation # Must only be created if parachute has any lag i = 1 if parachute.lag != 0: - self.FlightPhases.add_phase( + self.flight_phases.add_phase( node.t, phase.derivative, clear=True, @@ -741,7 +729,7 @@ def __init__( self, "parachute_cd_s", parachute_cd_s ) ] - self.FlightPhases.add_phase( + self.flight_phases.add_phase( node.t + parachute.lag, self.u_dot_parachute, callbacks, @@ -749,37 +737,24 @@ def __init__( index=phase_index + i, ) # Prepare to leave loops and start new flight phase - phase.TimeNodes.flush_after(node_index) - phase.TimeNodes.add_node(self.t, [], []) + phase.time_nodes.flush_after(node_index) + phase.time_nodes.add_node(self.t, [], []) phase.solver.status = "finished" # Save parachute event self.parachute_events.append([self.t, parachute]) # Step through simulation while phase.solver.status == "running": - # Step + # Execute solver step, log solution and function evaluations phase.solver.step() - # Save step result self.solution += [[phase.solver.t, *phase.solver.y]] - # Step step metrics - self.function_evaluations_per_time_step.append( - phase.solver.nfev - self.function_evaluations[-1] - ) self.function_evaluations.append(phase.solver.nfev) - self.time_steps.append(phase.solver.step_size) + # Update time and state self.t = phase.solver.t self.y_sol = phase.solver.y if verbose: - print( - "Current Simulation Time: {:3.4f} s".format(self.t), - end="\r", - ) - # print('\n\t\t\tCurrent Step Details') - # print('\t\t\tIState: ', phase.solver._lsoda_solver._integrator.istate) - # print('\t\t\tTime: ', phase.solver.t) - # print('\t\t\tAltitude: ', phase.solver.y[2]) - # print('\t\t\tEvals: ', self.function_evaluations_per_time_step[-1]) + print(f"Current Simulation Time: {self.t:3.4f} s", end="\r") # Check for first out of rail event if len(self.out_of_rail_state) == 1 and ( @@ -788,12 +763,7 @@ def __init__( + (self.y_sol[2] - self.env.elevation) ** 2 >= self.effective_1rl**2 ): - # Rocket is out of rail # Check exactly when it went out using root finding - # States before and after - # t0 -> 0 - # print('\nEVENT DETECTED') - # print('Rocket is Out of Rail!') # Disconsider elevation self.solution[-2][3] -= self.env.elevation self.solution[-1][3] -= self.env.elevation @@ -823,24 +793,23 @@ def __init__( self.solution[-2][3] += self.env.elevation self.solution[-1][3] += self.env.elevation # Cubic Hermite interpolation (ax**3 + bx**2 + cx + d) - D = float(phase.solver.step_size) - d = float(y0) - c = float(yp0) - b = float((3 * y1 - yp1 * D - 2 * c * D - 3 * d) / (D**2)) - a = float(-(2 * y1 - yp1 * D - c * D - 2 * d) / (D**3)) + 1e-5 + a, b, c, d = calculate_cubic_hermite_coefficients( + 0, + float(phase.solver.step_size), + y0, + yp0, + y1, + yp1, + ) + a += 1e-5 # TODO: why?? # Find roots - d0 = b**2 - 3 * a * c - d1 = 2 * b**3 - 9 * a * b * c + 27 * d * a**2 - c1 = ((d1 + (d1**2 - 4 * d0**3) ** (0.5)) / 2) ** (1 / 3) - t_roots = [] - for k in [0, 1, 2]: - c2 = c1 * (-1 / 2 + 1j * (3**0.5) / 2) ** k - t_roots.append(-(1 / (3 * a)) * (b + c2 + d0 / c2)) + t_roots = find_roots_cubic_function(a, b, c, d) # Find correct root - valid_t_root = [] - for t_root in t_roots: - if 0 < t_root.real < t1 and abs(t_root.imag) < 0.001: - valid_t_root.append(t_root.real) + valid_t_root = [ + t_root.real + for t_root in t_roots + if 0 < t_root.real < t1 and abs(t_root.imag) < 0.001 + ] if len(valid_t_root) > 1: raise ValueError( "Multiple roots found when solving for rail exit time." @@ -857,31 +826,23 @@ def __init__( self.out_of_rail_time = self.t self.out_of_rail_time_index = len(self.solution) - 1 self.out_of_rail_state = self.y_sol - self.out_of_rail_velocity = ( - self.y_sol[3] ** 2 + self.y_sol[4] ** 2 + self.y_sol[5] ** 2 - ) ** (0.5) # Create new flight phase - self.FlightPhases.add_phase( + self.flight_phases.add_phase( self.t, self.u_dot_generalized, index=phase_index + 1, ) # Prepare to leave loops and start new flight phase - phase.TimeNodes.flush_after(node_index) - phase.TimeNodes.add_node(self.t, [], []) + phase.time_nodes.flush_after(node_index) + phase.time_nodes.add_node(self.t, [], []) phase.solver.status = "finished" # Check for apogee event if len(self.apogee_state) == 1 and self.y_sol[5] < 0: - # print('\nPASSIVE EVENT DETECTED') - # print('Rocket Has Reached Apogee!') - # Apogee reported # Assume linear vz(t) to detect when vz = 0 - vz0 = self.solution[-2][6] - t0 = self.solution[-2][0] - vz1 = self.solution[-1][6] - t1 = self.solution[-1][0] - t_root = -(t1 - t0) * vz0 / (vz1 - vz0) + t0 + t0, vz0 = self.solution[-2][0], self.solution[-2][6] + t1, vz1 = self.solution[-1][0], self.solution[-1][6] + t_root = find_root_linear_interpolation(t0, t1, vz0, vz1, 0) # Fetch state at t_root interpolator = phase.solver.dense_output() self.apogee_state = interpolator(t_root) @@ -890,88 +851,66 @@ def __init__( self.apogee_x = self.apogee_state[0] self.apogee_y = self.apogee_state[1] self.apogee = self.apogee_state[2] + if self.terminate_on_apogee: - # print('Terminate on Apogee Activated!') - self.t = t_root - self.t_final = self.t - self.state = self.apogee_state + self.t = self.t_final = t_root # Roll back solution - self.solution[-1] = [self.t, *self.state] + self.solution[-1] = [self.t, *self.apogee_state] # Set last flight phase - self.FlightPhases.flush_after(phase_index) - self.FlightPhases.add_phase(self.t) + self.flight_phases.flush_after(phase_index) + self.flight_phases.add_phase(self.t) # Prepare to leave loops and start new flight phase - phase.TimeNodes.flush_after(node_index) - phase.TimeNodes.add_node(self.t, [], []) + phase.time_nodes.flush_after(node_index) + phase.time_nodes.add_node(self.t, [], []) phase.solver.status = "finished" # Check for impact event if self.y_sol[2] < self.env.elevation: - # print('\nPASSIVE EVENT DETECTED') - # print('Rocket Has Reached Ground!') - # Impact reported - # Check exactly when it went out using root finding - # States before and after - # t0 -> 0 - # Disconsider elevation - self.solution[-2][3] -= self.env.elevation - self.solution[-1][3] -= self.env.elevation - # Get points - y0 = self.solution[-2][3] - yp0 = self.solution[-2][6] - t1 = self.solution[-1][0] - self.solution[-2][0] - y1 = self.solution[-1][3] - yp1 = self.solution[-1][6] - # Put elevation back - self.solution[-2][3] += self.env.elevation - self.solution[-1][3] += self.env.elevation + # Check exactly when it happened using root finding # Cubic Hermite interpolation (ax**3 + bx**2 + cx + d) - D = float(phase.solver.step_size) - d = float(y0) - c = float(yp0) - b = float((3 * y1 - yp1 * D - 2 * c * D - 3 * d) / (D**2)) - a = float(-(2 * y1 - yp1 * D - c * D - 2 * d) / (D**3)) + a, b, c, d = calculate_cubic_hermite_coefficients( + x0=0, # t0 + x1=float(phase.solver.step_size), # t1 - t0 + y0=float(self.solution[-2][3] - self.env.elevation), # z0 + yp0=float(self.solution[-2][6]), # vz0 + y1=float(self.solution[-1][3] - self.env.elevation), # z1 + yp1=float(self.solution[-1][6]), # vz1 + ) # Find roots - d0 = b**2 - 3 * a * c - d1 = 2 * b**3 - 9 * a * b * c + 27 * d * a**2 - c1 = ((d1 + (d1**2 - 4 * d0**3) ** (0.5)) / 2) ** (1 / 3) - t_roots = [] - for k in [0, 1, 2]: - c2 = c1 * (-1 / 2 + 1j * (3**0.5) / 2) ** k - t_roots.append(-(1 / (3 * a)) * (b + c2 + d0 / c2)) + t_roots = find_roots_cubic_function(a, b, c, d) # Find correct root - valid_t_root = [] - for t_root in t_roots: - if 0 < t_root.real < t1 and abs(t_root.imag) < 0.001: - valid_t_root.append(t_root.real) + t1 = self.solution[-1][0] - self.solution[-2][0] + valid_t_root = [ + t_root.real + for t_root in t_roots + if abs(t_root.imag) < 0.001 and 0 < t_root.real < t1 + ] if len(valid_t_root) > 1: raise ValueError( "Multiple roots found when solving for impact time." ) # Determine impact state at t_root - self.t = valid_t_root[0] + self.solution[-2][0] + self.t = self.t_final = valid_t_root[0] + self.solution[-2][0] interpolator = phase.solver.dense_output() - self.y_sol = interpolator(self.t) + self.y_sol = self.impact_state = interpolator(self.t) # Roll back solution self.solution[-1] = [self.t, *self.y_sol] # Save impact state - self.impact_state = self.y_sol self.x_impact = self.impact_state[0] self.y_impact = self.impact_state[1] self.z_impact = self.impact_state[2] self.impact_velocity = self.impact_state[5] - self.t_final = self.t # Set last flight phase - self.FlightPhases.flush_after(phase_index) - self.FlightPhases.add_phase(self.t) + self.flight_phases.flush_after(phase_index) + self.flight_phases.add_phase(self.t) # Prepare to leave loops and start new flight phase - phase.TimeNodes.flush_after(node_index) - phase.TimeNodes.add_node(self.t, [], []) + phase.time_nodes.flush_after(node_index) + phase.time_nodes.add_node(self.t, [], []) phase.solver.status = "finished" # List and feed overshootable time nodes if self.time_overshoot: # Initialize phase overshootable time nodes - overshootable_nodes = TimeNodes() + overshootable_nodes = self.TimeNodes() # Add overshootable parachute time nodes overshootable_nodes.add_parachutes( self.parachutes, self.solution[-2][0], self.t @@ -979,65 +918,46 @@ def __init__( # Add last time node (always skipped) overshootable_nodes.add_node(self.t, [], []) if len(overshootable_nodes) > 1: - # Sort overshootable time nodes + # Sort and merge equal overshootable time nodes overshootable_nodes.sort() - # Merge equal overshootable time nodes overshootable_nodes.merge() # Clear if necessary if overshootable_nodes[0].t == phase.t and phase.clear: overshootable_nodes[0].parachutes = [] overshootable_nodes[0].callbacks = [] - # print('\n\t\t\t\tOvershootable Time Nodes') - # print('\t\t\t\tInterval: ', self.solution[-2][0], self.t) - # print('\t\t\t\tOvershootable Nodes Length: ', str(len(overshootable_nodes)), ' | Overshootable Nodes: ', overshootable_nodes) # Feed overshootable time nodes trigger interpolator = phase.solver.dense_output() for ( overshootable_index, overshootable_node, - ) in time_iterator(overshootable_nodes): - # print('\n\t\t\t\tCurrent Overshootable Node') - # print('\t\t\t\tIndex: ', overshootable_index, ' | Overshootable Node: ', overshootable_node) + ) in self.time_iterator(overshootable_nodes): # Calculate state at node time - overshootable_node.y = interpolator( + overshootable_node.y_sol = interpolator( overshootable_node.t ) - # Calculate and save pressure signal - pressure = self.env.pressure.get_value_opt( - overshootable_node.y[2] - ) for parachute in overshootable_node.parachutes: - # Save pressure signal - parachute.clean_pressure_signal.append( - [overshootable_node.t, pressure] - ) - # Calculate and save noise - noise = parachute.noise_function() - parachute.noise_signal.append( - [overshootable_node.t, noise] - ) - parachute.noisy_pressure_signal.append( - [overshootable_node.t, pressure + noise] - ) - # Gets height above ground level considering noise - hAGL = ( - self.env.barometric_height(pressure + noise) - - self.env.elevation + # Calculate and save pressure signal + noisy_pressure, height_above_ground_level = ( + self.__calculate_and_save_pressure_signals( + parachute, + overshootable_node.t, + overshootable_node.y_sol[2], + ) ) + # Check for parachute trigger if parachute.triggerfunc( - pressure + noise, hAGL, overshootable_node.y + noisy_pressure, + height_above_ground_level, + overshootable_node.y_sol, ): - # print('\nEVENT DETECTED') - # print('Parachute Triggered') - # print('Name: ', parachute.name, ' | Lag: ', parachute.lag) # Remove parachute from flight parachutes self.parachutes.remove(parachute) # Create flight phase for time after detection and before inflation # Must only be created if parachute has any lag i = 1 if parachute.lag != 0: - self.FlightPhases.add_phase( + self.flight_phases.add_phase( overshootable_node.t, phase.derivative, clear=True, @@ -1050,7 +970,7 @@ def __init__( self, "parachute_cd_s", parachute_cd_s ) ] - self.FlightPhases.add_phase( + self.flight_phases.add_phase( overshootable_node.t + parachute.lag, self.u_dot_parachute, callbacks, @@ -1059,17 +979,17 @@ def __init__( ) # Rollback history self.t = overshootable_node.t - self.y_sol = overshootable_node.y + self.y_sol = overshootable_node.y_sol self.solution[-1] = [ overshootable_node.t, - *overshootable_node.y, + *overshootable_node.y_sol, ] # Prepare to leave loops and start new flight phase overshootable_nodes.flush_after( overshootable_index ) - phase.TimeNodes.flush_after(node_index) - phase.TimeNodes.add_node(self.t, [], []) + phase.time_nodes.flush_after(node_index) + phase.time_nodes.add_node(self.t, [], []) phase.solver.status = "finished" # Save parachute event self.parachute_events.append( @@ -1077,30 +997,52 @@ def __init__( ) self.t_final = self.t - self._calculate_pressure_signal() + self.__transform_pressure_signals_lists_to_functions() if verbose: - print("Simulation Completed at Time: {:3.4f} s".format(self.t)) + print(f"\n>>> Simulation Completed at Time: {self.t:3.4f} s") + + def __calculate_and_save_pressure_signals(self, parachute, t, z): + """Gets noise and pressure signals and saves them in the parachute + object given the current time and altitude. + + Parameters + ---------- + parachute : Parachute + The parachute object to calculate signals for. + t : float + The current time in seconds. + z : float + The altitude above sea level in meters. - def __init_post_process_variables(self): - """Initialize post-process variables.""" - # Initialize all variables calculated after initialization. - # Important to do so that MATLAB® can access them - self._drift = Function(0) - self._bearing = Function(0) - self._latitude = Function(0) - self._longitude = Function(0) + Returns + ------- + tuple[float, float] + The noisy pressure and height above ground level. + """ + # Calculate pressure and noise + pressure = self.env.pressure.get_value_opt(z) + noise = parachute.noise_function() + noisy_pressure = pressure + noise + + # Stores in the parachute object + parachute.clean_pressure_signal.append([t, pressure]) + parachute.noise_signal.append([t, noise]) + + # Gets height above ground level considering noise + height_above_ground_level = ( + self.env.barometric_height.get_value_opt(noisy_pressure) + - self.env.elevation + ) + + return noisy_pressure, height_above_ground_level def __init_solution_monitors(self): # Initialize solution monitors self.out_of_rail_time = 0 self.out_of_rail_time_index = 0 self.out_of_rail_state = np.array([0]) - self.out_of_rail_velocity = 0 self.apogee_state = np.array([0]) self.apogee_time = 0 - self.apogee_x = 0 - self.apogee_y = 0 - self.apogee = 0 self.x_impact = 0 self.y_impact = 0 self.impact_velocity = 0 @@ -1108,8 +1050,6 @@ def __init_solution_monitors(self): self.parachute_events = [] self.post_processed = False - return None - def __init_flight_state(self): """Initialize flight state variables.""" if self.initial_solution is None: @@ -1165,8 +1105,6 @@ def __init_flight_state(self): def __init_solver_monitors(self): # Initialize solver monitors self.function_evaluations = [] - self.function_evaluations_per_time_step = [] - self.time_steps = [] # Initialize solution state self.solution = [] self.__init_flight_state() @@ -1179,11 +1117,7 @@ def __init_solver_monitors(self): def __init_equations_of_motion(self): """Initialize equations of motion.""" if self.equations_of_motion == "solid_propulsion": - self.u_dot_generalized = self.u_dot - - def __init_equations_of_motion(self): - """Initialize equations of motion.""" - if self.equations_of_motion == "solid_propulsion": + # NOTE: The u_dot is faster, but only works for solid propulsion self.u_dot_generalized = self.u_dot @cached_property @@ -1219,25 +1153,34 @@ def effective_2rl(self): @cached_property def frontal_surface_wind(self): - # Surface wind magnitude in the frontal direction at the rail's elevation - wind_u = self.env.wind_velocity_x(self.env.elevation) - wind_v = self.env.wind_velocity_y(self.env.elevation) + """Frontal wind velocity at the surface level. The frontal wind is + defined as the wind blowing in the direction of the rocket's heading. + + Returns + ------- + float + Wind velocity in the frontal direction at the surface level. + """ + wind_u = self.env.wind_velocity_x.get_value_opt(self.env.elevation) + wind_v = self.env.wind_velocity_y.get_value_opt(self.env.elevation) heading_rad = self.heading * np.pi / 180 - frontal_surface_wind = wind_u * np.sin(heading_rad) + wind_v * np.cos( - heading_rad - ) - return frontal_surface_wind + return wind_u * np.sin(heading_rad) + wind_v * np.cos(heading_rad) @cached_property def lateral_surface_wind(self): - # Surface wind magnitude in the lateral direction at the rail's elevation - wind_u = self.env.wind_velocity_x(self.env.elevation) - wind_v = self.env.wind_velocity_y(self.env.elevation) + """Lateral wind velocity at the surface level. The lateral wind is + defined as the wind blowing perpendicular to the rocket's heading. + + Returns + ------- + float + Wind velocity in the lateral direction at the surface level. + """ + wind_u = self.env.wind_velocity_x.get_value_opt(self.env.elevation) + wind_v = self.env.wind_velocity_y.get_value_opt(self.env.elevation) heading_rad = self.heading * np.pi / 180 - lateral_surface_wind = -wind_u * np.cos(heading_rad) + wind_v * np.sin( - heading_rad - ) - return lateral_surface_wind + + return -wind_u * np.cos(heading_rad) + wind_v * np.sin(heading_rad) def udot_rail1(self, t, u, post_processing=False): """Calculates derivative of u state vector with respect to time @@ -1261,11 +1204,6 @@ def udot_rail1(self, t, u, post_processing=False): e0dot, e1dot, e2dot, e3dot, alpha1, alpha2, alpha3]. """ - # Check if post processing mode is on - if post_processing: - # Use u_dot post processing code - return self.u_dot_generalized(t, u, True) - # Retrieve integration data x, y, z, vx, vy, vz, e0, e1, e2, e3, omega1, omega2, omega3 = u @@ -1288,7 +1226,9 @@ def udot_rail1(self, t, u, post_processing=False): R3 = -0.5 * rho * (free_stream_speed**2) * self.rocket.area * (drag_coeff) # Calculate Linear acceleration - a3 = (R3 + thrust) / M - (e0**2 - e1**2 - e2**2 + e3**2) * self.env.gravity(z) + a3 = (R3 + thrust) / M - ( + e0**2 - e1**2 - e2**2 + e3**2 + ) * self.env.gravity.get_value_opt(z) if a3 > 0: ax = 2 * (e1 * e3 + e0 * e2) * a3 ay = 2 * (e2 * e3 - e0 * e1) * a3 @@ -1296,6 +1236,15 @@ def udot_rail1(self, t, u, post_processing=False): else: ax, ay, az = 0, 0, 0 + if post_processing: + self.u_dot_generalized(t, u, post_processing=True) + self.__post_processed_variables[-1][1] = ax + self.__post_processed_variables[-1][2] = ay + self.__post_processed_variables[-1][3] = az + self.__post_processed_variables[-1][4] = 0 + self.__post_processed_variables[-1][5] = 0 + self.__post_processed_variables[-1][6] = 0 + return [vx, vy, vz, ax, ay, az, 0, 0, 0, 0, 0, 0, 0] def udot_rail2(self, t, u, post_processing=False): @@ -1348,8 +1297,7 @@ def u_dot(self, t, u, post_processing=False): # Retrieve integration data x, y, z, vx, vy, vz, e0, e1, e2, e3, omega1, omega2, omega3 = u # Determine lift force and moment - R1, R2 = 0, 0 - M1, M2, M3 = 0, 0, 0 + R1, R2, M1, M2, M3 = 0, 0, 0, 0, 0 # Determine current behavior if t < self.rocket.motor.burn_out_time: # Motor burning @@ -1369,7 +1317,6 @@ def u_dot(self, t, u, post_processing=False): M2 -= self.rocket.thrust_eccentricity_y * thrust else: # Motor stopped - # Retrieve important motor quantities # Inertias Tz, Ti, Tzdot, Tidot = 0, 0, 0, 0 # Mass @@ -1389,7 +1336,7 @@ def u_dot(self, t, u, post_processing=False): # b = -self.rocket.distance_rocket_propellant b = ( -( - self.rocket.center_of_propellant_position(0) + self.rocket.center_of_propellant_position.get_value_opt(0) - self.rocket.center_of_dry_mass_position ) * self.rocket._csys @@ -1413,8 +1360,6 @@ def u_dot(self, t, u, post_processing=False): a33 = 1 - 2 * (e1**2 + e2**2) # Transformation matrix: (123) -> (XYZ) K = [[a11, a12, a13], [a21, a22, a23], [a31, a32, a33]] - # Transformation matrix: (XYZ) -> (123) or K transpose - Kt = [[a11, a21, a31], [a12, a22, a32], [a13, a23, a33]] # Calculate Forces and Moments # Get freestream speed @@ -1435,7 +1380,7 @@ def u_dot(self, t, u, post_processing=False): R3 = -0.5 * rho * (free_stream_speed**2) * self.rocket.area * drag_coeff for air_brakes in self.rocket.air_brakes: if air_brakes.deployment_level > 0: - air_brakes_cd = air_brakes.drag_coefficient( + air_brakes_cd = air_brakes.drag_coefficient.get_value_opt( air_brakes.deployment_level, free_stream_mach ) air_brakes_force = ( @@ -1449,7 +1394,6 @@ def u_dot(self, t, u, post_processing=False): R3 = air_brakes_force # Substitutes rocket drag coefficient else: R3 += air_brakes_force - # R3 += self.__computeDragForce(z, Vector(vx, vy, vz)) # Off center moment M1 += self.rocket.cp_eccentricity_y * R3 M2 -= self.rocket.cp_eccentricity_x * R3 @@ -1490,7 +1434,9 @@ def u_dot(self, t, u, post_processing=False): comp_stream_vz_bn = comp_stream_vz_b / comp_stream_speed if -1 * comp_stream_vz_bn < 1: comp_attack_angle = np.arccos(-comp_stream_vz_bn) - c_lift = aero_surface.cl(comp_attack_angle, free_stream_mach) + c_lift = aero_surface.cl.get_value_opt( + comp_attack_angle, free_stream_mach + ) # component lift force magnitude comp_lift = ( 0.5 * rho * (comp_stream_speed**2) * reference_area * c_lift @@ -1513,14 +1459,14 @@ def u_dot(self, t, u, post_processing=False): * reference_area * 2 * surface_radius - * clf_delta(free_stream_mach) + * clf_delta.get_value_opt(free_stream_mach) * cant_angle_rad ) M3d = ( (1 / 2 * rho * free_stream_speed) * reference_area * (2 * surface_radius) ** 2 - * cld_omega(free_stream_mach) + * cld_omega.get_value_opt(free_stream_mach) * omega3 / 2 ) @@ -1565,7 +1511,7 @@ def u_dot(self, t, u, post_processing=False): (R3 - b * Mt * (alpha2 - omega1 * omega3) + thrust) / M, ] ax, ay, az = np.dot(K, L) - az -= self.env.gravity(z) # Include gravity + az -= self.env.gravity.get_value_opt(z) # Include gravity # Create u_dot u_dot = [ @@ -1585,27 +1531,8 @@ def u_dot(self, t, u, post_processing=False): ] if post_processing: - # Dynamics variables - self.R1_list.append([t, R1]) - self.R2_list.append([t, R2]) - self.R3_list.append([t, R3]) - self.M1_list.append([t, M1]) - self.M2_list.append([t, M2]) - self.M3_list.append([t, M3]) - # Atmospheric Conditions - self.wind_velocity_x_list.append( - [t, self.env.wind_velocity_x.get_value_opt(z)] - ) - self.wind_velocity_y_list.append( - [t, self.env.wind_velocity_y.get_value_opt(z)] - ) - self.density_list.append([t, self.env.density.get_value_opt(z)]) - self.dynamic_viscosity_list.append( - [t, self.env.dynamic_viscosity.get_value_opt(z)] - ) - self.pressure_list.append([t, self.env.pressure.get_value_opt(z)]) - self.speed_of_sound_list.append( - [t, self.env.speed_of_sound.get_value_opt(z)] + self.__post_processed_variables.append( + [t, ax, ay, az, alpha1, alpha2, alpha3, R1, R2, R3, M1, M2, M3] ) return u_dot @@ -1669,11 +1596,8 @@ def u_dot_generalized(self, t, u, post_processing=False): * self.rocket._csys ) S_noz_33 = 0.5 * self.rocket.motor.nozzle_radius**2 - S_noz_11 = 0.5 * S_noz_33 + 0.25 * r_NOZ**2 - S_noz_22 = S_noz_11 - S_noz_12 = 0 - S_noz_13 = 0 - S_noz_23 = 0 + S_noz_11 = S_noz_22 = 0.5 * S_noz_33 + 0.25 * r_NOZ**2 + S_noz_12, S_noz_13, S_noz_23 = 0, 0, 0 S_nozzle = Matrix( [ [S_noz_11, S_noz_12, S_noz_13], @@ -1726,7 +1650,9 @@ def u_dot_generalized(self, t, u, post_processing=False): wind_velocity_y = self.env.wind_velocity_y.get_value_opt(z) wind_velocity = Vector([wind_velocity_x, wind_velocity_y, 0]) free_stream_speed = abs((wind_velocity - Vector(v))) - free_stream_mach = free_stream_speed / self.env.speed_of_sound.get_value_opt(z) + speed_of_sound = self.env.speed_of_sound.get_value_opt(z) + free_stream_mach = free_stream_speed / speed_of_sound + if t < self.rocket.motor.burn_out_time: drag_coeff = self.rocket.power_on_drag.get_value_opt(free_stream_mach) else: @@ -1734,7 +1660,7 @@ def u_dot_generalized(self, t, u, post_processing=False): R3 += -0.5 * rho * (free_stream_speed**2) * self.rocket.area * drag_coeff for air_brakes in self.rocket.air_brakes: if air_brakes.deployment_level > 0: - air_brakes_cd = air_brakes.drag_coefficient( + air_brakes_cd = air_brakes.drag_coefficient.get_value_opt( air_brakes.deployment_level, free_stream_mach ) air_brakes_force = ( @@ -1773,18 +1699,18 @@ def u_dot_generalized(self, t, u, post_processing=False): comp_stream_velocity = comp_wind_vb - comp_vb comp_stream_vx_b, comp_stream_vy_b, comp_stream_vz_b = comp_stream_velocity comp_stream_speed = abs(comp_stream_velocity) - comp_stream_mach = ( - comp_stream_speed / self.env.speed_of_sound.get_value_opt(z) - ) + comp_stream_mach = comp_stream_speed / speed_of_sound # Component attack angle and lift force comp_attack_angle = 0 comp_lift, comp_lift_xb, comp_lift_yb = 0, 0, 0 - if comp_stream_vx_b**2 + comp_stream_vy_b**2 != 0: + if comp_stream_vx_b**2 + comp_stream_vy_b**2 != 0: # TODO: maybe try/except # Normalize component stream velocity in body frame comp_stream_vz_bn = comp_stream_vz_b / comp_stream_speed if -1 * comp_stream_vz_bn < 1: comp_attack_angle = np.arccos(-comp_stream_vz_bn) - c_lift = aero_surface.cl(comp_attack_angle, comp_stream_mach) + c_lift = aero_surface.cl.get_value_opt( + comp_attack_angle, comp_stream_mach + ) # Component lift force magnitude comp_lift = ( 0.5 * rho * (comp_stream_speed**2) * reference_area * c_lift @@ -1807,31 +1733,29 @@ def u_dot_generalized(self, t, u, post_processing=False): * reference_area * 2 * surface_radius - * clf_delta(comp_stream_mach) + * clf_delta.get_value_opt(comp_stream_mach) * cant_angle_rad ) M3d = ( (1 / 2 * rho * comp_stream_speed) * reference_area * (2 * surface_radius) ** 2 - * cld_omega(comp_stream_mach) + * cld_omega.get_value_opt(comp_stream_mach) * omega3 / 2 ) M3 += M3f - M3d except AttributeError: pass - weightB = Kt @ Vector([0, 0, -total_mass * self.env.gravity(z)]) + weightB = Kt @ Vector([0, 0, -total_mass * self.env.gravity.get_value_opt(z)]) T00 = total_mass * r_CM - T03 = ( - 2 * total_mass_dot * (Vector([0, 0, r_NOZ]) - r_CM) - - 2 * total_mass * r_CM_dot - ) + r_NOZ_vector = Vector([0, 0, r_NOZ]) + T03 = 2 * total_mass_dot * (r_NOZ_vector - r_CM) - 2 * total_mass * r_CM_dot T04 = ( - self.rocket.motor.thrust(t) * Vector([0, 0, 1]) + self.rocket.motor.thrust.get_value_opt(t) * Vector([0, 0, 1]) - total_mass * r_CM_ddot - 2 * total_mass_dot * r_CM_dot - + total_mass_ddot * (Vector([0, 0, r_NOZ]) - r_CM) + + total_mass_ddot * (r_NOZ_vector - r_CM) ) T05 = total_mass_dot * S_nozzle - I_dot @@ -1860,27 +1784,8 @@ def u_dot_generalized(self, t, u, post_processing=False): u_dot = [*r_dot, *v_dot, *e_dot, *w_dot] if post_processing: - # Dynamics variables - self.R1_list.append([t, R1]) - self.R2_list.append([t, R2]) - self.R3_list.append([t, R3]) - self.M1_list.append([t, M1]) - self.M2_list.append([t, M2]) - self.M3_list.append([t, M3]) - # Atmospheric Conditions - self.wind_velocity_x_list.append( - [t, self.env.wind_velocity_x.get_value_opt(z)] - ) - self.wind_velocity_y_list.append( - [t, self.env.wind_velocity_y.get_value_opt(z)] - ) - self.density_list.append([t, self.env.density.get_value_opt(z)]) - self.dynamic_viscosity_list.append( - [t, self.env.dynamic_viscosity.get_value_opt(z)] - ) - self.pressure_list.append([t, self.env.pressure.get_value_opt(z)]) - self.speed_of_sound_list.append( - [t, self.env.speed_of_sound.get_value_opt(z)] + self.__post_processed_variables.append( + [t, *v_dot, *w_dot, R1, R2, R3, M1, M2, M3] ) return u_dot @@ -1944,20 +1849,9 @@ def u_dot_parachute(self, t, u, post_processing=False): az = (Dz - 9.8 * mp) / (mp + ma) if post_processing: - # Dynamics variables - self.R1_list.append([t, Dx]) - self.R2_list.append([t, Dy]) - self.R3_list.append([t, Dz]) - self.M1_list.append([t, 0]) - self.M2_list.append([t, 0]) - self.M3_list.append([t, 0]) - # Atmospheric Conditions - self.wind_velocity_x_list.append([t, self.env.wind_velocity_x(z)]) - self.wind_velocity_y_list.append([t, self.env.wind_velocity_y(z)]) - self.density_list.append([t, self.env.density(z)]) - self.dynamic_viscosity_list.append([t, self.env.dynamic_viscosity(z)]) - self.pressure_list.append([t, self.env.pressure(z)]) - self.speed_of_sound_list.append([t, self.env.speed_of_sound(z)]) + self.__post_processed_variables.append( + [t, ax, ay, az, 0, 0, 0, Dx, Dy, Dz, 0, 0, 0] + ) return [vx, vy, vz, ax, ay, az, 0, 0, 0, 0, 0, 0, 0] @@ -1966,11 +1860,29 @@ def solution_array(self): """Returns solution array of the rocket flight.""" return np.array(self.solution) + @property + def function_evaluations_per_time_step(self): + """Get the number of function evaluations per time step. This method + calculates the difference between consecutive function evaluations + during numerical integration and returns it as a list. + + Returns + ------- + list + The list of differences in function evaluations per time step. + """ + return np.diff(self.function_evaluations).tolist() + @cached_property def time(self): """Returns time array from solution.""" return self.solution_array[:, 0] + @cached_property + def time_steps(self): + """Returns time step array.""" + return np.diff(self.time) + def get_solution_at_time(self, t, atol=1e-3): """Returns the solution state vector at a given time. If the time is not found in the solution, the closest time is used and a warning is @@ -2078,51 +1990,51 @@ def w3(self): @funcify_method("Time (s)", "Ax (m/s²)", "spline", "zero") def ax(self): """Rocket x acceleration as a Function of time.""" - return self.retrieve_acceleration_arrays[0] + return self.__evaluate_post_process[:, [0, 1]] @funcify_method("Time (s)", "Ay (m/s²)", "spline", "zero") def ay(self): """Rocket y acceleration as a Function of time.""" - return self.retrieve_acceleration_arrays[1] + return self.__evaluate_post_process[:, [0, 2]] @funcify_method("Time (s)", "Az (m/s²)", "spline", "zero") def az(self): """Rocket z acceleration as a Function of time.""" - return self.retrieve_acceleration_arrays[2] + return self.__evaluate_post_process[:, [0, 3]] @funcify_method("Time (s)", "α1 (rad/s²)", "spline", "zero") def alpha1(self): """Rocket angular acceleration α1 as a Function of time.""" - return self.retrieve_acceleration_arrays[3] + return self.__evaluate_post_process[:, [0, 4]] @funcify_method("Time (s)", "α2 (rad/s²)", "spline", "zero") def alpha2(self): """Rocket angular acceleration α2 as a Function of time.""" - return self.retrieve_acceleration_arrays[4] + return self.__evaluate_post_process[:, [0, 5]] @funcify_method("Time (s)", "α3 (rad/s²)", "spline", "zero") def alpha3(self): """Rocket angular acceleration α3 as a Function of time.""" - return self.retrieve_acceleration_arrays[5] + return self.__evaluate_post_process[:, [0, 6]] # Process third type of outputs - Temporary values @funcify_method("Time (s)", "R1 (N)", "spline", "zero") def R1(self): """Aerodynamic force along the first axis that is perpendicular to the rocket's axis of symmetry as a Function of time.""" - return self.retrieve_temporary_values_arrays[0] + return self.__evaluate_post_process[:, [0, 7]] @funcify_method("Time (s)", "R2 (N)", "spline", "zero") def R2(self): """Aerodynamic force along the second axis that is perpendicular to the rocket's axis of symmetry as a Function of time.""" - return self.retrieve_temporary_values_arrays[1] + return self.__evaluate_post_process[:, [0, 8]] @funcify_method("Time (s)", "R3 (N)", "spline", "zero") def R3(self): """Aerodynamic force along the rocket's axis of symmetry as a Function of time.""" - return self.retrieve_temporary_values_arrays[2] + return self.__evaluate_post_process[:, [0, 9]] @funcify_method("Time (s)", "M1 (Nm)", "spline", "zero") def M1(self): @@ -2130,54 +2042,51 @@ def M1(self): perpendicular to the rocket's axis of symmetry as a Function of time. """ - return self.retrieve_temporary_values_arrays[3] + return self.__evaluate_post_process[:, [0, 10]] @funcify_method("Time (s)", "M2 (Nm)", "spline", "zero") def M2(self): """Aerodynamic bending moment in the same direction as the axis that is perpendicular to the rocket's axis of symmetry as a Function of time.""" - return self.retrieve_temporary_values_arrays[4] + return self.__evaluate_post_process[:, [0, 11]] @funcify_method("Time (s)", "M3 (Nm)", "spline", "zero") def M3(self): """Aerodynamic bending moment in the same direction as the rocket's axis of symmetry as a Function of time.""" - return self.retrieve_temporary_values_arrays[5] + return self.__evaluate_post_process[:, [0, 12]] @funcify_method("Time (s)", "Pressure (Pa)", "spline", "constant") def pressure(self): """Air pressure felt by the rocket as a Function of time.""" - return self.retrieve_temporary_values_arrays[6] + return [(t, self.env.pressure.get_value_opt(z)) for t, z in self.z] @funcify_method("Time (s)", "Density (kg/m³)", "spline", "constant") def density(self): """Air density felt by the rocket as a Function of time.""" - return self.retrieve_temporary_values_arrays[7] + return [(t, self.env.density.get_value_opt(z)) for t, z in self.z] @funcify_method("Time (s)", "Dynamic Viscosity (Pa s)", "spline", "constant") def dynamic_viscosity(self): """Air dynamic viscosity felt by the rocket as a Function of time.""" - return self.retrieve_temporary_values_arrays[8] + return [(t, self.env.dynamic_viscosity.get_value_opt(z)) for t, z in self.z] @funcify_method("Time (s)", "Speed of Sound (m/s)", "spline", "constant") def speed_of_sound(self): - """Speed of sound in the air felt by the rocket as a Function - of time.""" - return self.retrieve_temporary_values_arrays[9] + """Speed of sound in the air felt by the rocket as a Function of time.""" + return [(t, self.env.speed_of_sound.get_value_opt(z)) for t, z in self.z] @funcify_method("Time (s)", "Wind Velocity X (East) (m/s)", "spline", "constant") def wind_velocity_x(self): - """Wind velocity in the X direction (east) as a Function of - time.""" - return self.retrieve_temporary_values_arrays[10] + """Wind velocity in the X direction (east) as a Function of time.""" + return [(t, self.env.wind_velocity_x.get_value_opt(z)) for t, z in self.z] @funcify_method("Time (s)", "Wind Velocity Y (North) (m/s)", "spline", "constant") def wind_velocity_y(self): - """Wind velocity in the y direction (north) as a Function of - time.""" - return self.retrieve_temporary_values_arrays[11] + """Wind velocity in the y direction (north) as a Function of time.""" + return [(t, self.env.wind_velocity_y.get_value_opt(z)) for t, z in self.z] # Process fourth type of output - values calculated from previous outputs @@ -2188,6 +2097,11 @@ def speed(self): """Rocket speed, or velocity magnitude, as a Function of time.""" return (self.vx**2 + self.vy**2 + self.vz**2) ** 0.5 + @property + def out_of_rail_velocity(self): + """Velocity at which the rocket leaves the launch rail.""" + return self.speed.get_value_opt(self.out_of_rail_time) + @cached_property def max_speed_time(self): """Time at which the rocket reaches its maximum speed.""" @@ -2197,7 +2111,7 @@ def max_speed_time(self): @cached_property def max_speed(self): """Maximum speed reached by the rocket.""" - return self.speed(self.max_speed_time) + return self.speed.get_value_opt(self.max_speed_time) # Accelerations @funcify_method("Time (s)", "acceleration Magnitude (m/s²)") @@ -2223,7 +2137,7 @@ def max_acceleration_power_on_time(self): @cached_property def max_acceleration_power_on(self): """Maximum acceleration reached by the rocket during motor burn.""" - return self.acceleration(self.max_acceleration_power_on_time) + return self.acceleration.get_value_opt(self.max_acceleration_power_on_time) @cached_property def max_acceleration_power_off_time(self): @@ -2240,7 +2154,7 @@ def max_acceleration_power_off_time(self): @cached_property def max_acceleration_power_off(self): """Maximum acceleration reached by the rocket after motor burn.""" - return self.acceleration(self.max_acceleration_power_off_time) + return self.acceleration.get_value_opt(self.max_acceleration_power_off_time) @cached_property def max_acceleration_time(self): @@ -2293,13 +2207,13 @@ def attitude_angle(self): attitude_angle = (180 / np.pi) * np.arctan2( self.attitude_vector_z[:, 1], horizontal_attitude_proj[:, 1] ) - attitude_angle = np.column_stack([self.time, attitude_angle]) - return attitude_angle + return np.column_stack([self.time, attitude_angle]) # Lateral Attitude Angle @funcify_method("Time (s)", "Lateral Attitude Angle (°)") def lateral_attitude_angle(self): """Rocket lateral attitude angle as a Function of time.""" + # TODO: complex method, it should be defined elsewhere. lateral_vector_angle = (np.pi / 180) * (self.heading - 90) lateral_vector_x = np.sin(lateral_vector_angle) lateral_vector_y = np.cos(lateral_vector_angle) @@ -2355,24 +2269,17 @@ def theta(self): @funcify_method("Time (s)", "Freestream Velocity X (m/s)", "spline", "constant") def stream_velocity_x(self): """Freestream velocity X component as a Function of time.""" - stream_velocity_x = np.column_stack( - (self.time, self.wind_velocity_x[:, 1] - self.vx[:, 1]) - ) - return stream_velocity_x + return np.column_stack((self.time, self.wind_velocity_x[:, 1] - self.vx[:, 1])) @funcify_method("Time (s)", "Freestream Velocity Y (m/s)", "spline", "constant") def stream_velocity_y(self): """Freestream velocity Y component as a Function of time.""" - stream_velocity_y = np.column_stack( - (self.time, self.wind_velocity_y[:, 1] - self.vy[:, 1]) - ) - return stream_velocity_y + return np.column_stack((self.time, self.wind_velocity_y[:, 1] - self.vy[:, 1])) @funcify_method("Time (s)", "Freestream Velocity Z (m/s)", "spline", "constant") def stream_velocity_z(self): """Freestream velocity Z component as a Function of time.""" - stream_velocity_z = np.column_stack((self.time, -self.vz[:, 1])) - return stream_velocity_z + return np.column_stack((self.time, -self.vz[:, 1])) @funcify_method("Time (s)", "Freestream Speed (m/s)", "spline", "constant") def free_stream_speed(self): @@ -2388,7 +2295,7 @@ def free_stream_speed(self): @cached_property def apogee_freestream_speed(self): """Freestream speed at apogee in m/s.""" - return self.free_stream_speed(self.apogee_time) + return self.free_stream_speed.get_value_opt(self.apogee_time) # Mach Number @funcify_method("Time (s)", "Mach Number", "spline", "zero") @@ -2405,7 +2312,7 @@ def max_mach_number_time(self): @cached_property def max_mach_number(self): """Maximum Mach number.""" - return self.mach_number(self.max_mach_number_time) + return self.mach_number.get_value_opt(self.max_mach_number_time) # Stability Margin @cached_property @@ -2417,7 +2324,7 @@ def max_stability_margin_time(self): @cached_property def max_stability_margin(self): """Maximum stability margin.""" - return self.stability_margin(self.max_stability_margin_time) + return self.stability_margin.get_value_opt(self.max_stability_margin_time) @cached_property def min_stability_margin_time(self): @@ -2428,7 +2335,7 @@ def min_stability_margin_time(self): @cached_property def min_stability_margin(self): """Minimum stability margin.""" - return self.stability_margin(self.min_stability_margin_time) + return self.stability_margin.get_value_opt(self.min_stability_margin_time) @property def initial_stability_margin(self): @@ -2438,7 +2345,7 @@ def initial_stability_margin(self): ------- float """ - return self.stability_margin(self.time[0]) + return self.stability_margin.get_value_opt(self.time[0]) @property def out_of_rail_stability_margin(self): @@ -2448,7 +2355,7 @@ def out_of_rail_stability_margin(self): ------- float """ - return self.stability_margin(self.out_of_rail_time) + return self.stability_margin.get_value_opt(self.out_of_rail_time) # Reynolds Number @funcify_method("Time (s)", "Reynolds Number", "spline", "zero") @@ -2467,7 +2374,7 @@ def max_reynolds_number_time(self): @cached_property def max_reynolds_number(self): """Maximum Reynolds number.""" - return self.reynolds_number(self.max_reynolds_number_time) + return self.reynolds_number.get_value_opt(self.max_reynolds_number_time) # Dynamic Pressure @funcify_method("Time (s)", "Dynamic Pressure (Pa)", "spline", "zero") @@ -2484,7 +2391,7 @@ def max_dynamic_pressure_time(self): @cached_property def max_dynamic_pressure(self): """Maximum dynamic pressure.""" - return self.dynamic_pressure(self.max_dynamic_pressure_time) + return self.dynamic_pressure.get_value_opt(self.max_dynamic_pressure_time) # Total Pressure @funcify_method("Time (s)", "Total Pressure (Pa)", "spline", "zero") @@ -2500,7 +2407,7 @@ def max_total_pressure_time(self): @cached_property def max_total_pressure(self): """Maximum total pressure.""" - return self.total_pressure(self.max_total_pressure_time) + return self.total_pressure.get_value_opt(self.max_total_pressure_time) # Dynamics functions and variables @@ -2543,7 +2450,7 @@ def translational_energy(self): # Redefine total_mass time grid to allow for efficient Function algebra total_mass = deepcopy(self.rocket.total_mass) total_mass.set_discrete_based_on_model(self.vz) - translational_energy = 0.5 * total_mass * (self.vx**2 + self.vy**2 + self.vz**2) + translational_energy = 0.5 * total_mass * (self.speed**2) return translational_energy @funcify_method("Time (s)", "Kinetic Energy (J)", "spline", "zero") @@ -2557,16 +2464,15 @@ def potential_energy(self): """Potential energy as a Function of time in relation to sea level.""" # Constants - GM = 3.986004418e14 + GM = 3.986004418e14 # TODO: this constant should come from Environment. # Redefine total_mass time grid to allow for efficient Function algebra total_mass = deepcopy(self.rocket.total_mass) total_mass.set_discrete_based_on_model(self.z) - potential_energy = ( + return ( GM * total_mass * (1 / (self.z + self.env.earth_radius) - 1 / self.env.earth_radius) ) - return potential_energy # Total Mechanical Energy @funcify_method("Time (s)", "Mechanical Energy (J)", "spline", "constant") @@ -2606,7 +2512,7 @@ def angle_of_attack(self): for i in self.time ] # Define freestream speed list - free_stream_speed = [self.free_stream_speed(i) for i in self.time] + free_stream_speed = [self.free_stream_speed.get_value_opt(i) for i in self.time] free_stream_speed = np.nan_to_num(free_stream_speed) # Normalize dot product @@ -2618,9 +2524,8 @@ def angle_of_attack(self): # Calculate angle of attack and convert to degrees angle_of_attack = np.rad2deg(np.arccos(dot_product_normalized)) - angle_of_attack = np.column_stack([self.time, angle_of_attack]) - return angle_of_attack + return np.column_stack([self.time, angle_of_attack]) # Frequency response and stability variables @funcify_method("Frequency (Hz)", "ω1 Fourier Amplitude", "spline", "zero") @@ -2768,6 +2673,7 @@ def latitude(self): ) return np.column_stack((self.time, latitude)) + # TODO: haversine should be defined in tools.py so we just invoke it in here. @funcify_method("Time (s)", "Longitude (°)", "linear", "constant") def longitude(self): """Rocket longitude coordinate, in degrees, as a Function of @@ -2790,142 +2696,6 @@ def longitude(self): return np.column_stack((self.time, longitude)) - @cached_property - def retrieve_acceleration_arrays(self): - """Retrieve acceleration arrays from the integration scheme - - Parameters - ---------- - - Returns - ------- - ax: list - acceleration in x direction - ay: list - acceleration in y direction - az: list - acceleration in z direction - alpha1: list - angular acceleration in x direction - alpha2: list - angular acceleration in y direction - alpha3: list - angular acceleration in z direction - """ - # Initialize acceleration arrays - ax, ay, az = [[0, 0]], [[0, 0]], [[0, 0]] - alpha1, alpha2, alpha3 = [[0, 0]], [[0, 0]], [[0, 0]] - # Go through each time step and calculate accelerations - # Get flight phases - for phase_index, phase in self.time_iterator(self.FlightPhases): - init_time = phase.t - final_time = self.FlightPhases[phase_index + 1].t - current_derivative = phase.derivative - # Call callback functions - for callback in phase.callbacks: - callback(self) - # Loop through time steps in flight phase - for step in self.solution: # Can be optimized - if init_time < step[0] <= final_time: - # Get derivatives - u_dot = current_derivative(step[0], step[1:]) - # Get accelerations - ax_value, ay_value, az_value = u_dot[3:6] - alpha1_value, alpha2_value, alpha3_value = u_dot[10:] - # Save accelerations - ax.append([step[0], ax_value]) - ay.append([step[0], ay_value]) - az.append([step[0], az_value]) - alpha1.append([step[0], alpha1_value]) - alpha2.append([step[0], alpha2_value]) - alpha3.append([step[0], alpha3_value]) - - return ax, ay, az, alpha1, alpha2, alpha3 - - @cached_property - def retrieve_temporary_values_arrays(self): - """Retrieve temporary values arrays from the integration scheme. - Currently, the following temporary values are retrieved: ``R1`` , ``R2`` - ``R3`` , ``M1`` , ``M2`` , ``M3`` , ``pressure`` , ``density`` , - ``dynamic_viscosity`` , ``speed_of_sound`` . - - Returns - ------- - self.R1_list: list - R1 values. - self.R2_list: list - R2 values. - self.R3_list: list - R3 values are the aerodynamic force values in the rocket's axis - direction. - self.M1_list: list - M1 values. - self.M2_list: list - Aerodynamic bending moment in e2 direction at each time step. - self.M3_list: list - Aerodynamic bending moment in e3 direction at each time step. - self.pressure_list: list - Air pressure at each time step. - self.density_list: list - Air density at each time step. - self.dynamic_viscosity_list: list - Dynamic viscosity at each time step. - self.speed_of_sound_list: list - Speed of sound at each time step. - self.wind_velocity_x_list: list - Wind velocity in x direction at each time step. - self.wind_velocity_y_list: list - Wind velocity in y direction at each time step. - """ - - # Initialize force and atmospheric arrays - self.R1_list = [] - self.R2_list = [] - self.R3_list = [] - self.M1_list = [] - self.M2_list = [] - self.M3_list = [] - self.pressure_list = [] - self.density_list = [] - self.dynamic_viscosity_list = [] - self.speed_of_sound_list = [] - self.wind_velocity_x_list = [] - self.wind_velocity_y_list = [] - - # Go through each time step and calculate forces and atmospheric values - # Get flight phases - for phase_index, phase in self.time_iterator(self.FlightPhases): - init_time = phase.t - final_time = self.FlightPhases[phase_index + 1].t - current_derivative = phase.derivative - # Call callback functions - for callback in phase.callbacks: - callback(self) - # Loop through time steps in flight phase - for step in self.solution: # Can be optimized - if init_time < step[0] <= final_time or ( - init_time == self.t_initial and step[0] == self.t_initial - ): - # Call derivatives in post processing mode - u_dot = current_derivative(step[0], step[1:], post_processing=True) - - temporary_values = [ - self.R1_list, - self.R2_list, - self.R3_list, - self.M1_list, - self.M2_list, - self.M3_list, - self.pressure_list, - self.density_list, - self.dynamic_viscosity_list, - self.speed_of_sound_list, - self.wind_velocity_x_list, - self.wind_velocity_y_list, - ] - - return temporary_values - def get_controller_observed_variables(self): """Retrieve the observed variables related to air brakes from the controllers. If there is only one set of observed variables, it is @@ -2941,7 +2711,7 @@ def get_controller_observed_variables(self): ) @cached_property - def __calculate_rail_button_forces(self): + def __calculate_rail_button_forces(self): # TODO: complex method. """Calculate the forces applied to the rail buttons while rocket is still on the launch rail. It will return 0 if no rail buttons are defined. @@ -3028,7 +2798,7 @@ def __calculate_rail_button_forces(self): rail_button2_shear_force, ) - def _calculate_pressure_signal(self): + def __transform_pressure_signals_lists_to_functions(self): """Calculate the pressure signal from the pressure sensor. It creates a signal_function attribute in the parachute object. Parachute works as a subclass of Rocket class. @@ -3045,17 +2815,40 @@ def _calculate_pressure_signal(self): "Pressure - Without Noise (Pa)", "linear", ) - parachute.noisy_pressure_signal_function = Function( - parachute.noisy_pressure_signal, - "Time (s)", - "Pressure - With Noise (Pa)", - "linear", - ) parachute.noise_signal_function = Function( parachute.noise_signal, "Time (s)", "Pressure Noise (Pa)", "linear" ) + parachute.noisy_pressure_signal_function = ( + parachute.clean_pressure_signal_function + + parachute.noise_signal_function + ) - return None + @cached_property + def __evaluate_post_process(self): + """Evaluate all post-processing variables by running the simulation + again but with the post-processing flag set to True. + + Returns + ------- + np.array + An array containing all post-processed variables evaluated at each + time step. Each element of the array is a list containing: + [t, ax, ay, az, alpha1, alpha2, alpha3, R1, R2, R3, M1, M2, M3] + """ + self.__post_processed_variables = [] + for phase_index, phase in self.time_iterator(self.flight_phases): + init_time = phase.t + final_time = self.flight_phases[phase_index + 1].t + current_derivative = phase.derivative + for callback in phase.callbacks: + callback(self) + for step in self.solution: + if init_time < step[0] <= final_time or ( + init_time == self.t_initial and step[0] == self.t_initial + ): + current_derivative(step[0], step[1:], post_processing=True) + + return np.array(self.__post_processed_variables) def post_process(self, interpolation="spline", extrapolation="natural"): """This method is **deprecated** and is only kept here for backwards @@ -3071,12 +2864,10 @@ def post_process(self, interpolation="spline", extrapolation="natural"): ------- None """ - # Register post processing + # TODO: add a deprecation warning maybe? self.post_processed = True - return None - - def calculate_stall_wind_velocity(self, stall_angle): + def calculate_stall_wind_velocity(self, stall_angle): # TODO: move to utilities """Function to calculate the maximum wind velocity before the angle of attack exceeds a desired angle, at the instant of departing rail launch. Can be helpful if you know the exact stall angle of all aerodynamics @@ -3087,15 +2878,15 @@ def calculate_stall_wind_velocity(self, stall_angle): stall_angle : float Angle, in degrees, for which you would like to know the maximum wind speed before the angle of attack exceeds it + Return ------ None """ v_f = self.out_of_rail_velocity - # Convert angle to radians - theta = self.inclination * 3.14159265359 / 180 - stall_angle = stall_angle * 3.14159265359 / 180 + theta = np.radians(self.inclination) + stall_angle = np.radians(stall_angle) c = (math.cos(stall_angle) ** 2 - math.cos(theta) ** 2) / math.sin( stall_angle @@ -3109,16 +2900,13 @@ def calculate_stall_wind_velocity(self, stall_angle): ** 0.5 ) / 2 - # Convert stall_angle to degrees - stall_angle = stall_angle * 180 / np.pi + stall_angle = np.degrees(stall_angle) print( "Maximum wind velocity at Rail Departure time before angle" + f" of attack exceeds {stall_angle:.3f}°: {w_v:.3f} m/s" ) - return None - - def export_pressures(self, file_name, time_step): + def export_pressures(self, file_name, time_step): # TODO: move out """Exports the pressure experienced by the rocket during the flight to an external file, the '.csv' format is recommended, as the columns will be separated by commas. It can handle flights with or without @@ -3150,12 +2938,12 @@ def export_pressures(self, file_name, time_step): if len(self.rocket.parachutes) == 0: print("No parachutes in the rocket, saving static pressure.") for t in time_points: - file.write(f"{t:f}, {self.pressure(t):.5f}\n") + file.write(f"{t:f}, {self.pressure.get_value_opt(t):.5f}\n") else: for parachute in self.rocket.parachutes: for t in time_points: - p_cl = parachute.clean_pressure_signal_function(t) - p_ns = parachute.noisy_pressure_signal_function(t) + p_cl = parachute.clean_pressure_signal_function.get_value_opt(t) + p_ns = parachute.noisy_pressure_signal_function.get_value_opt(t) file.write(f"{t:f}, {p_cl:.5f}, {p_ns:.5f}\n") # We need to save only 1 parachute data break @@ -3183,6 +2971,7 @@ class attributes which are instances of the Function class. Usage will be exported. Otherwise, linear interpolation is carried out to calculate values at the desired time steps. Example: 0.001. """ + # TODO: we should move this method to outside of class. # Fast evaluation for the most basic scenario if time_step is None and len(variables) == 0: @@ -3241,13 +3030,13 @@ class attributes which are instances of the Function class. Usage try: obj = getattr(self.__class__, variable) variable_function = obj.__get__(self, self.__class__) - except AttributeError: + except AttributeError as exc: raise AttributeError( - "Variable '{}' not found in Flight class".format(variable) - ) + f"Variable '{variable}' not found in Flight class" + ) from exc variable_points = variable_function(time_points) exported_matrix += [variable_points] - exported_header += ", " + variable_function.__outputs__[0] + exported_header += f", {variable_function.__outputs__[0]}" exported_matrix = np.array(exported_matrix).T # Fix matrix orientation @@ -3260,9 +3049,7 @@ class attributes which are instances of the Function class. Usage encoding="utf-8", ) - return - - def export_kml( + def export_kml( # TODO: should be moved out of this class. self, file_name="trajectory.kml", time_step=None, @@ -3310,27 +3097,33 @@ def export_kml( # Open kml file with simplekml library kml = simplekml.Kml(open=1) trajectory = kml.newlinestring(name="Rocket Trajectory - Powered by RocketPy") - coords = [] + if altitude_mode == "relativetoground": # In this mode the elevation data will be the Above Ground Level # elevation. Only works properly if the ground level is similar to # a plane, i.e. it might not work well if the terrain has mountains - for t in time_points: - coords.append( - ( - self.longitude(t), - self.latitude(t), - self.altitude(t), - ) + coords = [ + ( + self.longitude.get_value_opt(t), + self.latitude.get_value_opt(t), + self.altitude.get_value_opt(t), ) + for t in time_points + ] trajectory.coords = coords trajectory.altitudemode = simplekml.AltitudeMode.relativetoground else: # altitude_mode == 'absolute' # In this case the elevation data will be the Above Sea Level elevation # Ensure you use the correct value on self.env.elevation, otherwise # the trajectory path can be offset from ground - for t in time_points: - coords.append((self.longitude(t), self.latitude(t), self.z(t))) + coords = [ + ( + self.longitude.get_value_opt(t), + self.latitude.get_value_opt(t), + self.z.get_value_opt(t), + ) + for t in time_points + ] trajectory.coords = coords trajectory.altitudemode = simplekml.AltitudeMode.absolute # Modify style of trajectory linestring @@ -3342,29 +3135,13 @@ def export_kml( kml.save(file_name) print("File ", file_name, " saved with success!") - return None - def info(self): - """Prints out a summary of the data available about the Flight. - - Returns - ------- - None - """ + """Prints out a summary of the data available about the Flight.""" self.prints.all() - return None def all_info(self): - """Prints out all data and graphs available about the Flight. - - Returns - ------- - None - """ - - # Print a summary of data about the flight + """Prints out all data and graphs available about the Flight.""" self.info() - self.plots.all() return None @@ -3404,7 +3181,7 @@ def display_warning(self, *messages): """A simple function to print a warning message.""" print("WARNING:", *messages) - def add(self, flight_phase, index=None): + def add(self, flight_phase, index=None): # TODO: quite complex method """Add a flight phase to the list. It will be inserted in the correct position, according to its initial time. If no index is provided, it will be appended to the end of the list. If by any @@ -3561,6 +3338,8 @@ class FlightPhase: A flag indicating whether to clear the solution after the phase. """ + # TODO: add a "name" optional argument to the FlightPhase. Really helps. + def __init__(self, t, derivative=None, callbacks=None, clear=True): self.t = t self.derivative = derivative @@ -3568,17 +3347,19 @@ def __init__(self, t, derivative=None, callbacks=None, clear=True): self.clear = clear def __repr__(self): - if self.derivative is None: - return "{Initial Time: " + str(self.t) + " | Derivative: None}" + name = "None" if self.derivative is None else self.derivative.__name__ return ( - "{Initial Time: " - + str(self.t) - + " | Derivative: " - + self.derivative.__name__ - + "}" + f"" ) class TimeNodes: + """TimeNodes is a class that stores all the time nodes of a simulation. + It is meant to work like a python list, but it has some additional + methods that are useful for the simulation. Them items stored in are + TimeNodes object are instances of the TimeNode class. + """ + def __init__(self, init_list=[]): self.list = init_list[:] @@ -3598,20 +3379,19 @@ def add_node(self, t, parachutes, callbacks): self.list.append(self.TimeNode(t, parachutes, callbacks)) def add_parachutes(self, parachutes, t_init, t_end): - # Iterate over parachutes for parachute in parachutes: # Calculate start of sampling time nodes - pcDt = 1 / parachute.sampling_rate + sampling_interval = 1 / parachute.sampling_rate parachute_node_list = [ - self.TimeNode(i * pcDt, [parachute], []) + self.TimeNode(i * sampling_interval, [parachute], []) for i in range( - math.ceil(t_init / pcDt), math.floor(t_end / pcDt) + 1 + math.ceil(t_init / sampling_interval), + math.floor(t_end / sampling_interval) + 1, ) ] self.list += parachute_node_list def add_controllers(self, controllers, t_init, t_end): - # Iterate over controllers for controller in controllers: # Calculate start of sampling time nodes controller_time_step = 1 / controller.sampling_rate @@ -3625,29 +3405,50 @@ def add_controllers(self, controllers, t_init, t_end): self.list += controller_node_list def sort(self): - self.list.sort(key=(lambda node: node.t)) + self.list.sort() def merge(self): - # Initialize temporary list - self.tmp_list = [self.list[0]] - self.copy_list = self.list[1:] - # Iterate through all other time nodes - for node in self.copy_list: - # If there is already another node with similar time: merge - if abs(node.t - self.tmp_list[-1].t) < 1e-7: - self.tmp_list[-1].parachutes += node.parachutes - self.tmp_list[-1].callbacks += node.callbacks - # Add new node to tmp list if there is none with the same time - else: - self.tmp_list.append(node) - # Save tmp list to permanent - self.list = self.tmp_list + """Merge all the time nodes that have the same time. This is made to + avoid multiple evaluations of the same time node. This method does + not guarantee the order of the nodes in the list, so it is + recommended to sort the list before or after using this method. + """ + tmp_dict = {} + for node in self.list: + time = round(node.t, 7) + try: + # Try to access the node and merge if it exists + tmp_dict[time].parachutes += node.parachutes + tmp_dict[time].callbacks += node.callbacks + except KeyError: + # If the node does not exist, add it to the dictionary + tmp_dict[time] = node + self.list = list(tmp_dict.values()) def flush_after(self, index): del self.list[index + 1 :] class TimeNode: + """TimeNode is a class that represents a time node in the time + nodes list. It stores the time, the parachutes and the controllers + that are active at that time. This class is supposed to work + exclusively within the TimeNodes class. + """ + def __init__(self, t, parachutes, controllers): + """Create a TimeNode object. + + Parameters + ---------- + t : float + Initial time of the time node. + parachutes : list[Parachute] + List containing all the parachutes that should be evaluated + at this time node. + controllers : list[_Controller] + List containing all the controllers that should be evaluated + at this time node. + """ self.t = t self.parachutes = parachutes self.callbacks = [] @@ -3655,9 +3456,27 @@ def __init__(self, t, parachutes, controllers): def __repr__(self): return ( - "{Initial Time: " - + str(self.t) - + " | Parachutes: " - + str(len(self.parachutes)) - + "}" + f"" ) + + def __lt__(self, other): + """Allows the comparison of two TimeNode objects based on their + initial time. This is particularly useful for sorting a list of + TimeNode objects. + + Parameters + ---------- + other : TimeNode + Another TimeNode object to compare with. + + Returns + ------- + bool + True if the initial time of the current TimeNode is less + than the initial time of the other TimeNode, False + otherwise. + """ + return self.t < other.t diff --git a/rocketpy/tools.py b/rocketpy/tools.py index ccecffd4f..86ad7f17e 100644 --- a/rocketpy/tools.py +++ b/rocketpy/tools.py @@ -1,3 +1,11 @@ +"""The module rocketpy.tools contains a set of functions that are used +throughout the rocketpy package. These functions are not specific to any +particular class or module, and are used to perform general tasks that are +required by multiple classes or modules. These functions can be modified or +expanded to suit the needs of other modules and may present breaking changes +between minor versions if necessary, although this will be always avoided. +""" + import functools import importlib import importlib.metadata @@ -42,6 +50,139 @@ def tuple_handler(value): raise ValueError("value must be a list or tuple of length 1 or 2.") +def calculate_cubic_hermite_coefficients(x0, x1, y0, yp0, y1, yp1): + """Calculate the coefficients of a cubic Hermite interpolation function. + The function is defined as ax**3 + bx**2 + cx + d. + + Parameters + ---------- + x0 : float + Position of the first point. + x1 : float + Position of the second point. + y0 : float + Value of the function evaluated at the first point. + yp0 : float + Value of the derivative of the function evaluated at the first + point. + y1 : float + Value of the function evaluated at the second point. + yp1 : float + Value of the derivative of the function evaluated at the second + point. + + Returns + ------- + tuple[float, float, float, float] + The coefficients of the cubic Hermite interpolation function. + """ + dx = x1 - x0 + d = float(y0) + c = float(yp0) + b = float((3 * y1 - yp1 * dx - 2 * c * dx - 3 * d) / (dx**2)) + a = float(-(2 * y1 - yp1 * dx - c * dx - 2 * d) / (dx**3)) + return a, b, c, d + + +def find_roots_cubic_function(a, b, c, d): + """Calculate the roots of a cubic function using Cardano's method. + + This method applies Cardano's method to find the roots of a cubic + function of the form ax^3 + bx^2 + cx + d. The roots may be complex + numbers. + + Parameters + ---------- + a : float + Coefficient of the cubic term (x^3). + b : float + Coefficient of the quadratic term (x^2). + c : float + Coefficient of the linear term (x). + d : float + Constant term. + + Returns + ------- + tuple[complex, complex, complex] + A tuple containing the real and complex roots of the cubic function. + Note that the roots may be complex numbers. The roots are ordered + in the tuple as x1, x2, x3. + + References + ---------- + - Cardano's method: https://en.wikipedia.org/wiki/Cubic_function#Cardano's_method + + Examples + -------- + >>> from rocketpy.tools import find_roots_cubic_function + + First we define the coefficients of the function ax**3 + bx**2 + cx + d + >>> a, b, c, d = 1, -3, -1, 3 + >>> x1, x2, x3 = find_roots_cubic_function(a, b, c, d) + >>> x1, x2, x3 + ((-1+0j), (3+7.401486830834377e-17j), (1-1.4802973661668753e-16j)) + + To get the real part of the roots, use the real attribute of the complex + number. + >>> x1.real, x2.real, x3.real + (-1.0, 3.0, 1.0) + """ + delta_0 = b**2 - 3 * a * c + delta_1 = 2 * b**3 - 9 * a * b * c + 27 * d * a**2 + c1 = ((delta_1 + (delta_1**2 - 4 * delta_0**3) ** (0.5)) / 2) ** (1 / 3) + + c2_0 = c1 + x1 = -(1 / (3 * a)) * (b + c2_0 + delta_0 / c2_0) + + c2_1 = c1 * (-1 / 2 + 1j * (3**0.5) / 2) ** 1 + x2 = -(1 / (3 * a)) * (b + c2_1 + delta_0 / c2_1) + + c2_2 = c1 * (-1 / 2 + 1j * (3**0.5) / 2) ** 2 + x3 = -(1 / (3 * a)) * (b + c2_2 + delta_0 / c2_2) + + return x1, x2, x3 + + +def find_root_linear_interpolation(x0, x1, y0, y1, y): + """Calculate the root of a linear interpolation function. + + This method calculates the root of a linear interpolation function + given two points (x0, y0) and (x1, y1) and a value y. The function + is defined as y = m*x + c. + + Parameters + ---------- + x0 : float + Position of the first point. + x1 : float + Position of the second point. + y0 : float + Value of the function evaluated at the first point. + y1 : float + Value of the function evaluated at the second point. + y : float + Value of the function at the desired point. + + Returns + ------- + float + The root of the linear interpolation function. This represents the + value of x at which the function evaluates to y. + + Examples + -------- + >>> from rocketpy.tools import find_root_linear_interpolation + >>> x0, x1, y0, y1, y = 0, 1, 0, 1, 0.5 + >>> x = find_root_linear_interpolation(x0, x1, y0, y1, y) + >>> x + 0.5 + """ + m = (y1 - y0) / (x1 - x0) + c = y0 - m * x0 + return (y - c) / m + + def bilinear_interpolation(x, y, x1, x2, y1, y2, z11, z12, z21, z22): """Bilinear interpolation. It considers the values of the four points around the point to be interpolated and returns the interpolated value. diff --git a/tests/acceptance/test_bella_lui_rocket.py b/tests/acceptance/test_bella_lui_rocket.py index a1297e1fb..0041074a8 100644 --- a/tests/acceptance/test_bella_lui_rocket.py +++ b/tests/acceptance/test_bella_lui_rocket.py @@ -228,10 +228,14 @@ def drogue_trigger(p, h, y): apogee_time_measured = time_kalt[np.argmax(altitude_kalt)] apogee_time_simulated = test_flight.apogee_time - assert ( - abs(max(altitude_kalt) - test_flight.apogee + test_flight.env.elevation) - / max(altitude_kalt) - < 0.015 + apogee_error_threshold = 0.015 + apogee_error = abs( + max(altitude_kalt) - test_flight.apogee + test_flight.env.elevation + ) / max(altitude_kalt) + assert apogee_error < apogee_error_threshold, ( + f"Apogee altitude error exceeded the threshold. " + f"Expected the error to be less than {apogee_error_threshold * 100}%, " + f"but got an error of {apogee_error * 100:.1f}%." ) assert abs(max(velocity_rcp) - max(vert_vel_kalt)) / max(vert_vel_kalt) < 0.06 assert ( diff --git a/tests/test_flight.py b/tests/test_flight.py index 4fb4036eb..d4ed1e114 100644 --- a/tests/test_flight.py +++ b/tests/test_flight.py @@ -618,11 +618,11 @@ def test_max_values(flight_calisto_robust): regarding this pytest fixture. """ test = flight_calisto_robust - atol = 1e-2 - assert pytest.approx(105.2774, abs=atol) == test.max_acceleration_power_on - assert pytest.approx(105.2774, abs=atol) == test.max_acceleration - assert pytest.approx(0.85999, abs=atol) == test.max_mach_number - assert pytest.approx(285.94948, abs=atol) == test.max_speed + rtol = 5e-3 + assert pytest.approx(105.1599, rel=rtol) == test.max_acceleration_power_on + assert pytest.approx(105.1599, rel=rtol) == test.max_acceleration + assert pytest.approx(0.85999, rel=rtol) == test.max_mach_number + assert pytest.approx(285.94948, rel=rtol) == test.max_speed def test_rail_buttons_forces(flight_calisto_custom_wind): diff --git a/tests/test_function.py b/tests/test_function.py index 2ce94f691..6f4122e47 100644 --- a/tests/test_function.py +++ b/tests/test_function.py @@ -102,7 +102,8 @@ def test_setters(func_from_csv, func_2d_from_csv): func_2d_from_csv.set_interpolation("shepard") assert func_2d_from_csv.get_interpolation_method() == "shepard" func_2d_from_csv.set_extrapolation("zero") - assert func_2d_from_csv.get_extrapolation_method() == "zero" + # 2d functions do not support zero extrapolation, must change to natural + assert func_2d_from_csv.get_extrapolation_method() == "natural" @patch("matplotlib.pyplot.show") @@ -181,7 +182,32 @@ def test_extrapolation_methods(linear_func): assert linear_func.get_extrapolation_method() == "constant" assert np.isclose(linear_func.get_value(-1), 0, atol=1e-6) - # Test natural + # Test natural for linear interpolation + linear_func.set_interpolation("linear") + assert isinstance(linear_func.set_extrapolation("natural"), Function) + linear_func.set_extrapolation("natural") + assert isinstance(linear_func.get_extrapolation_method(), str) + assert linear_func.get_extrapolation_method() == "natural" + assert np.isclose(linear_func.get_value(-1), -1, atol=1e-6) + + # Test natural for spline interpolation + linear_func.set_interpolation("spline") + assert isinstance(linear_func.set_extrapolation("natural"), Function) + linear_func.set_extrapolation("natural") + assert isinstance(linear_func.get_extrapolation_method(), str) + assert linear_func.get_extrapolation_method() == "natural" + assert np.isclose(linear_func.get_value(-1), -1, atol=1e-6) + + # Test natural for akima interpolation + linear_func.set_interpolation("akima") + assert isinstance(linear_func.set_extrapolation("natural"), Function) + linear_func.set_extrapolation("natural") + assert isinstance(linear_func.get_extrapolation_method(), str) + assert linear_func.get_extrapolation_method() == "natural" + assert np.isclose(linear_func.get_value(-1), -1, atol=1e-6) + + # Test natural for polynomial interpolation + linear_func.set_interpolation("polynomial") assert isinstance(linear_func.set_extrapolation("natural"), Function) linear_func.set_extrapolation("natural") assert isinstance(linear_func.get_extrapolation_method(), str) diff --git a/tests/unit/test_flight_time_nodes.py b/tests/unit/test_flight_time_nodes.py new file mode 100644 index 000000000..10f6b6c30 --- /dev/null +++ b/tests/unit/test_flight_time_nodes.py @@ -0,0 +1,103 @@ +"""Module to test everything related to the TimeNodes class and it's subclass +TimeNode. +""" + +import pytest + +from rocketpy.rocket import Parachute, _Controller + + +def test_time_nodes_init(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + assert len(time_nodes) == 0 + + +def test_time_nodes_getitem(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + time_nodes.add_node(1.0, [], []) + assert isinstance(time_nodes[0], flight_calisto.TimeNodes.TimeNode) + assert time_nodes[0].t == 1.0 + + +def test_time_nodes_len(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + assert len(time_nodes) == 0 + + +def test_time_nodes_add(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + example_node = flight_calisto.TimeNodes.TimeNode(1.0, [], []) + time_nodes.add(example_node) + assert len(time_nodes) == 1 + assert isinstance(time_nodes[0], flight_calisto.TimeNodes.TimeNode) + assert time_nodes[0].t == 1.0 + + +def test_time_nodes_add_node(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + time_nodes.add_node(2.0, [], []) + assert len(time_nodes) == 1 + assert time_nodes[0].t == 2.0 + assert len(time_nodes[0].parachutes) == 0 + assert len(time_nodes[0].callbacks) == 0 + + +# def test_time_nodes_add_parachutes( +# flight_calisto, calisto_drogue_chute, calisto_main_chute +# ): # TODO: implement this test + + +# def test_time_nodes_add_controllers(flight_calisto): +# TODO: implement this test + + +def test_time_nodes_sort(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + time_nodes.add_node(3.0, [], []) + time_nodes.add_node(1.0, [], []) + time_nodes.add_node(2.0, [], []) + time_nodes.sort() + assert len(time_nodes) == 3 + assert time_nodes[0].t == 1.0 + assert time_nodes[1].t == 2.0 + assert time_nodes[2].t == 3.0 + + +def test_time_nodes_merge(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + time_nodes.add_node(1.0, [], []) + time_nodes.add_node(1.0, [], []) + time_nodes.add_node(2.0, [], []) + time_nodes.merge() + assert len(time_nodes) == 2 + assert time_nodes[0].t == 1.0 + assert len(time_nodes[0].parachutes) == 0 + assert len(time_nodes[0].callbacks) == 0 + assert time_nodes[1].t == 2.0 + assert len(time_nodes[1].parachutes) == 0 + assert len(time_nodes[1].callbacks) == 0 + + +def test_time_nodes_flush_after(flight_calisto): + time_nodes = flight_calisto.TimeNodes() + time_nodes.add_node(1.0, [], []) + time_nodes.add_node(2.0, [], []) + time_nodes.add_node(3.0, [], []) + time_nodes.flush_after(1) + assert len(time_nodes) == 2 + assert time_nodes[0].t == 1.0 + assert time_nodes[1].t == 2.0 + + +def test_time_node_init(flight_calisto): + node = flight_calisto.TimeNodes.TimeNode(1.0, [], []) + assert node.t == 1.0 + assert len(node.parachutes) == 0 + assert len(node.callbacks) == 0 + + +def test_time_node_lt(flight_calisto): + node1 = flight_calisto.TimeNodes.TimeNode(1.0, [], []) + node2 = flight_calisto.TimeNodes.TimeNode(2.0, [], []) + assert node1 < node2 + assert not node2 < node1 diff --git a/tests/unit/test_function.py b/tests/unit/test_function.py index 8bcefb818..17da59498 100644 --- a/tests/unit/test_function.py +++ b/tests/unit/test_function.py @@ -306,3 +306,31 @@ def test_remove_outliers_iqr(x, y, expected_x, expected_y): assert filtered_func.__interpolation__ == func.__interpolation__ assert filtered_func.__extrapolation__ == func.__extrapolation__ assert filtered_func.title == func.title + + +def test_set_get_value_opt(): + """Test the set_value_opt and get_value_opt methods of the Function class.""" + func = Function(lambda x: x**2) + func.source = np.array([[1, 1], [2, 4], [3, 9], [4, 16], [5, 25]]) + func.x_array = np.array([1, 2, 3, 4, 5]) + func.y_array = np.array([1, 4, 9, 16, 25]) + func.x_initial = 1 + func.x_final = 5 + func.set_interpolation("linear") + func.set_get_value_opt() + assert func.get_value_opt(2.5) == 6.5 + + +def test_get_image_dim(linear_func): + """Test the get_img_dim method of the Function class.""" + assert linear_func.get_image_dim() == 1 + + +def test_get_domain_dim(linear_func): + """Test the get_domain_dim method of the Function class.""" + assert linear_func.get_domain_dim() == 1 + + +def test_bool(linear_func): + """Test the __bool__ method of the Function class.""" + assert bool(linear_func) == True diff --git a/tests/unit/test_tools.py b/tests/unit/test_tools.py new file mode 100644 index 000000000..75a526aac --- /dev/null +++ b/tests/unit/test_tools.py @@ -0,0 +1,46 @@ +import numpy as np + +from rocketpy.tools import ( + calculate_cubic_hermite_coefficients, + find_roots_cubic_function, +) + + +def test_calculate_cubic_hermite_coefficients(): + """Test the calculate_cubic_hermite_coefficients method of the Function class.""" + # Function: f(x) = x**3 + 2x**2 -1 ; derivative: f'(x) = 3x**2 + 4x + x = np.array([-3, -2, -1, 0, 1]) + y = np.array([-10, -1, 0, -1, 2]) + + # Selects two points as x0 and x1 + x0, x1 = 0, 1 + y0, y1 = -1, 2 + yp0, yp1 = 0, 7 + + a, b, c, d = calculate_cubic_hermite_coefficients(x0, x1, y0, yp0, y1, yp1) + + assert np.isclose(a, 1) + assert np.isclose(b, 2) + assert np.isclose(c, 0) + assert np.isclose(d, -1) + assert np.allclose( + a * x**3 + b * x**2 + c * x + d, + y, + ) + + +def test_cardanos_root_finding(): + """Tests the find_roots_cubic_function method of the Function class.""" + # Function: f(x) = x**3 + 2x**2 -1 + # roots: (-1 - 5**0.5) / 2; -1; (-1 + 5**0.5) / 2 + + roots = list(find_roots_cubic_function(a=1, b=2, c=0, d=-1)) + roots.sort(key=lambda x: x.real) + + assert np.isclose(roots[0].real, (-1 - 5**0.5) / 2) + assert np.isclose(roots[1].real, -1) + assert np.isclose(roots[2].real, (-1 + 5**0.5) / 2) + + assert np.isclose(roots[0].imag, 0) + assert np.isclose(roots[1].imag, 0) + assert np.isclose(roots[2].imag, 0) diff --git a/tests/unit/test_utilities.py b/tests/unit/test_utilities.py index b07064906..33942b445 100644 --- a/tests/unit/test_utilities.py +++ b/tests/unit/test_utilities.py @@ -153,7 +153,7 @@ def test_fin_flutter_analysis(flight_calisto_custom_wind): assert np.isclose(flutter_mach(np.inf), 1.0048188594647927, atol=5e-3) assert np.isclose(safety_factor(0), 64.78797, atol=5e-3) assert np.isclose(safety_factor(10), 2.1948620401502072, atol=5e-3) - assert np.isclose(safety_factor(np.inf), 61.64222220469224, atol=5e-3) + assert np.isclose(safety_factor(np.inf), 61.64222220697017, atol=5e-3) def test_flutter_prints(flight_calisto_custom_wind):