import json
import re
import humanize
# nopycln: file
import csv
import datetime
import os
import time
import tempfile
import pendulum
import requests
import string
import dateutil
import dateutil.parser
import pandas as pd
import tldextract
from urllib.parse import urlparse, quote
from nltk.tokenize import TweetTokenizer
from typing import List, Any, Dict, Callable
from gzip import GzipFile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from enum import Enum
from typing import Generator, Union
from twixl.collections.twitter._api import APISession
from twixl.collections.twitter import _api
from twixl.collections.twitter.exceptions import (
QueryCanceled,
QueryFailed,
QueryTimeout,
)
# Time to sleep between two polls of the query status endpoint
# (used by API._wait_for_query_completion).
_DEFAULT_QUERY_POLLING_INTERVAL = pendulum.duration(seconds=10)
# Maximum time to wait for a submitted query to finish before QueryTimeout is raised.
_DEFAULT_QUERY_TIMEOUT = pendulum.duration(minutes=15)
# Name of the environment variable that, when set, points to the directory in which
# temporary result files are created (read by _get_temporary_file_for_writing).
_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY = "TWIXL_SCRATCH_DIRECTORY"
class QueryState(Enum):
    """
    Twi-XL query state as reported to status callbacks: queued, running or
    downloading results.
    """

    # TODO: should downloading results be a separate state?
    # RUNNING and QUEUED are looked up by member name from the API's state value
    # (QueryState[api_state.value] in API._wait_for_query_completion), so their
    # names must match the API's state names.
    RUNNING = "RUNNING"
    QUEUED = "QUEUED"
    # Reported by API.get_search after the query has succeeded.
    DOWNLOADING_RESULTS = "DOWNLOADING_RESULTS"
@dataclass(frozen=True)
class QueryStatus:
    """Immutable snapshot of a query's progress, passed to status callbacks."""

    # Current state of the query.
    state: QueryState
    # Number of bytes scanned so far, as reported by the API ("datascanned").
    data_scanned_bytes: int
@dataclass(frozen=True)
class _SuccessfulQuery:
    """Internal result of a query that finished successfully."""

    # URLs of the result partitions; API.get_search passes them to SearchResults.
    download_location: List[str]  # TODO: better type (Furl)?
    # Total number of bytes scanned by the query, as reported by the API.
    data_scanned_bytes: int
@dataclass
class WordFrequencydataGroupedByHour:
    """Frequency of a single word within one hourly bucket."""

    word: str
    # Hour bucket; WordFrequencyResults.from_file parses it from an ISO 8601 string.
    hour: datetime.datetime
    # Number of occurrences of ``word`` in this hour bucket.
    frequency: int
@dataclass
class WordFrequencyResults:
    """WordFrequencyResults object.

    Attributes:
        words (List): List with :class:`WordFrequencydataGroupedByHour` items,
            one per (word, hour) combination.
    """

    # TODO: fix representation to top N elements
    words: List[WordFrequencydataGroupedByHour]

    def to_pandas(self, by_hour: bool = False) -> pd.DataFrame:
        """Return the word frequency data as a pandas DataFrame.

        :param by_hour: If True, return one row per (word, hour) with the columns
            ``word``, ``hour`` and ``frequency``. If False (default), sum the
            frequencies over all hours and return one row per word with the
            columns ``word`` and ``frequency``.
        :return: The word frequencies; an empty result still has these columns.
        """
        columns = ["word", "hour", "frequency"]
        # Explicit columns keep the schema (and the groupby key) when there are no words.
        df = pd.DataFrame([vars(word) for word in self.words], columns=columns)
        if by_hour:
            return df
        # Aggregate only the numeric column: summing the datetime ``hour`` column
        # is meaningless and raises a TypeError in pandas >= 2.0.
        return df.groupby("word", as_index=False)["frequency"].sum()

    @staticmethod
    def from_file(path: Union[str, Path]) -> "WordFrequencyResults":
        """Read a word frequency CSV file and return a WordFrequencyResults object.

        The file must start with a header row, followed by ``word,hour,frequency``
        rows where ``hour`` is an ISO 8601 timestamp.

        :param path: Source file path.
        :return: The parsed results; empty if the file contains no data rows.
        """
        # TODO: typed response possible here?
        # newline="" is required by the csv module so quoted newlines are parsed correctly.
        with Path(path).open("rt", newline="") as stream:
            reader = csv.reader(stream)
            # Skip the header; the default avoids a bare StopIteration on an empty file.
            next(reader, None)
            return WordFrequencyResults(
                [
                    WordFrequencydataGroupedByHour(
                        word, dateutil.parser.isoparse(hour), int(frequency)
                    )
                    for word, hour, frequency in reader
                ]
            )
@dataclass
class TweetMetadata:
    """Minimal tweet record: its identifier and timestamp."""

    tweet_id: str
    # NOTE(review): timezone of ``timestamp`` (presumably UTC) is not established here — confirm.
    timestamp: datetime.datetime
@dataclass
class Tweet:
    """A single tweet with its text and extracted entities."""

    tweet_id: str
    # NOTE(review): timezone of ``timestamp`` (presumably UTC) is not established here — confirm.
    timestamp: datetime.datetime
    # NOTE(review): presumably ``text`` may be truncated while ``full_text`` holds the
    # complete (extended) tweet text — confirm against the API schema.
    text: str
    full_text: str
    # Hashtags of the tweet (one list per tweet).
    hashtags: List[str]
    # URLs of the tweet (one list per tweet).
    urls: List[str]
    # NOTE(review): presumably the client/application the tweet was posted with — confirm.
    source: str
@dataclass
class ResultPartition:
    """Represents a partitioned result from a query, containing the URL to download and an optional file path.

    Attributes
    ----------
    url : str
        The URL to download the result from.
    file : Path | None
        Local path of the downloaded result, or None while it has not been downloaded.
    """

    url: str
    file: Optional[Path] = None

    def save_as(self, target_dir: Union[str, Path]) -> "ResultPartition":
        """Save the result file into ``target_dir``.

        If the result has not been downloaded yet, it is downloaded directly into
        ``target_dir``; otherwise the already downloaded file is moved there.

        :param target_dir: The directory to save the file to; created if missing.
        :type target_dir: Union[str, Path]
        :return: This instance, with ``file`` pointing into ``target_dir``.
        :rtype: ResultPartition
        """
        import shutil

        target_dir = Path(target_dir)
        target_dir.mkdir(parents=True, exist_ok=True)
        if self.file is None:
            # Not downloaded yet: download straight into the target directory.
            return self.download(target_dir=target_dir)
        target = target_dir / self.file.name
        if target.resolve() != self.file.resolve():
            # shutil.move also works across filesystems (e.g. from a separate scratch
            # or tmp filesystem), where Path.rename fails with "Invalid cross-device link".
            shutil.move(str(self.file), str(target))
        self.file = target
        return self

    def download(self, target_dir: Optional[Path] = None) -> "ResultPartition":
        """Download the result file unless it has already been downloaded.

        The file is written to ``target_dir`` if given, otherwise to the scratch
        directory configured via the ``TWIXL_SCRATCH_DIRECTORY`` environment
        variable, or else to the system's temporary directory.

        :param target_dir: The directory to save the downloaded file to.
        :type target_dir: Path | None
        :return: This instance, with ``file`` set to the downloaded file.
        :rtype: ResultPartition
        :raises requests.HTTPError: If the server responds with an error status.
            The partially written file is removed in that case.
        """
        if self.file is not None:
            return self
        filename = urlparse(self.url).path.rpartition("/")[-1]
        stream = _get_temporary_file_for_writing(
            prefix="twixl-query-results-",
            suffix=f"-{filename}.parquet",
            dir=target_dir,
        )
        downloaded = Path(stream.name)
        try:
            # Entering `stream` first guarantees it is closed even if the request fails.
            with stream, requests.get(self.url, stream=True) as response:
                response.raise_for_status()  # type: ignore
                for chunk in response.iter_content(chunk_size=8192):  # type: ignore
                    stream.write(chunk)
        except BaseException:
            # The temporary file is created with delete=False, so remove the
            # incomplete download explicitly before propagating the error.
            try:
                downloaded.unlink()
            except OSError:
                pass  # best-effort cleanup; the original error is more important
            raise
        self.file = downloaded
        return self
class SearchResults(object):
    """Manages a collection of ResultPartition objects, providing methods to save and download all results.

    Attributes
    ----------
    result_partitions : List[ResultPartition]
        A list of ResultPartition objects initialized from the provided URLs.
    """

    def __init__(self, result_urls: List[str]):
        """Initialize SearchResults with a list of result URLs.

        :param result_urls: A list of URLs to download results from.
        :type result_urls: List[str]
        """
        self.result_partitions = [ResultPartition(url) for url in result_urls]

    def save_as(self, target_dir: Union[str, Path]) -> None:
        """Save all result files to ``target_dir``, downloading them first if needed.

        :param target_dir: The directory to save all files to.
        :type target_dir: Union[str, Path]
        """
        target = Path(target_dir)
        for rp in self.result_partitions:
            rp.save_as(target_dir=target)

    def download(self) -> Generator[pd.DataFrame, None, None]:
        """Download each result file (unless already downloaded) and yield it as a DataFrame.

        Partitions are downloaded lazily, one per iteration step.

        :yield: A DataFrame for each result partition.
        :rtype: Generator[pd.DataFrame, None, None]
        """
        for rp in self.result_partitions:
            # ResultPartition.download is a no-op when the file already exists.
            yield pd.read_parquet(rp.download().file, engine="pyarrow")

    def download_all(self) -> pd.DataFrame:
        """Download all result files and concatenate them into a single DataFrame.

        :return: A DataFrame with all results; an empty DataFrame when there are
            no result partitions.
        :rtype: pd.DataFrame
        """
        dfs = list(self.download())
        if not dfs:
            # pd.concat raises "No objects to concatenate" on an empty list.
            return pd.DataFrame()
        return pd.concat(dfs, ignore_index=True)
@dataclass
class TweetMetricItem:
    """Tweet count reported for one (year, month) pair."""

    year: int
    month: int
    num_tweets: int

    @staticmethod
    def from_dict(d: dict) -> "TweetMetricItem":
        """Build an item from a mapping with ``year``, ``month`` and ``num_tweets`` keys.

        Extra keys are ignored; a missing key raises KeyError.
        """
        year = d["year"]
        month = d["month"]
        count = d["num_tweets"]
        return TweetMetricItem(year=year, month=month, num_tweets=count)
class dataset(Enum):
    """
    Twitter datasets available on twi-xl.

    Each member's value is the API path that search requests for that dataset
    are POSTed to (``query.dataset.value`` in ``API.get_search``).
    """

    ALL = "/tweets/search/all"
    TWINL = "/tweets/search/twinl"
    POLITICS = "/tweets/search/politics"
class Query:
    """A TwiXL query object to define a search query.

    Every builder method appends one statement to the query and returns the
    query itself, so calls can be chained.
    """

    def __init__(self, dataset=dataset.ALL):
        """Instantiate a new TwiXL.Query object.

        :param dataset: The :class:`dataset` to search (default: ``dataset.ALL``).
        """
        self._query: List[Dict[Any, Any]] = []
        self.dataset = dataset

    @staticmethod
    def _require_list(value: Any, parameter: str) -> None:
        """Raise TypeError unless ``value`` is a list."""
        if not isinstance(value, list):
            raise TypeError(f"{parameter} parameter can only be a list")

    def _remove_conflicting(self, key: str, overwrite: bool, message: str) -> None:
        """Remove all statements keyed by ``key``; raise ValueError(message) instead unless ``overwrite``."""
        if not any(key in statement for statement in self._query):
            return
        if not overwrite:
            raise ValueError(message)
        # Remove every matching statement (not only the first one) so the replaced
        # filter cannot survive next to its replacement.
        self._query[:] = [statement for statement in self._query if key not in statement]

    def keywords(self, keywords: List[str]) -> "Query":
        """Add a query statement that matches any tweet with the specified keyword(s).

        :param keywords: A list of words
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``keywords`` is not a list.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().keywords(keywords=['twitter', 'tweet'])
        """
        self._require_list(keywords, "keywords")
        self._query.append({"keywords": keywords})
        return self

    def regex(self, regex: List[str]) -> "Query":
        """Add a query statement that matches any tweet which matches any of the specified regular expression(s).

        :param regex: A list of regular expressions
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``regex`` is not a list.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().regex(regex=['\\btwit\\w+'])
        """
        self._require_list(regex, "regex")
        self._query.append({"regex": regex})
        return self

    def from_usernames(self, usernames: List[str], overwrite: bool = False) -> "Query":
        """Add a query statement that matches any tweet from the specific username(s).

        A query can filter on either usernames or user IDs, not both.

        :param usernames: A list of usernames (excluding the @ character)
        :param overwrite: If True, replace an existing ``from_userids`` statement
            instead of raising ValueError.
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``usernames`` is not a list.
        :raises ValueError: If the query already filters on user IDs and
            ``overwrite`` is False.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().from_usernames(usernames=['twitterdev', 'twitterapi'])
        """
        self._require_list(usernames, "usernames")
        self._remove_conflicting(
            "from_userids",
            overwrite,
            "'from_userids' already exists in this query, use overwrite to replace with 'from_usernames'.",
        )
        self._query.append({"from_usernames": usernames})
        return self

    def from_userids(self, userids: List[str], overwrite: bool = False) -> "Query":
        """Add a query statement that matches any tweet from the specific user ID(s).

        A query can filter on either usernames or user IDs, not both.

        :param userids: A list of users' numeric user IDs
        :param overwrite: If True, replace an existing ``from_usernames`` statement
            instead of raising ValueError.
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``userids`` is not a list.
        :raises ValueError: If the query already filters on usernames and
            ``overwrite`` is False.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().from_userids(userids=['783214', '6253282'])
        """
        self._require_list(userids, "userids")
        self._remove_conflicting(
            "from_usernames",
            overwrite,
            "'from_usernames' already exists in this query, use overwrite to replace with 'from_userids'",
        )
        self._query.append({"from_userids": userids})
        return self

    def url(self, url_regex: List[str], must_not: bool = False) -> "Query":
        """Add a query statement that matches any tweet that contains one of the specified URLs.

        Every pattern is validated with :func:`re.compile` and then percent-encoded
        with :func:`urllib.parse.quote` before it is added to the query.

        :param url_regex: List of URLs or regular expressions
        :param must_not: If True, tweets containing any of these URLs are excluded
            from the results instead.
        :return: Query object
        :rtype: twitter.Query
        :raises TypeError: If ``url_regex`` is not a list.
        :raises re.error: If a pattern is not a valid regular expression.

        Usage::

            >>> from twixl.collections import Query
            >>> Query().url(url_regex=["twitter.com/twitter*"])
        """
        self._require_list(url_regex, "url_regex")
        for pattern in url_regex:
            re.compile(pattern)  # fail early on invalid patterns
        key = "not_url" if must_not else "url"
        self._query.append({key: [quote(pattern) for pattern in url_regex]})
        return self

    def print(self) -> None:
        """Print the query as indented JSON.

        Each statement is rendered as ``{"AND": {<key>: "<value1> OR <value2> ..."}}``.
        """
        output: dict = {"query": []}
        for statement in self._query:
            key, values = next(iter(statement.items()))
            # Join the values explicitly so commas inside a value (e.g. the regex
            # "a{1,3}") are preserved instead of being rewritten to " OR".
            output["query"].append({"AND": {key: " OR ".join(map(str, values))}})
        print(json.dumps(output, indent=4, sort_keys=True))

    def to_dict(self) -> dict:
        """Return the TwiXL query as a dict.

        :return: Query object as dictionary.
        :rtype: dict
        """
        return {"query": self._query}

    def _download_query_results(self, url: str) -> Path:
        """Download a query result file into a gzip-compressed temporary file.

        :param url: The AWS S3 presigned URL of the result file.
        :return: Path of the gzip-compressed local file.
        :raises requests.HTTPError: If the download request fails.
        """
        # TODO: callback for downloading status
        # TODO: cache
        # TODO: cache results locally first to lower costs
        # TODO: get ID from query status
        filename = urlparse(url).path.rpartition("/")[-1]
        with _get_temporary_file_for_writing(
            prefix="twixl-query-results-", suffix=f"-{filename}.gz"
        ) as stream, GzipFile(
            fileobj=stream, mode="wb", compresslevel=1
        ) as compressed_stream:
            with requests.get(url, stream=True) as r:
                # Fail instead of storing a compressed HTTP error page as the result.
                r.raise_for_status()
                for chunk in r.iter_content(chunk_size=8192):  # type: ignore
                    compressed_stream.write(chunk)
        return Path(stream.name)
class API(object):
    """Client for the TwiXL API."""

    def __init__(self, api_endpoint: str, api_key: str):
        """Instantiate a new TwiXL.Api object.

        Args:
            api_endpoint (str):
                The TwiXL endpoint.
            api_key (str):
                Your TwiXL api key.
        """
        self._session = APISession(api_endpoint, api_key)

    @staticmethod
    def _format_timestamp(timestamp: datetime.datetime) -> str:
        """Format a timestamp for the API as ``YYYY-MM-DD HH:MM:SS`` in UTC.

        Timezone-aware datetimes are converted to UTC first; naive datetimes are
        taken to be UTC already and are formatted unchanged.
        """
        if timestamp.utcoffset() is not None:
            timestamp = timestamp.astimezone(datetime.timezone.utc)
        return timestamp.strftime("%Y-%m-%d %H:%M:%S")

    def get_search(
        self,
        query: Query,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        max_results: Optional[int] = None,  # TODO: should go into Query-object
        callback: Optional[Callable[[QueryStatus], Any]] = None,
        extended_tweet: bool = True,
    ) -> SearchResults:
        """Run a TwiXL search and wait for it to complete.

        :param query: A TwiXL query object that describes the search query.
        :param start_time (Optional): The oldest UTC timestamp from which
            the Tweets will be provided. Timezone-aware values are converted to UTC.
        :param end_time (Optional): The newest, most recent UTC timestamp up to
            which the Tweets will be provided. Timezone-aware values are converted to UTC.
        :param max_results (Optional): The maximum number of search results in query output.
            By default, all search results will be stored in the query output.
        :param callback (Optional): Callback function accepting the query status.
        :param extended_tweet (Optional): A boolean flag that determines whether to include
            the full, untruncated text of tweets in the search query.
            - Default: True — The search query applies to the entire 280-character text of the tweet.
            - If set to False: The search query applies only to the truncated portion (first 140 characters) of the tweet.
            In both modes, any tweets that match the query will contain the full text of the tweet in the results.
        :return: A :class:`SearchResults` object for downloading the result partitions.
        :rtype: twitter.SearchResults
        :raises QueryFailed: If the query failed.
        :raises QueryCanceled: If the query was canceled.
        :raises QueryTimeout: If the query did not finish in time.
        """
        parameters: dict = {
            # The query also runs on the extended tweet field; this is the default.
            "extended_tweet": extended_tweet,
        }
        if max_results is not None:
            parameters["max_results"] = str(max_results)
        if start_time is not None:
            parameters["start_time"] = self._format_timestamp(start_time)
        if end_time is not None:
            parameters["end_time"] = self._format_timestamp(end_time)
        response = self._session.request_safe(
            "POST", query.dataset.value, params=parameters, json=query.to_dict()
        )
        query_id = _api.QueryId.parse_obj(response.json())
        successful_query = self._wait_for_query_completion(
            query_id.id, _DEFAULT_QUERY_TIMEOUT, callback
        )
        if callback is not None:
            callback(
                QueryStatus(
                    QueryState.DOWNLOADING_RESULTS, successful_query.data_scanned_bytes
                )
            )
        return SearchResults(successful_query.download_location)

    # def get_word_frequency(
    #     self,
    #     date: datetime.datetime,
    #     max_results: Optional[int] = None,
    #     min_length_words: Optional[int] = None,
    #     frequency_limit: Optional[int] = None,
    #     callback: Optional[Callable[[QueryStatus], Any]] = None,
    # ) -> WordFrequencyResults:
    #     """Execute a word_frequency request to the TwiXL API for the specified date.
    #     Download and return the result of the request.
    #     :param date: The date for which the word-frequency should be returned.
    #     :param max_results (Optional): The maximum number of search
    #         results in the word cloud. By default, all search results
    #         will be stored in the query output.
    #         The results are returned in descending order.
    #     :param min_length_words (Optional): The minimum word length of the
    #         words in the word-frequency list. The default minimum word length is '1'.
    #     :param frequency_limit (Optional): The minimum occurrence rate of the
    #         words in the word-frequency list. The default is '1'
    #     :return: WordFrequencyResults object
    #     :rtype: twitter.WordFrequencyResults
    #     """
    #     # function
    #     # TODO: check types of query params; currently all are strings
    #     parameters: dict
    #     parameters = {"date": date.strftime("%Y-%m-%d")}
    #     if max_results is not None:
    #         parameters["max_results"] = str(max_results)
    #     if min_length_words is not None:
    #         parameters["min_length_words"] = str(min_length_words)
    #     if frequency_limit is not None:
    #         parameters["frequency_limit"] = str(frequency_limit)
    #     response = self._session.request_safe(
    #         "POST", "/tweets/words/frequency", params=parameters
    #     )
    #     query_id = _api.QueryId.parse_obj(response.json())
    #     successful_query = self._wait_for_query_completion(
    #         query_id.id, _DEFAULT_QUERY_TIMEOUT, callback
    #     )
    #     if callback:
    #         callback(
    #             QueryStatus(
    #                 QueryState.DOWNLOADING_RESULTS, successful_query.data_scanned_bytes
    #             )
    #         )
    #     return WordFrequencyResults.from_file(
    #         self._download_query_results(successful_query.download_location)
    #     )

    # NOTE(review): ``TweetMetrics`` is not defined in the visible module. The return
    # annotation is quoted so that creating this class does not raise NameError;
    # calling the method still requires a module-level ``TweetMetrics`` with ``from_dict``.
    def get_tweet_metrics(self) -> "TweetMetrics":
        """Fetch tweet metrics from the ``/tweets/metrics`` endpoint."""
        return TweetMetrics.from_dict(  # noqa: F821
            self._session.request_safe("GET", "/tweets/metrics").json()
        )

    def _wait_for_query_completion(
        self,
        query_id: str,
        timeout: pendulum.Duration = _DEFAULT_QUERY_TIMEOUT,
        callback: Optional[Callable[[QueryStatus], Any]] = None,
    ) -> _SuccessfulQuery:
        """Poll the status of ``query_id`` until it succeeds, fails or times out.

        :param query_id: ID of the submitted query.
        :param timeout: Maximum time to wait for the query to finish.
        :param callback: Called with the query status after every poll that finds
            the query not yet finished.
        :return: Download locations and scanned bytes of the successful query.
        :raises QueryFailed: If the query failed.
        :raises QueryCanceled: If the query was canceled.
        :raises QueryTimeout: If the query did not finish within ``timeout``.
        :raises ValueError: If the query succeeded without any download location.
        """
        # TODO: make function which waits and times out
        # A monotonic clock keeps the timeout correct when the wall clock is adjusted.
        deadline = time.monotonic() + timeout.total_seconds()
        while time.monotonic() < deadline:
            response = self._session.request_safe("GET", f"/query/results/{query_id}")
            api_query_status = _api.QueryStatus.parse_obj(response.json())
            state = api_query_status.state
            if state == _api.QueryState.SUCCEEDED:
                if not api_query_status.location:
                    raise ValueError(
                        "query has succeeded but no download location is given"
                    )
                return _SuccessfulQuery(
                    api_query_status.location, api_query_status.datascanned  # type: ignore
                )
            if state == _api.QueryState.FAILED:
                raise QueryFailed()
            if state == _api.QueryState.CANCELED:
                raise QueryCanceled()
            if callback is not None:
                # Assumes every remaining API state (presumably RUNNING/QUEUED) has a
                # same-named QueryState member; any other state raises KeyError here.
                callback(
                    QueryStatus(QueryState[state.value], api_query_status.datascanned)
                )
            # TODO: make interval configurable
            time.sleep(_DEFAULT_QUERY_POLLING_INTERVAL.total_seconds())
        raise QueryTimeout(timeout)

    def _download_query_results(self, url: List[str]) -> List[Path]:
        """Download each result file into a temporary file and return the paths.

        :param url: The result file URLs (AWS S3 presigned URLs).
        :return: Local paths of the downloaded files, in the order of ``url``.
        :raises requests.HTTPError: If any download request fails.
        """
        # TODO: callback for downloading status
        # TODO: cache
        # TODO: cache results locally first to lower costs
        # TODO: get ID from query status
        paths = []
        for file_url in url:
            filename = urlparse(file_url).path.rpartition("/")[-1]
            # NOTE(review): content is stored as downloaded (not gzip-compressed)
            # despite the ".gz" suffix; the suffix is kept for compatibility.
            with _get_temporary_file_for_writing(
                prefix="twixl-query-results-", suffix=f"-{filename}.gz"
            ) as stream:
                with requests.get(file_url, stream=True) as response:
                    # Fail instead of saving an HTTP error page as a result file.
                    response.raise_for_status()
                    for chunk in response.iter_content(chunk_size=8192):  # type: ignore
                        stream.write(chunk)
            paths.append(Path(stream.name))
        return paths
def _get_temporary_file_for_writing(
prefix: Optional[str] = None,
suffix: Optional[str] = None,
mode: str = "wb",
delete: bool = False,
dir: Optional[Path] = None,
):
"""Return a temporary file for writing to. Will either be created in a user-specified scratch directory (if provided
through the relevant environment variable), or created directly as a temporary file through Python's tempfile."""
# TODO: provide protocol for write() and .name
try:
if dir is None:
scratch_directory = Path(
os.environ[_SCRATCH_DIRECTORY_ENVIRONMENT_VARIABLE_KEY]
)
else:
scratch_directory = dir
scratch_directory.mkdir(parents=True, exist_ok=True)
except KeyError:
scratch_directory = None
return tempfile.NamedTemporaryFile(
mode, prefix=prefix, suffix=suffix, dir=scratch_directory, delete=delete
)
def get_twixl_api() -> API:
    """Initialize a TwiXL API object from environment variables.

    Reads ``TWIXL_API_ENDPOINT`` and ``TWIXL_API_KEY``.

    :return: Initialized TwiXL API object.
    :rtype: API
    :raises ValueError: If one of the required environment variables is not set.
    """
    # Only the environment lookups are inside the try block, so a KeyError raised
    # while constructing the API is not misreported as a missing variable.
    try:
        api_endpoint = os.environ["TWIXL_API_ENDPOINT"]
        api_key = os.environ["TWIXL_API_KEY"]
    except KeyError as e:
        raise ValueError(
            f"environment variable {e.args[0]} required for initializing TwiXL API"
        ) from e
    return API(api_endpoint=api_endpoint, api_key=api_key)
def search(
    query: Query,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    max_results: Optional[int] = None,
    api: Optional[API] = None,
    callback: Optional[Callable[[QueryStatus], Any]] = None,
    extended_tweet: bool = True,
) -> SearchResults:
    """Send a TwiXL search query and wait for it to complete.

    :param query: A TwiXL query object that describes the search query.
    :param start_time: The oldest UTC timestamp from which the Tweets will be provided.
    :param end_time: The newest UTC timestamp up to which the Tweets will be provided.
    :param max_results (Optional): The maximum number of search results in query output.
        By default, all search results will be stored in the query output.
    :param api (Optional): An initialized TwiXL API object. If not provided,
        one is initialized from the environment (see ``get_twixl_api``).
    :param callback (Optional): Callback function taking a QueryStatus object.
    :param extended_tweet (Optional): If True (default), the query is matched
        against the full tweet text; if False, only against its first 140 characters.
    :return: :class:`SearchResults <SearchResults>` object
    :rtype: twitter.SearchResults

    Usage::

        >>> import datetime
        >>> from twixl.collections import twitter
        >>> query = (
        ...     twitter.Query()
        ...     .keywords(keywords=['twitter', 'tweet'])
        ...     .url(url_regex=["twitter.com/twitter*"])
        ... )
        >>> search_results = twitter.search(
        ...     query=query,
        ...     start_time=datetime.datetime(2022, 1, 1, 0, 0),
        ...     end_time=datetime.datetime(2022, 1, 1, 23, 59, 59),
        ...     max_results=100,
        ...     callback=twitter.print_callback,
        ... )
        Query status: RUNNING (0 Bytes scanned)
        Query status: DOWNLOADING_RESULTS (1.0 GB scanned)
        >>> df = search_results.download_all()
    """
    if api is None:
        api = get_twixl_api()
    return api.get_search(
        query=query,
        start_time=start_time,
        end_time=end_time,
        max_results=max_results,
        callback=callback,
        extended_tweet=extended_tweet,
    )
# def word_frequency(
# date: datetime.datetime,
# min_length_words: int = 1,
# max_results: Optional[int] = None,
# frequency_limit: Optional[int] = None,
# api: Optional[API] = None,
# callback: Optional[Callable[[QueryStatus], Any]] = None,
# ) -> WordFrequencyResults:
# """Sends a word_frequency request to the TwiXL API for the specified date.
# :param date: The date for which the word-frequency should be returned.
# :param min_length_words: The minimum word length of the words in the word-frequency list.
# The default minimum word length is '1'.
# :param max_results (Optional): The maximum number of search results in query output.
# By default, all search results will be stored in the query output.
# :param frequency_limit (Optional): The minimum occurrence rate of the words
# in the word-frequency list. The default limit is '1'.
# :param api (Optional): An initialized twixl Api object. If not provided, an object will be initialized.
# :return: :class:`WordFrequencyResults <WordFrequencyResults>` object
# :rtype: twitter.WordFrequencyResults
# Usage::
# >>> from twixl.collections import twitter
# >>> word_frequencies = twitter.word_frequency(
# >>> date=datetime.datetime(2022, 1, 1),
# >>> min_length_words=2,
# >>> callback=twitter.print_callback
# >>> )
# Query status: RUNNING (0 Bytes scanned)
# Query status: DOWNLOADING_RESULTS (1.0 GB scanned)
# >>> word_frequencies.to_pandas()
# word frequency
# 0 example 100
# 1 words 50
# """
# if api is None:
# api = get_twixl_api()
# return api.get_word_frequency(
# date=date,
# min_length_words=min_length_words,
# max_results=max_results,
# frequency_limit=frequency_limit,
# callback=callback,
# )
def frequencies(series: pd.Series, mapper: Optional[Callable] = None) -> pd.Series:
    """
    Count how often each item occurs across a series of item lists.

    The lists in ``series`` are flattened into individual items, and missing
    values are skipped. When ``mapper`` is given, every item is transformed by it
    first; items it maps to a missing value are skipped as well.
    """
    transform = mapper if mapper else (lambda item: item)
    items = series.explode().dropna()
    mapped = items.map(transform).dropna()
    return mapped.value_counts()
def hashtag_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Count the occurrences of each hashtag in the tweets data frame.

    Reads the ``hashtags`` column, which holds a list of hashtags per tweet.
    """
    hashtag_lists = df["hashtags"]
    return frequencies(hashtag_lists)
def url_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Count the occurrences of each URL in the tweets data frame.

    Reads the ``urls`` column, which holds a list of URLs per tweet.
    """
    url_lists = df["urls"]
    return frequencies(url_lists)
def domain_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Return a series with counts for each URL's domain name in the tweets data frame.

    Domains are extracted with tldextract and exclude both the subdomain and the
    public suffix, e.g. ``"https://www.bbc.co.uk/news"`` counts as ``"bbc"``.
    URLs for which no domain can be extracted are ignored instead of being
    counted as an empty string.
    """

    def to_domain(url):
        # Returning None makes `frequencies` drop the value.
        return tldextract.extract(url).domain or None

    return frequencies(df["urls"], to_domain)
def country_code_frequencies(df: pd.DataFrame) -> pd.Series:
    """
    Return a series with counts for each URL's public suffix in the tweets data frame.

    Suffixes are extracted with tldextract (e.g. ``"nl"``, ``"co.uk"``, ``"com"``).
    For country-code domains the suffix contains the country code; generic
    top-level domains such as ``"com"`` are counted as-is. URLs without a
    recognised suffix (e.g. IP addresses) are ignored instead of being counted
    as an empty string.
    """

    def to_suffix(url):
        # Returning None makes `frequencies` drop the value.
        return tldextract.extract(url).suffix or None

    return frequencies(df["urls"], to_suffix)
def word_frequencies(
    tweets: pd.DataFrame, tokenizer: Optional[Callable] = None
) -> pd.Series:
    """
    Return a series with counts for each word in the tweets data frame.

    The ``text`` column is tokenized with ``tokenizer`` (NLTK's TweetTokenizer if
    none is specified) and tokens are lower-cased. Tokens made up solely of
    punctuation (e.g. ``"."``, ``"..."``, ``"…"``) are discarded. Missing
    (non-string) texts contribute no tokens.
    """
    import unicodedata

    tokenizer = tokenizer or TweetTokenizer().tokenize

    def is_punctuation(token: str) -> bool:
        # Check every character: a substring test against string.punctuation only
        # catches multi-character tokens whose characters happen to be adjacent in
        # that string (e.g. "()" but not "..." or "?!"). Unicode punctuation
        # categories (P*) also cover characters such as "…" and curly quotes.
        return all(
            char in string.punctuation or unicodedata.category(char).startswith("P")
            for char in token
        )

    def tokenize(text: Any) -> List[str]:
        if not isinstance(text, str):
            return []
        tokens = tokenizer(text)  # type: ignore
        return [token.lower() for token in tokens if not is_punctuation(token)]

    return frequencies(tweets["text"].map(tokenize))
def print_callback(query_status: QueryStatus) -> None:
    """Report a query's progress on standard output.

    Prints the state together with the human-readable amount of data scanned so
    far, e.g. ``Query status: RUNNING (1.0 GB scanned)``.

    :param query_status: The status to report.
    """
    state_name = query_status.state.value
    scanned = humanize.naturalsize(query_status.data_scanned_bytes)
    print(f"Query status: {state_name} ({scanned} scanned)")