Azure Python Blob Trigger

Welcome to the third part of this blog series in which we build our own Twitter Analytics service, including followers and unfollowers data.

In case you are just jumping in, here is what happened so far:

  1. In the first part we created a Twitter Developer Account and used the API to download statistical data of our tweets as well as follower data.
  2. In the second part we used Azure Functions to run these downloads periodically and store the results in the Azure Cloud.

Today, we will use Azure Functions with a Blob Trigger to analyze our downloaded data.

In case you haven’t already, please make sure that the __init__.py file of your time-triggered Azure Function from part two also downloads the tweet statistics. This is handled by the code below the # Upload tweet stats to Azure Storage comment here:

import datetime
import logging
import os
import json
import gzip
from twitterstats import Fetcher
from azure.storage.blob import BlobServiceClient
import azure.functions as func


def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    # Getting follower data
    fetcher = Fetcher()
    followers = fetcher.get_followers()
    
    # Connecting to Azure Storage
    AZURE_CONNECTION_STRING = os.getenv('AzureWebJobsStorage')
    container_name = "output"
    blob_name = f"FOLLOWERS_{utc_timestamp}.json.gz"
    blob_service_client = BlobServiceClient.from_connection_string(AZURE_CONNECTION_STRING)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Upload compressed data to Azure Storage
    json_data = json.dumps(followers)
    encoded = json_data.encode('utf-8')
    compressed = gzip.compress(encoded)
    blob_client.upload_blob(compressed)

    # Upload tweet stats to Azure Storage
    promoted_tweets = fetcher.get_tweets(promoted=True)
    unpromoted_tweets = fetcher.get_tweets(promoted=False)
    blob_name = f"TWEETS_{utc_timestamp}.json.gz"
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    json_data = json.dumps(dict(promoted=promoted_tweets, unpromoted=unpromoted_tweets))
    encoded = json_data.encode('utf-8')
    compressed = gzip.compress(encoded)
    blob_client.upload_blob(compressed)

Preliminary Thoughts

Just as a recap: we now have several files in our storage container. For every full hour, there is one file prefixed with FOLLOWERS_ and one prefixed with TWEETS_.

All of these files are gzip-compressed JSON, so we start by writing a function that fetches them from our storage and parses them with Python’s json module:

def get_data_from_azure(blob_service_client, blob_name, container_name="output", default=None):
    try:
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        data = blob_client.download_blob().readall()
        uncompressed = gzip.decompress(data)
        return json.loads(uncompressed)
    except Exception:
        # Blob does not exist yet (or could not be parsed): fall back to the default
        return default

Since we need to store the results of our analysis back to Azure, we need a function that writes a Python dict to Azure in the same way:

def write_to_azure(blob_service_client, blob_name, data, container_name="output"):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    json_data = json.dumps(data)
    encoded = json_data.encode('utf-8')
    compressed = gzip.compress(encoded)
    blob_client.upload_blob(compressed, overwrite=True)

Now that we have prepared that, we can start analyzing our followers.

Creating a New Azure Function

What we need is a function that is triggered right after we receive the results from Twitter’s API. This function should take the current snapshot of the API results and compare it to the snapshot of the previous run. The comparison tells us which users newly appear in the followers list and which users are now missing (unfollowers).

Since we store each of these snapshots in Azure Blob Storage, we can use what is called a “Blob Trigger” in Azure. This trigger runs our function whenever a new blob (“file”) is stored in our container.
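
At its core, this comparison is just a set difference between the two lists of follower ids. A minimal sketch with made-up ids:

# Minimal sketch of the snapshot comparison (follower ids are made up).
prev_ids = {"1001", "1002", "1003"}   # followers in the previous snapshot
curr_ids = {"1002", "1003", "1004"}   # followers in the current snapshot

new_followers = curr_ids - prev_ids    # {'1004'} -> new follower
lost_followers = prev_ids - curr_ids   # {'1001'} -> unfollower
print(new_followers, lost_followers)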

So, we start by adding a new function in VSCode:

  1. Again, click on “Create Function…” in the Azure tab of VSCode (see the last article in case you missed it).
  2. This time, select “Azure Blob Storage trigger”.
  3. Give your function a name.
  4. Make sure you select the AzureWebJobsStorage environment variable.
  5. In the last step, choose output/{name} as the path to be monitored. If you changed the container name from output to something else, make sure this setting reflects your change. VSCode then generates a small template for the new function, sketched below.
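
The generated __init__.py has a bare template body, roughly like the sketch below (your generated file may differ slightly); we will replace it with our own code in the next section:

import logging
import azure.functions as func


def main(myblob: func.InputStream):
    # Log which blob triggered the function and how large it is
    logging.info("Blob trigger processed blob: %s (%s bytes)", myblob.name, myblob.length)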

Finding Followers and Unfollowers, Analyzing Tweet Stats

We now replace the generated template body with the following code:

import os
import json
import gzip
from azure.storage.blob import BlobServiceClient
import azure.functions as func

CONTAINER_NAME = "output"

def main(myblob: func.InputStream):
    AZURE_CONNECTION_STRING = os.getenv('AzureWebJobsStorage')
    blob_service_client = BlobServiceClient.from_connection_string(AZURE_CONNECTION_STRING)
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    if myblob.name.startswith(f"{CONTAINER_NAME}/FOLLOWERS_"):
        timestamps = sorted([blob.name.split("_")[1].replace(".json.gz", "") for blob in container_client.list_blobs() if blob.name.startswith("FOLLOWERS")])
        try:
            current_time_stamp = myblob.name.split("_")[1].replace(".json.gz", "")
            idx = timestamps.index(current_time_stamp)
        except (ValueError, IndexError):
            current_time_stamp = None
            idx = 0

        this_filename = f"FOLLOWERS_{current_time_stamp}.json.gz"
        this_followers = get_data_from_azure(blob_service_client, this_filename, default=[])

        if idx > 0:
            prev_ts = timestamps[idx-1]
            prev_filename = f"FOLLOWERS_{prev_ts}.json.gz"
            prev_followers = get_data_from_azure(blob_service_client, prev_filename, default=[])
        else:
            prev_followers = []

        USER_INFO = get_data_from_azure(blob_service_client, "USER_INFO.json.gz", default={})
        NEW_FOLLOWERS = get_data_from_azure(blob_service_client, "NEW_FOLLOWERS.json.gz", default={})
        LOST_FOLLOWERS = get_data_from_azure(blob_service_client, "LOST_FOLLOWERS.json.gz", default={})

        for twitter_user in this_followers:
            USER_INFO[twitter_user['id']] = twitter_user

        this_follower_ids = [f['id'] for f in this_followers]
        prev_follower_ids = [f['id'] for f in prev_followers]

        new_followers = [follower_id for follower_id in this_follower_ids if follower_id not in prev_follower_ids]
        lost_followers = [follower_id for follower_id in prev_follower_ids if follower_id not in this_follower_ids]

        NEW_FOLLOWERS[current_time_stamp] = new_followers
        LOST_FOLLOWERS[current_time_stamp] = lost_followers

        write_to_azure(blob_service_client, "USER_INFO.json.gz", USER_INFO)
        write_to_azure(blob_service_client, "NEW_FOLLOWERS.json.gz", NEW_FOLLOWERS)
        write_to_azure(blob_service_client, "LOST_FOLLOWERS.json.gz", LOST_FOLLOWERS)

    elif myblob.name.startswith(f"{CONTAINER_NAME}/TWEETS_"):
        # Derive the timestamp from the blob name and load the current tweet snapshot
        current_time_stamp = myblob.name.split("_")[1].replace(".json.gz", "")
        this_filename = f"TWEETS_{current_time_stamp}.json.gz"
        this_stats = get_data_from_azure(blob_service_client, this_filename, default=dict(promoted=[], unpromoted=[]))

        STATS_TWEETS = get_data_from_azure(blob_service_client, "STATS_TWEETS.json.gz", default={})
        promoted = this_stats['promoted']
        unpromoted = this_stats['unpromoted']

        all_tweets = promoted + unpromoted

        for tweet in all_tweets:
            tweet_id = tweet["id"]
            created_at = tweet["created_at"]
            text = tweet["text"]

            if tweet_id not in STATS_TWEETS.keys():
                STATS_TWEETS[tweet_id] = {}
            STATS_TWEETS[tweet_id]["text"] = text
            STATS_TWEETS[tweet_id]["created_at"] = created_at
            if "stats" not in STATS_TWEETS[tweet_id].keys():
                STATS_TWEETS[tweet_id]["stats"] = {}
            STATS_TWEETS[tweet_id]["stats"][current_time_stamp] = dict(
                promoted_metrics = tweet.get("promoted_metrics"),
                public_metrics = tweet.get("public_metrics"),
                non_public_metrics = tweet.get("non_public_metrics"),
                organic_metrics = tweet.get("organic_metrics")
            )
        write_to_azure(blob_service_client, "STATS_TWEETS.json.gz", STATS_TWEETS)

Re-triggering the function

As soon as this function terminates for a particular blob in your container, the blob is marked as processed, so the function is only triggered once per blob. The only exception to this rule is when the blob changes. But since we are continuously adding new blobs instead of overwriting existing ones, this does not apply here.

If you, for whatever reason, want to re-trigger the whole cascade of function calls, you need to update (or delete) a file called scanInfo. You can find this file in a container named azure-webjobs-hosts inside the storage account linked to your Azure Function App. Navigate to the scanInfo file via these prefixes:

blobscaninfo/<Your Function Name>/storageAccount/storageContainer

You can delete that file. If you do, your Blob Trigger function will be executed again, once for each file inside your output container.
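
If you prefer doing this from Python instead of clicking through the portal, a minimal sketch could look like the following (it assumes the blobscaninfo layout described above; double-check the blob names in your storage account before deleting anything):

# Minimal sketch: delete scanInfo blobs so the Blob Trigger re-scans the container.
# Assumes the blobscaninfo/... layout described above -- verify before deleting.
import os
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient.from_connection_string(os.getenv("AzureWebJobsStorage"))
hosts = client.get_container_client("azure-webjobs-hosts")

for blob in hosts.list_blobs(name_starts_with="blobscaninfo/"):
    if blob.name.endswith("scanInfo"):
        hosts.delete_blob(blob.name)
        print(f"Deleted {blob.name}")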

Looking at the files

This function will create four new files for us:

  1. USER_INFO.json.gz – Containing consolidated info about users.
  2. NEW_FOLLOWERS.json.gz – Containing time stamped lists of user ids of new followers.
  3. LOST_FOLLOWERS.json.gz – Containing time stamped lists of user ids of lost followers.
  4. STATS_TWEETS.json.gz – Containing consolidated and time stamped statistics about each tweet.
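
To get a first look at these results, you can read the files back with the get_data_from_azure helper from above. A small, hypothetical example (it assumes each stored user object carries a username field, which depends on what your Fetcher requests from the API):

# Hypothetical inspection of the generated files, using the helper from above.
import os
from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string(os.getenv("AzureWebJobsStorage"))

new_followers = get_data_from_azure(blob_service_client, "NEW_FOLLOWERS.json.gz", default={})
user_info = get_data_from_azure(blob_service_client, "USER_INFO.json.gz", default={})

for timestamp, follower_ids in sorted(new_followers.items()):
    # "username" is an assumption about the stored user objects
    names = [user_info.get(str(fid), {}).get("username", fid) for fid in follower_ids]
    print(timestamp, names)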

What we have now

Here is a recap of what we achieved in today’s tutorial:

  1. We created an Azure Function triggered by a Blob Trigger
  2. We compared snapshots of our followers to the respective previous snapshot
  3. From this comparison we compiled a list of new and lost followers
  4. We extended timestamp-indexed JSON dicts and uploaded them back to Azure

In the next part of this series, we will build a frontend for the data we created today. I am particularly looking forward to this one, as we will use Python only, but our dashboard will still run in the browser! So stay tuned!

If you like stuff like this, make sure you follow me on Twitter, where I constantly post interesting Python, SQL, and cloud computing content!