# Using Delta Lake with Sail

[Sail](https://github.com/lakehq/sail) is an open-source, multimodal, distributed compute framework built in Rust that unifies batch, streaming, and AI workloads. For seamless adoption, Sail offers a drop-in replacement for the Spark SQL and DataFrame APIs in both single-host and distributed settings. You can use Sail both for ad-hoc queries on your laptop and for large-scale data processing jobs in a cluster.

Delta Lake is a natural fit for Sail. Delta Lake provides a reliable storage layer with strong data management guarantees, while Sail focuses on flexible, unified multimodal compute. By combining the two, you can keep using a familiar PySpark client while benefiting from Sail's native support for Delta Lake tables and its interoperability with existing Delta datasets.

## Getting Started with Sail

### Installing Sail

The easiest way to get started is by installing PySail and the PySpark client.

```bash
pip install "pysail"
pip install "pyspark-client"
```

### Using the Sail Library

You can use the Sail library to start a Spark Connect server and connect to it using PySpark.

```python
from pysail.spark import SparkConnectServer
from pyspark.sql import SparkSession

# Start a Spark Connect server backed by Sail in the current process.
server = SparkConnectServer()
server.start()
_, port = server.listening_address

# Connect to the server with the PySpark Spark Connect client.
spark = SparkSession.builder.remote(f"sc://localhost:{port}").getOrCreate()
```

In all the examples below, `spark` refers to the Spark session connected to the Sail server.
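
Before moving on, it can help to confirm that the session is wired up correctly. A minimal sanity check (any small query works) is:

```python
# Verify the connection by running a trivial query through Sail.
spark.sql("SELECT 1 AS ok").show()
```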

## Basic Delta Lake Usage in Sail

### DataFrame API

```python
path = "file:///tmp/sail/users"
df = spark.createDataFrame(
    [(1, "Alice"), (2, "Bob")],
    schema="id INT, name STRING",
)

# This creates a new table or overwrites an existing one.
df.write.format("delta").mode("overwrite").save(path)
# This appends data to an existing Delta table.
df.write.format("delta").mode("append").save(path)

df = spark.read.format("delta").load(path)
df.show()
```

### SQL

```python
spark.sql("""
CREATE TABLE users (id INT, name STRING)
USING delta
LOCATION 'file:///tmp/sail/users'
""")

spark.sql("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
spark.sql("SELECT * FROM users").show()
```
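
Since this table is created at the same location used in the DataFrame example, the SQL and path-based APIs address the same underlying Delta table. As a small sketch, you can read the SQL-managed table back through the DataFrame API:

```python
# Path-based reads see the same data as the SQL table above.
spark.read.format("delta").load("file:///tmp/sail/users").show()
```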

## Data Partitioning

Partitioned Delta tables organize data into directories based on the values of one or more columns. This can improve query performance, since data files in partitions that do not match the filter conditions are skipped entirely.

### DataFrame API

```python
path = "file:///tmp/sail/metrics"
df = spark.createDataFrame(
    [(2024, 1.0), (2025, 2.0)],
    schema="year INT, value FLOAT",
)

df.write.format("delta").mode("overwrite").partitionBy("year").save(path)

df = spark.read.format("delta").load(path).filter("year > 2024")
df.show()
```

### SQL

```python
spark.sql("""
CREATE TABLE metrics (year INT, value FLOAT)
USING delta
LOCATION 'file:///tmp/sail/metrics'
PARTITIONED BY (year)
""")

spark.sql("INSERT INTO metrics VALUES (2024, 1.0), (2025, 2.0)")
spark.sql("SELECT * FROM metrics WHERE year > 2024").show()
```
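
On disk, a partitioned Delta table uses Hive-style partitioning: one subdirectory per partition value, alongside the Delta transaction log. The layout looks roughly like this (file names are illustrative; actual names are generated at write time):

```
/tmp/sail/metrics/
├── _delta_log/
├── year=2024/
│   └── part-00000-....snappy.parquet
└── year=2025/
    └── part-00000-....snappy.parquet
```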

## Schema Evolution

Delta Lake handles schema evolution gracefully. By default, writing data whose schema differs from that of the existing Delta table raises an error. You can enable schema evolution by setting the `mergeSchema` option to `true` when writing. With this option, if you add a new column or change the data type of an existing column to a compatible type, Delta Lake automatically updates the schema of the table.

```python
df.write.format("delta").mode("append").option("mergeSchema", "true").save(path)
```
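
For example, here is a minimal sketch that evolves the `users` table from the earlier examples by appending rows with an extra `email` column (the column and its values are hypothetical):

```python
# Hypothetical example: append data with a new "email" column to the users table.
path = "file:///tmp/sail/users"
df2 = spark.createDataFrame(
    [(3, "Carol", "carol@example.com")],
    schema="id INT, name STRING, email STRING",
)
df2.write.format("delta").mode("append").option("mergeSchema", "true").save(path)

# Rows written before the schema change show NULL for the new column.
spark.read.format("delta").load(path).show()
```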

You can also use the `overwriteSchema` option to replace the schema of an existing Delta table. This works only when the write mode is `overwrite`.

```python
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)
```

## Time Travel

You can use the time travel feature of Delta Lake to query historical versions of a Delta table. Every successful write produces a new table version, and the `versionAsOf` read option selects which version to load.

```python
df = spark.read.format("delta").option("versionAsOf", "0").load(path)
```
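
As a minimal end-to-end sketch, assuming a fresh, hypothetical location `file:///tmp/sail/events`, you can observe time travel by overwriting a table and reading back the first version:

```python
# Hypothetical path for this sketch; a fresh location starts at version 0.
path = "file:///tmp/sail/events"

spark.createDataFrame([(1,)], schema="id INT").write.format("delta").mode("overwrite").save(path)  # version 0
spark.createDataFrame([(2,)], schema="id INT").write.format("delta").mode("overwrite").save(path)  # version 1

# Read the first version, then the latest one.
spark.read.format("delta").option("versionAsOf", "0").load(path).show()  # id = 1
spark.read.format("delta").load(path).show()  # id = 2
```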

## Contribute to Sail

Refer to the [Sail documentation](https://docs.lakesail.com/sail/latest/) for more information, such as how to read and write Delta tables stored in the various object stores supported by Sail.

Excited about Sail and want to contribute? Join the project on [GitHub](https://github.com/lakehq/sail)! 🚀