Twitter has become an important social media platform, and the data it holds, i.e. tweets, is of immense value to both academics and professionals. There are open and paid Twitter datasets available for researchers and professionals, but you will often need tweets on a specific topic that are not available from other sources. With the Azure function described here, you can retrieve the tweets you need at scale. The function uses a timer trigger, so it can run and collect tweets at predefined intervals. Twitter limits API access to 900 requests per 15 minutes; to work within this limit, I developed the function to run at the required time intervals.
Accessing Twitter API
The first step is to create an account in the Twitter developer portal and generate your access keys. Twitter has good, structured documentation that you can refer to in order to understand the capabilities of the API.
You can follow the Getting Started guide to set up your account.
Once you have your bearer token, you can proceed to creating the Azure function.
Tweepy Python Library
Tweepy is an easy-to-use Python library for accessing the Twitter API. It provides methods for authenticating with your token and for running search queries that retrieve tweets matching your parameters.
You only need the bearer token to authenticate with the Twitter API. In this example, the token is saved in the Azure function application setting 'bearertoken', which the code reads as an environment variable.
client = tweepy.Client(bearer_token=os.environ['bearertoken'])
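When running the function locally with Azure Functions Core Tools, the same settings come from local.settings.json instead. A minimal sketch, assuming you fill in your own values (the setting names match this example):
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<your storage connection string>",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "bearertoken": "<your Twitter bearer token>"
  }
}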
You can define your search query as
query = 'covid -is:retweet'
Here I am searching for tweets on 'covid' with an additional filter, -is:retweet, that excludes retweets. The search query can be built up from the operators supported by the Twitter API to fit your use case.
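Several operators can be combined in one query. For example, a minimal sketch assuming you want original English-language tweets that contain a link (lang:, -is:reply, and has:links are all standard Twitter API v2 search operators):
# English tweets about covid, excluding retweets and replies, with a link
query = 'covid lang:en -is:retweet -is:reply has:links'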
Once the client and query are initialized, we call the Twitter API
tweets = client.search_recent_tweets(query=query, tweet_fields=['context_annotations', 'created_at'], max_results=10)
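Note that search_recent_tweets returns at most 100 tweets per request. For larger collections you can page through results with Tweepy's Paginator; a minimal sketch, where the limit of 1,000 tweets is an arbitrary example value:
# flatten() yields individual tweets across pages of results
for tweet in tweepy.Paginator(client.search_recent_tweets, query=query,
                              tweet_fields=['context_annotations', 'created_at'],
                              max_results=100).flatten(limit=1000):
    print(tweet.created_at, tweet.text)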
You can learn more about Tweepy functionalities from its documentation.
Azure Function
Creating the Azure function is similar to the HOP3 activities described there, with the difference that we use a Timer trigger instead of an HTTP trigger.
The timer trigger schedule in Azure uses NCRONTAB notation, which is similar to crontab but with six space-separated fields (an extra field for seconds). We choose 0 0 * * * *.
This means that we get the following values:
time unit   | value
second      | 0
minute      | 0
hour        | *
day         | *
month       | *
day of week | *
What that means is: at every hour (*), on every day of every month, on any day of the week, we execute the function at minute 0 and second 0, i.e. on every full hour.
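In a Python Azure function, this schedule lives in the function's function.json binding. A minimal sketch, assuming the timer parameter is named mytimer to match the code below:
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 * * * *"
    }
  ]
}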
While debugging, I would advise having the function trigger every minute (e.g. 0 * * * * *) or every few seconds, so the function fires sooner and you get faster feedback.
Sample code snippet for the Azure function
import datetime
import gzip
import json
import logging
import os

import azure.functions as func
import tweepy
from azure.storage.blob import BlobServiceClient


def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    # Authenticate with the Twitter API using the 'bearertoken' app setting
    client = tweepy.Client(bearer_token=os.environ['bearertoken'])

    # Search recent tweets about covid, excluding retweets
    query = 'covid -is:retweet'
    tweets = client.search_recent_tweets(
        query=query,
        tweet_fields=['context_annotations', 'created_at'],
        max_results=10)

    # for tweet in tweets.data:
    #     print(tweet.text)
    #     if len(tweet.context_annotations) > 0:
    #         print(tweet.context_annotations)

    # Connecting to Azure Storage
    connstr = os.environ['AzureWebJobsStorage']
    blobsvc = BlobServiceClient.from_connection_string(connstr)
    container = blobsvc.get_container_client("twitterblog")

    # One blob per run, named with the run's timestamp
    blob_name = f"tweet_{utc_timestamp}.json.gz"
    blob_client = container.get_blob_client(blob_name)

    # Serialize, compress, and upload the result to Azure Storage
    json_data = json.dumps(str(tweets))
    encoded = json_data.encode('utf-8')
    compressed = gzip.compress(encoded)
    blob_client.upload_blob(compressed)

    if mytimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)
The result of the query is serialized, compressed, and stored in a blob in your storage account.
blob_client.upload_blob(compressed)
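To verify a run, you can download a blob and decompress it. A minimal sketch, assuming the same container name and that blob_name holds the name of one of the uploaded blobs:
import gzip
import os
from azure.storage.blob import BlobServiceClient

blobsvc = BlobServiceClient.from_connection_string(os.environ['AzureWebJobsStorage'])
blob_client = blobsvc.get_blob_client(container="twitterblog", blob=blob_name)

# Download, decompress, and decode the stored tweets
data = gzip.decompress(blob_client.download_blob().readall()).decode('utf-8')
print(data)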
The same idea will also work for AWS Lambda functions, which you can try as a fun activity.
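As a rough starting point, here is a minimal sketch of an equivalent Lambda handler, assuming an EventBridge schedule as the trigger, a BEARER_TOKEN environment variable, and an illustrative S3 bucket named twitterblog (all of these names are assumptions, not part of the original setup):
import datetime
import gzip
import json
import os

import boto3
import tweepy

def handler(event, context):
    # Authenticate and run the same recent-tweet search as the Azure function
    client = tweepy.Client(bearer_token=os.environ['BEARER_TOKEN'])
    tweets = client.search_recent_tweets(
        query='covid -is:retweet',
        tweet_fields=['context_annotations', 'created_at'],
        max_results=10)

    utc_timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    body = gzip.compress(json.dumps(str(tweets)).encode('utf-8'))

    # Upload the compressed result to S3 (bucket name is illustrative)
    boto3.client('s3').put_object(
        Bucket='twitterblog',
        Key=f"tweet_{utc_timestamp}.json.gz",
        Body=body)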