# Source code for twixl.collections.twitter

import json
import re
import humanize

# nopycln: file
import csv
import datetime
import os
import time
import tempfile
import pendulum
import requests
import string
import dateutil
import dateutil.parser
import pandas as pd
import tldextract
from urllib.parse import urlparse, quote
from nltk.tokenize import TweetTokenizer
from typing import List, Any, Dict, Callable
from gzip import GzipFile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from enum import Enum
from typing import Generator, Union
from twixl.collections.twitter._api import APISession
from twixl.collections.twitter import _api
from twixl.collections.twitter.exceptions import (
    QueryCanceled,
    QueryFailed,
    QueryTimeout,
)

# Time to wait between two successive polls of a query's status.
_DEFAULT_QUERY_POLLING_INTERVAL = pendulum.duration(seconds=10)
# Default maximum time to keep polling a query before giving up with a timeout.
_DEFAULT_QUERY_TIMEOUT = pendulum.duration(minutes=15)
# Name of the environment variable that, when set, selects the directory in
# which temporary result files are created.
_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY = "TWIXL_SCRATCH_DIRECTORY"


class QueryState(Enum):
    """
    Twi-XL query state as reported to status callbacks: queued, running or
    downloading results.

    While a query is being polled, the member is looked up by name from the
    state value returned by the API, so member names have to match those
    values. ``DOWNLOADING_RESULTS`` is reported by the client itself once a
    query has succeeded.
    """

    # TODO: should downloading results be a separate state?
    RUNNING = "RUNNING"
    QUEUED = "QUEUED"
    DOWNLOADING_RESULTS = "DOWNLOADING_RESULTS"


@dataclass(frozen=True)
class QueryStatus:
    """Immutable snapshot of a query's progress, as passed to status callbacks."""

    # State the query is in at the time of the snapshot.
    state: QueryState
    # Amount of data scanned by the query so far, in bytes, as reported by the API.
    data_scanned_bytes: int


@dataclass(frozen=True)
class _SuccessfulQuery:
    """Outcome of a query that finished successfully."""

    # Download locations reported by the API for the finished query; these are
    # handed to SearchResults as the result URLs.
    download_location: List[str]  # TODO: better type (Furl)?
    # Amount of data the query scanned, in bytes, as reported by the API.
    data_scanned_bytes: int


@dataclass
class WordFrequencydataGroupedByHour:
    """A single word-frequency record: one word, one timestamp, one count.

    Instances are created by ``WordFrequencyResults.from_file`` from CSV rows
    of the form ``word, hour, frequency``.
    """

    # The word that was counted.
    word: str
    # Timestamp parsed from the ISO-8601 ``hour`` column of the source file;
    # presumably the start of the hour the count applies to -- TODO confirm.
    hour: datetime.datetime
    # Number of occurrences of the word for that timestamp.
    frequency: int


@dataclass
class WordFrequencyResults:
    """WordFrequencyResults object.

    Attributes:
        words (List): List with :class:`WordFrequencydataGroupedByHour`.
    """

    # TODO: fix representation to top N elements
    words: List["WordFrequencydataGroupedByHour"]

    def to_pandas(self, by_hour: bool = False) -> pd.DataFrame:
        """Return the word frequency data as a pandas dataframe.

        :param by_hour (default=false): if true, return one row per
            (word, hour) record with the columns ``word``, ``hour`` and
            ``frequency``; otherwise sum the frequencies per word and return
            the columns ``word`` and ``frequency``.
        :return: The (optionally aggregated) word frequencies. An empty result
            yields an empty data frame with the same columns.
        """
        df = pd.DataFrame(
            [vars(word) for word in self.words],
            columns=["word", "hour", "frequency"],
        )
        if by_hour:
            return df
        if df.empty:
            # Nothing to aggregate; still expose the aggregated column layout.
            return pd.DataFrame(columns=["word", "frequency"])
        # Sum only the frequency column: summing the datetime ``hour`` column
        # is not a meaningful operation and raises a TypeError on pandas >= 2.
        return df.groupby("word", as_index=False)["frequency"].sum()

    @staticmethod
    def from_file(path: Path) -> "WordFrequencyResults":
        """Read the file from the given path and return a WordFrequency data object.

        The file is expected to be a CSV file with a header line followed by
        rows of the form ``word, hour, frequency``, where ``hour`` is an
        ISO-8601 timestamp and ``frequency`` an integer.

        :param path: source file path
        :return: The parsed word frequency results; empty for an empty file.
        """
        # TODO: typed response possible here?

        # newline="" lets the csv module handle embedded newlines itself.
        with path.open("rt", newline="") as stream:
            reader = csv.reader(stream)
            next(reader, None)  # skip header (tolerate a completely empty file)

            return WordFrequencyResults(
                [
                    WordFrequencydataGroupedByHour(
                        word, dateutil.parser.isoparse(hour), int(frequency)
                    )
                    for word, hour, frequency in reader
                ]
            )


@dataclass
class TweetMetadata:
    """Identifier and timestamp of a tweet, without its content."""

    # Identifier of the tweet, kept as a string.
    tweet_id: str
    # Timestamp associated with the tweet; presumably its creation time -- TODO confirm.
    timestamp: datetime.datetime


@dataclass
class Tweet:
    """A tweet with its text and the entities extracted from it."""

    # Identifier of the tweet, kept as a string.
    tweet_id: str
    # Timestamp associated with the tweet; presumably its creation time -- TODO confirm.
    timestamp: datetime.datetime
    # Tweet text; presumably the (possibly truncated) standard text -- TODO confirm.
    text: str
    # Presumably the untruncated text of the tweet -- TODO confirm.
    full_text: str
    # Hashtags of the tweet.
    hashtags: List[str]
    # URLs of the tweet.
    urls: List[str]
    # Source of the tweet; presumably the posting client -- TODO confirm.
    source: str


@dataclass
class ResultPartition:
    """Represents a partitioned result from a query, containing the URL to download and an optional file path.

    Attributes
    ----------
    url : str
        The URL to download the result from.
    file : Path | None
        The file path where the result is saved, by default None.
    """

    url: str
    file: Optional[Path] = None

    def save_as(self, target_dir: Path) -> "ResultPartition":
        """Saves the current file to the specified target directory.
        If the file is not already downloaded, it triggers the download process.

        :param target_dir: The directory to save the file to. It is created
            (including parents) when it does not exist yet.
        :type target_dir: Path
        :return: The instance of ResultPartition with updated file path.
        :rtype: ResultPartition
        """
        import shutil  # only needed here; keeps the module import block untouched

        target_dir.mkdir(parents=True, exist_ok=True)
        if self.file is None:
            # Not downloaded yet: download straight into the target directory.
            self.download(target_dir=target_dir)
            return self

        target = target_dir / self.file.name
        # shutil.move falls back to copy + delete when the temporary file and
        # the target live on different filesystems, where a plain rename fails.
        shutil.move(str(self.file), str(target))
        self.file = target
        return self

    def download(self, target_dir: Optional[Path] = None) -> "ResultPartition":
        """Downloads the file from the URL and saves it to the specified directory or a temporary location.

        Nothing is downloaded when the partition already has a local file.

        :param target_dir: The directory to save the downloaded file to. If not provided, a temporary directory is used.
        :type target_dir: Path | None
        :return: The instance of ResultPartition with the downloaded file.
        :rtype: ResultPartition
        :raises requests.HTTPError: If the server answers with an error status.
        """
        if self.file is not None:
            return self

        filename = urlparse(self.url).path.rpartition("/")[-1]
        stream = _get_temporary_file_for_writing(
            prefix="twixl-query-results-",
            suffix=f"-{filename}.parquet",
            dir=target_dir,
        )
        destination = Path(stream.name)
        try:
            with stream, requests.get(self.url, stream=True) as response:
                response.raise_for_status()  # type: ignore
                for chunk in response.iter_content(chunk_size=8192):  # type: ignore
                    stream.write(chunk)
        except BaseException:
            # The temporary file is not deleted automatically, so remove the
            # partial download instead of leaving it behind, then re-raise.
            if destination.exists():
                destination.unlink()
            raise
        self.file = destination
        return self


class SearchResults(object):
    """Manages a collection of ResultPartition objects, providing methods to save and download all results.

    Attributes
    ----------
    result_partitions : List[ResultPartition]
        A list of ResultPartition objects initialized from the provided URLs.
    """

    def __init__(self, result_urls: List[str]):
        """
        Initializes SearchResults with a list of result URLs.

        :param result_urls: A list of URLs to download results from.
        :type result_urls: List[str]
        """
        self.result_partitions = [ResultPartition(url) for url in result_urls]

    def save_as(self, target_dir: Union[str, Path]) -> None:
        """Saves all result files to the specified target directory.

        :param target_dir: The directory to save all files to.
        :type target_dir: Union[str, Path]
        """
        directory = Path(target_dir)
        for rp in self.result_partitions:
            rp.save_as(target_dir=directory)

    def download(self) -> Generator[pd.DataFrame, None, None]:
        """Downloads each result file and yields it as a DataFrame.

        Partitions that already have a local file are not downloaded again.

        :yield: A DataFrame for each downloaded file.
        :rtype: Generator[pd.DataFrame, None, None]
        """
        for rp in self.result_partitions:
            if rp.file is None:
                rp.download()
            yield pd.read_parquet(rp.file, engine="pyarrow")

    def download_all(self) -> pd.DataFrame:
        """Downloads all result files and concatenates them into a single DataFrame.

        :return: A concatenated DataFrame containing all downloaded results;
            an empty DataFrame when there are no result partitions.
        :rtype: pd.DataFrame
        """
        dfs = list(self.download())
        if not dfs:
            # pd.concat refuses an empty list, so return an empty frame instead.
            return pd.DataFrame()
        return pd.concat(dfs, ignore_index=True)
@dataclass
class TweetMetricItem:
    """Number of tweets counted for a single month of a single year."""

    year: int
    month: int
    num_tweets: int

    @staticmethod
    def from_dict(d: dict) -> "TweetMetricItem":
        """Build an item from a mapping with the keys ``year``, ``month`` and ``num_tweets``.

        :param d: Mapping holding the three values; other keys are ignored.
        :return: The corresponding metric item.
        :raises KeyError: If one of the three keys is missing.
        """
        year, month, num_tweets = (d[key] for key in ("year", "month", "num_tweets"))
        return TweetMetricItem(year=year, month=month, num_tweets=num_tweets)
@dataclass
class TweetMetrics:
    """Manages the metrics of the twitter collection."""

    metrics: List[TweetMetricItem]

    def to_pandas(self) -> pd.Series:
        """Return metrics as a pandas series.

        The series holds the number of tweets per metric item, indexed by the
        first day of the item's month and sorted by that index.

        :return: A pandas series containing all metrics of the twitter collection.
        :rtype: pd.Series
        """
        # TODO: series or data frame?
        counts = (entry.num_tweets for entry in self.metrics)
        month_starts = [
            datetime.datetime(entry.year, entry.month, 1) for entry in self.metrics
        ]
        series = pd.Series(data=counts, index=pd.to_datetime(month_starts))
        return series.sort_index()

    @staticmethod
    def from_dict(d: dict) -> "TweetMetrics":
        """Build the metrics from a mapping whose ``metrics`` key holds a list of item mappings.

        :param d: Mapping with a ``metrics`` entry; each element is converted
            with :meth:`TweetMetricItem.from_dict`.
        :return: The corresponding metrics object.
        """
        items = [TweetMetricItem.from_dict(raw_item) for raw_item in d["metrics"]]
        return TweetMetrics(items)
class dataset(Enum):
    """Twitter datasets available on twi-xl.

    The value of each member is the API path that search requests for that
    dataset are sent to.
    """

    ALL = "/tweets/search/all"
    TWINL = "/tweets/search/twinl"
    POLITICS = "/tweets/search/politics"
class Query:
    """A TwiXL query object to define a search query.

    Statements are added through the chainable builder methods; every builder
    returns the query itself. ``to_dict`` returns the statements in the form
    that is sent to the API.
    """

    def __init__(self, dataset=dataset.ALL):
        """Instantiate a new TwiXL.Query object.

        :param dataset: The dataset to search; defaults to all tweets.
        """
        self._query: List[Dict[Any, Any]] = []
        self.dataset = dataset

    @staticmethod
    def _require_list(value: Any, name: str) -> None:
        """Raise a TypeError unless ``value`` is a list."""
        if not isinstance(value, list):
            raise TypeError(f"{name} parameter can only be a list")

    def keywords(self, keywords: List[str]) -> "Query":
        """Add a query statement that matches any tweet with the specified keyword(s).

        :param keywords: A list of words
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``keywords`` is not a list.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().keywords(keywords=['twitter', 'tweet'])
        """
        self._require_list(keywords, "keywords")
        self._query.append({"keywords": keywords})
        return self

    def regex(self, regex: List[str]) -> "Query":
        """Add a query statement that matches any tweet which match any of the specified regular expression(s).

        :param regex: A list of regular expressions
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``regex`` is not a list.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().regex(regex=['\\btwit\\w+'])
        """
        self._require_list(regex, "regex")
        self._query.append({"regex": regex})
        return self

    def from_usernames(self, usernames: List[str], overwrite: bool = False) -> "Query":
        """Add a query statement that matches any tweet from the specific username(s) to the TwiXL.Query object.

        A query cannot hold both a ``from_usernames`` and a ``from_userids``
        statement.

        :param usernames: A list of usernames (excluding the @ character)
        :param overwrite: If set to True, replace an existing ``from_userids``
            statement instead of raising an error.
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``usernames`` is not a list.
        :raises ValueError: If the query already has a ``from_userids``
            statement and ``overwrite`` is False.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().from_usernames(usernames=['twitterdev', 'twitterapi'])
        """
        self._require_list(usernames, "usernames")
        conflicting = next((q for q in self._query if "from_userids" in q), None)
        if conflicting is not None:
            if not overwrite:
                raise ValueError(
                    "'from_userids' already exists in this query, use overwrite to replace with 'from_usernames'."
                )
            self._query.remove(conflicting)
        self._query.append({"from_usernames": usernames})
        return self

    def from_userids(self, userids: List[str], overwrite: bool = False) -> "Query":
        """Add a query statement that matches any tweet from the specific userid(s) to the TwiXL.Query object.

        A query cannot hold both a ``from_userids`` and a ``from_usernames``
        statement.

        :param userids: A list of user's numeric user ID's
        :param overwrite: If set to True, replace an existing
            ``from_usernames`` statement instead of raising an error.
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``userids`` is not a list.
        :raises ValueError: If the query already has a ``from_usernames``
            statement and ``overwrite`` is False.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().from_userids(userids=['2244994945', '6253282'])
        """
        self._require_list(userids, "userids")
        conflicting = next((q for q in self._query if "from_usernames" in q), None)
        if conflicting is not None:
            if not overwrite:
                raise ValueError(
                    "'from_usernames' already exists in this query, use overwrite to replace with 'from_userids'"
                )
            self._query.remove(conflicting)
        self._query.append({"from_userids": userids})
        return self

    def url(self, url_regex: List[str], must_not: bool = False) -> "Query":
        """Add a query statement that matches any tweet that contains one of the specified URLs to the TwiXL.Query object.

        :param url_regex: List of url's or regular expressions
        :param must_not: If set to True, the list of url's must not appear in the final results
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``url_regex`` is not a list.
        :raises re.error: If one of the entries is not a valid regular expression.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().url(url_regex=["twitter.com/twitter*"])
        """
        self._require_list(url_regex, "url_regex")
        for pattern in url_regex:
            re.compile(pattern)  # validation only: raises on an invalid pattern
        key = "not_url" if must_not else "url"
        # The patterns are stored percent-encoded.
        self._query.append({key: [quote(pattern) for pattern in url_regex]})
        return self

    def print(self) -> None:
        """Print query.

        Every statement is shown under an ``AND`` key with its values joined
        by ``OR``.
        """
        output: dict = {"query": []}
        for q_stmt in self._query:
            key, values = next(iter(q_stmt.items()))
            # Join the values themselves rather than replacing commas in the
            # list's string form, which would also rewrite commas that are
            # part of a value (e.g. the regex quantifier ``a{1,3}``).
            rendered = "[" + " OR ".join(repr(value) for value in values) + "]"
            output["query"].append({"AND": {key: rendered}})
        print(json.dumps(output, indent=4, sort_keys=True))

    def to_dict(self) -> dict:
        """Returns the TwiXL query as a dict.

        :return: Query object as dictionary.
        :rtype: dict
        """
        return {"query": self._query}

    def _download_query_results(self, url: str) -> Path:
        """Downloads query contents to a gzip-compressed file, returning its path.

        :param url: The AWS S3 presigned URL.
        :return: Path of the file the compressed contents were written to.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        # TODO: callback for downloading status
        # TODO: cache
        # TODO: cache results locally first to lower costs
        # TODO: get ID from query status
        filename = urlparse(url).path.rpartition("/")[-1]
        with _get_temporary_file_for_writing(
            prefix="twixl-query-results-", suffix=f"-{filename}.gz"
        ) as stream, GzipFile(
            fileobj=stream, mode="wb", compresslevel=1
        ) as compressed_stream:
            with requests.get(url, stream=True) as r:
                # Do not store an error response as if it were the result.
                r.raise_for_status()
                for chunk in r.iter_content(chunk_size=8192):  # type: ignore
                    compressed_stream.write(chunk)
        return Path(stream.name)
class API(object):
    """Client for the TwiXL API: submits queries, polls them until they finish and exposes the results."""

    def __init__(self, api_endpoint: str, api_key: str):
        """Instantiate a new TwiXL.Api object.

        Args:
            api_endpoint (str): The TwiXL endpoint.
            api_key (str): Your TwiXL api key.
        """
        self._session = APISession(api_endpoint, api_key)

    def get_search(
        self,
        query: Query,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        max_results: Optional[int] = None,
        # TODO: should go into Query-object
        callback: Optional[Callable[[QueryStatus], Any]] = None,
        extended_tweet: bool = True,
    ) -> SearchResults:
        """Return TwiXL search.

        Submits the query, waits until it has finished and returns the
        (not yet downloaded) results.

        :param query: A TwiXL query object that describes the search query.
        :param start_time (Optional): The oldest UTC timestamp from which the Tweets will be provided.
        :param end_time (Optional): The newest, most recent timestamp to which the Tweets will be provided.
        :param max_results (Optional): The maximum number of search results in query output.
            By default, all search results will be stored in the query output.
        :param callback (Optional): Callback function accepting the query status.
        :param extended_tweet (Optional): A boolean flag that determines whether to include
            the full, untruncated text of tweets in the search query.

            - Default: True -- The search query applies to the entire 280-character text of the tweet.
            - If set to False: The search query applies only to the truncated portion
              (first 140 characters) of the tweet.

            In both modes, any tweets that match the query will contain the full text
            of the tweet in the results.
        :return: The search results, one partition per download location.
        :rtype: twitter.SearchResults
        :raises QueryFailed: If the query failed.
        :raises QueryCanceled: If the query was canceled.
        :raises QueryTimeout: If the query did not finish within the default timeout.
        """
        # query also runs on extended tweet field, this is default
        parameters: dict = {"extended_tweet": extended_tweet}
        if max_results is not None:
            parameters["max_results"] = str(max_results)
        if start_time is not None:
            parameters["start_time"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
        if end_time is not None:
            parameters["end_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")

        response = self._session.request_safe(
            "POST", query.dataset.value, params=parameters, json=query.to_dict()
        )
        query_id = _api.QueryId.parse_obj(response.json())
        successful_query = self._wait_for_query_completion(
            query_id.id, _DEFAULT_QUERY_TIMEOUT, callback
        )
        if callback is not None:
            callback(
                QueryStatus(
                    QueryState.DOWNLOADING_RESULTS, successful_query.data_scanned_bytes
                )
            )
        return SearchResults(successful_query.download_location)

    # def get_word_frequency(
    #     self,
    #     date: datetime.datetime,
    #     max_results: Optional[int] = None,
    #     min_length_words: Optional[int] = None,
    #     frequency_limit: Optional[int] = None,
    #     callback: Optional[Callable[[QueryStatus], Any]] = None,
    # ) -> WordFrequencyResults:
    #     """Execute a word_frequency request to the TwiXL API for the specified date.
    #     Download and return the result of the request.
    #     :param date: The date for which the word-frequency should be returned.
    #     :param max_results (Optional): The maximum number of search
    #         results in the word cloud. By default, all search results
    #         will be stored in the query output.
    #         The results are returned in descending order.
    #     :param min_length_words (Optional): The minimum word length of the
    #         words in the word-frequency list. The default minimum word length is '1'.
    #     :param frequency_limit (Optional): The minimum occurence rate of the
    #         words in the word-frequency list. The default is '1'
    #     :return: WordFrequencyResults object
    #     :rtype: twitter.WordFrequencyResults
    #     """
    #     # function
    #     # TODO: check types of query params; currently all are strings
    #     parameters: dict
    #     parameters = {"date": date.strftime("%Y-%m-%d")}
    #     if max_results is not None:
    #         parameters["max_results"] = str(max_results)
    #     if min_length_words is not None:
    #         parameters["min_length_words"] = str(min_length_words)
    #     if frequency_limit is not None:
    #         parameters["frequency_limit"] = str(frequency_limit)
    #     response = self._session.request_safe(
    #         "POST", "/tweets/words/frequency", params=parameters
    #     )
    #     query_id = _api.QueryId.parse_obj(response.json())
    #     successful_query = self._wait_for_query_completion(
    #         query_id.id, _DEFAULT_QUERY_TIMEOUT, callback
    #     )
    #     if callback:
    #         callback(
    #             QueryStatus(
    #                 QueryState.DOWNLOADING_RESULTS, successful_query.data_scanned_bytes
    #             )
    #         )
    #     return WordFrequencyResults.from_file(
    #         self._download_query_results(successful_query.download_location)
    #     )

    def get_tweet_metrics(self) -> TweetMetrics:
        """Request the tweet metrics from the API and return them.

        :return: The metrics parsed from the JSON response.
        """
        return TweetMetrics.from_dict(
            self._session.request_safe("GET", "/tweets/metrics").json()
        )

    # TODO: raise error on failed/canceled query and return query location + bytes scanned
    def _wait_for_query_completion(
        self,
        query_id: str,
        timeout: pendulum.Duration = _DEFAULT_QUERY_TIMEOUT,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> _SuccessfulQuery:
        """Poll the status of a query until it has finished or the timeout expires.

        :param query_id: Identifier of the query to wait for.
        :param timeout: Maximum time to keep polling.
        :param callback (Optional): Called with the current status after every
            poll that finds the query still unfinished.
        :return: The download locations and scanned bytes of the finished query.
        :raises ValueError: If the query succeeded without a download location.
        :raises QueryFailed: If the query failed.
        :raises QueryCanceled: If the query was canceled.
        :raises QueryTimeout: If the query did not finish within ``timeout``.
        """
        # TODO: make function which waits and times out
        # A monotonic clock keeps the timeout correct when the system clock is
        # adjusted while waiting.
        deadline = time.monotonic() + timeout.total_seconds()
        while time.monotonic() < deadline:
            response = self._session.request_safe("GET", f"/query/results/{query_id}")
            api_query_status = _api.QueryStatus.parse_obj(response.json())

            if api_query_status.state == _api.QueryState.SUCCEEDED:
                if not api_query_status.location:
                    raise ValueError(
                        "query has succeeded but no download location is given"
                    )
                return _SuccessfulQuery(
                    api_query_status.location, api_query_status.datascanned  # type: ignore
                )
            if api_query_status.state == _api.QueryState.FAILED:
                raise QueryFailed()
            if api_query_status.state == _api.QueryState.CANCELED:
                raise QueryCanceled()

            if callback is not None:
                callback(
                    QueryStatus(
                        QueryState[api_query_status.state.value],
                        api_query_status.datascanned,
                    )
                )
            # TODO: make interval configurable
            time.sleep(_DEFAULT_QUERY_POLLING_INTERVAL.total_seconds())
        raise QueryTimeout(timeout)

    def _download_query_results(self, url: List[str]) -> List[Path]:
        """Downloads the contents of every URL to its own file, returning the paths.

        :param url: The AWS S3 presigned URLs.
        :return: The paths of the downloaded files, in the order of ``url``;
            an empty list when no URLs are given.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        # TODO: callback for downloading status
        # TODO: cache
        # TODO: cache results locally first to lower costs
        # TODO: get ID from query status
        paths: List[Path] = []
        for file_url in url:
            filename = urlparse(file_url).path.rpartition("/")[-1]
            # NOTE: the response body is written as received; despite the
            # ".gz" suffix it is not compressed by this method.
            with _get_temporary_file_for_writing(
                prefix="twixl-query-results-", suffix=f"-{filename}.gz"
            ) as stream:
                with requests.get(file_url, stream=True) as r:
                    # Do not store an error response as if it were the result.
                    r.raise_for_status()
                    for chunk in r.iter_content(chunk_size=8192):  # type: ignore
                        stream.write(chunk)
            paths.append(Path(stream.name))
        return paths
def _get_temporary_file_for_writing(
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    mode: str = "wb",
    delete: bool = False,
    dir: Optional[Path] = None,
):
    """Return a temporary file for writing to.

    The file is created in ``dir`` when given, otherwise in the scratch
    directory named by the relevant environment variable, otherwise wherever
    Python's tempfile places it. A directory selected this way is created when
    it does not exist yet. By default the file is not deleted when closed.
    """
    # TODO: provide protocol for write() and .name
    scratch_directory = dir
    if scratch_directory is None:
        configured = os.environ.get(_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY)
        if configured is not None:
            scratch_directory = Path(configured)
    if scratch_directory is not None:
        scratch_directory.mkdir(parents=True, exist_ok=True)
    return tempfile.NamedTemporaryFile(
        mode, prefix=prefix, suffix=suffix, dir=scratch_directory, delete=delete
    )


def get_twixl_api() -> API:
    """Initialize a twixl api object.

    The endpoint and key are read from the environment variables
    ``TWIXL_API_ENDPOINT`` and ``TWIXL_API_KEY``.

    :rtype: Initialized twixl API object
    :raises ValueError: If a required environment variable is missing.
    """
    try:
        endpoint = os.environ["TWIXL_API_ENDPOINT"]
        key = os.environ["TWIXL_API_KEY"]
        return API(api_endpoint=endpoint, api_key=key)
    except KeyError as missing:
        raise ValueError(
            f"environment variable {missing.args[0]} required for initializing TwiXL API"
        )


# def word_frequency(
#     date: datetime.datetime,
#     min_length_words: int = 1,
#     max_results: Optional[int] = None,
#     frequency_limit: Optional[int] = None,
#     api: Optional[API] = None,
#     callback: Optional[Callable[[QueryStatus], Any]] = None,
# ) -> WordFrequencyResults:
#     """Sends a word_frequency request to the TwiXL API for the specified date.
#     :param date: The date for which the word-frequency should be returned.
#     :param min_length_words: The minimum word length of the words in the word-frequency list.
#         The default minimum word length is '1'.
#     :param max_results (Optional): The maximum number of search results in query output.
#         By default, all search results will be stored in the query output.
#     :param frequency_limit (Optional): The minimum occurence rate of the words
#         in the word-frequency list. The default limit is '1'.
#     :param api (Optional): An initialized twixl Api object. If not provided, an object will be initialized.
#     :return: :class:`WordFrequencyResults <WordFrequencyResults>` object
#     :rtype: twitter.WordFrequencyResults
#     Usage::
#         >>> from twixl.collections import twitter
#         >>> word_frequencies = twitter.word_frequency(
#         >>>     date=datetime.datetime(2022, 1, 1),
#         >>>     min_length_words=2,
#         >>>     callback=twitter.print_callback
#         >>> )
#         Query status: RUNNING (0 Bytes scanned)
#         Query status: DOWNLOADING_RESULTS (1.0 GB scanned)
#         >>> word_frequencies.to_pandas()
#               word  frequency
#         0  example        100
#         1    words         50
#     """
#     if api is None:
#         api = get_twixl_api()
#     return api.get_word_frequency(
#         date=date,
#         min_length_words=min_length_words,
#         max_results=max_results,
#         frequency_limit=frequency_limit,
#         callback=callback,
#     )
def tweet_metrics(api: Optional[API] = None) -> TweetMetrics:
    """Sends a get metrics request to the TwiXL Api and returns the results.

    :param api (Optional): An initialized twixl Api object. If not provided, an object will be initialized.
    :return: :class:`TweetMetrics <TweetMetrics>` object
    :rtype: twitter.TweetMetrics

    Usage::

        >>> from twixl.collections import twitter
        >>> metrics = twitter.tweet_metrics()
        >>> metrics.to_pandas()
    """
    client = get_twixl_api() if api is None else api
    return client.get_tweet_metrics()
def frequencies(series: pd.Series, mapper: Optional[Callable] = None) -> pd.Series:
    """
    Count how often each item occurs in a series whose entries are lists of items.
    When a mapping function is given, every item is mapped before counting;
    missing values are dropped both before and after mapping.
    """
    transform = mapper or (lambda item: item)
    items = series.explode().dropna()
    return items.map(transform).dropna().value_counts()


def hashtag_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Count the occurrences of each hashtag in the ``hashtags`` column of the tweets data frame.
    """
    hashtags = df["hashtags"]
    return frequencies(hashtags)


def url_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Count the occurrences of each URL in the ``urls`` column of the tweets data frame.
    """
    urls = df["urls"]
    return frequencies(urls)


def domain_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Count the occurrences of each URL's domain in the tweets data frame.
    The domain is the ``domain`` part of tldextract's result for the URL.
    """

    def domain_of(url):
        return tldextract.extract(url).domain

    return frequencies(df["urls"], domain_of)


def country_code_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Count the occurrences of each URL's country code in the tweets data frame.
    The country code is the ``suffix`` part of tldextract's result for the URL.
    """

    def suffix_of(url):
        return tldextract.extract(url).suffix

    return frequencies(df["urls"], suffix_of)


def word_frequencies(
    tweets: pd.DataFrame, tokenizer: Optional[Callable] = None
) -> pd.Series:
    """
    Count the occurrences of each lower-cased word in the ``text`` column of
    the tweets data frame. NLTK's TweetTokenizer is used when no tokenizer is
    specified. Tokens found in ``string.punctuation`` are left out.
    """
    split = tokenizer if tokenizer else TweetTokenizer().tokenize

    def lowered_words(text: str) -> List[str]:
        words = []
        for token in split(text):  # type: ignore
            if token in string.punctuation:
                continue
            words.append(token.lower())
        return words

    return frequencies(tweets["text"].map(lowered_words))


def print_callback(query_status: QueryStatus) -> None:
    """Callback that prints the status of a query to standard out.

    :param query_status: The query status.
    """
    state = query_status.state.value
    scanned = humanize.naturalsize(query_status.data_scanned_bytes)
    print(f"Query status: {state} ({scanned} scanned)")