Spark

easy-db-lab supports provisioning Apache Spark clusters via AWS EMR for analytics workloads.

Enabling Spark

There are two ways to enable Spark:

Option 1: During Init (before up)

Enable Spark during cluster initialization with the --spark.enable flag. The EMR cluster will be created automatically when you run up:

easy-db-lab init --spark.enable
easy-db-lab up

Init Spark Configuration Options

  • --spark.enable - Enable the Spark EMR cluster (default: false)
  • --spark.master.instance.type - Master node instance type (default: m5.xlarge)
  • --spark.worker.instance.type - Worker node instance type (default: m5.xlarge)
  • --spark.worker.instance.count - Number of worker nodes (default: 3)

Example with Custom Configuration

easy-db-lab init \
  --spark.enable \
  --spark.master.instance.type m5.2xlarge \
  --spark.worker.instance.type m5.4xlarge \
  --spark.worker.instance.count 5

Option 2: After up (standalone spark init)

Add Spark to an environment that is already running. This is useful if you forgot to pass --spark.enable during init, or if you decide to add Spark later:

easy-db-lab spark init

Prerequisites: easy-db-lab init and easy-db-lab up must have been run first.
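Putting the prerequisites together, the end-to-end flow for adding Spark after the fact looks like this (a sketch using only the commands documented on this page):

```shell
# Provision the base environment first
easy-db-lab init
easy-db-lab up

# Then add an EMR Spark cluster to the running environment
easy-db-lab spark init
```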

Spark Init Configuration Options

  • --master.instance.type - Master node instance type (default: m5.xlarge)
  • --worker.instance.type - Worker node instance type (default: m5.xlarge)
  • --worker.instance.count - Number of worker nodes (default: 3)

Example with Custom Configuration

easy-db-lab spark init \
  --master.instance.type m5.2xlarge \
  --worker.instance.type m5.4xlarge \
  --worker.instance.count 5

Submitting Spark Jobs

Submit JAR-based Spark applications to your EMR cluster:

easy-db-lab spark submit \
  --jar /path/to/your-app.jar \
  --main-class com.example.YourMainClass \
  --conf spark.easydblab.keyspace=my_keyspace \
  --conf spark.easydblab.table=my_table \
  --wait

Submit Options

  • --jar - Path to JAR file, local path or s3:// URI (required)
  • --main-class - Main class to execute (required)
  • --conf - Spark configuration (key=value), can be repeated
  • --env - Environment variable (KEY=value), can be repeated
  • --args - Arguments passed to the Spark application
  • --wait - Wait for job completion
  • --name - Job name (defaults to the main class)

When --jar is a local path, it is automatically uploaded to the cluster's S3 bucket before submission. When it is an s3:// URI, it is used directly.

Using a JAR Already on S3

If your JAR is already on S3 (e.g., from a CI pipeline or a previous upload), pass the S3 URI directly:

easy-db-lab spark submit \
  --jar s3://my-bucket/jars/your-app.jar \
  --main-class com.example.YourMainClass \
  --conf spark.easydblab.keyspace=my_keyspace \
  --wait

This skips the upload step entirely, which is useful for large JARs or when resubmitting the same job.

Cancelling a Job

Cancel a running or pending Spark job without terminating the cluster:

easy-db-lab spark stop

Without --step-id, this cancels the most recent job. To cancel a specific job:

easy-db-lab spark stop --step-id <step-id>

The cancellation uses EMR's TERMINATE_PROCESS strategy (SIGKILL). The API is asynchronous — use spark status to confirm the job has been cancelled.
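Because cancellation is asynchronous, a typical cancel-and-confirm sequence chains the two commands (the step ID below is a placeholder):

```shell
# Request cancellation of a specific step
easy-db-lab spark stop --step-id s-ABC123EXAMPLE

# Poll until the step reports a cancelled state
easy-db-lab spark status --step-id s-ABC123EXAMPLE
```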

Checking Job Status

View Recent Jobs

List recent Spark jobs on the cluster:

easy-db-lab spark jobs

Options:

  • --limit - Maximum number of jobs to display (default: 10)

Check Specific Job Status

easy-db-lab spark status --step-id <step-id>

Without --step-id, shows the status of the most recent job.

Options:

  • --step-id - EMR step ID to check
  • --logs - Download step logs (stdout, stderr)
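For example, to check a specific step and pull its stdout/stderr in one command (the step ID is a placeholder):

```shell
easy-db-lab spark status --step-id s-ABC123EXAMPLE --logs
```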

Retrieving Logs

Download logs for a Spark job:

easy-db-lab spark logs --step-id <step-id>

Logs are automatically decompressed and include:

  • stdout.gz - Standard output
  • stderr.gz - Standard error
  • controller.gz - EMR controller logs

Architecture

When Spark is enabled, easy-db-lab provisions:

  • EMR Cluster: Managed Spark cluster with master and worker nodes
  • S3 Integration: Logs stored at s3://<bucket>/spark/emr-logs/
  • IAM Roles: Service and job flow roles for EMR operations
  • Observability: Each EMR node runs an OTel Collector (host metrics, OTLP forwarding), OTel Java Agent (auto-instrumentation for logs/metrics/traces), and Pyroscope Java Agent (continuous CPU/allocation/lock profiling). All telemetry flows to the control node's observability stack.

Timeouts and Polling

  • Job Polling Interval: 5 seconds
  • Maximum Wait Time: 4 hours
  • Cluster Creation Timeout: 30 minutes

Spark with Cassandra

A common use case is running Spark jobs that read from or write to Cassandra. Use the Spark Cassandra Connector:

import com.datastax.spark.connector._

val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "my_table", "keyspace" -> "my_keyspace"))
  .load()

Ensure your JAR includes the Spark Cassandra Connector dependency and configure the Cassandra host in your Spark application.
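One way to supply the Cassandra host is via --conf at submit time; spark.cassandra.connection.host and spark.cassandra.connection.localDC are standard Spark Cassandra Connector properties. The JAR path, main class, and addresses below are placeholders:

```shell
easy-db-lab spark submit \
  --jar /path/to/your-app.jar \
  --main-class com.example.YourMainClass \
  --conf spark.cassandra.connection.host=10.0.0.10,10.0.0.11 \
  --conf spark.cassandra.connection.localDC=us-west-2 \
  --wait
```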

Spark Modules

All Spark job modules live under the spark/ directory and share unified configuration via spark.easydblab.* properties. You can compare performance across implementations by swapping the JAR and main class while keeping the same --conf flags.
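For example, the same workload can be replayed against two implementations by changing only the JAR and main class while every --conf flag stays fixed (a sketch using the module paths and main classes from the table below):

```shell
# Bulk write via the sidecar transport
easy-db-lab spark submit \
  --jar spark/bulk-writer-sidecar/build/libs/bulk-writer-sidecar-*.jar \
  --main-class com.rustyrazorblade.easydblab.spark.DirectBulkWriter \
  --conf spark.easydblab.keyspace=bulk_test \
  --wait

# Same configuration, standard connector implementation
easy-db-lab spark submit \
  --jar spark/connector-writer/build/libs/connector-writer-*.jar \
  --main-class com.rustyrazorblade.easydblab.spark.StandardConnectorWriter \
  --conf spark.easydblab.keyspace=bulk_test \
  --wait
```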

Module Overview

  • common (:spark:common) - Shared config, data generation, CQL setup (no main class)
  • bulk-writer-sidecar (:spark:bulk-writer-sidecar, main class DirectBulkWriter) - Cassandra Analytics, direct sidecar transport
  • bulk-writer-s3 (:spark:bulk-writer-s3, main class S3BulkWriter) - Cassandra Analytics, S3 staging transport
  • connector-writer (:spark:connector-writer, main class StandardConnectorWriter) - Standard Spark Cassandra Connector
  • connector-read-write (:spark:connector-read-write, main class KeyValuePrefixCount) - Read→transform→write example

Building

Pre-build Cassandra Analytics (one-time, for bulk-writer modules)

The cassandra-analytics library requires JDK 11 to build:

bin/build-cassandra-analytics

Options:

  • --force - Rebuild even if JARs exist
  • --branch <branch> - Use a specific branch (default: trunk)

Build JARs

# Build all Spark modules
./gradlew :spark:bulk-writer-sidecar:shadowJar :spark:bulk-writer-s3:shadowJar \
  :spark:connector-writer:shadowJar :spark:connector-read-write:shadowJar

# Or build individually
./gradlew :spark:bulk-writer-sidecar:shadowJar
./gradlew :spark:connector-writer:shadowJar

Usage

All modules use the same --conf properties for easy comparison.

Direct Bulk Writer (Sidecar)

easy-db-lab spark submit \
  --jar spark/bulk-writer-sidecar/build/libs/bulk-writer-sidecar-*.jar \
  --main-class com.rustyrazorblade.easydblab.spark.DirectBulkWriter \
  --conf spark.easydblab.contactPoints=host1,host2,host3 \
  --conf spark.easydblab.keyspace=bulk_test \
  --conf spark.easydblab.localDc=us-west-2 \
  --conf spark.easydblab.rowCount=1000000 \
  --wait

S3 Bulk Writer

easy-db-lab spark submit \
  --jar spark/bulk-writer-s3/build/libs/bulk-writer-s3-*.jar \
  --main-class com.rustyrazorblade.easydblab.spark.S3BulkWriter \
  --conf spark.easydblab.contactPoints=host1,host2,host3 \
  --conf spark.easydblab.keyspace=bulk_test \
  --conf spark.easydblab.localDc=us-west-2 \
  --conf spark.easydblab.s3.bucket=my-bucket \
  --conf spark.easydblab.rowCount=1000000 \
  --wait

Standard Connector Writer

easy-db-lab spark submit \
  --jar spark/connector-writer/build/libs/connector-writer-*.jar \
  --main-class com.rustyrazorblade.easydblab.spark.StandardConnectorWriter \
  --conf spark.easydblab.contactPoints=host1,host2,host3 \
  --conf spark.easydblab.keyspace=bulk_test \
  --conf spark.easydblab.localDc=us-west-2 \
  --conf spark.easydblab.rowCount=1000000 \
  --wait

Convenience Script

The bin/spark-bulk-write script handles JAR lookup, host resolution, and health checks:

# From a cluster directory
spark-bulk-write direct --rows 10000
spark-bulk-write s3 --rows 1000000 --parallelism 20
spark-bulk-write connector --keyspace myks --table mytable

Configuration Properties

All modules share these properties via spark.easydblab.*:

  • spark.easydblab.contactPoints - Comma-separated database hosts (required)
  • spark.easydblab.keyspace - Target keyspace (required)
  • spark.easydblab.table - Target table (default: data_<timestamp>)
  • spark.easydblab.localDc - Local datacenter name (required)
  • spark.easydblab.rowCount - Number of rows to write (default: 1000000)
  • spark.easydblab.parallelism - Spark partitions for data generation (default: 10)
  • spark.easydblab.partitionCount - Cassandra partitions to distribute across (default: 10000)
  • spark.easydblab.replicationFactor - Keyspace replication factor (default: 3)
  • spark.easydblab.skipDdl - Skip keyspace/table creation and validate they exist (default: false)
  • spark.easydblab.compaction - Compaction strategy (default: table default)
  • spark.easydblab.s3.bucket - S3 bucket (required in S3 mode)
  • spark.easydblab.s3.endpoint - S3 endpoint override (default: AWS S3)
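As one example of combining these properties, skipDdl lets you rerun a job against a table created by a previous run without reissuing DDL. The table name and hosts below are illustrative:

```shell
easy-db-lab spark submit \
  --jar spark/connector-writer/build/libs/connector-writer-*.jar \
  --main-class com.rustyrazorblade.easydblab.spark.StandardConnectorWriter \
  --conf spark.easydblab.contactPoints=host1,host2,host3 \
  --conf spark.easydblab.keyspace=bulk_test \
  --conf spark.easydblab.table=data_20240101 \
  --conf spark.easydblab.localDc=us-west-2 \
  --conf spark.easydblab.skipDdl=true \
  --wait
```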

Table Schema

The test data generators produce this schema:

CREATE TABLE <keyspace>.<table> (
    partition_id bigint,
    sequence_id bigint,
    course blob,
    marks bigint,
    PRIMARY KEY ((partition_id), sequence_id)
);
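Once a job has written data, you can spot-check it from a Cassandra node with cqlsh; the query below restricts on the partition key, which is valid CQL for this schema (keyspace and table names are illustrative):

```shell
cqlsh -e "SELECT partition_id, sequence_id, marks FROM bulk_test.my_table WHERE partition_id = 0 LIMIT 10;"
```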