Data Ingestion and Loading
About data ingestion
To analyze your data using its own or integrated tools, Incorta needs access to that data. It also needs to extract the data, persist it to shared storage, and load it into Engine memory so that the Analytics Service can access it.
Data ingestion in Incorta starts with creating a data source that defines the properties of the connection with your data provider, then creating a physical schema object to comprise data from this data source, and finally loading and persisting data.
The following figure summarizes the process of data ingestion in Incorta:

Data sources
Incorta can connect to external data as well as local data. External data can be a database, one or more files on a cloud storage service, or data in Software as a Service (SaaS) applications. Local data is one or more files uploaded to the shared storage of an Incorta cluster. Using the available connectors, you can create different data sources that define how Incorta can access your data. A data source defines the properties of the connection between Incorta and your data provider. For a list of the connectors available in Incorta, refer to Connectors → All.
Physical schemas and data discovery
After using suitable connectors to define data sources, you must create physical schema objects to encompass your data or to reference it. During the physical schema object creation, the Loader Service discovers your data and defines the object columns and their properties accordingly.
A physical schema object can be one of the following:
- A physical schema table
- A materialized view (MV)
- An Incorta Analyzer table
- An Incorta SQL table
- An alias
Only a physical schema table directly references one or more data sources to get data. Other objects in a physical schema reference one or more physical schema tables or other objects to get data. An alias only refers to the data of another existing physical schema object.
Data Loading
Incorta can analyze only data available to its Analytics Service. Thus, after defining data sources and creating physical schema tables that reference data in these data sources, you must load your data to Incorta shared storage and to its Engine memory to start analyzing it. Incorta stores the extracted data as Apache Parquet files in shared storage (staging) and creates Direct Data Mapping (DDM) files for objects according to the object type and configurations.
Encrypted columns are available only as parquet files to save hard disk space and reduce the Loader Service memory requirements.
Load types
Incorta supports multiple types of loads:
- Physical schema load, object load, and multi-schema load
- On-demand and scheduled load
- Full load, incremental load, and load from staging
Physical Schema load and object load
A load job can be for a single physical schema object, all applicable objects in a physical schema, or multiple schemas. As a schema manager user, you can use the Schema Designer to start a schema or object load job. You cannot start a load job for an alias as it is merely a reference to data of another existing physical schema object. Additionally, the Loader Service will skip aliases during a physical schema load job.
You can also schedule load plans that load multiple physical schemas in parallel or sequentially via separate load groups. To load schemas sequentially, place them in different groups in the required order. The Loader Service loads the schemas within the first group simultaneously, as resources allow, then starts loading the next group, and so on. The Loader Service executes each group as a separate load job.
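The group behavior described above can be pictured with a minimal Python sketch. It is an illustration only, not Incorta's implementation: `load_schema` is a hypothetical stand-in for a schema load job, the schema names are made up, and a thread pool stands in for "as resources allow".

```python
from concurrent.futures import ThreadPoolExecutor


def load_schema(schema_name):
    """Hypothetical stand-in for one physical schema load job."""
    return f"{schema_name}: loaded"


def execute_load_plan(groups, max_workers=4):
    """Run groups one after another; schemas inside a group run in parallel."""
    results = []
    for group in groups:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # pool.map keeps the input order, and leaving the with-block
            # waits for the whole group before the next group starts.
            results.append(list(pool.map(load_schema, group)))
    return results


# Two groups: the first holds two schemas, the second holds one.
plan = [["Sales", "Inventory"], ["Finance"]]
print(execute_load_plan(plan))
```

Here `max_workers` plays the role of the available resources: a smaller value means fewer schemas of the same group load at the same time.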
A physical schema load job can be on-demand or scheduled, and it can be incremental, full, or from staging. For a physical schema object, a load job can only be on-demand, and either full or from staging. Executing a load plan can be on-demand or scheduled.
Some Data Lake connectors, such as Amazon Web Services (AWS) S3, support remote tables that you can use to access your data without the need to load it to Incorta shared storage or engine memory. The Loader Service does not extract or load data for a remote table. Thus, the Analytics Service cannot query this data unless you create an MV that references this remote table and load the MV data into Incorta. In addition, external visualization tools that Incorta integrates with, such as Power BI and Tableau, can access this remote data through the Incorta SQL Interface (SQLi).
On-demand and scheduled loads
You can start a load job on demand for the physical schema or a specific physical schema object. You can also schedule the loading of data for the whole physical schema. While you use the Schema Designer to start on-demand load jobs, you use either the Schema Manager or the Scheduler to schedule physical schema load jobs.
For load plans, when you create a load plan, you can set a schedule to execute it or save it unscheduled. Regardless of the status of the load plan, you can use the Scheduler to execute it if you own or have Edit access to all its schemas.
There are also public API endpoints to manually load one or more physical schemas or manually execute a load plan.
Full load, incremental load, and load from staging
Depending on the data source of your data, the connector you use to access it, and the table data source properties, you can perform different types of load jobs. These types of loading strategies are different in behavior and output.
Full load
You can perform a full load for a physical schema or an object. By default, the Loader Service performs a full load for all physical schema tables and MVs. However, for physical schema tables and MVs that have incremental load enabled and full load disabled, the Loader Service skips them during full load jobs.
Typically, a schema developer performs a full load of an object at least once before enabling the Disable Full Load property.
During a full load, the following occurs:
- The Loader Service extracts data from the data source for each physical schema table or the single specified table according to the table data source properties. 
- The Loader Service creates new source parquet files in the source directory. The Loader Service creates a new parquet version directory with an offset subdirectory to save these files.
  - When the Table Editor → Enforce Primary Key Constraint is enabled for an object, primary key index calculations (deduplication) start to mark duplicate records that must be deleted to ensure that only unique data records exist.
  - A compaction job starts to remove duplicate rows and create a compacted version of the object parquet files in the object’s _rewritten directory in the source area. The following are the consumers of compacted parquet files: MVs, SQLi queries on Spark port, internal and external Notebook services, and the Preview data function.
  - When the Enforce Primary Key Constraint property is disabled for an object, both the deduplication and compaction calculations for this object are skipped.
  - At the end of the compaction job, a group of metadata files is generated in Delta Lake file formats to point to all parquet files (whether extracted or rewritten) that constitute a compacted version. Consumers of the compacted parquet will use the Delta Lake metadata files to find out which extracted or compacted parquet file versions to read data from.
- For an MV, the Loader Service passes the query of the MV Script to Spark. Spark reads data from the parquet files of the underlying physical schema objects and creates new parquet files for the MV in a new parquet version directory in the source directory. A compacted version of the MV parquet files is also created in the object’s _rewritten directory.
  Note:
  - Spark reads the MV data from the compacted parquet files of the underlying object in the case that the underlying object is a physical schema table or another MV. However, when an MV references columns from Incorta SQL tables or Incorta Analyzer tables in other physical schemas, Spark will read data from the source parquet files of these Incorta tables as they do not have a compacted version.
  - For each of the Incorta Analyzer or SQL tables, a _delta_log directory exists in the object directory to include a group of metadata files that compacted parquet consumers (such as Spark) use to find out the parquet files of each object version to read from.
- For Incorta Analyzer tables and Incorta SQL tables, the Loader Service creates full parquet files in the source directory. With the re-introduction of the derived tables' support for key columns, the Loader Service creates snapshot DDM files for the unique index each time the derived table's key columns are updated or the schema or table is loaded.
- For physical schema tables and MVs with performance optimization enabled, the Loader Service loads data to the Engine memory. The Engine then calculates any formula columns, key columns, or load filters for each object and creates snapshot DDM files. These files are saved to the schemas directory that exists in the ddm directory.
- In the case that there is a join relationship where one of the physical schema objects is the child table, the Engine creates a new version of the join DDM files and saves them to the joins directory that exists in the ddm directory.
Incremental load
You can start an incremental load only for a physical schema. However, the Loader Service incrementally loads only physical schema tables and MVs that have incremental load enabled. For physical schema objects with incremental load disabled or that do not support incremental loads, such as Incorta Analyzer and SQL tables, the Loader Service performs a full load.
During an incremental load, the same behavior of the Loader Service for a full load occurs except for the following:
- In the case of a physical schema table, the Loader Service extracts data from the data source according to the update query or the update file rather than the original query or file. As for an MV, the Loader Service passes the MV incremental script, not the original script, to Spark.
- In the case of both physical schema tables and MVs, the Loader Service creates only a subdirectory for the new parquet files under the object’s latest parquet version directory.
- An incremental load job for an object that has one or more key columns includes a deduplication process to mark duplicate rows regardless of the state of the Enforce Primary Key Constraint option.
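The rules for which objects a physical schema load job skips, fully loads, or incrementally loads can be summarized in a short Python sketch. The class, field names, and `kind` labels are hypothetical and exist only for this illustration; they are not Incorta settings or APIs.

```python
from dataclasses import dataclass


@dataclass
class SchemaObject:
    name: str
    kind: str  # "table", "mv", "analyzer", "sql", or "alias" (labels assumed here)
    incremental_enabled: bool = False
    full_load_disabled: bool = False


def resolve_action(obj, job_type):
    """Return "skip", "full", or "incremental" for one object in a schema load job."""
    if obj.kind == "alias":
        return "skip"  # an alias only references another object's data
    supports_incremental = obj.kind in ("table", "mv")
    if job_type == "full":
        # Objects with incremental load enabled and full load disabled are skipped.
        if supports_incremental and obj.incremental_enabled and obj.full_load_disabled:
            return "skip"
        return "full"
    if job_type == "incremental":
        # Objects without incremental load fall back to a full load.
        if supports_incremental and obj.incremental_enabled:
            return "incremental"
        return "full"
    raise ValueError(f"unknown job type: {job_type}")


orders = SchemaObject("orders", "table", incremental_enabled=True, full_load_disabled=True)
print(resolve_action(orders, "full"), resolve_action(orders, "incremental"))
```

Load from staging is left out of the sketch because it reuses existing parquet files rather than choosing between a full and an incremental extraction.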
Load from staging
Load from staging is the same as a full load but without the extraction and transformation phases. The Loader Service loads the latest existing parquet files for the respective object to the Engine memory, whether these files were loaded before (committed) or not (uncommitted). The Engine then calculates any joins, formula columns, key columns, or load filters and creates a new version of the DDM files.
Load job phases
Incorta's next-generation loader introduces a new architecture that changes how load jobs are executed. The new architecture enables plan-based execution of load jobs by introducing a new Planning phase. During this phase, the Loader Service detects the dependencies among the objects in the load job, along with user-defined load order, and creates a plan for loading objects accordingly. In addition, the Loader Service concurrently executes the extraction and transformation of objects in the load job in the same phase: the Running phase. The new architecture also optimizes the Post-load calculations so that the Loader Service concurrently executes different calculation types.
A load job goes through multiple phases that vary according to the object type, configurations, and load job type. The following are the major load phases:
- Planning: The Loader Service detects the dependencies among the physical schema objects in the load plan, checks manually defined load order groups, and accordingly creates a directed acyclic graph (DAG) plan to determine the object loading steps and order. You can preview the DAG of a load plan using the Load Plan DAG Viewer.
- Running: The Loader Service concurrently executes the extraction and transformation (enrichment) of tables and materialized views (MVs) respectively, based on the DAG plan. Combining the extraction and transformation processes in one phase eliminates unnecessary waiting time. If the physical schema has manually defined load order where all tables are in a group and all MVs are in a different one, the Loader Service won’t execute the extraction and transformation in parallel.
- Post-load: After completing the extraction and transformation of all tables and MVs in the load plan, the Loader Service performs Post-load calculations, such as formula columns, joins, and derived tables (Incorta SQL and Analyzer tables). The Loader Service no longer performs these calculations sequentially depending on the calculation type. Instead, it executes these calculations concurrently or sequentially according to their interdependency detected during the Planning phase. This phase now involves the Loading phase. For each calculation, the Loader Service loads into the engine memory only the columns that the service requires, completes the calculation, and then creates the required DDM files. The Loader Service then marks the unneeded columns so that they can be evicted from the memory when needed, which reduces the memory needed during this phase.
It’s important to note that using both the MVs' user-defined load order and automatically detected dependencies may result in an execution plan with cyclic dependencies, leading to load job failures. To avoid such failures, it is recommended to delete the MVs' user-defined load order.
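To see why mixing a user-defined load order with detected dependencies can produce a cyclic plan, consider the following Python sketch. It uses the standard library `graphlib` module as a stand-in for the Planning phase; the object names and the `plan_load_order` helper are hypothetical, and the Loader Service's actual planner may work differently.

```python
from graphlib import CycleError, TopologicalSorter


def plan_load_order(detected_deps, user_order=()):
    """Return a load order, or raise CycleError when the constraints conflict.

    detected_deps: {object: set of objects it reads from}
    user_order: (earlier, later) pairs taken from a manually defined load order
    """
    graph = {obj: set(deps) for obj, deps in detected_deps.items()}
    for earlier, later in user_order:
        # A manual order adds one more "must load before" edge to the graph.
        graph.setdefault(later, set()).add(earlier)
        graph.setdefault(earlier, set())
    return list(TopologicalSorter(graph).static_order())


deps = {"orders": set(), "mv_daily": {"orders"}, "mv_monthly": {"mv_daily"}}
print(plan_load_order(deps))

try:
    # The manual order asks for mv_monthly before mv_daily, which
    # contradicts the detected dependency and closes a cycle.
    plan_load_order(deps, user_order=[("mv_monthly", "mv_daily")])
except CycleError as err:
    print("cyclic plan:", err.args[1])
```

Removing the manual pair leaves only the detected dependencies, which always form a valid order in this example.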
Load job processes and tasks in detail
Extraction
Extraction is the first task in a full or incremental load job of a physical schema table. Other objects do not go through this process. During extraction:
- The Loader Service extracts records from data sources and writes them into parquet files. No extraction happens during a load from staging, and the Loader Service loads existing parquet files.
- The Loader Service can extract multiple tables in parallel, depending on object dependencies and the availability of threads in the table thread pool. If a physical schema table contains multiple data sets, the Loader Service can also extract these data sets in parallel. Furthermore, if you configure a data set to use chunking (parallel extraction), the Loader Service also extracts this data set in multiple threads.
- At the end of the extraction phase, a compaction job starts to create a compacted version of the extracted parquet files with no duplicate records.
Enrichment (Transformation)
This is the first task in a full or incremental load for an MV or SQLi table. The Loader Service submits queries to Spark to process these queries and waits for them to finish. Spark then creates new parquet files in shared storage for these objects. At the end of the transformation task, a compaction job starts to create a compacted version of parquet files with no duplicate records.
For MVs that reference physical schema tables or other MVs, Spark reads from the compacted parquet files. If a compaction job is running for an object that an MV references, the Loader Service waits until the compaction is finished before sending queries to Spark.
For MVs that reference Incorta Analyzer tables or Incorta SQL tables, Spark reads data from the original parquet files after checking the .checkpoint.parquet metadata file of the respective version.
Load and Post-Load
- For physical schema tables and MVs that have performance optimization enabled, the Engine loads data from the source parquet files into its memory.
- After loading, the Post-load process starts, and in the case that a loaded object has one or more key columns, formula columns, load filters, or joins (as a child table), the Engine calculates them and writes snapshot DDM files.
- During the Post-load process, the Engine also calculates Incorta Analyzer tables and Incorta SQL tables, and then the Loader Service writes full parquet files for these tables, in addition to the unique index DDM files if these tables have key columns.
As the Loader Service calculates Incorta Analyzer tables and Incorta SQL tables during the Load and Post-Load phase and Spark processes MVs during the Transformation phase, an MV cannot reference an Incorta Analyzer table or Incorta SQL table that exists in the same physical schema as the MV.
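The restriction above can be expressed as a simple check. The following Python sketch is illustrative only: the tuple layout, the `kind` labels, and the function name are assumptions made for this example, not part of any Incorta API.

```python
def invalid_mv_references(mv_schema, referenced_objects):
    """Return the references an MV cannot use: derived tables in the MV's own schema.

    referenced_objects: iterable of (schema, name, kind) tuples, where kind is
    "table", "mv", "analyzer", or "sql" (labels assumed for this sketch).
    """
    derived = {"analyzer", "sql"}
    return [
        f"{schema}.{name}"
        for schema, name, kind in referenced_objects
        if schema == mv_schema and kind in derived
    ]


refs = [
    ("Sales", "orders", "table"),            # allowed: physical schema table
    ("Sales", "top_customers", "analyzer"),  # not allowed: same schema as the MV
    ("Finance", "budget_sql", "sql"),        # allowed: different schema
]
print(invalid_mv_references("Sales", refs))
```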
Metadata database commit and synchronization
After loading data to Incorta shared storage and Engine memory, the Loader Service updates the Incorta metadata database with the load job information, including the latest available versions of parquet, DDM, and compacted files, data size, and commit time for each object in the load job. The Loader Service then sends a synchronization signal to the Analytics Service and other follower Loader Services so that they read from these latest versions.
Performance optimization
A schema manager user can enable performance optimization for the physical schema objects to make their data available to the Analytics Service. Enabling performance optimization instructs Incorta to load the object files to the Analytics Service Engine memory. However, enabling this feature may affect the load job time. For physical schema objects with performance optimization disabled, the Loader Service only creates parquet files in shared storage but does not load them to the Engine memory.
By default, Incorta enables Performance Optimization for all physical schema objects, except for an alias to which this property does not apply. As a schema manager user, you can disable or enable this feature per physical schema object in the Table Editor. You can also enable and disable it for one or more objects in the Schema Manager.
Keep Performance Optimization enabled for Incorta Analyzer tables and Incorta SQL tables so that they remain available to the Analytics Service. You cannot disable Performance Optimization for these tables in the Table Editor.
When you disable performance optimization for an object, its data is evicted from the Analytics Service Engine memory. When you enable it again, you need to perform either a full load or load from staging for the object.
Load job statuses
You can use the following tools to view the status of a load job:
- Schema Manager: shows the status and time of the last load job, if any, for each physical schema
- Schema Designer: shows either the time of the last successful load job or the status of the last failed load job for a specific physical schema
- Load Job Details Viewer: shows the load statuses and states for each load job for a given physical schema and object. It can also show the status of load plans related to this physical schema.
The following are the possible statuses of a load job:
| Status | Description | 
|---|---|
| In Queue | The physical schema is still in-queue waiting for the Loader Service to start loading data. | 
| Running | The status of the load job during execution. A running load job has different states depending upon the stage or the phase of the loading process. | 
| Succeeded | The status of a load job that is completed successfully | 
| Finished With Errors | The status of a load job that is completed but with some errors in any stage or phase in the load process | 
| Failed | The status of a load job that failed completely for reasons such as server error or network error | 
| Interrupted | The status of a load job that is interrupted (stopped or canceled) by the user or for any other reason | 
| Aborted | The status of a load job that is automatically forced to stop after exceeding the configured interruption timeout. For more details, review Aborted load jobs. | 
Deduplication resource requirements
The current implementation for deduplicating records in Incorta's memory aims at reducing the overhead of the PK index calculation, reducing memory and disk requirements, enhancing performance, and improving CPU utilization.
This implementation includes the following:
- Scalable PK index calculation during incremental load jobs
- Support skipping PK index calculation during full load jobs
These enhancements require enabling the scalable PK index calculation at the engine level. This is the default configuration of an Incorta cluster.
Scalable PK index calculation
Incorta supports the scalability of the PK index calculation process, especially during an incremental load job.
- Parallel and scalable handling of parquet files during an incremental load job to enhance CPU utilization
- Using temporary files when comparing existing and new records to reduce the required disk space
- Loading only new parquet files into the Incorta engine memory instead of loading new and old files to reduce memory overhead
Skip or enforce PK constraint
A user who belongs to a group with the Schema Manager role often defines one or more key columns for a physical schema entity object such as a physical schema table or materialized view. The table data source for this object usually contains unique values for the key column or at least unique records.
You have the option to either enforce the calculation of the primary key at the object level or skip this calculation to optimize data load time and performance. This scenario applies to full load jobs only. In incremental load jobs, the Loader Service must compare existing and new data to avoid data duplication when key columns are defined.
When the physical schema table or materialized view has at least one key column, the Table Editor for this object shows the Enforce Primary Key Constraint option to enforce or skip the PK index calculation.
In the case of enabling the Enforce Primary Key Constraint option, the Loader Service evaluates and checks each value in the key column(s) and enforces record uniqueness. This process requires heavy resource utilization, especially for large datasets.
If you disable this option for an object, the Loader Service loads the source data without checking record uniqueness. This helps in enhancing the load time and performance.
In previous releases, unique index calculations would not fail when the Enforce Primary Key Constraint option was disabled and either the data source did not enforce the primary key constraint (such as CSV files) or the primary keys of the table were not properly set.
Now, in such a case, the unique index calculation will fail, and the load job will finish with errors. You must do one of the following to have the unique index correctly calculated for physical tables or MVs:
- Enable the Enforce Primary Key Constraint option and fully load tables from staging.
- Select the right key columns that ensure row uniqueness and fully load tables.
In the case of derived tables, select the right key columns that ensure row uniqueness, and the unique index will be correctly calculated during the schema update job.
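As a rough illustration of what enforcing record uniqueness on key columns involves, the following Python sketch marks duplicate rows by key. It is not Incorta's PK index algorithm: rows are plain dictionaries, and keeping the last occurrence of each key is an assumption of this sketch.

```python
def mark_duplicates(rows, key_columns):
    """Return one flag per row: True when a later row carries the same key.

    Keeping the last occurrence of each key is an assumption of this sketch.
    """
    last_index = {}
    for i, row in enumerate(rows):
        last_index[tuple(row[c] for c in key_columns)] = i
    return [
        last_index[tuple(row[c] for c in key_columns)] != i
        for i, row in enumerate(rows)
    ]


rows = [
    {"id": 1, "qty": 5},
    {"id": 2, "qty": 7},
    {"id": 1, "qty": 9},  # newer version of id 1
]
flags = mark_duplicates(rows, ["id"])
unique_rows = [row for row, dup in zip(rows, flags) if not dup]
print(flags, unique_rows)
```

Every row has to be compared against the key index, which is why enforcing the constraint becomes expensive for large datasets and why skipping it shortens full loads when the source already guarantees uniqueness.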
The compaction process
The current compaction mechanism, which serves the consumers of compacted parquet files (MVs, SQLi on Spark port, internal and external Notebook services, and the Preview data function), includes the following:
- Only Parquet files that have duplicates are rewritten to create compacted segments.
- Compacted segments are saved per compaction job under a new directory in the source area under the object directory: the _rewritten directory.
- Extracted Parquet files with no duplicates are no longer copied to the compacted segments directory (_rewritten).
- Different versions of the compacted parquet files can exist under the _rewritten directory.
- A group of metadata files is generated per compaction job in Delta Lake file formats to point to all extracted or rewritten parquet files that constitute a compacted version. These metadata files are saved to a new directory, _delta_log, that also exists in the source area under the object directory.
- Consumers of compacted parquet files (MVs, SQLi on Spark port, internal and external Notebook services, and the Preview data function) will use the latest <CompactedVersionID>.checkpoint.parquet metadata file to find out which parquet file versions, whether extracted or compacted, to read data from.
- The Cleanup job deletes the unused compacted parquet versions after checking the respective <CompactedVersionID>.checkpoint.parquet metadata file.
- If the compaction job fails, the Loader Service will not commit the new compacted version to the metadata database. It will also hide the related compacted directory in the _rewritten directory and delete the metadata files or otherwise hide them.
If the Table Editor → Enforce Primary Key Constraint option is disabled for an object, the Loader Service skips the compaction calculation during a full load job for this object.
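The idea of rewriting only the files that contain duplicates, and then pointing consumers at a mix of original and rewritten files, can be sketched in Python as follows. This is a simplified model under stated assumptions: in-memory lists stand in for parquet files, the returned `manifest` list stands in for the Delta Lake metadata files, the `_rewritten/<file>` path layout is hypothetical, and the newest record is assumed to win.

```python
def compact(files, key):
    """files: {path: list of row dicts}, ordered from oldest to newest.

    Returns (rewritten, manifest). rewritten holds only the files that had
    duplicates; manifest lists the path a consumer should read for each file.
    """
    # For every key value, remember the last file and row position holding it.
    latest = {}
    for path, rows in files.items():
        for i, row in enumerate(rows):
            latest[row[key]] = (path, i)

    rewritten, manifest = {}, []
    for path, rows in files.items():
        kept = [r for i, r in enumerate(rows) if latest[r[key]] == (path, i)]
        if len(kept) == len(rows):
            manifest.append(path)  # no duplicates: the extracted file is reused
        else:
            new_path = f"_rewritten/{path}"  # hypothetical layout
            rewritten[new_path] = kept
            manifest.append(new_path)
    return rewritten, manifest


files = {
    "part-0.parquet": [{"id": 1}, {"id": 2}],
    "part-1.parquet": [{"id": 2}, {"id": 3}],  # id 2 supersedes the row in part-0
}
rewritten, manifest = compact(files, "id")
print(manifest)
```

Only `part-0.parquet` is rewritten in this example; `part-1.parquet` has no superseded rows, so the manifest points to the extracted file directly.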
Spark Requirements
- To read the Delta Lake metadata files, Spark can use either its native Delta Lake Reader (default) or the Incorta Custom Delta Lake Reader. An option in the CMC enables using the Incorta Custom Delta Lake Reader. For more information, refer to Guides → Configure Server → Spark Integration.
  - To use the Incorta Custom Delta Lake Reader, Spark must include the incorta.delta-1.0.jar file.
  - To use its Delta Lake Reader, Spark must include the suitable delta-core JAR file.
    - Spark v3 is compatible with the delta-core_2.12-0.7.0.jar file.
    - Spark v2 is compatible with the delta-core_2.12-0.6.0.jar file.
  - The JAR files compatible with your Spark version are added automatically to your cluster during the upgrade process.
- Consumers will continue to read from the old compacted directory of an object until the first full or incremental load job (loading from source) on this object or the physical schema that requires compaction. The new folder structure is created gradually: it is created separately for each object after the first full or incremental load job that requires compaction.
Additional Considerations
Pre-SQL support
Some connectors, such as MySQL and Oracle, support executing SQL statements and calling stored procedures before executing the original extraction query or incremental query during a load job. If the Pre-SQL statement fails during a load job, the object data extraction and loading fail.
Aborted load jobs
Incorta automatically aborts any load job that is interrupted in the extraction phase and exceeds a configured interruption time interval. This removes the stuck physical schema from the Schema Pool, which allows users to start a new load job of the related physical schema and allows loading other physical schemas as well. In such a case, the status of the load job will be Aborted instead of Interrupted.
The default time interval to automatically abort stuck interrupted load jobs is 60 minutes. You can ask the Incorta Support team to edit the service.properties file that exists in the directory of each Loader Service and define another time interval in minutes. The following key controls this time interval: abort.interrupted.job.wait.time.
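The following Python sketch shows how the key and its default could be read from properties-style text. It assumes plain `key=value` lines with `#` comments and is not how the Loader Service itself parses service.properties; only the key name and the 60-minute default come from the description above.

```python
DEFAULT_ABORT_WAIT_MINUTES = 60


def abort_wait_minutes(properties_text):
    """Read abort.interrupted.job.wait.time from key=value text, defaulting to 60."""
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if sep and key.strip() == "abort.interrupted.job.wait.time":
            return int(value.strip())
    return DEFAULT_ABORT_WAIT_MINUTES


# A sample fragment that raises the interval to 90 minutes.
sample = """
# interval in minutes
abort.interrupted.job.wait.time=90
"""
print(abort_wait_minutes(sample), abort_wait_minutes(""))
```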
Global variable support in MVs
You can reference global variables in the MV script or incremental script using any supported language. Global variables referenced in MVs are evaluated when validating the MV data source and when loading them. Thus, when you edit the value of a global variable, you must perform a full load of the related MVs.
Concurrent schema update and load jobs
Incorta does not recommend running concurrent schema model update jobs and load jobs on the same schema or dependent schemas as this may result in errors or inaccurate data.
Multi-loader support
Both Incorta Cloud and On-Premises can support a multi-loader architecture, which allows multiple Loader Services to manage the acquisition and ingestion of data within the same cluster. This setup ensures that all loader services have access to the same schema model and file versions, maintaining consistency throughout the process. To determine which loader service should execute a load job, Zookeeper considers various factors, such as the number of running load jobs and the maximum number of concurrent load jobs a loader service can handle simultaneously.
However, this architecture doesn’t support schema distribution for multi-schema load jobs. Therefore, avoid schema distribution if you have multi-schema load jobs and vice versa.
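As a rough picture of how the two factors mentioned above (running load jobs and the maximum number of concurrent load jobs) could drive the choice of a Loader Service, consider this Python sketch. The selection rule (most free slots wins) and the loader names are assumptions for illustration; the rule that Zookeeper actually applies is not described here.

```python
def pick_loader(loaders):
    """loaders: {name: (running_jobs, max_concurrent_jobs)}.

    Returns the loader with the most free slots, or None when all are full.
    Choosing by free slots is an assumption, not Incorta's documented rule.
    """
    free = {
        name: max_jobs - running
        for name, (running, max_jobs) in loaders.items()
        if running < max_jobs
    }
    if not free:
        return None
    return max(free, key=free.get)


cluster = {"loader-1": (3, 4), "loader-2": (0, 2), "loader-3": (3, 3)}
print(pick_loader(cluster))
```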