Understanding how data flows into Adobe Experience Platform can be challenging – especially when you’re dealing with dozens of datasets, multiple ingestion flows, and a growing list of schemas. To streamline this, I built a custom script that automatically maps dataset lineage and exports the full picture to a clean, usable CSV.
The script connects to Adobe’s APIs and:
- Pulls all datasets and schemas
- Maps each dataset to its ingestion flow and source connection
- Tracks how data flows into AEP schemas
- Exports everything into adobe_dataset_lineage.csv — schema, dataset, flow, and source info included
It’s a practical way to get visibility into Adobe’s data architecture without digging through APIs manually.
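The script assumes you already have an IMS access token to paste into its config block. If your Adobe Developer Console project uses the OAuth Server-to-Server credential type, a minimal sketch for minting one looks like this (the scope string is an assumption from my own project; copy the exact scopes, client ID and secret from yours):

import requests

# Hypothetical helper: exchange OAuth Server-to-Server credentials for an IMS access token.
# The scope string is only an example; use the scopes listed in your Developer Console project.
IMS_TOKEN_URL = "https://ims-na1.adobelogin.com/ims/token/v3"

def get_access_token(client_id, client_secret,
                     scopes="openid,AdobeID,additional_info.projectedProductContext"):
    resp = requests.post(IMS_TOKEN_URL, data={
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scopes,
    })
    resp.raise_for_status()
    return resp.json()["access_token"]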
Here’s what this script helps me with:
Data Governance
- Track data provenance across the platform
- Stay aligned with compliance needs (GDPR, CCPA)
- Understand exactly how customer data moves through AEP
Operational Visibility
- Spot inactive or broken ingestion flows (see the quick CSV check after this list)
- See which datasets are getting updates (and which aren’t)
- Troubleshoot data issues with full context
Schema Management
- Identify unused or orphaned schemas
- Map active schemas to external sources
- Clean up what’s no longer needed
Documentation
- Save time by automatically generating a full lineage export
- Use it as a foundation for schema updates or impact analysis
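As promised in the list above, here's the quick CSV check. Once the export exists, a few lines of plain Python are enough to flag every dataset that isn't tied to an ingestion flow (the column names match the header the script writes):

import csv

# Read the lineage export and list datasets with no flow attached.
# Rows with an empty flow_name are datasets the script couldn't tie to any flow.
with open("adobe_dataset_lineage.csv", newline="") as f:
    rows = list(csv.DictReader(f))

no_flow = [r for r in rows if not r["flow_name"]]
for r in no_flow:
    print(f"{r['schema_name']} / {r['dataset_name']}: no ingestion flow found")
print(f"{len(no_flow)} of {len(rows)} datasets have no flow attached")

With that out of the way, here's the full script. Fill in the config block at the top with your own credentials and sandbox name.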
import requests
from collections import defaultdict
import datetime
import csv
# --- Config: fill these in from your Adobe Developer Console project ---
access_token = ''    # IMS access token (see the token sketch above)
api_key = ''         # client ID / API key
org_id = ''          # IMS organization ID
sandbox = 'dev'      # sandbox name
schema_name_to_find = ''  # optional, not used in the lineage walk below
schema_namespace = 'https://ns.adobe.com/XXX/schemas/'  # your tenant's schema namespace prefix
# === ENDPOINT BASES & HEADERS ===
base = "https://platform.adobe.io/data/foundation"
hdr = {
"Authorization": f"Bearer {access_token}",
"x-api-key": api_key,
"x-gw-ims-org-id": org_id,
"x-sandbox-name": sandbox,
"Accept": "application/json"
}
datasets_url = f"{base}/catalog/dataSets"
schema_stats_url = f"{base}/schemaregistry/stats?container=all"
def format_ts(ts):
if not ts or not isinstance(ts, (int, float)):
return 'N/A'
try:
return datetime.datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S')
except Exception:
return str(ts)
def fetch_all_datasets():
ds = {}
start, limit = 0, 100
print("📦 Fetching datasets...")
while True:
resp = requests.get(f"{datasets_url}?start={start}&limit={limit}", headers=hdr)
resp.raise_for_status()
page = resp.json()
if not page:
break
ds.update(page)
start += limit
print(f" Loaded {len(ds)} datasets")
return ds
def fetch_schema_titles():
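    # Note: the stats endpoint only surfaces recently created schemas and
    # class-usage entries, so this title map can be incomplete; main() falls
    # back to a placeholder title for any schema ID it can't resolve here.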
resp = requests.get(schema_stats_url, headers=hdr)
resp.raise_for_status()
stats = resp.json()
titles = {}
for s in stats.get("recentlyCreatedResources", []):
titles[s["$id"]] = s.get("title", "Unnamed Schema")
for cu in stats.get("classUsage", []):
for s in cu.get("schemas", []):
titles[s["$id"]] = s.get("title", "Unnamed Schema")
return titles
def group_datasets_by_schema(datasets):
grouped = defaultdict(list)
for ds_id, ds in datasets.items():
sid = ds.get("schemaRef", {}).get("id", "")
if sid.startswith(schema_namespace):
grouped[sid].append({"id": ds_id, "name": ds.get("name", "Unnamed Dataset")})
return grouped
def fetch_batches_for_dataset(dataset_id):
url = f"{base}/catalog/batches?dataSet={dataset_id}"
resp = requests.get(url, headers=hdr)
resp.raise_for_status()
# The response is a dict keyed by batch ID
return list(resp.json().values())
def fetch_dataset_details(dataset_id):
url = f"{base}/catalog/dataSets/{dataset_id}"
resp = requests.get(url, headers=hdr)
resp.raise_for_status()
return resp.json()
def fetch_all_flows():
resp = requests.get(f"{base}/flowservice/flows", headers=hdr)
resp.raise_for_status()
return resp.json().get("items", [])
def fetch_all_connections():
resp = requests.get(f"{base}/flowservice/connections", headers=hdr)
resp.raise_for_status()
items = resp.json().get("items", [])
return {c["id"]: c for c in items}
def fetch_target_connection(target_conn_id):
url = f"{base}/flowservice/targetConnections/{target_conn_id}"
resp = requests.get(url, headers=hdr)
resp.raise_for_status()
return resp.json()
def fetch_source_connection(source_conn_id):
url = f"{base}/flowservice/sourceConnections/{source_conn_id}"
resp = requests.get(url, headers=hdr)
resp.raise_for_status()
return resp.json()
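# Lineage is stitched together by joining Catalog and Flow Service objects:
# a dataset's ID appears in a target connection's params.dataSetId, a flow
# references that target connection via targetConnectionIds, and the flow's
# sourceConnectionIds point at the external source, which carries the
# source's name, type, path, format and table name.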
def main():
datasets = fetch_all_datasets()
schema_titles = fetch_schema_titles()
schema_groups = group_datasets_by_schema(datasets)
flows = fetch_all_flows()
connections = fetch_all_connections()
# Fetch all target connections
resp = requests.get(f"{base}/flowservice/targetConnections", headers=hdr)
resp.raise_for_status()
target_connections = resp.json().get("items", [])
target_conn_by_id = {tc["id"]: tc for tc in target_connections}
print("\nSchema → Datasets → Data Lineage\n")
csv_rows = []
csv_header = [
'schema_name', 'dataset_name', 'flow_name',
'source_conn_name', 'table_name', 'source_conn_type', 'source_conn_path', 'source_conn_format', 'source_conn_state'
]
if not schema_groups:
print("No matching schemas with datasets found.")
else:
for sid, ds_list in schema_groups.items():
title = schema_titles.get(sid, "Synthetic/Deleted Schema")
print(f"\nSchema: {title}\nID: {sid}")
if not ds_list:
print(" No datasets for this schema.")
else:
for ds in ds_list:
print(f" Dataset: {ds}")
found_flow = False
# Find all target connections for this dataset
matching_target_conns = [tc for tc in target_connections if tc.get('params', {}).get('dataSetId') == ds['id']]
for tc in matching_target_conns:
tc_id = tc['id']
# Find all flows that use this target connection
for flow in flows:
if tc_id in flow.get('targetConnectionIds', []):
found_flow = True
flow_name = flow.get('name', 'N/A')
flow_id = flow.get('id', 'N/A')
                                source_conn_id = (flow.get('sourceConnectionIds') or [None])[0]
# Always fetch source connection details if ID is present
if source_conn_id:
try:
resp = fetch_source_connection(source_conn_id)
if isinstance(resp, dict) and 'items' in resp and resp['items']:
source_conn = resp['items'][0]
else:
source_conn = resp
except Exception as e:
print(f" Could not fetch source connection {source_conn_id}: {e}")
source_conn = {}
else:
source_conn = {}
# Extract basic info
source_conn_name = source_conn.get('name', '')
source_conn_type = source_conn.get('params', {}).get('type', '')
source_conn_path = source_conn.get('params', {}).get('path', '')
source_conn_format = source_conn.get('data', {}).get('format', '')
source_conn_state = source_conn.get('state', '')
table_name = source_conn.get('params', {}).get('tableName', '')
print(f" Flow: {flow_name} (ID: {flow_id})")
print(f" Source: name={source_conn_name}, type={source_conn_type}, path={source_conn_path}, format={source_conn_format}, state={source_conn_state}")
csv_rows.append([
title, ds['name'], flow_name,
source_conn_name, table_name, source_conn_type, source_conn_path, source_conn_format, source_conn_state
])
if not found_flow:
print(" No flows (data lineage) found for this dataset.")
# Add row with only schema and dataset info, rest empty
csv_rows.append([
title, ds['name'], '', '', '', '', '', '', ''
])
print("-" * 60)
# Write to CSV
with open('adobe_dataset_lineage.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(csv_header)
writer.writerows(csv_rows)
print("\nDataset lineage data exported to adobe_dataset_lineage.csv")
# Run the script
if __name__ == "__main__":
    main()
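The script also defines two helpers it never calls, fetch_batches_for_dataset() and format_ts(). Together they answer the "which datasets are actually getting updates" question; here's a sketch, assuming Catalog batch objects carry a created timestamp in epoch milliseconds (which is what format_ts expects):

# Print the most recent batch time per dataset, reusing the helpers defined above.
# Assumption: batch objects expose a 'created' timestamp in epoch milliseconds.
def print_dataset_freshness(datasets):
    for ds_id, ds in datasets.items():
        batches = fetch_batches_for_dataset(ds_id)
        created = [b.get("created") for b in batches if b.get("created")]
        latest = format_ts(max(created)) if created else "no batches yet"
        print(f"{ds.get('name', 'Unnamed Dataset')}: last batch {latest}")

# Example: print_dataset_freshness(fetch_all_datasets())

Keep in mind this makes one Catalog call per dataset, so point it at a filtered subset if your sandbox holds a lot of them.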