This article describes how to handle Slowly Changing Dimensions (SCD) in a data warehouse which uses Hive as a database.
Before reading on, you might want to refresh your knowledge of Slowly Changing Dimensions (SCD).
Let's imagine we have a simple table in Hive:
    CREATE TABLE dim_user (
      login VARCHAR(255),        -- natural key
      premium_user BOOLEAN,      -- SCD Type 2
      address VARCHAR(255),      -- SCD Type 2
      phone VARCHAR(255),        -- SCD Type 2, may be NULL
      name VARCHAR(255),         -- SCD Type 1
      surname VARCHAR(255),      -- SCD Type 1
      year_of_birth INT          -- SCD Type 1, may be NULL
    )
    STORED AS PARQUET;
Handling SCD Type 1 and SCD Type 2 may be trivial, or at least well known, in other databases, but in Hive you may face several challenges. The most important ones are:
- There is no auto-increment functionality out of the box.
- Most storage formats (for example, Parquet) don't support UPDATE.
- Even if you use a storage format that supports UPDATE (ORC), there is still no UPDATE ... JOIN statement.
- UPDATE in ORC is also too slow for this purpose (updating each individual record requires its own MapReduce job).
- There are only row-level transactions (no BEGIN, COMMIT or ROLLBACK statements).
Let's see how we can work around all of them.
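Starting with the missing auto-increment: one common workaround is to number the new rows and shift them by the highest key already in use. The Python sketch below only models that arithmetic and is not Hive code; `assign_surrogate_keys` and the sample rows are hypothetical names chosen for illustration.

```python
def assign_surrogate_keys(new_rows, existing_keys):
    """Give each new row a key above the highest existing one.

    Rows are numbered 1..n and shifted by the current maximum key
    (0 when there are no existing keys yet).
    """
    max_key = max(existing_keys, default=0)
    return [
        {**row, "dim_user_id": max_key + position}
        for position, row in enumerate(new_rows, start=1)
    ]


keyed = assign_surrogate_keys(
    [{"login": "alice"}, {"login": "bob"}],  # hypothetical new rows
    existing_keys=[1, 2, 7],                 # keys already in the table
)
print(keyed)
```

Gaps in the existing keys are not reused here; new keys always continue after the maximum.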
Suppose that "dim_user_production" is our existing table with current data. Its final schema (with surrogate keys and auxiliary fields) looks as follows:
    CREATE TABLE dim_user_production (
      dim_user_id INT,           -- surrogate key
      login VARCHAR(255),        -- natural key
      premium_user BOOLEAN,      -- SCD Type 2
      address VARCHAR(255),      -- SCD Type 2
      phone VARCHAR(255),        -- SCD Type 2, may be NULL
      name VARCHAR(255),         -- SCD Type 1
      surname VARCHAR(255),      -- SCD Type 1
      year_of_birth INT,         -- SCD Type 1, may be NULL
      scd_version INT,           -- historical version of the record (1 is the oldest)
      scd_start_date TIMESTAMP,  -- start date and time
      scd_end_date TIMESTAMP,    -- end date and time (9999-12-31 23:59:59 by default)
      scd_active BOOLEAN         -- whether it's the latest version or not
    )
    STORED AS PARQUET;
"dim_user_staging" is the table with new data to be processed. Its schema doesn't have surrogate keys or auxiliary fields and is identical to "dim_user" schema above.
- Create a new table by copying the schema of the production table:
    DROP TABLE IF EXISTS dim_user_new;
    CREATE TABLE dim_user_new STORED AS PARQUET AS
    SELECT * FROM dim_user_production LIMIT 0;
- Copy all the records from the production table that don't exist in the staging table:
    INSERT INTO TABLE dim_user_new
    SELECT p.*
    FROM dim_user_production p
    LEFT JOIN dim_user_staging s ON p.login = s.login
    WHERE s.login IS NULL;
- Copy all inactive (historical) records from the production table (apply SCD Type 1 changes if needed):
    INSERT INTO TABLE dim_user_new
    SELECT
      p.dim_user_id, p.login,
      p.premium_user, p.address, p.phone,     -- SCD Type 2: keep historical values
      s.name, s.surname, s.year_of_birth,     -- SCD Type 1: overwrite with new values
      p.scd_version, p.scd_start_date, p.scd_end_date, p.scd_active
    FROM dim_user_production p
    JOIN dim_user_staging s
      ON p.login = s.login AND p.scd_active = false;
- Copy all the active records from the production table which don't have SCD Type 2 changes (apply SCD Type 1 changes if needed):
    INSERT INTO TABLE dim_user_new
    SELECT
      p.dim_user_id, p.login,
      p.premium_user, p.address, p.phone,
      s.name, s.surname, s.year_of_birth,     -- SCD Type 1: overwrite with new values
      p.scd_version, p.scd_start_date, p.scd_end_date, p.scd_active
    FROM dim_user_production p
    JOIN dim_user_staging s
      ON p.login = s.login AND p.scd_active = true
    WHERE p.premium_user = s.premium_user
      AND p.address = s.address
      AND COALESCE(p.phone, '') = COALESCE(s.phone, '');
- Insert new inactive (historical) versions of records from the production table which have SCD Type 2 changes (apply SCD Type 1 changes if needed):
    INSERT INTO TABLE dim_user_new
    SELECT
      p.dim_user_id, p.login,
      p.premium_user, p.address, p.phone,
      s.name, s.surname, s.year_of_birth,
      p.scd_version,
      p.scd_start_date,
      '2016-10-01 00:00:00',  -- current timestamp for scd_end_date
      false                   -- false for scd_active
    FROM dim_user_production p
    JOIN dim_user_staging s
      ON p.login = s.login AND p.scd_active = true
    WHERE p.premium_user != s.premium_user
       OR p.address != s.address
       OR COALESCE(p.phone, '') != COALESCE(s.phone, '');
- Insert new active versions of records from the production table which have SCD Type 2 changes (apply SCD Type 1 changes if needed):
    INSERT INTO TABLE dim_user_new
    SELECT
      n.id + COALESCE(m.max_id, 0),  -- new id for dim_user_id
      n.login,
      n.premium_user, n.address, n.phone,
      n.name, n.surname, n.year_of_birth,
      n.scd_version,
      '2016-10-01 00:00:00',         -- current timestamp for scd_start_date
      '9999-12-31 23:59:59',         -- default timestamp for scd_end_date
      true                           -- true for scd_active
    FROM (
      SELECT
        row_number() OVER () AS id,
        p.login,
        s.premium_user, s.address, s.phone,
        s.name, s.surname, s.year_of_birth,
        p.scd_version + 1 AS scd_version
      FROM dim_user_production p
      JOIN dim_user_staging s
        ON p.login = s.login AND p.scd_active = true
      WHERE p.premium_user != s.premium_user
         OR p.address != s.address
         OR COALESCE(p.phone, '') != COALESCE(s.phone, '')
    ) n,
    (
      SELECT MAX(dim_user_id) AS max_id FROM dim_user_new
    ) m;
- Copy all the records from the staging table which don't exist in the production table:
    INSERT INTO TABLE dim_user_new
    SELECT
      n.id + COALESCE(m.max_id, 0),  -- new id for dim_user_id
      n.login,
      n.premium_user, n.address, n.phone,
      n.name, n.surname, n.year_of_birth,
      1,                             -- 1 for scd_version
      '2016-10-01 00:00:00',         -- current timestamp for scd_start_date
      '9999-12-31 23:59:59',         -- default timestamp for scd_end_date
      true                           -- true for scd_active
    FROM (
      SELECT
        row_number() OVER () AS id,
        s.login,
        s.premium_user, s.address, s.phone,
        s.name, s.surname, s.year_of_birth
      FROM dim_user_staging s
      LEFT JOIN dim_user_production p ON s.login = p.login
      WHERE p.login IS NULL
    ) n,
    (
      SELECT MAX(dim_user_id) AS max_id FROM dim_user_new
    ) m;
- Replace the content of the production table in a transactional manner:
    INSERT OVERWRITE TABLE dim_user_production
    SELECT * FROM dim_user_new;
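To sanity-check the logic of the steps above on a tiny dataset, the whole merge can be modelled in plain Python. This is only an illustrative sketch, not Hive code: `merge_scd`, the sample rows and the timestamp strings are hypothetical, and it assumes at most one staging row per login.

```python
END_OF_TIME = "9999-12-31 23:59:59"
TYPE2 = ("premium_user", "address", "phone")   # changes create a new version
TYPE1 = ("name", "surname", "year_of_birth")   # changes overwrite in place


def merge_scd(production, staging, now):
    """Return the new dimension content as a list of dict rows."""
    staged = {row["login"]: row for row in staging}
    known_logins = {row["login"] for row in production}
    result, reopened = [], []

    for old in production:
        new = staged.get(old["login"])
        if new is None:
            result.append(dict(old))            # login absent from staging
            continue
        # Type 1 columns are overwritten on every version of the login.
        row = {**old, **{col: new[col] for col in TYPE1}}
        # Python's != already treats None vs None as "no change".
        if old["scd_active"] and any(old[c] != new[c] for c in TYPE2):
            row.update(scd_end_date=now, scd_active=False)  # close version
            reopened.append((new, old["scd_version"] + 1))
        result.append(row)

    # Logins that appear only in staging start at version 1.
    fresh = [(new, 1) for new in staging if new["login"] not in known_logins]

    # New surrogate keys continue after the highest key copied so far.
    next_id = max((row["dim_user_id"] for row in result), default=0) + 1
    for offset, (new, version) in enumerate(reopened + fresh):
        result.append({
            "dim_user_id": next_id + offset,
            **new,
            "scd_version": version,
            "scd_start_date": now,
            "scd_end_date": END_OF_TIME,
            "scd_active": True,
        })
    return result


production = [
    {"dim_user_id": 1, "login": "alice", "premium_user": False,
     "address": "1 Old St", "phone": None, "name": "Alice",
     "surname": "Smith", "year_of_birth": 1990, "scd_version": 1,
     "scd_start_date": "2016-01-01 00:00:00",
     "scd_end_date": END_OF_TIME, "scd_active": True},
]
staging = [
    {"login": "alice", "premium_user": False, "address": "2 New St",
     "phone": None, "name": "Alice", "surname": "Jones",
     "year_of_birth": 1990},
    {"login": "carol", "premium_user": True, "address": "3 Elm St",
     "phone": "555", "name": "Carol", "surname": "Lee",
     "year_of_birth": None},
]
merged = merge_scd(production, staging, now="2016-10-01 00:00:00")
for row in merged:
    print(row["dim_user_id"], row["login"], row["scd_version"], row["scd_active"])
```

In this sample, alice's address change closes version 1 and opens version 2, her new surname is applied to both versions, and carol is added as a brand-new record.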
Please take into account the way we handled fields of SCD Type 2 that may have NULL values (we don't need to compare fields of SCD Type 1):
    COALESCE(p.phone, '') = COALESCE(s.phone, '')
    COALESCE(p.phone, '') != COALESCE(s.phone, '')
Alternatively, you can use the <=> operator (Hive 0.9.0 and higher):
    p.phone <=> s.phone
    NOT (p.phone <=> s.phone)
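The two approaches are not fully equivalent: the COALESCE variant treats NULL and the empty string as the same value, while a null-safe comparison does not. The sketch below shows the difference using Python's built-in sqlite3 module purely as a runnable stand-in for Hive (an assumption made for illustration); SQLite's `IS` operator plays the role of Hive's `<=>` here, and `compare` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")


def compare(old_phone, new_phone):
    """Return (plain_eq, coalesce_eq, null_safe_eq) for two phone values."""
    return conn.execute(
        "SELECT ? = ?, COALESCE(?, '') = COALESCE(?, ''), ? IS ?",
        (old_phone, new_phone) * 3,
    ).fetchone()


print(compare(None, None))    # (None, 1, 1): plain '=' yields NULL
print(compare(None, ""))      # (None, 1, 0): COALESCE hides NULL vs ''
print(compare("555", "555"))  # (1, 1, 1)
```

If an empty phone and a missing phone should count as different values, the null-safe operator is the safer choice; if they should count as the same, COALESCE does that implicitly.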
That's it. No magic here 😊