BigQuery
BigQuery is a fully-managed, serverless data warehouse provided by Google Cloud. It allows for fast SQL-based analytics over large datasets. In our system, we leverage BigQuery to collect, analyze, and process payment-related data, which is sent from various microservices through Google Cloud Pub/Sub.
When a payment is processed in our system, we use Google Cloud Pub/Sub to publish payment-related messages. These messages are then consumed and stored in BigQuery for further analysis. This allows us to generate insights like payment trends, subscription patterns, and customer behavior.
The example of implementation in our codebase:
def publishMessage(topic_id: str, data):
client = PublisherClient()
topic_path = client.topic_path(settings.PUBSUB_PROJECT_ID, topic_id)
b_data = json.dumps(data).encode("utf-8")
future = client.publish(topic_path, b_data)
return future.result()
def publishPayment(topic_id: str, data: dict):
serializer = PaymentsSerializer(data=data)
if not serializer.is_valid():
logging.error("Web analytics: publishPayment: Invalid data! errors=%s", str(serializer.errors))
return "Invalid data."
return publishMessage(topic_id, serializer.data)
publishPayment(settings.PUBSUB_PM_TOPIC_ID, {
"order_id": response.id,
"status": "settled" if response.status == "Authorized" else "declined",
"amount": response.amount,
"currency": response.currency,
"order_description": "jobescape_subscription",
"customer_account_id": user.pk,
"geo_country": user.funnel_info.get("geolocation", {}).get("country_code", None) if user.funnel_info else None,
"created_at": created_at,
"payment_type": "recurring",
"settle_datetime": created_at,
"settle_interval": settings.CHECKOUT_RECURRING_CONFIRM_TIME_HOURS,
"payment_method": getattr(response.source, "card_wallet_type", "card"),
"subscription_id": sub.pk,
"started_at": started_at,
"subscription_status": user_sub.status,
"card_country": response.source.issuer_country,
"card_brand": response.source.scheme,
"gross_amount": deconvert_amount(response.amount, response.currency),
"week_day": requested_on.strftime("%A"),
"months": months,
"week_date": week_date,
"date": requested_on.date(),
"subscription_cohort_date": requested_on.date(),
"mid": "checkout",
"channel": "checkout",
"paid_count": counter,
"retry_count": attempt.retry,
"decline_message": decline_message,
"is_3ds": three_ds,
"bin": response.source.bin,
})