157 lines
6.4 KiB
Python
157 lines
6.4 KiB
Python
from datetime import datetime
|
|
from sqlite3 import Cursor
|
|
import re
|
|
from typing import List
|
|
|
|
from prettytable import PrettyTable
|
|
|
|
from selenium.webdriver.chrome.webdriver import WebDriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.wait import WebDriverWait as Wait
|
|
|
|
# Maximum number of seconds to wait for any element on the planning portal.
timeout = 60

# Root of the planning web forms; per-application URLs are built from this.
base_url = "https://app.bathnes.gov.uk/webforms/planning"

# Today's date as YYYY-MM-DD, evaluated once when the module is imported.
current_date = datetime.today().date().isoformat()

# Opening <p> tag whose attributes mention "font-weight-bold".
HEADER_START = re.compile(r"<p[^>]+font-weight-bold[^>]+>")

# Any closing tag.
CLOSE_TAGS = re.compile(r"</[^>]+>")

# Any other opening <p> tag that carries attributes.
REMAINING_TAGS = re.compile(r"<p[^>]+>")
|
|
|
|
|
|
class Application:
    """A planning application mirrored in the ``applications`` SQLite table.

    Constructing an instance loads the row for ``reference`` or inserts a
    new one. ``scrape_portal`` then fills in the details from the planning
    portal and writes them back through the same cursor. Nothing in this
    class commits; the owner of the cursor's connection is expected to.
    """

    @staticmethod
    def CreateTableIfNotExists(cursor: Cursor, reset = False):
        """Create the ``applications`` table if it does not exist.

        Args:
            cursor: Cursor on the database that holds the table.
            reset: When true, drop any existing table (and its rows) first.
        """
        if reset:
            cursor.execute("DROP TABLE IF EXISTS applications;")

        create_table = """ CREATE TABLE IF NOT EXISTS applications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            reference TEXT NOT NULL,
            dateScraped TEXT NOT NULL,
            dateDecided TEXT,
            caseOfficer TEXT,
            description TEXT,
            decision TEXT,
            num_documents INTEGER
        ); """
        cursor.execute(create_table)

    @staticmethod
    def PrintTable(applications: List):
        """Print the given applications as a left-aligned table.

        Items that are not ``Application`` instances are skipped.
        """
        table = PrettyTable(['Ref', 'Scrape Date', 'Decision Date', 'Decision', 'Case Officer', 'Docs', 'Description'])
        for application in applications:
            # isinstance (rather than an exact type match) also accepts subclasses.
            if isinstance(application, Application):
                table.add_row([
                    application.reference,
                    application.dateScraped,
                    application.dateDecided,
                    application.decision,
                    application.caseOfficer,
                    application.num_documents,
                    application.description,
                ])
        table.align = "l"
        table.max_width["Description"] = 200
        print(table)

    def __init__(self, cursor: Cursor, reference: str):
        """Load the stored application for ``reference``, inserting it if new.

        Args:
            cursor: Cursor used for this and every later query on the instance.
            reference: Application reference; any '/' is percent-encoded in
                the details URL.
        """
        self.cursor = cursor
        self.reference = reference
        self.url = f"{base_url}/details.html?refval={self.reference.replace('/', '%2F')}"
        self.raw_data_map = {}
        self.new_documents_found = False

        cursor.execute(
            "SELECT dateScraped, dateDecided, caseOfficer, description, decision, num_documents FROM applications WHERE reference = ?",
            (self.reference,),
        )
        rows = cursor.fetchall()

        if not rows:
            # First sighting of this reference: record it with today's date.
            insert_application = "INSERT INTO applications (reference, dateScraped) VALUES(?,?)"
            cursor.execute(insert_application, (self.reference, current_date))
            self.dateScraped = current_date
            self.dateDecided = ""
            self.caseOfficer = ""
            self.description = ""
            self.decision = ""
            self.num_documents = 0
        else:
            stored = rows[0]
            self.dateScraped = stored[0]
            # NULL columns are normalised to empty string / zero.
            self.dateDecided = stored[1] or ""
            self.caseOfficer = stored[2] or ""
            self.description = stored[3] or ""
            self.decision = stored[4] or ""
            self.num_documents = stored[5] or 0

    def scrape_portal(self, browser: "WebDriver", force: bool = False, count_documents: bool = False):
        """Fetch this application's details from the portal and store them.

        Args:
            browser: Selenium driver used to load the details page.
            force: Scrape even if a case officer is already recorded.
            count_documents: Also open the documents tab and refresh the
                stored document count.

        When ``force`` is false and a case officer is already known, the
        method returns without touching the browser (documents are not
        counted in that case either).
        """
        if not force and self.caseOfficer:
            print(f"Already parsed {self.reference}")
            return

        print(f"Parsing {self.reference}")

        browser.get(self.url)

        details = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "details")))
        self.__html_to_map(details.get_attribute('innerHTML'))

        important_dates = Wait(browser, timeout=timeout).until(EC.invisibility_of_element_located((By.ID, "importantDates")))
        # This condition yields the element once it is hidden, but plain True
        # when the element is not in the DOM at all; there is nothing to parse
        # in the latter case.
        if important_dates is not True:
            self.__html_to_map(important_dates.get_attribute('innerHTML'))

        self.__parse_raw_data()

        update_sql = "UPDATE applications SET dateDecided = ?, caseOfficer = ?, description = ?, decision = ? WHERE reference = ?"
        self.cursor.execute(update_sql, (self.dateDecided, self.caseOfficer, self.description, self.decision, self.reference))

        if count_documents:
            self.__count_documents(browser)

    def __html_to_map(self, html: str):
        """Split portal HTML into label/value pairs stored in ``raw_data_map``.

        Bold ``<p>`` tags mark the start of a field, closing tags are dropped
        and the remaining ``<p>`` tags become a tab separating the label from
        its value. Fragments without a tab are reported and skipped.
        """
        details = HEADER_START.sub('<new_field>', html)
        details = CLOSE_TAGS.sub('', details)
        details = REMAINING_TAGS.sub('\t', details)

        for detail_raw in details.split("<new_field>"):
            detail = detail_raw.strip()
            if not detail:
                continue

            k_v = detail.split('\t', 1)
            if len(k_v) == 2:
                self.raw_data_map[k_v[0]] = k_v[1]
            else:
                print(f"Error parsing: {detail}")

    def __parse_raw_data(self):
        """Copy the scraped fields from ``raw_data_map`` onto the instance.

        "Proposal" and "Case Officer Name" are required (``KeyError`` if
        missing). "Decision" and "Decision Made" are optional and become
        ``None`` when absent. The decision date is converted from
        ``DD/MM/YYYY`` to ``YYYY-MM-DD``.
        """
        self.description = self.raw_data_map["Proposal"].replace('\n', '<br>')
        self.caseOfficer = self.raw_data_map["Case Officer Name"]
        self.decision = self.raw_data_map.get("Decision")

        try:
            day, month, year = self.raw_data_map["Decision Made"].split('/')[:3]
        except (KeyError, ValueError):
            # Missing, or not in D/M/Y form: treat as "not decided".
            self.dateDecided = None
        else:
            self.dateDecided = f"{year}-{month}-{day}"

    def __count_documents(self, browser: "WebDriver"):
        """Read the document count from the documents tab and store it.

        ``num_documents`` only ever increases; ``new_documents_found`` is set
        when the portal reports more documents than previously recorded.
        """
        documents_button = Wait(browser, timeout=timeout).until(EC.element_to_be_clickable((By.ID, "tab_documents_Section")))
        documents_button.click()

        documents_frame = Wait(browser, timeout=timeout).until(EC.visibility_of_element_located((By.ID, "iframe")))
        browser.switch_to.frame(documents_frame)
        try:
            Wait(browser, timeout=timeout).until(EC.none_of(EC.text_to_be_present_in_element((By.ID, "documents_info"), "No documents found")))
            info_text = browser.find_element(by=By.ID, value="documents_info").text
            new_num_documents = int(info_text.split(" of ")[1].replace(" documents", ""))
        finally:
            # Always leave the iframe, even if the wait or the parse failed,
            # so later lookups on this browser target the main page again.
            browser.switch_to.default_content()

        if new_num_documents > self.num_documents:
            self.num_documents = new_num_documents
            self.new_documents_found = True

        update_sql = "UPDATE applications SET num_documents = ? WHERE reference = ?"
        self.cursor.execute(update_sql, (self.num_documents, self.reference))

    def __str__(self):
        """Return a multi-line summary; the document count is shown only when non-zero."""
        summary = (f'Application: {self.reference}\n'
                   f'Date Decided: {self.dateDecided}\n'
                   f'Case Officer: {self.caseOfficer}\n'
                   f'Description: {self.description}\n'
                   f'Decision: {self.decision}')
        # Only this last line is conditional; the rest is always shown.
        if self.num_documents > 0:
            summary += f'\nDocument Count: {self.num_documents}'
        return summary
|