Data Consistency and Availability

Introduction

Before release 5.1, some factors, including configurations, network latency, and background synchronization, caused the Analytics Service to read inconsistent data (dirty reads) from snapshot (DDM) files and parquet (source) files.

Starting release 5.1, Incorta introduced enhancements that help secure and maintain a higher level of data consistency and availability across the system at any point in time. The new enhancements also reduce input/output (I/O) operations and keep query time minimal.

Important

This document takes into consideration the updates in recent releases:

  • The 5.1.2 release introduced materializing Incorta SQL tables and Incorta Analyzer tables to parquet files rather than snapshot Direct Data Mapping (DDM) files.
  • Starting the 5.2 release, the Loader Service no longer creates DDM files for encrypted columns. These columns are available only as parquet files.
  • The 5.2 release introduces a new compaction (deduplication) mechanism to reduce the I/O operations during a load job and save disk space. This new mechanism results in a change in the directory structure: the compacted area no longer exists, and compacted segments are saved to the _rewritten directory in the source area instead.

Concurrency control

Concurrency control ensures that concurrent operations generate correct results. Without concurrency control, systems can neither provide correct results nor maintain data consistency.

To ensure data integrity and validity when running concurrent transactions or processes, a system must prevent the following read phenomena:

  • Dirty reads: occur when a transaction reads data written by another concurrent, uncommitted transaction.
  • Non-repeatable reads: occur when a transaction re-reads data it has previously read and finds that the data has been modified by another transaction that committed since the initial read.
  • Phantom reads: occur when another transaction adds or removes rows in the set of records that a transaction is reading.

Transaction isolation

Transaction isolation is the main goal of concurrency control. As per the American National Standards Institute (ANSI) and the International Organization for Standardization (ISO) SQL standard, there are four levels of transaction isolation; each level defines how or when the changes made by one operation become visible to others. The following are the four isolation levels (a minimal sketch after the list illustrates the difference between the first two levels):

  • Read Uncommitted: the lowest isolation level, where a transaction may read uncommitted data or changes that other transactions make (dirty reads).
  • Read Committed: an isolation level where write locks are held until the end of a transaction while read locks are released once data is read. This guarantees that the data a transaction reads is committed at the moment it is read; it simply prevents the reader from seeing any intermediate, uncommitted (dirty) data.
  • Repeatable Read: an isolation level where both read and write locks are held until the end of a transaction.
  • Serializable: the highest isolation level, where both read and write locks are held until the end of a transaction. An execution is serializable when executing concurrent transactions results in an outcome that is equal to the outcome of executing the transactions serially, without overlapping in time.
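
To make the difference concrete, the following minimal Python sketch (illustrative only, not Incorta code) contrasts a dirty read with a committed read against a toy in-memory store; the class and key names are hypothetical.

```python
# Minimal, self-contained sketch: it contrasts a dirty read (Read Uncommitted)
# with a committed read (Read Committed) against a toy key-value store.

class ToyStore:
    def __init__(self):
        self.committed = {}    # data visible to Read Committed readers
        self.uncommitted = {}  # in-flight changes of an open transaction

    def begin_write(self, key, value):
        self.uncommitted[key] = value        # the transaction wrote, but did not commit yet

    def commit(self):
        self.committed.update(self.uncommitted)
        self.uncommitted.clear()

    def read_uncommitted(self, key):
        # Dirty read: sees the in-flight changes of the open transaction.
        return self.uncommitted.get(key, self.committed.get(key))

    def read_committed(self, key):
        # Committed read: only sees data that has been committed.
        return self.committed.get(key)


store = ToyStore()
store.begin_write("orders.total", 100)
print(store.read_uncommitted("orders.total"))  # 100 -> dirty read
print(store.read_committed("orders.total"))    # None -> nothing committed yet
store.commit()
print(store.read_committed("orders.total"))    # 100 -> now visible to everyone
```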

Previous implementations

Before release 4.0, both the Loader Service and the Analytics Service existed on the same host or node. Thus, both services had access to the same data files. In these older releases, when a job, for example, a load job, ran for a specific physical schema, all queries that the Analytics Service sent to this physical schema were blocked until the load job or the physical schema update job was completed. The Engine acquired write locks on the physical schemas until the job was finished to ensure access to recent, consistent data; however, this compromised data availability, resulting in blocked queries or dashboards.

Starting release 4.0, Incorta supported a distributed architecture where services could exist on different machines or nodes. This distributed architecture guaranteed data availability as no queries were blocked during a load or update job for the same entity. However, in some rare situations, the Analytics Service could have read old data from a column and more recent data from another (dirty reads) while the Loader Service was writing snapshot files. This was not a common issue as data was inconsistent only in a short time window; that is, the time spent to write snapshot files.

With the introduction of release 4.5, the Analytics Service started to read data from both snapshot files and parquet files. The load job was writing and committing parquet files in the extraction phase while it was writing and committing snapshot files in the load phase at the end of the load job. Thus, the Analytics Service had access to recent data in parquet files and old data in snapshot files. The time window of data inconsistency was not as short as before.

Subsequent releases tried to solve this issue by postponing the commit of both snapshot and parquet files until the end of the load job. However, snapshot files were small and quickly committed, while parquet files could be gigabytes in size. Thus, the snapshot files of a job would be available while the parquet files of the same job wouldn't. In addition, the Analytics Service did not have access to the recently committed parquet files as it had to wait for the synchronization process. Data synchronization required read and write locks on physical schemas, and it had to wait for all other locks to be released before acquiring its own locks. As a result of both the gap between committing snapshot and parquet files and the wait for data synchronization, the Analytics Service continued to have access to inconsistent data, that is, old parquet files and recent snapshot files.

Note

Before release 5.1, only locking mechanisms were used to achieve concurrency control.

Enhancements for data consistency and availability

5.1 new features and updates

Release 5.1 introduced new features and updates that aim at a higher level of data consistency and availability. These features and updates include the following:

  • Implementing multi-versioning of load job files
  • Using different locking mechanisms
  • Updating the load and commit process
  • Updating the synchronization process
  • Moving information related to physical schema size on disk and number of rows to the metadata database
  • Introducing supporting tools for content cleanup

5.1.2 updates to Incorta Analyzer tables and Incorta SQL tables

With the introduction of the Incorta 5.1.2 release, the Loader Service materializes Incorta derived tables (Incorta Analyzer tables and Incorta SQL tables) to parquet files rather than Direct Data Mapping (DDM) files. The Loader Service creates a directory under the source directory for each Incorta table when the table is created. As Incorta tables do not support incremental loads, the Loader Service creates a new parquet version directory for each of these tables during a load job or during a schema update job that impacts the columns referenced in the Incorta table.

The Loader Service creates a new parquet file for an Incorta table in the following cases:

  • When you create or update the Incorta table
  • When you update the source table column(s) that you reference in the Incorta table in the same physical schema
  • When you update the source table column(s) that you reference in the Incorta table in another physical schema and load the Incorta table
  • When you delete the source table of the Incorta table in the same physical schema (empty parquet file)
  • When you delete the source table column(s) that you reference in the Incorta table in another physical schema and load the Incorta table (empty parquet file)

6.0.3 enhancements

Supporting key columns in Incorta Analyzer tables and Incorta SQL tables

Starting with 6.0.3, you can specify key columns for Incorta Analyzer and SQL tables. Adding, removing, or changing key columns does not require running a load job as derived tables are refreshed as part of schema update jobs. The derived table’s unique index is calculated and saved as a snapshot DDM file each time the key columns are updated or the schema or table is loaded.

The column or columns that you designate as key must maintain row uniqueness because no deduplication is performed for derived tables. If the selected key columns result in duplicate key values:

  • During the schema update job, duplicate values are kept, and the Engine will return the first matching value whenever a single value of the key columns is required. The schema update logs will point out the unique index issue.
  • During the schema or table load job, the unique index calculation will fail, resulting in a load job that finishes with errors. No value is returned when the unique index is required. You must select the correct key columns to have the unique index calculated.

Detecting duplicates during unique index calculations

In releases before 6.0.3, when the Enforce Primary Key Constraint option was disabled for physical tables or MVs and the selected key columns resulted in duplicate key values, unique index calculations would not fail; instead, the first matching value was returned whenever a single value of the key columns was required.

Starting with release 6.0.3, in such a case, the unique index calculation will fail, and the load job will finish with errors. You must either select key columns that ensure row uniqueness and perform a full load or enable the Enforce Primary Key Constraint option and load tables from staging to have the unique index correctly calculated.
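
As an illustration of the kind of check involved, the following Python sketch (not Incorta code) detects duplicate key values in a set of extracted rows before a unique index could be built; the rows, column names, and helper function are hypothetical.

```python
# Illustrative only: detect duplicate key values in extracted rows before a
# unique index is built. Column and variable names are hypothetical.
from collections import Counter

def find_duplicate_keys(rows, key_columns):
    """Return the key tuples that appear more than once in the extracted rows."""
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return [key for key, count in counts.items() if count > 1]

rows = [
    {"COUNTRY_ID": "US", "NAME": "United States"},
    {"COUNTRY_ID": "US", "NAME": "USA"},          # duplicate key value
    {"COUNTRY_ID": "EG", "NAME": "Egypt"},
]
duplicates = find_duplicate_keys(rows, ["COUNTRY_ID"])
if duplicates:
    # In 6.0.3 and later, a comparable condition makes the unique index
    # calculation fail and the load job finish with errors.
    print("Duplicate key values found:", duplicates)
```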

In-place compaction

When the Cluster Management Console (CMC) → Tenant Configurations → Data Loading → Enable Always Compact option is enabled, a compaction job starts at the end of the extraction or transformation phase in a load job to create a compacted, duplicate-free version of the extracted parquet files. The 5.2 release introduces a new compaction (deduplication) mechanism to reduce the I/O operations during a load job and save disk space. The new mechanism results in changes to the directory structure, the output of a compaction job, and the way compacted-parquet consumers access the compacted segments.

For more information, refer to Data Ingestion and Loading → Enhancements to the compaction process.
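
For a rough intuition of what deduplication during compaction means, the following Python sketch (an illustration under stated assumptions, not Incorta's implementation) rewrites a parquet segment so that each key keeps only its latest row and stores the result under a _rewritten directory; the paths, column names, and "latest row wins" rule are assumptions, and the sketch requires pandas with a parquet engine such as pyarrow.

```python
# Conceptual sketch of compaction (deduplication): rewrite a parquet segment so
# that each key keeps only its latest row, and save the result under a
# _rewritten directory. The rewritten file keeps the original segment's name.
from pathlib import Path
import pandas as pd

def rewrite_segment(segment_path: Path, rewritten_dir: Path, key_columns: list[str]) -> Path:
    df = pd.read_parquet(segment_path)
    # Keep the last occurrence of each key, i.e., the most recently extracted row.
    deduplicated = df.drop_duplicates(subset=key_columns, keep="last")
    rewritten_dir.mkdir(parents=True, exist_ok=True)
    target = rewritten_dir / segment_path.name
    deduplicated.to_parquet(target, index=False)
    return target

# Example usage (hypothetical paths):
# rewrite_segment(
#     Path("source/phy_sch1/COUNTRIES/TS1/TS1/part-000000.parquet"),
#     Path("source/phy_sch1/COUNTRIES/_rewritten/TS1"),
#     key_columns=["COUNTRY_ID"],
# )
```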

File multi-versioning implementation

Release 5.1 introduced multiple versions of the files created during a load or update job, along with updates and enhancements to the existing locking mechanisms.

File multi-versioning

To solve the issue of reading inconsistent data experienced in older releases, the Incorta 5.1 release adopted a multiversion concurrency control (MVCC) implementation for load and update job files. In release 5.1 and later, a load job or a schema update job creates a new version of its output files instead of overwriting the existing files. This includes new versions of parquet files, snapshot files, and compacted parquet files, as applicable. This MVCC implementation allows other services to access the existing files while the Loader Service is performing a load job, until the new files are committed and synchronized.
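
The following minimal Python sketch (illustrative only, not Incorta code) mimics the multi-versioning idea: each load job writes into a brand-new version directory named after the job start timestamp, and readers keep resolving the last committed version until they are told to switch; the directory names and the commit tracking are assumptions.

```python
# Minimal sketch of the multi-version idea: a writer never overwrites older
# versions, and readers keep using the last committed version until they sync.
import time
from pathlib import Path

def write_new_version(object_dir: Path, files: dict[str, bytes]) -> str:
    version_id = str(time.time_ns())               # job start timestamp as the version name
    version_dir = object_dir / version_id
    version_dir.mkdir(parents=True)                # a new directory; older versions are untouched
    for name, payload in files.items():
        (version_dir / name).write_bytes(payload)
    return version_id

def committed_version_dir(object_dir: Path, committed_id: str) -> Path:
    # Readers resolve the version recorded as committed (for example, in the
    # metadata database), so an in-progress load job never changes what they see.
    return object_dir / committed_id

obj = Path("/tmp/mvcc_demo/phy_sch1/COUNTRIES")
v1 = write_new_version(obj, {"part-000000.parquet": b"old data"})
v2 = write_new_version(obj, {"part-000000.parquet": b"new data"})  # load job in progress
print(committed_version_dir(obj, v1))  # readers still point at v1 until v2 is committed and synced
```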

Important

The synchronization process may not be completed for all services at the same time. As a result, one Analytics Service can have access to the latest version of files, while another Analytics Service may still access the older version. In other words, two users who consume the same dashboard may have different versions of the data displayed on this dashboard.

The implementation of file multi-versioning results in the following changes:

  • A new directory structure to support multiple versions of files that a load job or a schema update job creates
  • A locking mechanism to mark file versions that are in use so that they are not deleted
  • A change in the Incorta metadata database to store locking and versioning information
  • A cleanup job to delete unneeded file versions

Locking mechanism updates

Previously, there was only one type of locking: Engine locking, which worked at the schema level. In this release, Engine locking is updated, and a new locking mechanism is introduced: version locking.

Engine locking update

The Engine locks objects and entities to allow processes to run concurrently. In older releases, the Engine acquired read or write locks on the physical schema and all its dependent physical schemas in different situations, for example:

  • When querying data, for example, while rendering a dashboard or previewing the query plan
  • When the Analytics Service received a sync signal for a physical schema load, update, or deletion
  • When the Loader Service performed physical schema load or update jobs or a synchronization job

In some cases, these locks blocked access to data in multiple physical schemas although the process was for only one physical schema.

Starting release 5.1, the granularity of Engine locking was reduced from the physical schema level to the level of the related physical schema objects and joins. As a result, only the objects, joins, and calculation resources that participate in the underlying process are locked, while other dependent objects are not.

Version locking

This new mechanism aims to prevent the Cleanup job from deleting new and in-use file versions. It works at the level of the version of the physical schema object or join. Version locking is implemented using the metadata database, which now has a new table: VERSION_LOCK. This table contains references to all locked versions per service and per entity (whether an object or a join), in addition to the related file type: source parquet, DDM snapshot, DDM join, or compacted. The load or update job and the synchronization process are responsible for updating the records in this table.
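
The following Python sketch (a toy model, not the actual Incorta schema or code) shows how a VERSION_LOCK-like table can be used to acquire and release locks on file versions and to check whether a version is still in use before deleting it; the SQLite table, column names, and helper functions are illustrative, loosely mirroring the columns described in this document.

```python
# A toy model of version locking: a VERSION_LOCK-like table records which file
# version each service is using so that a cleanup process can skip it.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE version_lock (
        locker_id   TEXT,   -- the service that acquires the lock
        tenant_id   TEXT,
        entity_name TEXT,   -- schemaName.objectName or a join name
        file_type   TEXT,   -- Source, DDM, DDM Join, or Compacted
        version_id  TEXT,   -- the locked version (job timestamp)
        lock_type   TEXT    -- temporary or permanent
    )
""")

def acquire(locker, entity, file_type, version, lock_type="permanent", tenant="ten1"):
    conn.execute("INSERT INTO version_lock VALUES (?, ?, ?, ?, ?, ?)",
                 (locker, tenant, entity, file_type, version, lock_type))

def release(locker, entity, version):
    conn.execute("DELETE FROM version_lock WHERE locker_id=? AND entity_name=? AND version_id=?",
                 (locker, entity, version))

def is_locked(entity, file_type, version):
    row = conn.execute("SELECT 1 FROM version_lock WHERE entity_name=? AND file_type=? AND version_id=?",
                       (entity, file_type, version)).fetchone()
    return row is not None

acquire("analytics-1", "phy_sch1.COUNTRIES", "Source", "TS1")
print(is_locked("phy_sch1.COUNTRIES", "Source", "TS1"))   # True -> cleanup must skip TS1
release("analytics-1", "phy_sch1.COUNTRIES", "TS1")
print(is_locked("phy_sch1.COUNTRIES", "Source", "TS1"))   # False -> TS1 can be deleted
```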

Important

Starting the 5.2 release, the Delta Lake metadata files created as part of the compaction (deduplication) job maintain records of compacted parquet files that the Cleanup job will remove. They will also reference all extracted or rewritten (compacted) parquet files that constitute the compacted version of an object.

The FILES_VERSIONS metadata database table continues to reference the latest compacted version while the VERSION_LOCK table maintains records of locks acquired on committed versions.

Metadata database updates

Two new tables are now available as part of the Incorta metadata database: FILES_VERSIONS and VERSION_LOCK.

FILES_VERSIONS table

The FILES_VERSIONS table contains only one record per physical schema object. This record is updated with each load job or schema update job. This record contains the following columns:

Column | Description
Schema_ID | The ID of the physical schema
Name | The physical schema object full name or the join name
Type | T (physical schema object) or J (join)
Uncommitted Parquet Version | The latest version that is the result of a full load job and that has not been loaded to the Engine yet
Uncommitted Parquet Offset | The latest version that is the result of a full or incremental load job and that has not been loaded to the Engine yet
Committed Parquet Version | The latest version that is loaded to the Engine
Committed Parquet Offset | The latest version that is loaded to the Engine
Committed DDM Version | The latest version that is the result of a full or incremental load job or a schema update job
Compacted Version | The latest compacted parquet version, updated after each successful compaction job
Commit Timestamp | The time of the last load or update job that resulted in a committed version of the object or the join
Number of Rows | The number of rows loaded into the Engine
Snapshot Size | The size of the files for the latest committed DDM version
Staging Size | The size of the files for the latest committed parquet version

VERSION_LOCK table

The VERSION_LOCK table contains references to all in-use file versions. Each record represents the locked version per service per entity (physical schema object or join):

Column | Description
Locker ID | The service that acquires the lock
Tenant ID | The tenant ID
Entity Name | The join or the physical schema object (schemaName.objectName)
File Type | The type of the locked file version: Source, DDM, DDM Join, or Compacted
Version ID | The locked version
Type | The type of the lock: temporary or permanent

The Cleanup job checks the FILES_VERSIONS and VERSION_LOCK tables before deleting unneeded file versions. The Cleanup job will not delete file versions that have records in either of these two tables. Starting the 5.2 release, the Cleanup job also checks the <CompactedVersionID>.checkpoint.parquet Delta Lake metadata file to decide which compacted parquet files to delete.

Important

As an additional enhancement to reduce the I/O operations, the details related to the number of rows and the data size of each physical schema object are available in the FILES_VERSIONS table within the Incorta metadata database.

The cleanup mechanism

Creating multiple versions of files would cause the system to run out of disk space after some time. Therefore, in the current implementation, a Cleanup job runs at regular intervals to delete unneeded versions of the Compacted, Source, and DDM files. The unneeded versions are all versions except for the ones referenced in the new metadata database tables: FILES_VERSIONS and VERSION_LOCK. In addition, the Cleanup job skips the latest uncommitted parquet version with the highest timestamp even if it does not have a record in the FILES_VERSIONS or VERSION_LOCK tables. Starting the 5.2 release, the Cleanup job deletes the unused compacted parquet versions after checking the respective <CompactedVersionID>.checkpoint.parquet metadata file.

Versions in the FILES_VERSIONS and VERSION_LOCK tables that the cleanup job will not delete include the following:

  • Uncommitted versions of Source and DDM Files (referenced in the FILES_VERSIONS table)
  • Committed versions of Source and DDM files (referenced in the FILES_VERSIONS table)
  • Compacted versions (referenced in the FILES_VERSIONS table)
  • Locked versions (referenced in the VERSION_LOCK table)

Note

An uncommitted version of a file is the latest extracted version that has not been loaded to the Engine yet. A committed version is the latest version loaded to the Engine and can be synchronized with other services.
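
The following Python sketch (a simplified illustration, not the actual Cleanup job) applies the rules described above to a single object directory: keep every version referenced in FILES_VERSIONS or VERSION_LOCK plus the version with the highest timestamp, and delete the rest; the directory layout and function names are assumptions.

```python
# A simplified sketch of the cleanup decision: keep referenced versions and the
# latest (possibly uncommitted) version, and delete everything else.
import shutil
from pathlib import Path

def cleanup_object_versions(object_dir: Path, referenced_versions: set[str]) -> list[str]:
    # Version directories use timestamp names; skip metadata dirs such as _rewritten and _delta_log.
    version_dirs = [d for d in object_dir.iterdir() if d.is_dir() and not d.name.startswith("_")]
    if not version_dirs:
        return []
    latest = max(version_dirs, key=lambda d: d.name)   # highest timestamp is always kept
    deleted = []
    for version_dir in version_dirs:
        if version_dir == latest or version_dir.name in referenced_versions:
            continue                                   # still in use, skip it
        shutil.rmtree(version_dir)
        deleted.append(version_dir.name)
    return deleted

# Example usage (hypothetical): TS1 is still referenced in the metadata tables,
# TS2 is the latest version, so only older unreferenced versions would be removed.
# cleanup_object_versions(Path(".../source/phy_sch1/CUSTOMERS"), {"TS1"})
```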

Important

The Cluster Management Console administrator can define the time interval to run the Cleanup job. This setting is available in Cluster Configurations → Server Configurations → Tuning. Make sure to configure the Cleanup job to run at intervals that suit the scheduled load jobs so that the shared storage is freed from unneeded file versions.

The new directory structure

The new directory structure has two new directories: source and ddm. These two directories inherit the function of the old parquet and snapshot directories, respectively. The source directory contains the parquet files created during a load job, while the ddm directory includes the snapshot files created during a load or schema update job for physical schema objects and joins.

Note

In releases before 5.2, the compacted directory remains as is. However, starting the 5.2 release, the compacted directory is replaced by the _rewritten directory in the source area, and the compacted segments are saved per compaction job under the _rewritten directory. In addition, creating compacted segments now involves only rewriting the parquet files that have duplicates, rather than both rewriting the parquet files that have duplicates and copying the other extracted parquet files.

General rules

The following table shows the default impacted directories for each job type per physical schema object type:

Object / Job | Full Load Job | Incremental Load Job | Load From Staging Job | Schema Update Job
Physical Schema Table / Materialized View | Source, DDM, _rewritten, _delta_log | Source, DDM, _rewritten, _delta_log | DDM | DDM
Incorta Analyzer Table / Incorta SQL Table | Source, _delta_log | Source, _delta_log | Source | Source

Note

Before the 5.1.2 release, the Loader Service created only snapshot DDM files for Incorta Analyzer tables and Incorta SQL tables.

In addition to the object type, enabling or disabling performance optimization affects the job behavior and results.

Note

You must not disable performance optimization for Incorta Analyzer tables or Incorta SQL tables.

The following table shows the impact for each job type (as applicable) when you enable or disable performance optimization for a physical schema table or materialized view (MV):

Performance Optimization / Job | Full Load Job | Incremental Load Job (for objects with incremental load enabled) | Load From Staging | Schema Update Job
Optimized MV or physical schema table | New Parquet version with its increment directory; new DDM version; new Compacted version | New increment version under the last Parquet version; new DDM version; new Compacted version | New DDM version | New DDM version
Non-optimized MV or physical schema table | New Parquet version with its increment directory; new Compacted version | New increment version under the last Parquet version; new Compacted version | — | —

For more details, see Example of the new implementation.

Source directory structure

The following is the structure of the source directory (a sketch after this list lays out an example of this structure on disk):

  • A directory for each physical schema
    • A directory for each physical schema table or MV
      • A directory for each parquet (source) version that is the result of a full load job. The directory name is the start time of the full load job.
        • A directory for the full load parquet file(s) with the same name as the parquet version directory
        • A separate directory for the parquet files resulting from each incremental load job, if any
      • A _rewritten directory that includes the rewritten compacted segments for the object:
        • A directory for each compaction job that has resulted in rewriting one or more compacted parquet segments. The directory name is the ID of the compacted version, which is the same as the timestamp of the load job that produced the extracted parquet files to compact. The name of each rewritten file includes the version ID of the original extracted parquet file.
      • A _delta_log directory that includes the Delta Lake metadata files:
        • _last_checkpoint: only one file per object, updated per compaction job to reference the .checkpoint.parquet file of the latest compacted version
        • A <Compacted_Version_ID>.json file per compaction job
        • A <Compacted_Version_ID>.checkpoint.parquet file per compaction job that includes references to all extracted and compacted parquet files that constitute the compacted version. This file also includes references to unused compacted segments that the Cleanup job will delete when a newer compacted version is available.
    • A directory for each Incorta Analyzer table or Incorta SQL table
      • A directory for each parquet (source) version that is the result of a load or schema update job. The directory name is the start time of the respective job.
        • A directory for the new parquet file(s) with the same name as the parquet version directory
      • A _delta_log directory that includes the Delta Lake metadata files related to each version of the Incorta Analyzer or SQL table parquet files
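
The following Python sketch (illustration only) lays the structure above out on disk for one hypothetical table, phy_sch1.COUNTRIES, after a full load and compaction at a version named TS1; in a real deployment the version names are job-start timestamps, and all paths and file names here are placeholders.

```python
# Illustration only: create the source-directory layout described above for one
# hypothetical table after a full load followed by a compaction job.
from pathlib import Path

def lay_out_source_dir(tenant_root: Path, schema: str, table: str, version: str) -> None:
    table_dir = tenant_root / "source" / schema / table
    # The parquet version directory and its first increment directory share the name.
    (table_dir / version / version).mkdir(parents=True, exist_ok=True)
    (table_dir / version / version / "part-000000.parquet").touch()
    # Rewritten (compacted) segments for this compacted version.
    (table_dir / "_rewritten" / version).mkdir(parents=True, exist_ok=True)
    (table_dir / "_rewritten" / version / "part-000000.parquet").touch()
    # Delta Lake metadata files for the compacted version (placeholder names).
    delta_log = table_dir / "_delta_log"
    delta_log.mkdir(parents=True, exist_ok=True)
    (delta_log / "_last_checkpoint").touch()
    (delta_log / f"{version}.checkpoint.parquet").touch()
    (delta_log / f"{version}.json").touch()

lay_out_source_dir(Path("/tmp/incorta_demo/Tenants/demo"), "phy_sch1", "COUNTRIES", "TS1")
for path in sorted(Path("/tmp/incorta_demo").rglob("*")):
    print(path)
```
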
Note

In the 5.1 release, Incorta Analyzer tables and Incorta SQL tables did not exist in the source directory.

Important

Incorta Analyzer tables and Incorta SQL tables do not support incremental loads. Starting release 5.1.2, during a schema incremental load job, the Loader Service creates for these tables a new parquet version directory and a subdirectory for the parquet files with the same timestamp.

The source directory structure after the first full load job

The following figure conceptually shows the contents of the source directory for two physical schema tables after the first successful load job for the physical schema while compaction is enabled. Both the COUNTRIES and CUSTOMERS tables have key columns, but only the source of the COUNTRIES table has duplicate rows of the key column.

Important

The figures presented here are for illustration purposes only.

Notice the following:

  • The parquet version directory and the first increment directory under it represent the result of a full load job. They share the same name, which is the timestamp of the load job start time.
  • The _rewritten directory of the COUNTRIES table has the rewritten compacted segment(s), while the CUSTOMERS table has no _rewritten directory because there are no rewritten segments.

The source directory structure after the first incremental load job

If the incremental load is enabled for an object (CUSTOMERS for example) and you perform an incremental load, a new increment directory is added under the parquet version directory with the timestamp of the incremental load job start time.

If the incremental load is disabled for an object (COUNTRIES table for example), and you perform an incremental load, the Loader Service will perform a full load for this object. This will result in a new parquet version directory with a new increment directory. The name of both the parquet version and parquet increment will be the timestamp of the incremental load job start time.

The following figure conceptually shows the contents of the source directory after the first successful incremental load job for the same physical schema while compaction is enabled. Both tables have key columns. The source of the COUNTRIES table has duplicated rows for the key column. The incremental query of CUSTOMERS returns the following:

  • Updated rows, which requires rewriting the existing extracted parquet file and creating a compacted version of it.
  • Duplicated rows, which requires rewriting the newly extracted parquet file and creating a compacted version of it.


Notice that the incremental load job resulted in the following:

  • A new parquet version directory for COUNTRIES with an increment directory that resulted from a full load, as this table has incremental load disabled.
  • A new increment directory under the existing parquet version of the CUSTOMERS table that resulted from the incremental load job.
  • Two new Delta Lake metadata files for each table: .checkpoint.parquet and .json.
  • A new directory under the _rewritten directory for each table with the load job timestamp as a name to contain the compacted segments.
  • The names of the compacted parquet files of CUSTOMERS are the same as the original parquet files, whether the one extracted during the previous full load job or the newly extracted parquet file.

FILES_VERSIONS table data

You can track the source directory changes in the FILES_VERSIONS table within the Incorta metadata database. The following columns represent files in this directory:

  • Uncommitted Parquet Version
  • Uncommitted Parquet Offset
  • Committed Parquet Version
  • Committed Parquet Offset
  • Compacted Version (updated only after a successful compaction job that follows a successful extraction process)

Assuming that compaction is enabled and required, and each successful extraction is followed by a successful compaction job, the following table conceptually shows changes in the FILES_VERSIONS table after the extraction and load phases of a series of full load and incremental load jobs for a physical schema table with the incremental load enabled.

Note

During a load job, the Loader Service does not extract data for Incorta Analyzer tables or Incorta SQL tables.

  • In release 5.1, the Loader Service creates DDM files for these tables during the load phase of a load job.
  • Starting release 5.1.2, the Loader Service creates parquet files for these tables during the load phase of a load job and during a schema update job. If the Loader Service fails to load one of these Incorta tables for any reason, the Loader Service writes an empty parquet file for it.
  • With the new compaction mechanism introduced in the 5.2 release, for each of these tables, a _delta_log directory exists in the object directory to include the metadata files that compacted parquet consumers use to find out the parquet files of each object version to read from. In addition, the ID of the committed parquet version of an Incorta Analyzer or SQL table will be reflected in the name of the metadata files and the Compacted Version column in the FILES_VERSIONS table.

Job ID | Physical Schema Load Type | Timestamp (TS) | Phase | Phase Status | Uncommitted Parquet Version | Uncommitted Parquet Offset | Committed Parquet Version | Committed Parquet Offset | Compacted Version
— | Initial State | — | — | — | 0 | 0 | 0 | 0 | 0
1 | 1st Full Load | TS1 | Extraction Ver1_inc1 | Succeed | TS1 | TS1 | 0 | 0 | TS1
1 | 1st Full Load | TS1 | Load Ver1_inc1 | Succeed | TS1 | TS1 | TS1 | TS1 | TS1
2 | 1st Incremental Load | TS2 | Extraction Ver1_inc2 | Succeed | TS1 | TS2 | TS1 | TS1 | TS2
2 | 1st Incremental Load | TS2 | Load Ver1_inc2 | Succeed | TS1 | TS2 | TS1 | TS2 | TS2
3 | 2nd Incremental Load | TS3 | Extraction Ver1_inc3 | Succeed | TS1 | TS3 | TS1 | TS2 | TS3
3 | 2nd Incremental Load | TS3 | Load Ver1_inc3 | Fail | TS1 | TS3 | TS1 | TS2 | TS3
4 | 3rd Incremental Load | TS4 | Extraction Ver1_inc4 | Succeed | TS1 | TS4 | TS1 | TS2 | TS4
4 | 3rd Incremental Load | TS4 | Load both the failed increment (Ver1_inc3) and the new one (Ver1_inc4) | Succeed | TS1 | TS4 | TS1 | TS4 | TS4
5 | 2nd Full Load | TS5 | Extraction Ver2_inc1 | Succeed | TS5 | TS5 | TS1 | TS4 | TS5
5 | 2nd Full Load | TS5 | Load Ver2_inc1 | Fail | TS5 | TS5 | TS1 | TS4 | TS5
6 | 4th Incremental Load | TS6 | Extraction Ver2_inc2 | Succeed | TS5 | TS6 | TS1 | TS4 | TS6
6 | 4th Incremental Load | TS6 | Load both the failed version (Ver2_inc1) and the new increment (Ver2_inc2) | Succeed | TS5 | TS6 | TS5 | TS6 | TS6
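
The following Python sketch (an interpretation of the table above, not Incorta code) encodes the update rules the table implies: a successful extraction advances the uncommitted version and offset (and, assuming the compaction job succeeds, the compacted version), while only a successful load phase copies the uncommitted pointers into the committed ones; field and function names are illustrative.

```python
# A sketch of the update rules that the table above implies.
from dataclasses import dataclass

@dataclass
class FilesVersionsRecord:
    uncommitted_parquet_version: str = "0"
    uncommitted_parquet_offset: str = "0"
    committed_parquet_version: str = "0"
    committed_parquet_offset: str = "0"
    compacted_version: str = "0"

def on_extraction_succeeded(rec: FilesVersionsRecord, ts: str, full_load: bool) -> None:
    if full_load:
        rec.uncommitted_parquet_version = ts   # a full load starts a new parquet version
    rec.uncommitted_parquet_offset = ts        # every extraction adds a new increment
    rec.compacted_version = ts                 # assuming the compaction job also succeeds

def on_load_succeeded(rec: FilesVersionsRecord) -> None:
    # Committing: the uncommitted pointers become the versions the Engine reads.
    rec.committed_parquet_version = rec.uncommitted_parquet_version
    rec.committed_parquet_offset = rec.uncommitted_parquet_offset

rec = FilesVersionsRecord()
on_extraction_succeeded(rec, "TS1", full_load=True)
on_load_succeeded(rec)                                  # 1st full load: committed = TS1/TS1
on_extraction_succeeded(rec, "TS2", full_load=False)
on_load_succeeded(rec)                                  # 1st incremental load: committed = TS1/TS2
on_extraction_succeeded(rec, "TS3", full_load=False)    # load phase fails: committed stays at TS1/TS2
print(rec)
```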

DDM directory structure

Instead of the flat structure of the snapshot directory, the ddm directory that replaces it has a structure similar to that of the source directory.

The following is the structure of the ddm directory, as applicable:

  • A directory for schemas
    • A directory for each physical schema
      • A directory for each performance-optimized physical schema table or MV
        • For each object directory, a sub-directory for each DDM version with the related snapshot files, if any
      • Starting release 6.0.3, a directory for each Analyzer or SQL table that has one or more key columns with sub-directories to keep the different versions of the unique index ddm files.
  • A directory for joins
    • A directory for each join relationship
      • A directory for each join version that contains the join information

Important

The DDM version directory for a table or materialized view will be empty if the object does not have a key column, formula column, or load filter.

Note

A version directory name is a timestamp that is the start time of the job that results in creating the directory.

Note

Before the 5.1.2 release, each performance-optimized Incorta Analyzer table or Incorta SQL table had a directory under the physical schema directory to save the related DDM versions of these tables.

Starting with 6.0.3, when these derived tables have key columns, a ddm directory will be available for these tables under the physical schema directory.

The ddm directory structure after the first full load job

A full load job, an incremental load job, or a schema update job creates a new ddm directory version for each performance-optimized physical schema table or MV. Starting release 5.1.2, the Loader Service no longer creates DDM files for Incorta Analyzer tables or Incorta SQL tables. However, with the introduction of key column support for derived tables in 6.0.3, the Loader Service creates snapshot DDM files for the unique index each time the derived table's key columns are updated or the schema or table is loaded.

The following figure conceptually shows the contents of the ddm directory after the first load job for the physical schema:

The directory of each table with a key column has file(s) in the ddm directory, while the directories of tables that do not have a key column, a formula column, or a load filter do not contain any files.

The ddm directory structure after the first incremental load job

The following figure conceptually shows the contents of the ddm directory after the first successful incremental load job for the same physical schema:

The incremental load job results in a new DDM version directory even if the directory is empty.

The new structure at a glance

The following figure conceptually summarizes the new directory structure of the source area as per the updates of the 5.1.2 and 5.2 releases:


The following figure conceptually summarizes the new directory structure of the ddm area as per the updates of the 5.1.2 release:

Example of the new implementation

The following example tracks the changes on shared storage and the FILES_VERSIONS and VERSION_LOCK tables starting with creating a physical schema and its objects through different load jobs. The example reflects the updates related to the compaction job in the 5.2 release and the updates related to materializing Incorta Analyzer and SQL tables introduced in the 5.1.2 release.

Important
  • This example shows the default behavior assuming that related jobs are completed successfully.
  • The CMC → Data Loading → Enable Always Compact option is enabled to start a compaction job after the extraction or transformation phase and the sources have duplicates that require compaction.
  • Object, tenant, and server configurations may affect the introduced behavior or results.

Using an environment that runs the 5.2 release or later, create the following physical schema with the following objects in your tenant:

Entity | Name | Notes
Physical Schema | phy_sch1 | A new physical schema
Table | tbl1_key | With a key column
Table | tbl2_fc | With a formula column, FC_1
Table | tbl3_no_optimize | With performance optimization disabled
Table | tbl4_incrmnt | Only columns, including a key column, with incremental load enabled
Table | tbl5_load_fltr | With a load filter
Table | tbl6 | Only columns
Table | tbl7_key_no_optimize | With a key column and performance optimization disabled
Materialized View | mv1 | —
Materialized View | mv2_key | With a key column
Incorta Analyzer Table | Anlyz_tbl1 | —

After physical schema creation

The following table shows what will exist on shared storage and the new tables in the metadata database once you successfully create the physical schema and its objects:

Object | VERSION_LOCK | FILES_VERSIONS | Source | DDM | _rewritten | _delta_log
phy_sch1 | Not Applicable (NA) | NA | An empty directory exists for the physical schema | — | — | —
tbl1_key / tbl2_fc / tbl4_incrmnt / tbl5_load_fltr / tbl6 / tbl7_key_no_optimize / mv1 / mv2_key | — | — | — | — | — | —
tbl3_no_optimize | — | A record exists but with zero values for all columns | — | — | — | —
Anlyz_tbl1 | A record referencing a permanent lock by the Analytics Service on the Source version, which is the version of the parquet file(s) of the Analyzer table. The Version ID is the timestamp of the object creation time. | A record exists with only the Committed Parquet Version, Committed Parquet Offset, Commit Timestamp, and the Staging size. Columns related to DDM, uncommitted parquet version and offset, and compacted version have zero values. | A directory under the physical schema directory with a subdirectory of the same name (the timestamp that represents the creation time). This directory contains an empty parquet file. | — | — | —

After the first successful full load job

The example focuses only on records related to physical schema objects. However, there might be other records related to joins if the physical schema has joins defined.

In the case of a successful full load job for a physical schema table or MV, the timestamps (which the version names represent) of all the versions are the same: the Uncommitted Parquet Version, Uncommitted Parquet Offset, Committed Parquet Version, Committed Parquet Offset, Committed DDM Version, Compacted Version (assuming that the compaction job succeeds), and Commit Timestamp.

In the case of a failed full load where the extraction finishes successfully, the Uncommitted Parquet Version, Uncommitted Parquet Offset, and Compacted Version will have the same timestamp, while the Committed Parquet Version, Committed Parquet Offset, and Committed DDM Version will have zero values or earlier timestamps.

The following table shows changes on shared storage and the VERSION_LOCK and FILES_VERSIONS tables after the first successful full load job that starts with a timestamp of TS1.

Note that there will be no compacted directory starting the 5.2 release, and compacted parquet files are now saved to the _rewritten directory that exists in each object’s directory in the source area.

Object | VERSION_LOCK | FILES_VERSIONS | Source | DDM | _rewritten | _delta_log
phy_sch1 | NA | NA | Directory | Directory | — | —
tbl1_key | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files (increments) directory (TS1) → part-000000 file(s) | Table directory → DDM version directory (TS1) → index.zxi file | Compacted version directory (TS1) → compacted file(s) (TS1-part-000000) | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
tbl2_fc | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | Table directory → DDM version directory (TS1) → FC_1.zxc file | — | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
tbl3_no_optimize | No records | A record with only the uncommitted Parquet Version and Offset and the Compacted Version; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | No directory | — | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
tbl4_incrmnt | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns except for the Snapshot size (zero value); version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | Table directory → DDM version directory (TS1) → empty | Compacted version directory (TS1) → compacted file(s) (TS1-part-000000) | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
tbl5_load_fltr | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | Table directory → DDM version directory (TS1) → all columns (.zxc) | — | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
tbl6 | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns except for the Snapshot size (zero value); version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | Table directory → DDM version directory (TS1) → empty | — | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
tbl7_key_no_optimize | No records | A record with only the uncommitted Parquet Version and Offset and the Compacted Version; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | No directory | Compacted version directory (TS1) → compacted file(s) (TS1-part-000000) | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
mv1 | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns except for the Snapshot size (zero value); version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000-abcde1....snappy.parquet file | MV directory → DDM version directory (TS1) → empty | — | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
mv2_key | Two records referencing two permanent locks by the Analytics Service: one for the Source version and the other for the DDM version; Version ID = TS1 | A record with values for all columns; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000-fghi2....snappy.parquet file | Table directory → DDM version directory (TS1) → index.zxi file | Compacted version directory (TS1) → compacted file(s) (TS1-part-000000) | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json
Anlyz_tbl1 | One record referencing a permanent lock by the Analytics Service for the Source version; Version ID = TS1 | A record with values for the Committed Parquet Version and Offset, Compacted Version, Commit Timestamp, Number of Rows, and Staging size; version names = TS1 | Table directory → Parquet version directory (TS1) → Parquet files directory (TS1) → part-000000 file(s) | No directory exists | NA | _last_checkpoint, 000TS1.checkpoint.parquet, and 000TS1.json

Note

Before the 5.1.2 release, Incorta Analyzer tables and Incorta SQL tables only existed in the ddm directory as snapshot files, and not in the source directory as parquet files.

After the first successful incremental load

After performing a successful incremental load, the same behavior as in the full load applies to all objects except for the objects with incremental load enabled.

Note

Records in the FILES_VERSIONS metadata table will be updated with the new versions, and locks are created and added to the VERSION_LOCK table during and after the load job.

Important

The directories of the old versions and the new versions will be available on shared storage until the Cleanup job runs and deletes unneeded versions that do not have any records in the VERSION_LOCK or FILES_VERSIONS tables.

The following table shows changes on shared storage after the first successful incremental load job that starts with a timestamp of TS2.

Object Configs | Source | DDM | _rewritten | _delta_log
Performance-optimized tables and MVs that require compaction and have incremental load disabled | Table directory → New Parquet version directory (TS2) → New Parquet files (increments) directory (TS2) | Table directory → New DDM version directory (TS2) | New Compacted version directory (TS2) | New 000TS2.checkpoint.parquet and 000TS2.json metadata files
Non-performance-optimized tables and MVs that require compaction and have incremental load disabled | Table directory → New Parquet version directory (TS2) → New Parquet files directory (TS2) | — | New Compacted version directory (TS2) | New 000TS2.checkpoint.parquet and 000TS2.json metadata files
Performance-optimized tables and MVs that require compaction and have incremental load enabled | Table directory → Old Parquet version directory (TS1) → New Parquet files directory (TS2) (in addition to the already existing TS1 directory) | Table directory → New DDM version directory (TS2) (empty directory if no key, formula column, or load filter exists) | New Compacted version directory (TS2) | New 000TS2.checkpoint.parquet and 000TS2.json metadata files
Non-performance-optimized tables and MVs that require compaction and have incremental load enabled | Table directory → Old Parquet version directory (TS1) → New Parquet files directory (TS2) (in addition to the already existing TS1 directory) | — | New Compacted version directory (TS2) | New 000TS2.checkpoint.parquet and 000TS2.json metadata files
Incorta Analyzer and SQL tables | Table directory → New Parquet version directory (TS2) → New Parquet files directory (TS2) | — | NA | New 000TS2.checkpoint.parquet and 000TS2.json metadata files

After the second successful full load

The following table shows changes on shared storage after the second successful full load job that starts with a timestamp of TS3.

Object Configs | Source | DDM | _rewritten | _delta_log
Performance-optimized tables and MVs that require compaction and have incremental load disabled | Table directory → New Parquet version directory (TS3) → New Parquet files (increments) directory (TS3) | Table directory → New DDM version directory (TS3) | New Compacted version directory (TS3) | New 000TS3.checkpoint.parquet and 000TS3.json metadata files
Non-performance-optimized tables and MVs that require compaction and have incremental load disabled | Table directory → New Parquet version directory (TS3) → New Parquet files directory (TS3) | — | New Compacted version directory (TS3) | New 000TS3.checkpoint.parquet and 000TS3.json metadata files
Performance-optimized tables and MVs that require compaction and have incremental load enabled | Table directory → New Parquet version directory (TS3) → New Parquet files directory (TS3) | Table directory → New DDM version directory (TS3) (empty directory if no key, formula column, or load filter exists) | New Compacted version directory (TS3) | New 000TS3.checkpoint.parquet and 000TS3.json metadata files
Non-performance-optimized tables and MVs that require compaction and have incremental load enabled | Table directory → New Parquet version directory (TS3) → New Parquet files directory (TS3) | — | New Compacted version directory (TS3) | New 000TS3.checkpoint.parquet and 000TS3.json metadata files
Incorta Analyzer and SQL tables | Table directory → New Parquet version directory (TS3) → New Parquet files directory (TS3) | — | NA | New 000TS3.checkpoint.parquet and 000TS3.json metadata files

When the Cleanup job runs

If the synchronization process runs successfully after the successful full load job and all services release their locks related to old versions, these old versions will not be in use anymore. When the Cleanup job runs, it checks the FILES_VERSIONS and VERSION_LOCK metadata database tables and skips the versions that have records in these tables. It then deletes the unneeded file versions, whether they are uncommitted or committed parquet versions and offsets, committed DDM versions, or compacted versions. The Cleanup job also checks the compaction metadata files to decide which compacted segments and metadata files to delete. The Cleanup job is also responsible for deleting hidden compacted version directories and metadata files that result from failed compaction jobs.

In our example, the Cleanup job will delete all TS1 and TS2 versions. Only TS3 versions will be available in the source, ddm, _rewritten, and _delta_log directories related to the physical schema and its objects.

Additional considerations

Materializing Incorta Analyzer tables and Incorta SQL tables to parquet files

The 5.1.2 release introduces materializing Incorta Analyzer tables and Incorta SQL tables to parquet files rather than DDM files. After upgrading to release 5.1.2 or later, you must review Incorta Analyzer tables and Incorta SQL tables in all physical schemas to disable or delete unsupported configurations. Incorta Analyzer tables and Incorta SQL tables no longer support key columns, encrypted columns, load filters, disabled performance optimization, or self joins.

You must also perform a load from staging for all physical schemas with Incorta Analyzer tables or Incorta SQL tables to materialize them to parquet files rather than snapshot Direct Data Mapping (DDM) files.

In addition, starting release 5.1.2, when you delete (or update) the source table that you reference in an Incorta table in another physical schema, the Loader Service will not automatically update the Incorta table. However, when you load the Incorta table that references a deleted source table, whether in the same or another physical schema, the load fails and the Loader Service writes an empty parquet file for the Incorta table.

Migration Considerations

When migrating shared storage files from one Incorta cluster to another, for example, from a User Acceptance Testing (UAT) environment to a Production environment, you must first copy the source directory that contains the parquet files, and then perform a load from staging. Copying only the directories that contain the snapshot DDM files and the source parquet files between Incorta cluster environments will not have the same result.

To migrate only one object in a physical schema, you need to copy the whole object directory (which contains all the parquet files) that exists under the physical schema in the source directory. The path to the object directory that you need to copy is as follows: /home/incorta/IncortaAnalytics/Tenants/<tenant_name>/source/<schema_name>/<object_name>.

Both environments must run an Incorta release that supports file versioning and the copied files should not have records in the FILES_VERSIONS or VERSION_LOCK metadata database tables.
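
As a hedged illustration of this migration path, the following Python sketch copies a single object's directory from the source area of one environment to another; the tenant, schema, and object names are placeholders, and the load from staging must still be performed in the target cluster afterward.

```python
# Illustrative helper: copy a single object's directory (all its parquet
# versions) from the source area of one environment to another.
import shutil
from pathlib import Path

def copy_object_source_dir(source_tenant_root: str, target_tenant_root: str,
                           schema_name: str, object_name: str) -> Path:
    src = Path(source_tenant_root) / "source" / schema_name / object_name
    dst = Path(target_tenant_root) / "source" / schema_name / object_name
    dst.parent.mkdir(parents=True, exist_ok=True)
    # dirs_exist_ok=True lets the copy merge into an existing object directory.
    shutil.copytree(src, dst, dirs_exist_ok=True)
    return dst

# Example (hypothetical UAT -> Production paths):
# copy_object_source_dir(
#     "/home/incorta/IncortaAnalytics/Tenants/uat_tenant",
#     "/home/incorta/IncortaAnalytics/Tenants/prod_tenant",
#     "phy_sch1", "COUNTRIES",
# )
# After copying, run a load from staging for the physical schema in the target cluster.
```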

Loading a single object

When loading a single object (sch1.table1, for example) that is referenced in a formula column in another object (sch1.table2, for example) in the same physical schema, or in a column in an Incorta Analyzer or SQL table, the object with the formula column (sch1.table2) will have a new DDM version with the timestamp of the load job of the first object (sch1.table1). This new DDM version will have the newly calculated version of the formula column and a copy of the other columns from the previous DDM version.
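
The following Python sketch (illustrative only, not Incorta code) captures why this can be I/O heavy: the dependent object gets a whole new DDM version directory in which the formula column is recalculated and every other column file is copied from the previous version; the paths, file names, and recalculation placeholder are assumptions.

```python
# A sketch of why loading a single object can be I/O heavy for dependent objects.
import shutil
from pathlib import Path

def new_ddm_version_for_dependent(ddm_object_dir: Path, previous_version: str,
                                  new_version: str, formula_columns: set[str]) -> Path:
    previous_dir = ddm_object_dir / previous_version
    new_dir = ddm_object_dir / new_version
    new_dir.mkdir(parents=True, exist_ok=True)
    for column_file in previous_dir.iterdir():
        if column_file.stem in formula_columns:
            # Recalculate the formula column (placeholder for the real calculation).
            (new_dir / column_file.name).write_bytes(b"recalculated")
        else:
            # Copy every other column file as-is from the previous DDM version.
            shutil.copy2(column_file, new_dir / column_file.name)
    return new_dir

# Example (hypothetical): sch1.table2 has formula column FC_1.
# new_ddm_version_for_dependent(Path(".../ddm/schemas/sch1/table2"), "TS1", "TS2", {"FC_1"})
```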

Synchronization delay impact

At the end of a load job, the Loader Service sends a signal to the Analytics Services and other follower Loader Services so that they read from the latest file versions. As the synchronization process may not be completed for all services at the same time, one Analytics Service can have access to the latest version of files while another Analytics Service may still read the older version. Thus, two users who consume the same dashboard may have different versions of the data displayed on this dashboard.

Known issues

  • As a drawback of the MVCC implementation, any service or process may continue to access relatively old data until the synchronization process is complete. However, the advantage of the implemented approach is that all services and processes across the system have access only to consistent data at any point in time.
  • Loading a single object with columns referenced in another object in the same physical schema may take a longer time and result in heavy I/O operations. This is due to the creation of a new DDM version of the other object including copying files from the older DDM version.
  • Materializing Incorta Analyzer and SQL tables to parquet files leads to a longer load time than in older releases.
  • The new compaction mechanism introduced in the 5.2 release may cause the first full or incremental load job of each object to take a longer time to create the new structure and perform full compaction of all the required Parquet files. In addition, there may be a degradation in the compaction performance with very small queries. If most of your queries reference small data sets, it is recommended to enable the Incorta Custom Delta Lake Reader in the CMC. Note that changing the reader used to read the Delta Lake files may cause the first SQLi query on the Spark port to consume more time because the SQLi engine will be restarted.