Source code for twixl.collections.twinl

# nopycln: file
import csv
import datetime
import json
import os
import tempfile
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Callable, Any
from typing import Optional
from urllib.parse import urlparse

import dateutil
import dateutil.parser
import humanize
import pandas as pd
import requests

from twixl.collections.twinl import _api
from twixl.collections.twinl.exceptions import QueryFailed, QueryCanceled, QueryTimeout

# Pause between two consecutive status polls in API._wait_for_query_completion.
_DEFAULT_QUERY_POLLING_INTERVAL = datetime.timedelta(seconds=10)
# How long API._wait_for_query_completion keeps polling before it raises
# QueryTimeout.
_DEFAULT_QUERY_TIMEOUT = datetime.timedelta(minutes=10)
# Name of the environment variable that selects the directory in which
# _get_temporary_file_for_writing creates its files; when the variable is not
# set, the tempfile module picks the location.
_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY = "TWIXL_SCRATCH_DIRECTORY"


@dataclass
class WordFrequencydataGroupedByHour:
    """One row of a word-frequency result: a word, an hour and a frequency.

    Instances are created by ``WordFrequencyResults.from_file`` from the
    three columns of each CSV row.
    """

    word: str
    # Parsed from an ISO 8601 string with dateutil.parser.isoparse.
    hour: datetime.datetime
    # Presumably the number of occurrences of ``word`` within ``hour`` -- confirm
    # against the API documentation.
    frequency: int


class QueryState(Enum):
    """
    Twi-XL query state: queued, running or downloading results.

    ``API._wait_for_query_completion`` looks members up *by name*, using the
    value of the state reported by the API, so the member names have to match
    those values. ``DOWNLOADING_RESULTS`` is reported by this client itself,
    right before it starts downloading the results of a succeeded query.
    # TODO: should downloading results be a separate state?
    """

    RUNNING = "RUNNING"
    QUEUED = "QUEUED"
    DOWNLOADING_RESULTS = "DOWNLOADING_RESULTS"


@dataclass(frozen=True)
class QueryStatus:
    """Immutable progress snapshot of a query, as handed to user callbacks."""

    # Current state of the query.
    state: QueryState
    # Amount of data scanned so far, taken from the API's ``datascanned`` field;
    # print_callback renders it as a byte size.
    data_scanned_bytes: int


@dataclass(frozen=True)
class _SuccessfulQuery:
    """Outcome of a query for which the API reported the SUCCEEDED state."""

    # URL that API._download_query_results fetches the result file from.
    download_location: str  # TODO: better type (Furl)?
    # Amount of data scanned, taken from the API's ``datascanned`` field.
    data_scanned_bytes: int


@dataclass
class WordFrequencyResults:
    """Rows of a word-frequency query, one per (word, hour) combination."""

    # TODO: fix representation to top N elements
    words: List["WordFrequencydataGroupedByHour"]

    def to_pandas(self, by_hour: bool = False) -> pd.DataFrame:
        """Return the word frequency data as a pandas dataframe.

        :param by_hour: If True, return one row per word and hour with the
            columns ``word``, ``hour`` and ``frequency``. If False (default),
            sum the frequencies per word and return the columns ``word`` and
            ``frequency``, sorted by word.
        :return: The word frequencies; an empty result yields an empty frame
            that still carries the expected columns.
        """
        # Naming the columns keeps them present when there are no rows at all.
        df = pd.DataFrame(
            [vars(word) for word in self.words],
            columns=["word", "hour", "frequency"],
        )
        if by_hour:
            return df
        if df.empty:
            return df[["word", "frequency"]]
        # Aggregate the frequency column only: summing the datetime ``hour``
        # column is meaningless and raises a TypeError on recent pandas.
        return df.groupby("word")["frequency"].sum().reset_index()

    @staticmethod
    def from_file(path: Path) -> "WordFrequencyResults":
        """Read the file from the given path and return a WordFrequency data object.

        The file is expected to be a CSV file with a header row followed by
        rows of ``word, hour, frequency``, where ``hour`` is an ISO 8601
        timestamp.

        :param path: Location of the CSV file.
        :return: The parsed rows; a file without data rows gives an empty result.
        :raises ValueError: If a row does not have exactly three columns or a
            value cannot be parsed.
        """
        # TODO: typed response possible here?
        # newline="" is what the csv module requires for correct handling of
        # quoted line breaks. NOTE(review): assumes the result file is UTF-8
        # encoded -- confirm against the API.
        with path.open("rt", encoding="utf-8", newline="") as stream:
            reader = csv.reader(stream)
            next(reader, None)  # skip header (tolerates a completely empty file)
            return WordFrequencyResults(
                [
                    WordFrequencydataGroupedByHour(
                        word, dateutil.parser.isoparse(hour), int(frequency)
                    )
                    for word, hour, frequency in reader
                ]
            )
@dataclass
class TweetMetadata:
    """Identifier and timestamp of a single tweet, as listed in a search result."""

    tweet_id: str
    timestamp: datetime.datetime
@dataclass
class SearchResults:
    """Tweets matched by a search query."""

    # TODO: fix representation to top N elements
    tweets: List["TweetMetadata"]

    def to_pandas(self) -> pd.DataFrame:
        """Return the search results as a pandas dataframe.

        :return: A frame with the columns ``tweet_id`` and ``timestamp``, one
            row per tweet. An empty result yields an empty frame that still
            carries both columns.
        """
        return pd.DataFrame(
            [vars(tweet) for tweet in self.tweets],
            columns=["tweet_id", "timestamp"],
        )

    @staticmethod
    def from_file(path: Path) -> "SearchResults":
        """Read the file from the given path and return a SearchResults object.

        The file is expected to be a CSV file with a header row followed by
        rows of ``tweet_id, timestamp``, where ``timestamp`` is an ISO 8601
        timestamp.

        :param path: Location of the CSV file.
        :return: The parsed rows; a file without data rows gives an empty result.
        :raises ValueError: If a row does not have exactly two columns or a
            timestamp cannot be parsed.
        """
        # TODO: typed response possible here?
        # newline="" is what the csv module requires for correct handling of
        # quoted line breaks. NOTE(review): assumes the result file is UTF-8
        # encoded -- confirm against the API.
        with path.open("rt", encoding="utf-8", newline="") as stream:
            reader = csv.reader(stream)
            next(reader, None)  # skip header (tolerates a completely empty file)
            return SearchResults(
                [
                    TweetMetadata(tweet_id, dateutil.parser.isoparse(timestamp))
                    for tweet_id, timestamp in reader
                ]
            )
@dataclass
class TweetMetricItem:
    """Number of tweets for one month of one year."""

    year: int
    month: int
    num_tweets: int

    @staticmethod
    def from_dict(d: dict) -> "TweetMetricItem":
        """Build an item from a mapping with the keys ``year``, ``month`` and ``num_tweets``.

        :param d: The mapping to read the values from.
        :return: The corresponding item.
        :raises KeyError: If one of the keys is missing.
        """
        year, month, num_tweets = (d[key] for key in ("year", "month", "num_tweets"))
        return TweetMetricItem(year=year, month=month, num_tweets=num_tweets)
@dataclass
class TweetMetrics:
    """Tweet counts per month, as returned by the metrics endpoint."""

    metrics: List[TweetMetricItem]

    def to_pandas(self) -> pd.Series:
        """Return the tweet counts as a series indexed by the first day of each month.

        :return: The ``num_tweets`` values, sorted by their datetime index.
        """
        # TODO: series or data frame?
        month_starts = [
            datetime.datetime(item.year, item.month, 1) for item in self.metrics
        ]
        counts = (item.num_tweets for item in self.metrics)
        series = pd.Series(data=counts, index=pd.to_datetime(month_starts))
        return series.sort_index()

    @staticmethod
    def from_dict(d: dict) -> "TweetMetrics":
        """Build the metrics from a mapping whose ``metrics`` key lists the items.

        :param d: Mapping with a ``metrics`` key holding an iterable of
            mappings accepted by ``TweetMetricItem.from_dict``.
        :return: The corresponding metrics object.
        """
        items = [TweetMetricItem.from_dict(entry) for entry in d["metrics"]]
        return TweetMetrics(items)
class Query:
    """A TwiXL query object to define a search query.

    A query is an ordered list of statements. Each statement combines a list
    of keywords (or regular expressions) with an ``AND`` or ``OR`` operator.
    The builder methods return the query itself, so calls can be chained.
    """

    def __init__(self) -> None:
        """Create a query without any statements."""
        self._query: List[dict] = []

    def _add_statement(
        self, operator: str, keywords: List[str], regex: bool
    ) -> "Query":
        """Append one statement to the query and return the query itself."""
        search_type = "regex" if regex else "word"
        statement = {
            "operator": operator,
            "search_type": search_type,
            "keywords": keywords,
        }
        self._query.append(statement)
        return self

    def or_(self, keywords: List[str], regex: bool = False) -> "Query":
        """Add an OR query statement to the TwiXL.Query object.

        :param keywords: A list of keywords/regular expressions that you wish
            to find in the Twitter text.
        :param regex: If True, the keywords are regular expressions. Defaults
            to False: the keywords are plain text words.
        :return: This query object, to allow chaining.
        :rtype: twinl.Query

        Usage::

            >>> from twixl.collections.twinl import Query
            >>> Query().or_(['twi', 'xl'])
        """
        return self._add_statement("OR", keywords, regex)

    def and_(self, keywords: List[str], regex: bool = False) -> "Query":
        """Add an AND query statement to the TwiXL.Query object.

        :param keywords: A list of keywords/regular expressions that you wish
            to find in the Twitter text.
        :param regex: If True, the keywords are regular expressions. Defaults
            to False: the keywords are plain text words.
        :return: This query object, to allow chaining.
        :rtype: twinl.Query

        Usage::

            >>> from twixl.collections.twinl import Query
            >>> Query().and_(['twi', 'xl'])
        """
        return self._add_statement("AND", keywords, regex)

    def print(self) -> None:
        """Print query in JSON format (indented, keys sorted)."""
        rendered = json.dumps(self.to_dict(), indent=4, sort_keys=True)
        print(rendered)

    def to_dict(self) -> dict:
        """Return the TwiXL query as a dict.

        :return: A dictionary with the list of statements under the key ``query``.
        :rtype: dict
        """
        return {"query": self._query}
class API:
    """Client for the TwiXL API.

    Search and word-frequency requests are answered with a query id. The
    client polls the status of that query until it has finished, downloads
    the result file and parses it.
    """

    def __init__(self, api_endpoint: str, api_key: str):
        """Instantiate a new TwiXL.Api object.

        :param api_endpoint: The TwiXL endpoint.
        :param api_key: Your TwiXL api key.
        """
        self._session = _api.APISession(api_endpoint, api_key)

    def get_search(
        self,
        query: Query,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        max_results: Optional[int] = None,  # TODO: should go into Query-object
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> SearchResults:
        """Run a TwiXL search and return its results.

        :param query: A TwiXL query object that describes the search query.
        :param start_time: (Optional) The oldest UTC timestamp from which the
            Tweets will be provided.
        :param end_time: (Optional) The newest, most recent timestamp to which
            the Tweets will be provided.
        :param max_results: (Optional) The maximum number of search results in
            query output. By default, all search results will be stored in the
            query output.
        :param callback: (Optional) Callback function accepting the query status.
        :return: The tweets matched by the query.
        :rtype: twinl.SearchResults
        :raises QueryFailed: If the API reports that the query failed.
        :raises QueryCanceled: If the API reports that the query was canceled.
        :raises QueryTimeout: If the query does not finish in time.
        """
        # All query parameters are sent as strings.
        parameters: dict = {}
        if max_results is not None:
            parameters["max_results"] = str(max_results)
        if start_time is not None:
            parameters["start_time"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
        if end_time is not None:
            parameters["end_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        response = self._session.request_safe(
            "POST", "/tweets/search/all", params=parameters, json=query.to_dict()
        )
        return SearchResults.from_file(
            self._collect_query_results(response, callback)
        )

    def get_word_frequency(
        self,
        date: datetime.datetime,
        max_results: Optional[int] = None,
        min_length_words: Optional[int] = None,
        frequency_limit: Optional[int] = None,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> WordFrequencyResults:
        """Execute a word_frequency request to the TwiXL API for the specified date.

        Download and return the result of the request.

        :param date: The date for which the word-frequency should be returned.
        :param max_results: (Optional) The maximum number of search results in
            the word cloud. By default, all search results will be stored in
            the query output. The results are returned in descending order.
        :param min_length_words: (Optional) The minimum word length of the
            words in the word-frequency list. The default minimum word length
            is '1'.
        :param frequency_limit: (Optional) The minimum occurrence rate of the
            words in the word-frequency list. The default is '1'.
        :param callback: (Optional) Callback function accepting the query status.
        :return: WordFrequencyResults object
        :rtype: twinl.WordFrequencyResults
        :raises QueryFailed: If the API reports that the query failed.
        :raises QueryCanceled: If the API reports that the query was canceled.
        :raises QueryTimeout: If the query does not finish in time.
        """
        # TODO: check types of query params; currently all are strings
        parameters: dict = {"date": date.strftime("%Y-%m-%d")}
        if max_results is not None:
            parameters["max_results"] = str(max_results)
        if min_length_words is not None:
            parameters["min_length_words"] = str(min_length_words)
        if frequency_limit is not None:
            parameters["frequency_limit"] = str(frequency_limit)
        response = self._session.request_safe(
            "POST", "/tweets/words/frequency", params=parameters
        )
        return WordFrequencyResults.from_file(
            self._collect_query_results(response, callback)
        )

    def get_tweet_metrics(self) -> TweetMetrics:
        """Request the tweet metrics from the API and return them."""
        return TweetMetrics.from_dict(
            self._session.request_safe("GET", "/tweets/metrics").json()
        )

    def _collect_query_results(
        self,
        response: Any,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> Path:
        """Wait for the query started by ``response`` and download its results.

        :param response: Response of the request that started the query; its
            JSON body carries the query id.
        :param callback: (Optional) Callback function accepting the query status.
        :return: Path of the downloaded result file.
        """
        query_id = _api.QueryId.parse_obj(response.json())
        successful_query = self._wait_for_query_completion(
            query_id.id, _DEFAULT_QUERY_TIMEOUT, callback
        )
        # Test against None, not truthiness, so that every callable is invoked.
        if callback is not None:
            callback(
                QueryStatus(
                    QueryState.DOWNLOADING_RESULTS,
                    successful_query.data_scanned_bytes,
                )
            )
        return self._download_query_results(successful_query.download_location)

    # TODO: raise error on failed/canceled query and return query location + bytes scanned
    def _wait_for_query_completion(
        self,
        query_id: str,
        timeout: datetime.timedelta = _DEFAULT_QUERY_TIMEOUT,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> _SuccessfulQuery:
        """Poll the status of a query until it has finished.

        :param query_id: Identifier of the query to wait for.
        :param timeout: Maximum time to keep polling.
        :param callback: (Optional) Called with the query status after every
            poll that finds the query still unfinished.
        :return: Download location and amount of data scanned of the query.
        :raises ValueError: If the query succeeded without a download location.
        :raises QueryFailed: If the API reports that the query failed.
        :raises QueryCanceled: If the API reports that the query was canceled.
        :raises QueryTimeout: If the query has not finished within ``timeout``.
        """
        # TODO: make function which waits and times out
        # A monotonic clock keeps the timeout correct when the system clock is
        # adjusted while waiting.
        deadline = time.monotonic() + timeout.total_seconds()
        polling_interval = _DEFAULT_QUERY_POLLING_INTERVAL.total_seconds()
        while time.monotonic() < deadline:
            response = self._session.request_safe("GET", f"/query/results/{query_id}")
            api_query_status = _api.QueryStatus.parse_obj(response.json())
            if api_query_status.state == _api.QueryState.SUCCEEDED:
                if api_query_status.location is None:
                    raise ValueError(
                        "query has succeeded but no download location is given"
                    )
                return _SuccessfulQuery(
                    api_query_status.location, api_query_status.datascanned
                )
            elif api_query_status.state == _api.QueryState.FAILED:
                raise QueryFailed()
            elif api_query_status.state == _api.QueryState.CANCELED:
                raise QueryCanceled()
            if callback is not None:
                # NOTE(review): the lookup by name assumes that every other API
                # state has a QueryState member of the same name -- confirm.
                callback(
                    QueryStatus(
                        QueryState[api_query_status.state.value],
                        api_query_status.datascanned,
                    )
                )
            # TODO: make interval configurable
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(polling_interval, remaining)))
        raise QueryTimeout(timeout)

    def _download_query_results(self, url: str) -> Path:
        """Download the query results to a file and return its path.

        :param url: The AWS S3 presigned URL.
        :return: Path of the file the results were written to.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        # TODO: callback for downloading status
        # TODO: cache
        # TODO: cache results locally first to lower costs
        # TODO: get ID from query status
        filename = urlparse(url).path.rpartition("/")[-1]
        stream = _get_temporary_file_for_writing(
            prefix="twixl-query-results-", suffix=f"-{filename}"
        )
        destination = Path(stream.name)
        try:
            with stream, requests.get(url, stream=True) as r:
                # Without this check the body of an error response would be
                # stored and later parsed as if it were the query result.
                r.raise_for_status()
                for chunk in r.iter_content(chunk_size=8192):
                    stream.write(chunk)
        except Exception:
            # The file outlives its handle, so remove the incomplete download.
            # Cleanup is best effort: the original error is what matters.
            try:
                os.remove(destination)
            except OSError:
                pass
            raise
        return destination
def get_twixl_api() -> "API":
    """Initialize a twixl api object from the environment.

    The endpoint is read from the environment variable ``TWIXL_API_ENDPOINT``
    and the key from ``TWIXL_API_KEY``.

    :return: Initialized twixl API object
    :rtype: twinl.API
    :raises ValueError: If one of the environment variables is not set.
    """
    # Only the environment lookups are guarded, so that a KeyError raised while
    # constructing the API object is not mistaken for a missing variable.
    try:
        api_endpoint = os.environ["TWIXL_API_ENDPOINT"]
        api_key = os.environ["TWIXL_API_KEY"]
    except KeyError as e:
        raise ValueError(
            f"environment variable {e.args[0]} required for initializing TwiXL API"
        ) from e
    return API(api_endpoint=api_endpoint, api_key=api_key)
def word_frequency(
    date: datetime.datetime,
    min_length_words: int = 1,
    max_results: Optional[int] = None,
    frequency_limit: Optional[int] = None,
    api: Optional[API] = None,
    callback: Optional[Callable[[QueryStatus], Any]] = None,
) -> WordFrequencyResults:
    """Sends a word_frequency request to the TwiXL API for the specified date.

    :param date: The date for which the word-frequency should be returned.
    :param min_length_words: The minimum word length of the words in the
        word-frequency list. The default minimum word length is '1'.
    :param max_results: (Optional) The maximum number of search results in
        query output. By default, all search results will be stored in the
        query output.
    :param frequency_limit: (Optional) The minimum occurrence rate of the
        words in the word-frequency list. The default limit is '1'.
    :param api: (Optional) An initialized twixl Api object. If not provided,
        an object will be initialized.
    :param callback: (Optional) Callback function accepting the query status.
    :return: :class:`WordFrequencyResults <WordFrequencyResults>` object
    :rtype: twinl.WordFrequencyResults

    Usage::

        >>> import datetime
        >>> from twixl.collections import twinl
        >>> word_frequencies = twinl.word_frequency(
        >>>     date=datetime.datetime(2022, 1, 1),
        >>>     min_length_words=2,
        >>>     callback=twinl.print_callback
        >>> )
        Query status: RUNNING (0 Bytes scanned)
        Query status: DOWNLOADING_RESULTS (1.0 GB scanned)
        >>> word_frequencies.to_pandas()
              word  frequency
        0  example        100
        1    words         50
    """
    client = get_twixl_api() if api is None else api
    return client.get_word_frequency(
        date=date,
        max_results=max_results,
        min_length_words=min_length_words,
        frequency_limit=frequency_limit,
        callback=callback,
    )
def tweet_metrics(api: Optional[API] = None) -> TweetMetrics:
    """Sends a get metrics request to the TwiXL Api and returns the results.

    :param api: (Optional) An initialized twixl Api object. If not provided,
        an object will be initialized.
    :return: :class:`TweetMetrics <TweetMetrics>` object
    :rtype: twinl.TweetMetrics

    Usage::

        >>> from twixl.collections import twinl
        >>> metrics = twinl.tweet_metrics()
        >>> metrics.to_pandas()
    """
    client = get_twixl_api() if api is None else api
    return client.get_tweet_metrics()
def _get_temporary_file_for_writing(
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    mode: str = "wb",
    delete: bool = False,
):
    """Return a temporary file for writing to.

    The file is created in the scratch directory named by the relevant
    environment variable when that variable is set; the directory is created
    first if it does not exist yet. Otherwise the location is left to Python's
    tempfile module.

    :param prefix: (Optional) Start of the file name.
    :param suffix: (Optional) End of the file name.
    :param mode: Mode in which the file is opened.
    :param delete: Whether the file is deleted as soon as it is closed.
    :return: The open file, as returned by ``tempfile.NamedTemporaryFile``.
    """
    # TODO: provide protocol for write() and .name
    configured = os.environ.get(_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY)
    if configured is None:
        scratch_directory = None
    else:
        scratch_directory = Path(configured)
        scratch_directory.mkdir(parents=True, exist_ok=True)
    return tempfile.NamedTemporaryFile(
        mode=mode,
        prefix=prefix,
        suffix=suffix,
        dir=scratch_directory,
        delete=delete,
    )


def print_callback(query_status: QueryStatus) -> None:
    """Callback that prints the status of a query to standard out.

    :param query_status: The query status.
    """
    message = f"Query status: {query_status.state.value} ({humanize.naturalsize(query_status.data_scanned_bytes)} scanned)"
    print(message)