-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
264 lines (211 loc) · 9.53 KB
/
main.py
File metadata and controls
264 lines (211 loc) · 9.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
from typing import List, Dict, AnyStr
import logging
import re
import hashlib
from bs4 import BeautifulSoup
import asyncio
import aiohttp
import time
from mastoBot.configManager import ConfigAccessor
from mastoBot.mastoBot import MastoBot, handleMastodonExceptions
def generate_redis_key(input_string: AnyStr) -> AnyStr:
# Create a SHA-256 hash object
sha256_hash = hashlib.sha256()
# Convert the input string to bytes (required for hashing)
input_bytes = input_string.encode("utf-8")
# Update the hash object with the input bytes
sha256_hash.update(input_bytes)
# Get the hexadecimal representation of the hash (fixed-length)
hash_hex = sha256_hash.hexdigest()
return hash_hex
def toPascalCase(s: AnyStr) -> AnyStr:
parts = s.split(" ")
return parts[0].capitalize() + "".join(part.title() for part in parts[1:])
def shortenTopicUrl(url: AnyStr) -> AnyStr:
return re.sub(r"/t/[^/]+/", "/t/", url)
class MyBot(MastoBot):
@handleMastodonExceptions
def processMention(self, mention: Dict):
api_status = self.getStatus(mention.get("status"))
api_account = self.getAccount(mention.get("account"))
content = api_status.get("content")
# Check for report tag
report_pattern = r"(.*?)(?<!\S)\$report\b\s*(.*)</p>"
report_match = re.search(report_pattern, content)
# If report message
if report_match:
before_report = report_match.group(1).strip()
report_message = report_match.group(2).strip()
logging.info(f"⛔ \t Report message received: {report_message}")
template_data = {
"creator": api_account.get("acct"),
"reported_post_id": mention.get("status"),
"reported_post_url": api_status.get("url"),
"report_message": report_message,
}
try:
output = self.getTemplate("report.txt", template_data)
self._api.status_post(status=output, visibility="direct")
except Exception as e:
logging.critical("❗ \t Error posting status message")
raise e
else:
# Check boost and favourite configs
shouldReblog = self.shouldReblog(mention.get("status"))
shouldFavourite = self.shouldFavorite(mention.get("status"))
altTextTestPassed = self.altTextTestPassed(mention.get("status"), "boosts")
# Check boost
if shouldReblog:
try:
self.reblogStatus(mention.get("status"))
except Exception as e:
logging.warning(f"❗ \t Status could not be boosted")
logging.error(e)
elif not altTextTestPassed:
template_data = {"account": api_account.get("acct")}
try:
output = self.getTemplate("missing_alt_text.txt", template_data)
self._api.status_post(status=output, visibility="direct")
except Exception as e:
logging.critical("❗ \t Error sending missing-alt-text message")
raise e
# Check favourite
if shouldFavourite:
try:
self.favoriteStatus(mention.get("status"))
except Exception as e:
logging.warning(f"❗ \t Status could not be favourited")
logging.error(e)
logging.info(f"📬 \t Mention processed: {mention.get('id')}")
self.dismissNotification(mention.get("id"))
@handleMastodonExceptions
def processReblog(self, reblog: Dict):
self.dismissNotification(reblog.get("id"))
@handleMastodonExceptions
def processFavourite(self, favourite: Dict):
self.dismissNotification(favourite.get("id"))
@handleMastodonExceptions
def processFollow(self, follow: Dict):
# Get latest account from the Mastodon API
api_account = self.getAccount(follow.get("account"))
account = api_account.get("acct")
template_data = {"account": account}
# Generate the welcoming message from the template
try:
output = self.getTemplate("new_follow.txt", template_data)
self._api.status_post(status=output, visibility="direct")
except Exception as e:
logging.critical("❗ \t Error posting Status")
raise e
logging.info(f"📭 \t Follow processed: {follow.get('id')}")
self.dismissNotification(follow.get("id"))
@handleMastodonExceptions
def processPoll(self, poll: Dict):
self.dismissNotification(poll.get("id"))
@handleMastodonExceptions
def processFollowRequest(self, follow_request: Dict):
self.dismissNotification(follow_request.get("id"))
@handleMastodonExceptions
def processUpdate(self, update: Dict) -> None:
self.dismissNotification(update.get("id"))
async def fetchLatestPosts(self) -> List[AnyStr]:
posts: List[AnyStr] = list()
url = "https://discuss.python.org/latest"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
# Get the page soup
soup = BeautifulSoup(await response.text(), "html.parser")
# Get topics soup
topics_soup = soup.find_all("tr", class_="topic-list-item")
# Loop through topics
for topic_soup in topics_soup:
# Get URL
topic_url = topic_soup.find(
"a", class_="title raw-link raw-topic-link"
).get("href")
# Get page data
post_data = await self.getPostDataFromUrl(session, topic_url)
# If the record does not already exist
if not self.localStoreExists(
"python-discuss-post", post_data.get("id")
):
self.localStoreSet(
"pending-python-discuss-post",
post_data.get("id"),
post_data,
)
else:
self.localStoreSet(
"python-discuss-post", post_data.get("id"), post_data
)
else:
logging.warning(
"Failed to retrieve the page. Status Code:", response.status
)
async def getPostDataFromUrl(self, session, url: str) -> Dict:
async with session.get(url) as response:
if response.status == 200:
# Get the page soup
soup = BeautifulSoup(await response.text(), "html.parser")
# Get the topic soup
topic_soup = soup.find("div", {"id": "topic-title"})
# Get the topic attributes
topic_title = topic_soup.find("a").text.strip()
topic_category = topic_soup.find(
"span", class_="category-name"
).text.strip()
# Generate an ID from the shortened URL
shortened_link = shortenTopicUrl(url)
generated_id = generate_redis_key(shortened_link)
post = {
"id": generated_id,
"title": topic_title,
"url": shortened_link,
"topic_category": toPascalCase(topic_category),
}
return post
else:
logging.warning(
"Failed to retrieve the page. Status Code:", response.status
)
async def processPythonDiscussPendingPosts(self) -> None:
for record_id in self.r.scan_iter(match="pending-python-discuss-post:*"):
key, id = record_id.split(":")
record = self.localStoreGet(key, id)
output = self.getTemplate("discuss_post.txt", record)
try:
new_status = self._api.status_post(status=output, visibility="unlisted")
record.setdefault("status_url", new_status.get("url"))
record.setdefault("status_uri", new_status.get("uri"))
record.setdefault("status_id", new_status.get("id"))
logging.info(f"✨ \t New python-discuss posted: {new_status.get('url')}")
self.localStoreDelete(key, id)
self.localStoreSet("python-discuss-post", id, record)
except Exception as e:
logging.critical("❗ \t Error posting Status")
raise e
await asyncio.sleep(120)
if __name__ == "__main__":
config = ConfigAccessor("config.yml")
credentials = ConfigAccessor("credentials.yml")
bot = MyBot(credentials=credentials, config=config)
async def runBot():
while True:
logging.info("✅ \t Running bot")
await bot.run()
await asyncio.sleep(10)
async def runScraper():
while True:
logging.info("⛏️ \t Running scraper")
await bot.fetchLatestPosts()
await bot.processPythonDiscussPendingPosts()
await asyncio.sleep(120)
async def main():
await asyncio.gather(runBot(), runScraper())
while True:
try:
asyncio.run(main())
except:
time.sleep(10)
pass