Kernel-ML/sparkfeaturekit

SparkFeatureKit

SparkFeatureKit is a production-quality Python library for feature engineering, schema validation, and data quality profiling. All transforms are implemented on pandas DataFrames for local use and testing, with optional PySpark support for large-scale production workloads.

Installation

pip install sparkfeaturekit
# With PySpark support:
pip install sparkfeaturekit[spark]
# Development extras:
pip install sparkfeaturekit[dev]

Or with uv:

uv add sparkfeaturekit
uv add "sparkfeaturekit[spark]"

Modules

| Module | Description |
| --- | --- |
| `sparkfeaturekit.transforms` | 15+ reusable feature engineering transforms |
| `sparkfeaturekit.validate` | Pydantic-based schema validation for DataFrames |
| `sparkfeaturekit.quality` | Data quality profiler with stats and issue detection |
| `sparkfeaturekit.utils` | Spark session helpers and logging utilities |

Quick Start

Feature Transforms

import pandas as pd
from sparkfeaturekit.transforms import (
    normalize_minmax,
    log_transform,
    frequency_encode,
    target_encode,
    extract_timestamp_features,
    windowed_aggregation,
    impute_mean,
    ratio_feature,
)

df = pd.read_parquet("features.parquet")

# Normalize a numeric column
df = normalize_minmax(df, col="revenue")

# Log-transform (with default offset=1.0 to handle zeros)
df = log_transform(df, col="spend")

# Frequency encode a categorical column
df = frequency_encode(df, col="country")

# Target encode with smoothing
df = target_encode(df, col="product_id", target_col="conversion", smoothing=5.0)

# Extract datetime features
df = extract_timestamp_features(df, col="event_time")

# Rolling window aggregation
df = windowed_aggregation(
    df,
    partition_by="user_id",
    order_by="event_time",
    agg_col="revenue",
    window_sizes=[7, 30],
    agg_funcs=["mean", "sum"],
)

# Impute missing values
df = impute_mean(df, col="age")

# Interaction features
df = ratio_feature(df, num_col="clicks", den_col="impressions", output_col="ctr")

Schema Validation

from sparkfeaturekit.validate import FeatureSchema, ColumnSpec, validate_dataframe

schema = FeatureSchema(
    columns={
        "age": ColumnSpec(dtype="int", nullable=False, min_value=0, max_value=120),
        "score": ColumnSpec(dtype="float", nullable=True, min_value=0.0, max_value=1.0),
        "label": ColumnSpec(dtype="str", nullable=False, allowed_values=["A", "B", "C"]),
    },
    max_null_rate=0.1,
)

result = validate_dataframe(df, schema)
if result.passed:
    print("Validation passed!")
else:
    for violation in result.violations:
        print(f"[{violation.column}] {violation.message}")
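
To make the kinds of checks above concrete, here is a minimal sketch of null, range, and allowed-value checks in plain pandas. It uses standard-library dataclasses instead of Pydantic to stay dependency-free. `SpecSketch`, `check_frame`, and the violation messages are hypothetical names chosen for illustration, not SparkFeatureKit's API.

```python
from dataclasses import dataclass
from typing import Optional, Sequence

import pandas as pd


@dataclass
class SpecSketch:
    """Hypothetical per-column rules (not the library's ColumnSpec)."""
    nullable: bool = True
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    allowed_values: Optional[Sequence] = None


def check_frame(df, specs):
    """Return a list of (column, message) pairs, empty when all checks pass."""
    violations = []
    for name, spec in specs.items():
        if name not in df.columns:
            violations.append((name, "column is missing"))
            continue
        values = df[name].dropna()  # range checks ignore nulls
        if not spec.nullable and df[name].isna().any():
            violations.append((name, "contains nulls"))
        if spec.min_value is not None and (values < spec.min_value).any():
            violations.append((name, "value below min_value"))
        if spec.max_value is not None and (values > spec.max_value).any():
            violations.append((name, "value above max_value"))
        if spec.allowed_values is not None:
            if not values.isin(list(spec.allowed_values)).all():
                violations.append((name, "value outside allowed_values"))
    return violations


frame = pd.DataFrame({"age": [25, 130], "label": ["A", "Z"]})
specs = {
    "age": SpecSketch(nullable=False, min_value=0, max_value=120),
    "label": SpecSketch(nullable=False, allowed_values=["A", "B", "C"]),
    "score": SpecSketch(),
}
violations = check_frame(frame, specs)
for column, message in violations:
    print(f"[{column}] {message}")
```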

Data Quality Profiling

from sparkfeaturekit.quality import DataProfiler

profiler = DataProfiler(df)
profile = profiler.run()

# Human-readable summary (logged via logging module)
profiler.summary()

# Export JSON report
profiler.export("/tmp/quality_report.json")

# Inspect detected issues
for issue in profiler.issues:
    print(f"[{issue.level}] {issue.column}: {issue.issue_type} - {issue.detail}")
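
For a rough idea of what a profiler of this kind computes, the sketch below gathers two per-column statistics and flags two issue types using pandas alone. The function name, the `high_null_rate` and `constant` labels, and the 0.1 threshold are assumptions made for this example and may not match what `DataProfiler` reports.

```python
import json

import pandas as pd


def profile_sketch(df, max_null_rate=0.1):
    """Hypothetical profiler: per-column stats plus a flat list of issues."""
    stats = {}
    issues = []
    for col in df.columns:
        null_rate = float(df[col].isna().mean())
        n_unique = int(df[col].nunique(dropna=True))
        stats[col] = {"null_rate": null_rate, "n_unique": n_unique}
        if null_rate > max_null_rate:
            issues.append((col, "high_null_rate"))
        if n_unique <= 1:
            issues.append((col, "constant"))
    return stats, issues


sample = pd.DataFrame({"a": [1.0, None, None, 4.0], "b": [7, 7, 7, 7]})
stats, issues = profile_sketch(sample)

# The stats dictionary holds only plain Python types, so it serializes directly.
report = json.dumps({"stats": stats, "issues": issues}, indent=2)
```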

PySpark Session Helper

from sparkfeaturekit.utils import get_or_create_spark

spark = get_or_create_spark(
    app_name="MyFeaturePipeline",
    config={"spark.sql.shuffle.partitions": "200"},
)
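
A helper like this can be built on PySpark's public `SparkSession.builder` API. The sketch below is one plausible shape, not the library's implementation, and `spark_session_sketch` is a hypothetical name. The import sits inside the function so that merely importing the module does not require PySpark.

```python
from typing import Dict, Optional


def spark_session_sketch(app_name: str, config: Optional[Dict[str, str]] = None):
    """Hypothetical helper: build or reuse a SparkSession with extra settings."""
    # Lazy import: PySpark is only needed when a session is actually requested.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in (config or {}).items():
        builder = builder.config(key, value)
    # getOrCreate() returns the active session if one already exists.
    return builder.getOrCreate()
```

Calling it requires the `spark` extra and a working Java runtime.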

Transform Reference

Numerical (transforms.numerical)

| Function | Description |
| --- | --- |
| `normalize_minmax(df, col)` | Scale to [0, 1] (or custom range) |
| `normalize_zscore(df, col)` | Zero mean, unit variance |
| `normalize_robust(df, col)` | Median/IQR scaling, robust to outliers |
| `log_transform(df, col, offset=1.0)` | Natural log with configurable offset |
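
The arithmetic behind these transforms can be sketched in a few lines of pandas and NumPy. The functions below are illustrative stand-ins with hypothetical names, and the `feature_range` parameter is an assumption about how a custom range might be passed; check the library's signatures before relying on it.

```python
import numpy as np
import pandas as pd


def minmax_sketch(df, col, feature_range=(0.0, 1.0)):
    """Hypothetical stand-in: rescale `col` into `feature_range` on a copy."""
    out = df.copy()
    lo, hi = out[col].min(), out[col].max()
    span = hi - lo
    # A constant column has zero span; map it to the lower bound (zero by default).
    unit = (out[col] - lo) / span if span != 0 else out[col] * 0.0
    a, b = feature_range
    out[col] = a + unit * (b - a)
    return out


def robust_sketch(df, col):
    """Hypothetical stand-in: center on the median and divide by the IQR."""
    out = df.copy()
    median = out[col].median()
    iqr = out[col].quantile(0.75) - out[col].quantile(0.25)
    out[col] = (out[col] - median) / iqr if iqr != 0 else out[col] * 0.0
    return out


def log_sketch(df, col, offset=1.0):
    """Hypothetical stand-in: natural log of (value + offset)."""
    out = df.copy()
    out[col] = np.log(out[col] + offset)
    return out


raw = pd.DataFrame({"revenue": [10, 20, 30], "spend": [0.0, 1.0, 9.0]})
scaled = minmax_sketch(raw, "revenue")
centered = minmax_sketch(raw, "revenue", feature_range=(-1.0, 1.0))
logged = log_sketch(raw, "spend")
robust = robust_sketch(pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0, 5.0]}), "x")
```

With `offset=1.0`, a zero input maps to `log(1) = 0` instead of negative infinity.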

Categorical (transforms.categorical)

| Function | Description |
| --- | --- |
| `frequency_encode(df, col)` | Replace category with its relative frequency |
| `target_encode(df, col, target_col, smoothing=1.0)` | Smoothed mean target encoding |
| `one_hot_encode(df, col)` | Binary indicator columns |
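
The README does not state the smoothing formula, so the sketch below uses one common formulation: each category's mean is blended with the global mean, weighted by the category count `n` against the smoothing value `m`, as `(n * category_mean + m * global_mean) / (n + m)`. Function names and output column names are hypothetical.

```python
import pandas as pd


def frequency_encode_sketch(df, col):
    """Hypothetical stand-in: add the relative frequency of each category."""
    out = df.copy()
    freqs = out[col].value_counts(normalize=True)
    out[f"{col}_freq"] = out[col].map(freqs)
    return out


def target_encode_sketch(df, col, target_col, smoothing=1.0):
    """Hypothetical stand-in: count-weighted blend of category and global means."""
    out = df.copy()
    global_mean = out[target_col].mean()
    stats = out.groupby(col)[target_col].agg(["mean", "count"])
    smoothed = (stats["count"] * stats["mean"] + smoothing * global_mean) / (
        stats["count"] + smoothing
    )
    out[f"{col}_target_enc"] = out[col].map(smoothed)
    return out


sales = pd.DataFrame(
    {"product_id": ["a", "a", "a", "b"], "conversion": [1, 1, 0, 0]}
)
with_freq = frequency_encode_sketch(sales, "product_id")
encoded = target_encode_sketch(sales, "product_id", "conversion", smoothing=1.0)
```

A larger `smoothing` value pulls rare categories closer to the global mean, which reduces the noise from categories seen only a few times.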

Temporal (transforms.temporal)

| Function | Description |
| --- | --- |
| `extract_timestamp_features(df, col)` | Hour, day-of-week, weekend flag, month, quarter, year |
| `time_since_event(df, event_col, ref_col)` | Elapsed time in seconds/minutes/hours/days |
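
In pandas, features like these come from the `.dt` accessor. The following sketch shows one way to derive them. The function names, the output column names, and the choice to compute elapsed time as `ref_col` minus `event_col` are assumptions for illustration.

```python
import pandas as pd


def timestamp_features_sketch(df, col):
    """Hypothetical stand-in: derive calendar features from a timestamp column."""
    out = df.copy()
    ts = pd.to_datetime(out[col])
    out[f"{col}_hour"] = ts.dt.hour
    out[f"{col}_dayofweek"] = ts.dt.dayofweek  # Monday is 0, Sunday is 6
    out[f"{col}_is_weekend"] = ts.dt.dayofweek >= 5
    out[f"{col}_month"] = ts.dt.month
    out[f"{col}_quarter"] = ts.dt.quarter
    out[f"{col}_year"] = ts.dt.year
    return out


def elapsed_sketch(df, event_col, ref_col):
    """Hypothetical stand-in: elapsed time from event_col to ref_col."""
    out = df.copy()
    delta = pd.to_datetime(out[ref_col]) - pd.to_datetime(out[event_col])
    out["elapsed_seconds"] = delta.dt.total_seconds()
    out["elapsed_days"] = out["elapsed_seconds"] / 86400.0
    return out


events = pd.DataFrame(
    {
        "signup_time": ["2024-03-15 14:30:00", "2024-03-17 09:00:00"],
        "event_time": ["2024-03-16 14:30:00", "2024-03-18 09:00:00"],
    }
)
features = timestamp_features_sketch(events, "event_time")
elapsed = elapsed_sketch(events, "signup_time", "event_time")
```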

Window (transforms.window)

| Function | Description |
| --- | --- |
| `windowed_aggregation(df, partition_by, order_by, agg_col, window_sizes, agg_funcs)` | Rolling aggregations (mean, sum, min, max, std, count) |
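
A partitioned rolling aggregation of this shape can be approximated with `groupby` plus `rolling` in pandas. The sketch below treats each window size as a number of rows, which is an assumption: the README does not say whether sizes are row counts or time spans. The function name and output column naming are also hypothetical.

```python
import pandas as pd


def rolling_sketch(df, partition_by, order_by, agg_col, window_sizes, agg_funcs):
    """Hypothetical stand-in: per-partition rolling aggregates over row windows."""
    # Sort so that each partition's rows are in order before rolling.
    out = df.sort_values([partition_by, order_by]).copy()
    for size in window_sizes:
        for func in agg_funcs:
            out[f"{agg_col}_{func}_{size}"] = out.groupby(partition_by)[
                agg_col
            ].transform(
                # min_periods=1 yields a value even before the window is full.
                lambda s, n=size, f=func: s.rolling(n, min_periods=1).agg(f)
            )
    return out


events = pd.DataFrame(
    {
        "user_id": ["u1", "u2", "u1", "u1"],
        "event_time": [2, 1, 1, 3],
        "revenue": [2.0, 10.0, 1.0, 3.0],
    }
)
rolled = rolling_sketch(
    events,
    partition_by="user_id",
    order_by="event_time",
    agg_col="revenue",
    window_sizes=[2],
    agg_funcs=["mean", "sum"],
)
```

Each combination of window size and aggregation adds one column, so two sizes with two functions produce four new columns.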

Cumulative (transforms.cumulative)

| Function | Description |
| --- | --- |
| `cumulative_sum(df, col)` | Expanding cumulative sum |
| `cumulative_count(df, col)` | Expanding non-null count |
| `cumulative_mean(df, col)` | Expanding mean |
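
The three expanding statistics map onto standard pandas calls. The sketch below is illustrative: the function name and output column names are hypothetical, and the NaN handling shown is pandas' default behavior, which may differ from the library's.

```python
import pandas as pd


def cumulative_sketch(df, col):
    """Hypothetical stand-in: expanding sum, non-null count, and mean."""
    out = df.copy()
    out[f"{col}_cumsum"] = out[col].cumsum()  # NaN rows stay NaN, the sum carries on
    out[f"{col}_cumcount"] = out[col].notna().cumsum()
    out[f"{col}_cummean"] = out[col].expanding(min_periods=1).mean()
    return out


values = pd.DataFrame({"amount": [1.0, None, 3.0]})
running = cumulative_sketch(values, "amount")
```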

Imputation (transforms.impute)

| Function | Description |
| --- | --- |
| `impute_mean(df, col)` | Fill NaN with column mean |
| `impute_median(df, col)` | Fill NaN with column median |
| `impute_mode(df, col)` | Fill NaN with most frequent value |
| `impute_constant(df, col, value)` | Fill NaN with a constant |
| `forward_fill(df, col)` | Carry forward the last valid value |
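
All five strategies reduce to `fillna` or `ffill` in pandas. The single dispatcher below is a compact sketch for comparison, with a hypothetical name and a hypothetical `strategy` parameter; the library exposes them as separate functions.

```python
import pandas as pd


def impute_sketch(df, col, strategy="mean", value=None):
    """Hypothetical stand-in: fill missing values in `col` on a copy."""
    out = df.copy()
    if strategy == "ffill":
        out[col] = out[col].ffill()  # carry forward the last valid value
        return out
    if strategy == "mean":
        fill = out[col].mean()
    elif strategy == "median":
        fill = out[col].median()
    elif strategy == "mode":
        fill = out[col].mode().iloc[0]  # first mode when there are ties
    elif strategy == "constant":
        fill = value
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    out[col] = out[col].fillna(fill)
    return out


people = pd.DataFrame({"age": [20.0, None, 40.0, 40.0]})
by_median = impute_sketch(people, "age", strategy="median")
by_mode = impute_sketch(people, "age", strategy="mode")
by_constant = impute_sketch(people, "age", strategy="constant", value=0.0)
by_ffill = impute_sketch(people, "age", strategy="ffill")
by_mean = impute_sketch(people, "age", strategy="mean")
```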

Interaction (transforms.interaction)

| Function | Description |
| --- | --- |
| `ratio_feature(df, num_col, den_col, output_col)` | Safe division: num / (den + epsilon) |
| `product_feature(df, col1, col2, output_col)` | Element-wise product |
| `difference_feature(df, col1, col2, output_col)` | col1 - col2 |
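
The safe-division idea is simple to show directly: adding a small epsilon to the denominator turns `0 / 0` into `0` instead of NaN. In the sketch below, the function name and the default `epsilon=1e-9` are assumptions; the README does not state the value the library uses.

```python
import pandas as pd


def ratio_sketch(df, num_col, den_col, output_col, epsilon=1e-9):
    """Hypothetical stand-in: num / (den + epsilon) on a copy of `df`."""
    out = df.copy()
    out[output_col] = out[num_col] / (out[den_col] + epsilon)
    return out


ads = pd.DataFrame({"clicks": [5, 0], "impressions": [100, 0]})
with_ctr = ratio_sketch(ads, "clicks", "impressions", "ctr")
```

The epsilon slightly biases every ratio downward, so it should stay far smaller than any realistic denominator.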

Design Principles

  • Pandas-first, PySpark-ready: All transforms work with pandas DataFrames for testing and local use. PySpark is an optional heavy dependency imported lazily.
  • Immutable inputs: Every transform returns a copy; the input DataFrame is never modified.
  • Graceful edge cases: Normalizing a constant column returns zeros instead of raising; NaN inputs stay NaN in the output; the log of zero is guarded by the offset.
  • Pydantic v2 validation: Schema and quality objects use Pydantic for runtime type safety.
  • Logging, not printing: All feedback goes through Python's logging module.
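
Three of these principles (copy on write, zeros for constant columns, and logging instead of printing) fit into one short transform. The sketch below is a hypothetical z-score function written to illustrate them, not the library's `normalize_zscore`.

```python
import logging

import pandas as pd

logger = logging.getLogger("feature_sketch")  # hypothetical logger name


def zscore_sketch(df, col):
    """Hypothetical stand-in: standardize `col` without touching the input."""
    out = df.copy()  # immutable inputs: work on a copy
    mean, std = out[col].mean(), out[col].std()
    if pd.isna(std) or std == 0:
        # Graceful edge case: no spread, so report it and return zeros.
        logger.warning("column %r has no spread; returning zeros", col)
        out[col] = out[col] * 0.0  # NaN inputs remain NaN
    else:
        out[col] = (out[col] - mean) / std
    return out


original = pd.DataFrame({"x": [1.0, 2.0, 3.0], "c": [4.0, 4.0, 4.0]})
scaled = zscore_sketch(original, "x")
flat = zscore_sketch(original, "c")
```

Because feedback goes through `logging`, callers decide whether the warning is shown, stored, or silenced.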

License

Apache License 2.0 — see LICENSE for details.
