Retrieving Tweets from Twitter using Azure Functions 

Twitter has become a very important social media site, and the data it holds, i.e. tweets, is of immense value for both academics and professionals. There are open and paid Twitter datasets available for researchers and professionals, but you will often need tweets on a specific topic that are not available from other sources. With the Azure Function described here, you can retrieve the tweets you need at scale. The function uses a timer trigger, so it can run and collect tweets at predefined intervals. Twitter limits API access to 900 requests per 15 minutes; to work within this limit, the function runs only at the required intervals.

Accessing Twitter API 

The first step is to create an account in the Twitter developer portal and generate your access keys. Twitter has good, structured documentation which you can refer to in order to understand the capabilities of the API.

You can follow the Getting Started guide to set up your account.

Once you have your bearer token, you can proceed to creating the Azure Function.

Tweepy Python Library

Tweepy is an easy-to-use Python library for accessing the Twitter API. It provides methods for authenticating against Twitter with your token and for running search queries to retrieve tweets that match your parameters.
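Tweepy is not part of the Azure Functions Python runtime, so it has to be declared in your function app's requirements.txt together with the other packages used later in this post. A minimal sketch (your project may need additional entries):

azure-functions
azure-storage-blob
tweepy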

You will only need the bearer token to authenticate with the Twitter API. In this example, the token is stored in the Azure Function application setting ‘bearertoken’.
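For local development, the setting can be placed in local.settings.json; in the deployed function app it would be configured as an application setting instead. A minimal sketch, with placeholder values that you would replace with your own:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<your storage connection string>",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "bearertoken": "<your bearer token>"
  }
}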

client = tweepy.Client(bearer_token=os.environ['bearertoken'])

You can define your search query as

query = 'covid -is:retweet'

Here I am searching for tweets on ‘covid’ with an additional filter that excludes retweets. The search query can be composed with the parameters supported by the Twitter API to fit your use case.
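As a sketch of how further operators can be combined in the same string (assuming, for illustration, that you only want English-language tweets containing links):

# Hypothetical variant: English tweets about covid that contain a link, excluding retweets
query = 'covid lang:en has:links -is:retweet'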

Once the client and query are initialized, we call the Twitter API:

tweets = client.search_recent_tweets(query=query, tweet_fields=['context_annotations', 'created_at'], max_results=10)

You can learn more about Tweepy functionalities from its documentation.
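For instance, once the search has returned, you can loop over the response and inspect individual fields. A minimal sketch, reusing the tweets variable from the call above:

# Print the creation time and text of each retrieved tweet
for tweet in tweets.data or []:
    print(tweet.created_at, tweet.text)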

Azure Function 

Creating the Azure Function is similar to the HOP3 activities described there, with the difference that we use a Timer trigger instead of an HTTP trigger.

The timer trigger in Azure uses a notation similar to crontab, but with six space-separated values (the first value represents seconds). We choose 0 0 * * * *.

This means that we get the following values:

time unit     value
second        0
minute        0
hour          *
day           *
month         *
day of week   *

What that means is: at every (*) hour, on every day, in every month, on any day of the week, we execute the function at minute 0 and second 0, i.e. on every full hour.

I would advise having the function trigger every minute, or every few seconds, while debugging, so that you do not have to wait long for it to fire.
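The schedule lives in the function's function.json binding. A minimal sketch, assuming the default __init__.py layout of the v1 Python programming model used below; swapping the schedule to "0 * * * * *" would make the function fire every minute while debugging:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 * * * *"
    }
  ]
}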

Sample code snippet for the Azure Function:

import datetime
import gzip
import json
import logging
import os

import azure.functions as func
from azure.storage.blob import BlobServiceClient

import tweepy


def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due!')

    # Authenticate with the Twitter API using the bearer token
    client = tweepy.Client(bearer_token=os.environ['bearertoken'])

    # Search recent tweets about covid, excluding retweets
    query = 'covid -is:retweet'
    tweets = client.search_recent_tweets(
        query=query,
        tweet_fields=['context_annotations', 'created_at'],
        max_results=10)

    # Optional: inspect the retrieved tweets while debugging
    # for tweet in tweets.data or []:
    #     print(tweet.text)
    #     if len(tweet.context_annotations) > 0:
    #         print(tweet.context_annotations)

    # Serialize the raw tweet payloads to JSON
    tweet_dicts = [tweet.data for tweet in (tweets.data or [])]
    json_data = json.dumps(tweet_dicts, default=str)

    # Connect to Azure Storage and compose a unique blob name
    connstr = os.environ['AzureWebJobsStorage']
    blobsvc = BlobServiceClient.from_connection_string(connstr)
    container = blobsvc.get_container_client("twitterblog")
    blob_name = f"tweet_{utc_timestamp}.json.gz"
    blob_client = container.get_blob_client(blob_name)

    # Compress the JSON and upload it to Azure Storage
    compressed = gzip.compress(json_data.encode('utf-8'))
    blob_client.upload_blob(compressed)

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

The result of the query is serialized to JSON, compressed, and stored in a blob in your storage account.

blob_client.upload_blob(compressed)
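To verify the upload, you can read a blob back, decompress it and parse the JSON. A minimal sketch, assuming the same "twitterblog" container and connection string as above (this helper script is not part of the function itself):

import gzip
import json
import os

from azure.storage.blob import BlobServiceClient

# Connect with the same connection string used by the function
blobsvc = BlobServiceClient.from_connection_string(os.environ['AzureWebJobsStorage'])
container = blobsvc.get_container_client("twitterblog")

# Download the first listed blob, decompress and parse it
blob = next(iter(container.list_blobs()))
raw = container.download_blob(blob.name).readall()
tweets = json.loads(gzip.decompress(raw))
print(len(tweets), "tweets in", blob.name)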

The same idea will also work for AWS Lambda functions, which you can try as a fun activity.