Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the widely used cloud data warehouse. You can run and scale analytics in seconds on all your data without having to manage your data warehouse infrastructure.
You can use the Amazon Redshift streaming ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL (Structured Query Language) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.
In this post, we discuss a solution that uses Amazon Redshift streaming ingestion to provide near-real-time analytics.
Overview of solution
We walk through an example pipeline to ingest data from an Amazon DynamoDB source table in near-real time using Kinesis Data Streams in combination with Amazon Redshift streaming ingestion. We also walk through using PartiQL in Amazon Redshift to unnest nested JSON documents and build fact and dimension tables that are used in your data warehouse refresh. The solution uses Kinesis Data Streams to capture item-level changes from an application DynamoDB table.
As shown in the following reference architecture, DynamoDB table data changes are streamed into Amazon Redshift through Kinesis Data Streams and Amazon Redshift streaming ingestion for near-real-time analytics dashboard visualization using Amazon QuickSight.
The process flow includes the following steps:
- Create a Kinesis data stream and turn on the data stream from DynamoDB to capture item-level changes in your DynamoDB table.
- Create a streaming materialized view in your Amazon Redshift cluster to consume live streaming data from the data stream.
- The streaming data gets ingested into a JSON payload. Use a combination of a PartiQL statement and dot notation to unnest the JSON document into data columns of a staging table in Amazon Redshift.
- Create fact and dimension tables in the Amazon Redshift cluster and keep loading the latest data at regular intervals from the staging table using transformation logic.
- Establish connectivity between a QuickSight dashboard and Amazon Redshift to deliver visualization and insights.
You must have the following:
Set up a Kinesis data stream
To configure your Kinesis data stream, complete the following steps:
- Create a Kinesis data stream called demo-data-stream. For instructions, refer to Step 1 in Set up streaming ETL pipelines.
Configure the stream to capture changes from the DynamoDB table.
- On the DynamoDB console, choose Tables in the navigation pane.
- Open your table.
- On the Exports and streams tab, choose Turn on under Amazon Kinesis data stream details.
- For Destination Kinesis data stream, choose demo-data-stream.
- Choose Turn on stream.
Item-level changes in the DynamoDB table should now be flowing to the Kinesis data stream.
- To verify if the data is entering the stream, on the Kinesis Data Streams console, open demo-data-stream.
- On the Monitoring tab, find the PutRecord success – average (Percent) and PutRecord – sum (Bytes) metrics to validate record ingestion.
Set up streaming ingestion
To set up streaming ingestion, complete the following steps:
- Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Steps 1 and 2 in Getting started with streaming ingestion from Amazon Kinesis Data Streams.
- Launch the Query Editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Amazon Redshift cluster for the next steps.
- Create an external schema:
- To use case-sensitive identifiers, set
enable_case_sensitive_identifierto true at either the session or cluster level.
- Create a materialized view to consume the stream data and store stream records in semi-structured SUPER format:
- Refresh the view, which triggers Amazon Redshift to read from the stream and load data into the materialized view:
You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions on how to create a materialized view with auto refresh.
Unnest the JSON document
The following is a sample of a JSON document that was ingested from the Kinesis data stream to the payload column of the streaming materialized view
We can use dot notation to unnest the JSON document. But in addition to that, we should use a PartiQL statement to handle arrays if applicable. For example, in the preceding JSON document, there is an array under the element:
The following SQL query uses a combination of dot notation and a PartiQL statement to unnest the JSON document:
The query unnests the JSON document to the following result set.
Precompute the result set using a materialized view
Optionally, to precompute and store the unnested result set from the preceding query, you can create a materialized view and schedule it to refresh at regular intervals. In this post, we maintain the preceding unnested data in a materialized view called
mv_demo_super_unnest, which will be refreshed at regular intervals and used for further processing.
To capture the latest data from the DynamoDB table, the Amazon Redshift streaming materialized view needs to be refreshed at regular intervals, and then the incremental data should be transformed and loaded into the final fact and dimension table. To avoid reprocessing the same data, a metadata table can be maintained at Amazon Redshift to keep track of each ELT process with status, start time, and end time, as explained in the following section.
Maintain an audit table in Amazon Redshift
The following is a sample DDL of a metadata table that is maintained for each process or job:
create table MetaData_ETL ( JobName varchar(100), StartDate timestamp, EndDate timestamp, Status varchar(50) );
The following is a sample initial entry of the metadata audit table that can be maintained at job level. The insert statement is the initial entry for the ELT process to load the
insert into MetaData_ETL values ('Customer_Transaction_Fact_Load', current_timestamp, current_timestamp,'Ready' );
Build a fact table with the latest data
In this post, we demonstrate the loading of a fact table using specific transformation logic. We are skipping the dimension table load, which uses similar logic.
As a prerequisite, create the fact and dimension tables in a preferred schema. In following example, we create the fact table
Customer_Transaction_Fact in Amazon Redshift:
Transform data using a stored procedure
We load this fact table from the unnested data using a stored procedure. For more information, refer to Creating stored procedures in Amazon Redshift.
Note that in this sample use case, we are using transformation logic to identify and load the latest value of each column for a customer transaction.
The stored procedure contains the following components:
- In the first step of the stored procedure, the job entry in the
MetaData_ETLtable is updated to change the status to
StartDateas the current timestamp, which indicates that the fact load process is starting.
- Refresh the materialized view
mv_demo_super_unnest, which contains the unnested data.
- In the following example, we load the fact table
Customer_Transaction_Factusing the latest data from the streaming materialized view based on the column
approximate_arrival_timestamp, which is available as a system column in the streaming materialized view. The value of
approximate_arrival_timestampis set when a Kinesis data stream successfully receives and stores a record.
- The following logic in the stored procedure checks if the
mv_demo_super_unnestis greater than the
EndDatetimestamp in the
MetaData_ETLaudit table, so that it can only process the incremental data.
- Additionally, while loading the fact table, we identify the latest non-null value of each column for every
Transaction_IDdepending on the order of the
approximate_arrival_timestampcolumn using the rank and min
- The transformed data is loaded into the intermediate staging table
- The impacted records with the same
Transaction_IDvalues are deleted and reloaded into the
Customer_Transaction_Facttable from the staging table
- In the last step of the stored procedure, the job entry in the
MetaData_ETLtable is updated to change the status to
EndDateas the current timestamp, which indicates that the fact load process has completed successfully.
See the following code:
Additional considerations for implementation
There are several additional capabilities that you could utilize to modify this solution to meet your needs. Many customers utilize multiple AWS accounts, and it’s common that the Kinesis data stream may be in a different AWS account than the Amazon Redshift data warehouse. If this is the case, you can utilize an Amazon Redshift IAM role that assumes a role in the Kinesis data stream AWS account in order to read from the data stream. For more information, refer to Cross-account streaming ingestion for Amazon Redshift.
Another common use case is that you need to schedule the refresh of your Amazon Redshift data warehouse jobs so that the data warehouse’s data is continuously updated. To do this, you can utilize Amazon EventBridge to schedule the jobs in your data warehouse to run on a regular basis. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. Another option is to use Amazon Redshift Query Editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.
If you have a requirement to load data from a DynamoDB table with existing data, refer to Loading data from DynamoDB into Amazon Redshift.
For more information on Amazon Redshift streaming ingestion capabilities, refer to Real-time analytics with Amazon Redshift streaming ingestion.
To avoid unnecessary charges, clean up any resources that you built as part of this architecture that are no longer in use. This includes dropping the materialized view, stored procedure, external schema, and tables created as part of this post. Additionally, make sure you delete the DynamoDB table and delete the Kinesis data stream.
After following the solution in this post, you’re now able to build near-real-time analytics using Amazon Redshift streaming ingestion. We showed how you can ingest data from a DynamoDB source table using a Kinesis data stream in order to refresh your Amazon Redshift data warehouse. With the capabilities presented in this post, you should be able to increase the refresh rate of your Amazon Redshift data warehouse in order to provide the most up-to-date data in your data warehouse for your use case.
About the authors
Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.
Matt Nispel is an Enterprise Solutions Architect at AWS. He has more than 10 years of experience building cloud architectures for large enterprise companies. At AWS, Matt helps customers rearchitect their applications to take full advantage of the cloud. Matt lives in Minneapolis, Minnesota, and in his free time enjoys spending time with friends and family.
Dan Dressel is a Senior Analytics Specialist Solutions Architect at AWS. He is passionate about databases, analytics, machine learning, and architecting solutions. In his spare time, he enjoys spending time with family, nature walking, and playing foosball.