"""
Website Change Alert System

Monitors a URL for content changes and sends email notifications.
Supports optional XPath/CSS selectors for monitoring specific page sections.
"""

import hashlib
import logging
import logging.handlers
import smtplib
import sys
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional

import requests
from environs import Env
from lxml import html

logger = logging.getLogger(__name__)

# Upper bound (seconds) for every blocking network operation, so an
# unattended run (cron, systemd timer, ...) cannot hang forever on a dead peer.
NETWORK_TIMEOUT_SECONDS = 30


def _node_to_text(node) -> str:
    """Render one selector match (text/attribute result or element) as a string."""
    if isinstance(node, str):
        # XPath text()/@attr results are str subclasses; normalise to plain str.
        return str(node)
    return html.tostring(node, encoding="unicode")


def get_page_content(url: str, selector: Optional[str] = None, selector_type: str = "xpath") -> str:
    """
    Fetch webpage content, optionally filtered by selector.

    Args:
        url: The URL to fetch
        selector: Optional XPath or CSS selector to extract specific content
        selector_type: Type of selector - "xpath" (case-insensitive); any
            other value is treated as "css"

    Returns:
        The page content: the full response text when no selector is given,
        otherwise the matched pieces joined with single spaces. Scalar XPath
        results (e.g. from count() or string()) are returned as their string
        form.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the selector matches nothing.
    """
    response = requests.get(url, timeout=NETWORK_TIMEOUT_SECONDS)
    response.raise_for_status()

    if not selector:
        return response.text

    # Parse HTML and extract selected content
    tree = html.fromstring(response.content)
    if selector_type.strip().lower() == "xpath":
        matches = tree.xpath(selector)
    else:  # css
        matches = tree.cssselect(selector)

    # XPath functions such as count() or boolean() yield a bare scalar rather
    # than a list; 0 / False are legitimate values there, not "no match".
    if isinstance(matches, (bool, int, float)):
        return str(matches)

    if not matches:
        raise ValueError(f"No elements found matching selector: {selector}")

    # string(...) yields a single string; joining it would split it per character.
    if isinstance(matches, str):
        return str(matches)

    # Convert each match on its own: one XPath result may mix text nodes and
    # elements, and str() of an element embeds its memory address, which
    # would make the hash differ on every run.
    return " ".join(_node_to_text(node) for node in matches)


def compute_hash(content: str) -> str:
    """Compute SHA256 hash of content (UTF-8 encoded), as a hex string."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def load_cached_hash(cache_file: Path) -> Optional[str]:
    """
    Load previously cached hash from file.

    Returns:
        The stored hash with surrounding whitespace removed, or None when the
        file does not exist or holds only whitespace.
    """
    try:
        cached = cache_file.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return None
    # An empty cache file can never equal a real digest; report it as
    # "no cache" instead of letting it look like a content change.
    return cached or None


def save_hash(cache_file: Path, hash_value: str) -> None:
    """Save hash to cache file, creating parent directories as needed."""
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(hash_value, encoding="utf-8")


def send_email(
    smtp_host: str,
    smtp_port: int,
    smtp_user: str,
    smtp_password: str,
    from_addr: str,
    to_addr: str,
    url: str,
    use_tls: bool = True,
) -> None:
    """
    Send email notification about website change.

    Args:
        smtp_host: SMTP server hostname
        smtp_port: SMTP server port
        smtp_user: SMTP username
        smtp_password: SMTP password
        from_addr: From email address
        to_addr: To email address
        url: The URL that changed
        use_tls: If True, open a plain SMTP connection and upgrade it with
            STARTTLS; if False, connect with implicit TLS (SMTP_SSL)

    Raises:
        smtplib.SMTPException: If the server rejects any step.
        OSError: If the connection fails or times out.
    """
    msg = MIMEMultipart()
    msg["From"] = from_addr
    msg["To"] = to_addr
    msg["Subject"] = f"Website Changed: {url}"

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    body = f"""
The website you are monitoring has changed:

URL: {url}
Time: {timestamp}

Please visit the URL to see the changes.
"""
    msg.attach(MIMEText(body, "plain"))

    # Send email
    if use_tls:
        connection = smtplib.SMTP(smtp_host, smtp_port, timeout=NETWORK_TIMEOUT_SECONDS)
    else:
        connection = smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=NETWORK_TIMEOUT_SECONDS)

    # The context manager closes the connection even when STARTTLS, login or
    # delivery raises, so a failed attempt does not leak the socket.
    with connection as server:
        if use_tls:
            server.starttls()
        server.login(smtp_user, smtp_password)
        server.send_message(msg)


def _configure_logging(log_file: Optional[str]) -> None:
    """Set up root logging: always the console, plus a rotating file if configured."""
    log_format = "[%(asctime)s] %(levelname)s: %(message)s"
    log_level = logging.INFO

    # Always add console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format))
    handlers = [console_handler]

    # Add file handler if log file is configured
    if log_file:
        log_path = Path(log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.handlers.RotatingFileHandler(
            log_path,
            maxBytes=10 * 1024 * 1024,  # 10 MB
            backupCount=5,
        )
        file_handler.setFormatter(logging.Formatter(log_format))
        handlers.append(file_handler)

    # force=True replaces any handlers already installed on the root logger.
    logging.basicConfig(level=log_level, format=log_format, handlers=handlers, force=True)


def main() -> int:
    """
    Main execution function.

    Reads configuration from the environment, fetches the page, compares its
    hash with the cached one and emails a notification when it differs.

    Returns:
        0 on success (first run, no change, or change notified), 1 on any error.
    """
    # Load configuration from environment
    env = Env()
    env.read_env()

    # Logging comes first so that every later failure goes through the handlers.
    _configure_logging(env.str("LOG_FILE", default=None))

    try:
        # Required settings are read inside the try block so that a missing
        # variable is logged like any other failure.
        url = env.str("URL")
        selector = env.str("SELECTOR", default=None)
        selector_type = env.str("SELECTOR_TYPE", default="xpath")
        cache_file = Path(env.str("CACHE_FILE", default=".cache/hash.txt"))

        # Email configuration
        smtp_host = env.str("SMTP_HOST")
        smtp_port = env.int("SMTP_PORT", default=587)
        smtp_user = env.str("SMTP_USER")
        smtp_password = env.str("SMTP_PASSWORD")
        from_addr = env.str("FROM_EMAIL")
        to_addr = env.str("TO_EMAIL")
        use_tls = env.bool("SMTP_USE_TLS", default=True)

        # Fetch current content
        logger.info("Fetching content from %s...", url)
        content = get_page_content(url, selector, selector_type)
        current_hash = compute_hash(content)

        # Load cached hash
        cached_hash = load_cached_hash(cache_file)

        if cached_hash is None:
            # First run - nothing to compare against, just record the baseline
            logger.info("First run - saving initial hash")
            save_hash(cache_file, current_hash)
        elif current_hash == cached_hash:
            logger.info("No changes detected")
        else:
            # Content changed - send email
            logger.warning("Content has changed! Sending email notification...")
            send_email(
                smtp_host,
                smtp_port,
                smtp_user,
                smtp_password,
                from_addr,
                to_addr,
                url,
                use_tls,
            )
            # The cache is updated only after the email went out, so a failed
            # delivery is retried on the next run.
            save_hash(cache_file, current_hash)
            logger.info("Email sent and cache updated")
    except Exception as e:  # pylint: disable=broad-except
        # Top-level boundary: keep the traceback and turn it into an exit code.
        logger.exception("Error: %s", e)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())