Open Data Lakehouse powered by Iceberg for all your Data Warehouse needs


Since we announced the general availability of Apache Iceberg in Cloudera Data Platform (CDP), we have been excited to see customers testing their analytic workloads on Iceberg. We have also received several requests for more detail on how key data services in CDP, such as Cloudera Data Warehouse (CDW), Cloudera Data Engineering (CDE), Cloudera Machine Learning (CML), Cloudera Data Flow (CDF) and Cloudera Stream Processing (CSP), integrate with the Apache Iceberg table format, and on the easiest way to get started. In this blog, we describe in detail how Cloudera integrates the core compute engines in Cloudera Data Warehouse, Apache Hive and Apache Impala, with Iceberg. We will publish follow-up blogs for the other data services.

Iceberg basics

Iceberg is an open table format designed for large analytic workloads. As described in the Iceberg introduction, it supports schema evolution, hidden partitioning, partition layout evolution and time travel. Every table change creates an Iceberg snapshot, which helps to resolve concurrency issues and allows readers to scan a stable table state every time.

The Apache Iceberg project also develops an implementation of the specification in the form of a Java library. This library is integrated by execution engines such as Impala, Hive and Spark. The new feature this blog post discusses is the Iceberg V2 format (version 2). As the Iceberg table specification explains, the V1 format aimed to support large analytic data tables, while V2 adds row-level deletes and updates.

In a bit more detail, Iceberg V1 added support for creating, updating, deleting and inserting data into tables. The table metadata is stored next to the data files under a metadata directory, which allows multiple engines to use the same table simultaneously.

Iceberg V2

With Iceberg V2 it is possible to do row-level modifications without rewriting the data files. The idea is to store information about the deleted records in so-called delete files. We chose to use position delete files which provide the best performance for queries. These files store the file paths and positions of the deleted records. During queries the query engines scan both the data files and delete files belonging to the same snapshot and merge them together (i.e. eliminating the deleted rows from the output).

Updating row values is achievable by doing a DELETE plus an INSERT operation in a single transaction.

Compacting the tables merges the changes/deletes with the actual data files to improve performance of reads. To compact the tables use CDE Spark.
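As a rough sketch of what such a compaction job might run from a Spark SQL session, Iceberg provides a rewrite_data_files stored procedure for Spark. The catalog name spark_catalog and the table name db.ice_tbl below are assumptions made for illustration, and the option shown should be checked against the Iceberg version shipped with your CDE release.

```sql
-- Hypothetical catalog (spark_catalog) and table (db.ice_tbl); adjust to your environment.
-- Rewrites data files into new files; rows marked in delete files are dropped
-- from the files that get rewritten.
-- 'delete-file-threshold' = '1' asks Iceberg to also pick up any data file
-- that has at least one delete associated with it.
CALL spark_catalog.system.rewrite_data_files(
  table => 'db.ice_tbl',
  options => map('delete-file-threshold', '1')
);
```

Without the option, the procedure selects files mainly by size, so data files that only carry deletes may be left untouched.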

By default, Hive and Impala still create Iceberg V1 tables. To create a V2 table, users need to set table property ‘format-version’ to ‘2’. Existing Iceberg V1 tables can be upgraded to V2 tables by simply setting table property ‘format-version’ to ‘2’. Hive and Impala are compatible with both Iceberg format versions, i.e. users can still use their old V1 tables; V2 tables simply have more features.
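For illustration, upgrading an existing table could look like the statement below. The table name ice_tbl is a placeholder for an existing Iceberg V1 table.

```sql
-- ice_tbl is a hypothetical, already existing Iceberg V1 table.
-- Setting the property upgrades the table metadata to format version 2;
-- existing data files are left as they are.
ALTER TABLE ice_tbl SET TBLPROPERTIES ('format-version'='2');
```

Note that the property key is written in lowercase, as in the Iceberg table specification.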

Use cases

Complying with specific aspects of regulations such as GDPR (General Data Protection Regulation) and CCPA (California Consumer Privacy Act) means that databases need to be able to delete personal data upon customer requests. With delete files we can easily mark the records belonging to specific people. Then regular compaction jobs can physically erase the deleted records.

Another straightforward use case is correcting wrong data or updating outdated values in existing records.

How to Update and Delete 

Currently only Hive can do row level modifications. Impala can read the updated tables and it can also INSERT data into Iceberg V2 tables.

To remove all data belonging to a single customer:

DELETE FROM ice_tbl WHERE user_id = 1234;

To update a column value in a specific record:

UPDATE ice_tbl SET col_v = col_v + 1 WHERE id = 4321;

Use the MERGE INTO statement to update an Iceberg table based on a staging table:

MERGE INTO customer USING (SELECT * FROM new_customer_stage) sub ON sub.id = customer.id 
WHEN MATCHED THEN UPDATE SET name = sub.name, state = sub.new_state 
WHEN NOT MATCHED THEN INSERT VALUES (sub.id, sub.name, sub.state);

When not to use Iceberg

Iceberg tables feature atomic DELETE and UPDATE operations, which makes them look similar to tables in a traditional RDBMS. However, they are not suitable for OLTP workloads, because they are not designed to handle high-frequency transactions. Instead, Iceberg is intended for managing large, infrequently changing datasets.

If one is looking for a solution that can handle very large datasets and frequent updates, we recommend using Apache Kudu.

CDW basics

Cloudera Data Warehouse (CDW) Data Service is a Kubernetes-based application for creating highly performant, independent, self-service data warehouses in the cloud that can be scaled dynamically and upgraded independently. CDW supports streamlined application development with open standards, open file and table formats, and standard APIs. CDW leverages Apache Iceberg, Apache Impala, and Apache Hive to provide broad coverage, enabling the best-optimized set of capabilities for each workload.

CDW separates the compute (Virtual Warehouses) and metadata (DB catalogs) by running them in independent Kubernetes pods. Compute in the form of Hive LLAP or Impala Virtual Warehouses can be provisioned on demand, auto-scaled based on query load, and de-provisioned when idle, which reduces cloud costs while providing consistently quick results with high concurrency, HA, and query isolation. This simplifies data exploration, ETL and deriving analytical insights on any enterprise data across the Data Lake.

CDW also simplifies administration by making multi-tenancy secure and manageable. It allows us to independently upgrade the Virtual Warehouses and Database Catalogs. Through tenant isolation, CDW can process workloads that do not interfere with each other, so everyone meets report timelines while controlling cloud costs.

How to use

In the following sections we are going to provide a few examples of how to create Iceberg V2 tables and how to interact with them. We’ll see how one can insert data, change the schema or the partition layout, how to remove/update rows, do time-travel and snapshot management.

Hive:

Creating an Iceberg V2 Table

A Hive Iceberg V2 table can be created by specifying the format-version as 2 in the table properties.

Example:

CREATE EXTERNAL TABLE TBL_ICEBERG_PART(ID INT, NAME STRING) PARTITIONED BY (DEPT STRING) STORED BY ICEBERG STORED AS PARQUET TBLPROPERTIES ('format-version'='2');
  • CREATE TABLE AS SELECT (CTAS)
CREATE EXTERNAL TABLE CTAS_ICEBERG_SOURCE STORED BY ICEBERG AS SELECT * FROM TBL_ICEBERG_PART;
  • CREATE TABLE LIKE TABLE (CTLT)
CREATE EXTERNAL TABLE ICEBERG_CTLT_TARGET LIKE ICEBERG_CTLT_SOURCE STORED BY ICEBERG;

Ingesting Data

Data can be inserted into an Iceberg V2 table in the same way as into regular Hive tables.

Example:

INSERT INTO TABLE TBL_ICEBERG_PART  VALUES (1,'ONE','MATH'), (2, 'ONE','PHYSICS'), (3,'ONE','CHEMISTRY'), (4,'TWO','MATH'), (5, 'TWO','PHYSICS'), (6,'TWO','CHEMISTRY');
INSERT OVERWRITE TABLE CTLT_ICEBERG_SOURCE SELECT * FROM TBL_ICEBERG_PART;
MERGE INTO TBL_ICEBERG_PART USING TBL_ICEBERG_PART_2 ON TBL_ICEBERG_PART.ID = TBL_ICEBERG_PART_2.ID
WHEN NOT MATCHED THEN INSERT VALUES (TBL_ICEBERG_PART_2.ID, TBL_ICEBERG_PART_2.NAME, TBL_ICEBERG_PART_2.DEPT);

Deletes and Updates

V2 tables allow row-level deletes and updates, similar to Hive ACID tables.

Example:

DELETE FROM TBL_ICEBERG_PART WHERE  DEPT = 'MATH';
UPDATE TBL_ICEBERG_PART SET DEPT='BIOLOGY' WHERE DEPT = 'PHYSICS' OR ID = 6;

Querying Iceberg tables:

Hive supports both vectorized and non-vectorized reads for Iceberg V2 tables. Vectorization can be enabled as usual with the following configs:

set hive.llap.io.memory.mode=cache;
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
SELECT COUNT(*) FROM TBL_ICEBERG_PART;

Hive allows us to query table data for specific snapshot versions.

SELECT * FROM TBL_ICEBERG_PART FOR SYSTEM_VERSION AS OF 7521248990126549311;
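As a small sketch of how a snapshot might be chosen in practice, Hive can also time travel by timestamp, and the snapshot history can be read from the table's history metadata table. The timestamp below is illustrative, and the example assumes the table lives in the default database.

```sql
-- Read the table as it was at a given point in time (illustrative timestamp).
SELECT * FROM TBL_ICEBERG_PART FOR SYSTEM_TIME AS OF '2021-12-09 05:39:18.689000000';

-- List the snapshot history to look up snapshot IDs for SYSTEM_VERSION queries.
-- Assumes the table is in the `default` database.
SELECT * FROM default.TBL_ICEBERG_PART.history;
```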

Snapshot Management

Hive allows several operations regarding snapshot management, like:

ALTER TABLE TBL_ICEBERG_PART EXECUTE EXPIRE_SNAPSHOTS('2021-12-09 05:39:18.689000000');
ALTER TABLE TBL_ICEBERG_PART EXECUTE SET_CURRENT_SNAPSHOT(7521248990126549311);
ALTER TABLE TBL_ICEBERG_PART EXECUTE ROLLBACK(3088747670581784990);

Alter Iceberg tables

ALTER TABLE … ADD COLUMNS (...); (adds columns)

ALTER TABLE … REPLACE COLUMNS (...); (drops a column by omitting it from the new column list)

ALTER TABLE … CHANGE COLUMN … AFTER …; (reorders columns)

ALTER TABLE TBL_ICEBERG_PART SET PARTITION SPEC (NAME); (changes the partition layout)
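To make the schema evolution statements a little more concrete, here is a hedged example against the table created earlier. The column names EMAIL and CONTACT_EMAIL are hypothetical and serve only as an illustration.

```sql
-- EMAIL and CONTACT_EMAIL are hypothetical column names.
-- Add a new column to the end of the schema.
ALTER TABLE TBL_ICEBERG_PART ADD COLUMNS (EMAIL STRING);

-- Rename the column and move it directly after NAME.
ALTER TABLE TBL_ICEBERG_PART CHANGE COLUMN EMAIL CONTACT_EMAIL STRING AFTER NAME;
```

These are metadata-only changes in Iceberg, so existing data files are not rewritten.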

Materialized Views

  • Creating Materialized Views:
CREATE MATERIALIZED VIEW MAT_ICEBERG AS SELECT ID, NAME FROM TBL_ICEBERG_PART ;
ALTER MATERIALIZED VIEW MAT_ICEBERG REBUILD;
  • Querying Materialized Views:
SELECT * FROM MAT_ICEBERG;

Impala

Apache Impala is an open source, distributed, massively parallel SQL query engine with its backend executors written in C++ and its frontend (analyzer, planner) written in Java. Impala uses the Iceberg Java library to get information about Iceberg tables during query analysis and planning. Query execution itself is handled by the high-performance C++ executors, which keeps queries on Iceberg tables fast.

Impala supports the following statements on Iceberg tables.

Creating Iceberg tables

CREATE TABLE ice_t(id INT, name STRING, dept STRING)
PARTITIONED BY SPEC (bucket(19, id), dept)
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');
  • CREATE TABLE AS SELECT (CTAS):
CREATE TABLE ice_ctas
PARTITIONED BY SPEC (truncate(1000, id))
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2')
AS SELECT id, int_col, string_col FROM source_table;
  • CREATE TABLE LIKE:
    (creates an empty table based on another table)
CREATE TABLE new_ice_tbl LIKE orig_ice_tbl;

Querying Iceberg tables

Impala supports reading V2 tables with position deletes.

Impala supports all kinds of queries on Iceberg tables that it supports for any other tables. E.g. joins, aggregations, analytical queries etc. are all supported.

SELECT * FROM ice_t;

SELECT count(*) FROM ice_t i LEFT OUTER JOIN other_t b
ON (i.id = b.fid)
WHERE i.col = 42;

It’s possible to query earlier snapshots of a table (until they are expired).

SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';

SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;

SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;

We can use the DESCRIBE HISTORY statement to list the earlier snapshots of a table:

DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';

DESCRIBE HISTORY ice_t FROM now() - interval 5 days;

DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';

Insert data into Iceberg tables

INSERT statements work for both V1 and V2 tables.

INSERT INTO ice_t VALUES (1, 2);

INSERT INTO ice_t SELECT col_a, col_b FROM other_t;
INSERT OVERWRITE ice_t VALUES (1, 2);

INSERT OVERWRITE ice_t SELECT col_a, col_b FROM other_t;

Load data into Iceberg tables

LOAD DATA INPATH '/tmp/some_db/parquet_files/'
INTO TABLE iceberg_tbl;

Alter Iceberg tables

ALTER TABLE ... RENAME TO ... (renames the table)

ALTER TABLE ... CHANGE COLUMN ... (changes the name and type of a column)

ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table)

ALTER TABLE ... DROP COLUMN ... (drops a column)

ALTER TABLE ice_p
SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i); (changes the partition layout)
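As a minimal sketch, the statements above could be filled in as follows for the ice_t table created earlier. The column names email and contact_email and the new table name ice_t_renamed are hypothetical.

```sql
-- email, contact_email and ice_t_renamed are hypothetical names.
-- Add a column to the end of the table.
ALTER TABLE ice_t ADD COLUMNS (email STRING);

-- Rename the column (the type is kept as STRING here).
ALTER TABLE ice_t CHANGE COLUMN email contact_email STRING;

-- Drop the column again.
ALTER TABLE ice_t DROP COLUMN contact_email;

-- Rename the table.
ALTER TABLE ice_t RENAME TO ice_t_renamed;
```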

Snapshot management

ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00');

ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);

DELETE and UPDATE statements for Impala are coming in later releases. As mentioned above, Impala uses its own C++ implementation to execute queries on Iceberg tables, which gives it a significant performance advantage.

Future Work

Our support for Iceberg V2 is advanced and reliable, and we continue our push for innovation. We are rapidly developing improvements, so you can expect to find new features related to Iceberg in each CDW release. Please let us know your feedback in the comments section below.

Summary

Iceberg is an emerging table format under rapid development, with new features arriving every month. Cloudera Data Warehouse added support for the most recent format version of Iceberg in its latest release. Users can run Hive and Impala virtual warehouses and interact with their Iceberg tables via SQL statements. These engines are also evolving quickly, and we deliver new features and optimizations in every release. Stay tuned for more blog posts from us about upcoming features and technical deep dives.

To learn more:

  • Replay our webinar Unifying Your Data: AI and Analytics on One Lakehouse, where we discuss the benefits of Iceberg and open data lakehouse.
  • Read why the future of data lakehouses is open.
  • Replay our meetup Apache Iceberg: Looking Below the Waterline.

Try Cloudera Data Warehouse (CDW) by signing up for a 60-day trial, or test drive CDP. If you are interested in chatting about Apache Iceberg in CDP, let your account team know or contact us directly. As always, please provide your feedback in the comments section below.