Initial version

commit 1f59cfecd3
James Jennett-Wheeler, 2025-06-23 11:06:51 +01:00
15 changed files with 564 additions and 0 deletions

.gitignore (vendored, new file)

@@ -0,0 +1 @@
/driver

.idea/.gitignore (vendored, new file)

@@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

.idea/PlanningScraper.iml (new file)

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$">
      <excludeFolder url="file://$MODULE_DIR$/.venv" />
    </content>
    <orderEntry type="jdk" jdkName="Python 3.9 (PlanningScraper)" jdkType="Python SDK" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>

.idea/inspectionProfiles/profiles_settings.xml (new file)

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
  <settings>
    <option name="USE_PROJECT_PROFILE" value="false" />
    <version value="1.0" />
  </settings>
</component>

.idea/misc.xml (new file)

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="Black">
    <option name="sdkName" value="Python 3.9 (PlanningScraper)" />
  </component>
</project>

.idea/modules.xml (new file)

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/PlanningScraper.iml" filepath="$PROJECT_DIR$/.idea/PlanningScraper.iml" />
    </modules>
  </component>
</project>

.idea/vcs.xml (new file)

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="$PROJECT_DIR$" vcs="Git" />
  </component>
</project>

application.py (new file)

@@ -0,0 +1,157 @@
from datetime import datetime
from sqlite3 import Cursor
import re
from typing import List

from prettytable import PrettyTable
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait as Wait

timeout = 5
base_url = "https://app.bathnes.gov.uk/webforms/planning"
current_date = datetime.today().strftime('%Y-%m-%d')

# Regexes for flattening the portal's detail HTML into tab-separated key/value text.
HEADER_START = re.compile(r'<p[^>]+font-weight-bold[^>]+>')
CLOSE_TAGS = re.compile(r'</[^>]+>')
REMAINING_TAGS = re.compile(r'<p[^>]+>')
class Application:
    @staticmethod
    def CreateTableIfNotExists(cursor: Cursor, reset=False):
        if reset:
            cursor.execute("DROP TABLE IF EXISTS applications;")
        create_table = """ CREATE TABLE IF NOT EXISTS applications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            reference TEXT NOT NULL,
            dateScraped TEXT NOT NULL,
            dateDecided TEXT,
            caseOfficer TEXT,
            description TEXT,
            decision TEXT,
            num_documents INTEGER
        ); """
        cursor.execute(create_table)

    @staticmethod
    def PrintTable(applications: List):
        table = PrettyTable(['Ref', 'Scrape Date', 'Decision Date', 'Decision', 'Case Officer', 'Docs', 'Description'])
        for application in applications:
            if type(application) is Application:
                table.add_row([application.reference, application.dateScraped, application.dateDecided,
                               application.decision, application.caseOfficer, application.num_documents,
                               application.description])
        table.align = "l"
        print(table)
    def __init__(self, cursor: Cursor, reference: str):
        self.cursor = cursor
        self.reference = reference
        self.url = f"{base_url}/details.html?refval={self.reference.replace('/', '%2F')}"
        self.raw_data_map = {}
        self.new_documents_found = False
        # Load the cached row for this reference, or insert a stub row if it is new.
        cursor.execute("SELECT dateScraped, dateDecided, caseOfficer, description, decision, num_documents "
                       "FROM applications WHERE reference = ?", (self.reference,))
        result = cursor.fetchall()
        if len(result) == 0:
            insert_application = "INSERT INTO applications (reference, dateScraped) VALUES(?,?)"
            cursor.execute(insert_application, (self.reference, current_date))
            self.dateScraped = current_date
            self.dateDecided = ""
            self.caseOfficer = ""
            self.description = ""
            self.decision = ""
            self.num_documents = 0
        else:
            self.dateScraped = result[0][0]
            self.dateDecided = result[0][1] or ""
            self.caseOfficer = result[0][2] or ""
            self.description = result[0][3] or ""
            self.decision = result[0][4] or ""
            self.num_documents = result[0][5] or 0
    def scrape_portal(self, browser: WebDriver, force: bool = False, count_documents: bool = False):
        if not force and self.caseOfficer:
            print(f"Already parsed {self.reference}")
            return
        print(f"Parsing {self.reference}")
        browser.get(self.url)
        details = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "details")))
        self.__html_to_map(details.get_attribute('innerHTML'))
        important_dates = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "importantDates")))
        self.__html_to_map(important_dates.get_attribute('innerHTML'))
        self.__parse_raw_data()
        update_sql = "UPDATE applications SET dateDecided = ?, caseOfficer = ?, description = ?, decision = ? WHERE reference = ?"
        self.cursor.execute(update_sql, (self.dateDecided, self.caseOfficer, self.description, self.decision, self.reference))
        if count_documents:
            self.__count_documents(browser)
    def __html_to_map(self, html: str):
        # Mark each bold label <p> as a field boundary, strip closing tags,
        # then turn each remaining <p> into a tab between label and value.
        details = HEADER_START.sub('<new_field>', html)
        details = CLOSE_TAGS.sub('', details)
        details = REMAINING_TAGS.sub('\t', details)
        for detail_raw in details.split("<new_field>"):
            detail = detail_raw.strip()
            if detail:
                k_v = detail.split('\t', 1)
                if len(k_v) == 2:
                    self.raw_data_map[k_v[0]] = k_v[1]
                else:
                    print(f"Error parsing: {detail}")
    def __parse_raw_data(self):
        self.description = self.raw_data_map["Proposal"].replace('\n', '<br>')
        self.caseOfficer = self.raw_data_map["Case Officer Name"]
        try:
            self.decision = self.raw_data_map["Decision"]
        except KeyError:
            self.decision = None
        try:
            # The portal shows dates as DD/MM/YYYY; store them as YYYY-MM-DD.
            decision_date = self.raw_data_map["Decision Made"].split('/')
            self.dateDecided = f"{decision_date[2]}-{decision_date[1]}-{decision_date[0]}"
        except KeyError:
            self.dateDecided = None
    def __count_documents(self, browser: WebDriver):
        documents_button = Wait(browser, timeout=timeout).until(EC.element_to_be_clickable((By.ID, "tab_documents_Section")))
        documents_button.click()
        documents_frame = Wait(browser, timeout=20).until(EC.visibility_of_element_located((By.ID, "iframe")))
        browser.switch_to.frame(documents_frame)
        Wait(browser, timeout=60).until(EC.none_of(EC.text_to_be_present_in_element((By.ID, "documents_info"), "No documents found")))
        # The info line ends "... of N documents"; take N.
        new_num_documents = int(browser.find_element(by=By.ID, value="documents_info").text.split(" of ")[1].replace(" documents", ""))
        browser.switch_to.default_content()
        if new_num_documents > self.num_documents:
            self.num_documents = new_num_documents
            self.new_documents_found = True
            update_sql = "UPDATE applications SET num_documents = ? WHERE reference = ?"
            self.cursor.execute(update_sql, (self.num_documents, self.reference))
    def __str__(self):
        # Note: the conditional must apply only to the document-count line;
        # binding it to the whole string would return "" when there are no documents.
        text = (f'Application: {self.reference}\n'
                f'Date Decided: {self.dateDecided}\n'
                f'Case Officer: {self.caseOfficer}\n'
                f'Description: {self.description}\n'
                f'Decision: {self.decision}')
        if self.num_documents > 0:
            text += f'\nDocument Count: {self.num_documents}'
        return text
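
Aside: a minimal sketch of what __html_to_map does, assuming the portal renders each field as a bold label <p> followed by a value <p> (the sample HTML below is invented for illustration):

import re

HEADER_START = re.compile(r'<p[^>]+font-weight-bold[^>]+>')
CLOSE_TAGS = re.compile(r'</[^>]+>')
REMAINING_TAGS = re.compile(r'<p[^>]+>')

# Hypothetical portal markup: a bold label followed by its value.
html = ('<p class="font-weight-bold mb-0">Case Officer Name</p>'
        '<p class="mb-3">Jane Doe</p>')

flat = HEADER_START.sub('<new_field>', html)  # each bold label <p> starts a new field
flat = CLOSE_TAGS.sub('', flat)               # drop all closing tags
flat = REMAINING_TAGS.sub('\t', flat)         # the value <p> becomes the separator

for field in flat.split('<new_field>'):
    field = field.strip()
    if field:
        key, value = field.split('\t', 1)
        print(f"{key!r} -> {value!r}")  # 'Case Officer Name' -> 'Jane Doe'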

database.db (new file; binary, contents not shown)

monitor-planning.py (new file)

@@ -0,0 +1,122 @@
import os
import sys
import traceback
from datetime import datetime, time
import sqlite3

import pause
import requests
from selenium import webdriver

from application import Application
from weeklyList import WeeklyList
from workingHours import is_working_hours, next_working_hour, potential_midday_upload

refresh_rate_minutes = 5
search_past_week = 0
search_num_weeks = 1
reset_table = False

web_opts = webdriver.ChromeOptions()
web_opts.add_argument('--headless')


def notify(title, message):
    api_url = 'https://hass.jennett-wheeler.co.uk/api/webhook/-Qx6jHsGLHwbBlJpLek5Nj8qS'
    requests.post(api_url, json={"title": title, "message": message})
def update_other_applications():
    there_were_newly_decided_applications = False
    with sqlite3.connect("database.db") as _conn:
        _cursor = _conn.cursor()
        with webdriver.Chrome(options=web_opts) as _browser:
            print("Scrape Weekly List(s)")
            weekly_list = WeeklyList(_cursor)
            for search_week_idx in range(search_past_week,
                                         min(search_past_week + search_num_weeks, 9)):  # Council only allows the latest 9 weeks
                weekly_list.scrape(_browser, search_week_idx)
            there_were_newly_decided_applications = len(weekly_list.new_applications) > 0
            print(" Number of new decided applications: " + str(len(weekly_list.new_applications)))
            print(" Number of existing applications: " + str(len(weekly_list.existing_applications)))
            print("")
            if there_were_newly_decided_applications:
                notify("New decisions found", f"Council has uploaded {len(weekly_list.new_applications)} new decisions")
            # Applications without a case officer have only been seen in the weekly
            # list so far; scrape their details pages now.
            _cursor.execute("SELECT reference FROM applications WHERE caseOfficer IS NULL")
            newly_decided_applications = _cursor.fetchall()
            if len(newly_decided_applications) > 0:
                print(f"Scrape Newly Decided Applications: {len(newly_decided_applications)}")
                for (application_ref,) in newly_decided_applications:
                    _app = Application(_cursor, application_ref)
                    _app.scrape_portal(_browser)
            print("")
    return there_were_newly_decided_applications
if __name__ == '__main__':
    try:
        with sqlite3.connect("database.db") as connection:
            cursor = connection.cursor()
            Application.CreateTableIfNotExists(cursor, reset_table)
        midday_checked = False
        while True:
            # Always re-check our own application for new documents first.
            with sqlite3.connect("database.db") as connection:
                application = Application(connection.cursor(), "25/00605/FUL")
                with webdriver.Chrome(options=web_opts) as browser:
                    application.scrape_portal(browser, force=True, count_documents=True)
                if application.new_documents_found:
                    notify("New Documents Found", f"Application now has {application.num_documents} documents")
            print("")
            if is_working_hours():
                if not midday_checked and potential_midday_upload():
                    midday_checked = update_other_applications()
                    if midday_checked:
                        print(f"New decisions found at: {datetime.now().strftime('%H-%M-%S')}")
                pause.minutes(refresh_rate_minutes)
            else:
                if update_other_applications():
                    print(f"New decisions found at: {datetime.now().strftime('%H-%M-%S')}")
                    next_start = next_working_hour()
                    print(f"Pausing until: {next_start}")
                    pause.until(next_start)
                else:
                    if datetime.now().time() > time(19, 0, 0):
                        next_start = next_working_hour()
                        print(f"Pausing until: {next_start}")
                        pause.until(next_start)
                    else:
                        pause.minutes(refresh_rate_minutes)
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(130)
        except SystemExit:
            os._exit(130)
    except Exception as e:
        print(f'Error found: {repr(e)}')
        print(traceback.format_exc())
        notify("Error in planning monitor", repr(e))
        try:
            sys.exit(130)
        except SystemExit:
            os._exit(130)

scrape-my-application.py (new file)

@@ -0,0 +1,52 @@
import os
import sys
import sqlite3

import pause
import requests
from selenium import webdriver

from application import Application
from workingHours import is_working_hours, next_working_hour

refresh_rate_minutes = 5
api_url = 'https://hass.jennett-wheeler.co.uk/api/webhook/-Qx6jHsGLHwbBlJpLek5Nj8qS'
if __name__ == '__main__':
    try:
        with sqlite3.connect("database.db") as connection:
            cursor = connection.cursor()
            options = webdriver.ChromeOptions()
            options.add_argument('--headless')
            application = Application(cursor, "25/00605/FUL")
            num_documents = 18  # document count already seen
            while True:
                if is_working_hours():
                    with webdriver.Chrome(options=options) as browser:
                        application.scrape_portal(browser, force=True, count_documents=True)
                    if num_documents < application.num_documents:
                        num_new_documents = application.num_documents - num_documents
                        num_documents = application.num_documents
                        requests.post(api_url)  # fire the Home Assistant webhook
                        print(f"New documents! {num_new_documents}")
                    pause.minutes(refresh_rate_minutes)
                else:
                    next_start = next_working_hour()
                    print(f"Pausing until: {next_start}")
                    pause.until(next_start)
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(130)
        except SystemExit:
            os._exit(130)

(file name not shown; new file)

@@ -0,0 +1,56 @@
import os
import sys
import sqlite3

from selenium import webdriver

from application import Application
from weeklyList import WeeklyList

search_past_week = 0
search_num_weeks = 1
reset_table = False
if __name__ == '__main__':
    try:
        with sqlite3.connect("database.db") as connection:
            cursor = connection.cursor()
            Application.CreateTableIfNotExists(cursor, reset_table)
            options = webdriver.ChromeOptions()
            options.add_argument('--headless')
            with webdriver.Chrome(options=options) as browser:
                print("Scrape Weekly List(s)")
                weeklyList = WeeklyList(cursor)
                for search_week_idx in range(search_past_week,
                                             min(search_past_week + search_num_weeks, 9)):  # Council only allows the latest 9 weeks
                    weeklyList.scrape(browser, search_week_idx)
                print("Number of new decided applications: " + str(len(weeklyList.new_applications)))
                print("Number of existing applications: " + str(len(weeklyList.existing_applications)))
                print("")
                cursor.execute("SELECT reference FROM applications WHERE caseOfficer IS NULL")
                newly_decided_applications = cursor.fetchall()
                if len(newly_decided_applications) > 0:
                    print(f"Scrape Newly Decided Applications: {len(newly_decided_applications)}")
                    for (application_ref,) in newly_decided_applications:
                        application = Application(cursor, application_ref)
                        application.scrape_portal(browser)
                    print("")
        print("Done")
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(130)
        except SystemExit:
            os._exit(130)

search-db.py (new file)

@@ -0,0 +1,18 @@
import sqlite3

from application import Application

with sqlite3.connect("database.db") as connection:
    cursor = connection.cursor()
    applications = []
    print("This week's Application decisions:")
    cursor.execute("SELECT reference FROM applications WHERE dateScraped >= '2025-06-23' ORDER BY dateDecided DESC")
    # print("Chris' Applications:")
    # cursor.execute("SELECT reference FROM applications WHERE caseOfficer = 'Christopher Masters' ORDER BY dateDecided DESC")
    for (application_ref,) in cursor.fetchall():
        applications.append(Application(cursor, application_ref))
    Application.PrintTable(applications)
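
The same pattern works for any filter on the applications table. For example, a hypothetical query for refused decisions (the LIKE pattern is a guess at the council's wording; SQLite's LIKE is case-insensitive for ASCII by default):

import sqlite3

from application import Application

with sqlite3.connect("database.db") as connection:
    cursor = connection.cursor()
    cursor.execute("SELECT reference FROM applications "
                   "WHERE decision LIKE '%refus%' ORDER BY dateDecided DESC")
    refused = [Application(cursor, ref) for (ref,) in cursor.fetchall()]
    Application.PrintTable(refused)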

weeklyList.py (new file)

@@ -0,0 +1,57 @@
import time
from sqlite3 import Cursor
import re

from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.select import Select
from selenium.webdriver.support.wait import WebDriverWait as Wait

from application import Application

timeout = 5
base_url = "https://app.bathnes.gov.uk/webforms/planning"
TAG_RE = re.compile(r'<[^>]+>')
class WeeklyList:
    def __init__(self, cursor: Cursor):
        Application.CreateTableIfNotExists(cursor)
        self.cursor = cursor
        self.new_applications = []
        self.existing_applications = []

    def scrape(self, browser: WebDriver, search_past_week=0):
        browser.refresh()
        browser.get(f"{base_url}/search.html#weeklyList")
        # Bring up the list of decided applications
        search_button = Wait(browser, timeout=timeout).until(EC.element_to_be_clickable((By.ID, "weeklySearchBtn")))
        time.sleep(0.5)  # Give the form a little extra time to initialise
        search_type = Select(browser.find_element(by=By.ID, value="weeklyListOption"))
        search_type.select_by_value('decided')
        search_week = Select(browser.find_element(by=By.ID, value="weeklyListBetween"))
        search_week.select_by_index(search_past_week)
        week_str = search_week.options[search_past_week].text.split(" to ")[0]
        print(f"Week: {week_str}")
        search_button.click()
        results = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "results-table")))
        rows = results.find_elements(By.TAG_NAME, "tr")
        for row in rows:
            # The first cell holds the reference on its first line; strip the tags around it.
            col = row.find_elements(By.TAG_NAME, "td")[0]
            application_html = col.get_attribute('innerHTML').replace('\n', '<br>')
            application_ref_html = application_html.strip().split("<br>")[0].strip()
            application_ref = TAG_RE.sub('', application_ref_html).replace("Application Reference: ", "")
            # An application with a case officer on record has been scraped before.
            application = Application(self.cursor, application_ref)
            if application.caseOfficer:
                self.existing_applications.append(application)
            else:
                self.new_applications.append(application)
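
Aside: a minimal sketch of the reference extraction above, assuming the first results cell looks roughly like the markup below (the sample innerHTML and address are invented for illustration):

import re

TAG_RE = re.compile(r'<[^>]+>')

# Hypothetical innerHTML of the first cell in a results row.
cell_html = ('Application Reference: <a href="details.html?refval=25%2F00605%2FFUL">'
             '25/00605/FUL</a>\n1 High Street, Bath')

first_line = cell_html.replace('\n', '<br>').strip().split('<br>')[0].strip()
reference = TAG_RE.sub('', first_line).replace("Application Reference: ", "")
print(reference)  # -> 25/00605/FUL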

workingHours.py (new file)

@@ -0,0 +1,62 @@
from datetime import time, datetime, timedelta


def is_working_hours(date=None):
    # A default of datetime.now() would be evaluated once at import time,
    # so take None and resolve "now" on every call instead.
    if date is None:
        date = datetime.now()
    if date.weekday() >= 5:
        return False
    start = time(8, 0, 0)
    end = time(19, 0, 0)  # matches the 19:00 cutoff in monitor-planning.py and the asserts below
    current_time = date.time()
    return start <= current_time <= end


def potential_midday_upload(date=None):
    if date is None:
        date = datetime.now()
    if date.weekday() >= 5:
        return False
    midday_upload_time = time(14, 0, 0)
    current_time = date.time()
    return midday_upload_time <= current_time


def next_working_hour(date=None):
    if date is None:
        date = datetime.now()
    if is_working_hours(date):
        return date
    potential_start = date.replace(hour=8, minute=0, second=0, microsecond=0)
    if date > potential_start:
        potential_start += timedelta(days=1)
    while not is_working_hours(potential_start):
        potential_start += timedelta(days=1)
    return potential_start
if __name__ == '__main__':
    # Test times (Friday 20 June 2025)
    assert is_working_hours(datetime(2025, 6, 20, 16, 54, 0))
    assert is_working_hours(datetime(2025, 6, 20, 18, 54, 0))
    assert is_working_hours(datetime(2025, 6, 20, 19, 0, 0))
    assert not is_working_hours(datetime(2025, 6, 20, 19, 1, 0))
    assert is_working_hours(datetime(2025, 6, 20, 8, 0, 0))
    assert not is_working_hours(datetime(2025, 6, 20, 7, 59, 59))
    # Test weekend handling (Saturday 21 June 2025)
    assert not is_working_hours(datetime(2025, 6, 21, 16, 54, 0))
    assert not is_working_hours(datetime(2025, 6, 21, 18, 54, 0))
    assert not is_working_hours(datetime(2025, 6, 21, 19, 0, 0))
    assert not is_working_hours(datetime(2025, 6, 21, 19, 1, 0))
    assert not is_working_hours(datetime(2025, 6, 21, 8, 0, 0))
    assert not is_working_hours(datetime(2025, 6, 21, 7, 59, 59))
    print(next_working_hour(datetime(2025, 6, 20, 7, 59, 59)))  # Fri 2025-06-20 08:00
    print(next_working_hour(datetime(2025, 6, 21, 7, 59, 59)))  # Mon 2025-06-23 08:00
    print(next_working_hour(datetime(2025, 6, 22, 7, 59, 59)))  # Mon 2025-06-23 08:00
    print(next_working_hour(datetime(2025, 6, 23, 7, 59, 59)))  # Mon 2025-06-23 08:00
    print(next_working_hour(datetime(2025, 6, 24, 7, 59, 59)))  # Tue 2025-06-24 08:00
    print(next_working_hour(datetime(2025, 6, 19, 19, 1, 0)))   # Fri 2025-06-20 08:00
    print(next_working_hour(datetime(2025, 6, 20, 19, 1, 0)))   # Mon 2025-06-23 08:00
    print(next_working_hour(datetime(2025, 6, 21, 19, 1, 0)))   # Mon 2025-06-23 08:00
    print(next_working_hour(datetime(2025, 6, 22, 19, 1, 0)))   # Mon 2025-06-23 08:00
    print(next_working_hour(datetime(2025, 6, 23, 19, 1, 0)))   # Tue 2025-06-24 08:00
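
Both monitor scripts gate their polling on these helpers; the pattern, roughly as it appears in monitor-planning.py, is:

import pause

from workingHours import is_working_hours, next_working_hour

# Poll every few minutes during working hours, otherwise sleep
# until the next working window opens.
if is_working_hours():
    pause.minutes(5)                  # poll again in a few minutes
else:
    pause.until(next_working_hour())  # sleep through evenings and weekends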