Below is the code for DYKUpdateBot. The bot runs on WP:Pywikibot.
import os
import pathlib
import pywikibot
import mwparserfromhell
import html
from datetime import datetime, timedelta, timezone
from functools import partial
from re import search
# Bot that moves the next prepared queue onto Template:Did you know, resets the
# update clock, archives the outgoing set and hands out credits (see run() and update_dyk()).
class DYKUpdateBot():
# Template whose hooks are replaced on every update
TDYK_LOC = 'Template:Did you know'
# Page holding the number of the queue to promote next
NEXT_UPDATE_QUEUE_LOC = 'Template:Did you know/Queue/Next'
# Page holding the time of the last update, in ISO format
LAST_UPDATE_TIME_LOC = 'Template:Did you know/Next update/Time'
# Page holding the number of seconds between two updates
TIME_BETWEEN_UPDATES_LOC = 'User:DYKUpdateBot/Time Between Updates'
# Title prefix of the queue pages; the queue number is appended to it
QUEUE_ROOT_LOC = 'Template:Did you know/Queue/'
# Talk page that receives the "almost late" notice
WTDYK_LOC = 'Wikipedia talk:Did you know'
# Page that receives the outgoing hooks
ARCHIVE_LOC = 'Wikipedia:Recent additions'
# Page where the bot posts its warnings and blocking errors
ERROR_OUTPUT_LOC = 'User:DYKUpdateBot/Errors'
# Page listing the maximum "advance" / "delay" minutes used when drifting the clock
DRIFT_LOC = 'User:DYKUpdateBot/ResyncDrift'
# Upper bound, in seconds, on how long run() sleeps between two status checks
SECONDS_BETWEEN_STATUS_CHECKS = 600
# Number of queues; the next-queue number wraps back to 1 after this value
NUM_QUEUES = 7
def run(self) -> None:
    """Main loop: keep checking the wiki until the on/off switch file says stop.

    Each pass makes sure the bot is logged in, works out how many seconds
    remain until the next update, validates the incoming queue once the
    update is less than 7200 seconds away, performs the update when it is
    due, posts the collected warnings/errors, and then sleeps.
    """
    DYKUpdateBotUtils.log('PID: {0}'.format(os.getpid()))

    while self._is_on():
        DYKUpdateBotUtils.log(datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z'))

        if not pywikibot.Site().logged_in():
            pywikibot.Site().login()
            if not pywikibot.Site().logged_in():
                # still logged out after trying to log in: leave the loop
                break

        results = ValidationResults()
        check_interval = DYKUpdateBot.SECONDS_BETWEEN_STATUS_CHECKS
        seconds_left = check_interval  # placeholder, used when the update time can't be computed
        due_at, leaving_at = self._calculate_next_update_time(results.rgstr_errors)

        if not results.rgstr_errors:
            now = pywikibot.Site().server_time().replace(tzinfo=timezone.utc)
            seconds_left = int((due_at - now).total_seconds())
            DYKUpdateBotUtils.log('Seconds left until next update: {0}'.format(seconds_left))
            if seconds_left < 7200:
                self.validate_before_update(results, leaving_at)
            if seconds_left <= 0:
                results.timedelta_between_updates = leaving_at - due_at
                self.update_dyk(now, results)

        self._post_errors(results.rgstr_warnings, results.rgstr_errors)
        results = None

        # never sleep past the moment the next update is due
        nap = check_interval if seconds_left <= 0 else min(check_interval, seconds_left)
        pywikibot.sleep(nap)

    DYKUpdateBotUtils.log('Exiting...')
def _calculate_next_update_time(self, rgstr_errors) -> (pywikibot.Timestamp, pywikibot.Timestamp):
    """Work out when the next update is due and when that set will be replaced.

    Reads the last update time (ISO format) from LAST_UPDATE_TIME_LOC and the
    number of seconds between updates from TIME_BETWEEN_UPDATES_LOC.

    Args:
        rgstr_errors: list that receives a message when either page can't be parsed.

    Returns:
        Tuple (time of the next update, time the next set leaves), both UTC.
        If a page can't be parsed, an error is logged and the same placeholder
        value is returned twice; callers must check rgstr_errors first.
    """
    page_last_update_time = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.LAST_UPDATE_TIME_LOC)
    time_next_update = datetime.now(timezone.utc)  # placeholder, returned if parsing fails
    try:
        str_last_update = page_last_update_time.text.strip()
        time_next_update = pywikibot.Timestamp.fromISOformat(str_last_update).replace(tzinfo=timezone.utc)
    except Exception:
        # Kept broad on purpose (best effort), but unlike a bare "except:" this
        # no longer swallows KeyboardInterrupt / SystemExit.
        self._log_error(rgstr_errors, 'Time at [[' + DYKUpdateBot.LAST_UPDATE_TIME_LOC +
                        ']] is not formatted correctly')
        return time_next_update, time_next_update

    page_time_between_updates = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.TIME_BETWEEN_UPDATES_LOC)
    try:
        seconds_between_updates = int(page_time_between_updates.text)
    except ValueError:
        self._log_error(rgstr_errors, 'Time between updates at [[' + DYKUpdateBot.TIME_BETWEEN_UPDATES_LOC +
                        ']] is not formatted correctly')
        return time_next_update, time_next_update

    timedelta_between_updates = timedelta(seconds=seconds_between_updates)
    time_next_update = time_next_update + timedelta_between_updates
    return time_next_update, time_next_update + timedelta_between_updates
# Returns:
#  * Int of the next queue number (1..NUM_QUEUES), parsed from NEXT_UPDATE_QUEUE_LOC
#  * 0 if NEXT_UPDATE_QUEUE_LOC doesn't parse to an int in that range
def _find_next_queue_number(self) -> int:
    page = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.NEXT_UPDATE_QUEUE_LOC)
    try:
        num_next_queue = int(page.text)
    except ValueError:
        return 0
    # A number outside the rotation (eg 9 or -1) would otherwise be used to
    # build a queue page title; report it the same way as an unparsable value.
    if not 1 <= num_next_queue <= DYKUpdateBot.NUM_QUEUES:
        return 0
    return num_next_queue
def validate_before_update(self, results_val, time_set_leaving):
    """Run the pre-update checks and store what was found on results_val.

    Checks, in order: the next queue number parses; the queue's curly braces
    balance; the queue carries {{DYKbotdo}}; the queue has the hook markers;
    the incoming file is protected; T:DYK has the hook markers. A protection
    problem is logged as an error but checking continues; a missing file is
    only a warning; every other failure logs an error and stops the checks.

    Args:
        results_val: results object that is filled in and returned.
        time_set_leaving: time the incoming set will leave the Main Page.

    Returns:
        results_val (the same object that was passed in).
    """
    # figure out which queue to update from
    num_queue = self._find_next_queue_number()
    results_val.num_queue = num_queue
    if num_queue == 0:
        self._log_error(results_val.rgstr_errors, 'Could not parse [[{0}]]; check if it\'s a number 1-{1}'
                        .format(DYKUpdateBot.NEXT_UPDATE_QUEUE_LOC, DYKUpdateBot.NUM_QUEUES))
        return results_val

    # get the wikitext of the queue
    page_queue = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.QUEUE_ROOT_LOC + str(num_queue))
    results_val.page_queue = page_queue
    queue_text = page_queue.text
    queue_link = DYKUpdateBotUtils.wikilink_to_queue(num_queue, True)

    # make sure all curly braces are matched
    if queue_text.count('{{') != queue_text.count('}}'):
        self._log_error(results_val.rgstr_errors, 'Unmatched left <nowiki>("{{") and right ("}}")</nowiki> curly braces in ' + queue_link)
        return results_val

    # make sure the queue has {{DYKbotdo}}
    has_dykbotdo, results_val.str_dykbotdo_signature = DYKUpdateBotUtils.parse_dykbotdo(queue_text)
    if not has_dykbotdo:
        self._post_almost_late_message_to_WTDYK(time_set_leaving, num_queue)
        self._log_error(results_val.rgstr_errors, queue_link + ' is not tagged with {{tl|DYKbotdo}}')
        return results_val

    # make sure the queue has <!--Hooks--> and <!--HooksEnd--> and find hooks
    hooks_incoming = DYKUpdateBotUtils.extract_hooks(queue_text)
    results_val.hooks_incoming = hooks_incoming
    if hooks_incoming is None:
        self._log_error(results_val.rgstr_errors, queue_link + ' is missing a <nowiki><!--Hooks--> or <!--HooksEnd--></nowiki>')
        return results_val

    # make sure the image/file is protected
    file_incoming = DYKUpdateBotUtils.find_file(hooks_incoming)
    results_val.file_incoming = file_incoming
    if file_incoming:
        protection_problem = DYKUpdateBotUtils.check_if_protected(file_incoming, time_set_leaving)
        if protection_problem:
            self._log_error(results_val.rgstr_errors, protection_problem)
    else:
        self._log_warning(results_val.rgstr_warnings, 'Can\'t find the image / file for incoming DYK set\n')

    # fetch T:DYK
    page_tdyk = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.TDYK_LOC)
    results_val.page_TDYK = page_tdyk

    # make sure T:DYK has <!--Hooks--> and <!--HooksEnd--> and find hooks
    hooks_outgoing = DYKUpdateBotUtils.extract_hooks(page_tdyk.text)
    results_val.hooks_outgoing = hooks_outgoing
    if hooks_outgoing is None:
        self._log_error(results_val.rgstr_errors, '[[' + DYKUpdateBot.TDYK_LOC + ']] is missing a <nowiki><!--Hooks--> or <!--HooksEnd--></nowiki>')
    return results_val
def update_dyk(self, time_update, results) -> None:
    """Carry out the update; does nothing when validation recorded errors.

    Steps, in this order: swap the hooks on T:DYK, purge the Main Page, reset
    the update clock (with drift), archive the outgoing hooks, tag article
    talk pages and credit users, clear the queue, advance the next-queue
    number, and tag the outgoing file.

    Args:
        time_update: time of this update; seconds and microseconds are dropped.
        results: validation results filled in by validate_before_update().
    """
    if results.rgstr_errors:
        return
    queue_link = DYKUpdateBotUtils.wikilink_to_queue(results.num_queue, False)

    # replace old hooks with new hooks
    page_tdyk = results.page_TDYK
    page_tdyk.text = page_tdyk.text.replace(results.hooks_outgoing, results.hooks_incoming)
    self._edit(page_tdyk, 'Bot automatically updating DYK template with hooks copied from ' + queue_link)

    # purge the Main Page
    pywikibot.Page(pywikibot.Site(), 'Main Page').purge()

    # set last update time
    time_update = time_update.replace(second=0, microsecond=0)
    drift_minutes = self._calculate_drift(time_update, results.timedelta_between_updates)
    page_clock = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.LAST_UPDATE_TIME_LOC)
    page_clock.text = (time_update + timedelta(minutes=drift_minutes)).isoformat()
    clock_summary = 'Resetting the clock'
    if drift_minutes != 0:
        clock_summary += ', with drift'
    self._edit(page_clock, clock_summary)

    # archive outgoing hooks
    page_archive = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.ARCHIVE_LOC)
    page_archive.text = DYKUpdateBotUtils.archive(page_archive.text, time_update, results.hooks_outgoing)
    self._edit(page_archive, 'Archiving latest set')

    # credits - article talk, user talk
    rgcredits = self._parse_and_populate_credits(results.page_queue,
                                                 results.hooks_incoming,
                                                 results.file_incoming,
                                                 results.rgstr_warnings)
    self._tag_articles(rgcredits, time_update)
    self._give_user_credits(rgcredits, results.str_dykbotdo_signature)

    # clear queue
    results.page_queue.text = '{{User:DYKUpdateBot/REMOVE THIS LINE}}'
    self._edit(results.page_queue, 'Update is done, removing the hooks')

    # update next queue number (wraps from NUM_QUEUES back to 1)
    num_following_queue = (results.num_queue % DYKUpdateBot.NUM_QUEUES) + 1
    page_next_queue = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.NEXT_UPDATE_QUEUE_LOC)
    page_next_queue.text = str(num_following_queue)
    self._edit(page_next_queue, 'Next queue is ' + DYKUpdateBotUtils.wikilink_to_queue(num_following_queue, False))

    # tag outgoing file
    self._tag_outgoing_file(results.hooks_outgoing, time_update)
def _post_almost_late_message_to_WTDYK(self, time_set_leaving, num_next_queue) -> None:
    """Post the "almost late" notice to WTDYK_LOC, at most once per update.

    The ISO form of time_set_leaving is embedded in the message and doubles
    as the marker used to detect that the notice was already posted.
    """
    unique_id = time_set_leaving.isoformat()
    if unique_id in pywikibot.Page(pywikibot.Site(), DYKUpdateBot.WTDYK_LOC).text:
        return  # bot already posted an "almost late" message for this update, don't post again

    path_template = pathlib.Path(__file__).parent / 'almostLate.txt'
    with open(str(path_template), 'r', encoding='utf-8') as file_template:
        message = file_template.read()

    # fill in the placeholders, in the same order as before
    substitutions = (('queueNum', str(num_next_queue)),
                     ('hoursLeft', 'two hours'),
                     ('uniqueSetIdentifier', unique_id))
    for placeholder, value in substitutions:
        message = message.replace(placeholder, value)

    self._append_and_edit(DYKUpdateBot.WTDYK_LOC, message, 'DYK is almost late')
def _calculate_drift(self, time_update, timedelta_between_updates) -> int:
    """Read the drift limits from DRIFT_LOC and compute the drift in minutes.

    Every line of the page is expected to end in ":<int>"; a line containing
    "advance" sets the maximum advance, a line containing "delay" sets the
    maximum delay.

    Returns:
        The result of DYKUpdateBotUtils.calculate_drift_core(), or 0 if any
        line of the page can't be parsed.
    """
    num_max_advance_minutes = 0
    num_max_delay_minutes = 0
    page_drift = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.DRIFT_LOC)
    for str_line in page_drift.text.split('\n'):
        try:
            # int() is the only call here that can fail on bad input
            num_minutes_parsed = int(str_line[str_line.find(':') + 1:])
        except ValueError:
            DYKUpdateBotUtils.log('Couldn\'t parse drift')
            return 0
        if 'advance' in str_line:
            num_max_advance_minutes = num_minutes_parsed
        elif 'delay' in str_line:
            num_max_delay_minutes = num_minutes_parsed
    return DYKUpdateBotUtils.calculate_drift_core(time_update,
                                                  timedelta_between_updates,
                                                  num_max_advance_minutes,
                                                  num_max_delay_minutes)
def _parse_and_populate_credits(self, page_queue, hooks_incoming, file_incoming, rgstr_warnings) -> list:
    """Parse the credits in the queue, validate them and attach hooks/file.

    Args:
        page_queue: queue page whose text holds the credit templates.
        hooks_incoming: hooks text of the incoming set.
        file_incoming: file page of the incoming set, or None if the set has none.
        rgstr_warnings: list that receives a warning for every problem found.

    Returns:
        The list of credits, with a warning logged for each one without a hook.
    """
    rgcredits = DYKUpdateBotUtils.parse_credits(page_queue.text)
    fn_log_warning = partial(self._log_warning, rgstr_warnings)
    DYKUpdateBotUtils.validate_credits_articles(rgcredits, fn_log_warning)
    DYKUpdateBotUtils.validate_credits_users(rgcredits, fn_log_warning)
    # Validation only warns when no file is found, so file_incoming can be
    # None here; calling .title() on it would abort the update half-way.
    str_file = file_incoming.title(with_ns=False) if file_incoming is not None else None
    DYKUpdateBotUtils.populate_hooks_and_file(rgcredits, hooks_incoming, str_file)
    for credit in rgcredits:
        if credit.str_hook is None:
            self._log_warning(rgstr_warnings, 'Couldn\'t find hook for [[{0}]], was the hook pulled or moved to a different set?'.format(credit.str_article))
    return rgcredits
def _tag_articles(self, rgcredits, time_update) -> None:
    """Record the DYK appearance on each credited article's talk page.

    Every article is tagged once, even if several credits name it. An
    existing article-history template is extended when there is one;
    otherwise a DYK talk tag is added to the page.
    """
    articles_done = set()
    for credit in rgcredits:
        article = credit.str_article
        if article in articles_done:
            continue
        page_talk = pywikibot.Page(pywikibot.Site(), 'Talk:' + article)
        page_talk.text, summary = DYKUpdateBotUtils.tag_article_history(page_talk.text, credit, time_update)
        if not summary:
            # no article-history template was updated, so add a standalone tag
            dyktalk_tag, summary = DYKUpdateBotUtils.build_dyktalk_tag(credit, time_update)
            page_talk.text = DYKUpdateBotUtils.add_template_to_talk(page_talk.text, dyktalk_tag)
        self._edit(page_talk, summary)
        articles_done.add(article)
def _give_user_credits(self, rgcredits, str_dykbotdo_signature) -> None:
    """Append a credit message to the talk page of every credited user.

    Credits without a user talk page (str_user_talk is empty) are skipped.
    """
    promoting_admin = DYKUpdateBotUtils.find_user_link(str_dykbotdo_signature)
    for credit in rgcredits:
        talk_title = credit.str_user_talk
        if talk_title:
            message, summary = DYKUpdateBotUtils.build_user_talk_credit(credit, str_dykbotdo_signature, promoting_admin)
            self._append_and_edit(talk_title, message, summary)
def _tag_outgoing_file(self, hooks_outgoing, time_update) -> None:
    """Add a {{DYKfile}} tag to the file of the outgoing set, if it has one.

    Nothing happens when the outgoing hooks contain no file. A file that
    exists neither locally nor in the image repository is only logged.
    """
    file_outgoing = DYKUpdateBotUtils.find_file(hooks_outgoing)
    if not file_outgoing:
        return

    file_outgoing_commons = pywikibot.FilePage(pywikibot.Site().image_repository(), file_outgoing.title())
    if not (file_outgoing.exists() or file_outgoing_commons.exists()):
        DYKUpdateBotUtils.log('Special case (possible bug?): Outgoing file "{0}" doesn\'t exist'.format(file_outgoing.title()))
        return

    str_dykfile_tag = '{{{{DYKfile|{d.day} {d:%B}|{d.year}}}}}'.format(d=time_update)
    file_outgoing.text = DYKUpdateBotUtils.add_template_to_talk(file_outgoing.text, str_dykfile_tag)
    self._edit(file_outgoing, 'File appeared on [[WP:Did you know|DYK]] on {d.day} {d:%B} {d.year}'.format(d=time_update))

    str_text_lower = file_outgoing.text.lower()
    if ('m-cropped' in str_text_lower) or ('c-uploaded' in str_text_lower):
        # Braces are doubled twice: str.format() halves them, and the log line
        # is meant to show the template names with double braces.
        DYKUpdateBotUtils.log('Outgoing file "{0}" tagged with {{{{m-cropped}}}} or {{{{c-uploaded}}}}'.format(file_outgoing.title()))
def _post_errors(self, rgstr_warnings, rgstr_errors) -> None:
    """Publish the current warnings and errors on ERROR_OUTPUT_LOC.

    Errors are listed before warnings. The page is left alone when it
    already shows exactly this text; with nothing to report it is blanked.
    """
    sections = []
    summary = 'No errors or warnings; clear'
    if rgstr_warnings:
        summary = 'Posting latest warnings'
        sections.append('Bot warnings:\n' +
                        '\n'.join('* {0}'.format(warning) for warning in rgstr_warnings))
    if rgstr_errors:
        summary = 'Bot is blocked from updating DYK, posting latest errors'
        # errors go on top of the warnings
        sections.insert(0, 'Errors blocking the bot from updating DYK:\n' +
                        '\n'.join('* {0}'.format(error) for error in rgstr_errors))
    output = '\n\n'.join(sections).strip()

    page_errors = pywikibot.Page(pywikibot.Site(), DYKUpdateBot.ERROR_OUTPUT_LOC)
    if page_errors.text.strip() == output:
        return  # if the errors are already on the page, don't post again
    page_errors.text = output
    self._edit(page_errors, summary)
# ---------------------------------------------
# Core editing
# ---------------------------------------------
# Edge cases we're handling:
# * {{nobots}}
# * Redirects
# * Page doesn't exist
# * Edit conflicts
# * Protected page
def _append_and_edit(self, str_title, str_message, str_edit_summary) -> None:
    """Append str_message to the page str_title and save it.

    A redirect is followed to its target. Pages that opt out of bot edits
    are skipped. On an edit conflict the page is re-fetched after a
    10-second pause and the append is tried again.
    """
    target = pywikibot.Page(pywikibot.Site(), str_title)
    if target.isRedirectPage():
        target = target.getRedirectTarget()
    if not target.botMayEdit():
        # Attempting to save the page when botMayEdit() is False will throw an OtherPageSaveError
        DYKUpdateBotUtils.log('Couldn\'t edit ' + target.title() + ' due to {{bots}} or {{nobots}}')
        return

    while True:
        try:
            if target.text != '':
                target.text += '\n\n'
            target.text += str_message
            self._edit(target, str_edit_summary)
        except pywikibot.exceptions.EditConflictError:
            DYKUpdateBotUtils.log('Edit conflicted on ' + target.title() + ' will retry after a short nap')
            pywikibot.sleep(10)  # sleep for 10 seconds
            # start over from a fresh copy of the page
            target = pywikibot.Page(pywikibot.Site(), target.title())
        else:
            return
def _is_on(self) -> bool:
    """Return True if UpdateBotSwitch.txt (next to this file) says "on".

    The comparison ignores case and surrounding whitespace. Any other
    content is logged and reported as False.
    """
    path_switch = pathlib.Path(__file__).parent / 'UpdateBotSwitch.txt'
    with open(str(path_switch), 'r', encoding='utf-8') as file_switch:
        switch_text = file_switch.read()
    if switch_text.strip().lower() == 'on':
        return True
    DYKUpdateBotUtils.log('Text file switch is not "on", exiting...')
    return False
def _edit(self, page_to_edit, str_edit_summary) -> None:
    """Save page_to_edit with the given edit summary (as a non-minor edit).

    A page that doesn't exist and is salted is skipped, and so is a page
    whose save raises LockedPageError; both cases are only logged.
    """
    DYKUpdateBotUtils.log('Editing ' + page_to_edit.title())

    is_salted = (not page_to_edit.exists()) and DYKUpdateBotUtils.check_if_salted(page_to_edit)
    if is_salted:
        DYKUpdateBotUtils.log('Special case: ' + page_to_edit.title() + ' is salted, skipping...')
        return

    try:
        page_to_edit.save(str_edit_summary, minor=False)
        # For a dry run where the bot outputs to local files, comment out the above line and uncomment the lines below
        # DYKUpdateBotUtils.log('Edit summary: ' + str_edit_summary)
        # filename = ''.join(character for character in page_to_edit.title() if character not in '\/:*?<>|"') + '.txt'
        # with open(str(pathlib.Path(__file__).parent / 'TestResources' / filename), 'w', encoding='utf-8') as file_write:
        #     file_write.write(page_to_edit.text)
    except pywikibot.exceptions.LockedPageError:  # I'm not sure it's possible to hit this with an adminbot...
        DYKUpdateBotUtils.log('Special case: ' + page_to_edit.title() + ' is protected, skipping...')
def _log_error(self, rgstr_errors, str_error) -> None:
    """Add str_error to the rgstr_errors list and write it to the log."""
    rgstr_errors.append(str_error)
    log_line = 'Error: ' + str_error
    DYKUpdateBotUtils.log(log_line)
def _log_warning(self, rgstr_warnings, str_warning) -> None:
    """Add str_warning to the rgstr_warnings list and write it to the log."""
    rgstr_warnings.append(str_warning)
    log_line = 'Warning: ' + str_warning
    DYKUpdateBotUtils.log(log_line)
# Set of methods broken out for easier unit testability
# Unless otherwise noted, these methods don't make network calls
# Do NOT edit the wiki from within these methods; otherwise the unit tests will edit the wiki!
class DYKUpdateBotUtils():
@staticmethod
def wikilink_to_queue(num_queue, capitalize) -> str:
    """Return a piped wikilink to the queue page numbered num_queue.

    The visible label is "Queue N" when capitalize is true, "queue N" otherwise.
    """
    first_letter = 'Q' if capitalize else 'q'
    return '[[{root}{num}|{letter}ueue {num}]]'.format(root=DYKUpdateBot.QUEUE_ROOT_LOC,
                                                       num=num_queue,
                                                       letter=first_letter)
# Returns a tuple:
#  * First value is True if dykbotdo was found, False if not
#  * Second value is the admin signature in dykbotdo, or None if not found
@staticmethod
def parse_dykbotdo(str_queue) -> (bool, str):
    wikicode = mwparserfromhell.parse(str_queue, skip_style_tags=True)
    for template in wikicode.filter_templates():
        if not template.name.matches('DYKbotdo'):
            continue
        # only the first DYKbotdo template is looked at
        if template.has(1):
            return True, str(template.get(1))
        return True, None
    return False, None
# Returns:
# * Hooks if <!--Hooks--> and <!--HooksEnd--> tags are in order
# * None if not
@staticmethod
def extract_hooks(str_queue_or_tdyk) -> str:
idx_hooks_tag = str_queue_or_tdyk.find('<!--Hooks-->')
idx_hooksend_tag = str_queue_or_tdyk.find('<!--HooksEnd-->', max(idx_hooks_tag, 0))
if min(idx_hooks_tag, idx_hooksend_tag) == -1:
return None
return str_queue_or_tdyk[idx_hooks_tag + 12:idx_hooksend_tag].strip()
# Returns:
#  * pywikibot.FilePage of the file in the DYK set if detected
#  * None if not (including when the template has no usable image parameter)
@staticmethod
def find_file(str_hooks) -> pywikibot.FilePage:
    templates_in_hooks = mwparserfromhell.parse(str_hooks, skip_style_tags=True).filter_templates()
    for template in templates_in_hooks:
        if not template.name.matches('Main page image/DYK'):
            continue
        # template.get() raises ValueError for a parameter that isn't there,
        # so check first and treat a missing parameter as "no file".
        if not template.has('image'):
            continue
        # Note it's fine whether the parameter is File:XYZ.jpg, Image:XYZ.jpg, or XYZ.jpg
        # all three formats will create the same FilePage object returning File:XYZ.jpg from title()
        str_file = str(template.get('image').value)
        if '{{!}}' in str_file:
            DYKUpdateBotUtils.log('Special case: Stripping everything after pipe from filename "{0}"'.format(str_file))
            str_file = str_file[:str_file.find('{{!}}')]
        if not str_file.strip():
            # a blank image parameter can't name a file
            continue
        return pywikibot.FilePage(pywikibot.Site(), str_file)
    return None
# This method makes network calls to the Wikipedia API (read-only)
# Returns:
# * None if protection looks good
# * A string describing the issue if not
# Cases to validate if changing this function (leverage the unit tests!):
# * File that doesn't exist
# * File:Nlksjdkfjskdljflkdsjfame.jpg
# * Fully not-protected file
# * en:File:Emmelie de Forest Hunter & Prey.png and commons:File:Novo Selo TE 01.JPG
# * Fully not-protected file on Commons with an enwiki description page
# * en:File:MET Breuer (48377070386).jpg
# * Semi-protected file
# * en:File:Amy Barlow.jpg and commons:File:Flag of Palestine.svg
# * Fully protected file indefinitely protected
# * en:File:George Floyd neck knelt on by police officer.png and commons:File:Name.jpg
# * Fully protected file via cascading protection
# * en:File:WPVG icon 2016.svg and commons:File:Wikitech-2020-logo.svg
# * Fully protected file with protection expiring before set leaves the Main Page
# * Use the API to find examples:
# * https://commons.wikimedia.org/w/api.php?action=query&list=allpages&apnamespace=6&apprtype=edit&apprexpiry=definite&apprlevel=sysop&aplimit=500
# * Fully protected file with protection expiring after set leaves the Main Page
# * see URL above
@staticmethod
def check_if_protected(filepage, time_set_leaving) -> str:
    """Check that the file is fully protected while its set is on the Main Page.

    Args:
        filepage: file page to check; the copy in the image repository is
            inspected instead when the file is shared.
        time_set_leaving: time the set leaves the Main Page.

    Returns:
        None if the protection looks good, otherwise a string describing the issue.
    """
    file_link = filepage.title(as_link=True, textlink=True)
    filepage_commons = pywikibot.FilePage(pywikibot.Site().image_repository(), filepage.title())
    if not filepage.exists() and not filepage_commons.exists():
        return file_link + ' does not exist'

    on_commons = filepage.file_is_shared()
    page_checked = filepage_commons if on_commons else filepage

    edit_protections = page_checked.protection().get('edit')
    if edit_protections is None:
        if not on_commons:  # on enwiki
            return file_link + ' is not protected'
        return file_link + ' is not protected; either 1) Upload the file to en.wiki ([[Wikipedia:Did you know/Admin instructions#If KrinkleBot is down|see instructions]]), or 2) protect the file at Commons'

    level, expiry = edit_protections[0], edit_protections[1]
    if level != 'sysop':
        return file_link + ' is not fully protected'
    if expiry == 'infinity':
        return None

    time_prot_end = pywikibot.Timestamp.fromISOformat(expiry).replace(tzinfo=timezone.utc)
    if time_prot_end < time_set_leaving:
        return 'The protection for ' + file_link + ' will expire before or while it\'s on the Main Page'
    return None  # protection expires after set leaves the Main Page
@staticmethod
def calculate_drift_core(time_update, timedelta_between_updates, minutes_max_advance, minutes_max_delay) -> int:
seconds_per_day = 60 * 60 * 24
seconds_least_difference_from_0000 = 60 * 60 * 24
set_seconds_differences = set()
time_iter = time_update
while True:
current_difference_from_0000 = int(time_iter.timestamp()) % seconds_per_day
if current_difference_from_0000 > (seconds_per_day / 2):
current_difference_from_0000 = -(seconds_per_day - current_difference_from_0000)
if abs(seconds_least_difference_from_0000) > abs(current_difference_from_0000):
seconds_least_difference_from_0000 = current_difference_from_0000
if seconds_least_difference_from_0000 == 0:
break
if (current_difference_from_0000 in set_seconds_differences) or (len(set_seconds_differences) >= 24):
break
set_seconds_differences.add(current_difference_from_0000)
time_iter = time_iter + timedelta_between_updates
if seconds_least_difference_from_0000 > 0:
return -min(minutes_max_advance, seconds_least_difference_from_0000 // 60)
elif seconds_least_difference_from_0000 < 0:
return min(minutes_max_delay, -seconds_least_difference_from_0000 // 60)
else:
return 0
# This method makes network calls to the Wikipedia API (read-only)
@staticmethod
def check_if_salted(page) -> bool:
create_protections = page.protection().get('create')
return create_protections and (create_protections[0] == 'sysop')
@staticmethod
def archive(str_archive, time_update, hooks_outgoing) -> str:
    """Return the archive text with the outgoing set inserted under its date.

    A section heading for the date of time_update is created on the line
    after the <!--BOTPOINTER--> marker if the archive doesn't have one yet.
    The set, headed by its time, goes directly below that heading.
    """
    heading_day = '==={d.day} {d:%B} {d.year}==='.format(d=time_update)
    heading_set = '*\'\'\'\'\'{d:%H}:{d:%M}, {d.day} {d:%B} {d.year} (UTC)\'\'\'\'\''.format(d=time_update)

    idx_section = str_archive.find(heading_day)  # check if there is a section heading already for today
    if idx_section == -1:  # if there isn't, create a new section heading
        idx_pointer = str_archive.find('<!--BOTPOINTER-->')
        idx_section = str_archive.find('\n', idx_pointer) + 1
        str_archive = DYKUpdateBotUtils._insert_str(str_archive, idx_section, heading_day + '\n')

    # the set goes on the line right after the section heading
    idx_after_heading = str_archive.find('\n', idx_section) + 1
    return DYKUpdateBotUtils._insert_str(str_archive, idx_after_heading, heading_set + '\n' + hooks_outgoing + '\n\n')
@staticmethod
def parse_credits(str_queue) -> []:
    """Build a list of DYKCredit objects from the DYKmake / DYKnom templates.

    Templates lacking the first or second positional parameter are skipped,
    as are those still holding placeholder values (article "Example" or
    empty; user empty, "Editor" or "Nominator"). Link brackets are removed
    from the article name.
    """
    placeholder_articles = ('Example', '')
    placeholder_users = ('', 'Editor', 'Nominator')

    rgcredits = []
    for template in mwparserfromhell.parse(str_queue, skip_style_tags=True).filter_templates():
        is_dykmake = template.name.matches('DYKmake')
        if not (is_dykmake or template.name.matches('DYKnom')):
            continue
        if not (template.has(1) and template.has(2)):
            continue

        credit = DYKCredit()
        credit.str_article = html.unescape(str(template.get(1).value))
        credit.str_user = html.unescape(str(template.get(2).value))
        credit.is_dykmake = is_dykmake
        if template.has('subpage'):
            subpage = html.unescape(str(template.get('subpage').value))
            if subpage != '':
                credit.str_nompage = 'Template:Did you know nominations/' + subpage

        # sanitize
        if credit.str_article in placeholder_articles or credit.str_user in placeholder_users:
            continue
        credit.str_article = credit.str_article.replace('[[', '').replace(']]', '')
        rgcredits.append(credit)
    return rgcredits
# This method makes network calls to the Wikipedia API (read-only)
# As "output", sets str_article on valid credits & deletes credits for nonexistent articles
@staticmethod
def validate_credits_articles(rgcredits, fn_log_warning) -> None:
    """Normalize the article of every credit, in place.

    For each credit: expand templates in the article name, follow a
    redirect, and store the normalized title. Credits whose article doesn't
    exist are removed from rgcredits and reported through fn_log_warning.
    """
    processed = {}  # original article name -> credit already validated
    # walk backwards so deleting an entry doesn't shift the ones still to visit
    for idx in range(len(rgcredits) - 1, -1, -1):
        credit = rgcredits[idx]
        title_orig = credit.str_article
        if title_orig in processed:
            credit.str_article = processed[title_orig].str_article
            continue

        title = title_orig
        if '}}' in title:
            title = pywikibot.Site().expand_text(text=title)
            DYKUpdateBotUtils.log('Special case: Credit article title contains template "{0}"->"{1}"'.format(title_orig, title))

        page = pywikibot.Page(pywikibot.Site(), title)
        if page.isRedirectPage():
            page = page.getRedirectTarget()
        if not page.exists():
            fn_log_warning('Article [[{0}]] does not exist'.format(title_orig))
            del rgcredits[idx]
            continue

        credit.str_article = page.title()
        processed[title_orig] = credit
# This method makes network calls to the Wikipedia API (read-only)
# As "output", sets str_user_talk on valid credits
@staticmethod
def validate_credits_users(rgcredits, fn_log_warning) -> None:
    """Resolve the user talk page for every credit, in place.

    For each credit: expand templates in the username, check that the user
    is valid (following a rename log entry if there is one), follow a
    redirect on the user talk page, and store the talk page title. Invalid
    usernames are reported through fn_log_warning.
    """
    def looks_valid(candidate):
        # registered account, or an anonymous user that has edited
        return candidate.isRegistered() or (candidate.isAnonymous() and candidate.last_edit)

    processed = {}  # original username -> credit already validated
    for credit in rgcredits:
        name_orig = credit.str_user
        if name_orig in processed:
            credit.str_user_talk = processed[name_orig].str_user_talk
            continue

        name = name_orig
        if '}}' in name:
            name = pywikibot.Site().expand_text(text=name)
            DYKUpdateBotUtils.log('Special case: Credit username contains template "{0}"->"{1}"'.format(name_orig, name))

        user = pywikibot.User(pywikibot.Site(), name)
        is_valid_user = looks_valid(user)
        if not is_valid_user:
            # was the user recently renamed?
            # example API call: https://en.wikipedia.org/w/api.php?action=query&list=logevents&letype=renameuser&letitle=User:Carrot%20official&lelimit=1
            for entry in pywikibot.Site().logevents('renameuser', page=user.title(), total=1):
                if entry['params']['olduser'] == user.username:
                    user = pywikibot.User(pywikibot.Site(), entry['params']['newuser'])
                    DYKUpdateBotUtils.log('Special case: User listed in credit was renamed "{0}"->"{1}"'.format(name_orig, user.username))
                    is_valid_user = looks_valid(user)

        if not is_valid_user:
            fn_log_warning('The username \'{0}\' is invalid'.format(name_orig))
        else:
            page_talk = user.getUserTalkPage()
            if page_talk.isRedirectPage():
                DYKUpdateBotUtils.log('Special case: User talk is a redirect "{0}"'.format(page_talk.title()))
                page_talk = page_talk.getRedirectTarget()
            if page_talk.isTalkPage():
                # no funny business - the redirect above shouldn't make the bot, eg, tag the Main Page with a DYK credit
                credit.str_user_talk = page_talk.title()
        processed[name_orig] = credit
# This method makes network calls to the Wikipedia API (read-only) if:
#  * There's a template within the hooks
#  * There's no string match between the article listed in the credit and the hooks - redirect search
# As "output", sets str_hook and (if first hook) str_file on credits
@staticmethod
def populate_hooks_and_file(rgcredits, str_hooks, str_file) -> None:
    """Match every credit to its hook line and give the file to the first hook.

    Args:
        rgcredits: credits to fill in; str_hook / str_file are set in place.
        str_hooks: wikitext of the hooks section.
        str_file: file name of the set's image, or a falsy value if none.
    """
    # remove stuff at the top that isn't hooks (eg image)
    if str_file and (str_file in str_hooks):
        str_hooks = str_hooks[str_hooks.find('\n', str_hooks.find(str_file)):].strip()
    idx_newline = str_hooks.rfind('\n', 0, str_hooks.find('...'))
    if idx_newline != -1:
        str_hooks = str_hooks[idx_newline:].strip()

    # expand templates
    str_hooks_normalized = str_hooks
    if '}}' in str_hooks_normalized:
        str_hooks_normalized = pywikibot.Site().expand_text(text=str_hooks_normalized)
    # unescape HTML and replace non-breaking spaces with normal spaces
    # (written as an explicit escape: a literal no-break space can't be told
    # apart from a normal space, which would turn this into a no-op)
    str_hooks_normalized = html.unescape(str_hooks_normalized).replace('\xa0', ' ')

    # NOTE(review): the two lists are indexed in parallel below, which assumes
    # template expansion doesn't change the number of lines - confirm.
    rghooks_orig = str_hooks.split('\n')
    rghooks_normalized = str_hooks_normalized.lower().split('\n')

    # remove any lines without '...' and trim any leading characters, like *
    for idx_hook in reversed(range(len(rghooks_orig))):
        str_hook = rghooks_orig[idx_hook]
        idx_ellipses = str_hook.find('...')
        if idx_ellipses == -1:
            del rghooks_orig[idx_hook]
            del rghooks_normalized[idx_hook]
        else:
            rghooks_orig[idx_hook] = str_hook[idx_ellipses:]

    # search for the hook for each article
    dict_processed = {}
    for credit in rgcredits:
        if credit.str_article in dict_processed:
            credit.str_hook = dict_processed[credit.str_article].str_hook
            credit.str_file = dict_processed[credit.str_article].str_file
            continue
        idx_found_hook = DYKUpdateBotUtils._find_hook(credit.str_article, rghooks_normalized)
        if idx_found_hook == -1:  # maybe the hook links to a page that redirects to str_article?
            page_article = pywikibot.Page(pywikibot.Site(), credit.str_article)
            for page_redirect in page_article.getReferences(filter_redirects=True, namespaces=pywikibot.site.Namespace.MAIN):
                idx_found_hook = DYKUpdateBotUtils._find_hook(page_redirect.title(), rghooks_normalized)
                if idx_found_hook != -1:
                    DYKUpdateBotUtils.log('Special case: Hook matches redirect to article "{0}"'.format(credit.str_article))
                    break  # got a hit! no need to keep iterating through redirects
        if idx_found_hook >= 0:
            credit.str_hook = rghooks_orig[idx_found_hook]
            if idx_found_hook == 0:
                # the image belongs to the first hook of the set
                credit.str_file = str_file
        dict_processed[credit.str_article] = credit
@staticmethod
def _find_hook(str_article, rghooks_normalized) -> int:
str_article_lower = str_article.lower()
for idx_hook, str_hook_normalized in enumerate(rghooks_normalized):
if str_article_lower in str_hook_normalized:
return idx_hook
return -1
@staticmethod
def tag_article_history(str_talk, credit, time_update) -> (str, str):
    """Add the DYK parameters to the talk page's article-history template.

    Returns:
        Tuple (talk page text, edit summary). If the page has no
        article-history template, the text is returned unchanged and the
        edit summary is None.
    """
    names_article_history = ('Article history', 'Articlehistory',
                             'Article History', 'ArticleHistory',
                             'Article milestones', 'Articlemilestones')
    templates_on_talk = mwparserfromhell.parse(str_talk, skip_style_tags=True).filter_templates()
    template_ah = next((template for template in templates_on_talk
                        if any(template.name.matches(name) for name in names_article_history)),
                       None)
    if not template_ah:
        return str_talk, None

    str_edit_summary = ('Article appeared on [[WP:Did you know|DYK]] on {d.day} {d:%B} {d.year}'
                        ', adding to {{{{[[Template:Article history|Article history]]}}}}'.format(d=time_update))
    wikitext_before = str(template_ah)

    # According to documentation at Template:Article_history, DYK params go between |currentstatus and |topic
    param_topic = None
    if template_ah.has('topic'):
        param_topic = template_ah.get('topic')
    template_ah.add('dykdate', '{d.day} {d:%B} {d.year}'.format(d=time_update), before=param_topic)
    if credit.str_hook:
        template_ah.add('dykentry', credit.str_hook, before=param_topic)
    if credit.str_nompage:
        template_ah.add('dyknom', credit.str_nompage, before=param_topic)

    return str_talk.replace(wikitext_before, str(template_ah)), str_edit_summary
# Returns a tuple:
# * First value is the dyktalk tag
# * Second value is the edit summary
@staticmethod
def build_dyktalk_tag(credit, time_update) -> (str, str):
str_tag = '\n{{{{DYK talk|{d.day} {d:%B}|{d.year}{str_image_param}{str_hook_param}{str_nompage_param}}}}}'.format(
d=time_update,
str_image_param=('|image=' + credit.str_file) if credit.str_file else '',
str_hook_param=('|entry=' + credit.str_hook) if credit.str_hook else '',
str_nompage_param=('|nompage=' + credit.str_nompage) if credit.str_nompage else '')
str_edit_summary = ('Article appeared on [[WP:Did you know|DYK]] on {d.day} {d:%B} {d.year}'
', adding {{{{[[Template:DYK talk|DYK talk]]}}}}'.format(d=time_update))
return str_tag, str_edit_summary
@staticmethod
def add_template_to_talk(str_talk, str_tag) -> str:
    """Insert ``str_tag`` after the last header template of a talk page.

    The header is everything before the first ``==`` in ``str_talk`` (the whole
    text when there is none). The insertion point within it is chosen by
    ``_last_template_index``.

    Returns:
        The talk page text with the tag inserted, stripped of leading and
        trailing whitespace.
    """
    # partition() yields the full text as the head when '==' is absent
    str_header, _, _ = str_talk.partition('==')
    idx_insert = DYKUpdateBotUtils._last_template_index(str_header)

    # Keep the tag on its own line unless a newline (or end of text) already follows
    followed_by_text = idx_insert < len(str_talk) and str_talk[idx_insert] != '\n'
    if followed_by_text:
        str_tag += '\n'

    str_tagged = DYKUpdateBotUtils._insert_str(str_talk, idx_insert, str_tag)
    return str_tagged.strip()
@staticmethod
def _last_template_index(str_header) -> int:
# To a human reader, GA / DYK etc discussions aren't templates, they're part of the content
# so detect and remove them from what we consider the header
# GA discussion transclusion example from Special:Diff/1022091498: {{Talk:Harry J. Capehart/GA1}}
# DYK discussion transclusion example from Special:Diff/873606519: {{Did you know nominations/Bishop John Carroll (statue)}}
# DYK discussion transclusion example from Special:Diff/1022869159: {{Template:Did you know nominations/Sacred Heart Catholic Church (Mathura)}}
# And some talk page templates show up as small by default, and should be below full-size tags
# {{Translated page}} example from Special:Diff/1029600040: {{Translated page|es|Auditoría Superior de la Federación||version=133396209}}
# {{archives}} example from Special:Diff/1025854855: {{archives}}
# {{User:ClueBot III/ArchiveThis}} example from Special:Diff/1026915635: {{User:ClueBot III/ArchiveThis|archiveprefix=Talk:Santa Cruz Operation/Archives/|format=Y|age=26297|index=yes|archivebox=yes|box-advert=yes}}
match = search('\{\{\s*([Tt]alk:|([Tt]emplate:\s*)?[Dd]id you know nominations/|[Tt]ranslated|[Uu]ser:ClueBot III/ArchiveThis|[Aa]rchive)', str_header)
if match:
str_header = str_header[:match.start()]
idx_last_template = str_header.rfind('}}')
if idx_last_template == -1:
idx_last_template = 0
else:
idx_last_template += 2
return idx_last_template
# Returns username if one was found, None if not
@staticmethod
def find_user_link(str_dykbotdo_signature) -> str:
    """Extract a username from the first user or user-talk link in a signature.

    Each wikilink in the signature is checked for ``User:`` or ``User talk:`` in
    its title. For the first link that has one, the text after the namespace
    colon is taken and cut at the first ``#`` or ``/``, so section anchors and
    subpages are not treated as part of the username.

    Args:
        str_dykbotdo_signature: Wikitext of the signature to inspect.

    Returns:
        The username, or ``None`` when no link points to a user or user talk page.
    """
    links_in_sig = mwparserfromhell.parse(str_dykbotdo_signature, skip_style_tags=True).filter_wikilinks()
    for link in links_in_sig:
        str_title = str(link.title)
        idx_user_or_usertalk = max(str_title.find('User:'), str_title.find('User talk:'))
        if idx_user_or_usertalk == -1:
            continue
        str_user = str_title[str_title.find(':', idx_user_or_usertalk) + 1:]
        # Cut at the EARLIEST separator present; taking max() of the two find() results
        # would keep '/subpage' or '#anchor' in the name whenever both characters occur
        separator_positions = [idx for idx in (str_user.find('#'), str_user.find('/')) if idx != -1]
        if separator_positions:
            str_user = str_user[:min(separator_positions)]
        return str_user
    return None
# Returns a tuple:
# * First value is the message on the talk page (section + credit + signature)
# * Second value is the edit summary
@staticmethod
def build_user_talk_credit(credit, str_dykbotdo_signature, str_promoting_admin) -> (str, str):
    """Build the user-talk credit message and its edit summary.

    Args:
        credit: Object providing ``str_article``, ``is_dykmake``, ``str_hook`` and
            ``str_nompage``. ``is_dykmake`` picks between the DYKmake and DYKnom
            credit templates; falsy hook / nompage values are omitted.
        str_dykbotdo_signature: Signature placed after the template, followed by
            ``~~~~~``; when falsy, ``~~~~`` is used on its own.
        str_promoting_admin: Username appended to the edit summary as
            "on behalf of"; skipped when falsy.

    Returns:
        A ``(str_message, str_edit_summary)`` tuple.
    """
    str_template = 'DYKnom/DYKnomcredit'
    if credit.is_dykmake:
        str_template = 'DYKmake/DYKmakecredit'

    str_hook_param = ('|hook=' + credit.str_hook) if credit.str_hook else ''
    str_nompage_param = ('|nompage=' + credit.str_nompage) if credit.str_nompage else ''

    if str_dykbotdo_signature:
        str_sig = str_dykbotdo_signature + ' ~~~~~'
    else:
        str_sig = '~~~~'

    str_heading = '==DYK for {0}=='.format(credit.str_article)
    str_body = '{{{{subst:Template:{0} |article={1} {2} {3} |optional= }}}} {4}'.format(
        str_template, credit.str_article, str_hook_param, str_nompage_param, str_sig)
    str_message = str_heading + '\n' + str_body

    str_edit_summary = 'Giving DYK credit for [[{0}]]'.format(credit.str_article)
    if str_promoting_admin:
        str_edit_summary += ' on behalf of [[User:{0}|{0}]]'.format(str_promoting_admin)
    return str_message, str_edit_summary
@staticmethod
def _insert_str(str_target, idx, str_insert) -> str:
return str_target[:idx] + str_insert + str_target[idx:]
@staticmethod
def log(str_to_log) -> None:
    """Print ``str_to_log`` on its own line to standard output, flushing right away."""
    print(str_to_log, end='\n', flush=True)
class ValidationResults:
    """Mutable bag of state shared between validation and the update itself.

    Every field starts empty; the bot fills them in as it goes.
    """

    def __init__(self) -> None:
        # Human-readable problems collected along the way, each instance gets its own lists
        self.rgstr_errors, self.rgstr_warnings = [], []
        # Page handles, populated elsewhere
        self.page_TDYK = self.page_queue = None
        # Queue number; 0 until one has been chosen
        self.num_queue = 0
        # Incoming file / hooks and outgoing hooks, populated elsewhere
        self.file_incoming = self.hooks_incoming = self.hooks_outgoing = None
        # Signature text, populated elsewhere
        self.str_dykbotdo_signature = None
        # Interval between updates, assigned by the run loop just before updating
        self.timedelta_between_updates = None
class DYKCredit:
    """One DYK credit: an article plus who is credited for it and the supporting details."""

    def __init__(self) -> None:
        # Article, user, user talk and nomination page; all unset until populated elsewhere
        self.str_article = self.str_user = self.str_user_talk = self.str_nompage = None
        # True selects the DYKmake credit template, False the DYKnom one
        self.is_dykmake = True
        # Hook text and file name; optional, unset by default
        self.str_hook = self.str_file = None

    def __str__(self):
        """Return a one-line debug summary (the user talk value is not included)."""
        labelled_values = (
            ('article', self.str_article),
            ('user', self.str_user),
            ('nompage', self.str_nompage),
            ('is_dykmake', self.is_dykmake),
            ('hook', self.str_hook),
            ('file', self.str_file),
        )
        return 'DYKCredit! ' + ', '.join('{0}:{1}'.format(label, value) for label, value in labelled_values)
def main() -> None:
    """Create a DYKUpdateBot and hand control to its run loop."""
    DYKUpdateBot().run()


# Start the bot only when this file is executed as a script
if __name__ == '__main__':
    main()