initial commit

Author: flo-eberle
Date: 2026-02-15 10:04:41 +01:00
Commit: 1819779aac
20 changed files with 1078 additions and 0 deletions

.env.example

@@ -0,0 +1,14 @@
# Email Configuration
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
EMAIL_USERNAME=your-email@gmail.com
EMAIL_PASSWORD=your-app-password
EMAIL_FROM=your-email@gmail.com
EMAIL_TO=recipient@example.com
# Email Security (optional)
# Options: none, ssl, tls, starttls
EMAIL_SECURITY=starttls
# Optional: Multiple recipients (comma-separated)
# EMAIL_TO=recipient1@example.com,recipient2@example.com

.gitignore

@@ -0,0 +1,42 @@
# Python
__pycache__
*.pyc
*.pyo
*.pyd
.Python
env
pip-log.txt
pip-delete-this-directory.txt
.tox
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.log
.git
.mypy_cache
.pytest_cache
.hypothesis
# IDE
.vscode
.idea
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Local data
data/*.csv
# Environment variables
.env
.env.local
.env.production
# Docker
.dockerignore

Dockerfile

@@ -0,0 +1,38 @@
# Use a Python 3.11 slim image for ARM64 compatibility
FROM python:3.11-slim-bullseye

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright

# Install system dependencies
RUN apt-get update && apt-get install -y \
    chromium \
    wget \
    gnupg \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install Playwright browsers
RUN playwright install chromium
RUN playwright install-deps chromium

# Copy application code
COPY src/ ./src/

# Create data directory
RUN mkdir -p data

# Create a non-root user
RUN useradd -m -u 1000 scraper && chown -R scraper:scraper /app
USER scraper

# Default command
CMD ["python", "src/main.py"]

README.md

@@ -0,0 +1,289 @@
# Flat Scraper
Automated web scraper for apartment listings on NHG.at, with notifications when new results appear.
## Features
- 🏠 **Automated scraping** of NHG.at apartment listings
- 📍 **Postcode-based search** for 1120, 1140, 1150, 1160
- 📊 **CSV storage** for tracking results
- 🔔 **Notifications** for new apartments (console + email)
- 🐳 **Docker support** for Raspberry Pi (ARM64)
- ⏰ **Scheduled runs** every 6 hours
- 🔐 **Environment variables** for sensitive data (.env)
- 📧 **Email security** (SSL/TLS/STARTTLS support)
## Project Structure
```
flat_scraper/
├── src/
│   ├── scrapers/            # Scraper modules
│   │   ├── base_scraper.py  # Base class
│   │   └── nhg_scraper.py   # NHG.at-specific scraper
│   ├── storage/             # Data storage
│   │   └── csv_storage.py   # CSV-based
│   ├── notifier/            # Notifications
│   │   └── email_notifier.py
│   ├── config/              # Configuration
│   │   └── sites.yaml
│   ├── config_loader.py     # Config loader with .env support
│   └── main.py              # Main application
├── data/                    # CSV results
├── .env.example             # Environment template
├── .env                     # Your sensitive data (not in VCS)
├── .gitignore               # Ignores .env and data/
├── requirements.txt         # Python dependencies
├── Dockerfile               # ARM64-optimized
├── docker-compose.yml       # Automation
└── README.md
```
## Quick Start
### 1. Set up the environment
```bash
# Copy the environment template
cp .env.example .env
# Fill in your data
vim .env
```
**Important .env variables:**
```bash
SMTP_SERVER=smtp.gmail.com
SMTP_PORT=587
EMAIL_USERNAME=your-email@gmail.com
EMAIL_PASSWORD=your-app-password
EMAIL_FROM=your-email@gmail.com
EMAIL_TO=recipient@example.com
EMAIL_SECURITY=starttls  # Options: none, ssl, tls, starttls
```
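The `.env` format is plain `KEY=VALUE` lines. As a minimal sketch of how those lines map to environment variables (hypothetical `parse_dotenv` helper; the project itself hands `.env` to the container via Docker's `--env-file`):

```python
import os

def parse_dotenv(text: str) -> dict:
    """Parse KEY=VALUE lines; blank lines and '#' comments are skipped."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip()
    return env

parsed = parse_dotenv("# Email Configuration\nSMTP_SERVER=smtp.gmail.com\nSMTP_PORT=587\n")
os.environ.update(parsed)  # export, mirroring what --env-file does for the container
print(parsed["SMTP_SERVER"])  # smtp.gmail.com
```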
### 2. Docker on Raspberry Pi
```bash
# Build and start
docker-compose up -d
# Follow the logs
docker-compose logs -f flat-scraper
# Start the scheduler (runs automatically every 6 hours)
docker-compose up -d scheduler
```
### 3. Manual testing
```bash
# Single run
docker-compose run --rm flat-scraper
# With an environment file
docker run --rm -v $(pwd):/app --env-file .env flat-scraper-test python src/main.py
```
## Configuration
### Configuring sites (`src/config/sites.yaml`)
```yaml
sites:
  nhg:
    name: "Neue Heimat Gewog"
    url: "https://nhg.at/immobilienangebot/wohnungsangebot/"
    scraper_class: "nhg_scraper.NHGScraper"
    enabled: true
    search_params:
      plz_list:
        - "1120 Wien"
        - "1140 Wien"
        - "1150 Wien"
        - "1160 Wien"
    schedule:
      cron: "0 */6 * * *"  # Every 6 hours
      timezone: "Europe/Vienna"
```
### Email notifications
```yaml
notification:
  email:
    enabled: true
    smtp_server: "${SMTP_SERVER}"
    smtp_port: "${SMTP_PORT}"
    username: "${EMAIL_USERNAME}"
    password: "${EMAIL_PASSWORD}"
    from_email: "${EMAIL_FROM}"
    to_emails:
      - "${EMAIL_TO}"
    security: "${EMAIL_SECURITY:starttls}"  # Options: none, ssl, tls, starttls
  console:
    enabled: true  # Always on for debugging
```
## Data Format
Results are stored as CSV:
```csv
plz,address,link,scraper,scrape_time,hash
1120,"1120 Wien, Flurschützstraße 5 / 2 / 10",https://...,nhg,2024-01-15T10:30:00,abc123
```
**Hash-based comparison** avoids duplicates between runs.
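The comparison works on an MD5 hash over the sorted result keys, as in `BaseScraper.generate_hash`; a minimal sketch showing that the hash is order-independent but content-sensitive:

```python
import hashlib
import json

def result_hash(result: dict) -> str:
    # Keys are sorted so dict ordering never changes the hash
    return hashlib.md5(json.dumps(result, sort_keys=True).encode()).hexdigest()

a = result_hash({"plz": "1120", "address": "Flurschützstraße 5 / 2 / 10"})
b = result_hash({"address": "Flurschützstraße 5 / 2 / 10", "plz": "1120"})
c = result_hash({"plz": "1120", "address": "Another street 1"})
print(a == b, a == c)  # True False
```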
## Extensibility
### Adding a new site
1. **New scraper class** in `src/scrapers/`:
```python
from .base_scraper import BaseScraper

class NewSiteScraper(BaseScraper):
    async def scrape(self, search_params):
        # Implementation
        pass
```
2. **Extend the configuration**:
```yaml
sites:
  new_site:
    name: "New Site"
    url: "https://example.com"
    scraper_class: "new_site_scraper.NewSiteScraper"
    enabled: true
    search_params:
      # Site-specific parameters
```
### Environment variables
The `ConfigLoader` supports **automatic substitution**:
```yaml
# In YAML
smtp_server: "${SMTP_SERVER}"
username: "${EMAIL_USERNAME:default@example.com}"  # With a default
```
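The substitution rule can be sketched as a small pure function (simplified from `config_loader.py`):

```python
import os

def substitute(value: str):
    """Resolve '${VAR}' or '${VAR:default}' against the environment."""
    if value.startswith("${") and value.endswith("}"):
        name, sep, default = value[2:-1].partition(":")
        return os.getenv(name, default if sep else None)
    return value

os.environ["SMTP_SERVER"] = "smtp.gmail.com"
os.environ.pop("EMAIL_SECURITY", None)  # ensure the default path is exercised
print(substitute("${SMTP_SERVER}"))              # smtp.gmail.com
print(substitute("${EMAIL_SECURITY:starttls}"))  # starttls (default applied)
```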
## Deployment on Raspberry Pi
### ARM64 support
The Dockerfile is optimized for ARM64:
```dockerfile
FROM python:3.11-slim-bullseye
# ARM64-friendly browser installation
RUN apt-get update && apt-get install -y chromium
```
### Performance tips
- `--no-sandbox` for Chromium (handled in the Dockerfile)
- Shared browser path: `PLAYWRIGHT_BROWSERS_PATH=/ms-playwright`
- Memory-optimized settings
- Environment variables instead of hardcoded values
### Docker Compose features
- **Volume mounting**: `./data:/app/data` for persistent CSVs
- **Environment support**: `--env-file .env` for sensitive data
- **Scheduler service**: automatic runs every 6 hours
- **Restart policy**: `unless-stopped` for reliability
## Troubleshooting
### Common problems
1. **Browser won't start**: run `playwright install-deps chromium`
2. **No results**: postcode not available, or the website changed
3. **Email not working**: check SMTP settings and the security mode
4. **Environment not loaded**: check the `.env` file and its permissions
### Debugging
```bash
# Follow the logs
docker-compose logs -f flat-scraper
# Test manually
docker-compose run --rm flat-scraper python src/main.py
# Email test
docker run --rm -v $(pwd):/app --env-file .env flat-scraper-test python -c "
from src.notifier.email_notifier import EmailNotifier
from src.config_loader import ConfigLoader
config = ConfigLoader()
notifier = EmailNotifier(config.get_notification_config()['email'])
test_results = [{'plz': '1120', 'address': 'Test', 'link': '#', 'hash': 'test'}]
notifier.send_notification('test', test_results)
"
```
## Development
### Testing
```bash
# Test a single scraper
python -c "
import asyncio
from src.scrapers.nhg_scraper import NHGScraper
scraper = NHGScraper({'url': 'https://nhg.at/immobilienangebot/wohnungsangebot/', 'search_params': {'plz_list': ['1120 Wien']}})
results = asyncio.run(scraper.scrape())
print(results)
"
```
### Logging
Logs are written automatically:
- Level: `INFO` (adjustable in `sites.yaml`)
- Format: `timestamp - module - level - message`
- Output: console + Docker logs
## Security
### Environment variables
- **`.env`** is never checked into Git (see `.gitignore`)
- **`.env.example`** serves as a template for the team
- **No passwords** in code or YAML
- **Docker secrets** optional for production
### Email security
Supported security modes:
- **`none`** - no encryption
- **`ssl`** - SMTP_SSL (port 465)
- **`tls`** - explicit TLS (port 587 + STARTTLS)
- **`starttls`** - STARTTLS (default for Gmail)
- **`ssl/tls`** - compatibility mode
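The mode selection can be sketched as a pure mapping (mirroring the branch logic in `email_notifier.py`, without opening a connection):

```python
import smtplib

def connection_plan(security: str):
    """Map a security mode to (SMTP class, use_starttls) without connecting."""
    if security in ("ssl", "ssl/tls"):
        return smtplib.SMTP_SSL, False
    # Plain connection; 'tls' and 'starttls' upgrade it via STARTTLS
    return smtplib.SMTP, security in ("tls", "starttls")

print(connection_plan("ssl"))       # (smtplib.SMTP_SSL, False)
print(connection_plan("starttls"))  # (smtplib.SMTP, True)
print(connection_plan("none"))      # (smtplib.SMTP, False)
```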
## Architecture
### Hybrid approach
- **BaseScraper**: shared functionality (hashing, metadata)
- **Site-specific scrapers**: individual implementations
- **Config-driven**: YAML configuration with environment support
- **Modular**: storage and notifier are interchangeable
### Data flow
```
Config → Scraper → Results → Storage → Comparison → Notifier
   ↓        ↓                    ↓          ↓            ↓
Environment  Playwright         CSV   Hash comparison  Email/Console
```
## License
MIT License

debug.png (binary file, 153 KiB, not shown)

docker-compose.yml

@@ -0,0 +1,48 @@
version: '3.8'

services:
  flat-scraper:
    build: .
    restart: unless-stopped
    volumes:
      - ./data:/app/data
      - ./src/config:/app/src/config
      - ./.env:/app/.env:ro
    environment:
      - TZ=Europe/Vienna
    # For ARM64 (Raspberry Pi) - uncomment if needed
    # platform: linux/arm64

  # Optional: a scheduler service for automated runs
  scheduler:
    build: .
    restart: unless-stopped
    volumes:
      - ./data:/app/data
      - ./src/config:/app/src/config
      - ./.env:/app/.env:ro
    environment:
      - TZ=Europe/Vienna
    # Literal block scalar (|) keeps the newlines the inline script needs;
    # a folded scalar (>) would collapse the Python code onto one line.
    command: |
      python -c "
      import schedule
      import time
      from src.main import FlatScraper

      def run_scraper():
          scraper = FlatScraper()
          scraper.run_once()

      # Schedule every 6 hours
      schedule.every(6).hours.do(run_scraper)
      print('Scheduler started. Running every 6 hours.')
      run_scraper()  # Run immediately
      while True:
          schedule.run_pending()
          time.sleep(60)
      "
    # platform: linux/arm64
    depends_on:
      - flat-scraper

requirements.txt

@@ -0,0 +1,5 @@
playwright==1.40.0
pandas==2.1.4
pydantic==2.5.2
pyyaml==6.0.1
apscheduler==3.10.4
schedule==1.2.0  # used by the docker-compose scheduler service

src/config/__init__.py (empty file)

src/config/sites.yaml

@@ -0,0 +1,38 @@
sites:
  nhg:
    name: "Neue Heimat Gewog"
    url: "https://nhg.at/immobilienangebot/wohnungsangebot/"
    scraper_class: "nhg_scraper.NHGScraper"
    enabled: true
    search_params:
      plz_list:
        - "1120 Wien"
        - "1140 Wien"
        - "1150 Wien"
        - "1160 Wien"
    schedule:
      cron: "0 */6 * * *"  # Every 6 hours
      timezone: "Europe/Vienna"

# Email notification settings
notification:
  email:
    enabled: true  # Set to true to enable email notifications
    smtp_server: "${SMTP_SERVER}"
    smtp_port: "${SMTP_PORT}"
    username: "${EMAIL_USERNAME}"
    password: "${EMAIL_PASSWORD}"
    from_email: "${EMAIL_FROM}"
    to_emails:
      - "${EMAIL_TO}"
    security: "${EMAIL_SECURITY:starttls}"  # Options: none, ssl, tls, starttls
  console:
    enabled: true  # Always enabled for debugging

# General settings
general:
  data_dir: "data"
  log_level: "INFO"
  max_retries: 3
  retry_delay: 5  # seconds

src/config_loader.py

@@ -0,0 +1,69 @@
import os
from pathlib import Path
from typing import Dict, Any

import yaml


class ConfigLoader:
    """Load and manage configuration."""

    def __init__(self, config_path: str = "src/config/sites.yaml"):
        self.config_path = Path(config_path)
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from the YAML file."""
        if not self.config_path.exists():
            raise FileNotFoundError(f"Configuration file not found: {self.config_path}")
        try:
            with open(self.config_path, 'r', encoding='utf-8') as file:
                config = yaml.safe_load(file)
            return self._substitute_env_vars(config)
        except Exception as e:
            raise ValueError(f"Error loading configuration: {e}")

    def _substitute_env_vars(self, config: Any) -> Any:
        """Recursively substitute environment variables in the configuration."""
        if isinstance(config, dict):
            return {key: self._substitute_env_vars(value) for key, value in config.items()}
        elif isinstance(config, list):
            return [self._substitute_env_vars(item) for item in config]
        elif isinstance(config, str) and config.startswith('${') and config.endswith('}'):
            # Extract the environment variable name
            env_var = config[2:-1]
            default_value = None
            # Handle default values (e.g. ${VAR:default})
            if ':' in env_var:
                env_var, default_value = env_var.split(':', 1)
            return os.getenv(env_var, default_value)
        else:
            return config

    def get_sites(self) -> Dict[str, Any]:
        """Get all site configurations."""
        return self.config.get('sites', {})

    def get_site_config(self, site_name: str) -> Dict[str, Any]:
        """Get the configuration for a specific site."""
        sites = self.get_sites()
        if site_name not in sites:
            raise ValueError(f"Site '{site_name}' not found in configuration")
        return sites[site_name]

    def get_notification_config(self) -> Dict[str, Any]:
        """Get the notification configuration."""
        return self.config.get('notification', {})

    def get_general_config(self) -> Dict[str, Any]:
        """Get the general configuration."""
        return self.config.get('general', {})

    def is_site_enabled(self, site_name: str) -> bool:
        """Check whether a site is enabled."""
        try:
            config = self.get_site_config(site_name)
            return config.get('enabled', True)
        except ValueError:
            return False

src/main.py

@@ -0,0 +1,130 @@
import asyncio
import importlib
import logging
from typing import Dict, List, Any

from config_loader import ConfigLoader
from storage.csv_storage import CSVStorage
from notifier.email_notifier import EmailNotifier, ConsoleNotifier

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class FlatScraper:
    """Main application class"""

    def __init__(self, config_path: str = "src/config/sites.yaml"):
        self.config_loader = ConfigLoader(config_path)
        self.storage = CSVStorage(self.config_loader.get_general_config().get('data_dir', 'data'))
        self.notifiers = self._setup_notifiers()
        self.scrapers = {}

    def _setup_notifiers(self) -> List:
        """Setup notification systems"""
        notifiers = []
        notification_config = self.config_loader.get_notification_config()
        # Console notifier (always enabled for debugging)
        console_config = notification_config.get('console', {})
        if console_config.get('enabled', True):
            notifiers.append(ConsoleNotifier())
        # Email notifier
        email_config = notification_config.get('email', {})
        if email_config.get('enabled', False):
            notifiers.append(EmailNotifier(email_config))
        return notifiers

    def _get_scraper_class(self, scraper_class_path: str):
        """Dynamically import a scraper class"""
        module_name, class_name = scraper_class_path.rsplit('.', 1)
        module = importlib.import_module(f'scrapers.{module_name}')
        return getattr(module, class_name)

    def _get_scraper(self, site_name: str):
        """Get or create a scraper instance"""
        if site_name not in self.scrapers:
            site_config = self.config_loader.get_site_config(site_name)
            scraper_class_path = site_config.get('scraper_class')
            scraper_class = self._get_scraper_class(scraper_class_path)
            self.scrapers[site_name] = scraper_class(site_config)
        return self.scrapers[site_name]

    async def scrape_site(self, site_name: str) -> Dict[str, Any]:
        """Scrape a single site"""
        try:
            logger.info(f"Start scraping {site_name}")
            scraper = self._get_scraper(site_name)
            results = await scraper.scrape()
            logger.info(f"Found {len(results)} results for {site_name}")
            # Compare with previous results
            new_results, removed_results = self.storage.compare_results(site_name, results)
            # Save results
            self.storage.save_results(site_name, results)
            # Send notifications for new results
            if new_results:
                logger.info(f"Found {len(new_results)} new results for {site_name}")
                for notifier in self.notifiers:
                    try:
                        notifier.send_notification(site_name, new_results)
                    except Exception as e:
                        logger.error(f"Error sending notification: {e}")
            else:
                logger.info(f"No new results for {site_name}")
            return {
                'site': site_name,
                'total_results': len(results),
                'new_results': len(new_results),
                'success': True
            }
        except Exception as e:
            logger.error(f"Error scraping {site_name}: {e}")
            return {
                'site': site_name,
                'error': str(e),
                'success': False
            }

    async def scrape_all_sites(self) -> List[Dict[str, Any]]:
        """Scrape all enabled sites"""
        results = []
        sites = self.config_loader.get_sites()
        for site_name in sites.keys():
            if self.config_loader.is_site_enabled(site_name):
                result = await self.scrape_site(site_name)
                results.append(result)
        return results

    def run_once(self) -> None:
        """Run the scraper once"""
        logger.info("Starting flat scraper run")
        results = asyncio.run(self.scrape_all_sites())
        # Summary
        successful = sum(1 for r in results if r['success'])
        total_new = sum(r.get('new_results', 0) for r in results)
        logger.info(f"Scraping completed: {successful}/{len(results)} sites successful, {total_new} new results")


def main():
    """Main entry point"""
    scraper = FlatScraper()
    scraper.run_once()


if __name__ == "__main__":
    main()

src/notifier/__init__.py (empty file)

src/notifier/email_notifier.py

@@ -0,0 +1,135 @@
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Dict


class EmailNotifier:
    """Email notification system"""

    def __init__(self, config: Dict):
        self.smtp_server = config.get('smtp_server', 'localhost')
        self.smtp_port = int(config.get('smtp_port', 587))
        self.username = config.get('username', '')
        self.password = config.get('password', '')
        self.from_email = config.get('from_email', self.username)
        self.security = config.get('security', 'starttls')
        # to_emails can be a string or a list
        to_emails = config.get('to_emails', [])
        if isinstance(to_emails, str):
            self.to_emails = [to_emails]
        else:
            self.to_emails = to_emails

    def send_notification(self, scraper_name: str, new_results: List[Dict]) -> bool:
        """Send an email notification for new results"""
        if not new_results:
            return True
        if not self.to_emails:
            print("No recipient emails configured")
            return False
        try:
            # Create the message
            msg = MIMEMultipart()
            msg['From'] = self.from_email
            msg['To'] = ', '.join(self.to_emails)
            msg['Subject'] = f"New apartments found: {len(new_results)} new results for {scraper_name}"
            # Create the email body
            body = self._create_email_body(scraper_name, new_results)
            msg.attach(MIMEText(body, 'html'))
            # Send the email with the configured security mode
            if self.security in ['ssl', 'ssl/tls']:
                server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
            else:
                server = smtplib.SMTP(self.smtp_server, self.smtp_port)
                # Upgrade the plain connection for STARTTLS-based modes
                if self.security in ['tls', 'starttls']:
                    server.starttls()
            # Log in if credentials are provided
            if self.username and self.password:
                server.login(self.username, self.password)
            server.send_message(msg)
            server.quit()
            print(f"Email notification sent to {len(self.to_emails)} recipient(s)")
            return True
        except Exception as e:
            print(f"Error sending email: {e}")
            return False

    def _create_email_body(self, scraper_name: str, new_results: List[Dict]) -> str:
        """Create the HTML email body"""
        html = f"""
        <html>
        <body>
            <h2>🏠 New apartments found - {scraper_name}</h2>
            <p><strong>Time:</strong> {datetime.now().strftime('%d.%m.%Y %H:%M')}</p>
            <p><strong>Number of new results:</strong> {len(new_results)}</p>
            <h3>New apartments:</h3>
            <table border="1" cellpadding="5" cellspacing="0" style="border-collapse: collapse;">
                <tr style="background-color: #f0f0f0;">
                    <th>PLZ</th>
                    <th>Address</th>
                    <th>Link</th>
                </tr>
        """
        for result in new_results:
            plz = result.get('plz', 'N/A')
            address = result.get('address', 'N/A')
            link = result.get('link', '#')
            html += f"""
                <tr>
                    <td>{plz}</td>
                    <td>{address}</td>
                    <td><a href="{link}">Details</a></td>
                </tr>
            """
        html += """
            </table>
            <br>
            <p><small>This message was sent automatically by the Flat Scraper.</small></p>
        </body>
        </html>
        """
        return html


class ConsoleNotifier:
    """Console notification for testing"""

    def send_notification(self, scraper_name: str, new_results: List[Dict]) -> bool:
        """Print a notification to the console"""
        if not new_results:
            return True
        print(f"\n{'='*50}")
        print(f"🏠 NEW APARTMENTS FOUND: {scraper_name}")
        print(f"Time: {datetime.now().strftime('%d.%m.%Y %H:%M')}")
        print(f"Count: {len(new_results)}")
        print(f"{'='*50}")
        for result in new_results:
            plz = result.get('plz', 'N/A')
            address = result.get('address', 'N/A')
            link = result.get('link', '#')
            print(f"📍 PLZ {plz}: {address}")
            if link != '#':
                print(f"   🔗 {link}")
            print()
        return True

src/scrapers/__init__.py (empty file)

src/scrapers/base_scraper.py

@@ -0,0 +1,35 @@
import hashlib
import json
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Dict, Any


class BaseScraper(ABC):
    """Base class for all web scrapers"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.name = config.get('name', 'unknown')
        self.base_url = config.get('url', '')

    @abstractmethod
    async def scrape(self, search_params: Dict[str, Any] = None) -> List[Dict]:
        """Scrape data from the website"""
        pass

    def generate_hash(self, data: Dict) -> str:
        """Generate a unique hash for result comparison"""
        # Sort keys for consistent hashing
        sorted_data = json.dumps(data, sort_keys=True)
        return hashlib.md5(sorted_data.encode()).hexdigest()

    def add_metadata(self, results: List[Dict]) -> List[Dict]:
        """Add metadata to results"""
        for result in results:
            result.update({
                'scraper': self.name,
                'scrape_time': datetime.now().isoformat(),
                # The hash is computed before the metadata is merged in,
                # so it stays stable across runs
                'hash': self.generate_hash(result)
            })
        return results

src/scrapers/nhg_scraper.py

@@ -0,0 +1,94 @@
import asyncio
import re
from typing import List, Dict, Any

from playwright.async_api import async_playwright

from .base_scraper import BaseScraper


class NHGScraper(BaseScraper):
    """NHG.at-specific scraper"""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.plz_list = config.get('search_params', {}).get('plz_list', ["1120", "1140", "1150", "1160"])
        self.base_url = config.get('url', 'https://nhg.at/immobilienangebot/wohnungsangebot/')

    async def scrape_plz(self, page, plz: str) -> List[Dict]:
        """Scrape all apartments for one postcode"""
        results = []
        try:
            # Load the page
            await page.goto(self.base_url)
            await page.wait_for_load_state('networkidle')
            # Check whether the postcode is available
            options = await page.locator('#Filter_City option').all_text_contents()
            if plz not in options:
                print(f"PLZ {plz} not available")
                return results
            # Select the postcode
            await page.select_option('#Filter_City', plz)
            # Wait for results
            await page.wait_for_timeout(3000)
            # Extract results from the UnitsList container
            units_list = await page.query_selector('#UnitsList')
            if not units_list:
                print(f"No UnitsList found for PLZ {plz}")
                return results
            # Fetch the full text content and search it for addresses
            content = await units_list.text_content()
            # Find addresses with a regex
            address_pattern = r'(\d{4}\s+Wien,\s*[^,\n]+)'
            addresses = re.findall(address_pattern, content)
            for address in addresses:
                address = address.strip()
                if address:
                    # Look for a details link
                    details_link = None
                    try:
                        # Search for a "Details" link following the address
                        details_elements = await page.locator('#UnitsList a').all()
                        for element in details_elements:
                            link_text = await element.text_content()
                            if 'Details' in link_text:
                                details_link = await element.get_attribute('href')
                                break
                    except Exception:
                        pass
                    result = {
                        'plz': plz.split()[0],  # Only the postcode, without "Wien"
                        'address': address,
                        'link': details_link,
                    }
                    results.append(result)
        except Exception as e:
            print(f"Error scraping PLZ {plz}: {e}")
        return results

    async def scrape(self, search_params: Dict[str, Any] = None) -> List[Dict]:
        """Scrape all postcodes"""
        all_results = []
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()
            for plz in self.plz_list:
                print(f"Scraping PLZ {plz}...")
                results = await self.scrape_plz(page, plz)
                all_results.extend(results)
                await asyncio.sleep(1)  # Rate limiting without blocking the event loop
            await browser.close()
        return self.add_metadata(all_results)

src/storage/__init__.py (empty file)

src/storage/csv_storage.py

@@ -0,0 +1,78 @@
from pathlib import Path
from typing import List, Dict, Set, Tuple

import pandas as pd


class CSVStorage:
    """CSV-based storage for scraping results"""

    def __init__(self, data_dir: str = "data"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)

    def get_filename(self, scraper_name: str) -> Path:
        """Get the CSV filename for a scraper"""
        return self.data_dir / f"{scraper_name}_results.csv"

    def load_previous_results(self, scraper_name: str) -> Set[str]:
        """Load the hashes of previous results"""
        filename = self.get_filename(scraper_name)
        if not filename.exists():
            return set()
        try:
            df = pd.read_csv(filename)
            return set(df['hash'].dropna().unique())
        except Exception as e:
            print(f"Error loading previous results: {e}")
            return set()

    def save_results(self, scraper_name: str, results: List[Dict]) -> None:
        """Save results to CSV"""
        if not results:
            print(f"No results for {scraper_name}")
            return
        filename = self.get_filename(scraper_name)
        # Convert to a DataFrame
        df = pd.DataFrame(results)
        # Append to an existing file or create a new one
        if filename.exists():
            df.to_csv(filename, mode='a', header=False, index=False)
        else:
            df.to_csv(filename, index=False)
        print(f"Saved {len(results)} results for {scraper_name}")

    def compare_results(self, scraper_name: str, new_results: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
        """Compare new results with previous ones"""
        previous_hashes = self.load_previous_results(scraper_name)
        new_hashes = {result['hash'] for result in new_results}
        # Find new results
        new_items = [result for result in new_results if result['hash'] not in previous_hashes]
        # Find removed results (optional, for reporting)
        removed_hashes = previous_hashes - new_hashes
        removed_items = []  # Full data for removed items is not retained
        return new_items, removed_items

    def get_latest_results(self, scraper_name: str, limit: int = 50) -> pd.DataFrame:
        """Get the latest results from the CSV"""
        filename = self.get_filename(scraper_name)
        if not filename.exists():
            return pd.DataFrame()
        try:
            df = pd.read_csv(filename)
            # Sort by scrape_time descending and return the latest entries
            if 'scrape_time' in df.columns:
                df = df.sort_values('scrape_time', ascending=False)
            return df.head(limit)
        except Exception as e:
            print(f"Error reading results: {e}")
            return pd.DataFrame()

test_data/nhg_test_results.csv

@@ -0,0 +1,2 @@
plz,address,link,scraper,scrape_time,hash
1120,"1120 Wien, Flurschützstraße 5 / 2 / 10",#,NHG Test,2026-02-15T08:13:09.072841,75b2a4c4eb48f8f22047d252320d56f6

test_scraper.py

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
Test script for the NHG scraper
"""
import asyncio
import os
import sys

# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

from scrapers.nhg_scraper import NHGScraper
from storage.csv_storage import CSVStorage


async def test_nhg_scraper():
    """Test the NHG scraper with a single PLZ"""
    print("Testing NHG Scraper...")
    config = {
        'name': 'NHG Test',
        'url': 'https://nhg.at/immobilienangebot/wohnungsangebot/',
        'search_params': {
            'plz_list': ['1120 Wien']  # Test with the full PLZ name
        }
    }
    scraper = NHGScraper(config)
    try:
        results = await scraper.scrape()
        print(f"Found {len(results)} results:")
        for result in results:
            print(f"  PLZ: {result.get('plz')}")
            print(f"  Address: {result.get('address')}")
            print(f"  Link: {result.get('link')}")
            print(f"  Hash: {result.get('hash')}")
            print("-" * 40)
        # Test storage
        storage = CSVStorage('test_data')
        new_results, removed_results = storage.compare_results('nhg_test', results)
        print(f"New results: {len(new_results)}")
        print(f"Removed results: {len(removed_results)}")
        # Save results
        storage.save_results('nhg_test', results)
        return True
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        return False


if __name__ == "__main__":
    success = asyncio.run(test_nhg_scraper())
    sys.exit(0 if success else 1)