DataNostro doesn't have a built-in BigQuery export like GA4 → BigQuery from GCP. Instead we send every event via a webhook and you stream-insert it into BigQuery with a single Cloud Function (~30 lines). Setup is 15 minutes and the result is identical to BigQuery streaming in GA4 — including schema flexibility (you shape the events yourself).
Architecture
sGTM event → DataNostro engine → POST webhook → Cloud Function → BigQuery (streaming insert)
The webhook is sent asynchronously from a celery worker, so it doesn't affect the latency of sGTM dispatch to the ad platforms. Retry: 3× with exponential backoff. A dead-letter queue in Redis for events that repeatedly fail.
1. Create a BigQuery dataset + table
In the GCP Console:
bq mk --dataset --location=EU YOUR_PROJECT:datanostro_events
bq mk --table --schema='
event_id:STRING,
event_name:STRING,
event_time:TIMESTAMP,
tenant_id:STRING,
payload:JSON,
user_ip_masked:STRING,
user_agent:STRING,
request_source:STRING
' YOUR_PROJECT:datanostro_events.events
Tip: EU location = data residency the same as DataNostro. payload as a JSON type lets you query SELECT JSON_VALUE(payload, '$.items[0].sku') without schema migrations when the events change.
2. Cloud Function (Python 3.12)
main.py:
import functions_framework
from google.cloud import bigquery
import os, hmac, hashlib
bq = bigquery.Client()
TABLE = "YOUR_PROJECT.datanostro_events.events"
SHARED_SECRET = os.environ["DN_WEBHOOK_SECRET"]
@functions_framework.http
def receive_dn_event(request):
# Verify the HMAC signature — DataNostro sends an X-DN-Signature header
sig = request.headers.get("X-DN-Signature", "")
body = request.get_data()
expected = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(sig, expected):
return ("invalid signature", 401)
event = request.get_json(silent=True)
if not event:
return ("no payload", 400)
row = {
"event_id": event.get("id"),
"event_name": event.get("event_name"),
"event_time": event.get("created_at"),
"tenant_id": event.get("tenant_id"),
"payload": event.get("request_body"),
"user_ip_masked": event.get("source_ip_masked"),
"user_agent": event.get("user_agent"),
"request_source": event.get("request_source"),
}
errors = bq.insert_rows_json(TABLE, [row])
if errors:
return (f"bq insert failed: {errors}", 500)
return ("ok", 204)
requirements.txt:
functions-framework==3.*
google-cloud-bigquery==3.*
3. Deploy
export DN_WEBHOOK_SECRET=$(openssl rand -hex 32)
echo "Save this: $DN_WEBHOOK_SECRET"
gcloud functions deploy receive-dn-event \
--gen2 --runtime=python312 --region=europe-west3 \
--source=. --entry-point=receive_dn_event \
--trigger-http --allow-unauthenticated \
--set-env-vars="DN_WEBHOOK_SECRET=$DN_WEBHOOK_SECRET"
Note: --allow-unauthenticated is OK because we verify the HMAC signature. Without a signature the Cloud Function never does a BigQuery insert.
4. Set up the webhook in DataNostro
- Dashboard → Webhooks → Create a new webhook
- URL: the URL from the
gcloud functions deployoutput (e.g.https://europe-west3-YOUR_PROJECT.cloudfunctions.net/receive-dn-event) - Events: select
event.captured(= every sGTM event with a capture row in our DB) - HMAC secret: paste
$DN_WEBHOOK_SECRETfrom step 3 - Save → click Test webhook → you should get HTTP 204
5. Verify in BigQuery
After 1–2 minutes of normal traffic:
SELECT event_name, COUNT(*) as n
FROM `YOUR_PROJECT.datanostro_events.events`
WHERE event_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 MINUTE)
GROUP BY event_name
ORDER BY n DESC
What you gain vs the GA4 BigQuery export
- Latency: ~2 s (Cloud Function streaming insert) vs ~6 hours (GA4 daily batch). Realtime dashboards possible immediately.
- Schema: a JSON payload field = you can add new fields anytime without migrations. The GA4 export has a fixed schema.
- Cost: Cloud Functions Free Tier = 2 M invocations / month free. At 5 M sGTM events/month = ~$0.40/month. BigQuery streaming insert = $0.01/MB (≈ $5/month for 5 M events). The GA4 → BQ export is free but only for GA360 (>$150k/year).
- Filter at the webhook level: in the DataNostro Dashboard you set which event types you want to push — you don't pay for events you don't care about.
Alternative: a daily CSV export to Cloud Storage
If you don't need realtime, the dashboard has /dashboard/export/ → a ZIP with CSVs for the last N days. You can call it via an API key + cron + gsutil cp into Cloud Storage → a BigQuery scheduled query. Slower, but zero Cloud Functions setup.
FAQ
Q: Can I send only a subset of events (e.g. only purchase, not page_view)?
Yes. In the Webhook settings there's an event_name regex filter. We send only events that match (e.g. ^(purchase|begin_checkout|add_to_cart)$).
Q: What about historical data from before the webhook was turned on?
You can do a one-off export of historical EventCapture data via /api/v1/events/export/?from=…&to=… (API key required) as JSONL and import it via bq load --source_format=NEWLINE_DELIMITED_JSON.
Q: Does it work with a GA4 BigQuery dataset where I already send the GA4 native export?
Yes — create a second dataset (datanostro_events) next to GA4 (analytics_PROPERTY_ID). You can join them on event_id if you let the GA4 client send event_id into sGTM (not via X-Gtm-Server-Preview, only in event_data).
Q: Do you already have this deployed somewhere?
Internally we use the same setup for our own debug dashboard (datanostro.com is its own first customer). Webhook → Cloud Function → BigQuery. P95 webhook delivery latency: 1.8 s. Loss rate: 0.003% (the dead-letter queue catches the rest).