Spark Processing on NATIS
Use Apache Spark 3.5 on NATIS for large-scale data processing, machine learning, and custom transformations.
NATIS provides a fully managed Apache Spark 3.5 environment accessible through Notebooks, Jobs, and the Pipeline Editor. You don't need to configure clusters, manage dependencies, or worry about infrastructure — NATIS handles all of that automatically.
Spark Session Configuration
# The SparkSession is pre-configured in NATIS notebooks
# Access it directly:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable
# Read from Delta Lake
df = spark.table("catalog.silver.transactions")
# PySpark transformation example
result = (
    df
    .filter(F.col("status") == "completed")
    .withColumn("month", F.date_trunc("month", F.col("created_at")))
    .withColumn("is_high_value", F.when(F.col("amount") > 1000, True).otherwise(False))
    .groupBy("month", "merchant_category", "is_high_value")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.count("transaction_id").alias("txn_count"),
        F.avg("amount").alias("avg_amount"),
        F.percentile_approx("amount", 0.95).alias("p95_amount")
    )
    .orderBy("month", "merchant_category")
)
# Write result to Gold zone
result.write.format("delta").mode("overwrite").saveAsTable("catalog.gold.merchant_analytics")
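For large Gold tables it can help to partition the output so queries that filter on the partition column skip unrelated files. Below is a minimal variant of the write above using the standard Spark writer API; choosing month as the partition key is an illustrative assumption, not a NATIS requirement.
# Same write as above, but partitioned by month (partition key choice is illustrative)
(
    result.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("month")
    .saveAsTable("catalog.gold.merchant_analytics")
)
Note that if the table already exists with a different partitioning, Delta Lake requires .option("overwriteSchema", "true") on the overwrite to change it.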
Cluster Types
| Cluster Type | Best For | Auto-Scaling | Cost Profile |
| --- | --- | --- | --- |
| All-Purpose (Interactive) | Notebooks, exploration, development | Yes | Higher (always-on option) |
| Job Cluster | Scheduled pipelines and ETL jobs | Yes (per-job) | Lower (spun up per job) |
| SQL Warehouse | BI queries, ad-hoc SQL, dashboards | Yes | Per-query billing |
| Streaming Cluster | Long-running Structured Streaming jobs | No (fixed size) | Reserved pricing |
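The Streaming Cluster type is intended for long-running Structured Streaming jobs. As a sketch of the kind of workload it hosts, the example below uses standard Spark Structured Streaming and Delta APIs and reuses the pre-configured spark session and the F import from above; the source and target table names and the checkpoint path are placeholders, not NATIS defaults.
# Long-running incremental job: stream new rows from a Bronze Delta table into Silver.
# Table names and the checkpoint location are illustrative.
events = spark.readStream.table("catalog.bronze.events")
query = (
    events
    .withColumn("ingested_at", F.current_timestamp())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze_to_silver_events")
    .trigger(processingTime="1 minute")
    .toTable("catalog.silver.events")
)
# query.awaitTermination()  # block until the stream is stopped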
Delta Lake Integration
# MERGE (upsert) using the DeltaTable API
# df_updates is the incoming batch of changed rows (e.g. read from a staging table)
delta_target = DeltaTable.forName(spark, "catalog.silver.customers")
(
    delta_target.alias("target")
    .merge(
        df_updates.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
# View table history
spark.sql("DESCRIBE HISTORY catalog.silver.customers").show(10, truncate=False)
# Time travel
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-12-01") \
    .table("catalog.silver.customers")