5 Request- und Response-Handling

5.1 Request-Objekt und Datenverarbeitung

Das Request-Objekt in Flask stellt alle Informationen über eingehende HTTP-Anfragen zur Verfügung. Es ermöglicht den Zugriff auf Headers, Parameter, Daten und Metainformationen der Client-Anfrage.

5.1.1 Grundlegende Request-Eigenschaften

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/request-info', methods=['GET', 'POST'])
def request_info():
    info = {
        'methode': request.method,
        'url': request.url,
        'basis_url': request.base_url,
        'pfad': request.path,
        'query_string': request.query_string.decode('utf-8'),
        'remote_addr': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
        'content_type': request.content_type,
        'content_length': request.content_length
    }
    return jsonify(info)

5.1.2 Header-Verarbeitung

@app.route('/headers')
def header_analysis():
    headers_dict = {}
    
    # Alle Headers durchlaufen
    for header_name, header_value in request.headers:
        headers_dict[header_name] = header_value
    
    # Spezifische Headers abrufen
    auth_header = request.headers.get('Authorization')
    accept_header = request.headers.get('Accept', 'text/html')
    
    # Custom Headers
    api_key = request.headers.get('X-API-Key')
    client_version = request.headers.get('X-Client-Version')
    
    return jsonify({
        'alle_headers': headers_dict,
        'authorization': auth_header,
        'accept': accept_header,
        'api_key': api_key,
        'client_version': client_version
    })

5.1.3 Request-Kontext und Threading

from flask import g, current_app
import time

@app.before_request
def before_request():
    g.start_time = time.time()
    g.request_id = str(uuid.uuid4())
    current_app.logger.info(f'Request {g.request_id} gestartet')

@app.after_request
def after_request(response):
    duration = time.time() - g.start_time
    current_app.logger.info(f'Request {g.request_id} beendet in {duration:.3f}s')
    response.headers['X-Request-ID'] = g.request_id
    response.headers['X-Response-Time'] = f'{duration:.3f}s'
    return response

5.1.4 Request-Daten-Eigenschaften

@app.route('/request-details', methods=['POST'])
def request_details():
    details = {
        'is_json': request.is_json,
        'is_multipart': bool(request.files),
        'mimetype': request.mimetype,
        'charset': request.charset,
        'encoding_errors': request.encoding_errors,
        'max_content_length': request.max_content_length,
        'stream_available': hasattr(request, 'stream')
    }
    
    # Rohe Daten (nur bei kleinen Requests)
    if request.content_length and request.content_length < 1024:
        details['raw_data'] = request.get_data(as_text=True)
    
    return jsonify(details)

5.2 Formulardaten verarbeiten

HTML-Formulare übertragen Daten in verschiedenen Formaten. Flask bietet umfassende Unterstützung für die Verarbeitung von Formulardaten aus GET- und POST-Requests.

5.2.1 Einfache Formularverarbeitung

from flask import request, render_template, redirect, url_for, flash

@app.route('/kontakt', methods=['GET', 'POST'])
def contact_form():
    if request.method == 'POST':
        # Formulardaten abrufen
        name = request.form.get('name', '').strip()
        email = request.form.get('email', '').strip()
        nachricht = request.form.get('nachricht', '').strip()
        
        # Validierung
        errors = []
        if not name:
            errors.append('Name ist erforderlich')
        if not email or '@' not in email:
            errors.append('Gültige E-Mail-Adresse ist erforderlich')
        if not nachricht:
            errors.append('Nachricht ist erforderlich')
        
        if errors:
            for error in errors:
                flash(error, 'error')
            return render_template('kontakt.html', 
                                 name=name, email=email, nachricht=nachricht)
        
        # Verarbeitung erfolgreich
        flash('Nachricht wurde gesendet', 'success')
        return redirect(url_for('contact_form'))
    
    return render_template('kontakt.html')

5.2.2 Multiselect und Checkbox-Verarbeitung

@app.route('/einstellungen', methods=['GET', 'POST'])
def user_settings():
    if request.method == 'POST':
        # Einzelne Checkbox
        newsletter = request.form.get('newsletter') == 'on'
        
        # Mehrere Checkboxen mit getlist
        interessen = request.form.getlist('interessen')
        
        # Radio Button
        benachrichtigung = request.form.get('benachrichtigung', 'email')
        
        # Select-Box
        sprache = request.form.get('sprache', 'de')
        
        settings = {
            'newsletter': newsletter,
            'interessen': interessen,
            'benachrichtigung': benachrichtigung,
            'sprache': sprache
        }
        
        return jsonify(settings)
    
    return render_template('einstellungen.html')

5.2.3 Datei-Upload-Verarbeitung

import os
from werkzeug.utils import secure_filename

UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx'}

def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    if request.method == 'POST':
        # Einzelne Datei
        if 'datei' not in request.files:
            flash('Keine Datei ausgewählt', 'error')
            return redirect(request.url)
        
        file = request.files['datei']
        
        if file.filename == '':
            flash('Keine Datei ausgewählt', 'error')
            return redirect(request.url)
        
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            
            # Eindeutigen Dateinamen generieren
            timestamp = int(time.time())
            filename = f"{timestamp}_{filename}"
            
            filepath = os.path.join(UPLOAD_FOLDER, filename)
            file.save(filepath)
            
            # Dateiinformationen speichern
            file_info = {
                'original_name': file.filename,
                'saved_name': filename,
                'size': os.path.getsize(filepath),
                'mimetype': file.content_type
            }
            
            return jsonify(file_info)
        else:
            flash('Dateityp nicht erlaubt', 'error')
    
    return render_template('upload.html')

@app.route('/multi-upload', methods=['POST'])
def multi_upload():
    uploaded_files = []
    
    # Mehrere Dateien verarbeiten
    files = request.files.getlist('dateien')
    
    for file in files:
        if file and file.filename != '' and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            timestamp = int(time.time())
            unique_filename = f"{timestamp}_{filename}"
            
            filepath = os.path.join(UPLOAD_FOLDER, unique_filename)
            file.save(filepath)
            
            uploaded_files.append({
                'original': file.filename,
                'saved': unique_filename,
                'size': os.path.getsize(filepath)
            })
    
    return jsonify({'uploaded_files': uploaded_files})

5.2.4 Formulardaten-Validierung

import re
from datetime import datetime

def validate_form_data(form_data):
    errors = {}
    
    # Name validieren
    name = form_data.get('name', '').strip()
    if not name:
        errors['name'] = 'Name ist erforderlich'
    elif len(name) < 2:
        errors['name'] = 'Name muss mindestens 2 Zeichen lang sein'
    
    # E-Mail validieren
    email = form_data.get('email', '').strip()
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not email:
        errors['email'] = 'E-Mail ist erforderlich'
    elif not re.match(email_pattern, email):
        errors['email'] = 'Ungültige E-Mail-Adresse'
    
    # Telefonnummer validieren
    telefon = form_data.get('telefon', '').strip()
    if telefon:
        telefon_pattern = r'^[\d\s\-\+\(\)]+$'
        if not re.match(telefon_pattern, telefon):
            errors['telefon'] = 'Ungültige Telefonnummer'
    
    # Geburtsdatum validieren
    geburtsdatum = form_data.get('geburtsdatum')
    if geburtsdatum:
        try:
            datum = datetime.strptime(geburtsdatum, '%Y-%m-%d')
            if datum > datetime.now():
                errors['geburtsdatum'] = 'Geburtsdatum darf nicht in der Zukunft liegen'
        except ValueError:
            errors['geburtsdatum'] = 'Ungültiges Datumsformat'
    
    return errors

@app.route('/registrierung', methods=['POST'])
def registration():
    errors = validate_form_data(request.form)
    
    if errors:
        return jsonify({'erfolg': False, 'fehler': errors}), 400
    
    # Registrierung verarbeiten
    return jsonify({'erfolg': True, 'nachricht': 'Registrierung erfolgreich'})

5.3 JSON-Daten verarbeiten

JSON ist das Standardformat für API-Kommunikation. Flask bietet integrierte Unterstützung für das Parsen und Generieren von JSON-Daten.

5.3.1 JSON-Request-Verarbeitung

from flask import request, jsonify

@app.route('/api/benutzer', methods=['POST'])
def create_user():
    # JSON-Daten abrufen
    if not request.is_json:
        return jsonify({'fehler': 'Content-Type muss application/json sein'}), 400
    
    data = request.get_json()
    
    if data is None:
        return jsonify({'fehler': 'Ungültiges JSON'}), 400
    
    # Erforderliche Felder prüfen
    required_fields = ['username', 'email', 'password']
    missing_fields = [field for field in required_fields if field not in data]
    
    if missing_fields:
        return jsonify({
            'fehler': 'Fehlende Felder',
            'fehlende_felder': missing_fields
        }), 400
    
    # Daten verarbeiten
    user_data = {
        'id': 123,
        'username': data['username'],
        'email': data['email'],
        'erstellt': datetime.utcnow().isoformat()
    }
    
    return jsonify(user_data), 201

@app.route('/api/benutzer/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    data = request.get_json(force=True)  # JSON erzwingen
    
    # Aktualisierbare Felder definieren
    updateable_fields = ['username', 'email', 'first_name', 'last_name']
    updates = {}
    
    for field in updateable_fields:
        if field in data:
            updates[field] = data[field]
    
    if not updates:
        return jsonify({'fehler': 'Keine aktualisierbaren Felder gefunden'}), 400
    
    return jsonify({
        'user_id': user_id,
        'aktualisierte_felder': updates
    })

5.3.2 Verschachtelte JSON-Strukturen

@app.route('/api/bestellung', methods=['POST'])
def create_order():
    data = request.get_json()
    
    # Verschachtelte Struktur validieren
    required_structure = {
        'kunde': ['name', 'email'],
        'adresse': ['strasse', 'stadt', 'plz'],
        'artikel': []  # Liste von Artikeln
    }
    
    errors = []
    
    # Hauptobjekte prüfen
    for key, required_fields in required_structure.items():
        if key not in data:
            errors.append(f'Feld "{key}" fehlt')
            continue
        
        if isinstance(required_fields, list) and required_fields:
            # Verschachtelte Objekte prüfen
            for field in required_fields:
                if field not in data[key]:
                    errors.append(f'Feld "{key}.{field}" fehlt')
    
    # Artikel-Liste validieren
    if 'artikel' in data:
        if not isinstance(data['artikel'], list):
            errors.append('Artikel muss eine Liste sein')
        elif len(data['artikel']) == 0:
            errors.append('Mindestens ein Artikel erforderlich')
        else:
            for i, artikel in enumerate(data['artikel']):
                if 'produkt_id' not in artikel:
                    errors.append(f'Artikel[{i}].produkt_id fehlt')
                if 'menge' not in artikel:
                    errors.append(f'Artikel[{i}].menge fehlt')
    
    if errors:
        return jsonify({'fehler': errors}), 400
    
    # Bestellung verarbeiten
    bestellung = {
        'bestellnummer': 'B-2024-001',
        'kunde': data['kunde'],
        'adresse': data['adresse'],
        'artikel_anzahl': len(data['artikel']),
        'status': 'eingegangen'
    }
    
    return jsonify(bestellung), 201

5.3.3 JSON-Serialisierung mit benutzerdefinierten Objekten

from datetime import datetime, date
import json

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, date):
            return obj.isoformat()
        elif hasattr(obj, '__dict__'):
            return obj.__dict__
        return super().default(obj)

class User:
    def __init__(self, id, name, email, created_at):
        self.id = id
        self.name = name
        self.email = email
        self.created_at = created_at
    
    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'email': self.email,
            'created_at': self.created_at.isoformat() if self.created_at else None
        }

@app.route('/api/benutzer/<int:user_id>')
def get_user(user_id):
    # Beispiel-Benutzer erstellen
    user = User(
        id=user_id,
        name='Max Mustermann',
        email='max@beispiel.de',
        created_at=datetime.utcnow()
    )
    
    # Option 1: Manuelle Serialisierung
    return jsonify(user.to_dict())

@app.route('/api/benutzer-json/<int:user_id>')
def get_user_json(user_id):
    user = User(user_id, 'Max Mustermann', 'max@beispiel.de', datetime.utcnow())
    
    # Option 2: Custom JSON Encoder verwenden
    app.json_encoder = DateTimeEncoder
    return jsonify(user.__dict__)

5.4 Response-Objekte erstellen

Flask bietet verschiedene Möglichkeiten zur Erstellung von HTTP-Responses mit unterschiedlichen Inhalten, Headers und Statuscodes.

5.4.1 Einfache Response-Erstellung

from flask import Response, make_response, jsonify

@app.route('/text-response')
def text_response():
    return 'Einfache Textantwort'

@app.route('/html-response')
def html_response():
    html_content = '''
    <html>
        <head><title>Dynamische Seite</title></head>
        <body>
            <h1>Dynamisch generierte HTML-Seite</h1>
            <p>Aktuelle Zeit: {}</p>
        </body>
    </html>
    '''.format(datetime.now().strftime('%H:%M:%S'))
    
    return html_content

@app.route('/json-response')
def json_response():
    data = {
        'status': 'success',
        'timestamp': datetime.utcnow().isoformat(),
        'data': [1, 2, 3, 4, 5]
    }
    return jsonify(data)

5.4.2 Response-Objekte mit make_response

@app.route('/custom-response')
def custom_response():
    response = make_response('Custom Response Content')
    response.headers['X-Custom-Header'] = 'Mein Wert'
    response.headers['Cache-Control'] = 'no-cache'
    response.status_code = 200
    return response

@app.route('/json-custom-response')
def json_custom_response():
    data = {'nachricht': 'Erfolg'}
    response = make_response(jsonify(data))
    response.headers['X-API-Version'] = '1.0'
    response.headers['Access-Control-Allow-Origin'] = '*'
    return response

5.4.3 Streaming-Responses

import time

@app.route('/stream')
def stream_response():
    def generate():
        for i in range(10):
            yield f"Nachricht {i+1}\n"
            time.sleep(1)
    
    return Response(generate(), mimetype='text/plain')

@app.route('/csv-download')
def csv_download():
    def generate_csv():
        yield 'Name,Email,Alter\n'
        users = [
            ('Max Mustermann', 'max@beispiel.de', 30),
            ('Anna Schmidt', 'anna@beispiel.de', 25),
            ('Tom Weber', 'tom@beispiel.de', 35)
        ]
        for name, email, alter in users:
            yield f'{name},{email},{alter}\n'
    
    response = Response(generate_csv(), mimetype='text/csv')
    response.headers['Content-Disposition'] = 'attachment; filename=benutzer.csv'
    return response

5.4.4 Binäre Responses

import io
from PIL import Image

@app.route('/generate-image')
def generate_image():
    # Einfaches Bild erstellen
    img = Image.new('RGB', (200, 100), color='lightblue')
    
    # In Memory speichern
    img_io = io.BytesIO()
    img.save(img_io, 'PNG')
    img_io.seek(0)
    
    return Response(img_io.getvalue(), mimetype='image/png')

@app.route('/download-file')
def download_file():
    # Datei-Content aus Datenbank oder Dateisystem
    file_content = b'Dies ist der Inhalt einer Datei'
    
    response = Response(file_content, mimetype='application/octet-stream')
    response.headers['Content-Disposition'] = 'attachment; filename=download.txt'
    response.headers['Content-Length'] = len(file_content)
    
    return response

5.5 HTTP-Statuscodes

HTTP-Statuscodes kommunizieren das Ergebnis einer Anfrage. Die korrekte Verwendung ist essentiell für APIs und Benutzerfreundlichkeit.

5.5.1 Erfolgreiche Responses (2xx)

@app.route('/api/artikel', methods=['POST'])
def create_article():
    data = request.get_json()
    
    # Artikel erstellen
    article = {
        'id': 123,
        'titel': data.get('titel'),
        'inhalt': data.get('inhalt')
    }
    
    # 201 Created für erfolgreiche Erstellung
    return jsonify(article), 201

@app.route('/api/artikel/<int:id>', methods=['PUT'])
def update_article(id):
    data = request.get_json()
    
    # Artikel aktualisieren
    return jsonify({'id': id, 'status': 'aktualisiert'}), 200

@app.route('/api/artikel/<int:id>', methods=['DELETE'])
def delete_article(id):
    # Artikel löschen
    # 204 No Content für erfolgreiche Löschung ohne Response-Body
    return '', 204

@app.route('/api/status')
def api_status():
    # 200 OK für erfolgreiche Abfrage
    return jsonify({'status': 'online', 'version': '1.0'}), 200

5.5.2 Client-Fehler (4xx)

@app.route('/api/benutzer/<int:user_id>')
def get_user_by_id(user_id):
    # Simulation: Benutzer nicht gefunden
    if user_id > 1000:
        return jsonify({'fehler': 'Benutzer nicht gefunden'}), 404
    
    return jsonify({'id': user_id, 'name': 'Benutzer'})

@app.route('/api/protected', methods=['POST'])
def protected_endpoint():
    auth_header = request.headers.get('Authorization')
    
    if not auth_header:
        return jsonify({'fehler': 'Authorization Header erforderlich'}), 401
    
    if not auth_header.startswith('Bearer '):
        return jsonify({'fehler': 'Ungültiger Authorization Header'}), 401
    
    # Token validieren
    token = auth_header.split(' ')[1]
    if token != 'gültiger-token':
        return jsonify({'fehler': 'Ungültiger Token'}), 403
    
    return jsonify({'nachricht': 'Zugriff gewährt'})

@app.route('/api/validation-demo', methods=['POST'])
def validation_demo():
    data = request.get_json()
    
    if not data:
        return jsonify({'fehler': 'JSON-Daten erforderlich'}), 400
    
    if 'email' not in data:
        return jsonify({
            'fehler': 'Validierungsfehler',
            'details': ['Email ist erforderlich']
        }), 422  # Unprocessable Entity
    
    return jsonify({'status': 'validiert'})

5.5.3 Server-Fehler (5xx)

@app.route('/api/fehler-simulation')
def error_simulation():
    try:
        # Simulation eines Datenbankfehlers
        result = 1 / 0
    except ZeroDivisionError:
        return jsonify({
            'fehler': 'Interner Serverfehler',
            'code': 'DIVISION_BY_ZERO'
        }), 500

@app.route('/api/service-unavailable')
def service_unavailable():
    # Service temporär nicht verfügbar
    return jsonify({
        'fehler': 'Service temporär nicht verfügbar',
        'retry_after': 300  # 5 Minuten
    }), 503

@app.errorhandler(500)
def internal_error(error):
    return jsonify({
        'fehler': 'Interner Serverfehler',
        'timestamp': datetime.utcnow().isoformat()
    }), 500

@app.errorhandler(404)
def not_found(error):
    return jsonify({
        'fehler': 'Ressource nicht gefunden',
        'pfad': request.path
    }), 404

5.5.4 Bedingte Responses

@app.route('/api/artikel/<int:id>/status', methods=['PATCH'])
def update_article_status(id):
    data = request.get_json()
    new_status = data.get('status')
    
    # Aktuellen Status simulieren
    current_status = 'draft'
    
    if current_status == new_status:
        # 304 Not Modified wenn keine Änderung
        return '', 304
    
    # Status aktualisieren
    return jsonify({
        'id': id,
        'alter_status': current_status,
        'neuer_status': new_status
    }), 200

@app.route('/api/health')
def health_check():
    # Verschiedene Service-Checks
    database_ok = True  # Datenbankverbindung prüfen
    cache_ok = True     # Cache-Verbindung prüfen
    
    if database_ok and cache_ok:
        return jsonify({'status': 'healthy'}), 200
    elif database_ok:
        return jsonify({
            'status': 'degraded',
            'details': 'Cache nicht verfügbar'
        }), 207  # Multi-Status
    else:
        return jsonify({
            'status': 'unhealthy',
            'details': 'Datenbankverbindung fehlgeschlagen'
        }), 503

Die korrekte Behandlung von Requests und Responses bildet die Grundlage für robuste und benutzerfreundliche Flask-Anwendungen.