Aquarium
Search…
Integrating with Labeling Using Webhooks

Motivation

A common usecase for Aquarium datasets is to identify problem labels and group them using segments, downloading a json or csv representation of a segment's elements, and then using scripting or a manual process to reformat the data to submit to a labeling service, whether an external vendor or an internal tool. By leveraging webhook configurations, the reformatting and submitting steps can be automated by a single handler. This allows an Aquarium user to directly send data to a labeling service directly from the UI without writing code each time.
The issue-exported event will POST the issue's elements to a webhook. A full schema can be found in the Event Schemas section of the general Webhooks page.
Schema with Example Map Keys
{
event: "issue-exported",
project: str,
issue: {
id: str,
elements: [{
dataset: str,
inference_set: str,
issue_name: str,
element_id: str,
element_type: "frame" | "crop",
frame_id: str,
frame_data: {
coordinate_frames: [{
coordinate_frame_id: str,
coordinate_frame_metadata: Optional[Dict],
coordinate_frame_type: str,
}],
custom_metrics: {
[custom_metric_type]: int[][] | float,
...
},
date_captured: str,
device_id: str,
geo_data: {
[coordinate_field]: float,
...
},
label_data: [{
attributes: {
confidence: float,
...
},
label: str,
label_coorindate_frame: str,
label_type: str,
linked_labels: str[],
uuid: str,
}, ...],
sensor_data: [{
coordinate_frame: str,
data_urls: {
image_url: str,
...
},
date_captured: str,
sensor_id: str,
sensor_metadata: Dict,
sensor_type: str,
}, ...],
task_id: str,
[user__metadata_field]: str,
},
}, ...]
}
}

Shaping the Webhook Payload To Send to Labeling

You can then use the elements in issue to create a new payload that is accepted by a labeling service; we've provided some sample code that serves a webhook endpoint and transforms an Aquarium payload to some common formats.
GraphQL
REST
server.py
from flask import Flask, request
import os
from python_graphql_client import GraphqlClient
AQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.getenv("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.getenv("LABELING_API_ENDPOINT")
labeling_api_headers = {
# Replace with the proper API key header if any for your service
"Authorization": f"Bearer {LABELING_API_KEY}"
}
client = GraphqlClient(endpoint=LABELING_API_ENDPOINT, headers=labeling_api_headers)
# Replace with appropriate graphql mutation.
# In this example, the way to requeue a label is to remove it and mark as a template
relabel_mutation_fragment = """
mutation BulkDeleteLabels (
$projectId: ID!,
$makeTemplates: Boolean = true,
$labelIds: [ID!]) {
project (where: {id: $projectId}) {
bulkDeleteLabels (
where: {
id_in: $labelIds
},
makeTemplates: $makeTemplates,
waitForQueue: true
) {
count
}
}
}
"""
app = Flask(__name__)
@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
# Optionally verify that the payload came from Aquarium
aq_secret = request.headers.get("x-aquarium-secret")
if aq_secret != AQ_WEBHOOK_SECRET:
return f"Bad Request: {msg}", 400
payload_envelope = request.get_json()
if not payload_envelope:
msg = "no payload body received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
msg = "invalid webhook payload format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
event_type = payload_envelope["event"]
if event_type == "issue-exported":
if not payload_envelope.get("issue"):
msg = "webhook payload did not contain expected key: issue"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
_format_and_export_to_graphql_api(payload_envelope["project_name"], payload_envelope["issue"])
else:
msg = f"endpoint not setup to handle {event_type} events yet"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
return ("", 204)
def _format_and_export_to_graphql_api(project_name, issue):
label_ids = set()
for element in issue["elements"]:
if element["element_type"] == "crop":
label_ids.add(element["crop_data"]["uuid"])
else:
for label in element["frame_data"]["label_data"]:
label_ids.add(label["uuid"])
variables = {
"projectId": project_name,
"labelIds": list(label_ids)
}
client.execute(
query=relabel_mutation_fragment,
variables=variables,
)
if __name__ == "__main__":
PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 8080
app.run(host="127.0.0.1", port=PORT, debug=True)
server.py
from flask import Flask, request
import os
import requests
AQ_WEBHOOK_SECRET = os.getenv("AQ_WEBHOOK_SECRET")
LABELING_API_KEY = os.getenv("LABELING_API_KEY")
LABELING_API_ENDPOINT = os.getenv("LABELING_API_ENDPOINT")
labeling_api_headers = {
# Substitute with the proper API key header if any for your service
"Authorization": f"Bearer {LABELING_API_KEY}"
}
app = Flask(__name__)
@app.route("/webhook", methods=["POST"])
def handle_webhook_payload():
# Optionally verify that the payload came from Aquarium
aq_secret = request.headers.get("x-aquarium-secret")
if aq_secret != AQ_WEBHOOK_SECRET:
return f"Bad Request: {msg}", 400
payload_envelope = request.get_json()
if not payload_envelope:
msg = "no payload body received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(payload_envelope, dict) or not payload_envelope.get("event"):
msg = "invalid webhook payload format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
event_type = payload_envelope["event"]
if event_type == "issue-exported":
if not payload_envelope.get("issue"):
msg = "webhook payload did not contain expected key: issue"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
_format_and_export_to_rest_api(payload_envelope["project_name"], payload_envelope["issue"])
else:
msg = f"endpoint not setup to handle {event_type} events yet"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
return ("", 204)
def _format_and_export_to_rest_api(project_name, issue):
relabel_frames = []
for element in issue["elements"]:
relabel_frames.append({
"id": element["frame_id"],
"url": element["frame_data"]["sensor_data"]["data_urls"]["image_url"] # select the right key for your media type
})
# Replace with appropriate post body.
# In this example, we assume the way to requeue a label is to resubmit the frame(s) as a new batch
new_dataset_payload = {
"name": f"{issue['dataset']}_relabel_{issue['issue_name']}",
"project": project_name,
"frames": relabel_frames
}
requests.post(LABELING_API_ENDPOINT, json=new_dataset_payload, headers=labeling_api_headers)
if __name__ == "__main__":
PORT = int(os.getenv("PORT")) if os.getenv("PORT") else 8080
app.run(host="127.0.0.1", port=PORT, debug=True)

Triggering a Segment Export

To trigger an export, click the Export Issue button on an issue's page. If there aren't any webhooks configured to handle the issue-exported event on the issue's project, it will prompt you to create one.
Export to Labeling
Confirmation step where you can preview the payload
Copy link
Outline
Motivation
Shaping the Webhook Payload To Send to Labeling
Triggering a Segment Export