GCP PubSub Streaming to BigQuery
Pub/Sub applications fall into two types: publishers (e.g. a Twitter feed) and subscribers (e.g. a browser client).
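For context, publishing is a single API call. Here is a minimal publisher sketch; the project and topic names are placeholders, not from the original post:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'my-topic')   # placeholder names

# Message payloads must be bytestrings; publish() returns a future
future = publisher.publish(topic_path, b'Hello from the publisher')
print('Published message id: {}'.format(future.result()))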
(Also read: Single Pub Sub Topic for Multiple Projects)
A subscriber has a main method that listens on its subscription and handles each incoming message in a callback:
from google.cloud import pubsub_v1

def receive_stream(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        collect_messages(message.data)   # buffer the payload for the BigQuery writer
        message.ack()

    # Open the streaming pull; the returned future blocks until the listener stops
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    streaming_pull_future.result()
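To start listening, call it with your project ID and subscription name (placeholders below); the call blocks while the callback processes messages:

receive_stream('my-project', 'my-subscription')   # placeholder identifiers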
Modify this to write to BigQuery as follows:
from google.cloud import bigquery

def write_stream_to_bq(dataset_id, table_id, messages):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)

    # Stream the rows into the table
    errors = client.insert_rows(table, messages)
    if not errors:
        print('Loaded {} row(s) into {}:{}'.format(len(messages), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)
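The glue between collect_messages and write_stream_to_bq is left implicit above. One way to wire them together, as a sketch only, is to buffer rows and flush them in small batches; the batch size, dataset, table, and column names below are illustrative assumptions, and the destination table is assumed to have a single STRING column named data:

BATCH_SIZE = 10        # illustrative batch size
buffered_rows = []     # rows waiting to be streamed into BigQuery

def collect_messages(data):
    # Assumes a destination table with one STRING column named 'data'
    buffered_rows.append({'data': data.decode('utf-8')})
    if len(buffered_rows) >= BATCH_SIZE:
        write_stream_to_bq('my_dataset', 'my_table', list(buffered_rows))   # placeholder dataset/table
        buffered_rows.clear()

With this in place, receive_stream acks each Pub/Sub message as it arrives, and every full batch of payloads is streamed into BigQuery via insert_rows.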
That's it. That's the simplest way to stream from a Pub Sub publisher into BigQuery.