
 ____       _              ____                   _    
|  _ \ ___ | | __ _ _ __  / ___| _ __   __ _ _ __| | __
| |_) / _ \| |/ _` | '__| \___ \| '_ \ / _` | '__| |/ /
|  __/ (_) | | (_| | |     ___) | |_) | (_| | |  |   < 
|_|   \___/|_|\__,_|_|    |____/| .__/ \__,_|_|  |_|\_\
                                |_|                    

🚀 Apache Spark on Polars

Polar Spark brings the PySpark API to Polars, optimized for single-machine workloads.

It is designed as a drop-in replacement for PySpark in scenarios where a full Spark cluster is not needed. A common use case is running fast, lightweight unit tests in CI/CD pipelines 🧪.

Instead of relying on the JVM-based Spark engine, Polar Spark runs on Polars' Lazy API, powered by a high-performance Rust execution engine 🦀. This avoids the overhead of the JVM, which can be slow and heavy for small or local workloads.

By leveraging Polars, Polar Spark automatically benefits from:

  • 🚀 Advanced query optimization
  • 🧵 Efficient multithreading
  • 🖥️ Excellent performance on modern CPUs

🎯 Goal: Make Polar Spark a seamless PySpark replacement whenever workloads fit on a single machine or within local resource limits.
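
For the CI/CD use case mentioned above, a test suite only needs a local session. Here is a minimal pytest-style sketch; the fixture and test names are illustrative, not part of the project:

import pytest

try:
    from polarspark.sql.session import SparkSession
except ImportError:
    from pyspark.sql.session import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Local session; no JVM or cluster is needed when polarspark is installed
    return SparkSession.builder.master("local").appName("tests").getOrCreate()

def test_row_count(spark):
    df = spark.createDataFrame([("Alice", 3), ("Ben", 5)], schema="name STRING, age INT")
    assert df.count() == 2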

Installation

pip install polarspark==0.2.2a4

Examples:

Spark session

try:
    from polarspark.sql.session import SparkSession
except ImportError:
    from pyspark.sql.session import SparkSession

spark = SparkSession.builder.master("local").appName("myapp").getOrCreate()

print(spark)
print(type(spark))

>>> <polarspark.sql.session.SparkSession object at 0x1043bdd90>
>>> <class 'polarspark.sql.session.SparkSession'>

DataFrame API

try:
    from polarspark.sql import Row
    from polarspark.sql.types import *
except ImportError:
    from pyspark.sql import Row
    from pyspark.sql.types import *
from pprint import pprint
d = [{'name': 'Alice', 'age': 1}, 
     {'name': 'Tome', 'age': 100}, 
     {'name': 'Sim', 'age': 99}]
df = spark.createDataFrame(d)
rows = df.collect()
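
Collected rows behave like PySpark Row objects, so fields can be read by attribute or by key (a small sketch; key access is assumed to match PySpark's Row API):

first = rows[0]
print(first.name, first["age"])   # expected: Alice 1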

SQL

spark.sql("CREATE TABLE input_table (value string) USING parquet")
spark.sql("INSERT INTO input_table VALUES (1), (2), (3)")

spark.sql("""
    SELECT * 
    FROM input_table i 
        JOIN my_table m 
    ON i.value = m.age
""").show()

API

pprint(rows)
>>> [Row(age=1, name='Alice'),
>>>  Row(age=100, name='Tome'),
>>>  Row(age=99, name='Sim')]
df.printSchema()
>>> root
>>>  |-- age: long (nullable = true)
>>>  |-- name: string (nullable = true)
# With schema
schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)])
df_no_rows = spark.createDataFrame([], schema=schema)

print(df_no_rows.isEmpty())
>>> True
# or using Spark DDL
df = spark.createDataFrame([("Alice", 3), ("Ben", 5)], schema="name STRING, age INT")
print(df.isEmpty())
>>> False

Read / write Parquet, Delta, CSV etc.

base_path = "/var/tmp"

df1 = spark.read.format("json").load([f"{base_path}/data.json",
                                     f"{base_path}/data.json"
                                     ])
df2 = spark.read.json([f"{base_path}/data.json",
                      f"{base_path}/data.json"])


df1.write.format("csv").save(f"{base_path}/data_json_to_csv.csv", mode="overwrite")

df1 = spark.read.format("csv").load([f"{base_path}/data_json_to_csv.csv",
                                       f"{base_path}/data_json_to_csv.csv"])

df1 = spark.read.format("parquet").load([f"{base_path}/data_json_to_parquet.parquet",
                                       f"{base_path}/data_json_to_parquet.parquet"])
df2 = spark.read.parquet(f"{base_path}/data_json_to_parquet.parquet",
                               f"{base_path}/data_json_to_parquet.parquet")

Streaming (Stateless)

checkpoint_dir = f"{base_path}/checkpoint"  # any writable directory for streaming state

df = spark.readStream.format("rate").load()
q = df.writeStream.toTable("output_table", format="parquet", checkpointLocation=checkpoint_dir)
q.stop()
result = spark.sql("SELECT value FROM output_table").collect()

Streaming (foreachBatch)

def collectBatch(batch_df, batch_id):
    batch_df.write.format("parquet").mode("overwrite").saveAsTable("test_table1")

df = spark.readStream.format("text").load("polarspark/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()
collected = spark.sql("select * from test_table1").collect()

In Memory Catalog

df.write.saveAsTable("my_table")
spark.sql("select * from my_table").show()

Some more:

Filter

pprint(df.offset(1).first())
>>>  Row(age=100, name='Tome')
df.show()

shape: (3, 2)
┌─────┬──────────┐
│ age ┆ name     │
│ --- ┆ ---      │
│ i64 ┆ str      │
╞═════╪══════════╡
│ 1   ┆ Alice    │
│ 100 ┆ Tome     │
│ 99  ┆ Sim      │
└─────┴──────────┘
df.explain()
                 0
   ┌─────────────────────────
   │
   │  ╭─────────────────────╮
   │  │ DF ["age", "name"]  │
 0 │  │ PROJECT */2 COLUMNS │
   │  ╰─────────────────────╯
print(repr(df))
>>>  DataFrame[age: bigint, name: string]
print(df.count())
>>>  3
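
Since this section is about filtering, here is a plain filter as well; this is the standard PySpark call and is assumed to map directly onto a Polars filter:

df.filter(df.age > 50).show()   # keeps Tome (100) and Sim (99)
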
def func(row):
    print("Row -> {}".format(row))

df.foreach(func)

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")], ["age", "name"]
)

def func(itr):
    for person in itr:
        print(person)
        print("Person -> {}".format(person.name))
df.foreachPartition(func)

df.show()
df.distinct().show()

NOTE: Some features are not mapped one-to-one but rely on Polars directly; for example, df.show() and df.explain() print the output of the corresponding Polars methods.