References → Data Ingestion and Loading

About data ingestion

To allow Incorta to analyze your data using its own or integrated tools, it needs to have access to your data. It also needs to extract data, persist it to its shared storage, and load it into its engine memory to allow its Analytics Service to access this data.

Data ingestion in Incorta starts with creating a data source that defines the properties of the connection with your data provider, then creating a physical schema object to comprise data from this data source, and finally loading and persisting data.

The following figure summarizes the process of data ingestion in Incorta:

Data sources

Incorta can connect to external data as well as local data. External data can be a database, one or more files on a cloud storage service, or data in Software as a Service (SaaS) applications. Local data is one or more files uploaded to the shared storage of an Incorta cluster. Using the available connectors, you can create different data sources that define how Incorta can access your data. A data source defines the properties of the connection between Incorta and your data provider. For a list of the connectors available in Incorta, refer to Connectors → All.

Physical schemas and data discovery

After using suitable connectors to define data sources, you must create physical schema objects to encompass your data or to reference it. During the physical schema object creation, the Loader Service discovers your data and defines the object columns and their properties accordingly.

A physical schema object can be one of the following:

Only a physical schema table directly references one or more data sources to get data. Other objects in a physical schema reference one or more physical schema tables or other objects to get data from them. An alias only refers to the data of another existing physical schema object.

Data Loading

Incorta can analyze only data available to its Analytics Service. Thus, after defining data sources and creating physical schema tables that reference data in these data sources, you must load your data to Incorta shared storage and to its Engine memory to start analyzing it. Incorta stores the extracted data as Apache Parquet files in shared storage (staging) and creates Direct Data Mapping (DDM) files for objects according to the object type and configurations.

Note

Before 5.2, the Loader Service created DDM files for encrypted columns, but now, encrypted columns are available only as parquet files to save the hard disk space and reduce the Loader Service memory requirements.

Load types

Incorta supports multiple types of loads:

  • Physical schema load and object load
  • On-demand and scheduled load
  • Full load, incremental load, and load from staging

Physical Schema load and object load

A load job can be for a single physical schema object, all applicable objects in a physical schema, or multiple schemas. As a schema manager user, you can use the Schema Designer to start a schema or object load job. You cannot start a load job for an alias as it is merely a reference to data of another existing physical schema object. In addition, the Loader Service will skip aliases during a physical schema load job.

A physical schema load job can be on-demand or scheduled, and incremental, full, or from staging. For a physical schema object, a load job can only be on-demand full or from staging.

Important

Some Data Lake connectors, such as Amazon Web Services (AWS) S3, support remote tables that you can use to access your data without the need to load it to Incorta shared storage or engine memory. The Loader Service does not extract or load data for a remote table. Thus, the Analytics Service cannot query this data unless you create an MV that references this remote table and load the MV data into Incorta. In addition, external visualization tools that Incorta integrates with, such as Power BI and Tableau, can access this remote data through the Incorta SQL Interface (SQLi).

On-demand and scheduled loads

You can start a load job on demand for the physical schema or a specific physical schema object. You can also schedule the loading of data for the whole physical schema. While you use the Schema Designer to start on-demand load jobs, you use either the Schema Manager or the Scheduler to schedule physical schema load jobs.

Full load, incremental load, and load from staging

Depending on the data source of your data, the connector you use to access it, and the table data source properties, you can perform different types of load jobs. These types of loading strategies are different in behavior and output.

For a detailed description of the output for each load strategy, refer to References → Data Consistency and Availability.

Full load

You can perform a full load for a physical schema or an object. By default, the Loader Service performs a full load for all physical schema tables and MVs. However, for physical schema tables and MVs that have incremental load enabled and full load disabled, the Loader Service skips them during full load jobs.

Note

Typically, a schema developer performs a full load of an object at least once before enabling the Disable Full Load property.

During a full load, the following occurs:

  • The Loader Service extracts data from the data source for each physical schema table or the single specified table according to the table data source properties.

  • The Loader Service creates new source parquet files in the source directory. The Loader Service creates a new parquet version directory with an offset subdirectory to save these files.

    • When the Table Editor → Enforce Primary Key Constraint is enabled for an object, primary key index calculations (deduplication) start to mark duplicate records that must be deleted to ensure that only unique data records exist.
    • If the Cluster Management Console (CMC) → Tenant Configurations → Data Loading → Enable Always Compact option is enabled, a compaction job starts to remove duplicate rows and create a compacted version of the object parquet files in the object’s _rewritten directory in the source area. The following are the consumers of compacted parquet files: MVs, SQLi queries on Spark port, internal and external Notebook services, and Preview data function.
    • When the Enforce Primary Key Constraint property is disabled for an object, both the deduplication and compaction calculations for this object are skipped.
    • At the end of the compaction job, a group of metadata files is generated in Delta Lake file formats to point to all parquet files (whether extracted or rewritten) that constitute a compacted version. Consumers of the compacted parquet will use the Delta Lake metadata files to find out which extracted or compacted parquet file versions to read data from.
  • For an MV, the Loader Service passes the query of the MV Script to Spark. Spark reads data from the parquet files of the underlying physical schema objects and creates new parquet files for the MV in a new parquet version directory in the source directory. A compacted version of the MV parquet files is also created in the object’s _rewritten directory if compaction is enabled.

    Note

    Spark reads the MV data from the compacted parquet files of the underlying object in the case that the underlying object is a physical schema table or another MV. However, starting with release 5.1.2, an MV can reference columns from Incorta SQL tables or Incorta Analyzer tables in other physical schemas. In this case, Spark will read data from the source parquet files of these Incorta tables as they do not have a compacted version.

    With the new compaction mechanism introduced in the 5.2 release, for each of the Incorta Analyzer or SQL tables, the _delta_log directory exists in the object directory to include a group of metadata files that compacted parquet consumers (such as Spark) use to find out the parquet files of each object version to read from.

  • For Incorta Analyzer tables and Incorta SQL tables, the Loader Service creates full parquet files in the source directory. Prior to release 5.1.2, the Loader Service would create snapshot DDM files for these tables in the ddm directory (also known as snapshot in older releases). With the introduction of the derived tables' support for key columns in 6.0.3, the Loader Service creates snapshot DDM files for the unique index each time the derived table's key columns are updated or the schema or table is loaded.

  • For physical schema tables and MVs with performance optimization enabled, the Loader Service loads data to the Engine memory. The Engine then calculates any formula columns, key columns, or load filters for each object and creates snapshot DDM files. These files are saved to the schemas directory that exists in the ddm directory.

  • In the case that there is a join relationship where one of the physical schema objects is the child table, the Engine creates a new version of the join DDM files and saves them to the joins directory that exists in the ddm directory.

Important

The described behavior and output are applicable starting with release 5.1 where the Loader Service creates a new version of files. For older releases, a full load job deletes all existing parquet, DDM, and compacted files and creates new ones.

Incremental load

You can start an incremental load only for a physical schema. However, the Loader Service incrementally loads only physical schema tables and MVs that have incremental load enabled. For physical schema objects with incremental load disabled or that do not support incremental loads, such as Incorta Analyzer and SQL tables, the Loader Service performs a full load.

During an incremental load, the same behavior of the Loader Service for a full load occurs except for the following:

  • In the case of a physical schema table, the Loader Service extracts data from the data source according to the update query or the update file rather than the original query or file. As for an MV, the Loader Service passes the MV incremental script, not the original script, to Spark.
  • In the case of both physical schema tables and MVs, the Loader Service creates only a subdirectory for the new parquet files under the object’s latest parquet version directory.
  • An incremental load job for an object that has one or more key columns includes a deduplication process to mark duplicate rows regardless of the state of the Enforce Primary Key Constraint option.

Load from staging

Load from staging is the same as a full load but without the extraction and transformation phases. The Loader Service loads the latest existing parquet files for the respective object to the Engine memory, whether these files are loaded before (committed) or not (uncommitted). The Engine then calculates any joins, formula columns, key columns, or load filters and creates a new version of the DDM files.

Load order and queueing

Physical schema and object queueing is a pre-extraction phase and is not part of the load job itself. However, it is an important phase where the Engine decides on the order of loading physical schemas and objects according to their dependencies. The Engine queues physical schemas to run in sequence or in parallel depending upon the join relationships and formula columns in the physical schema objects. The Engine queues physical schemas and objects with no preceding dependencies first and then the ones that depend on them, and so on. MVs, Incorta Analyzer Tables, and Incorta SQL Tables wait for their underlying objects to load first.

On the other hand, a schema manager user can specify the order of loading physical schema tables and MVs in a single physical schema. A schema manager user adds tables and MVs to separate load groups and orders these groups as appropriate.

In addition to manually defining the load order of MVs using load groups, release 5.1.2 introduces the automation of load ordering of MVs within a load group. The automated approach includes detecting the MV dependencies within the load group and ordering the load accordingly. This should both relieve the burden of manually defining the MV load ordering and allow for better performance and optimal resource utilization.

The new implementation allows detecting the MV dependencies and virtually dividing MVs within a load group into sub-groups where independent MVs will load first and then their dependent MVs, and so on. You can include the MVs you want in one load group, and leave the load ordering decision to the load job. For MVs with cyclic dependencies, ordering the MV load depends mainly on the alphabetical order of the MV names.

Load phases

A load job process goes through multiple phases that vary according to the object type, configuration, and load job type. The following are the major load phases:

  • Extraction
  • Transformation (Enrichment)
  • Load (and Post-Load)
Important

Within a single load job, each phase should be completed for all objects before starting the new phase. For example, the extraction of all physical schema tables should be completed before starting the transformation phase of MVs, and the extraction or transformation should be completed before the loading starts.

The following figure summarizes the data loading process:

Extraction

This is the first phase in a full or incremental load job for a physical schema table. Other objects do not go through this phase. During this phase,

  • The Loader Service extracts records from data sources and writes them into parquet files. In the case of a load from staging, the Loader Service loads existing parquet files. No extraction happens during a load from staging.
  • The Loader Service can extract multiple tables in parallel, depending on object dependencies and the availability of threads in the table thread pool. If a physical schema table contains multiple data sets, the Loader Service can also extract these data sets in parallel. Furthermore, if you configure a data set to use chunking (parallel extraction), the Loader Service also extracts this data set in multiple threads.
  • At the end of the extraction phase, if compaction is enabled, a compaction job starts to create a compacted version of the extracted parquet files with no duplicate records.

Transformation (Enrichment)

This is the first actual phase in a full or incremental load for an MV or SQLi table. The Loader Service submits queries to Spark to process these queries and waits for them to finish. Spark then creates new parquet files in shared storage for these objects. At the end of the transformation phase as well, if compaction is enabled, a compaction job starts to create a compacted version of parquet files with no duplicate records.

For MVs that reference physical schema tables or other MVs, Spark reads from the compacted parquet files. If a compaction job is running for an object that an MV references, the Loader Service waits until the compaction is finished before sending queries to Spark.

For MVs that reference Incorta Analyzer tables or Incorta SQL tables, Spark reads data from the original parquet files after checking the .checkpoint.parquet metadata file of the respective version.

Incorta 5.2 release improved the prolonged transformation phase of independent MVs within the same physical schema. During a schema load job, the Loader Service will start the enrichment of multiple independent MVs in parallel whenever resources are available. This tends to reduce the MV in-queue time and the enrichment time as a whole.

Load and Post-Load

In this phase, the following applies:

  • For physical schema tables and MVs that have performance optimization enabled, the Engine loads data from the source parquet files into its memory.
  • After loading (post-load process) and in the case that a loaded object has one or more key columns, formula columns, load filters, or joins (as a child table), the engine calculates them and writes snapshot DDM files.
  • During the post-load process, the Engine also calculates Incorta Analyzer tables and Incorta SQL tables, and then the Loader Service writes full parquet files for these tables, in addition to the unique index DDM files if these tables have key columns.
Important

As the Loader Service calculates Incorta Analyzer tables and Incorta SQL tables during the Load and Post-Load phase and Spark processes MVs during the Transformation phase, an MV cannot reference an Incorta Analyzer table or Incorta SQL table that exists in the same physical schema as the MV.

Metadata database commit and synchronization

After loading data to Incorta shared storage and Engine memory, the Loader Service updates the Incorta metadata database with the load job information, including the latest available versions of parquet, DDM, and compacted files, data size, and commit time for each object in the load job. The Loader Service then sends a synchronization signal to the Analytics Service and other follower Loader Services so that they read from these latest versions.

Performance optimization

A schema manager user can enable performance optimization for the physical schema objects to make their data available to the Analytics Service. Enabling performance optimization instructs the Loader Service to load the object files to the Analytics Service Engine memory. However, enabling this feature may affect the load job time. For physical schema objects with performance optimization disabled, the Loader Service only creates parquet files in shared storage but does not load them to the Engine memory.

By default, Incorta enables Performance Optimization for all physical schema objects, except for an alias to which this property does not apply. As a schema manager user, you can disable or enable this feature per physical schema object in the Table Editor. You can also enable and disable it for one or more objects in the Schema Manager.

Important

You should not disable Performance Optimization for Incorta Analyzer tables and Incorta SQL tables to make them available to the Analytics Service. Starting with release 5.1.2, you cannot disable Performance Optimization for Incorta Analyzer tables or Incorta SQL tables in the Table Editor.

When you disable performance optimization for an object, its data is evicted from the Analytics Service Engine memory. When you enable it again, you need to perform either a full load or load from staging for the object.

Load job statuses

A load job can be completed successfully or can fail. You can use the following tools to view the status of a load job:

  • Schema Manager: shows the status and time of the last load job, if any, for each physical schema
  • Schema Designer: shows either the time of the last successful load job or the status of the last failed load job for a specific physical schema
  • Load Job Details Viewer: contains the load statuses and states for each load job for a given physical schema and object

The following are the possible statuses of a load job:

StatusDescription
In QueueThe physical schema is still in-queue waiting for the Loader Service to start loading data.
RunningThe status of the load job during execution. A running load job has different states depending upon the stage or the phase of the loading process.
SucceededThe status of a load job that is completed successfully
Finished With ErrorsThe status of a load job that is completed but with some errors in any stage or phase in the load process
FailedThe status of a load job that failed completely for reasons such as server error or network error
InterruptedThe status of a load job that is interrupted (stopped or canceled) by the user or for any other reason
AbortedThe status of a load job that is automatically forced to stop after exceeding the configured interruption timeout.
For more details, review Aborted load jobs.

Enhancements to reduce deduplication resource overhead

The 5.1.2 release introduces enhancements to reduce the overhead of the PK index calculation, which leads to reducing memory and disk requirements, enhancing performance, and improving CPU utilization.

The new implementation for deduplicating records in Incorta's memory includes the following:

  • Scalable PK index calculation during incremental load jobs
  • Support skipping PK index calculation during full load jobs
Important

These enhancements require enabling the scalable PK index calculation at the engine level. This is the default configuration of an Incorta cluster.

Scalable PK index calculation

This release supports the scalability of the PK index calculation process, especially during an incremental load job. The implemented enhancements are as follows:

  • Parallel and scalable handling of parquet files during an incremental load job to enhance CPU utilization
  • Using temporary files when comparing existing and new records to reduce the required disk space
  • Loading only new parquet files into the Incorta engine memory instead of loading new and old files to reduce memory overhead

Skip or enforce PK constraint

A user who belongs to a group with the Schema Manager role often defines one or more key columns for a physical schema entity object such as a physical schema table or materialized view. The table data source for this object usually contains unique values for the key column or at least unique records.

In the 5.1.2 release, you have the option to either enforce the calculation of the primary key at the object level or skip this calculation to optimize data load time and performance. This scenario applies to full load jobs only. In incremental load jobs, the Loader Service must compare existing and new data to avoid data duplication when key columns are defined.

When the physical schema table or materialized view has at least one key column, the Table Editor for this object shows the Enforce Primary Key Constraint option to enforce or skip the PK index calculation.

In the case of enabling the Enforce Primary Key Constraint option, the Loader Service evaluates and checks each value in the key column(s) and enforces record uniqueness. This process requires heavy resource utilization, especially for large datasets.

If you disable this option for an object, the Loader Service loads the source data without checking record uniqueness. This helps in enhancing the load time and performance.

Note

In releases before 6.0.3, when the Enforce Primary Key Constraint option was disabled for physical tables or MVs, and the selected key columns resulted in duplicate key values, unique index calculations would not fail, the first matching value was returned whenever a single value of the key columns was required.

Starting with release 6.0.3, in such a case, the unique index calculation will fail, and the load job will finish with errors. You must either select key columns that ensure row uniqueness and perform a full load or enable the Enforce Primary Key Constraint option and load tables from staging to have the unique index correctly calculated.

Enhancements to the compaction process

In releases before 5.2, a compaction job resulted in both rewriting a compacted version of each parquet file that has duplicates and copying other extracted parquet files. Copied and rewritten parquet files were saved to the compacted directory under the tenant directory. The compacted directory might have multiple versions of compacted files of the same object. Consumers of compacted parquet files (MVs, SQLi on Spark port, internal and external Notebook services, and the Preview data function) were directed to the latest committed compacted version of the parquet files in the compacted directory to read data from.

The 5.2 release introduces a new compaction mechanism to reduce the I/O operations during a load job and save disk space. The new mechanism includes the following:

  • Only Parquet files that have duplicates are rewritten to create compacted segments.
  • Compacted segments are saved per compaction job under a new directory in the source area under the object directory: the _rewritten directory.
  • Extracted Parquet files with no duplicates are no longer copied to the compacted segments directory (_rewritten).
  • Different versions of the compacted parquet files can exist under the _rewritten directory.
  • A group of metadata files is generated per compaction job in Delta Lake file formats to point to all extracted or rewritten parquet files that constitute a compacted version. These metadata files are saved to a new directory, _delta_log, that exists also in the source area under the object directory.
  • Consumers of compacted parquet files will use the latest <CompactedVersionID>.checkpoint.parquet metadata file to find out which parquet file versions, whether extracted or compacted, to read data from.
  • The Cleanup job deletes the unused compacted parquet versions after checking the respective <CompactedVersionID>.checkpoint.parquet metadata file.
  • If the compaction job fails, the Loader Service will not commit the new compacted version to the metadata database. It will also hide the related compacted directory in the _rewritten directory and delete the metadata files or otherwise hide them.
Important

If the Table Editor → Enforce Primary Key Constraint option is disabled for an object, the Loader Service skips the compaction calculation during a full load job for this object.

There is no change in the behavior when the Cluster Management Console (CMC) → Tenant Configurations → Data Loading → Enable Always Compact option is disabled. In this case, no compaction occurs as a result of a load job. However, when an MV references an object that does not have compacted parquet files, a compaction job runs to create the required compacted segments.

Spark Requirements

  • To read the Delta Lake metadata files, Spark can use either its native Delta Lake Reader (Default) or the Incorta Custom Delta Lake Reader. An option in the CMC enables using the Incorta Custom Delta Lake Reader. For more information, refer to Guides → Configure Server → Spark Integration.
    • To use the Incorta Custom Delta Lake Reader, Spark must include the incorta.delta-1.0.jar file.
    • To use its Delta Lake Reader, Spark must include the suitable delta-core JAR file.
    • The JAR files compatible with your Spark version are added automatically to your cluster during the upgrade process.
  • After upgrading to 5.2 or later, consumers will continue to read from the old compacted directory of an object until the first full or incremental load job (loading from source) on this object or the physical schema that requires compaction. The new folder structure will be created gradually, that is, it will be created separately for each object after the first full or incremental load job that requires compaction.

Additional Considerations

Pre-SQL support

With the introduction of the 5.2 release, some connectors (MySQL and Oracle) support executing SQL statements and calling stored procedures before executing the original extraction query or incremental query during a load job. If the Pre-SQL statement fails during a load job, the object data extraction and loading fail.

Aborted load jobs

In previous releases and in some cases, when a user tries to stop a load job that is in the extraction phase, this load job does not completely stop and it becomes stuck in the interruption process. This prevents users from starting a new load job of the related physical schema and causes load jobs of other physical schemas to remain in queue if the Schema Pool Size or Maximum Concurrent Load Jobs is exceeded.

Now, Incorta automatically aborts any load job that is interrupted in the extraction phase and exceeds a configured interruption time interval. This removes the stuck physical schema from the Schema Pool, which allows users to start a new load job of the related physical schema and allows loading other physical schemas as well. In such a case, the status of the load job will be Aborted instead of Interrupted.

The default time interval to automatically abort stuck interrupted load jobs is 60 minutes. You can ask the Incorta Support team to edit the service.properties file that exists in the directory of each Loader Service and define another time interval in minutes. The following key controls this time interval: abort.interrupted.job.wait.time.

Global variable support in MVs

Starting the 5.2 release, you can reference global variables in the MV script or incremental script using any supported language. Global variables referenced in MVs are evaluated when validating the MV data source and when loading them. Thus, when you edit the value of a global variable, you must perform a full load of the related MVs.

Concurrent schema update and load jobs

Incorta does not recommend running concurrent schema model update jobs and load jobs on the same schema or dependent schemas as this may result in errors or inaccurate data.