# nopycln: file
import csv
import datetime
import json
import os
import tempfile
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Callable, Any, Dict
from typing import Optional
from urllib.parse import urlparse, quote
import dateutil
import dateutil.parser
import humanize
import pandas as pd
import requests
import re
from twixl.collections.twinl import _api
from twixl.collections.twinl.exceptions import QueryFailed, QueryCanceled, QueryTimeout
_DEFAULT_QUERY_POLLING_INTERVAL = datetime.timedelta(seconds=10)
_DEFAULT_QUERY_TIMEOUT = datetime.timedelta(minutes=10)
_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY = "TWIXL_SCRATCH_DIRECTORY"
@dataclass
class WordFrequencydataGroupedByHour:
    """Frequency of one word within one hour of the word-frequency results.

    Attributes:
        word: The word the row refers to.
        hour: The hour this frequency belongs to.
        frequency: The frequency value reported for ``word`` in that hour.
    """

    # Field order matters: it determines the column order of the pandas export.
    word: str
    hour: datetime.datetime
    frequency: int
class QueryState(Enum):
    """Progress state of a Twi-XL query, as reported to status callbacks.

    ``RUNNING`` and ``QUEUED`` describe a query that has not finished yet;
    ``DOWNLOADING_RESULTS`` is reported by this client once the query has
    finished and its result file is being fetched.
    """

    # TODO: should downloading results be a separate state?
    # Member order is preserved because iteration over the enum follows it.
    RUNNING = "RUNNING"
    QUEUED = "QUEUED"
    DOWNLOADING_RESULTS = "DOWNLOADING_RESULTS"
@dataclass(frozen=True)
class QueryStatus:
    """Immutable snapshot of a query's progress, handed to status callbacks.

    Attributes:
        state: Current state of the query.
        data_scanned_bytes: Amount of data scanned so far, in bytes.
    """

    state: QueryState
    data_scanned_bytes: int
@dataclass(frozen=True)
class _SuccessfulQuery:
download_location: str # TODO: better type (Furl)?
data_scanned_bytes: int
@dataclass
class WordFrequencyResults:
    """WordFrequencyResults object.

    Attributes:
        words (List): List with :class:`WordFrequencydataGroupedByHour`.
    """

    # TODO: fix representation to top N elements
    # Quoted so the class body does not need the element type at definition time.
    words: List["WordFrequencydataGroupedByHour"]

    def to_pandas(self, by_hour: bool = False) -> pd.DataFrame:
        """Return the word frequency data as a pandas dataframe.

        :param by_hour: If True, return one row per stored (word, hour) entry
            with columns ``word``, ``hour`` and ``frequency``. If False
            (default), return one row per word, sorted by word, with columns
            ``word`` and ``frequency``; the frequency is summed over all hours.
        :return: The word frequency data as a DataFrame.
        """
        df = pd.DataFrame(
            [vars(word) for word in self.words],
            # Explicit columns keep the schema (and the groupby key) when there are no words.
            columns=["word", "hour", "frequency"],
        )
        if by_hour:
            return df
        # Aggregate only `frequency`: summing the datetime `hour` column raises
        # TypeError on recent pandas versions.
        return df.groupby("word", as_index=False)["frequency"].sum()

    @staticmethod
    def from_file(path: Path) -> "WordFrequencyResults":
        """Read a word-frequency CSV file and return a WordFrequencyResults object.

        The file starts with a header row, which is skipped; every following
        row is ``word, hour, frequency``. ``hour`` is parsed as an ISO-8601
        timestamp and ``frequency`` as an int.

        :param path: source file path
        :return: The parsed word frequency results (empty if the file is empty).
        """
        # TODO: typed response possible here?
        # newline="" is required by the csv module so quoted fields containing newlines parse correctly.
        with path.open("rt", newline="") as stream:
            reader = csv.reader(stream)
            next(reader, None)  # skip header; tolerate a completely empty file
            return WordFrequencyResults(
                [
                    WordFrequencydataGroupedByHour(
                        word, dateutil.parser.isoparse(hour), int(frequency)
                    )
                    for word, hour, frequency in reader
                ]
            )
@dataclass
class TweetMetadata:
    """Identifier and timestamp of a single tweet in the search results.

    Attributes:
        tweet_id: The tweet's identifier, kept as a string.
        timestamp: The timestamp reported for the tweet.
    """

    tweet_id: str
    timestamp: datetime.datetime
@dataclass
class SearchResults:
    """SearchResults object.

    Attributes:
        tweets (List): List with :class:`TweetMetadata`.
    """

    # TODO: fix representation to top N elements
    # Quoted so the class body does not need the element type at definition time.
    tweets: List["TweetMetadata"]

    def to_pandas(self) -> pd.DataFrame:
        """Return the tweet metadata as a pandas dataframe.

        :return: DataFrame with one row per tweet and columns ``tweet_id``
            and ``timestamp`` (the columns are present even when there are no tweets).
        """
        return pd.DataFrame(
            [vars(tweet) for tweet in self.tweets],
            columns=["tweet_id", "timestamp"],
        )

    @staticmethod
    def from_file(path: Path) -> "SearchResults":
        """Read a search-results CSV file and return a SearchResults object.

        The file starts with a header row, which is skipped; every following
        row is ``tweet_id, timestamp`` with the timestamp in ISO-8601 format.

        :param path: source file path
        :return: The parsed search results (empty if the file is empty).
        """
        # TODO: typed response possible here?
        # newline="" is required by the csv module so quoted fields containing newlines parse correctly.
        with path.open("rt", newline="") as stream:
            reader = csv.reader(stream)
            next(reader, None)  # skip header; tolerate a completely empty file
            return SearchResults(
                [
                    TweetMetadata(tweet_id, dateutil.parser.isoparse(timestamp))
                    for tweet_id, timestamp in reader
                ]
            )
@dataclass
class TweetMetricItem:
    """Tweet count for one (year, month) pair."""

    year: int
    month: int
    num_tweets: int

    @staticmethod
    def from_dict(d: dict) -> "TweetMetricItem":
        """Build an item from a mapping holding ``year``, ``month`` and ``num_tweets``.

        Extra keys are ignored.

        :raises KeyError: if any of the three keys is missing.
        """
        return TweetMetricItem(
            year=d["year"],
            month=d["month"],
            num_tweets=d["num_tweets"],
        )
class Query:
    """A TwiXL query object to define a search query.

    Every builder method appends one statement and returns the query itself,
    so calls can be chained::

        >>> from twixl.collections import Query
        >>> Query().keywords(['twitter']).from_usernames(['twitterdev'])
    """

    def __init__(self) -> None:
        """Instantiate a new, empty TwiXL.Query object."""
        # One single-key dict per statement (e.g. {"keywords": [...]}), in insertion order.
        self._query: List[Dict[Any, Any]] = []

    @staticmethod
    def _require_list(value: Any, parameter_name: str) -> None:
        """Raise TypeError unless ``value`` is a list."""
        if not isinstance(value, list):
            raise TypeError(f"{parameter_name} parameter can only be a list")

    def _remove_conflicting(
        self, conflicting_key: str, overwrite: bool, message: str
    ) -> None:
        """Remove the first statement keyed ``conflicting_key`` when ``overwrite`` is set.

        Does nothing when no such statement exists.

        :raises ValueError: with ``message`` if such a statement exists and
            ``overwrite`` is False.
        """
        existing = next(
            (stmt for stmt in self._query if conflicting_key in stmt), None
        )
        if existing is None:
            return
        if not overwrite:
            raise ValueError(message)
        self._query.remove(existing)

    def keywords(self, keywords: List[str]) -> "Query":
        """Add a query statement that matches any tweet with the specified keyword(s).

        :param keywords: A list of words
        :return: Query object
        :rtype: twinl.Query
        :raises TypeError: if ``keywords`` is not a list

        Usage::

            >>> from twixl.collections import Query
            >>> Query().keywords(keywords=['twitter', 'tweet'])
        """
        self._require_list(keywords, "keywords")
        self._query.append({"keywords": keywords})
        return self

    def regex(self, regex: List[str]) -> "Query":
        r"""Add a query statement that matches any tweet with the specified regular expression(s).

        :param regex: A list of regular expressions
        :return: Query object
        :rtype: twinl.Query
        :raises TypeError: if ``regex`` is not a list

        Usage::

            >>> from twixl.collections import Query
            >>> Query().regex(regex=[r'\btwit\w+'])
        """
        self._require_list(regex, "regex")
        self._query.append({"regex": regex})
        return self

    def from_usernames(self, usernames: List[str], overwrite: bool = False) -> "Query":
        """Add a query statement that matches any tweet from the specific username(s).

        A query holds either a ``from_usernames`` or a ``from_userids``
        statement, not both.

        :param usernames: A list of usernames (excluding the @ character)
        :param overwrite: Replace an existing ``from_userids`` statement instead
            of raising.
        :return: Query object
        :rtype: twinl.Query
        :raises TypeError: if ``usernames`` is not a list
        :raises ValueError: if a ``from_userids`` statement exists and
            ``overwrite`` is False

        Usage::

            >>> from twixl.collections import Query
            >>> Query().from_usernames(usernames=['twitterdev', 'twitterapi'])
        """
        self._require_list(usernames, "usernames")
        self._remove_conflicting(
            "from_userids",
            overwrite,
            "'from_userids' already exists in this query, use overwrite to replace with 'from_usernames'.",
        )
        self._query.append({"from_usernames": usernames})
        return self

    def from_userids(self, userids: List[str], overwrite: bool = False) -> "Query":
        """Add a query statement that matches any tweet from the specific userid(s).

        A query holds either a ``from_usernames`` or a ``from_userids``
        statement, not both.

        :param userids: A list of users' numeric user IDs
        :param overwrite: Replace an existing ``from_usernames`` statement
            instead of raising.
        :return: Query object
        :rtype: twinl.Query
        :raises TypeError: if ``userids`` is not a list
        :raises ValueError: if a ``from_usernames`` statement exists and
            ``overwrite`` is False

        Usage::

            >>> from twixl.collections import Query
            >>> Query().from_userids(userids=['2244994945', '6253282'])
        """
        self._require_list(userids, "userids")
        self._remove_conflicting(
            "from_usernames",
            overwrite,
            "'from_usernames' already exists in this query, use overwrite to replace with 'from_userids'",
        )
        self._query.append({"from_userids": userids})
        return self

    def url(self, url_regex: List[str]) -> "Query":
        """Add a query statement that matches any tweet containing one of the specified URLs.

        Each pattern is compiled locally first, so an invalid regular
        expression fails here. The patterns are then URL-quoted with
        :func:`urllib.parse.quote` before being stored.

        :param url_regex: List of URLs or regular expressions
        :return: Query object
        :rtype: twinl.Query
        :raises TypeError: if ``url_regex`` is not a list
        :raises re.error: if a pattern is not a valid regular expression

        Usage::

            >>> from twixl.collections import Query
            >>> Query().url(url_regex=["twitter.com/twitter*"])
        """
        self._require_list(url_regex, "url_regex")
        for pattern in url_regex:
            re.compile(pattern)
        self._query.append({"url": [quote(pattern) for pattern in url_regex]})
        return self

    def print(self) -> None:
        """Print the query as indented JSON.

        Each statement is shown under an ``AND`` key. Its list of values is
        rendered as a string in which the separating commas become ``" OR"``.
        """
        statements = []
        for stmt in self._query:
            key, values = next(iter(stmt.items()))
            # convert the list into an OR-separated string
            statements.append({"AND": {key: str(values).replace(",", " OR")}})
        # Inside the method, `print` resolves to the builtin (class scope is not searched).
        print(json.dumps({"query": statements}, indent=4, sort_keys=True))

    def to_dict(self) -> dict:
        """Return the TwiXL query as a dict.

        :return: ``{"query": [...]}`` holding the statements in insertion order.
        :rtype: dict
        """
        return {"query": self._query}
class API:
    """Client for the TwiXL API.

    A query is submitted to the API, which answers with a query id. The
    client then polls the query's status until it finishes and downloads the
    result file from the location the API reports.
    """

    def __init__(self, api_endpoint: str, api_key: str):
        """Instantiate a new TwiXL.Api object.

        Args:
            api_endpoint (str):
                The TwiXL endpoint.
            api_key (str):
                Your TwiXL api key.
        """
        self._session = _api.APISession(api_endpoint, api_key)

    def get_search(
        self,
        query: Query,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        max_results: Optional[int] = None,  # TODO: should go into Query-object
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> SearchResults:
        """Run a TwiXL search and return its results.

        :param query: A TwiXL query object that describes the search query.
        :param start_time (Optional): The oldest UTC timestamp from which
            the Tweets will be provided.
        :param end_time (Optional): The newest, most recent timestamp to
            which the Tweets will be provided.
        :param max_results (Optional): The maximum number of search results in query output.
            By default, all search results will be stored in the query output.
        :param callback (Optional): Callback function accepting the query status.
        :return: The search results.
        :rtype: twinl.SearchResults
        :raises QueryFailed: if the query fails.
        :raises QueryCanceled: if the query is canceled.
        :raises QueryTimeout: if the query does not finish within the default timeout.
        """
        parameters: dict = {}
        if max_results is not None:
            parameters["max_results"] = str(max_results)
        if start_time is not None:
            parameters["start_time"] = start_time.strftime("%Y-%m-%d %H:%M:%S")
        if end_time is not None:
            parameters["end_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        results_path = self._run_query(
            "POST",
            "/tweets/search/all",
            callback,
            params=parameters,
            json=query.to_dict(),
        )
        return SearchResults.from_file(results_path)

    def get_word_frequency(
        self,
        date: datetime.datetime,
        max_results: Optional[int] = None,
        min_length_words: Optional[int] = None,
        frequency_limit: Optional[int] = None,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> WordFrequencyResults:
        """Execute a word_frequency request to the TwiXL API for the specified date.

        Download and return the result of the request.

        :param date: The date for which the word-frequency should be returned.
        :param max_results (Optional): The maximum number of search
            results in the word cloud. By default, all search results
            will be stored in the query output.
            The results are returned in descending order.
        :param min_length_words (Optional): The minimum word length of the
            words in the word-frequency list. The default minimum word length is '1'.
        :param frequency_limit (Optional): The minimum occurrence rate of the
            words in the word-frequency list. The default is '1'
        :param callback (Optional): Callback function accepting the query status.
        :return: WordFrequencyResults object
        :rtype: twinl.WordFrequencyResults
        :raises QueryFailed: if the query fails.
        :raises QueryCanceled: if the query is canceled.
        :raises QueryTimeout: if the query does not finish within the default timeout.
        """
        # TODO: check types of query params; currently all are strings
        parameters: dict = {"date": date.strftime("%Y-%m-%d")}
        if max_results is not None:
            parameters["max_results"] = str(max_results)
        if min_length_words is not None:
            parameters["min_length_words"] = str(min_length_words)
        if frequency_limit is not None:
            parameters["frequency_limit"] = str(frequency_limit)
        results_path = self._run_query(
            "POST", "/tweets/words/frequency", callback, params=parameters
        )
        return WordFrequencyResults.from_file(results_path)

    def get_tweet_metrics(self) -> "TweetMetrics":
        """Return the metrics served by ``GET /tweets/metrics``.

        NOTE(review): ``TweetMetrics`` is not defined before this class (only
        ``TweetMetricItem`` is). The annotation is quoted so that defining the
        class does not fail; confirm where ``TweetMetrics`` is defined.
        """
        return TweetMetrics.from_dict(
            self._session.request_safe("GET", "/tweets/metrics").json()
        )

    def _run_query(
        self,
        method: str,
        path: str,
        callback: Optional[Callable[[QueryStatus], Any]],
        **request_kwargs: Any,
    ) -> Path:
        """Submit a query, wait until it succeeds and download its result file.

        :param method: HTTP method used to submit the query.
        :param path: API path the query is submitted to.
        :param callback: Optional status callback. It receives the polling
            updates, then a DOWNLOADING_RESULTS status before the download.
        :param request_kwargs: Extra keyword arguments forwarded to
            ``request_safe`` (e.g. ``params``, ``json``).
        :return: Path of the downloaded result file.
        """
        response = self._session.request_safe(method, path, **request_kwargs)
        query_id = _api.QueryId.parse_obj(response.json())
        successful_query = self._wait_for_query_completion(
            query_id.id, _DEFAULT_QUERY_TIMEOUT, callback
        )
        if callback is not None:
            callback(
                QueryStatus(
                    QueryState.DOWNLOADING_RESULTS,
                    successful_query.data_scanned_bytes,
                )
            )
        return self._download_query_results(successful_query.download_location)

    def _wait_for_query_completion(
        self,
        query_id: str,
        timeout: datetime.timedelta = _DEFAULT_QUERY_TIMEOUT,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> _SuccessfulQuery:
        """Poll the status of ``query_id`` until it finishes.

        :param query_id: Id of the submitted query.
        :param timeout: How long to keep polling before giving up.
        :param callback: Optional callback. It receives the status after every
            poll that finds the query still unfinished.
        :return: Download location and scanned bytes of the succeeded query.
        :raises ValueError: if the query succeeded without a download location.
        :raises QueryFailed: if the query failed.
        :raises QueryCanceled: if the query was canceled.
        :raises QueryTimeout: if the query did not finish within ``timeout``.
        """
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, DST, manual changes).
        deadline = time.monotonic() + timeout.total_seconds()
        while time.monotonic() < deadline:
            response = self._session.request_safe("GET", f"/query/results/{query_id}")
            api_query_status = _api.QueryStatus.parse_obj(response.json())
            if api_query_status.state == _api.QueryState.SUCCEEDED:
                if api_query_status.location is None:
                    raise ValueError(
                        "query has succeeded but no download location is given"
                    )
                return _SuccessfulQuery(
                    api_query_status.location, api_query_status.datascanned
                )
            if api_query_status.state == _api.QueryState.FAILED:
                raise QueryFailed()
            if api_query_status.state == _api.QueryState.CANCELED:
                raise QueryCanceled()
            if callback is not None:
                # Unfinished server states map onto the public QueryState by name.
                callback(
                    QueryStatus(
                        QueryState[api_query_status.state.value],
                        api_query_status.datascanned,
                    )
                )
            # TODO: make interval configurable
            time.sleep(_DEFAULT_QUERY_POLLING_INTERVAL.total_seconds())
        raise QueryTimeout(timeout)

    def _download_query_results(self, url: str) -> Path:
        """Download query results to a temporary file and return its path.

        :param url: The AWS S3 presigned URL.
        :return: Path of the downloaded file. The file is not deleted
            automatically.
        :raises requests.HTTPError: if the download responds with an error status.
        """
        # TODO: callback for downloading status
        # TODO: cache results locally first to lower costs
        # TODO: get ID from query status
        filename = urlparse(url).path.rpartition("/")[-1]
        with _get_temporary_file_for_writing(
            prefix="twixl-query-results-", suffix=f"-{filename}"
        ) as stream:
            destination = Path(stream.name)
            try:
                with requests.get(url, stream=True) as response:
                    # Without this check an error body (e.g. from an expired presigned
                    # URL) would be saved and later parsed as query results.
                    response.raise_for_status()
                    for chunk in response.iter_content(chunk_size=8192):
                        stream.write(chunk)
            except Exception:
                # The file is created with delete=False, so remove the partial
                # download (best effort) before propagating the error.
                stream.close()
                try:
                    destination.unlink()
                except OSError:
                    pass
                raise
        return destination
def get_twixl_api() -> "API":
    """Initialize a TwiXL API object from environment variables.

    The endpoint is read from ``TWIXL_API_ENDPOINT`` and the key from
    ``TWIXL_API_KEY``.

    :return: Initialized twixl API object.
    :raises ValueError: if either environment variable is not set.
    """
    # Only the environment lookups sit inside the try, so a KeyError raised
    # while constructing API is not misreported as a missing variable.
    try:
        api_endpoint = os.environ["TWIXL_API_ENDPOINT"]
        api_key = os.environ["TWIXL_API_KEY"]
    except KeyError as e:
        raise ValueError(
            f"environment variable {e.args[0]} required for initializing TwiXL API"
        ) from e
    return API(api_endpoint=api_endpoint, api_key=api_key)
def search(
    query: Query,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    max_results: Optional[int] = None,
    api: Optional[API] = None,
    callback: Optional[Callable[[QueryStatus], Any]] = None,
) -> SearchResults:
    """Send a TwiXL search query and return its results.

    :param query: A TwiXL query object that describes the search query.
    :param start_time (Optional): The oldest UTC timestamp from which the Tweets will be provided.
    :param end_time (Optional): The newest UTC timestamp up to which the Tweets will be provided.
    :param max_results (Optional): The maximum number of search results in query output.
        By default, all search results will be stored in the query output.
    :param api (Optional): An initialized twixl Api object. If not provided,
        one is created from the environment with :func:`get_twixl_api`.
    :param callback (Optional): Callback function taking a QueryStatus object.
    :return: :class:`SearchResults <SearchResults>` object
    :rtype: twinl.SearchResults

    Usage::

        >>> from datetime import datetime
        >>> from twixl.collections import twinl
        >>> query = twinl.Query().keywords(["example", "query"])
        >>> search_results = twinl.search(
        ...     query=query,
        ...     start_time=datetime(2022, 1, 1, 0, 0),
        ...     end_time=datetime(2022, 1, 1, 23, 59, 59),
        ...     max_results=100,
        ...     callback=twinl.print_callback,
        ... )
        Query status: RUNNING (0 Bytes scanned)
        Query status: DOWNLOADING_RESULTS (1.0 GB scanned)
        >>> search_results
        SearchResults(...)
    """
    if api is None:
        api = get_twixl_api()
    return api.get_search(
        query=query,
        start_time=start_time,
        end_time=end_time,
        max_results=max_results,
        callback=callback,
    )
def word_frequency(
    date: datetime.datetime,
    min_length_words: int = 1,
    max_results: Optional[int] = None,
    frequency_limit: Optional[int] = None,
    api: Optional[API] = None,
    callback: Optional[Callable[[QueryStatus], Any]] = None,
) -> WordFrequencyResults:
    """Send a word_frequency request to the TwiXL API for the specified date.

    :param date: The date for which the word-frequency should be returned.
    :param min_length_words: The minimum word length of the words in the word-frequency list.
        The default minimum word length is '1'.
    :param max_results (Optional): The maximum number of search results in query output.
        By default, all search results will be stored in the query output.
    :param frequency_limit (Optional): The minimum occurrence rate of the words
        in the word-frequency list. The default limit is '1'.
    :param api (Optional): An initialized twixl Api object. If not provided,
        one is created from the environment with :func:`get_twixl_api`.
    :param callback (Optional): Callback function taking a QueryStatus object.
    :return: :class:`WordFrequencyResults <WordFrequencyResults>` object
    :rtype: twinl.WordFrequencyResults

    Usage::

        >>> from datetime import datetime
        >>> from twixl.collections import twinl
        >>> word_frequencies = twinl.word_frequency(
        ...     date=datetime(2022, 1, 1),
        ...     min_length_words=2,
        ...     callback=twinl.print_callback,
        ... )
        Query status: RUNNING (0 Bytes scanned)
        Query status: DOWNLOADING_RESULTS (1.0 GB scanned)
        >>> word_frequencies.to_pandas()
              word  frequency
        0  example        100
        1    words         50
    """
    if api is None:
        api = get_twixl_api()
    return api.get_word_frequency(
        date=date,
        min_length_words=min_length_words,
        max_results=max_results,
        frequency_limit=frequency_limit,
        callback=callback,
    )
def _get_temporary_file_for_writing(
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
    mode: str = "wb",
    delete: bool = False,
):
    """Return an open named temporary file for writing.

    The file is created in a user-specified scratch directory when the
    environment variable named by ``_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY``
    (``TWIXL_SCRATCH_DIRECTORY``) is set to a non-empty value; that directory
    is created, with its parents, if it does not exist. Otherwise the file is
    created in Python's default temporary directory.

    :param prefix: Optional file name prefix.
    :param suffix: Optional file name suffix.
    :param mode: File mode passed to :func:`tempfile.NamedTemporaryFile`.
    :param delete: Whether the file is removed when closed. Defaults to False,
        so the file can still be reopened by name after it is closed.
    :return: The object returned by :func:`tempfile.NamedTemporaryFile`; its
        ``.name`` attribute holds the file path.
    """
    # TODO: provide protocol for write() and .name
    configured_directory = os.environ.get(_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY)
    scratch_directory: Optional[Path] = None
    # An empty value is treated as unset: Path("") would silently mean the current directory.
    if configured_directory:
        scratch_directory = Path(configured_directory)
        scratch_directory.mkdir(parents=True, exist_ok=True)
    return tempfile.NamedTemporaryFile(
        mode, prefix=prefix, suffix=suffix, dir=scratch_directory, delete=delete
    )
def print_callback(query_status: QueryStatus) -> None:
    """Write a one-line progress report for a query to standard output.

    The line contains the state name and the amount of data scanned so far,
    rendered as a human-readable size.

    :param query_status: The query status to report.
    """
    scanned = humanize.naturalsize(query_status.data_scanned_bytes)
    message = f"Query status: {query_status.state.value} ({scanned} scanned)"
    print(message)