"""Scrape planning-application details from the B&NES planning portal.

Each :class:`Application` mirrors one row of the ``applications`` SQLite table
and knows how to refresh itself from the portal using a Selenium browser.
"""
import re
import time
from datetime import datetime
from sqlite3 import Cursor
from typing import List

from exceptiongroup import catch
from prettytable import PrettyTable
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait as Wait

# Default number of seconds to wait for page elements.
timeout = 5
base_url = "https://app.bathnes.gov.uk/webforms/planning"
# Evaluated once, when the module is imported.
current_date = datetime.today().strftime('%Y-%m-%d')

# NOTE(review): these three patterns compile, but they look truncated -- the
# leading part of each (presumably something like ``<tag[^>``) appears to have
# been lost when this file was extracted. Confirm against the upstream source
# before relying on them.
HEADER_START = re.compile(r']+font-weight-bold[^>]+>')
CLOSE_TAGS = re.compile(r']+>')
REMAINING_TAGS = re.compile(r']+>')


class Application:
    """A single planning application, backed by a row in ``applications``."""

    @staticmethod
    def CreateTableIfNotExists(cursor: Cursor, reset: bool = False) -> None:
        """Create the ``applications`` table if it does not exist yet.

        Args:
            cursor: Database cursor used to run the statements.
            reset: When true, drop any existing table first.
        """
        if reset:
            cursor.execute("DROP TABLE IF EXISTS applications;")
        create_table = """
            CREATE TABLE IF NOT EXISTS applications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                reference TEXT NOT NULL,
                dateScraped TEXT NOT NULL,
                dateDecided TEXT,
                caseOfficer TEXT,
                description TEXT,
                decision TEXT,
                num_documents INTEGER
            );
        """
        cursor.execute(create_table)

    @staticmethod
    def PrintTable(applications: List) -> None:
        """Print the given applications as a left-aligned text table.

        Items that are not ``Application`` instances are skipped.
        """
        table = PrettyTable(['Ref', 'Scrape Date', 'Decision Date', 'Decision',
                             'Case Officer', 'Docs', 'Description'])
        for application in applications:
            if isinstance(application, Application):
                table.add_row([
                    application.reference,
                    application.dateScraped,
                    application.dateDecided,
                    application.decision,
                    application.caseOfficer,
                    application.num_documents,
                    application.description,
                ])
        table.align = "l"
        print(table)

    def __init__(self, cursor: Cursor, reference: str):
        """Load the application from the database, inserting it if unknown.

        Args:
            cursor: Database cursor; kept on the instance for later updates.
            reference: Application reference, also used to build the portal URL.
        """
        self.cursor = cursor
        self.reference = reference
        self.url = f"{base_url}/details.html?refval={self.reference.replace('/', '%2F')}"
        self.raw_data_map = {}
        self.new_documents_found = False

        cursor.execute(
            "SELECT dateScraped, dateDecided, caseOfficer, description, decision, num_documents "
            "FROM applications WHERE reference = ?",
            (self.reference,))
        result = cursor.fetchall()
        if not result:
            # First time this reference is seen: record it with today's date.
            insert_application = "INSERT INTO applications (reference, dateScraped) VALUES(?,?)"
            cursor.execute(insert_application, (self.reference, current_date))
            self.dateScraped = current_date
            self.dateDecided = ""
            self.caseOfficer = ""
            self.description = ""
            self.decision = ""
            self.num_documents = 0
        else:
            row = result[0]
            self.dateScraped = row[0]
            # NULL columns are normalised to empty string / zero.
            self.dateDecided = row[1] or ""
            self.caseOfficer = row[2] or ""
            self.description = row[3] or ""
            self.decision = row[4] or ""
            self.num_documents = row[5] or 0

    def scrape_portal(self, browser: WebDriver, force: bool = False,
                      count_documents: bool = False) -> None:
        """Refresh this application from the portal and persist the result.

        Args:
            browser: Selenium driver used to load the application page.
            force: Re-scrape even when a case officer is already stored.
            count_documents: Also visit the documents tab and update the count.
        """
        if not force and self.caseOfficer:
            print(f"Already parsed {self.reference}")
            return
        print(f"Parsing {self.reference}")
        browser.get(self.url)

        details = Wait(browser, timeout=timeout).until(
            EC.visibility_of_element_located((By.ID, "details")))
        self.__html_to_map(details.get_attribute('innerHTML'))

        # NOTE(review): this waits for "importantDates" to be *invisible*
        # (presumably it lives in a hidden tab -- confirm). If the element is
        # absent from the DOM the condition can yield True instead of an
        # element, and the get_attribute call below would then fail.
        important_dates = Wait(browser, timeout=timeout).until(
            EC.invisibility_of_element_located((By.ID, "importantDates")))
        self.__html_to_map(important_dates.get_attribute('innerHTML'))

        self.__parse_raw_data()
        update_sql = ("UPDATE applications SET dateDecided = ?, caseOfficer = ?, "
                      "description = ?, decision = ? WHERE reference = ?")
        self.cursor.execute(update_sql, (self.dateDecided, self.caseOfficer,
                                         self.description, self.decision, self.reference))
        if count_documents:
            self.__count_documents(browser)

    def __html_to_map(self, html: str) -> None:
        """Split an HTML fragment into key/value pairs in ``raw_data_map``.

        Each non-empty chunk is split on its first tab into key and value;
        chunks without a tab are reported on stdout and skipped.
        """
        details = HEADER_START.sub('', html)
        details = CLOSE_TAGS.sub('', details)
        details = REMAINING_TAGS.sub('\t', details)
        # NOTE(review): the separator below is an empty string, which makes
        # str.split raise ValueError. It looks like the original separator was
        # lost when this file was extracted -- restore it from upstream.
        for detail_raw in details.split(""):
            detail = detail_raw.strip()
            if not detail:
                continue
            k_v = detail.split('\t', 1)
            if len(k_v) == 2:
                self.raw_data_map[k_v[0]] = k_v[1]
            else:
                print(f"Error parsing: {detail}")

    def __parse_raw_data(self) -> None:
        """Copy the scraped key/value pairs onto the instance attributes.

        Raises:
            KeyError: If "Proposal" or "Case Officer Name" was not scraped.
        """
        # NOTE(review): as written this replaces a newline with a newline; the
        # replacement text appears to have been lost in extraction -- confirm.
        self.description = self.raw_data_map["Proposal"].replace('\n', '\n')
        self.caseOfficer = self.raw_data_map["Case Officer Name"]
        # Undecided applications have no decision fields; store None for those.
        self.decision = self.raw_data_map.get("Decision")
        decision_made = self.raw_data_map.get("Decision Made")
        if decision_made is None:
            self.dateDecided = None
        else:
            # Reorder the slash-separated parts into "third-second-first".
            decision_date = decision_made.split('/')
            self.dateDecided = f"{decision_date[2]}-{decision_date[1]}-{decision_date[0]}"

    def __count_documents(self, browser: WebDriver) -> None:
        """Open the documents tab and record the document count if it grew.

        Sets ``new_documents_found`` and updates the database row when the
        count read from the page exceeds the stored one.
        """
        documents_button = Wait(browser, timeout=timeout).until(
            EC.element_to_be_clickable((By.ID, "tab_documents_Section")))
        documents_button.click()
        documents_frame = Wait(browser, timeout=20).until(
            EC.visibility_of_element_located((By.ID, "iframe")))
        browser.switch_to.frame(documents_frame)
        try:
            Wait(browser, timeout=60).until(EC.none_of(
                EC.text_to_be_present_in_element((By.ID, "documents_info"), "No documents found")))
            info_text = browser.find_element(by=By.ID, value="documents_info").text
        finally:
            # Always leave the iframe, even on timeout, so the caller can keep
            # using the browser on the main document.
            browser.switch_to.default_content()

        new_num_documents = int(info_text.split(" of ")[1].replace(" documents", ""))
        if new_num_documents > self.num_documents:
            self.num_documents = new_num_documents
            self.new_documents_found = True
            update_sql = "UPDATE applications SET num_documents = ? WHERE reference = ?"
            self.cursor.execute(update_sql, (self.num_documents, self.reference))

    def __str__(self) -> str:
        """Return a multi-line summary; the document count is shown only if > 0."""
        summary = (f'Application: {self.reference}\n'
                   f'Date Decided: {self.dateDecided}\n'
                   f'Case Officer: {self.caseOfficer}\n'
                   f'Description: {self.description}\n'
                   f'Decision: {self.decision}')
        # Only the count line is conditional; previously the conditional
        # swallowed the entire summary when there were no documents.
        if self.num_documents > 0:
            summary += f'\nDocument Count: {self.num_documents}'
        return summary