Pub/Sub applications fall into two roles: publishers (e.g. Twitter), which send messages to a topic, and subscribers (e.g. a browser), which receive them.
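To make the two roles concrete, here is a small in-memory model. It is only an illustration, not the Google Cloud Pub/Sub client library. The `Topic` class and its methods are hypothetical names, and each registered callback here behaves like a separate subscription. Real Pub/Sub adds durable storage, subscriptions, and acknowledgement deadlines.

```python
from typing import Callable, List


class Topic:
    """Hypothetical in-memory topic: publishers push bytes, subscribers get a callback."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._subscribers: List[Callable[[bytes], None]] = []

    def subscribe(self, callback: Callable[[bytes], None]) -> None:
        # A subscriber registers interest by handing over a callback
        self._subscribers.append(callback)

    def publish(self, data: bytes) -> int:
        # A publisher sends bytes without knowing who (if anyone) receives them;
        # every registered callback gets its own copy (fan-out)
        for callback in self._subscribers:
            callback(data)
        return len(self._subscribers)


if __name__ == "__main__":
    tweets = Topic("tweets")
    received: List[bytes] = []
    tweets.subscribe(received.append)  # the subscriber, e.g. a browser
    tweets.publish(b"hello")           # the publisher, e.g. Twitter
    print(received)  # [b'hello']
```

The publisher never holds a reference to its subscribers, which is the decoupling that lets either side be added or removed independently.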

(Also read: Single Pub Sub Topic for Multiple Projects)

A subscriber's main method opens a streaming pull on its subscription and handles each incoming message in a callback:

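from google.cloud import pubsub_v1

def receive_stream(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        # collect_messages() is the author's buffering helper (not shown here)
        collect_messages(message.data)
        message.ack()

    # Start a streaming pull; the callback runs on background threads
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
    # Block the main thread so messages keep arriving
    streaming_pull_future.result()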
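The callback calls `collect_messages`, which the post does not show. One possible shape for it is a thread-safe buffer: the Pub/Sub client library invokes callbacks on background threads, so appends are guarded by a lock. `collect_messages` and `drain_messages` below are hypothetical helpers, and decoding each payload as UTF-8 JSON is an assumption about what the publisher sends.

```python
import json
import threading
from typing import Any, Dict, List

# Hypothetical module-level buffer shared by the callback threads
_buffer: List[Dict[str, Any]] = []
_lock = threading.Lock()


def collect_messages(data: bytes) -> None:
    """Decode one Pub/Sub payload (assumed to be UTF-8 JSON) and buffer it."""
    row = json.loads(data.decode("utf-8"))
    with _lock:
        _buffer.append(row)


def drain_messages() -> List[Dict[str, Any]]:
    """Atomically take everything buffered so far, leaving the buffer empty."""
    with _lock:
        rows = list(_buffer)
        _buffer.clear()
    return rows
```

A periodic job could then pass the result of `drain_messages()` to `write_stream_to_bq` as its `messages` argument, since `insert_rows` accepts a list of dicts keyed by column name.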

To write the collected messages to BigQuery, add a function that streams them into a table:

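from google.cloud import bigquery

def write_stream_to_bq(dataset_id, table_id, messages):
    client = bigquery.Client()
    # Reference project.dataset.table (client.dataset() is deprecated)
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)  # fetches the schema insert_rows needs

    # messages: a list of tuples or dicts whose fields match the table schema
    errors = client.insert_rows(table, messages)
    if not errors:
        print('Loaded {} row(s) into {}:{}'.format(len(messages), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)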
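If many messages pile up between writes, it can help to send them in smaller batches rather than one very large `insert_rows` request. Below is a minimal sketch of a hypothetical `chunked` helper; the batch size of 500 in the usage note is an assumption, so check the current BigQuery streaming-insert quotas before picking one.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(rows: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` rows (hypothetical helper)."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield list(rows[start:start + size])
```

For example: `for batch in chunked(messages, 500): write_stream_to_bq(dataset_id, table_id, batch)`. Each batch then gets its own error list, so one bad row does not hide the outcome of the others.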

That's it: this is about the simplest way to stream messages from a Pub/Sub publisher into BigQuery.



Need an experienced AWS/GCP/Azure Professional to help out with your Public Cloud Strategy? Set up a time with Anuj Varma.