-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocbot.py
More file actions
executable file
·344 lines (313 loc) · 13.7 KB
/
docbot.py
File metadata and controls
executable file
·344 lines (313 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python
""""
[SyntaxBot] Python documentation bot for Reddit by /u/num8lock
Desc: reddit module
version: v.0.2
git:
Notes: praw4 (see requirements.txt)
[Heroku] uses env variables for all authentication credentials
Acknowledgment:
Codes based on many reddit bot creators /u/redditdev
and helps from /r/learnpython.
Thanks to:
- u/w1282 for reminding what function in programming function means
- u/bboe for authoring praw and making it easier to use reddit API
"""
import os
import re
import time
from datetime import datetime
import logging, logging.config
import praw
import ast
from sqlalchemy import create_engine, exists
from sqlalchemy.orm import sessionmaker
from docdb import Library as libdb
from docdb import RedditActivity as reddb
''' CONFIGS '''
# Retrieve (Heroku) env private variables
ua = useragent = 'Python Syntax help bot for reddit v.0.1 (by /u/num8lock)'
appid = os.getenv('syntaxbot_app_id')
secret = os.getenv('syntaxbot_app_secret')
botlogin = os.getenv('syntaxbot_username')
passwd = os.getenv('syntaxbot_password')
db_config = os.getenv('DATABASE_URL')
# Reddit related variables
botcommand = 'SyntaxBot!'
sub_name = 'bottest'
version3 = 352
version2 = 2712
default_version = version3
# regex pattern for capturing user commands. Need to have everything
# captured between the identifiers
pattern = re.compile(r"""
(?P<bot>Doc|DocBot|SyntaxBot[!\s])
(?P<command>([-]?)|(find|get|lookup|search)|\s)
(?P<query>['`\"\(-]?.*\b|$)""", re.I | re.X)
non_syntax = re.compile(r'''[\(\)\{\}\?!`\'\";\\\|+,:\s]''')
def mark_as_replied(comment, reply):
"""http://stackoverflow.com/a/1830499/6882768
# get docbot replied posts firsts
= comment.id
query_datetime = datetime.utcfromtimestamp(comment.created_utc)
# got this from (vars(submission.comments))
_post = submission.comments._comments_by_id[comment.parent_id]
comment_id = _post.id
username = _post.author.__dict__['name']
query_datetime = datetime.utcfromtimestamp(_post.created_utc)
# keyword
_q_re = re.search(pattern, _post.body)
query_keyword = ''.join(str(_q_re.group(4))).replace('\(\)', '')
permalink = 'https//reddit.com/r/{0}/comments/{1}'.format(
sub_name, comment_id)
replied_list = {
'comment_id' : comment_id,
'username': username,
'query_keyword': query_keyword,
'query_version': '',
'query_topic': '',
'query_datetime': query_datetime,
'permalink': permalink,
'replied': replied,
'replied_datetime': replied_datetime
}
log.debug('Replied to: {}: {} \n{}'.format(vars(_post.author),
datetime.utcfromtimestamp(_post.created_utc),
replied_list))"""
''' double check own profile for replies listed '''
# check_profile(comment)
pass
def valid_query(comment):
"""Searching valid formatted command in comment, if found, strip non query
parts in comment.body. Return False or full query string(s)"""
find_query = re.search(pattern, comment.body)
if not find_query:
log.debug('Error: cannot find query in %s', comment)
log.debug('\n{}\n>> {}\n>> {}\n{}'.format(
'-'*80, comment.body, comment.__dict__, '-'*80)
)
return False
_query = '{0}{1}'.format( find_query.group('command'),
find_query.group('query') )
log.debug('Valid query: %s', find_query.groupdict())
return _query
def check_replied(comment):
"""check if comment is already listed as replied in database"""
# change column replied to replied_id for naming comprehension
comment_result = session.query(reddb.comment_id, reddb.replied).filter(
reddb.comment_id == comment.id).first()
if comment_result:
comment.save(category='comment_replied')
log.debug('Replied: [%s] %s', comment_result[0], comment_result[1])
return True
else: return False
def check_mentions():
"""Bots can and should monitor https://www.reddit.com/message/mentions.json
rather than polling/scraping every comment, whenever possible.
You can also monitor /api/v1/me and check the has_mail attribute to see if
you need to look up mentions.json"""
log.info('Checking /u/SyntaxBot mentions...')
# check_replied(mentions)
pass
def check_pm():
""" Check Private Messages for queries """
log.info('Checking PMs...')
me = r.user.me()
inbox = r.inbox
# log.debug('%s \n %s', dir(me.comments.validate_time_filter), dir(inbox.unread))
# todays_comment = me.comments.validate_time_filter(today)
log.debug(inbox.unread)
# check_replied(pm)
pass
def parse(query):
"""Get query definitions from libdb database"""
# see docs/library/functions.html?highlight=filter#filter
list_query = [*filter(None, re.split(non_syntax, query.strip()))]
log.info('Start parsing: %s', list_query)
_d = _definition = {'version': 'version_id', 'search': 'keyword',
'module': 'module'}
query_types = {
'v': _d['version'], 'version': _d['version'],
'm': _d['module'], 'module': _d['module'],
'f': _d['search'], 'find': _d['search'], 's': _d['search'],
'search': _d['search'], 'get': _d['search'], 'lookup': _d['search']
}
queries = {}
for i, arg in enumerate(list_query):
log.debug('parsing: i: %s, %s', i, arg)
qkeyword = list_query[i + 1]
queries[query_types[arg.strip('-')]] = qkeyword
log.debug('parsing: appended arg to queries: {%s: %s}',
query_types[arg.strip('-')], qkeyword)
log.debug('parsing: pop next arg: %s', list_query[i + 1])
list_query.pop(i + 1)
log.debug('parsing: queries dict result: %s', queries)
# http://stackoverflow.com/a/14516917/6882768
try:
log.debug('parsing: Set up version_id in: %s', queries['version_id'])
queries['version_id'] = queries['version_id'].replace('.', '')
if queries['version_id'].replace('.', '').isdigit() \
and queries['version_id'].startswith('2', 0, 1):
queries['version_id'] = version2
else:
queries['version_id'] = default_version
log.debug('parsing: version_id: stripped? %s', queries['version_id'])
except KeyError as err:
log.error('parsing: No version defined in %s, %s missing', queries, err)
queries['version_id'] = default_version
log.info("Parsed: %s", queries)
# DB queries
main_keyword = queries['keyword']
main_ver = queries['version_id']
option = queries['keyword'].rsplit('.', maxsplit=1)[0]
# Data needed to process a reply:
columns = [ libdb.id, libdb.version_id, libdb.module, libdb.keytype,
libdb.keyclass, libdb.keywords, libdb.header, libdb.body,
libdb.footer, libdb.url ]
log.info('> Query check: Version: `%s`. Keyword: `%s`, Module (opt): `%s`',
main_ver, main_keyword, option)
# check if keyword exists, from sqlalchemy creator
# http://techspot.zzzeek.org/2008/09/09/selecting-booleans/
main_check = session.query(exists().where(
libdb.keywords.contains(main_keyword)).where(
libdb.version_id == main_ver)).scalar()
opt_check = session.query(exists().where(libdb.module == option)
.where(libdb.version_id == main_ver)).scalar()
log.debug('> Exists? main_query: %s. opt_query: %s.', main_check, opt_check)
if main_check:
# group by module hopefully = better search (motherbot:build_definitions)
log.info('Starting main_query')
main_query = session.query(*columns).filter(
(libdb.keywords == main_keyword) &
(libdb.version_id == main_ver)
).group_by(libdb.module, libdb.id).order_by(libdb.id).first()
log.info('Library returns (main_query): id: %s, vers: %s, keyword: %s',
main_query.id, main_query.version_id, main_query.keywords)
return main_query, 'main'
elif opt_check:
# idkwhy exact word module w/ grouping keyword resulted to first keyword
log.info('opt_query: Query Library.module instead of Library.keyword')
opt_query = session.query(*columns).filter(
(libdb.module == option) & (libdb.version_id == main_ver)
).group_by(libdb.keywords, libdb.id).order_by(libdb.id).first()
log.info('Library returns (opt_query): id: %s, vers: %s, module: %s',
opt_query.id, opt_query.version_id, opt_query.module)
return opt_query, 'option'
else:
log.error('Nothing found for query %s', query)
return None
def format_response(data, comment):
"""Format bot reply to user query"""
permalink = comment.permalink(fast=True)
if data is None:
# for `not found` reply
msg_template = 'SyntaxBot --find {0} --version 3'.format('print')
# stolen from RemindMeBot
pm_link = \
'https://reddit.com/message/compose/?to={0}&subject={1}&message={2}' \
''.format('SyntaxBot', 'print', msg_template)
readme_link = 'https://www.reddit.com/r/SyntaxBot/'
standard_footer = '-----\n`>>>` [README]({0}) | ' \
'`>>>` [Try get it from PM!]({1})'.format(readme_link, pm_link)
query = valid_query(comment)
botreply = """I'm sorry, {0}, I can't seem to find what you're asking
here [`{1}`]({2}). Maybe a typo? Be sure to check the FAQ in the
Readme link. \n\nYou can PM me to try another request, too. Thanks!
\n{3}""".format(comment.author, query, permalink, standard_footer)
else:
reply_data = data[0]
reply_type = data[1]
# query easy to find, confident we got the right definition
if reply_type == 'main':
botreply = """{0} \n {1} \n {2}""".format(
reply_data.header, reply_data.body, reply_data.footer)
elif reply_type == 'option':
botreply = """I can't find the exact match of your resquest, here's
what I can find: \n\n{0} \n {1} \n {2}""".format(
reply_data.header, reply_data.body, reply_data.footer)
return botreply
def reply(comment):
"""Reply user comment"""
# double check? to make sure current comment is not replied as there's a
# slight chance marked_as_replied not yet finished
# checked = check_replied(comment)
# if checked:
# log.debug('Ignoring, %s already replied at %s', comment.id, checked)
# return
##
response_data = parse(valid_query(comment))
log.info('Replying...{} \n{}'.format( {comment.id:
[datetime.utcfromtimestamp(comment.created_utc), comment.author.name ]},
response_data ))
# pass comment too for logging purpose
bot_response = format_response(response_data, comment)
log.debug('Reply message: %s', bot_response)
# comment.reply(bot_response)
# comment.save(category='comment_replied')
# mark_as_replied(comment)
# pass
def search(subreddit, keyword, limit):
''' Search for the queries in the sub using reddit search, time filtered
'''
search_result = r.subreddit(subreddit).search('{0}'.format(
keyword), time_filter='month')
log.debug('Search result: {}'.format(search_result.__dict__))
if search_result is None:
log.info('No matching result.')
return None
for thread in search_result:
''' get OP thread / submission to iterate comment/replies '''
log.debug('Iterating threads in search result : %s', thread)
# iterate every comments and their replies
submission = r.submission(id=thread)
submission.comments.replace_more(limit=0)
log.debug('Processing comment tree: {} [{}]: {}'.format(
submission, submission.author, submission.comments.list() ))
for comment in submission.comments.list():
# skip own & replied comment
if comment.author == botlogin:
log.info('Skipping own comment: %s', comment)
continue
elif check_replied(comment):
log.info('Skipping comment %s: replied', comment)
continue
# skip non-query comment
elif not valid_query(comment):
log.info('Skipping comment %s: no query found', comment)
continue
else:
reply(comment)
def whatsub_doc(subreddit, keyword):
"""Main bot activities & limit rate requests to oauth.reddit.com"""
log.info('Whatsub, doc?')
limit = 100
check_pm()
# check_mentions()
# search(subreddit, keyword, limit)
def login():
"""praw4 OAuth2 login procedure"""
''' praw4 only needs the first 3 for read-only mode '''
log.info('Logging started')
r = praw.Reddit(user_agent=ua, client_id=appid, client_secret=secret,
username=botlogin,
password=passwd
)
''' log connection '''
log.info('Connected. Starting bot activity')
return r
if __name__ == '__main__':
log = logging.getLogger(__name__)
logging.config.dictConfig(ast.literal_eval(os.getenv('LOG_CFG')))
engine = create_engine(db_config, echo=True)
Session = sessionmaker(bind=engine)
session = Session()
''' capture exceptions '''
try:
r = login()
whatsub_doc(sub_name, botcommand)
except ConnectionError as no_connection:
log.error(no_connection, exc_info=True)
time.sleep(100)
log.info('Reconnecting in 10secs...')
r = login()
whatsub_doc(sub_name, botcommand)