Understanding how data flows into Adobe Experience Platform can be challenging – especially when you’re dealing with dozens of datasets, multiple ingestion flows, and a growing list of schemas. To streamline this, I built a custom script that automatically maps dataset lineage and exports the full picture to a clean, usable CSV.

The script connects to Adobe’s APIs and:

  • Pulls all datasets and schemas
  • Maps each dataset to its ingestion flow and source connection
  • Tracks how data flows into AEP schemas
  • Exports everything into adobe_dataset_lineage.csv — schema, dataset, flow, and source info included

It’s a practical way to get visibility into your AEP data architecture without digging through the APIs manually.
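
The script expects a bearer token up front. Here's a minimal sketch of fetching one via Adobe's OAuth Server-to-Server flow (the client ID, client secret, and scope string come from your Developer Console project, so treat the values below as placeholders):

import requests

IMS_TOKEN_URL = "https://ims-na1.adobelogin.com/ims/token/v3"

def get_access_token(client_id, client_secret, scopes):
    # Exchange Server-to-Server credentials for a short-lived bearer token.
    resp = requests.post(
        IMS_TOKEN_URL,
        data={
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": scopes,  # copy the scope list from your Developer Console project
        },
    )
    resp.raise_for_status()
    return resp.json()["access_token"]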

Here’s what this script helps me with:

Data Governance

  • Track data provenance across the platform
  • Stay aligned with compliance needs (GDPR, CCPA)
  • Understand exactly how customer data moves through AEP

Operational Visibility

  • Spot inactive or broken ingestion flows (see the snippet after this list)
  • See which datasets are getting updates (and which aren’t)
  • Troubleshoot data issues with full context
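
For example, a few lines against the exported CSV are enough to flag datasets with no ingestion flow attached (a minimal sketch, assuming the export has already been generated):

import csv

with open('adobe_dataset_lineage.csv', newline='') as f:
    rows = list(csv.DictReader(f))

# Rows written without a matching flow have an empty flow_name column.
for row in rows:
    if not row['flow_name']:
        print(f"No ingestion flow: {row['schema_name']} / {row['dataset_name']}")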

Schema Management

  • Identify unused or orphaned schemas (see the sketch after this list)
  • Map active schemas to external sources
  • Clean up what’s no longer needed
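
Here's a quick sketch of the orphaned-schema check, reusing the helpers defined in the script below. Keep in mind the stats endpoint only returns a subset of schemas, so treat the output as candidates rather than a definitive list:

datasets = fetch_all_datasets()
used_schema_ids = set(group_datasets_by_schema(datasets).keys())

# Any tenant schema the stats endpoint knows about but no dataset references.
for schema_id, title in fetch_schema_titles().items():
    if schema_id.startswith(schema_namespace) and schema_id not in used_schema_ids:
        print(f"Possibly unused schema: {title} ({schema_id})")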

Documentation

  • Save time by automatically generating a full lineage export
  • Use it as a foundation for schema updates or impact analysis (see the snippet after this list)
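
For the impact-analysis part, a few lines against the export show everything downstream of a given schema (the schema title below is hypothetical; use one from your own export):

import csv

schema_to_check = 'Example Profile Schema'  # hypothetical title

with open('adobe_dataset_lineage.csv', newline='') as f:
    impacted = [r for r in csv.DictReader(f) if r['schema_name'] == schema_to_check]

for r in impacted:
    print(f"{r['dataset_name']} <- {r['flow_name'] or 'no flow'} <- {r['source_conn_name'] or 'no source'}")

And here's the full script: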

import csv
import datetime
from collections import defaultdict

import requests

# === CONFIG ===
access_token = ''   # bearer token from Adobe IMS (see the token sketch above)
api_key = ''        # client ID of your Developer Console project
org_id = ''         # IMS organization ID
sandbox = 'dev'     # sandbox to query
schema_name_to_find = ''  # optional filter; not used below
schema_namespace = 'https://ns.adobe.com/XXX/schemas/'  # tenant schema namespace prefix

# === ENDPOINT BASES & HEADERS ===
base = "https://platform.adobe.io/data/foundation"
hdr = {
    "Authorization": f"Bearer {access_token}",
    "x-api-key": api_key,
    "x-gw-ims-org-id": org_id,
    "x-sandbox-name": sandbox,
    "Accept": "application/json"
}

datasets_url = f"{base}/catalog/dataSets"
schema_stats_url = f"{base}/schemaregistry/stats?container=all"

# Turn millisecond epoch timestamps (e.g. batch created/updated times) into readable strings.
def format_ts(ts):
    if not ts or not isinstance(ts, (int, float)):
        return 'N/A'
    try:
        return datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        return str(ts)

def fetch_all_datasets():
    # Catalog returns a dict keyed by dataset ID; page through until an empty page comes back.
    ds = {}
    start, limit = 0, 100
    print("📦 Fetching datasets...")
    while True:
        resp = requests.get(f"{datasets_url}?start={start}&limit={limit}", headers=hdr)
        resp.raise_for_status()
        page = resp.json()
        if not page:
            break
        ds.update(page)
        start += limit
    print(f"Loaded {len(ds)} datasets")
    return ds

def fetch_schema_titles():
    # The stats endpoint is a convenient source of schema titles, but it only surfaces
    # recently created resources and class-usage entries, so some titles may be missing;
    # those fall back to a placeholder label in main().
    resp = requests.get(schema_stats_url, headers=hdr)
    resp.raise_for_status()
    stats = resp.json()
    titles = {}
    for s in stats.get("recentlyCreatedResources", []):
        titles[s["$id"]] = s.get("title", "Unnamed Schema")
    for cu in stats.get("classUsage", []):
        for s in cu.get("schemas", []):
            titles[s["$id"]] = s.get("title", "Unnamed Schema")
    return titles

def group_datasets_by_schema(datasets):
    # Keep only datasets whose schema lives under the tenant namespace configured above.
    grouped = defaultdict(list)
    for ds_id, ds in datasets.items():
        sid = ds.get("schemaRef", {}).get("id", "")
        if sid.startswith(schema_namespace):
            grouped[sid].append({"id": ds_id, "name": ds.get("name", "Unnamed Dataset")})
    return grouped

def fetch_batches_for_dataset(dataset_id):
    # Optional helper (not called in main): list the ingestion batches for a dataset.
    url = f"{base}/catalog/batches?dataSet={dataset_id}"
    resp = requests.get(url, headers=hdr)
    resp.raise_for_status()
    # The response is a dict keyed by batch ID
    return list(resp.json().values())

def fetch_dataset_details(dataset_id):
    # Optional helper (not called in main): fetch the full Catalog record for a dataset.
    url = f"{base}/catalog/dataSets/{dataset_id}"
    resp = requests.get(url, headers=hdr)
    resp.raise_for_status()
    return resp.json()
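
# Example usage of the two helpers above (not called in main): report the most recent
# batch for a dataset. The batch fields referenced here ("updated", "status") are
# assumptions about the Catalog payload; adjust them to what your sandbox returns.
def print_last_batch(dataset_id):
    batches = fetch_batches_for_dataset(dataset_id)
    if not batches:
        print(f"    No batches found for dataset {dataset_id}")
        return
    latest = max(batches, key=lambda b: b.get("updated", 0))
    print(f"    Last batch: {format_ts(latest.get('updated'))} (status: {latest.get('status', 'unknown')})")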

def fetch_all_flows():
    # NOTE: this fetches a single page; very large sandboxes may need to follow the
    # pagination links Flow Service returns alongside "items".
    resp = requests.get(f"{base}/flowservice/flows", headers=hdr)
    resp.raise_for_status()
    return resp.json().get("items", [])

def fetch_all_connections():
    # Base (account-level) connections, keyed by ID.
    resp = requests.get(f"{base}/flowservice/connections", headers=hdr)
    resp.raise_for_status()
    items = resp.json().get("items", [])
    return {c["id"]: c for c in items}

def fetch_target_connection(target_conn_id):
    url = f"{base}/flowservice/targetConnections/{target_conn_id}"
    resp = requests.get(url, headers=hdr)
    resp.raise_for_status()
    return resp.json()

def fetch_source_connection(source_conn_id):
    url = f"{base}/flowservice/sourceConnections/{source_conn_id}"
    resp = requests.get(url, headers=hdr)
    resp.raise_for_status()
    return resp.json()

def main():
    datasets = fetch_all_datasets()
    schema_titles = fetch_schema_titles()
    schema_groups = group_datasets_by_schema(datasets)
    flows = fetch_all_flows()
    connections = fetch_all_connections()  # fetched for completeness; not referenced below
    # Fetch all target connections; these are what tie a flow to the dataset it writes into
    resp = requests.get(f"{base}/flowservice/targetConnections", headers=hdr)
    resp.raise_for_status()
    target_connections = resp.json().get("items", [])
    target_conn_by_id = {tc["id"]: tc for tc in target_connections}

    print("\nSchema → Datasets → Data Lineage\n")

    csv_rows = []
    csv_header = [
        'schema_name', 'dataset_name', 'flow_name',
        'source_conn_name', 'table_name', 'source_conn_type', 'source_conn_path', 'source_conn_format', 'source_conn_state'
    ]

    if not schema_groups:
        print("No matching schemas with datasets found.")
    else:
        for sid, ds_list in schema_groups.items():
            title = schema_titles.get(sid, "Synthetic/Deleted Schema")
            print(f"\nSchema: {title}\nID: {sid}")
            if not ds_list:
                print("    No datasets for this schema.")
            else:
                for ds in ds_list:
                    print(f"    Dataset: {ds['name']} (ID: {ds['id']})")
                    found_flow = False
                    # Find all target connections for this dataset
                    matching_target_conns = [tc for tc in target_connections if tc.get('params', {}).get('dataSetId') == ds['id']]
                    for tc in matching_target_conns:
                        tc_id = tc['id']
                        # Find all flows that use this target connection
                        for flow in flows:
                            if tc_id in flow.get('targetConnectionIds', []):
                                found_flow = True
                                flow_name = flow.get('name', 'N/A')
                                flow_id = flow.get('id', 'N/A')
                                source_conn_id = (flow.get('sourceConnectionIds') or [None])[0]
                                # Always fetch source connection details if ID is present
                                if source_conn_id:
                                    try:
                                        resp = fetch_source_connection(source_conn_id)
                                        if isinstance(resp, dict) and 'items' in resp and resp['items']:
                                            source_conn = resp['items'][0]
                                        else:
                                            source_conn = resp
                                    except Exception as e:
                                        print(f"        Could not fetch source connection {source_conn_id}: {e}")
                                        source_conn = {}
                                else:
                                    source_conn = {}
                                # Extract basic info
                                source_conn_name = source_conn.get('name', '')
                                source_conn_type = source_conn.get('params', {}).get('type', '')
                                source_conn_path = source_conn.get('params', {}).get('path', '')
                                source_conn_format = source_conn.get('data', {}).get('format', '')
                                source_conn_state = source_conn.get('state', '')
                                table_name = source_conn.get('params', {}).get('tableName', '')
                                print(f"        Flow: {flow_name} (ID: {flow_id})")
                                print(f"           Source: name={source_conn_name}, type={source_conn_type}, path={source_conn_path}, format={source_conn_format}, state={source_conn_state}")
                                csv_rows.append([
                                    title, ds['name'], flow_name,
                                    source_conn_name, table_name, source_conn_type, source_conn_path, source_conn_format, source_conn_state
                                ])
                    if not found_flow:
                        print("        No flows (data lineage) found for this dataset.")
                        # Add row with only schema and dataset info, rest empty
                        csv_rows.append([
                            title, ds['name'], '', '', '', '', '', '', ''
                        ])
            print("-" * 60)

    # Write to CSV
    with open('adobe_dataset_lineage.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(csv_header)
        writer.writerows(csv_rows)
    print("\nDataset lineage data exported to adobe_dataset_lineage.csv")

# Run the script
if __name__ == "__main__":
    main()