Skip to main content

Automating Access Requests

This tutorial shows how to automate the access request lifecycle programmatically — from discovering available roles and submitting requests in bulk to building approval bots, defining policies, and integrating with external identity systems.

Access automation is especially valuable in organizations where data consumers frequently need access to new datasets, where onboarding new team members requires provisioning access across many products, or where compliance requirements demand periodic recertification of existing access grants.

Step 1: Discover Available Roles

Before submitting an access request, you need to know which roles are available for the target product. Each source system connected to Qarion defines one or more roles that map to specific permission levels in the external platform:

import requests

API_BASE = "https://api.qarion.com"
API_KEY = "your-api-key"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

def discover_roles(space_slug):
"""List available roles across all source systems in a space."""
systems = requests.get(
f"{API_BASE}/spaces/{space_slug}/source-systems",
headers=HEADERS
).json()

available_roles = []
for system in systems:
roles = requests.get(
f"{API_BASE}/spaces/{space_slug}/source-systems/{system['id']}/roles",
headers=HEADERS
).json()

for role in roles:
available_roles.append({
"system": system["name"],
"role_id": role["id"],
"role_name": role["name"],
"external_role": role["external_role"],
"description": role.get("description", "")
})

return available_roles

The response gives you a mapping between Qarion's internal role identifiers and the actual permissions in each external system (e.g., a Snowflake role or BigQuery IAM binding). This mapping is what makes it possible to provision access programmatically after approval.

Step 2: Submit Bulk Access Requests

When onboarding a new team member or provisioning access for a service account, you often need to request access to multiple products at once. The following function submits requests in bulk:

def bulk_request_access(product_ids, role_id, reason):
"""Submit access requests for multiple products at once."""
results = []

for product_id in product_ids:
response = requests.post(
f"{API_BASE}/api/products/{product_id}/access-requests",
headers=HEADERS,
json={
"role_id": role_id,
"description": reason
}
)

results.append({
"product_id": product_id,
"status": response.status_code,
"request_id": response.json().get("id") if response.ok else None
})

return results

A detailed description (the justification for access) is important because it's what the product owner sees when deciding whether to approve or reject the request. For automated provisioning, include context like the team name, the project the access supports, and any relevant compliance justification.

Step 3: Build an Approval Bot

For organizations that want to automate approval decisions (for example, auto-approving access to non-sensitive datasets), you can build a bot that polls for pending requests and applies policy-based decisions:

def auto_approve_bot(space_slug, auto_approve_tags=None):
"""Automatically approve requests matching certain criteria."""
if auto_approve_tags is None:
auto_approve_tags = ["public", "bronze"]

# Get pending requests
requests_response = requests.get(
f"{API_BASE}/api/access-requests?space={space_slug}&status=pending",
headers=HEADERS
).json()

for req in requests_response["items"]:
product = requests.get(
f"{API_BASE}/catalog/spaces/{space_slug}/products/{req['product_id']}",
headers=HEADERS
).json()

product_tags = [t["slug"] for t in product.get("tags", [])]

if any(tag in product_tags for tag in auto_approve_tags):
# Auto-approve
requests.post(
f"{API_BASE}/api/access-requests/{req['id']}/approve",
headers=HEADERS,
json={"comment": "Auto-approved: public/bronze tier data"}
)
print(f"Auto-approved: {product['name']} for {req['requester_id']}")
else:
print(f"Requires manual review: {product['name']}")

This bot checks whether the target product is tagged as public or bronze tier, and auto-approves requests for those datasets. Requests for more sensitive data (gold tier, PII-tagged products) are left for manual review. You can extend the policy logic to consider the requester's team, the role being requested, or any other relevant metadata.

Step 4: Define Access Policies

For more sophisticated access control, define policies as structured rules that the approval bot evaluates against each request. This separates policy definition from enforcement logic, making it easier to audit and update access rules:

ACCESS_POLICIES = [
{
"name": "Public Data Auto-Approve",
"condition": lambda req, product: "public" in get_tags(product),
"action": "approve",
"comment": "Public data - auto-approved"
},
{
"name": "Same Team Auto-Approve",
"condition": lambda req, product: is_same_team(req["requester_id"], product),
"action": "approve",
"comment": "Same team - auto-approved"
},
{
"name": "PII Requires Manual Review",
"condition": lambda req, product: "pii" in get_tags(product),
"action": "flag",
"comment": "Contains PII - requires manual review"
}
]

def evaluate_policies(request, product):
"""Evaluate all policies against an access request."""
for policy in ACCESS_POLICIES:
if policy["condition"](request, product):
return {
"policy": policy["name"],
"action": policy["action"],
"comment": policy["comment"]
}

return {"policy": "default", "action": "review", "comment": "No matching policy"}

Policies are evaluated in order, so more specific rules should come before more general ones. The first matching policy determines the action — approve, flag (for human review), or review (default).

Step 5: Integrate with External Identity Systems

In enterprise environments, access decisions often depend on information from external identity providers. The following example shows how to enrich access requests with group membership from an LDAP directory:

def sync_with_identity_provider(space_slug):
"""Sync access based on external identity provider groups."""
# Get group memberships from identity provider
groups = get_idp_groups()

role_mapping = {
"data-analysts": {"role": "viewer", "auto_approve": True},
"data-engineers": {"role": "editor", "auto_approve": True},
"data-admins": {"role": "admin", "auto_approve": True}
}

for group_name, members in groups.items():
if group_name not in role_mapping:
continue

config = role_mapping[group_name]

for member in members:
# Check if user already has access
existing = requests.get(
f"{API_BASE}/api/access?user_id={member['id']}&space={space_slug}",
headers=HEADERS
).json()

if not existing["items"]:
# Submit request
request = requests.post(
f"{API_BASE}/api/access-requests",
headers=HEADERS,
json={
"user_id": member["id"],
"space_slug": space_slug,
"role": config["role"],
"description": f"Auto-provisioned from {group_name} group"
}
).json()

if config["auto_approve"]:
requests.post(
f"{API_BASE}/api/access-requests/{request['id']}/approve",
headers=HEADERS,
json={"comment": f"IDP sync: {group_name}"}
)

This pattern keeps Qarion's access grants in sync with your organization's identity provider, ensuring that when someone joins a team (and is added to the corresponding LDAP/AD group), they automatically receive the appropriate data access.

Step 6: Scheduled Jobs

Wrap the approval bot and identity sync into scheduled jobs that run periodically. The approval job processes pending requests, while the sync job reconciles access with external systems:

def run_access_automation():
"""Main automation job - run on a schedule."""
spaces = requests.get(
f"{API_BASE}/spaces",
headers=HEADERS
).json()

for space in spaces:
slug = space["slug"]
print(f"\n Processing space: {space['name']}")

# Process pending requests with policies
pending = requests.get(
f"{API_BASE}/api/access-requests?space={slug}&status=pending",
headers=HEADERS
).json()

for req in pending["items"]:
product = requests.get(
f"{API_BASE}/catalog/spaces/{slug}/products/{req['product_id']}",
headers=HEADERS
).json()

decision = evaluate_policies(req, product)

if decision["action"] == "approve":
requests.post(
f"{API_BASE}/api/access-requests/{req['id']}/approve",
headers=HEADERS,
json={"comment": decision["comment"]}
)
elif decision["action"] == "flag":
# Notify reviewer
notify_reviewer(req, decision)

# Sync with identity provider
sync_with_identity_provider(slug)

print("\nAccess automation complete")

Schedule this job to run every few hours (or more frequently, depending on your organization's access request volume). This ensures that routine access requests are processed promptly while sensitive requests are escalated for human review.

Step 7: Recertification

Periodic access recertification ensures that users who no longer need access have their grants revoked. This is a common compliance requirement and is straightforward to automate by checking whether each access grant is still justified:

def recertify_access(space_slug, max_age_days=90):
"""Review and recertify active access grants."""
from datetime import datetime, timedelta

cutoff = datetime.utcnow() - timedelta(days=max_age_days)

access_grants = requests.get(
f"{API_BASE}/api/access?space={space_slug}&is_active=true",
headers=HEADERS
).json()

for grant in access_grants["items"]:
granted_at = datetime.fromisoformat(grant["granted_at"])

if granted_at < cutoff:
# Flag for review
print(f"Needs recertification: {grant['user_id']} -> {grant['product_id']}")

# Optionally auto-revoke unused access
if not has_recent_activity(grant["user_id"], grant["product_id"]):
requests.patch(
f"{API_BASE}/api/access/{grant['id']}",
headers=HEADERS,
json={"is_active": False}
)
print(f" Revoked: no activity in {max_age_days} days")

This function identifies access grants older than a configurable threshold (defaulting to 90 days) and optionally revokes those where the user hasn't actually used the data. This prevents the accumulation of stale access grants over time.