commit 1f59cfecd34d2cb8f156a1b9112ace0b40a59173
Author: James Jennett-Wheeler
Date:   Mon Jun 23 11:06:51 2025 +0100

    Initial version

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ad62966
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/driver
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/application.py b/application.py
new file mode 100644
index 0000000..460dae8
--- /dev/null
+++ b/application.py
@@ -0,0 +1,157 @@
+from datetime import datetime
+from sqlite3 import Cursor
+import re
+from typing import List
+
+from prettytable import PrettyTable
+
+from selenium.webdriver.chrome.webdriver import WebDriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.wait import WebDriverWait as Wait
+
+timeout = 5
+base_url = "https://app.bathnes.gov.uk/webforms/planning"
+current_date = datetime.today().strftime('%Y-%m-%d')
+
+# Opening tag of a bold label, any closing tag, and any other remaining tag.
+HEADER_START = re.compile(r'<[^>]+font-weight-bold[^>]+>')
+CLOSE_TAGS = re.compile(r'</[^>]+>')
+REMAINING_TAGS = re.compile(r'<[^>]+>')
+
+
+class Application:
+    @staticmethod
+    def CreateTableIfNotExists(cursor: Cursor, reset = False):
+        if reset:
+            cursor.execute("DROP TABLE IF EXISTS applications;")
+
+        create_table = """ CREATE TABLE IF NOT EXISTS applications (
+                               id INTEGER PRIMARY KEY AUTOINCREMENT,
+                               reference TEXT NOT NULL,
+                               dateScraped TEXT NOT NULL,
+                               dateDecided TEXT,
+                               caseOfficer TEXT,
+                               description TEXT,
+                               decision TEXT,
+                               num_documents INTEGER
+                           ); """
+        cursor.execute(create_table)
+
+    @staticmethod
+    def PrintTable(applications: List):
+        table = PrettyTable(['Ref', 'Scrape Date', 'Decision Date', 'Decision', 'Case Officer', 'Docs', 'Description'])
+        for application in applications:
+            if isinstance(application, Application):
+                table.add_row([application.reference, application.dateScraped, application.dateDecided,
+                               application.decision, application.caseOfficer, application.num_documents,
+                               application.description])
+        table.align = "l"
+        print(table)
+
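+    # Creating an Application loads any previously scraped values for this
+    # reference, inserting a placeholder row first if the reference is new, so
+    # scrape_portal() can skip applications whose details are already stored.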
+    def __init__(self, cursor: Cursor, reference: str):
+        self.cursor = cursor
+        self.reference = reference
+        self.url = f"{base_url}/details.html?refval={self.reference.replace('/', '%2F')}"
+        self.raw_data_map = {}
+        self.new_documents_found = False
+
+        cursor.execute("SELECT dateScraped, dateDecided, caseOfficer, description, decision, num_documents FROM applications WHERE reference = ?", (self.reference,))
+        result = cursor.fetchall()
+
+        if len(result) == 0:
+            insert_application = "INSERT INTO applications (reference, dateScraped) VALUES(?,?)"
+            cursor.execute(insert_application, (self.reference, current_date))
+            self.dateScraped = current_date
+            self.dateDecided = ""
+            self.caseOfficer = ""
+            self.description = ""
+            self.decision = ""
+            self.num_documents = 0
+        else:
+            self.dateScraped = result[0][0]
+            self.dateDecided = result[0][1] or ""
+            self.caseOfficer = result[0][2] or ""
+            self.description = result[0][3] or ""
+            self.decision = result[0][4] or ""
+            self.num_documents = result[0][5] or 0
+
+    def scrape_portal(self, browser: WebDriver, force: bool = False, count_documents: bool = False):
+        if not force and self.caseOfficer:
+            print(f"Already parsed {self.reference}")
+            return
+
+        print(f"Parsing {self.reference}")
+
+        browser.get(self.url)
+
+        details = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "details")))
+        self.__html_to_map(details.get_attribute('innerHTML'))
+
+        # The importantDates panel stays hidden until its tab is opened, but its
+        # innerHTML can still be read once the element is present.
+        important_dates = Wait(browser, timeout=timeout).until(EC.invisibility_of_element_located((By.ID, "importantDates")))
+        self.__html_to_map(important_dates.get_attribute('innerHTML'))
+
+        self.__parse_raw_data()
+
+        update_sql = "UPDATE applications SET dateDecided = ?, caseOfficer = ?, description = ?, decision = ? WHERE reference = ?"
+        self.cursor.execute(update_sql, (self.dateDecided, self.caseOfficer, self.description, self.decision, self.reference))
+
+        if count_documents:
+            self.__count_documents(browser)
+
+    def __html_to_map(self, html: str):
+        # Strip the opening tag of each bold label, drop closing tags, and reduce
+        # any other markup to a tab, then treat each non-empty line as a
+        # "Key\tValue" pair.
+        details = HEADER_START.sub('', html)
+        details = CLOSE_TAGS.sub('', details)
+        details = REMAINING_TAGS.sub('\t', details)
+
+        for detail_raw in details.split('\n'):
+            detail = detail_raw.strip()
+            if detail:
+                k_v = detail.split('\t', 1)
+
+                if len(k_v) == 2:
+                    self.raw_data_map[k_v[0]] = k_v[1]
+                else:
+                    print(f"Error parsing: {detail}")
+
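+    # Keys in raw_data_map are the portal's label texts ("Proposal", "Case
+    # Officer Name", ...); the decision fields are absent while an application
+    # is pending, and the decision date is converted from dd/mm/yyyy to ISO.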
+    def __parse_raw_data(self):
+        self.description = self.raw_data_map["Proposal"].replace('\n', '<br>')
+        self.caseOfficer = self.raw_data_map["Case Officer Name"]
+
+        try:
+            self.decision = self.raw_data_map["Decision"]
+        except KeyError:
+            self.decision = None
+
+        try:
+            decision_date = self.raw_data_map["Decision Made"].split('/')
+            self.dateDecided = f"{decision_date[2]}-{decision_date[1]}-{decision_date[0]}"
+        except KeyError:
+            self.dateDecided = None
+
+    def __count_documents(self, browser: WebDriver):
+        documents_button = Wait(browser, timeout=timeout).until(EC.element_to_be_clickable((By.ID, "tab_documents_Section")))
+        documents_button.click()
+
+        documents_frame = Wait(browser, timeout=20).until(EC.visibility_of_element_located((By.ID, "iframe")))
+        browser.switch_to.frame(documents_frame)
+
+        # Wait until the counter no longer reads "No documents found", then take
+        # the total from the text ending in "of <N> documents".
+        Wait(browser, timeout=60).until(EC.none_of(EC.text_to_be_present_in_element((By.ID, "documents_info"), "No documents found")))
+        new_num_documents = int(browser.find_element(by=By.ID, value="documents_info").text.split(" of ")[1].replace(" documents", ""))
+
+        browser.switch_to.default_content()
+
+        if new_num_documents > self.num_documents:
+            self.num_documents = new_num_documents
+            self.new_documents_found = True
+
+            update_sql = "UPDATE applications SET num_documents = ? WHERE reference = ?"
+            self.cursor.execute(update_sql, (self.num_documents, self.reference))
+
+    def __str__(self):
+        return (f'Application: {self.reference}\n'
+                f'Date Decided: {self.dateDecided}\n'
+                f'Case Officer: {self.caseOfficer}\n'
+                f'Description: {self.description}\n'
+                f'Decision: {self.decision}'
+                + (f'\nDocument Count: {self.num_documents}' if self.num_documents > 0 else ""))
diff --git a/database.db b/database.db
new file mode 100644
index 0000000..063bde3
Binary files /dev/null and b/database.db differ
diff --git a/monitor-planning.py b/monitor-planning.py
new file mode 100644
index 0000000..4ef7bda
--- /dev/null
+++ b/monitor-planning.py
@@ -0,0 +1,122 @@
+import os
+import sys
+import traceback
+from datetime import datetime, time
+
+import pause
+import requests
+
+from application import Application
+
+import sqlite3
+from selenium import webdriver
+
+from weeklyList import WeeklyList
+from workingHours import is_working_hours, next_working_hour, potential_midday_upload
+
+refresh_rate_minutes = 5
+search_past_week = 0
+search_num_weeks = 1
+reset_table = False
+
+web_opts = webdriver.ChromeOptions()
+web_opts.add_argument('--headless')
+
+def notify(title, message):
+    api_url = 'https://hass.jennett-wheeler.co.uk/api/webhook/-Qx6jHsGLHwbBlJpLek5Nj8qS'
+    requests.post(api_url, json={"title": title, "message": message})
+
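+# Scrape the council's weekly list of decided applications, notify when new
+# decisions appear, then backfill full details for any rows not yet scraped
+# (their caseOfficer column is still NULL).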
+def update_other_applications():
+    there_were_newly_decided_applications = False
+    with sqlite3.connect("database.db") as _conn:
+        _cursor = _conn.cursor()
+
+        with webdriver.Chrome(options=web_opts) as _browser:
+            print("Scrape Weekly List(s)")
+            weekly_list = WeeklyList(_cursor)
+
+            # Council only allow the latest 9 weeks
+            for search_week_idx in range(search_past_week, min(search_past_week + search_num_weeks, 9)):
+                weekly_list.scrape(_browser, search_week_idx)
+
+            there_were_newly_decided_applications = len(weekly_list.new_applications) > 0
+            print(" Number of new decided applications: " + str(len(weekly_list.new_applications)))
+            print(" Number of existing applications: " + str(len(weekly_list.existing_applications)))
+            print("")
+
+            if there_were_newly_decided_applications:
+                notify("New decisions found", f"Council has uploaded {len(weekly_list.new_applications)} new decisions")
+
+            _cursor.execute("SELECT reference FROM applications WHERE caseOfficer IS NULL")
+            newly_decided_applications = _cursor.fetchall()
+
+            if len(newly_decided_applications) > 0:
+                print(f"Scrape Newly Decided Applications: {len(newly_decided_applications)}")
+
+                for (application_ref, ) in newly_decided_applications:
+                    _app = Application(_cursor, application_ref)
+                    _app.scrape_portal(_browser)
+
+            print("")
+
+    return there_were_newly_decided_applications
+
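+# Main loop: poll the watched application every few minutes during working
+# hours and run the weekly-list check once after the potential midday upload;
+# outside working hours, keep checking the weekly list until new decisions
+# appear or 19:00 has passed, then sleep until the next working day.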
+if __name__ == '__main__':
+    try:
+        with sqlite3.connect("database.db") as connection:
+            cursor = connection.cursor()
+            Application.CreateTableIfNotExists(cursor, reset_table)
+
+        midday_checked = False
+        while True:
+            with sqlite3.connect("database.db") as connection:
+                application = Application(connection.cursor(), "25/00605/FUL")
+
+                with webdriver.Chrome(options=web_opts) as browser:
+                    application.scrape_portal(browser, force=True, count_documents=True)
+
+            if application.new_documents_found:
+                notify("New Documents Found", f"Application now has {application.num_documents} documents")
+            print("")
+
+            if is_working_hours():
+                if not midday_checked and potential_midday_upload():
+                    midday_checked = update_other_applications()
+                    if midday_checked:
+                        print(f"New decisions found at: {datetime.now().strftime('%H-%M-%S')}")
+
+                pause.minutes(refresh_rate_minutes)
+            else:
+                midday_checked = False  # re-arm the once-a-day midday check
+                if update_other_applications():
+                    print(f"New decisions found at: {datetime.now().strftime('%H-%M-%S')}")
+
+                    next_start = next_working_hour()
+                    print(f"Pausing until: {next_start}")
+                    pause.until(next_start)
+                else:
+                    if datetime.now().time() > time(19, 0, 0):
+                        next_start = next_working_hour()
+                        print(f"Pausing until: {next_start}")
+                        pause.until(next_start)
+                    else:
+                        pause.minutes(refresh_rate_minutes)
+
+    except KeyboardInterrupt:
+        print('Interrupted')
+        try:
+            sys.exit(130)
+        except SystemExit:
+            os._exit(130)
+
+    except Exception as e:
+        print(f'Error found: {repr(e)}')
+        print(traceback.format_exc())
+        notify("Error in planning monitor", repr(e))
+
+        try:
+            sys.exit(130)
+        except SystemExit:
+            os._exit(130)
\ No newline at end of file
diff --git a/scrape-my-application.py b/scrape-my-application.py
new file mode 100644
index 0000000..9a11c4b
--- /dev/null
+++ b/scrape-my-application.py
@@ -0,0 +1,52 @@
+import os
+import sys
+
+import pause
+import requests
+
+from application import Application
+
+import sqlite3
+from selenium import webdriver
+
+from workingHours import is_working_hours, next_working_hour
+
+refresh_rate_minutes = 5
+api_url = 'https://hass.jennett-wheeler.co.uk/api/webhook/-Qx6jHsGLHwbBlJpLek5Nj8qS'
+
+if __name__ == '__main__':
+    try:
+        with sqlite3.connect("database.db") as connection:
+            cursor = connection.cursor()
+
+            options = webdriver.ChromeOptions()
+            options.add_argument('--headless')
+
+            application = Application(cursor, "25/00605/FUL")
+            num_documents = 18
+
+            while True:
+                if is_working_hours():
+                    with webdriver.Chrome(options=options) as browser:
+                        application.scrape_portal(browser, force=True, count_documents=True)
+
+                    if num_documents < application.num_documents:
+                        num_new_documents = application.num_documents - num_documents
+                        num_documents = application.num_documents
+                        requests.post(api_url)
+                        print(f"New documents! {num_new_documents}")
+
+                    pause.minutes(refresh_rate_minutes)
+
+                else:
+                    next_start = next_working_hour()
+                    print(f"Pausing until: {next_start}")
+                    pause.until(next_start)
+
+    except KeyboardInterrupt:
+        print('Interrupted')
+        try:
+            sys.exit(130)
+        except SystemExit:
+            os._exit(130)
diff --git a/scrape-new-applications.py b/scrape-new-applications.py
new file mode 100644
index 0000000..2618e4b
--- /dev/null
+++ b/scrape-new-applications.py
@@ -0,0 +1,56 @@
+import os
+import sys
+
+from application import Application
+from weeklyList import WeeklyList
+
+import sqlite3
+from selenium import webdriver
+
+search_past_week = 0
+search_num_weeks = 1
+reset_table = False
+
+if __name__ == '__main__':
+    try:
+        with sqlite3.connect("database.db") as connection:
+            cursor = connection.cursor()
+            Application.CreateTableIfNotExists(cursor, reset_table)
+
+            options = webdriver.ChromeOptions()
+            options.add_argument('--headless')
+
+            with webdriver.Chrome(options=options) as browser:
+                print("Scrape Weekly List(s)")
+                weeklyList = WeeklyList(cursor)
+
+                # Council only allow the latest 9 weeks
+                for search_week_idx in range(search_past_week, min(search_past_week + search_num_weeks, 9)):
+                    weeklyList.scrape(browser, search_week_idx)
+
+                print("Number of new decided applications: " + str(len(weeklyList.new_applications)))
+                print("Number of existing applications: " + str(len(weeklyList.existing_applications)))
+                print("")
+
+                cursor.execute("SELECT reference FROM applications WHERE caseOfficer IS NULL")
+                newly_decided_applications = cursor.fetchall()
+
+                if len(newly_decided_applications) > 0:
+                    print(f"Scrape Newly Decided Applications: {len(newly_decided_applications)}")
+
+                    for (application_ref, ) in newly_decided_applications:
+                        application = Application(cursor, application_ref)
+                        application.scrape_portal(browser)
+
+                print("")
+
+        print("Done")
+
+    except KeyboardInterrupt:
+        print('Interrupted')
+        try:
+            sys.exit(130)
+        except SystemExit:
+            os._exit(130)
\ No newline at end of file
diff --git a/search-db.py b/search-db.py
new file mode 100644
index 0000000..25979a8
--- /dev/null
+++ b/search-db.py
@@ -0,0 +1,18 @@
+from application import Application
+
+import sqlite3
+
+with sqlite3.connect("database.db") as connection:
+    cursor = connection.cursor()
+    applications = []
+
+    print("This week's Application decisions:")
+    cursor.execute("SELECT reference FROM applications WHERE dateScraped >= '2025-06-23' ORDER BY dateDecided DESC")
+
+    # print("Chris' Applications:")
+    # cursor.execute("SELECT reference FROM applications WHERE caseOfficer = 'Christopher Masters' ORDER BY dateDecided DESC")
+
+    for (application_ref,) in cursor.fetchall():
+        applications.append(Application(cursor, application_ref))
+
+    Application.PrintTable(applications)
\ No newline at end of file
diff --git a/weeklyList.py b/weeklyList.py
new file mode 100644
index 0000000..b51a993
--- /dev/null
+++ b/weeklyList.py
@@ -0,0 +1,57 @@
+import time
+from sqlite3 import Cursor
+import re
+
+from selenium.webdriver.support.select import Select
+
+from application import Application
+
+from selenium.webdriver.chrome.webdriver import WebDriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.wait import WebDriverWait as Wait
+
+timeout = 5
+base_url = "https://app.bathnes.gov.uk/webforms/planning"
+TAG_RE = re.compile(r'<[^>]+>')
+
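+# One scrape of the portal's weekly-list search: select a week of decided
+# applications, then sort each result into new vs already-seen references.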
+class WeeklyList:
+    def __init__(self, cursor: Cursor):
+        Application.CreateTableIfNotExists(cursor)
+        self.cursor = cursor
+        self.new_applications = []
+        self.existing_applications = []
+
+    def scrape(self, browser: WebDriver, search_past_week = 0):
+        browser.refresh()
+        browser.get(f"{base_url}/search.html#weeklyList")
+
+        # Bring up list of decided applications
+        search_button = Wait(browser, timeout=timeout).until(EC.element_to_be_clickable((By.ID, "weeklySearchBtn")))
+        time.sleep(0.5)  # Give a little extra time
+
+        search_type = Select(browser.find_element(by=By.ID, value="weeklyListOption"))
+        search_type.select_by_value('decided')
+        search_week = Select(browser.find_element(by=By.ID, value="weeklyListBetween"))
+        search_week.select_by_index(search_past_week)
+
+        week_str = search_week.options[search_past_week].text.split(" to ")[0]
+        print(f"Week: {week_str}")
+
+        search_button.click()
+
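+        # Each result row's first cell holds the application reference on its
+        # first <br>-separated line, wrapped in markup that is stripped below.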
+        results = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "results-table")))
+
+        rows = results.find_elements(By.TAG_NAME, "tr")
+        for row in rows:
+            cols = row.find_elements(By.TAG_NAME, "td")
+            if not cols:
+                continue  # header rows only contain <th> cells
+            application_html = cols[0].get_attribute('innerHTML').replace('\n', '<br>')
+
+            application_ref_html = application_html.strip().split("<br>")[0].strip()
+            application_ref = TAG_RE.sub('', application_ref_html).replace("Application Reference: ", "")
+
+            application = Application(self.cursor, application_ref)
+            if application.caseOfficer:
+                self.existing_applications.append(application)
+            else:
+                self.new_applications.append(application)
diff --git a/workingHours.py b/workingHours.py
new file mode 100644
index 0000000..509e72c
--- /dev/null
+++ b/workingHours.py
@@ -0,0 +1,62 @@
+from datetime import time, datetime, timedelta
+
+def is_working_hours(date=None):
+    # Evaluate the default per call; datetime.now() in the signature would be
+    # frozen at import time for the lifetime of the long-running monitor.
+    if date is None:
+        date = datetime.now()
+
+    if date.weekday() >= 5:
+        return False
+
+    start = time(8, 0, 0)
+    end = time(19, 0, 0)
+
+    current_time = date.time()
+    return start <= current_time <= end
+
+def potential_midday_upload(date=None):
+    if date is None:
+        date = datetime.now()
+
+    if date.weekday() >= 5:
+        return False
+
+    midday_upload_time = time(14, 0, 0)
+
+    current_time = date.time()
+    return midday_upload_time <= current_time
+
+def next_working_hour(date=None):
+    if date is None:
+        date = datetime.now()
+
+    if is_working_hours(date):
+        return date
+
+    potential_start = date.replace(hour=8, minute=0, second=0, microsecond=0)
+    if date > potential_start:
+        potential_start += timedelta(days=1)
+
+    while not is_working_hours(potential_start):
+        potential_start += timedelta(days=1)
+
+    return potential_start
+
+if __name__ == '__main__':
+    # Test times (the working day runs 08:00 to 19:00 inclusive)
+    assert is_working_hours(datetime(2025, 6, 20, 16, 54, 0))
+    assert is_working_hours(datetime(2025, 6, 20, 18, 54, 0))
+    assert is_working_hours(datetime(2025, 6, 20, 19, 0, 0))
+    assert not is_working_hours(datetime(2025, 6, 20, 19, 1, 0))
+    assert is_working_hours(datetime(2025, 6, 20, 8, 0, 0))
+    assert not is_working_hours(datetime(2025, 6, 20, 7, 59, 59))
+
+    # Test week day (21 June 2025 is a Saturday)
+    assert not is_working_hours(datetime(2025, 6, 21, 16, 54, 0))
+    assert not is_working_hours(datetime(2025, 6, 21, 18, 54, 0))
+    assert not is_working_hours(datetime(2025, 6, 21, 19, 0, 0))
+    assert not is_working_hours(datetime(2025, 6, 21, 19, 1, 0))
+    assert not is_working_hours(datetime(2025, 6, 21, 8, 0, 0))
+    assert not is_working_hours(datetime(2025, 6, 21, 7, 59, 59))
+
+    print(next_working_hour(datetime(2025, 6, 20, 7, 59, 59)))
+    print(next_working_hour(datetime(2025, 6, 21, 7, 59, 59)))
+    print(next_working_hour(datetime(2025, 6, 22, 7, 59, 59)))
+    print(next_working_hour(datetime(2025, 6, 23, 7, 59, 59)))
+    print(next_working_hour(datetime(2025, 6, 24, 7, 59, 59)))
+
+    print(next_working_hour(datetime(2025, 6, 19, 19, 1, 0)))
+    print(next_working_hour(datetime(2025, 6, 20, 19, 1, 0)))
+    print(next_working_hour(datetime(2025, 6, 21, 19, 1, 0)))
+    print(next_working_hour(datetime(2025, 6, 22, 19, 1, 0)))
+    print(next_working_hour(datetime(2025, 6, 23, 19, 1, 0)))
\ No newline at end of file