A side-by-side comparison of Apache Spark and Apache Flink for common streaming use cases


Apache Flink and Apache Spark are both open-source, distributed data processing frameworks used widely for big data processing and analytics. Spark is known for its ease of use, high-level APIs, and the ability to process large amounts of data. Flink shines in its ability to handle processing of data streams in real-time and low-latency stateful computations. Both support a variety of programming languages, scalable solutions for handling large amounts of data, and a wide range of connectors. Historically, Spark started out as a batch-first framework and Flink began as a streaming-first framework.

In this post, we share a comparative study of streaming patterns that are commonly used to build stream processing applications, how they can be solved using Spark (primarily Spark Structured Streaming) and Flink, and the minor variations in their approach. Examples cover code snippets in Python and SQL for both frameworks across three major themes: data preparation, data processing, and data enrichment. If you are a Spark user looking to solve your stream processing use cases using Flink, this post is for you. We do not intend to cover the choice of technology between Spark and Flink because it’s important to evaluate both frameworks for your specific workload and how the choice fits in your architecture; rather, this post highlights key differences for use cases that both these technologies are commonly considered for.

Apache Flink provides layered APIs with different levels of expressiveness and control, each designed for different types of use cases. The three layers are Process Functions (also known as the Stateful Stream Processing API), DataStream, and Table and SQL. The Stateful Stream Processing API requires writing verbose code but offers the most control over time and state, which are core concepts in stateful stream processing. The DataStream API supports Java, Scala, and Python and offers primitives for many common stream processing operations, balancing expressiveness against control. The Table and SQL APIs are relational APIs that support Java, Scala, Python, and SQL. They offer the highest level of abstraction and intuitive, SQL-like declarative control over data streams. Flink also lets you switch between these APIs within an application. To learn more, refer to the Flink documentation on layered APIs.

Apache Spark Structured Streaming offers the Dataset and DataFrame APIs, which provide high-level declarative streaming APIs to represent static, bounded data as well as streaming, unbounded data. Operations are supported in Scala, Java, Python, and R. Spark has a rich function set and syntax with simple constructs for selection, aggregation, windowing, joins, and more. You can also use the Streaming Table API to read tables as streaming DataFrames as an extension to the DataFrame API. Although it’s hard to draw direct parallels between Flink and Spark across all stream processing constructs, at a very high level, the Spark Structured Streaming APIs are roughly equivalent to Flink’s Table and SQL APIs. Spark Structured Streaming, however, does not yet (at the time of this writing) offer an equivalent to the lower-level APIs in Flink that offer granular control of time and state.

Both Flink and Spark Structured Streaming (referenced as Spark henceforth) are evolving projects. The following table provides a simple comparison of Flink and Spark capabilities for common streaming primitives (as of this writing).

| Capability | Flink | Spark |
| --- | --- | --- |
| Row-based processing | Yes | Yes |
| User-defined functions | Yes | Yes |
| Fine-grained access to state | Yes, via DataStream and low-level APIs | No |
| Control when state eviction occurs | Yes, via DataStream and low-level APIs | No |
| Flexible data structures for state storage and querying | Yes, via DataStream and low-level APIs | No |
| Timers for processing and stateful operations | Yes, via low-level APIs | No |

In the following sections, we cover the greatest common factors so that we can showcase how Spark users can relate to Flink and vice versa. To learn more about Flink’s low-level APIs, refer to Process Function. For the sake of simplicity, we cover the four use cases in this post using the Flink Table API. We use a combination of Python and SQL for an apples-to-apples comparison with Spark.

Data preparation

In this section, we compare data preparation methods for Spark and Flink.

Reading data

We first look at the simplest ways to read data from a data stream. The following sections assume the following schema for messages:

symbol: string,
price: int,
timestamp: timestamp,
company_info:
{
    name: string,
    employees_count: int
}

Reading data from a source in Spark Structured Streaming

In Spark Structured Streaming, we use a streaming DataFrame in Python that directly reads the data in JSON format:

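from pyspark.sql.functions import col, from_json

spark = ...  # spark session

# specify schema
stock_ticker_schema = ...

# Create a streaming DataFrame that subscribes to the Kafka topic
# Kafka delivers the value as bytes, so cast it to a string before parsing the JSON
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "mybroker1:port") \
    .option("subscribe", "stock_ticker") \
    .load() \
    .select(from_json(col("value").cast("string"), stock_ticker_schema).alias("ticker_data")) \
    .select(col("ticker_data.*"))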

Note that we have to supply a schema object that captures our stock ticker schema (stock_ticker_schema). Compare this to the approach for Flink in the next section.

Reading data from a source using Flink Table API

For Flink, we use the SQL DDL statement CREATE TABLE. You can specify the schema of the stream just like you would any SQL table. The WITH clause allows us to specify the connector to the data stream (Kafka in this case), the associated properties for the connector, and data format specifications. See the following code:

-- Create table using DDL
-- timestamp is a reserved keyword in Flink SQL, so it is escaped with backticks

CREATE TABLE stock_ticker (
  symbol STRING,
  price INT,
  `timestamp` TIMESTAMP(3),
  company_info STRING,
  proctime AS PROCTIME(),  -- processing-time attribute, used by the deduplication query later in this post
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '3' MINUTE
) WITH (
 'connector' = 'kafka',
 'topic' = 'stock_ticker',
 'properties.bootstrap.servers' = 'mybroker1:port',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

JSON flattening

JSON flattening is the process of converting a nested or hierarchical JSON object into a flat, single-level structure. This converts multiple levels of nesting into an object where all the keys and values are at the same level. Keys are combined using a delimiter such as a period (.) or underscore (_) to denote the original hierarchy. JSON flattening is useful when you need to work with a more simplified format. In both Spark and Flink, nested JSONs can be complicated to work with and may need additional processing or user-defined functions to manipulate. Flattened JSONs can simplify processing and improve performance due to reduced computational overhead, especially with operations like complex joins, aggregations, and windowing. In addition, flattened JSONs can help in easier debugging and troubleshooting data processing pipelines because there are fewer levels of nesting to navigate.
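To make the idea concrete outside of either framework, the following is a minimal plain-Python sketch of flattening. The function name flatten_json and the sample values are illustrative assumptions, not part of Spark or Flink. Note that it builds keys mechanically (company_info_name), whereas the queries in the following sections choose the shorter alias company_name by hand.

```python
def flatten_json(record, parent_key="", sep="_"):
    """Flatten nested dicts into a single level, joining keys with `sep`."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the key prefix along
            flat.update(flatten_json(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


# Hypothetical message that follows the schema used in this post
message = {
    "symbol": "XYZ",
    "price": 100,
    "timestamp": "2023-01-01T00:00:00Z",
    "company_info": {"name": "Example Corp", "employees_count": 10},
}
flat_message = flatten_json(message)
```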

JSON flattening in Spark Structured Streaming

JSON flattening in Spark Structured Streaming uses the select method: you name the nested field that you’d like surfaced to the top-level list of fields and give it an alias. In the following example, company_info is a nested field that contains a field called name. With the following query, we’re flattening company_info.name to company_name:

from pyspark.sql.functions import col

stock_ticker_df = ...  # Streaming DataFrame w/ schema shown above

# Surface the nested field at the top level under a new name
stock_ticker_df.select("symbol", "timestamp", "price", col("company_info.name").alias("company_name"))

JSON flattening in Flink

In Flink SQL, you can use the JSON_VALUE function. Note that you can use this function only in Flink versions equal to or greater than 1.14. See the following code:

SELECT
   symbol,
   `timestamp`,
   price,
   JSON_VALUE(company_info, 'lax $.name' DEFAULT NULL ON EMPTY) AS company_name
FROM
   stock_ticker

The term lax in the preceding query has to do with JSON path expression handling in Flink SQL. For more information, refer to System (Built-in) Functions.

Data processing

Now that you have read the data, we can look at a few common data processing patterns.

Deduplication

Data deduplication in stream processing is crucial for maintaining data quality and ensuring consistency. It also improves efficiency, because downstream operators don’t spend resources on duplicate records, and it reduces storage and processing costs.

Spark Streaming deduplication query

The following code snippet operates on a Spark streaming DataFrame named stock_ticker and drops duplicate rows based on the symbol column. The dropDuplicates method eliminates duplicate rows in a DataFrame based on one or more columns, which you pass as a list:

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

stock_ticker.dropDuplicates(["symbol"])

Flink deduplication query

The following code shows the Flink SQL equivalent to deduplicate data based on the symbol column. The query retrieves the first row for each distinct value in the symbol column from the stock_ticker stream, based on the ascending order of proctime. Here, proctime is a processing-time attribute, which the table must declare (for example, proctime AS PROCTIME() in the DDL):

SELECT symbol, `timestamp`, price
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY proctime ASC) AS row_num
  FROM stock_ticker)
WHERE row_num = 1
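
Conceptually, both queries keep the first record that arrives for each symbol and drop the rest. The following plain-Python sketch illustrates that keep-first behavior. It is not how either engine is implemented, and the function name and sample records are hypothetical. The seen set stands in for the state that the engine maintains for you, and it grows with the number of distinct keys.

```python
def deduplicate_first(records, key="symbol"):
    """Yield only the first record seen for each key, in arrival order."""
    seen = set()  # stands in for the engine-managed deduplication state
    for record in records:
        value = record[key]
        if value in seen:
            continue  # a record with this key was already emitted
        seen.add(value)
        yield record


# Hypothetical records, listed in arrival order
events = [
    {"symbol": "AAA", "price": 10},
    {"symbol": "BBB", "price": 20},
    {"symbol": "AAA", "price": 11},
]
unique_events = list(deduplicate_first(events))
```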

Windowing

Windowing is a fundamental construct in stream processing: it splits a continuous, unbounded data stream into finite, manageable chunks called windows. Windows are commonly bounded by time, by a number of records, or by other criteria. Analyses or operations are then performed on the constantly updating streaming data within each window, which lets you gain insights in real time while maintaining processing efficiency.

There are two common time-based windows used both in Spark Streaming and Flink that we will detail in this post: tumbling and sliding windows. A tumbling window is a time-based window that is a fixed size and doesn’t have any overlapping intervals. A sliding window is a time-based window that is a fixed size and moves forward in fixed intervals that can be overlapping.
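
To illustrate the difference, the following plain-Python sketch computes which windows a single event falls into. It assumes timestamps in seconds, windows aligned to the epoch, and an inclusive start with an exclusive end. The function names are hypothetical and this is not either engine's implementation.

```python
def tumbling_window(ts, size):
    """Return the (start, end) of the one tumbling window that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts, size, slide):
    """Return every (start, end) sliding window that contains ts, earliest first."""
    windows = []
    start = ts - (ts % slide)  # latest window start at or before ts
    while start > ts - size:   # the window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


MINUTE = 60
event_time = 12 * MINUTE  # an event 12 minutes after the epoch

tumbling = tumbling_window(event_time, 10 * MINUTE)
sliding = sliding_windows(event_time, 10 * MINUTE, 5 * MINUTE)
```

With a 10-minute size and a 5-minute slide, the event belongs to exactly one tumbling window but to two overlapping sliding windows.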

Spark Streaming tumbling window query

The following is a Spark Streaming tumbling window query with a window size of 10 minutes:

from pyspark.sql.functions import window

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

# Get max stock price in tumbling window
# of size 10 minutes
max_price_by_window_and_symbol = (
    stock_ticker
    .withWatermark("timestamp", "3 minutes")
    .groupBy(
        window(stock_ticker.timestamp, "10 minutes"),
        stock_ticker.symbol)
    .max("price")
)

Flink Streaming tumbling window query

The following is an equivalent tumbling window query in Flink with a window size of 10 minutes:

SELECT window_start, window_end, symbol, MAX(price) AS max_price
  FROM TABLE(
    TUMBLE(TABLE stock_ticker, DESCRIPTOR(`timestamp`), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, symbol;

Spark Streaming sliding window query

The following is a Spark Streaming sliding window query with a window size of 10 minutes and slide interval of 5 minutes:

from pyspark.sql.functions import window

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

# Get max stock price in sliding window
# of size 10 minutes and slide interval of size
# 5 minutes
max_price_by_window_and_symbol = (
    stock_ticker
    .withWatermark("timestamp", "3 minutes")
    .groupBy(
        window(stock_ticker.timestamp, "10 minutes", "5 minutes"),
        stock_ticker.symbol)
    .max("price")
)

Flink Streaming sliding window query

The following is a Flink sliding window query with a window size of 10 minutes and slide interval of 5 minutes:

SELECT window_start, window_end, symbol, MAX(price) AS max_price
  FROM TABLE(
    HOP(TABLE stock_ticker, DESCRIPTOR(`timestamp`), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, symbol;

Handling late data

Both Spark Structured Streaming and Flink support event time processing, where a field within the payload can be used for defining time windows as distinct from the wall clock time of the machines doing the processing. Both Flink and Spark use watermarking for this purpose.

Watermarking is used in stream processing engines to handle late-arriving data. A watermark sets a threshold for how long the system waits for late events. If an event arrives within that threshold, the system uses it to update the result. If it arrives later than the watermark allows, the system ignores it.
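
As a rough mental model, a watermark can be thought of as the highest event time seen so far minus the allowed delay. The following plain-Python sketch applies that rule to each event. It is a simplification under stated assumptions: the class name is hypothetical, timestamps are in seconds, and real engines differ in when they advance the watermark and in how they apply it to windows rather than to individual events.

```python
class WatermarkTracker:
    """Track a watermark as the max event time seen minus an allowed delay."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None  # no events seen yet
        return self.max_event_time - self.max_delay

    def accept(self, event_time):
        """Return True if the event is on time, False if it is behind the watermark."""
        current = self.watermark
        if current is not None and event_time < current:
            return False  # too late: the event would be ignored
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time  # the watermark only moves forward
        return True


tracker = WatermarkTracker(max_delay=3 * 60)  # allow events up to 3 minutes late
decisions = [tracker.accept(t) for t in (600, 900, 750, 700)]
```

The third event is out of order but still within the 3-minute allowance, so it is kept. The fourth falls behind the watermark and is rejected.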

In the preceding windowing queries, you specify the lateness threshold in Spark using the following code:

.withWatermark("timestamp", "3 minutes")

This means that any records that arrive more than 3 minutes late, as tracked by the event time clock, will be discarded.

In contrast, with the Flink Table API, you can specify an analogous lateness threshold directly in the DDL:

WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE

Note that Flink provides additional constructs for specifying lateness across its various APIs.

Data enrichment

In this section, we compare data enrichment methods with Spark and Flink.

Calling an external API

Calling external APIs from user-defined functions (UDFs) is similar in Spark and Flink. Note that your UDF will be called for every record processed, which can result in the API getting called at a very high request rate. In addition, in production scenarios, your UDF code often gets run in parallel across multiple nodes, further amplifying the request rate.

For the following code snippets, let’s assume that the external API call entails calling the function:

response = my_external_api(request)

External API call in Spark UDF

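The following sketch wraps the call in a Spark Python UDF and applies it to the streaming DataFrame. It assumes that my_external_api takes the ticker symbol and returns a string:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

# The UDF body runs once per record on the executors
@udf(returnType=StringType())
def call_external_api_udf(symbol):
    return my_external_api(symbol)

enriched = stock_ticker.withColumn("enriched_symbol", call_external_api_udf(col("symbol")))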

External API call in Flink UDF

For Flink, assume we define the UDF callExternalAPIUDF, which takes as input the ticker symbol symbol and returns enriched information about the symbol via a REST endpoint. We can then register and call the UDF as follows:

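# Wrap the UDF and register it so that it can be called from SQL
# (table_env is assumed to be your TableEnvironment)
callExternalAPIUDF = udf(callExternalAPIUDF(), result_type=DataTypes.STRING())
table_env.create_temporary_function("callExternalAPIUDF", callExternalAPIUDF)

SELECT
    symbol,
    callExternalAPIUDF(symbol) AS enriched_symbol
FROM stock_ticker;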

Flink UDFs provide an initialization method that gets run one time (as opposed to one time per record processed).
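
The following plain-Python sketch mimics that lifecycle so the difference between one-time setup and per-record work is visible. It is not PyFlink code: the method names open and eval loosely mirror PyFlink's ScalarFunction, while the class name, the run helper, and the stand-in client are hypothetical.

```python
class EnrichSymbol:
    """Plain-Python stand-in for a UDF with a one-time initialization hook."""

    def __init__(self):
        self.open_calls = 0
        self.client = None

    def open(self):
        # One-time setup, such as creating an API client or loading a model
        self.open_calls += 1
        # Hypothetical stand-in for a real client of my_external_api
        self.client = lambda symbol: f"info-for-{symbol}"

    def eval(self, symbol):
        # Per-record work reuses whatever open() prepared
        return self.client(symbol)


def run(udf_instance, symbols):
    """Mimic the runtime: call open() once, then eval() for every record."""
    udf_instance.open()
    return [udf_instance.eval(symbol) for symbol in symbols]


enricher = EnrichSymbol()
results = run(enricher, ["AAA", "BBB", "AAA"])
```

Expensive resources created in the initialization method are shared by every subsequent call, instead of being rebuilt for each record.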

Note that you should use UDFs judiciously as an improperly implemented UDF can cause your job to slow down, cause backpressure, and eventually stall your stream processing application. It’s advisable to use UDFs asynchronously to maintain high throughput, especially for I/O-bound use cases or when dealing with external resources like databases or REST APIs. To learn more about how you can use asynchronous I/O with Apache Flink, refer to Enrich your data stream asynchronously using Amazon Kinesis Data Analytics for Apache Flink.
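
To show why asynchronous calls help with I/O-bound enrichment, the following sketch uses Python's asyncio to keep several requests in flight at once while capping concurrency. It illustrates the general idea only and is not Flink's asynchronous I/O operator. The function fake_external_api is a hypothetical stand-in for a slow REST call, and the max_in_flight limit is an assumed parameter.

```python
import asyncio


async def fake_external_api(symbol):
    """Hypothetical stand-in for a slow REST call."""
    await asyncio.sleep(0.01)
    return f"info-for-{symbol}"


async def enrich_all(symbols, max_in_flight=2):
    """Enrich symbols concurrently, with at most max_in_flight calls at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def enrich(symbol):
        async with semaphore:  # cap the request rate against the external API
            return symbol, await fake_external_api(symbol)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(enrich(symbol) for symbol in symbols))


enriched = asyncio.run(enrich_all(["AAA", "BBB", "CCC"]))
```

While one request waits on the network, others can proceed, so throughput is no longer bounded by the latency of a single call. The semaphore keeps the external API from being overwhelmed.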

Conclusion

Apache Flink and Apache Spark are both rapidly evolving projects and provide a fast and efficient way to process big data. This post focused on the top use cases we commonly encountered when customers wanted to see parallels between the two technologies for building real-time stream processing applications. We’ve included samples that were most frequently requested at the time of this writing. Let us know if you’d like more examples in the comments section.


About the authors

Deepthi Mohan is a Principal Product Manager on the Amazon Kinesis Data Analytics team.

Karthi Thyagarajan was a Principal Solutions Architect on the Amazon Kinesis team.