CODEX

A Dataflow Journey: from PubSub to BigQuery

Exploiting Google Cloud Services and Apache Beam to build a custom streaming data pipeline, in Python

Nicolò Gasparini
CodeX
Published in
8 min readMar 26, 2021

--

PubSub to DataFlow to BigQuery wallpaper

If you need to process a very large amount of streaming data you’ll often find that a managed solution will be the easiest choice. These flows are rarely consistent and the prices of building your own solution could not be affordable and you’ll have to manage both the architecture and the code.
There are many managed services: on AWS you can use Kinesis, Azure has Stream Analytics, IBM has Streaming Analytics and you can almost always find a managed solution for an Apache Kafka server.
I’ll try to explain how to get started with Google Cloud solution: Dataflow

Introduction

A simple introduction about the Google Cloud services we are going to use

Google PubSub
As the name suggests, PubSub is a publisher-subscriber managed service. In PubSub you can define some topics (or channels) where some services (publishers) can insert messages while others (subscribers) consume them.

Google BigQuery
You shouldn’t need many words to introduce Google Cloud's most famous service: BigQuery is a fully scalable, fast, relational database on the cloud.
It works with standard SQL language and is mainly used for BI analysis.

Google DataFlow
Based on Apache Beam, this Google Cloud service is used for data processing both in batch or streaming mode using the same code, providing horizontal scalability to calibrate the resources needed.

The service provides many templates ready to use. Between them, there is also one that follows the same pipeline we are going to elaborate on later: reading messages from PubSub and writing them to BigQuery.
Every template is written in Java and can be found on Github, however, we are going to see a pipeline creation in Python with custom operations, to better understand how to create one ourselves.

Hands on

Step 1 — Project preparation

1. Create a Google Cloud Project
This is the first basic step to start working on the Google Cloud Platform, I won’t dive into the details, you can find the official guide here.

2. Create a PubSub topic and subscription

PubSub topic creation

As we introduced earlier, PubSub needs a topic where our messages will be stored until they’re acknowledged.
You can create a topic simply from the interface.
After that, we are going to need a “pull” subscription that will be used by DataFlow. Pull means that our subscriber will be the one requesting data.

3. Create a BigQuery dataset and table
A dataset is the top level container unit for BigQuery, it contains any number of tables. We can create the dataset using any of the possible ways.
After that it’s just plain SQL tables, for our example we are going to need a table with the following schema:

timestamp:TIMESTAMP,attr1:FLOAT,msg:STRING

side note: BigQuery is a “pay as you use” service and it considers both storage and analysis for costs. Whenever possible, create a time-partitioned table so that queries will be more efficient, this is especially useful for a streaming pipeline, where you can partition on the ingestion time, which you’ll be able to query using DATE(_PARTITIONTIME).

4. Create a Cloud Storage bucket
Cloud Storage is basically just a storage service for your files where the main storage space is called bucket, here you can upload files or create different sub-directories. The bucket we need is used by the Apache beam Dataflow runner to store your code, the requirements, and any temporary files.

We can create a bucket from the interface, it doesn’t have to be multi-region but I advise you to create it in the same region you will run your job in. Standard storage class is fine.

5. Create a service account
We are going to need a service account that will act as receiver, writer and that will be responsible to deploy the production pipeline.

Service Account creation steps

You can create it by visiting the Api&Services/Credentials page where you can also directly grant the needed roles:

  • “Pub/Sub Subscriber”
  • “BigQuery Data Editor”
  • “Storage Admin”
  • “Service Account User”
  • “Dataflow Admin”
  • also, add “Pub/Sub Publisher” if you’ll want to use the publisher emulator to send some test messages

After its creation, we need to create and download a new JSON key by visiting its detail page.

Step 2 — Code the pipeline

We are going to write a pipeline in Python using the apache-beam sdk.
The official programming guide is actually very clear and complete and I recommend you read it, but for our example we just need to know a couple of concepts:

The Pipeline object is formed by a series of steps, this includes reading, writing, and modifying your data. Data is passed through steps in the form of a PCollection, which can be bounded if you are operating a batch pipeline or, in the case of a streaming pipeline, unbounded.
A PTransform is your step function, it takes some PCollections and outputs some more.

In streaming pipelines, the concept of windowing is also very important. Aggregating data when it is infinite might be tricky, this is why you can define some rules to decide how data will be split and analyzed, for example, Fixed time windows mean that your data will be split by timestamp in a fixed manner; Sliding time window is similar but windows will overlap so that you don’t accidentally lose the connection between two data points on the border of a time window; Session windows are for particular use cases where you won’t know how much data will arrive or how long it will last, but you’ll recognize a start and an end.

Streaming windowing differences

Below is the coded simple example of a Python pipeline:

You’ll notice how we simply defined the pipeline object as p, then configured 3 steps (preceded by the pipe character “|”):

  • ReadFromPubSub, a PTransform from the beam-gcp library, that can be either initialized with a topic or a subscription name, I chose the latter, using the subscription created before
  • CustomParse, a custom defined PTransform that shows how easy it is to integrate your code in a pipeline, it invokes the CustomParsing class above and triggers its process method, which simply adds the ingestion timestamp to the processed object
class CustomParsing(beam.DoFn):
def process(self, element, timestamp=beam.DoFn.TimestampParam):
parsed = json.loads(element.decode("utf-8"))
parsed["timestamp"] = timestamp.to_rfc3339()
yield parsed
  • WriteToBigQuery, the last defined step uses another PTrasform from the beam-gcp library that directly writes the data returned by our previous step to the specified BigQuery table

Writing a pipeline in Apache Beam is as simple as that, with just one file we defined precisely our data processing strategy, any customization can be applied and many services are already integrated and provide their libraries.

Step 3 — Run the pipeline

Environment setup: The only thing you’ll need to run the pipeline locally is a python environment, you can use Virtualenv or Docker as you prefer.
You could reproduce PubSub locally with the official emulator, but there is not quite a direct substitute for BigQuery, our goal is to focus on the pipeline so, for these two we’ll use the cloud services directly.

Once you set up the environment and prepare your python file, to run the pipeline you simply need to run the file (note the --streaming parameters that will be passed as a pipeline option) with this command:

python apache_beam_pipeline_with_custom_transform.py 
--streaming
--input_subscription projects/PROJECT_ID/subscriptions/SUB_NAME
--output_table PROJECT_ID:DATASET_NAME.TABLE_NAME
--output_schema "timestamp:TIMESTAMP,attr1:FLOAT,msg:STRING"

Step 4 —Test it

Since we are already using our cloud services for PubSub and BigQuery in the pipeline, to test and debug the steps we just need a trigger: publishing a message on PubSub.

In the following gist, I created a simple publisher, which is then used to send 1 million sequential messages to PubSub, that will instantly be retrieved and processed by our still running pipeline.

Deployment

Now that our pipeline is ready we just need to upload it and it will start processing live data, you can run the following command locally using GCloud SDK or in the GCP provisioned machine Cloud Shell

python apache_beam_pipeline_with_custom_transform.py 
--streaming
--runner DataflowRunner
--project PROJECT_ID
--region europe-west1
--temp_location gs://dataflow-test-306521/
--job_name dataflow-custom-pipeline-v1
--max_num_workers 2

This command will run the pipeline as we did before but with a different runner, it will upload your files on Cloud Storage and deploy every necessary infrastructure. In Google Cloud, streaming jobs must use a Compute Engine machine type of n1-standard-2 or higher, you can change this and apply other execution parameters following this reference.

If the above command fails with a 403 code it means you probably didn’t give the correct IAM roles to the service account.
If everything works as intended, it will take some minutes to set up and you can follow the execution logs. Warning: even if you cancel the shell command, the job will still run on DataFlow, you’ll need to cancel or drain it manually. Note that you can stop DataFlow jobs but you can’t delete them and every job must have a unique name.

DataFlow deployed pipeline dashboard
The time shown under each step is the Wall time: an approximation of the time spent in this step, which helps you to identify slow steps

Your pipeline is now up and running!
You can now use the PubSub emulator again or test the production load with a real-world publisher to start posting messages and they will be processed.

DataFlow’s interface will show you each step throughput and timing, allowing you to better analyze your bottlenecks and which part of the pipeline can be improved. You can also set some alerting policies using Cloud Monitoring.

Conclusions

While this may not be a real-life use case, I wanted to share a simple example that intertwined different GCP services with a little customization.
Don’t forget that the leading actor here is Apache Beam and its Python sdk, while many vendors provided their official connectors (see AWS S3 package) and ready to use templates, writing a custom pipeline is extremely easy and the sdk gives you complete freedom over every step.

Costs

A little analysis regarding the costs of maintaining a pipeline like this one.
DataFlow, like many managed services, has pay-per-use pricing that varies whether you are running a batch or streaming pipeline (streaming costs more). Below is the results of some tests but real costs vary a lot based on the size and quantity of the processed data.
Consider the costs of any other cloud services you might be using too, as the streaming insert cost of BigQuery and PubSub cost per message, which only starts billing after 2M messages.

Running a DataFlow job receiving 1M events for 2h with one n1-standard1 worker costs 0.20$ but it’s not enough to assess a real world use case

Side Note, on 26th of May 2021 Google announced Dataflow Prime that should optimize resource usage and add some functionalities. It should be released later this year

--

--

Nicolò Gasparini
CodeX
Writer for

Software Engineer and full stack developer 💻 based in Italy — /in/nicologasparini/