monitor-planning/app/application.py

157 lines
6.4 KiB
Python

from datetime import datetime
from sqlite3 import Cursor
import re
from typing import List
from prettytable import PrettyTable
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait as Wait
timeout = 60
base_url = "https://app.bathnes.gov.uk/webforms/planning"
current_date = datetime.today().strftime('%Y-%m-%d')
HEADER_START = re.compile(r'<p[^>]+font-weight-bold[^>]+>')
CLOSE_TAGS = re.compile(r'</[^>]+>')
REMAINING_TAGS = re.compile(r'<p[^>]+>')
class Application:
@staticmethod
def CreateTableIfNotExists(cursor: Cursor, reset = False):
if reset:
cursor.execute("DROP TABLE IF EXISTS applications;")
create_table = """ CREATE TABLE IF NOT EXISTS applications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
reference TEXT NOT NULL,
dateScraped TEXT NOT NULL,
dateDecided TEXT,
caseOfficer TEXT,
description TEXT,
decision TEXT,
num_documents INTEGER
); """
cursor.execute(create_table)
@staticmethod
def PrintTable(applications: List):
table = PrettyTable(['Ref', 'Scrape Date', 'Decision Date', 'Decision', 'Case Officer', 'Docs', 'Description'])
for application in applications:
if type(application) is Application:
table.add_row([application.reference, application.dateScraped, application.dateDecided, application.decision, application.caseOfficer, application.num_documents, application.description])
table.align = "l"
table.max_width["Description"] = 200
print(table)
def __init__(self, cursor: Cursor, reference: str):
self.cursor = cursor
self.reference = reference
self.url = f"{base_url}/details.html?refval={self.reference.replace('/', '%2F')}"
self.raw_data_map = {}
self.new_documents_found = False
cursor.execute("SELECT dateScraped, dateDecided, caseOfficer, description, decision, num_documents FROM applications WHERE reference = ?", (self.reference,))
result = cursor.fetchall()
if len(result) == 0:
insert_application = "INSERT INTO applications (reference, dateScraped) VALUES(?,?)"
cursor.execute(insert_application, (self.reference, current_date))
self.dateScraped = current_date
self.dateDecided = ""
self.caseOfficer = ""
self.description = ""
self.decision = ""
self.num_documents = 0
else:
self.dateScraped = result[0][0]
self.dateDecided = result[0][1] or ""
self.caseOfficer = result[0][2] or ""
self.description = result[0][3] or ""
self.decision = result[0][4] or ""
self.num_documents = result[0][5] or 0
def scrape_portal(self, browser: WebDriver, force: bool = False, count_documents: bool = False):
if not force and self.caseOfficer:
print(f"Already parsed {self.reference}")
return
print(f"Parsing {self.reference}")
browser.get(self.url)
details = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "details")))
self.__html_to_map(details.get_attribute('innerHTML'))
important_dates = Wait(browser, timeout=timeout).until(EC.invisibility_of_element_located((By.ID, "importantDates")))
self.__html_to_map(important_dates.get_attribute('innerHTML'))
self.__parse_raw_data()
update_sql = "UPDATE applications SET dateDecided = ?, caseOfficer = ?, description = ?, decision = ? WHERE reference = ?"
self.cursor.execute(update_sql, (self.dateDecided, self.caseOfficer, self.description, self.decision, self.reference))
if count_documents:
self.__count_documents(browser)
def __html_to_map(self, html: str):
details = HEADER_START.sub('<new_field>', html)
details = CLOSE_TAGS.sub('', details)
details = REMAINING_TAGS.sub('\t', details)
for detail_raw in details.split("<new_field>"):
detail = detail_raw.strip()
if detail:
k_v = detail.split('\t', 1)
if len(k_v) == 2:
self.raw_data_map[k_v[0]] = k_v[1]
else:
print(f"Error parsing: {detail}")
def __parse_raw_data(self):
self.description = self.raw_data_map["Proposal"].replace('\n', '<br>')
self.caseOfficer = self.raw_data_map["Case Officer Name"]
try:
self.decision = self.raw_data_map["Decision"]
except KeyError:
self.decision = None
try:
decision_date = self.raw_data_map["Decision Made"].split('/')
self.dateDecided = f"{decision_date[2]}-{decision_date[1]}-{decision_date[0]}"
except KeyError:
self.dateDecided = None
def __count_documents(self, browser: WebDriver):
documents_button = Wait(browser, timeout=timeout).until(EC.element_to_be_clickable((By.ID, "tab_documents_Section")))
documents_button.click()
documents_frame = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "iframe")))
browser.switch_to.frame(documents_frame)
Wait(browser, timeout=timeout).until(EC.none_of(EC.text_to_be_present_in_element((By.ID, "documents_info"), "No documents found")))
new_num_documents = int(browser.find_element(by=By.ID, value="documents_info").text.split(" of ")[1].replace(" documents", ""))
browser.switch_to.default_content()
if new_num_documents > self.num_documents:
self.num_documents = new_num_documents
self.new_documents_found = True
update_sql = "UPDATE applications SET num_documents = ? WHERE reference = ?"
self.cursor.execute(update_sql, (self.num_documents, self.reference))
def __str__(self):
return (f'Application: {self.reference}\n'
f'Date Decided: {self.dateDecided}\n'
f'Case Officer: {self.caseOfficer}\n'
f'Description: {self.description}\n'
f'Decision: {self.decision}'
f'\nDocument Count: {self.num_documents}' if self.num_documents > 0 else "")