Delta Lake
Important Capabilities
| Capability | Status | Notes | 
|---|---|---|
| Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata | 
| Extract Tags | ✅ | Can extract S3 object/bucket and Azure blob/container tags if enabled | 
This plugin extracts:
- Column types and schema associated with each delta table
- Custom properties: number_of_files, partition_columns, table_creation_time, location, version, etc.
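To give a feel for where such properties live, here is a minimal sketch that reads a few of them straight from the JSON commit files in a table's `_delta_log` folder. It is an illustration only: the function name `read_table_metadata` is hypothetical, the mapping from log fields to the property names above is an assumption, and the connector's own extraction logic is not shown in this document and may differ. Parquet checkpoints are ignored, and number_of_files is omitted because it would require replaying add/remove actions.

```python
import json
from pathlib import Path


def read_table_metadata(table_path: str) -> dict:
    """Collect a few table properties from the JSON commits in _delta_log.

    Hypothetical helper for illustration; not the connector's implementation.
    """
    log_dir = Path(table_path) / "_delta_log"
    props = {
        "location": str(table_path),
        "version": None,
        "partition_columns": [],
        "table_creation_time": None,
    }
    # Commit files are named by zero-padded version, so sorting orders them.
    for commit in sorted(log_dir.glob("*.json")):
        if not commit.stem.isdigit():
            continue  # skip anything that is not a plain commit file
        props["version"] = int(commit.stem)
        # Each commit is newline-delimited JSON, one action per line.
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            meta = json.loads(line).get("metaData")
            if meta:
                props["partition_columns"] = meta.get("partitionColumns", [])
                props["table_creation_time"] = meta.get("createdTime")
    return props
```

Calling `read_table_metadata("quickstart/my-table")` on a local table returns a dictionary with the latest version number and the most recent partition columns recorded in the log.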
If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: delta-lake
  config:
    env: "PROD"
    platform_instance: "my-delta-lake"
    base_path: "/path/to/data/folder"
sink:
  # sink configs
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description | 
|---|---|
| base_path ✅ string | Path to table (s3 or local file system). If path is not a delta table path then all subfolders will be scanned to detect and ingest delta tables. | 
| platform Enum | The platform that this source connects to. One of: "delta-lake" Default: delta-lake |
| platform_instance string | The instance of the platform that all assets produced by this recipe belong to | 
| relative_path string | If set, delta-tables will be searched at location '<base_path>/<relative_path>' and URNs will be created using relative_path only. | 
| require_files boolean | Whether DeltaTable should track files. Consider setting this to False for large delta tables, resulting in significant memory reduction for the ingestion process. When set to False, number_of_files in the delta table cannot be reported. Default: True |
| version_history_lookback integer | Number of previous version histories to be ingested. If set to -1, all version history will be ingested. Default: 1 |
| env string | The environment that all assets produced by this connector belong to Default: PROD | 
| azure Azure | Azure specific configuration | 
| azure.use_abs_blob_properties boolean | Whether or not to create properties in datahub from the Azure blob Default: False | 
| azure.use_abs_blob_tags boolean | Whether or not to create tags in datahub from Azure blob tags Default: False | 
| azure.use_abs_container_properties boolean | Whether or not to create properties in datahub from the Azure blob container Default: False | 
| azure.azure_config AzureConnectionConfig | Azure configuration | 
| azure.azure_config.account_key string | Azure storage account access key. | 
| azure.azure_config.account_name string | Name of the Azure storage account. See Microsoft official documentation on how to create a storage account. | 
| azure.azure_config.base_path string | Base folder in hierarchical namespaces to start from. Default: / | 
| azure.azure_config.client_id string | Azure client (Application) ID for service principal auth. | 
| azure.azure_config.client_secret string | Azure client secret for service principal auth. | 
| azure.azure_config.container_name string | Azure storage account container name. | 
| azure.azure_config.sas_token string | Azure storage account SAS token. | 
| azure.azure_config.tenant_id string | Azure tenant ID required for service principal auth. | 
| azure.azure_config.use_cli_auth boolean | Whether to authenticate using the Azure CLI. Default: False | 
| azure.azure_config.use_managed_identity boolean | Whether to use Azure Managed Identity authentication. Default: False | 
| s3 S3 | S3 specific configuration | 
| s3.use_s3_bucket_tags boolean | Whether or not to create tags in datahub from the s3 bucket Default: False | 
| s3.use_s3_object_tags boolean | Whether or not to create tags in datahub from the s3 object Default: False |
| s3.aws_config AwsConnectionConfig | AWS configuration | 
| s3.aws_config.aws_access_key_id string | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. | 
| s3.aws_config.aws_advanced_config object | Advanced AWS configuration options. These are passed directly to botocore.config.Config. | 
| s3.aws_config.aws_endpoint_url string | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. | 
| s3.aws_config.aws_profile string | The named profile to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config. | 
| s3.aws_config.aws_proxy map(str,string) | A set of proxy configs to use with AWS. See the botocore.config docs for details. |
| s3.aws_config.aws_region string | AWS region code. | 
| s3.aws_config.aws_retry_mode Enum | Retry mode to use for failed AWS requests. See the botocore.retry docs for details. One of: "legacy", "standard", "adaptive" Default: standard |
| s3.aws_config.aws_retry_num integer | Number of times to retry failed AWS requests. See the botocore.retry docs for details. Default: 5 | 
| s3.aws_config.aws_secret_access_key string | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. | 
| s3.aws_config.aws_session_token string | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. | 
| s3.aws_config.read_timeout number | The timeout for reading from the connection (in seconds). Default: 60 | 
| s3.aws_config.aws_role One of string, array | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as boto3's STS.Client.assume_role. | 
| s3.aws_config.aws_role.union One of string, AwsAssumeRoleConfig | |
| s3.aws_config.aws_role.union.RoleArn ❓ string | ARN of the role to assume. | 
| s3.aws_config.aws_role.union.ExternalId string | External ID to use when assuming the role. | 
| table_pattern AllowDenyPattern | regex patterns for tables to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] | 
| table_pattern.allow.string string | |
| table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] | 
| table_pattern.deny.string string | |
| stateful_ingestion StatefulIngestionConfig | Stateful Ingestion Config | 
| stateful_ingestion.enabled boolean | Whether or not to enable stateful ingest. Defaults to True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified; otherwise False. Default: False |
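The table_pattern fields can be read as a simple allow/deny filter over table names. The sketch below shows one plausible reading of those semantics using the defaults listed above. The function name `is_allowed` is hypothetical, and two details are assumptions rather than documented behavior: that patterns are matched from the start of the name (`re.match`), and that a deny match takes precedence over an allow match.

```python
import re
from typing import List, Optional


def is_allowed(
    name: str,
    allow: Optional[List[str]] = None,
    deny: Optional[List[str]] = None,
    ignore_case: bool = True,
) -> bool:
    """Return True if `name` passes the allow/deny regex filter.

    Hypothetical helper mirroring the documented defaults:
    allow=['.*'], deny=[], ignoreCase=True.
    """
    allow = [".*"] if allow is None else allow
    deny = [] if deny is None else deny
    flags = re.IGNORECASE if ignore_case else 0
    # Assumption: a deny match wins over any allow match.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)
```

With the defaults every table is kept; adding `deny=["^tmp_"]` would drop names such as `tmp_sales` (and `TMP_sales` too, unless case-insensitive matching is turned off).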
The JSONSchema for this configuration is inlined below.
{
  "title": "DeltaLakeSourceConfig",
  "description": "Any source that connects to a platform should inherit this class",
  "type": "object",
  "properties": {
    "stateful_ingestion": {
      "title": "Stateful Ingestion",
      "description": "Stateful Ingestion Config",
      "allOf": [
        {
          "$ref": "#/definitions/StatefulIngestionConfig"
        }
      ]
    },
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to",
      "type": "string"
    },
    "base_path": {
      "title": "Base Path",
      "description": "Path to table (s3 or local file system). If path is not a delta table path then all subfolders will be scanned to detect and ingest delta tables.",
      "type": "string"
    },
    "relative_path": {
      "title": "Relative Path",
      "description": "If set, delta-tables will be searched at location '<base_path>/<relative_path>' and URNs will be created using relative_path only.",
      "type": "string"
    },
    "platform": {
      "title": "Platform",
      "description": "The platform that this source connects to",
      "default": "delta-lake",
      "enum": [
        "delta-lake"
      ],
      "type": "string"
    },
    "table_pattern": {
      "title": "Table Pattern",
      "description": "regex patterns for tables to filter in ingestion.",
      "default": {
        "allow": [
          ".*"
        ],
        "deny": [],
        "ignoreCase": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/AllowDenyPattern"
        }
      ]
    },
    "version_history_lookback": {
      "title": "Version History Lookback",
      "description": "Number of previous version histories to be ingested. Defaults to 1. If set to -1 all version history will be ingested.",
      "default": 1,
      "type": "integer"
    },
    "require_files": {
      "title": "Require Files",
      "description": "Whether DeltaTable should track files. Consider setting this to `False` for large delta tables, resulting in significant memory reduction for ingestion process.When set to `False`, number_of_files in delta table can not be reported.",
      "default": true,
      "type": "boolean"
    },
    "s3": {
      "title": "S3",
      "description": "S3 specific configuration",
      "allOf": [
        {
          "$ref": "#/definitions/S3"
        }
      ]
    },
    "azure": {
      "title": "Azure",
      "description": "Azure specific configuration",
      "allOf": [
        {
          "$ref": "#/definitions/Azure"
        }
      ]
    }
  },
  "required": [
    "base_path"
  ],
  "additionalProperties": false,
  "definitions": {
    "DynamicTypedStateProviderConfig": {
      "title": "DynamicTypedStateProviderConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the state provider to use. For DataHub use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
          "default": {},
          "type": "object"
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "StatefulIngestionConfig": {
      "title": "StatefulIngestionConfig",
      "description": "Basic Stateful Ingestion Specific Configuration for any source.",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
          "default": false,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "AwsAssumeRoleConfig": {
      "title": "AwsAssumeRoleConfig",
      "type": "object",
      "properties": {
        "RoleArn": {
          "title": "Rolearn",
          "description": "ARN of the role to assume.",
          "type": "string"
        },
        "ExternalId": {
          "title": "Externalid",
          "description": "External ID to use when assuming the role.",
          "type": "string"
        }
      },
      "required": [
        "RoleArn"
      ]
    },
    "AwsConnectionConfig": {
      "title": "AwsConnectionConfig",
      "description": "Common AWS credentials config.\n\nCurrently used by:\n    - Glue source\n    - SageMaker source\n    - dbt source",
      "type": "object",
      "properties": {
        "aws_access_key_id": {
          "title": "Aws Access Key Id",
          "description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
          "type": "string"
        },
        "aws_secret_access_key": {
          "title": "Aws Secret Access Key",
          "description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
          "type": "string"
        },
        "aws_session_token": {
          "title": "Aws Session Token",
          "description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
          "type": "string"
        },
        "aws_role": {
          "title": "Aws Role",
          "description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as [boto3's STS.Client.assume_role](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role).",
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "array",
              "items": {
                "anyOf": [
                  {
                    "type": "string"
                  },
                  {
                    "$ref": "#/definitions/AwsAssumeRoleConfig"
                  }
                ]
              }
            }
          ]
        },
        "aws_profile": {
          "title": "Aws Profile",
          "description": "The [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.",
          "type": "string"
        },
        "aws_region": {
          "title": "Aws Region",
          "description": "AWS region code.",
          "type": "string"
        },
        "aws_endpoint_url": {
          "title": "Aws Endpoint Url",
          "description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
          "type": "string"
        },
        "aws_proxy": {
          "title": "Aws Proxy",
          "description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
          "type": "object",
          "additionalProperties": {
            "type": "string"
          }
        },
        "aws_retry_num": {
          "title": "Aws Retry Num",
          "description": "Number of times to retry failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
          "default": 5,
          "type": "integer"
        },
        "aws_retry_mode": {
          "title": "Aws Retry Mode",
          "description": "Retry mode to use for failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
          "default": "standard",
          "enum": [
            "legacy",
            "standard",
            "adaptive"
          ],
          "type": "string"
        },
        "read_timeout": {
          "title": "Read Timeout",
          "description": "The timeout for reading from the connection (in seconds).",
          "default": 60,
          "type": "number"
        },
        "aws_advanced_config": {
          "title": "Aws Advanced Config",
          "description": "Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
          "type": "object"
        }
      },
      "additionalProperties": false
    },
    "S3": {
      "title": "S3",
      "type": "object",
      "properties": {
        "aws_config": {
          "title": "Aws Config",
          "description": "AWS configuration",
          "allOf": [
            {
              "$ref": "#/definitions/AwsConnectionConfig"
            }
          ]
        },
        "use_s3_bucket_tags": {
          "title": "Use S3 Bucket Tags",
          "description": "Whether or not to create tags in datahub from the s3 bucket",
          "default": false,
          "type": "boolean"
        },
        "use_s3_object_tags": {
          "title": "Use S3 Object Tags",
          "description": "# Whether or not to create tags in datahub from the s3 object",
          "default": false,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "AzureConnectionConfig": {
      "title": "AzureConnectionConfig",
      "description": "Common Azure credentials config.\n\nhttps://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python",
      "type": "object",
      "properties": {
        "base_path": {
          "title": "Base Path",
          "description": "Base folder in hierarchical namespaces to start from.",
          "default": "/",
          "type": "string"
        },
        "container_name": {
          "title": "Container Name",
          "description": "Azure storage account container name.",
          "type": "string"
        },
        "account_name": {
          "title": "Account Name",
          "description": "Name of the Azure storage account.  See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)",
          "type": "string"
        },
        "use_managed_identity": {
          "title": "Use Managed Identity",
          "description": "Whether to use Azure Managed Identity authentication.",
          "default": false,
          "type": "boolean"
        },
        "use_cli_auth": {
          "title": "Use Cli Auth",
          "description": "Whether to authenticate using the Azure CLI.",
          "default": false,
          "type": "boolean"
        },
        "account_key": {
          "title": "Account Key",
          "description": "Azure storage account access key.",
          "type": "string"
        },
        "sas_token": {
          "title": "Sas Token",
          "description": "Azure storage account SAS token.",
          "type": "string"
        },
        "client_id": {
          "title": "Client Id",
          "description": "Azure client (Application) ID for service principal auth.",
          "type": "string"
        },
        "client_secret": {
          "title": "Client Secret",
          "description": "Azure client secret for service principal auth.",
          "type": "string"
        },
        "tenant_id": {
          "title": "Tenant Id",
          "description": "Azure tenant ID required for service principal auth.",
          "type": "string"
        }
      },
      "additionalProperties": false
    },
    "Azure": {
      "title": "Azure",
      "description": "Azure-specific configuration for Delta Lake",
      "type": "object",
      "properties": {
        "azure_config": {
          "title": "Azure Config",
          "description": "Azure configuration",
          "allOf": [
            {
              "$ref": "#/definitions/AzureConnectionConfig"
            }
          ]
        },
        "use_abs_container_properties": {
          "title": "Use Abs Container Properties",
          "description": "Whether or not to create properties in datahub from the Azure blob container",
          "default": false,
          "type": "boolean"
        },
        "use_abs_blob_properties": {
          "title": "Use Abs Blob Properties",
          "description": "Whether or not to create properties in datahub from the Azure blob",
          "default": false,
          "type": "boolean"
        },
        "use_abs_blob_tags": {
          "title": "Use Abs Blob Tags",
          "description": "Whether or not to create tags in datahub from Azure blob tags",
          "default": false,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    }
  }
}
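Two rules in the schema above are easy to trip over: base_path is the only required top-level field, and "additionalProperties": false means unknown top-level keys are rejected. The following is a minimal sketch of a pre-flight check for just those two rules, assuming the source config has already been loaded into a dictionary. The helper name `check_source_config` is hypothetical, and it does not validate types or nested objects.

```python
# Top-level property names copied from the DeltaLakeSourceConfig schema above.
ALLOWED_FIELDS = {
    "stateful_ingestion", "env", "platform_instance", "base_path",
    "relative_path", "platform", "table_pattern",
    "version_history_lookback", "require_files", "s3", "azure",
}
REQUIRED_FIELDS = {"base_path"}


def check_source_config(config: dict) -> list:
    """Return a list of problems found in the top-level source config.

    Hypothetical helper: checks only required and unknown keys.
    """
    problems = []
    for key in sorted(REQUIRED_FIELDS - set(config)):
        problems.append(f"missing required field: {key}")
    for key in sorted(set(config) - ALLOWED_FIELDS):
        problems.append(f"unknown field: {key}")
    return problems
```

An empty list means both rules are satisfied; a typo such as `basepath` is reported both as a missing required field and as an unknown field.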
Usage Guide
If you are new to Delta Lake and want to test out a simple integration with Delta Lake and DataHub, you can follow this guide.
Delta Table on Local File System
Step 1
Create a delta table using the sample PySpark code below if you don't have a delta table you can point to.
import uuid
import random
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
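def generate_data():
    # One row per (year, month, day): a random sale ID, a three-letter
    # customer code such as "QQQ", and a random total cost.
    return [
        (y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
        for d in range(1, 29)
        for m in range(1, 13)
        for y in range(2000, 2021)
    ]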
jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
table_path = "quickstart/my-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)
df = spark.read.format("delta").load(table_path)
df.show()
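Before moving on, it can help to confirm that the folder really contains a Delta table. A Delta table keeps its transaction log in a `_delta_log` subfolder, so a quick check is to look for that folder. The sketch below walks a base folder and lists every directory that looks like a Delta table, in the spirit of the subfolder scan described for base_path. The function name `find_delta_tables` is hypothetical, and the connector's own detection logic may differ.

```python
import os
from typing import List


def find_delta_tables(base_path: str) -> List[str]:
    """Return directories under base_path that contain a _delta_log folder.

    Hypothetical helper for local file systems only.
    """
    tables = []
    for root, dirs, _files in os.walk(base_path):
        if "_delta_log" in dirs:
            tables.append(root)
            # Do not descend into the table's own partition folders.
            dirs[:] = []
    return sorted(tables)
```

Running `find_delta_tables("quickstart")` after the Spark code above should list the `quickstart/my-table` folder if the write succeeded.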
Step 2
Create a datahub ingestion yaml file (delta.dhub.yaml) to ingest metadata from the delta table you just created.
source:
  type: "delta-lake"
  config:
    base_path:  "quickstart/my-table"
    
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
Note: Run the Spark code and the recipe from the same folder; otherwise, use absolute paths.
Step 3
Execute the ingestion recipe:
datahub ingest -c delta.dhub.yaml
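The same recipe can also be run from Python instead of the CLI. The sketch below builds the recipe as a dictionary and hands it to DataHub's `Pipeline` class. It assumes the `acryl-datahub` package with the delta-lake plugin is installed and that a DataHub instance is reachable at the server URL; the helper names `build_recipe` and `run_recipe` are hypothetical.

```python
def build_recipe(base_path: str, server: str = "http://localhost:8080") -> dict:
    """Return the recipe from Step 2 as a Python dictionary."""
    return {
        "source": {"type": "delta-lake", "config": {"base_path": base_path}},
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }


def run_recipe(recipe: dict) -> None:
    """Run an ingestion recipe and raise if the run reports failures."""
    # Imported lazily so build_recipe works without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
```

A typical call would be `run_recipe(build_recipe("quickstart/my-table"))`, executed from the folder that contains the table.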
Delta Table on S3
Step 1
Set up your AWS credentials by creating an AWS credentials config file, typically at '$HOME/.aws/credentials'.
[my-creds]
aws_access_key_id = ######
aws_secret_access_key = ######
Step 2
Create a Delta table using the PySpark sample code below, unless you already have Delta tables on S3.
import os
import random
import uuid
from configparser import ConfigParser
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
def generate_data():
    # One row per (year, month, day): a random sale ID, a three-letter
    # customer code such as "QQQ", and a random total cost.
    return [
        (y, m, d, str(uuid.uuid4()), chr(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
        for d in range(1, 29)
        for m in range(1, 13)
        for y in range(2000, 2021)
    ]
jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# ConfigParser does not expand "$HOME", so resolve the home directory explicitly.
config_object = ConfigParser()
config_object.read(os.path.expanduser("~/.aws/credentials"))
profile_info = config_object["my-creds"]
access_id = profile_info["aws_access_key_id"]
access_key = profile_info["aws_secret_access_key"]
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)
table_path = "s3a://my-bucket/my-folder/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)
df = spark.read.format("delta").load(table_path)
df.show()
Step 3
Create a datahub ingestion yaml file (delta.s3.dhub.yaml) to ingest metadata from the delta table you just created.
source:
  type: "delta-lake"
  config:
    base_path:  "s3://my-bucket/my-folder/sales-table"
    s3:
      aws_config:
        aws_access_key_id: <<Access key>>
        aws_secret_access_key: <<secret key>>
    
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
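To keep the access key and secret out of the recipe file, the credentials can be taken from the environment instead. The sketch below builds the same recipe as a dictionary and reads the two values from the standard AWS environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The helper name `build_s3_recipe` is hypothetical, and this is one possible approach rather than a required one; as noted in the config table, these values can also be auto-detected.

```python
import os


def build_s3_recipe(base_path: str, server: str = "http://localhost:8080") -> dict:
    """Build the S3 recipe as a dict, taking credentials from the environment."""
    names = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    missing = [name for name in names if name not in os.environ]
    if missing:
        raise RuntimeError("unset environment variables: " + ", ".join(missing))
    return {
        "source": {
            "type": "delta-lake",
            "config": {
                "base_path": base_path,
                "s3": {
                    "aws_config": {
                        "aws_access_key_id": os.environ["AWS_ACCESS_KEY_ID"],
                        "aws_secret_access_key": os.environ["AWS_SECRET_ACCESS_KEY"],
                    }
                },
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }
```

The resulting dictionary mirrors the YAML recipe above, with `base_path` set to a value such as `"s3://my-bucket/my-folder/sales-table"`.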
Step 4
Execute the ingestion recipe:
datahub ingest -c delta.s3.dhub.yaml
Note
The above recipes are minimal recipes. Please refer to the Config Details section for the full configuration.
Code Coordinates
- Class Name: datahub.ingestion.source.delta_lake.source.DeltaLakeSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Delta Lake, feel free to ping us on our Slack.
