diff --git a/requirements.txt b/requirements.txt index e5e39aa..cee9195 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,14 +4,6 @@ scikit-learn matplotlib seaborn scipy -collections -random skrebate==0.7 -os -time -copy -math -ast networkx -itertools -pickle + diff --git a/src/skheros/heros.py b/src/skheros/heros.py index 7be901e..4fe3dd1 100644 --- a/src/skheros/heros.py +++ b/src/skheros/heros.py @@ -670,7 +670,7 @@ def offspring_improves(self, offspring_list): - def predict_explanation(self, x, feature_names, whole_rule_pop=False, target_model=0): + def predict_explanation(self, x, feature_names, whole_rule_pop=False, target_model=0, verbose=True): """ Applies model to predict a single instance outcome with full explanation of prediction. """ # Data point checks ************************ for value in x: @@ -687,6 +687,7 @@ def predict_explanation(self, x, feature_names, whole_rule_pop=False, target_mod outcome_proba = prediction.get_prediction_proba_dictionary() outcome_coverage = prediction.get_if_covered() match_set = self.rule_population.match_set + rule_source = self.rule_population.pop_set self.rule_population.clear_sets() else: self.model_population.get_target_model(target_model) @@ -697,47 +698,139 @@ def predict_explanation(self, x, feature_names, whole_rule_pop=False, target_mod outcome_proba = prediction.get_prediction_proba_dictionary() outcome_coverage = prediction.get_if_covered() match_set = self.model_population.match_set + rule_source = self.model_population.target_rule_set self.model_population.clear_sets() # Technical Report of Matching Rules ------------------------------------------ - print("PREDICTION REPORT ------------------------------------------------------------------") - print("Outcome Prediction: "+str(outcome_prediction)) - print("Model Prediction Probabilities: "+ str(outcome_proba)) - if outcome_coverage == 0: - print("Instance Covered by Model: No") - else: - print("Instance Covered by Model: Yes") - print("Number of Matching Rules: "+str(len(match_set))) - # TECHNICAL RULE REPORT - #for rule_index in match_set: - # self.model_population.target_rule_set[rule_index].display_key_rule_info() - print("PREDICTION EXPLANATION -------------------------------------------------------------") - if prediction.majority_class_selection_made: - print("Majority class selected since there is probability tie among matching rules, but there is a training majority class") - if prediction.random_selection_made: - print("Random class selected since there is probability tie among matching rules, but no training majority class") + if verbose: + print("PREDICTION REPORT ------------------------------------------------------------------") + print("Outcome Prediction: "+str(outcome_prediction)) + print("Model Prediction Probabilities: "+ str(outcome_proba)) + if outcome_coverage == 0: + print("Instance Covered by Model: No") + else: + print("Instance Covered by Model: Yes") + print("Number of Matching Rules: "+str(len(match_set))) + # TECHNICAL RULE REPORT + #for rule_index in match_set: + # self.model_population.target_rule_set[rule_index].display_key_rule_info() + print("PREDICTION EXPLANATION -------------------------------------------------------------") + if prediction.majority_class_selection_made: + print("Majority class selected since there is probability tie among matching rules, but there is a training majority class") + if prediction.random_selection_made: + print("Random class selected since there is probability tie among matching rules, but no training majority class") if len(match_set) > 0: # Sort match set for intuitive ordering - match_set = sorted(match_set, key=lambda i: (self.model_population.target_rule_set[i].numerosity, self.model_population.target_rule_set[i].correct_cover), reverse=True) + match_set = sorted(match_set, key=lambda i: (rule_source[i].numerosity, rule_source[i].correct_cover), reverse=True) # Give explanations for matching rules - print("Supporting Rules: --------------------") + if verbose: + print("Supporting Rules: --------------------") for rule_index in match_set: - if str(self.model_population.target_rule_set[rule_index].action) == str(prediction.prediction): - self.model_population.target_rule_set[rule_index].translate_rule(feature_names,self) - print("Contradictory Rules: -----------------") + if str(rule_source[rule_index].action) == str(prediction.prediction): + if verbose: + rule_source[rule_index].translate_rule(feature_names,self) + if verbose: + print("Contradictory Rules: -----------------") counter = 0 for rule_index in match_set: - if str(self.model_population.target_rule_set[rule_index].action) != str(prediction.prediction): - self.model_population.target_rule_set[rule_index].translate_rule(feature_names,self) + if str(rule_source[rule_index].action) != str(prediction.prediction): + if verbose: + rule_source[rule_index].translate_rule(feature_names,self) counter += 1 - if counter == 0: + if counter == 0 and verbose: print("No contradictory rules matched.") else: # No matching rules - if prediction.random_selection_made: - print("Random class selected since there are no matching rules and no training majority class") + if verbose: + if prediction.random_selection_made: + print("Random class selected since there are no matching rules and no training majority class") + else: + print("Majority class selected since there are no matching rules, but there is a training majority class") + + # Build and return structured explanation for programmatic use + features_view = [ + { + "feature_index": idx, + "feature_name": feature_names[idx], + "value": x[idx] + } + for idx in range(len(x)) + ] + + supporting_rules = [] + contradictory_rules = [] + per_rule_contributions = [] + for rule_index in match_set: + rule_obj = rule_source[rule_index] + rule_dict = rule_obj.to_explanation_dict(feature_names, self) + # compute this rule's weighted vote contribution (classification only) + vote_contrib = {} + if hasattr(rule_obj, 'instance_outcome_prop') and isinstance(outcome_proba, dict): + for cls, prob in rule_obj.instance_outcome_prop.items(): + vote_contrib[cls] = prob * rule_obj.numerosity + rule_dict["vote_contribution"] = vote_contrib + rule_dict["selected_action_matches_prediction"] = (str(rule_obj.action) == str(outcome_prediction)) + if str(rule_obj.action) == str(outcome_prediction): + supporting_rules.append(rule_dict) else: - print("Majority class selected since there are no matching rules, but there is a training majority class") - + contradictory_rules.append(rule_dict) + per_rule_contributions.append({ + "rule_id": getattr(rule_obj, "ID", None), + "numerosity": rule_obj.numerosity, + "action": rule_obj.action, + "vote_contribution": vote_contrib + }) + + selection_reason = None + if prediction.majority_class_selection_made and len(match_set) > 0: + selection_reason = "tie_break_by_training_majority" + elif prediction.random_selection_made and len(match_set) > 0: + selection_reason = "tie_break_random" + elif prediction.random_selection_made and len(match_set) == 0: + selection_reason = "no_matching_rules_random" + elif not prediction.random_selection_made and not prediction.majority_class_selection_made and len(match_set) == 0: + selection_reason = "no_matching_rules_training_majority" + + structured = { + "outcome_prediction": outcome_prediction, + "prediction_probabilities": outcome_proba, + "covered": bool(outcome_coverage), + "num_matching_rules": len(match_set), + "whole_rule_population": bool(whole_rule_pop), + "target_model_index": int(target_model) if not whole_rule_pop else None, + "selection_reason": selection_reason, + "algorithm": { + "outcome_type": self.outcome_type, + "classes": list(self.env.classes) if hasattr(self.env, 'classes') else None, + "voting_scheme": "whole_population" if whole_rule_pop else "top_model_rule_set", + "numerosity_sum": getattr(prediction, 'numerosity_sum', None), + "tie_breaking": { + "majority_class": bool(getattr(prediction, 'majority_class_selection_made', False)), + "random": bool(getattr(prediction, 'random_selection_made', False)) + } + }, + "features": features_view, + "supporting_rules": supporting_rules, + "contradictory_rules": contradictory_rules, + "per_rule_contributions": per_rule_contributions, + "match_set_rule_ids": [getattr(rule_source[i], 'ID', None) for i in match_set] + } + + # A short narrative for user-facing explanation layers + try: + num_support = len(supporting_rules) + num_contra = len(contradictory_rules) + coverage_text = "covered" if structured["covered"] else "not covered" + tie_text = " with tie broken by training majority" if structured["algorithm"]["tie_breaking"]["majority_class"] else (" with random tie-break" if structured["algorithm"]["tie_breaking"]["random"] else "") + narrative = ( + "Instance is "+coverage_text+" by "+str(structured["num_matching_rules"]) + + " rule(s); " + str(num_support) + " support the predicted class '"+str(outcome_prediction)+"' and " + + str(num_contra) + " contradict. Prediction made via " + structured["algorithm"]["voting_scheme"] + tie_text + "." + ) + structured["narrative"] = narrative + except Exception: + structured["narrative"] = None + + return structured def predict(self, X, whole_rule_pop=False, target_model=0, rule_pop_iter=None, model_pop_iter=None): """Scikit-learn required: Apply trained model to predict outcomes of instances. diff --git a/src/skheros/methods/rule.py b/src/skheros/methods/rule.py index ab006cc..eb82c7b 100644 --- a/src/skheros/methods/rule.py +++ b/src/skheros/methods/rule.py @@ -39,7 +39,7 @@ def __init__(self,heros): self.ave_match_set_size = 1 #average size of the match sets in which this rule was included across all training instances - used in deletion to promote niching self.deletion_prob = None #probability of rule being selected for deletion self.prediction = None - #self.encoding = None + self.encoding = None def __eq__(self, other): return isinstance(other, RULE) and self.ID == other.ID @@ -821,6 +821,62 @@ def translate_rule(self,feature_names,heros): translation += " THEN: predict outcome '"+str(self.action)+"' with "+str(100 * self.instance_outcome_prop[self.action])+"% confidence based on "+str(self.match_cover)+' matching training instances ('+str(round(100*self.match_cover /float(heros.env.num_instances),2))+"% of training instances)." print(translation) + def to_explanation_dict(self, feature_names, heros): + """Return a structured, LLM-friendly explanation of this rule. + + The structure includes both machine- and human-readable fields for conditions and + key rule statistics useful for downstream reasoning layers. + """ + # Ensure deterministic ordering of conditions + self.order_rule_conditions() + conditions = [] + for i in range(len(self.condition_indexes)): + feature_index = self.condition_indexes[i] + value = self.condition_values[i] + is_categorical = (heros.env.feat_types[feature_index] == 1) + if is_categorical: + human = { + "text": str(feature_names[feature_index])+" = "+str(value) + } + cond = { + "feature_index": feature_index, + "feature_name": feature_names[feature_index], + "type": "categorical", + "operator": "=", + "value": value, + "human_readable": human["text"] + } + else: + # quantitative range (min, max) + range_min, range_max = value[0], value[1] + human_text = str(feature_names[feature_index])+" in ["+str(range_min)+", "+str(range_max)+"]" + cond = { + "feature_index": feature_index, + "feature_name": feature_names[feature_index], + "type": "quantitative", + "operator": "in_range", + "min": range_min, + "max": range_max, + "human_readable": human_text + } + conditions.append(cond) + + explanation = { + "rule_id": getattr(self, "ID", None), + "action": self.action, + "instance_outcome_proportions": dict(self.instance_outcome_prop) if hasattr(self, "instance_outcome_prop") else {}, + "numerosity": self.numerosity, + "fitness": self.fitness, + "accuracy": self.accuracy, + "match_cover": self.match_cover, + "correct_cover": self.correct_cover, + "average_match_set_size": self.ave_match_set_size, + "deletion_probability": self.deletion_prob, + "birth_iteration": self.birth_iteration, + "conditions": conditions + } + return explanation + def order_rule_conditions(self): """ Order the rule conditions by increasing feature index; keeping the ordering consistent between condition_indexes and condition_values.""" diff --git a/src/skheros/methods/rule_population.py b/src/skheros/methods/rule_population.py index 7f9d9d9..2fe50f8 100644 --- a/src/skheros/methods/rule_population.py +++ b/src/skheros/methods/rule_population.py @@ -1,7 +1,7 @@ import copy import pandas as pd import ast -from .rule import RULE +from skheros.methods.rule import RULE import seaborn as sns import matplotlib.pyplot as plt from scipy.cluster.hierarchy import linkage#, dendrogram, leaves_list @@ -12,6 +12,7 @@ from sklearn.ensemble import RandomForestClassifier from sklearn.tree import _tree, DecisionTreeClassifier +from sklearn.preprocessing import OneHotEncoder from collections import Counter from sklearn import tree as sktree from matplotlib.table import Table @@ -47,22 +48,10 @@ def clear_explored_rules(self): def rule_exists(self, target_rule, heros): """Checks the explored rules list to see if a given 'new' rule has been previously discovered and evaluated, returning that rule's reference in explored rules.""" - #print('test') - #print(target_rule.condition_indexes) - #print(target_rule.condition_values) - - for rule_summary in self.explored_rules: - if self.equals(target_rule,rule_summary): - #print(rule_summary[0]) - #print(rule_summary[1]) - return rule_summary - return None - - """encoded = target_rule.encode_rule_binary(heros.env.num_feat) + encoded = target_rule.encode_rule_binary(heros.env.num_feat) if encoded in self.explored_rules: return self.decode_rule_binary(encoded, heros.env.num_feat) - - return None""" + return None @@ -632,6 +621,108 @@ def tree_init_population(self, X, y, heros, random, np, verbose = False): {"n_estimators": 15, "max_depth": 6, "random_state": heros.random_state}, {"n_estimators": 15, "max_depth": None, "random_state": heros.random_state}, ] + + # STEP 2.5: One Hot Encode the categorical features for decision tree training + # Decision trees need quantitative features, so we one-hot encode categorical features temporarily + original_X = X.copy() if hasattr(X, 'copy') else X + onehot_mapping = {} # Maps one-hot encoded feature index -> (original_feat_idx, categorical_value) + reverse_onehot_mapping = {} # Maps (original_feat_idx, categorical_value) -> one-hot encoded feature index + quant_feat_mapping = {} # Maps encoded quantitative feature index -> original feature index + + if heros.cat_feature_indexes is not None and len(heros.cat_feature_indexes) > 0: + print(f"\nOne-hot encoding {len(heros.cat_feature_indexes)} categorical features...") + + # Convert X to numpy array if it's a DataFrame + if hasattr(X, 'values'): + X_array = X.values + X_is_dataframe = True + X_columns = list(X.columns) + else: + X_array = np.array(X) + X_is_dataframe = False + X_columns = None + + # Separate categorical and quantitative features + cat_feat_indexes = sorted(heros.cat_feature_indexes) + quant_feat_indexes = sorted([i for i in range(X_array.shape[1]) if i not in cat_feat_indexes]) + + # Extract categorical and quantitative columns + cat_data = X_array[:, cat_feat_indexes] + quant_data = X_array[:, quant_feat_indexes] if quant_feat_indexes else None + + # One-hot encode categorical features + onehot_encoder = OneHotEncoder(sparse_output=False, drop=None, handle_unknown='ignore') + cat_onehot = onehot_encoder.fit_transform(cat_data) + + # Build mapping: one-hot encoded feature index -> (original_feat_idx, categorical_value) + # Also build mapping for quantitative features: encoded_idx -> original_idx + if quant_feat_indexes: + for encoded_idx, orig_idx in enumerate(quant_feat_indexes): + quant_feat_mapping[encoded_idx] = orig_idx + + # Build one-hot mapping + current_onehot_idx = 0 + num_quant = len(quant_feat_indexes) if quant_feat_indexes else 0 + + for cat_col_idx, orig_cat_idx in enumerate(cat_feat_indexes): + # Get the categories from the encoder (in order) + if hasattr(onehot_encoder, 'categories_'): + encoder_categories = onehot_encoder.categories_[cat_col_idx] + else: + # Fallback: use unique values from data + encoder_categories = np.unique(cat_data[:, cat_col_idx]) + + # Find the start index for this categorical feature's one-hot columns + # Count how many one-hot columns come before this feature + onehot_start_idx = 0 + for prev_cat_idx in cat_feat_indexes: + if prev_cat_idx == orig_cat_idx: + break + prev_cat_col_idx = cat_feat_indexes.index(prev_cat_idx) + if hasattr(onehot_encoder, 'categories_'): + onehot_start_idx += len(onehot_encoder.categories_[prev_cat_col_idx]) + else: + onehot_start_idx += len(np.unique(cat_data[:, prev_cat_col_idx])) + + # Map each one-hot column for this categorical feature + for cat_val_idx, cat_val in enumerate(encoder_categories): + onehot_feat_idx = num_quant + onehot_start_idx + cat_val_idx + onehot_mapping[onehot_feat_idx] = (orig_cat_idx, cat_val) + reverse_onehot_mapping[(orig_cat_idx, cat_val)] = onehot_feat_idx + + # Combine quantitative and one-hot encoded features + if quant_data is not None: + X_encoded = np.hstack([quant_data, cat_onehot]) + else: + X_encoded = cat_onehot + + # Convert back to DataFrame if original was DataFrame + if X_is_dataframe: + # Create new column names + new_columns = [] + if quant_feat_indexes: + new_columns.extend([X_columns[i] for i in quant_feat_indexes]) + for orig_cat_idx in cat_feat_indexes: + cat_values = heros.env.feat_c_values[orig_cat_idx] + if hasattr(onehot_encoder, 'categories_'): + encoder_categories = onehot_encoder.categories_[cat_feat_indexes.index(orig_cat_idx)] + else: + encoder_categories = np.unique(cat_data[:, cat_feat_indexes.index(orig_cat_idx)]) + for cat_val in encoder_categories: + new_columns.append(f"{X_columns[orig_cat_idx]}_{cat_val}") + X = pd.DataFrame(X_encoded, columns=new_columns, index=X.index if hasattr(X, 'index') else None) + else: + X = X_encoded + + print(f" Original features: {X_array.shape[1]}, After one-hot encoding: {X.shape[1]}") + print(f" One-hot mapping created for {len(onehot_mapping)} encoded features") + print(f" Quantitative feature mapping: {len(quant_feat_mapping)} features") + else: + print("\nNo categorical features to encode.") + onehot_mapping = {} + reverse_onehot_mapping = {} + quant_feat_mapping = {} + rf_models = [] tree_depths_by_rf = [] for idx, params in enumerate(rf_settings): @@ -677,7 +768,7 @@ def print_rf_training_summary(rf_models): all_rules = [] branch_paths = [] - def recurse_tree(tree, node_id, path, rules, branch_paths=None): + def recurse_tree(tree, node_id, path, rules, branch_paths=None, onehot_mapping=None): if tree.children_left[node_id] == _tree.TREE_LEAF: condition_indexes = [] condition_values = [] @@ -693,13 +784,13 @@ def recurse_tree(tree, node_id, path, rules, branch_paths=None): left_id = tree.children_left[node_id] feat_idx = tree.feature[node_id] threshold = tree.threshold[node_id] - recurse_tree(tree, left_id, path + [(feat_idx, threshold, 'leq')], rules, branch_paths) + recurse_tree(tree, left_id, path + [(feat_idx, threshold, 'leq')], rules, branch_paths, onehot_mapping) right_id = tree.children_right[node_id] - recurse_tree(tree, right_id, path + [(feat_idx, threshold, 'gt')], rules, branch_paths) + recurse_tree(tree, right_id, path + [(feat_idx, threshold, 'gt')], rules, branch_paths, onehot_mapping) for rf in rf_models: for estimator in rf.estimators_: - recurse_tree(estimator.tree_, 0, [], all_rules, branch_paths) + recurse_tree(estimator.tree_, 0, [], all_rules, branch_paths, onehot_mapping) print(f"Total raw rules extracted: {len(all_rules)}") @@ -768,28 +859,78 @@ def print_rule_extraction_summary(all_rules, unique_rules): # STEP 5: Convert rules to HEROS format, check for redundancy, and add to population print("\nConverting extracted rules to HEROS format and checking for redundancy...") - def convert_path_to_minmax(condition_indexes, condition_values): - """Convert a list of (direction, threshold) for each feature into a min/max interval for HEROS.""" - minmax_dict = {} + def convert_path_to_minmax(condition_indexes, condition_values, onehot_mapping, quant_feat_mapping, heros): + """Convert a list of (direction, threshold) for each feature into HEROS format. + Handles both quantitative features (min/max ranges) and categorical features (equality checks). + Maps one-hot encoded features back to original categorical features.""" + minmax_dict = {} # For quantitative features: {orig_feat_idx: [min, max]} + categorical_dict = {} # For categorical features: {orig_feat_idx: set of values} + for idx, (direction, threshold) in zip(condition_indexes, condition_values): - if idx not in minmax_dict: - minmax_dict[idx] = [float('-inf'), float('inf')] - if direction == 'leq': - minmax_dict[idx][1] = min(minmax_dict[idx][1], threshold) - elif direction == 'gt': - minmax_dict[idx][0] = max(minmax_dict[idx][0], np.nextafter(threshold, threshold+1)) + # Check if this is a one-hot encoded feature + if idx in onehot_mapping: + # This is a one-hot encoded categorical feature + orig_feat_idx, cat_value = onehot_mapping[idx] + + # For one-hot encoding: features are binary (0 or 1) + # Threshold is typically 0.5 + # If direction is 'gt' and threshold <= 0.5, it means the one-hot feature is 1 (category IS present) + # If direction is 'leq' and threshold < 0.5, it means the one-hot feature is 0 (category NOT present) + if direction == 'gt' and threshold <= 0.5: + # This branch means the one-hot feature is 1, so the category IS present + if orig_feat_idx not in categorical_dict: + categorical_dict[orig_feat_idx] = set() + categorical_dict[orig_feat_idx].add(cat_value) + # If direction is 'leq' and threshold < 0.5, the category is NOT present (we ignore it) + # Note: We only add categories that are explicitly present (value = 1) + else: + # This is a quantitative feature + # Map encoded index back to original index + if quant_feat_mapping and idx in quant_feat_mapping: + orig_idx = quant_feat_mapping[idx] + else: + # No one-hot encoding was done, so index is already original + orig_idx = idx + + if orig_idx not in minmax_dict: + minmax_dict[orig_idx] = [float('-inf'), float('inf')] + if direction == 'leq': + minmax_dict[orig_idx][1] = min(minmax_dict[orig_idx][1], threshold) + elif direction == 'gt': + minmax_dict[orig_idx][0] = max(minmax_dict[orig_idx][0], np.nextafter(threshold, threshold+1)) + + # Build final condition lists clean_indexes = [] clean_values = [] + + # Add quantitative features for idx in sorted(minmax_dict.keys()): min_val, max_val = minmax_dict[idx] if min_val <= max_val: clean_indexes.append(idx) clean_values.append([min_val, max_val]) + + # Add categorical features + # For categorical features, we need to check if all one-hot conditions for a feature point to the same value + for orig_feat_idx in sorted(categorical_dict.keys()): + cat_values = categorical_dict[orig_feat_idx] + # If only one value is in the set, that's the categorical condition + if len(cat_values) == 1: + clean_indexes.append(orig_feat_idx) + clean_values.append(list(cat_values)[0]) # Single categorical value, not a range + # If multiple values, we might need to handle this differently + # For now, we'll take the first one (though this might not be correct) + elif len(cat_values) > 1: + # Multiple categories for same feature - this shouldn't happen in a valid tree path + # But if it does, we'll use the first one + clean_indexes.append(orig_feat_idx) + clean_values.append(list(cat_values)[0]) + return clean_indexes, clean_values for rule_data in unique_rules: raw_condition_indexes, raw_condition_values, action = rule_data - condition_indexes, condition_values = convert_path_to_minmax(raw_condition_indexes, raw_condition_values) + condition_indexes, condition_values = convert_path_to_minmax(raw_condition_indexes, raw_condition_values, onehot_mapping, quant_feat_mapping, heros) if len(condition_indexes) == 0: continue #print(f"Condition Indexes: {condition_indexes}") diff --git a/src/test_heros.py b/src/test_heros.py index 664dba5..cfc1145 100644 --- a/src/test_heros.py +++ b/src/test_heros.py @@ -98,7 +98,7 @@ def test_mixed_feature_types(): print(ek) heros = HEROS(outcome_type='class',iterations=20000,pop_size=500,cross_prob=0.8,mut_prob=0.04,nu=1,beta=0.2,theta_sel=0.5, fitness_function='pareto',subsumption='both',rsl=0,feat_track=None, model_iterations=40, - model_pop_size=100,model_pop_init='target_acc',new_gen=1.0,merge_prob=0.1,rule_pop_init=None,compaction='sub', + model_pop_size=100,model_pop_init='target_acc',new_gen=1.0,merge_prob=0.1,rule_pop_init="dt",compaction='sub', track_performance=1000,model_tracking=True,stored_rule_iterations=None,stored_model_iterations=None,random_state=42,verbose=True) heros = heros.fit(X, y, None, cat_feat_indexes=cat_feat_indexes, ek=ek) #Select best model from the front @@ -193,8 +193,8 @@ def test_quantitative_outcome(): if __name__ == "__main__": - test_6mux() - test_na() + #test_6mux() + #test_na() test_mixed_feature_types() test_mixed_feature_types_na() test_multiclass()