
Spark Processing on NATIS

Use Apache Spark 3.5 on NATIS for large-scale data processing, machine learning, and custom transformations.


NATIS provides a fully managed Apache Spark 3.5 environment accessible through Notebooks, Jobs, and the Pipeline Editor. You don't need to configure clusters, manage dependencies, or worry about infrastructure — NATIS handles all of that automatically.

Spark Session Configuration

PYTHON
# The SparkSession is pre-configured in NATIS notebooks
# Access it directly:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Read from Delta Lake
df = spark.table("catalog.silver.transactions")

# PySpark transformation example
result = (
    df
    .filter(F.col("status") == "completed")
    .withColumn("month", F.date_trunc("month", F.col("created_at")))
    .withColumn("is_high_value", F.when(F.col("amount") > 1000, True).otherwise(False))
    .groupBy("month", "merchant_category", "is_high_value")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.count("transaction_id").alias("txn_count"),
        F.avg("amount").alias("avg_amount"),
        F.percentile_approx("amount", 0.95).alias("p95_amount")
    )
    .orderBy("month", "merchant_category")
)

# Write result to Gold zone
result.write.format("delta").mode("overwrite").saveAsTable("catalog.gold.merchant_analytics")
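
If the Gold table grows large, partitioning the output can improve downstream file pruning. Below is a variant of the write above; the choice of month as the partition column is illustrative, not a NATIS requirement.

PYTHON
# Same write as above, but partitioned by month (partition choice is illustrative)
(
    result.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("month")
    .saveAsTable("catalog.gold.merchant_analytics")
)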

Cluster Types

Cluster Type | Best For | Auto-Scaling | Cost Profile
--- | --- | --- | ---
All-Purpose (Interactive) | Notebooks, exploration, development | Yes | Higher (always-on option)
Job Cluster | Scheduled pipelines and ETL jobs | Yes (per-job) | Lower (spun up per job)
SQL Warehouse | BI queries, ad-hoc SQL, dashboards | Yes | Per-query billing
Streaming Cluster | Long-running Structured Streaming jobs (see the sketch below) | No (fixed size) | Reserved pricing
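
The Streaming Cluster row is sized for long-running Structured Streaming work. As a rough sketch of that workload shape, the source table, target table, and checkpoint path below are illustrative, not NATIS defaults.

PYTHON
# Minimal Structured Streaming sketch; all names below are illustrative
events = spark.readStream.table("catalog.bronze.events")

query = (
    events
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze_to_silver_events")
    .toTable("catalog.silver.events")
)

# Block until the stream is stopped or fails
query.awaitTermination()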

Delta Lake Integration

PYTHON
# MERGE (upsert) using DeltaTable API
delta_target = DeltaTable.forName(spark, "catalog.silver.customers")
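
# df_updates holds the incoming rows to upsert; the source table name is illustrative
df_updates = spark.table("catalog.bronze.customer_updates")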

(
    delta_target.alias("target")
    .merge(
        df_updates.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# View table history
spark.sql("DESCRIBE HISTORY catalog.silver.customers").show(10, truncate=False)

# Time travel
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-12-01") \
    .table("catalog.silver.customers")
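
Time travel also accepts a version number, which you can take from the DESCRIBE HISTORY output above; a companion to the timestamp read (the version value here is illustrative):

PYTHON
# Read a specific version from the table history (version number is illustrative)
df_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .table("catalog.silver.customers")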
