229 changes: 78 additions & 151 deletions docs/content/program-api/python-api.md
---
title: "Python API"
weight: 5
type: docs
aliases:
  - /api/python-api.html
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
under the License.
-->

# Java-based Implementation For Python API

[Python SDK](https://github.com/apache/paimon-python) defines the Python API for Paimon.

## Environment Settings

### SDK Installing

The SDK is published at [pypaimon](https://pypi.org/project/pypaimon/). You can install it with:
```shell
pip install pypaimon
```
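
To verify the installation, you can check that the package imports and read the installed version through the standard library (a minimal sketch; nothing here is specific to the Paimon API):

```python
import importlib.metadata

import pypaimon  # raises ImportError if the SDK is not installed

# The installed distribution version, as reported by package metadata
print(importlib.metadata.version("pypaimon"))
```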

## Create Catalog

Before interacting with a table, you need to create a Catalog.

```python
from pypaimon.catalog.catalog_factory import CatalogFactory

# Note that keys and values are all strings
catalog_options = {
    'metastore': 'filesystem',
    'warehouse': 'file:///path/to/warehouse'
}
catalog = CatalogFactory.create(catalog_options)
```

## Create Database & Table

You can use the catalog to create a table for writing data.

### Create Database (optional)

A table is located in a database. If you want to create a table in a new database, you should create the database first.

```python
catalog.create_database(
    name='database_name',
    ignore_if_exists=True,  # set False to raise an error if the database already exists
    properties={'key': 'value'}  # optional database properties
)
```

### Create Schema

A table schema contains field definitions, partition keys, primary keys, table options and a comment.
The field definitions are described by `pyarrow.Schema`. All arguments except the field definitions are optional.

Generally, there are two ways to build `pyarrow.Schema`.
First, you can build `pyarrow.Schema` directly:

```python
import pyarrow as pa

pa_schema = pa.schema([
    ('dt', pa.string()),
    ('hh', pa.string()),
    ('pk', pa.int64()),
('value', pa.string())
])

schema = Schema.from_pyarrow_schema(
    pa_schema=pa_schema,
    partition_keys=['dt', 'hh'],
    primary_keys=['dt', 'hh', 'pk'],
    options={'bucket': '2'},
    comment='my test table')
```

See [Data Types]({{< ref "python-api#data-types" >}}) for all supported `pyarrow-to-paimon` data types mapping.

Second, if you already have Pandas data, the `pa_schema` can be extracted from the `DataFrame`:

Expand All @@ -187,9 +120,9 @@ dataframe = pd.DataFrame(data)

# Get Paimon Schema
record_batch = pa.RecordBatch.from_pandas(dataframe)
schema = Schema.from_pyarrow_schema(
    pa_schema=record_batch.schema,
    partition_keys=['dt', 'hh'],
primary_keys=['dt', 'hh', 'pk'],
options={'bucket': '2'},
comment='my test table'
)
```

### Create Table

After building the table schema, you can create the corresponding table:

```python
schema = ...
catalog.create_table(
identifier='database_name.table_name',
    schema=schema,
    ignore_if_exists=True  # set False to raise an error if the table already exists
)
```

## Get Table

The `Table` interface provides tools to read and write a table.

```python
table = catalog.get_table('database_name.table_name')
```

## Batch Write

Writing to a Paimon table uses a two-phase commit: you can write many times, but once committed, no more data can be written.

{{< hint warning >}}
Currently, writing multiple times and committing once is only supported for append-only tables.
{{< /hint >}}

```python
table = catalog.get_table('database_name.table_name')

# 1. Create table write and commit
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# 2. Write data. Three methods are supported:
# 2.1 Write pandas.DataFrame
dataframe = ...
table_write.write_pandas(dataframe)

# 2.2 Write pyarrow.Table
pa_table = ...
table_write.write_arrow(pa_table)

# 2.3 Write pyarrow.RecordBatch
record_batch = ...
table_write.write_arrow_batch(record_batch)

# 3. Commit data
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

# 4. Close resources
table_write.close()
table_commit.close()
```
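
As the hint above notes, an append-only table lets you call the write methods several times and then commit once. A minimal sketch of that pattern (the chunked DataFrames and their columns are hypothetical and must match your table schema):

```python
import pandas as pd

# Assumes `table` is an append-only table obtained via catalog.get_table(...)
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# Write several chunks before committing once
chunks = [pd.DataFrame({'pk': [1], 'value': ['a']}), pd.DataFrame({'pk': [2], 'value': ['b']})]
for chunk in chunks:
    table_write.write_pandas(chunk)

# A single commit makes all previously written data visible
table_commit.commit(table_write.prepare_commit())

table_write.close()
table_commit.close()
```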

By default, data is appended to the table. If you want to overwrite the table instead, use the write builder's `overwrite` API:

```python
# overwrite whole table
write_builder.overwrite()

# overwrite partition 'dt=2024-01-01'
write_builder.overwrite({'dt': '2024-01-01'})
```
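
A complete overwrite job combines this with the batch-write flow above. A minimal sketch, assuming `overwrite` must be configured on the builder before `new_write()`/`new_commit()` are created:

```python
# Replace the contents of partition dt=2024-01-01
write_builder = table.new_batch_write_builder()
write_builder.overwrite({'dt': '2024-01-01'})  # assumed: configure before new_write()/new_commit()

table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_pandas(replacement_df)  # `replacement_df` is a hypothetical DataFrame of new rows
table_commit.commit(table_write.prepare_commit())

table_write.close()
table_commit.close()
```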

## Batch Read

### Get ReadBuilder and Perform pushdown
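
Before reading, you create a `ReadBuilder`, optionally push a predicate down, and plan the splits to read. A minimal sketch of that flow (method names such as `new_predicate_builder`, `new_scan`, `plan().splits()` and `to_pandas` are assumptions based on the rest of these docs and the test at the bottom of this page; the `behavior` column is just an illustrative name):

```python
read_builder = table.new_read_builder()

# Push a simple predicate down to the reader (see the Predicate section below)
predicate_builder = read_builder.new_predicate_builder()
read_builder = read_builder.with_filter(predicate_builder.is_not_null('behavior'))

# Plan the splits to read, then load them into pandas
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
print(table_read.to_pandas(splits))
```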

You can also iterate over the data as `pyarrow.RecordBatch` objects:

```python
table_read = read_builder.new_read()
for batch in table_read.to_iterator(splits):
print(batch)

# pyarrow.RecordBatch
Expand Down Expand Up @@ -374,66 +350,17 @@ print(ray_dataset.to_pandas())
# ...
```

## Data Types

| pyarrow | Paimon |
|:-----------------------------------------------------------------|:---------|
| pyarrow.int8() | TINYINT |
| pyarrow.int16() | SMALLINT |
| pyarrow.int32() | INT |
| pyarrow.int64() | BIGINT |
| pyarrow.float16() <br/>pyarrow.float32() <br/>pyarrow.float64() | FLOAT |
| pyarrow.string() | STRING |
| pyarrow.boolean() | BOOLEAN |
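
For example, a `pyarrow.Schema` built from the types above maps directly onto the corresponding Paimon types (a small illustrative sketch; the field names are arbitrary):

```python
import pyarrow as pa

# Each pyarrow type below maps to the Paimon type from the table above
pa_schema = pa.schema([
    ('id', pa.int64()),        # BIGINT
    ('amount', pa.float32()),  # FLOAT
    ('name', pa.string()),     # STRING
    ('active', pa.boolean()),  # BOOLEAN
])
```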

## Predicate
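
Predicates are created with a predicate builder and pushed down through `ReadBuilder#with_filter`. A minimal sketch, restricted to the builder methods exercised by the test diff below (obtaining the builder via `new_predicate_builder` is an assumption):

```python
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()

# Combine several conditions with AND, mirroring the test below
p1 = predicate_builder.between('user_id', 0, 6)
p2 = predicate_builder.is_in('dt', ['p1'])
p3 = predicate_builder.is_not_null('behavior')
read_builder = read_builder.with_filter(predicate_builder.and_predicates([p1, p2, p3]))
```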

2 changes: 1 addition & 1 deletion paimon-python/pypaimon/tests/reader_append_only_test.py
def testAppendOnlyReaderWithFilter(self):
p3 = predicate_builder.between('user_id', 0, 6) # [2/b, 3/c, 4/d, 5/e, 6/f] left
p4 = predicate_builder.is_not_in('behavior', ['b', 'e']) # [3/c, 4/d, 6/f] left
p5 = predicate_builder.is_in('dt', ['p1']) # exclude 3/c
p6 = predicate_builder.is_not_null('behavior')  # exclude 4/d
g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
read_builder = table.new_read_builder().with_filter(g1)
actual = self._read_test_table(read_builder)