How to implement Slowly Changing Dimensions (SCD) in Talend (part 2/2)


In the previous blog we discussed the different types of Slowly Changing Dimensions and went into more detail on SCD type 2. In this blog we will show you how to implement SCD in Talend. First, we will look at the built-in SCD component and why you probably shouldn’t use it. Next, we will provide a tutorial on a custom solution for SCD type 2 in Talend.

Built-in SCD component

Talend has a built-in component to keep track of SCD: tDBSCD. This component can implement type 1, type 2 and type 3 SCD. However, it has one major downside: its performance. Figure 1 shows a comparison of the execution time of a Talend job where all source data was tracked using SCD type 2; once using the built-in tDBSCD component (red) and once using the custom workflow proposed in this blog (blue). Each execution time on the graph is an average of three runs with an average of 30% inserts and 70% updates.

[Figure 1: execution time of the built-in tDBSCD component (red) versus the custom workflow (blue)]

These results show a significant increase in execution time with the tDBSCD component from 1000 rows onwards. By comparison, execution time increases only very slightly with the custom workflow, which indicates that it scales much better than the tDBSCD component. Keep in mind, though, that it is not infinitely scalable. In our experience, the custom workflow can handle around 1 million records without adjustments (depending on your job server’s memory settings, of course). Beyond that, you will likely run into out-of-memory issues. To a certain degree, these can be handled with tMap tricks like storing temporary data or reloading at each row. For truly big datasets (10 million records and upwards), however, these will not help either. In those cases it can be useful to limit the input to records created within the last week or so.

Custom approach to SCD type 2

To do SCD type 2 you need to detect changes between the source and target system. Detecting ‘inserts’ is easy: simply verify whether the record from the source system exists in the target system. Similarly, for ‘deletes’ you need to verify whether there are records in the target system that don’t exist in the source system. ‘Updates’ are a bit trickier: you need to detect whether the record was changed by comparing the source and target record. This brings us to step 1 of this tutorial.

Step 1: generating a hash string

To easily compare the source record to the target record you can generate an md5 hash string. This string is then stored as an extra column in the target system.

First, create a new Java routine with the following code:

package routines;
import org.apache.commons.codec.digest.DigestUtils;
public class DigestUtilities {
    
    public static String md5(String s){
        return DigestUtils.md5Hex(s);
    }
}

It uses the md5Hex method from the commons-codec library. This method creates an md5 hash string based on an input string. To use this routine, you will have to add this library to Talend as well.
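
If adding the commons-codec library is not an option, the same kind of hash can be produced with the JDK alone. The sketch below is an alternative we did not use in the job itself; the class name Md5Sketch is hypothetical. It assumes UTF-8 encoding of the input, which should give the same hex string as md5Hex.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical stand-in for the DigestUtilities routine, using only the JDK.
class Md5Sketch {

    // Returns the MD5 digest of s as a 32-character lowercase hex string.
    static String md5(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(s.getBytes(StandardCharsets.UTF_8));
            // %032x left-pads with zeros so leading zero bytes are not lost.
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(md5("abc"));
    }
}
```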

Next, generate the hash string for each record. To do so, read in the source data and call the routine, mapping the result to a new variable. If you only want to keep track of SCD on certain columns, the easiest way is to concatenate them into a String and feed this String into md5Hex. A separator is added between the columns so that different column values cannot run together into the same input String and therefore the same hash_id: without it, “ab” followed by “c” and “a” followed by “bc” would both become “abc”.

routines.DigestUtilities.md5(
row1.cat_id + "#" +
row1.name + "#" +
row1.age + "#" +
row1.fur
);
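
To see why the separator matters, here is a minimal sketch that only looks at the concatenated String (equal input Strings always give equal hashes). The class and method names are hypothetical.

```java
// Sketch: how a separator keeps neighbouring column values apart.
class SeparatorSketch {

    // Concatenation without a separator: column boundaries are lost.
    static String joinWithout(String name, String fur) {
        return name + fur;
    }

    // Concatenation with the "#" separator used in the expression above.
    static String joinWith(String name, String fur) {
        return name + "#" + fur;
    }

    public static void main(String[] args) {
        // Both print "abc": two different records, one hash input.
        System.out.println(joinWithout("ab", "c"));
        System.out.println(joinWithout("a", "bc"));
        // These print "ab#c" and "a#bc": the records stay distinct.
        System.out.println(joinWith("ab", "c"));
        System.out.println(joinWith("a", "bc"));
    }
}
```

Note that the separator only helps as long as it does not occur in the data itself: “a#” followed by “b” and “a” followed by “#b” still produce the same String. Pick a separator that is unlikely to appear in your columns.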

When you want to apply SCD to all columns you can also use the following code:

routines.DigestUtilities.md5(
row1.toString().substring(row1.toString().indexOf("[")+1, row1.toString().lastIndexOf("]"))
);

This ensures the hash String is based on all incoming columns, even if you were to add some later.
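
The substring call keeps only the part between the first “[” and the last “]”. The sketch below shows the effect on an example String. The layout of that String (an object prefix followed by the column values in brackets) is our assumption for illustration, so check what toString() prints in your own job. The class name is hypothetical.

```java
// Sketch only: the bracketed layout below is an assumed example of what
// row1.toString() prints, not a documented format.
class RowStringSketch {

    // Keeps only the text between the first "[" and the last "]".
    static String columnPart(String rowAsString) {
        int start = rowAsString.indexOf("[") + 1;
        int end = rowAsString.lastIndexOf("]");
        return rowAsString.substring(start, end);
    }

    public static void main(String[] args) {
        String printed = "row1Struct@1b6d3586[cat_id=7,name=Tom,age=3,fur=black]";
        // Only the column values remain as input for the hash.
        System.out.println(columnPart(printed));
    }
}
```

Anything in front of the bracket that differs per object or per run would otherwise end up in the hash and make every record look changed.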

Step 2: Handling inserts and updates

To detect an ‘insert’ we need to check if the source record occurs in the target system. To detect an ‘update’ we need to check if the hash string we just generated for the source records is the same as the hash string stored for this record in the target system. We do this through a join and some code.

First, the join: we’ll do a left outer join on the business key from source to target, so source records without a match in the target are kept. We’ll query the following things:

  • id: primary key of the target table
  • cat_id: business key from the source table
  • md5_hash_id: hash string of the record stored in the target table
  • scd_version: version of the record

Next, the code: we’ll use the following logic to differentiate between inserts and updates

  • If the primary key (id) for the record in the target table is null, the record is an insert. Denoted by “I”.
  • If the primary key (id) for the record in the target table is not null AND the md5_hash_id of the source record is not equal to the md5_hash_id of the target record, the record is an update. Denoted by “U”.
  • All other cases are denoted by “X”.

In code:

target.id == null ? "I" :
target.id != null && target.md5_hash_id.equals(source.md5_hash_id) ? "X" :
target.id != null && !target.md5_hash_id.equals(source.md5_hash_id) ? "U" : "X"

The result of this conditional is stored as an expression variable and used to filter the records for each flow.
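
The same decision can be written as a small, testable Java method. This is a sketch and not the expression from the job: the class name and the parameter names are hypothetical, and we use Objects.equals so that a missing hash in the target does not cause a NullPointerException.

```java
import java.util.Objects;

// Sketch of the insert/update/unchanged decision described above.
class ScdActionSketch {

    // targetId is null when the outer join found no matching target record.
    static String action(Integer targetId, String targetHash, String sourceHash) {
        if (targetId == null) {
            return "I"; // not in the target yet: insert
        }
        // Null-safe comparison of the stored hash and the fresh hash.
        return Objects.equals(targetHash, sourceHash) ? "X" : "U";
    }

    public static void main(String[] args) {
        System.out.println(action(null, null, "h1")); // new record
        System.out.println(action(1, "h1", "h1"));    // unchanged record
        System.out.println(action(1, "h1", "h2"));    // changed record
    }
}
```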

As mentioned above, we want to take the following actions when encountering ‘inserts’ and ‘updates’:

  • ‘Inserts’: new record is inserted as an active record
  • ‘Updates’: old record is set to inactive, updated record is inserted as an active record

To implement this, we need three output flows: one for inserts and two for updates (upd_ins and upd_upd).

Inserts

A new record is inserted. The scd fields are filled in:

  • scd_start = current datetime
  • scd_end = null
  • scd_version = 1
  • scd_active = true

The action on the output data is set to insert. Using the advanced settings field options we can specify that all columns should be inserted, except ‘id’ (which is handled by the database as type serial).

Updates

The updated record is inserted (upd_ins):

  • scd_start = current datetime
  • scd_end = null
  • scd_version = version + 1
  • scd_active = true

The action on the output data is set to insert. Using the advanced settings field options we can specify that all columns except ‘id’ should be inserted.

The old version of this record is updated (upd_upd):

  • scd_end = current datetime
  • scd_active = false

The action on the output data is set to update. Using the advanced settings field options we can specify that the update key is ‘id’ and only the columns ‘scd_end’ and ‘scd_active’ should be updated.
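
To make the three flows concrete, the following sketch applies them to an in-memory row instead of a database table. The class ScdRowSketch and its method names are hypothetical; in the job itself the tDBOutput components do this work.

```java
import java.time.LocalDateTime;

// Hypothetical in-memory model of one row in the target table.
class ScdRowSketch {
    final String catId;
    final String md5HashId;
    final LocalDateTime scdStart;
    LocalDateTime scdEnd;   // stays null while this version is active
    final int scdVersion;
    boolean scdActive;

    ScdRowSketch(String catId, String md5HashId, LocalDateTime scdStart, int scdVersion) {
        this.catId = catId;
        this.md5HashId = md5HashId;
        this.scdStart = scdStart;
        this.scdEnd = null;
        this.scdVersion = scdVersion;
        this.scdActive = true;
    }

    // Inserts flow: first version of a new record.
    static ScdRowSketch insert(String catId, String hash, LocalDateTime now) {
        return new ScdRowSketch(catId, hash, now, 1);
    }

    // upd_ins flow: new active version, version number incremented.
    static ScdRowSketch updIns(ScdRowSketch current, String newHash, LocalDateTime now) {
        return new ScdRowSketch(current.catId, newHash, now, current.scdVersion + 1);
    }

    // upd_upd flow: close the old version, touching only scd_end and scd_active.
    static void updUpd(ScdRowSketch current, LocalDateTime now) {
        current.scdEnd = now;
        current.scdActive = false;
    }
}
```

For an update you would call updIns and updUpd with the same timestamp, so the end of the old version lines up with the start of the new one.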

Step 3: Handling deletes

To detect ‘deletes’ we need to check if there are records in the target system that are no longer in the source system. To do so, we need an inner join with the target as the main flow and the source as the lookup. Then we’ll catch the rejected records of this join. In other words, we’ll detect the records that are in the target, but not in the source! Remember that we’re not going to actually delete these records from the target system; they will just be set to inactive.

For the tDBOutput component, we’ll use the same method as with the upd_upd flow. The action on the output data is set to update (not delete!). In the advanced settings field options we can specify that the update key is id and only the columns scd_end and scd_active should be updated.
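
Conceptually, catching the rejects of this join is a set difference on the business key. A minimal sketch, with hypothetical names and plain collections instead of Talend flows:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Sketch: business keys that are in the target but no longer in the source.
class DeleteDetectionSketch {

    static Set<String> deletedKeys(Collection<String> targetKeys, Collection<String> sourceKeys) {
        Set<String> deleted = new TreeSet<>(targetKeys); // sorted for stable output
        deleted.removeAll(new HashSet<>(sourceKeys));    // drop keys that still exist
        return deleted;
    }

    public static void main(String[] args) {
        Set<String> deleted = deletedKeys(
                Arrays.asList("c1", "c2", "c3"),
                Arrays.asList("c1", "c3", "c4"));
        System.out.println(deleted); // the keys to set to inactive
    }
}
```

This assumes the target keys come from active records only. Otherwise records that were closed in an earlier run would be closed again, with a new scd_end.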

And there you have it, a custom workflow in Talend to do SCD type 2!

Conclusion

In this blog we propose a custom workflow to do SCD type 2 in Talend. The advantage of this approach is that it is more flexible and more scalable than the built-in Talend component. A disadvantage is that it is more complex to implement.

7 Replies to “How to implement Slowly Changing Dimensions (SCD) in Talend (part 2/2)”

  1. Pranay Jain

    I have an issue using import org.apache.commons.codec.digest.DigestUtils in Talend.

    How do I install required packages?

    1. letenpa (Post author)

      Hello,

      To use the DigestUtils library you will first need to add it to your routine in Talend.
      To do this right-click the routine, go to Edit Routine Libraries and add the library.
      The library you need to add is called “commons-codec-1.10.jar”.

      For more information, please refer to the Talend user guide:
      How to edit user routine libraries

  2. Rob

    Hi,

    Isn’t your logic going to detect and update inactive records? They will have a matching business key and non-matching hash. Don’t you want to check for Current Indicator = “Y”??

    1. letenpa (Post author)

      Hi Rob,

      You are absolutely right! This is not explicitly mentioned in the blog, but in the database query to the target you would typically filter for records where “scd_active = true”. This way you avoid comparing the hash to historical records.

      Thanks for commenting, hope this clarifies your issue.

  3. Stuart Baker

    Hi
    Been reading this with great interest. However, I’m wondering if you covered all the bases?

    If you are using Type 2 for only a few of the columns and thus generating the hash for just, say, “cat, name, age” but not “fur”, are you not going to miss updates to fur here? Updates are only detected here based on the hash, anything else being an “X” outcome… I think a new check in the action is required here to assert that if the MD5 is not the same then potentially you have an SCD1 scenario.

    Obviously this assumes you want to mix Type1 and Type2 within the table – but the tDBSCD Component allows exactly that.

  4. Stuart Baker

    Forgot to add a potential solution (variation on Var.action)

    target.id == null ? "NEW" : // left join no match - new record
    target.id != null && target.md5_hash_id.equals(source.md5_hash_id) ? "SCD1" : // key exists, but SCD2 hash is same - could be SCD1
    target.id != null && !target.md5_hash_id.equals(source.md5_hash_id) ? "U" // key exists, but SCD2 hash fails, potential update here
    : "X" // fallout point

    The SCD1 would need to be coded as a ‘normal’ update, but we can’t use “U” as that’s for your upd_ins/upd_upd rows.

    1. letenpa (Post author)

      Hi Stuart,

      Thanks for the addition! You’ve highlighted an interesting case. We focussed mainly on SCD type 2 in this blog, but as you said: there are some hybrid types you can consider as well. Your solution would work, but as I understand it the “fur” column will then update every run regardless of whether or not it has been changed. If you don’t have a lot of data, this is totally fine. If you do, this might not be what you want.

      An alternative solution is to keep the “fur” column as part of a hash, so we can detect when it has changed, but instead of creating a new SCD record we overwrite the current one. One way I can think of to achieve that is to have two hash Strings: one with the columns you want to track using SCD type 2, and one with the columns you want to track using SCD type 1. After generating those hash Strings you split the flow into SCD type 2 (as described in the blog) and SCD type 1, where you do a regular update. The logic for Var.action stays the same, except that you refer to the respective hash String for each case. You would have to account for multiple updates of a different type (SCD type 1/SCD type 2) within the same record to make sure you’re updating the correct version though.
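
A rough sketch of that two-hash idea is given below. All names are hypothetical, and letting a type 2 change take precedence over a type 1 change in the same record is just one possible choice, not something we have tested in a job.

```java
import java.util.Objects;

// Sketch: one hash over the SCD type 2 columns, one over the SCD type 1 columns.
class HybridActionSketch {

    static String action(Integer targetId,
                         String targetHash2, String sourceHash2,
                         String targetHash1, String sourceHash1) {
        if (targetId == null) {
            return "I";    // not in the target yet: insert
        }
        if (!Objects.equals(targetHash2, sourceHash2)) {
            return "U";    // type 2 column changed: new version (carries type 1 changes too)
        }
        if (!Objects.equals(targetHash1, sourceHash1)) {
            return "SCD1"; // only type 1 columns changed: overwrite the current version
        }
        return "X";        // nothing changed
    }

    public static void main(String[] args) {
        System.out.println(action(1, "a", "a", "p", "q")); // only fur changed
        System.out.println(action(1, "a", "b", "p", "q")); // both kinds changed
    }
}
```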
