Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 161 additions & 17 deletions app/services/scrapers/twitter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import asyncio
import traceback
from urllib.parse import urlparse
from typing import Dict, Optional, Any
from typing import Dict, List, Optional, Any, Tuple

import httpx
import jmespath
Expand Down Expand Up @@ -51,6 +51,8 @@ def __init__(
self.host = ""
self.headers = {}
self.params = {}
self.include_comments: bool = kwargs.get("include_comments", False)
self.article_tweet: bool = False

async def get_item(self) -> dict:
await self.get_twitter()
Expand Down Expand Up @@ -100,10 +102,12 @@ async def _rapidapi_get_response_tweet_data(self) -> Dict:

async def _api_client_get_response_tweet_data(self) -> Dict:
scraper = Scraper(
save=DEBUG_MODE,
save=False,
pbar=False,
debug=0,
cookies=TWITTER_COOKIES
)
tweet_data = await asyncio.to_thread(scraper.tweets_details, [int(self.tid)])
tweet_data = await asyncio.to_thread(scraper.tweets_details, [int(self.tid)], limit=1)
logger.debug(tweet_data)
return tweet_data[0]
Comment on lines 104 to 112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, check the actual file to understand the context
head -150 app/services/scrapers/twitter/__init__.py | tail -60

Repository: aturret/FastFetchBot

Length of output: 2690


🏁 Script executed:

# Check if twitter-api-client-v2 is in the requirements
find . -name "requirements*.txt" -o -name "setup.py" -o -name "pyproject.toml" | xargs grep -l "twitter" 2>/dev/null

Repository: aturret/FastFetchBot

Length of output: 80


🏁 Script executed:

# Verify the actual Scraper usage and API in the codebase
rg "tweets_details" -A 2 -B 2 app/services/scrapers/twitter/

Repository: aturret/FastFetchBot

Length of output: 476


🏁 Script executed:

# Check for any imports of Scraper and how it's initialized elsewhere
rg "from.*Scraper|import.*Scraper" app/services/scrapers/twitter/

Repository: aturret/FastFetchBot

Length of output: 141


🏁 Script executed:

# Look at the TWITTER_COOKIES usage and validation
rg "TWITTER_COOKIES" app/services/scrapers/twitter/

Repository: aturret/FastFetchBot

Length of output: 250


🏁 Script executed:

# Check pyproject.toml for twitter library version
cat pyproject.toml | grep -A 20 "dependencies\|twitter"

Repository: aturret/FastFetchBot

Length of output: 1382


🏁 Script executed:

# Check the twitter-api-client documentation/code to verify Scraper API
# First check if we can see what parameters Scraper accepts
python << 'EOF'
try:
    from twitter.scraper import Scraper
    import inspect
    
    # Get __init__ signature
    print("=== Scraper.__init__ signature ===")
    sig = inspect.signature(Scraper.__init__)
    print(sig)
    print()
    
    # Get tweets_details signature
    print("=== Scraper.tweets_details signature ===")
    sig_tweets = inspect.signature(Scraper.tweets_details)
    print(sig_tweets)
    print()
    
    # Show docstring if available
    if Scraper.tweets_details.__doc__:
        print("=== tweets_details docstring ===")
        print(Scraper.tweets_details.__doc__)
except Exception as e:
    print(f"Error: {e}")
    print("Trying to check if twitter-api-client is installed...")
    import subprocess
    result = subprocess.run(['pip', 'list'], capture_output=True, text=True)
    if 'twitter' in result.stdout:
        print("Twitter library found in pip list:")
        for line in result.stdout.split('\n'):
            if 'twitter' in line.lower():
                print(line)
EOF

Repository: aturret/FastFetchBot

Length of output: 150


🏁 Script executed:

# Check the config file to verify TWITTER_COOKIES handling
cat app/config.py | head -50

Repository: aturret/FastFetchBot

Length of output: 1906


🏁 Script executed:

# Check for XSS issues in the HTML generation at lines 192-207
sed -n '192,207p' app/services/scrapers/twitter/__init__.py

Repository: aturret/FastFetchBot

Length of output: 977


🏁 Script executed:

# Check the entity map parsing and HTML generation (lines 262-306, 330-381)
sed -n '262,306p' app/services/scrapers/twitter/__init__.py

Repository: aturret/FastFetchBot

Length of output: 1973


🏁 Script executed:

# Check what wrap_text_into_html does
rg "def wrap_text_into_html" -A 10 app/

Repository: aturret/FastFetchBot

Length of output: 665


🏁 Script executed:

# Check the _apply_inline_formatting function
sed -n '330,381p' app/services/scrapers/twitter/__init__.py

Repository: aturret/FastFetchBot

Length of output: 1595


🏁 Script executed:

# Check for any documentation or tests about the API structure
find . -name "*.md" -o -name "*test*.py" | xargs grep -l "tweets_details\|entityMap" 2>/dev/null || echo "No test files found"

Repository: aturret/FastFetchBot

Length of output: 83


🏁 Script executed:

# Verify the Scraper import path - check if twitter-api-client-v2 uses a different module structure
python << 'EOF'
import sys
try:
    # Try different import paths
    print("Attempting imports...")
    try:
        from twitter.scraper import Scraper
        print("✓ from twitter.scraper import Scraper works")
    except ImportError as e:
        print(f"✗ from twitter.scraper import Scraper failed: {e}")
    
    try:
        import twitter_api_client_v2
        print(f"✓ twitter_api_client_v2 module found")
        print(f"  Module path: {twitter_api_client_v2.__file__}")
    except ImportError as e:
        print(f"✗ twitter_api_client_v2 not found: {e}")
    
    # Check pip list for the actual package
    import subprocess
    result = subprocess.run(['pip', 'show', 'twitter-api-client-v2'], capture_output=True, text=True)
    if result.returncode == 0:
        print("\n=== pip show twitter-api-client-v2 ===")
        print(result.stdout)
    else:
        print("twitter-api-client-v2 not installed")
        
except Exception as e:
    print(f"Error: {e}")
EOF

Repository: aturret/FastFetchBot

Length of output: 268


🏁 Script executed:

# Check if _find_article_media_url is defined
sed -n '321,327p' app/services/scrapers/twitter/__init__.py

Repository: aturret/FastFetchBot

Length of output: 387


🏁 Script executed:

# Look for any error handling in the scraper
rg "try:|except" app/services/scrapers/twitter/__init__.py -B 2 -A 2

Repository: aturret/FastFetchBot

Length of output: 511


🏁 Script executed:

# Check if there's any HTML escaping function used elsewhere in the codebase
rg "escape|sanitize|cgi.escape|html.escape" app/services/scrapers/

Repository: aturret/FastFetchBot

Length of output: 632


🏁 Script executed:

# Check what imports are at the top of the twitter scraper file
head -30 app/services/scrapers/twitter/__init__.py

Repository: aturret/FastFetchBot

Length of output: 900


🏁 Script executed:

# Verify if any HTML escaping is done on user_component or text before insertion
sed -n '155,210p' app/services/scrapers/twitter/__init__.py

Repository: aturret/FastFetchBot

Length of output: 2839


Escape user-provided and article data to prevent XSS vulnerabilities.

The code at lines 192-207 creates unescaped HTML by directly interpolating tweet['username'], tweet['name'], and display_text into HTML strings. The user_component link and subsequent text are passed to wrap_text_into_html(), which does not perform HTML escaping. Similarly, in _apply_inline_formatting() (line 369), URLs are inserted unescaped into href attributes using single quotes (href='{cur_link}'), and media URLs use single quotes (line 286: src='{media_url}').

If tweet['username'], tweet['name'], or any text field contains malicious HTML/JavaScript (e.g., <img onerror="...">), it will be rendered in the output. Additionally, if URLs contain single quotes, the HTML attribute will break and potentially allow injection.

Apply HTML escaping to all user-provided data before inserting into HTML. The codebase already has a pattern for this in the Instagram scraper: from html import escape and escape(self.text). Also use double quotes for HTML attributes or escape attribute values.

Additionally, ensure that JSON data keys in entityMap (line 266) and inline formatting ranges (lines 345-346, 355-358) are robustly validated before access, as missing fields will cause KeyErrors if the API structure varies.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/scrapers/twitter/__init__.py` around lines 104 - 112, Unescaped
user content and URLs are being injected into HTML in wrap_text_into_html and
_apply_inline_formatting (and when building media/img src and href attributes),
which allows XSS and can break attributes; fix by importing and applying
html.escape to all user-provided fields (e.g., tweet['username'], tweet['name'],
display_text) before interpolation, ensure attribute values are escaped (or use
double-quoted attributes and escape quotes), and update _apply_inline_formatting
to escape cur_link and media_url before placing into href/src; additionally
validate presence and types of keys in entityMap and inline formatting ranges
(check keys exist and ranges are integers and within text length) before
accessing to avoid KeyError/IndexError.


Expand All @@ -125,12 +129,24 @@ def _process_tweet_twitter135(self, tweet_data: Dict):
)
entries = entries_instruction['entries']
tweets = []
for i in entries:
if (
i["content"]["entryType"] == "TimelineTimelineItem"
and i["content"]["itemContent"]["itemType"] == "TimelineTweet"
):
tweets.append(i["content"]["itemContent"]["tweet_results"]["result"])
for entry in entries:
content = entry["content"]
entry_type = content.get("entryType", "")

if entry_type == "TimelineTimelineItem":
item_content = content.get("itemContent", {})
if item_content.get("itemType") == "TimelineTweet":
result = item_content.get("tweet_results", {}).get("result")
if result:
tweets.append(result)

elif entry_type == "TimelineTimelineModule" and self.include_comments:
for module_item in content.get("items", []):
item_content = module_item.get("item", {}).get("itemContent", {})
if item_content.get("itemType") == "TimelineTweet":
result = item_content.get("tweet_results", {}).get("result")
if result:
tweets.append(result)
for tweet in tweets:
if tweet["__typename"] == "TweetWithVisibilityResults":
tweet = tweet["tweet"]
Expand All @@ -140,12 +156,16 @@ def _process_tweet_twitter135(self, tweet_data: Dict):
self.text = self.text[:-1]
self.content += self.content_group
self.message_type = (
MessageType.LONG if get_html_text_length(self.text) > SHORT_LIMIT else MessageType.SHORT
MessageType.LONG if (get_html_text_length(self.text) > SHORT_LIMIT or self.article_tweet) else MessageType.SHORT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Line exceeds Black's default line length (88 chars).

This line is well over 88 characters. Black will reformat it. Consider wrapping proactively. As per coding guidelines, "Use Black formatter for all Python code formatting".

Suggested wrap
-            MessageType.LONG if (get_html_text_length(self.text) > SHORT_LIMIT or self.article_tweet) else MessageType.SHORT
+            MessageType.LONG
+            if (get_html_text_length(self.text) > SHORT_LIMIT or self.article_tweet)
+            else MessageType.SHORT
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/scrapers/twitter/__init__.py` at line 159, The expression
selecting MessageType currently exceeds Black's 88-char limit; break it into a
multi-line expression (or assign the condition to a short variable) so Black
won't reflow it. Locate the ternary using MessageType.LONG / MessageType.SHORT
and get_html_text_length(self.text) > SHORT_LIMIT or self.article_tweet, then
either wrap the condition in parentheses across lines or precompute a boolean
like is_long = get_html_text_length(self.text) > SHORT_LIMIT or
self.article_tweet and use MessageType.LONG if is_long else MessageType.SHORT to
satisfy formatting and preserve behavior.

)

def process_single_tweet_Twitter135(self, tweet: Dict, retweeted=False) -> None:
if tweet.get("tid") == self.tid:
self.title = f"{tweet['name']}'s Tweet"
if tweet.get("article") and tweet["article"].get("title"):
self.title = tweet["article"]["title"]
self.article_tweet = True
else:
self.title = f"{tweet['name']}'s Tweet"
self.author = tweet["name"]
self.author_url = f"https://twitter.com/{tweet['username']}"
self.date = tweet["date"]
Expand All @@ -163,15 +183,29 @@ def process_single_tweet_Twitter135(self, tweet: Dict, retweeted=False) -> None:

@staticmethod
def parse_single_tweet_Twitter135(tweet: Dict, retweeted=False) -> Dict:
text = tweet["full_text"] if tweet.get("full_text") else tweet["text"]
tweet_info = {
"media_files": [],
"text_group": "",
"content_group": "<hr>" if not retweeted else "<p>Quoted:</p>",
}
user_component = f"<a href='https://twitter.com/{tweet['username']}/status/{tweet['tid']}'>@{tweet['name']}</a>"
tweet_info["content_group"] += wrap_text_into_html(f"{user_component}: {text}")
tweet_info["text_group"] += f"{user_component}: {text}\n"

if tweet.get("article"):
article = tweet["article"]
article_title = article.get("title", "")
display_text = article_title if article_title else (
tweet["full_text"] if tweet.get("full_text") else tweet["text"]
)
tweet_info["content_group"] += wrap_text_into_html(f"{user_component}: {display_text}")
tweet_info["text_group"] += f"{user_component}: {display_text}\n"
article_html, article_media = Twitter.parse_article_content(article)
tweet_info["content_group"] += article_html
tweet_info["media_files"] += article_media
else:
text = tweet["full_text"] if tweet.get("full_text") else tweet["text"]
tweet_info["content_group"] += wrap_text_into_html(f"{user_component}: {text}")
tweet_info["text_group"] += f"{user_component}: {text}\n"
Comment on lines +192 to +207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

User-controlled text is interpolated into HTML without escaping.

Tweet text (display_text, text) and user names are inserted directly into HTML strings. If any of these contain characters like <, >, ', or &, the output HTML will be malformed or could introduce XSS if rendered in a browser context. Consider escaping text content with html.escape() before interpolation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/scrapers/twitter/__init__.py` around lines 192 - 207, The code
interpolates user-controlled strings (user_component, display_text, text)
directly into HTML via wrap_text_into_html and string concatenation, causing
potential XSS/malformed HTML; fix by escaping these values before interpolation
(e.g., use html.escape() on user_component and on display_text/text) or update
wrap_text_into_html to perform escaping internally, and ensure any HTML returned
from Twitter.parse_article_content (article_html) is explicitly
sanitized/treated as already-safe before appending to
tweet_info["content_group"] to avoid double-escaping or unsafe insertion.


if tweet["media"]:
for media in tweet["media"]:
if media["type"] == "photo":
Expand Down Expand Up @@ -209,13 +243,14 @@ def parse_tweet_data_Twitter135(data: Dict) -> Dict:
result = jmespath.search(
"""{
tid: rest_id,
name: core.user_results.result.legacy.name,
username: core.user_results.result.legacy.screen_name,
name: core.user_results.result.core.name || core.user_results.result.legacy.name,
username: core.user_results.result.core.screen_name || core.user_results.result.legacy.screen_name,
date: legacy.created_at,
full_text: note_tweet.note_tweet_results.result.text,
text: legacy.full_text,
media: legacy.extended_entities.media,
quoted_tweet: quoted_status_result.result
quoted_tweet: quoted_status_result.result,
article: article.article_results.result
}""",
data,
)
Expand All @@ -224,6 +259,52 @@ def parse_tweet_data_Twitter135(data: Dict) -> Dict:
def _process_tweet_Twitter154(self, tweet_data: Dict):
pass

@staticmethod
def parse_article_content(article: Dict) -> Tuple[str, List[MediaFile]]:
content_state = article.get("content_state", {})
blocks = content_state.get("blocks", [])
entity_map_list = content_state.get("entityMap", [])

entity_lookup = {}
for entry in entity_map_list:
entity_lookup[str(entry["key"])] = entry["value"]

html_parts = []
media_files = []

for block in blocks:
block_type = block.get("type", "unstyled")
text = block.get("text", "")
inline_style_ranges = block.get("inlineStyleRanges", [])
entity_ranges = block.get("entityRanges", [])

if block_type == "atomic":
for er in entity_ranges:
entity = entity_lookup.get(str(er["key"]))
if entity and entity.get("type") == "MEDIA":
for media_item in entity.get("data", {}).get("mediaItems", []):
media_id = media_item.get("mediaId", "")
media_url = _find_article_media_url(article, media_id)
if media_url:
html_parts.append(f"<img src='{media_url}'/>")
Comment on lines +288 to +289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Single-quoted HTML attributes are fragile when a URL contains a single quote (').

Lines 289 and 377 use single quotes for src and href attributes. If a media URL or link URL contains a single quote, the HTML will break. Prefer double quotes or escape the value.

-                                html_parts.append(f"<img src='{media_url}'/>")
+                                html_parts.append(f'<img src="{media_url}"/>')
-            segment = f"<a href='{cur_link}'>{segment}</a>"
+            segment = f'<a href="{cur_link}">{segment}</a>'

This same pattern also exists on lines 212–215 and 191 for existing code, but since the article parsing is new, it's worth getting right here.

Also applies to: 373-377

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/services/scrapers/twitter/__init__.py` around lines 288 - 289, The HTML
attribute strings built with html_parts.append currently use single quotes
around src/href (e.g., html_parts.append(f"<img src='{media_url}'/>") and
similar snippets), which will break if URLs contain a single quote; update those
append calls to either wrap attributes in double quotes or properly
escape/HTML-encode the media_url/link_url values before interpolation. Locate
all occurrences where html_parts.append builds "<img ... src=...>" or "<a ...
href=...>" (including the new article parsing instances and the existing spots
around the other noted occurrences) and change them to use double-quoted
attributes or escape the inserted URLs with an HTML-escaping helper to ensure
robust output.

media_files.append(MediaFile(
media_type="image",
url=media_url,
caption="",
))
continue

styled_text = _apply_inline_formatting(
text, inline_style_ranges, entity_ranges, entity_lookup
)

if block_type == "header-two":
html_parts.append(f"<h2>{styled_text}</h2>")
else:
html_parts.append(f"<p>{styled_text}</p>")

return "".join(html_parts), media_files

def _get_request_headers(self):
self.host = SCRAPER_INFO[self.scraper]["host"]
self.headers = {
Expand All @@ -235,3 +316,66 @@ def _get_request_headers(self):
self.params = {
SCRAPER_INFO[self.scraper]["params"]: self.tid,
}


def _find_article_media_url(article: Dict, media_id: str) -> str:
for entity in article.get("media_entities", []):
if str(entity.get("media_id")) == str(media_id):
media_info = entity.get("media_info", {})
url = media_info.get("original_img_url", "")
return url
return ""


def _apply_inline_formatting(
text: str,
style_ranges: List[Dict],
entity_ranges: List[Dict],
entity_lookup: Dict,
) -> str:
if not text or (not style_ranges and not entity_ranges):
return text

n = len(text)
bold = [False] * n
italic = [False] * n
link_url = [None] * n

for sr in style_ranges:
start = sr["offset"]
end = start + sr["length"]
for i in range(start, min(end, n)):
if sr["style"] == "Bold":
bold[i] = True
elif sr["style"] == "Italic":
italic[i] = True

for er in entity_ranges:
entity = entity_lookup.get(str(er["key"]))
if entity and entity.get("type") == "LINK":
url = entity.get("data", {}).get("url", "")
start = er["offset"]
end = start + er["length"]
for i in range(start, min(end, n)):
link_url[i] = url

result = []
i = 0
while i < n:
cur_bold = bold[i]
cur_italic = italic[i]
cur_link = link_url[i]
j = i
while j < n and bold[j] == cur_bold and italic[j] == cur_italic and link_url[j] == cur_link:
j += 1
segment = text[i:j]
if cur_bold:
segment = f"<b>{segment}</b>"
if cur_italic:
segment = f"<i>{segment}</i>"
if cur_link:
segment = f"<a href='{cur_link}'>{segment}</a>"
result.append(segment)
i = j

return "".join(result)
Loading