9 RESTful APIs

9.1 REST-Prinzipien verstehen

REST (Representational State Transfer) ist ein Architekturstil für die Entwicklung von Web-Services, der auf den Grundprinzipien des World Wide Web basiert. Um REST wirklich zu verstehen, müssen wir zunächst die fundamentalen Konzepte betrachten, die diesen Ansatz von anderen API-Designs unterscheiden.

Das wichtigste Prinzip von REST ist die Zustandslosigkeit. Jede Anfrage an eine REST-API muss alle Informationen enthalten, die der Server zur Verarbeitung benötigt. Der Server speichert keinen Kontext zwischen verschiedenen Anfragen desselben Clients. Dies unterscheidet REST fundamental von traditionellen Web-Anwendungen, die Session-Zustand verwenden.

Ein zweites zentrales Konzept ist die Ressourcen-orientierte Architektur. In REST repräsentiert jede URL eine Ressource. Anstatt Aktionen in URLs zu kodieren wie /getUserById?id=123, verwenden REST-APIs URLs wie /users/123, die direkt auf die Ressource verweisen. Die gewünschte Aktion wird durch die HTTP-Methode bestimmt: GET zum Lesen, POST zum Erstellen, PUT zum Aktualisieren, DELETE zum Löschen.

9.1.1 HTTP-Methoden in REST-APIs

Die HTTP-Methoden bilden das Vokabular von REST-APIs. Jede Methode hat eine spezifische Semantik und erwartete Seiteneffekte.

from flask import Flask, request, jsonify
from datetime import datetime

app = Flask(__name__)

# Simulierte Datenbank
users_db = [
    {"id": 1, "name": "Max Mustermann", "email": "max@beispiel.de", "created_at": "2024-01-15"},
    {"id": 2, "name": "Anna Schmidt", "email": "anna@beispiel.de", "created_at": "2024-01-20"}
]

# GET - Ressourcen abrufen (idempotent, keine Seiteneffekte)
@app.route('/api/users', methods=['GET'])
def get_users():
    """
    Gibt alle Benutzer zurück. GET-Requests sollten niemals Daten ändern.
    Idempotenz bedeutet: mehrfache Ausführung hat dasselbe Ergebnis.
    """
    return jsonify({
        "users": users_db,
        "total": len(users_db),
        "timestamp": datetime.utcnow().isoformat()
    })

@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """
    Gibt einen spezifischen Benutzer zurück.
    Verwendet HTTP-Statuscodes zur Kommunikation des Ergebnisses.
    """
    user = next((u for u in users_db if u["id"] == user_id), None)
    
    if user is None:
        return jsonify({"error": "Benutzer nicht gefunden"}), 404
    
    return jsonify(user)

# POST - Neue Ressourcen erstellen (nicht idempotent)
@app.route('/api/users', methods=['POST'])
def create_user():
    """
    Erstellt einen neuen Benutzer. POST ist nicht idempotent:
    jede Ausführung erstellt eine neue Ressource.
    """
    data = request.get_json()
    
    # Validierung der Eingabedaten
    if not data or 'name' not in data or 'email' not in data:
        return jsonify({
            "error": "Name und E-Mail sind erforderlich",
            "required_fields": ["name", "email"]
        }), 400
    
    # Neue ID generieren (in echter Anwendung: Datenbank-Auto-Increment)
    new_id = max([u["id"] for u in users_db], default=0) + 1
    
    new_user = {
        "id": new_id,
        "name": data["name"],
        "email": data["email"],
        "created_at": datetime.utcnow().isoformat()
    }
    
    users_db.append(new_user)
    
    # 201 Created mit Location-Header zur neuen Ressource
    response = jsonify(new_user)
    response.status_code = 201
    response.headers['Location'] = f'/api/users/{new_id}'
    
    return response

# PUT - Ressourcen vollständig ersetzen (idempotent)
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """
    Ersetzt einen Benutzer vollständig. PUT ist idempotent:
    mehrfache Ausführung mit denselben Daten hat dasselbe Ergebnis.
    """
    data = request.get_json()
    
    if not data or 'name' not in data or 'email' not in data:
        return jsonify({"error": "Name und E-Mail sind erforderlich"}), 400
    
    user_index = next((i for i, u in enumerate(users_db) if u["id"] == user_id), None)
    
    if user_index is None:
        return jsonify({"error": "Benutzer nicht gefunden"}), 404
    
    # Vollständige Ersetzung (PUT-Semantik)
    users_db[user_index] = {
        "id": user_id,
        "name": data["name"],
        "email": data["email"],
        "created_at": users_db[user_index]["created_at"],  # Erstellungsdatum beibehalten
        "updated_at": datetime.utcnow().isoformat()
    }
    
    return jsonify(users_db[user_index])

# PATCH - Ressourcen teilweise aktualisieren (idempotent)
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
    """
    Aktualisiert Teile eines Benutzers. PATCH erlaubt partielle Updates
    und ist ebenfalls idempotent.
    """
    data = request.get_json()
    
    if not data:
        return jsonify({"error": "Keine Daten zum Aktualisieren"}), 400
    
    user_index = next((i for i, u in enumerate(users_db) if u["id"] == user_id), None)
    
    if user_index is None:
        return jsonify({"error": "Benutzer nicht gefunden"}), 404
    
    # Nur bereitgestellte Felder aktualisieren
    user = users_db[user_index]
    
    if 'name' in data:
        user['name'] = data['name']
    if 'email' in data:
        user['email'] = data['email']
    
    user['updated_at'] = datetime.utcnow().isoformat()
    
    return jsonify(user)

# DELETE - Ressourcen entfernen (idempotent)
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """
    Löscht einen Benutzer. DELETE ist idempotent:
    mehrfache Löschung derselben Ressource hat dasselbe Ergebnis.
    """
    user_index = next((i for i, u in enumerate(users_db) if u["id"] == user_id), None)
    
    if user_index is None:
        return jsonify({"error": "Benutzer nicht gefunden"}), 404
    
    deleted_user = users_db.pop(user_index)
    
    # 204 No Content - erfolgreiche Löschung ohne Antwortinhalt
    return '', 204

9.2 Flask @app.route vs. MethodView

Während die Dekorator-Syntax mit @app.route für einfache APIs ausreicht, bietet die MethodView-Klasse einen objektorientierten Ansatz, der bei komplexeren APIs Vorteile bringt. Verstehen wir zunächst, warum diese Alternative existiert und wann sie sinnvoll ist.

9.2.1 Grenzen des Dekorator-Ansatzes

Der traditionelle Dekorator-Ansatz funktioniert gut für einfache APIs, zeigt aber bei komplexeren Szenarien Schwächen. Wenn eine Ressource mehrere HTTP-Methoden unterstützt, führt dies zu Code-Duplikation und erschwert die Wartung.

# Traditioneller Ansatz: eine Funktion pro Methode
@app.route('/api/articles', methods=['GET'])
def get_articles():
    # GET-Logik für Artikel-Liste
    pass

@app.route('/api/articles', methods=['POST'])
def create_article():
    # POST-Logik für Artikel-Erstellung
    pass

@app.route('/api/articles/<int:article_id>', methods=['GET'])
def get_article(article_id):
    # GET-Logik für einzelnen Artikel
    pass

@app.route('/api/articles/<int:article_id>', methods=['PUT'])
def update_article(article_id):
    # PUT-Logik für Artikel-Update
    pass

@app.route('/api/articles/<int:article_id>', methods=['DELETE'])
def delete_article(article_id):
    # DELETE-Logik für Artikel-Löschung
    pass

Dieser Ansatz führt zu mehreren Problemen: Code für gemeinsame Funktionalitäten (wie Authentifizierung oder Validierung) muss in jeder Funktion wiederholt werden. Die Logik für eine Ressource ist über mehrere Funktionen verteilt. Gemeinsame Hilfsmethoden sind schwer zu organisieren.

9.2.2 MethodView: Objektorientierte API-Entwicklung

MethodView löst diese Probleme durch Gruppierung der HTTP-Methoden in einer Klasse. Jede HTTP-Methode wird als Klassenmethode implementiert.

from flask.views import MethodView
from flask import request, jsonify

class UserAPI(MethodView):
    """
    MethodView für Benutzer-Ressourcen.
    Jede HTTP-Methode wird als separate Klassenmethode implementiert.
    """
    
    def __init__(self):
        # Gemeinsame Initialisierung für alle Methoden
        self.users_db = [
            {"id": 1, "name": "Max Mustermann", "email": "max@beispiel.de"},
            {"id": 2, "name": "Anna Schmidt", "email": "anna@beispiel.de"}
        ]
    
    def get(self, user_id=None):
        """
        GET-Methode: Benutzer abrufen.
        Behandelt sowohl Einzelabfragen (/users/1) als auch Listen (/users).
        """
        if user_id is None:
            # Liste aller Benutzer
            return jsonify({
                "users": self.users_db,
                "total": len(self.users_db)
            })
        
        # Einzelner Benutzer
        user = self._find_user(user_id)
        if user is None:
            return self._not_found_response()
        
        return jsonify(user)
    
    def post(self):
        """
        POST-Methode: Neuen Benutzer erstellen.
        Nur für Collection-Endpoints (/users), nicht für spezifische Ressourcen.
        """
        data = request.get_json()
        
        # Validierung mit Hilfsmethode
        validation_errors = self._validate_user_data(data)
        if validation_errors:
            return jsonify({"errors": validation_errors}), 400
        
        new_user = self._create_user(data)
        
        response = jsonify(new_user)
        response.status_code = 201
        response.headers['Location'] = f'/api/users/{new_user["id"]}'
        
        return response
    
    def put(self, user_id):
        """
        PUT-Methode: Benutzer vollständig ersetzen.
        Benötigt eine spezifische Benutzer-ID.
        """
        if user_id is None:
            return jsonify({"error": "Benutzer-ID erforderlich"}), 400
        
        data = request.get_json()
        validation_errors = self._validate_user_data(data, required_all=True)
        if validation_errors:
            return jsonify({"errors": validation_errors}), 400
        
        user = self._find_user(user_id)
        if user is None:
            return self._not_found_response()
        
        updated_user = self._update_user(user_id, data, full_replace=True)
        return jsonify(updated_user)
    
    def patch(self, user_id):
        """
        PATCH-Methode: Benutzer teilweise aktualisieren.
        Erlaubt partielle Updates.
        """
        if user_id is None:
            return jsonify({"error": "Benutzer-ID erforderlich"}), 400
        
        data = request.get_json()
        if not data:
            return jsonify({"error": "Keine Daten bereitgestellt"}), 400
        
        user = self._find_user(user_id)
        if user is None:
            return self._not_found_response()
        
        updated_user = self._update_user(user_id, data, full_replace=False)
        return jsonify(updated_user)
    
    def delete(self, user_id):
        """
        DELETE-Methode: Benutzer löschen.
        """
        if user_id is None:
            return jsonify({"error": "Benutzer-ID erforderlich"}), 400
        
        user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
        if user_index is None:
            return self._not_found_response()
        
        self.users_db.pop(user_index)
        return '', 204
    
    # Hilfsmethoden - ein großer Vorteil von MethodView
    def _find_user(self, user_id):
        """Findet einen Benutzer nach ID."""
        return next((u for u in self.users_db if u["id"] == user_id), None)
    
    def _validate_user_data(self, data, required_all=False):
        """Validiert Benutzerdaten."""
        errors = []
        
        if not data:
            return ["Keine Daten bereitgestellt"]
        
        if required_all or 'name' in data:
            if not data.get('name', '').strip():
                errors.append("Name ist erforderlich")
        
        if required_all or 'email' in data:
            email = data.get('email', '').strip()
            if not email:
                errors.append("E-Mail ist erforderlich")
            elif '@' not in email:
                errors.append("Ungültige E-Mail-Adresse")
        
        return errors
    
    def _create_user(self, data):
        """Erstellt einen neuen Benutzer."""
        new_id = max([u["id"] for u in self.users_db], default=0) + 1
        new_user = {
            "id": new_id,
            "name": data["name"],
            "email": data["email"],
            "created_at": datetime.utcnow().isoformat()
        }
        self.users_db.append(new_user)
        return new_user
    
    def _update_user(self, user_id, data, full_replace=False):
        """Aktualisiert einen Benutzer."""
        user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
        user = self.users_db[user_index]
        
        if full_replace:
            # PUT: Vollständige Ersetzung
            self.users_db[user_index] = {
                "id": user_id,
                "name": data["name"],
                "email": data["email"],
                "created_at": user.get("created_at"),
                "updated_at": datetime.utcnow().isoformat()
            }
        else:
            # PATCH: Partielle Aktualisierung
            if 'name' in data:
                user['name'] = data['name']
            if 'email' in data:
                user['email'] = data['email']
            user['updated_at'] = datetime.utcnow().isoformat()
        
        return self.users_db[user_index]
    
    def _not_found_response(self):
        """Standardisierte 404-Antwort."""
        return jsonify({"error": "Benutzer nicht gefunden"}), 404

# MethodView registrieren
user_view = UserAPI.as_view('user_api')

# Routen für Collection (alle Benutzer)
app.add_url_rule('/api/users', view_func=user_view, methods=['GET', 'POST'])

# Routen für spezifische Ressourcen (einzelne Benutzer)
app.add_url_rule('/api/users/<int:user_id>', view_func=user_view, methods=['GET', 'PUT', 'PATCH', 'DELETE'])

9.2.3 Erweiterte MethodView-Patterns

MethodView ermöglicht ausgefeilte Patterns für API-Design, einschließlich Vererbung und Middleware-ähnlicher Funktionalität.

class BaseResourceAPI(MethodView):
    """
    Basis-Klasse für alle Ressourcen-APIs.
    Implementiert gemeinsame Funktionalitäten.
    """
    
    def dispatch_request(self, *args, **kwargs):
        """
        Wird vor jeder HTTP-Methode aufgerufen.
        Ähnlich wie Middleware - perfekt für Authentifizierung.
        """
        # Authentifizierung prüfen
        auth_result = self._check_authentication()
        if auth_result is not None:
            return auth_result
        
        # Autorisierung prüfen
        permission_result = self._check_permissions()
        if permission_result is not None:
            return permission_result
        
        # Normale Verarbeitung fortsetzen
        return super().dispatch_request(*args, **kwargs)
    
    def _check_authentication(self):
        """Prüft Authentifizierung für alle Methoden."""
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({"error": "Authentifizierung erforderlich"}), 401
        
        token = auth_header.split(' ')[1]
        if not self._validate_token(token):
            return jsonify({"error": "Ungültiger Token"}), 401
        
        return None  # Authentifizierung erfolgreich
    
    def _check_permissions(self):
        """Prüft spezifische Berechtigungen (überschreibbar)."""
        return None  # Standard: alle authentifizierten Benutzer haben Zugriff
    
    def _validate_token(self, token):
        """Validiert Authentifizierungs-Token."""
        # Vereinfachte Token-Validierung
        return token == "valid-token-123"
    
    def _standard_error_response(self, message, status_code=400):
        """Standardisierte Fehlerantworten."""
        return jsonify({
            "error": message,
            "timestamp": datetime.utcnow().isoformat(),
            "status": status_code
        }), status_code

class ArticleAPI(BaseResourceAPI):
    """
    Artikel-API mit Vererbung von gemeinsamen Funktionalitäten.
    """
    
    def __init__(self):
        self.articles_db = [
            {"id": 1, "title": "Erste Nachricht", "content": "Inhalt...", "author_id": 1},
            {"id": 2, "title": "Zweite Nachricht", "content": "Mehr Inhalt...", "author_id": 2}
        ]
    
    def _check_permissions(self):
        """Artikel-spezifische Berechtigungen."""
        method = request.method
        
        # Jeder kann Artikel lesen
        if method == 'GET':
            return None
        
        # Nur Autoren können Artikel erstellen/bearbeiten
        user_role = self._get_user_role()
        if user_role not in ['author', 'admin']:
            return self._standard_error_response("Keine Berechtigung für diese Aktion", 403)
        
        return None
    
    def _get_user_role(self):
        """Ermittelt Benutzerrolle aus Token (vereinfacht)."""
        return "author"  # In echter Anwendung: aus Token extrahieren
    
    def get(self, article_id=None):
        """Artikel abrufen."""
        if article_id is None:
            return jsonify({"articles": self.articles_db})
        
        article = next((a for a in self.articles_db if a["id"] == article_id), None)
        if article is None:
            return self._standard_error_response("Artikel nicht gefunden", 404)
        
        return jsonify(article)
    
    def post(self):
        """Artikel erstellen."""
        data = request.get_json()
        
        if not data or not data.get('title') or not data.get('content'):
            return self._standard_error_response("Titel und Inhalt sind erforderlich")
        
        new_id = max([a["id"] for a in self.articles_db], default=0) + 1
        new_article = {
            "id": new_id,
            "title": data["title"],
            "content": data["content"],
            "author_id": 1,  # In echter Anwendung: aus Token
            "created_at": datetime.utcnow().isoformat()
        }
        
        self.articles_db.append(new_article)
        
        response = jsonify(new_article)
        response.status_code = 201
        return response

# Artikel-API registrieren
article_view = ArticleAPI.as_view('article_api')
app.add_url_rule('/api/articles', view_func=article_view, methods=['GET', 'POST'])
app.add_url_rule('/api/articles/<int:article_id>', view_func=article_view, methods=['GET', 'PUT', 'DELETE'])

9.3 Flask-RESTful und Flask-Restx

Während MethodView bereits eine Verbesserung gegenüber einfachen Dekoratoren darstellt, bieten spezialisierte Extensions wie Flask-RESTful und Flask-Restx zusätzliche Funktionalitäten für die API-Entwicklung. Diese Extensions verstehen die speziellen Anforderungen von REST-APIs und bieten Lösungen für häufige Probleme.

9.3.1 Flask-RESTful: Fokus auf Einfachheit

Flask-RESTful erweitert MethodView um API-spezifische Funktionalitäten wie automatische Content-Type-Verhandlung, Eingabevalidierung und standardisierte Fehlerbehandlung.

from flask import Flask
from flask_restful import Api, Resource, reqparse, fields, marshal_with
from datetime import datetime

app = Flask(__name__)
api = Api(app)

# Ausgabe-Serialisierung mit marshal_with
user_fields = {
    'id': fields.Integer,
    'name': fields.String,
    'email': fields.String,
    'created_at': fields.DateTime(dt_format='iso8601'),
    'updated_at': fields.DateTime(dt_format='iso8601')
}

user_list_fields = {
    'users': fields.List(fields.Nested(user_fields)),
    'total': fields.Integer,
    'page': fields.Integer,
    'per_page': fields.Integer
}

class UserResource(Resource):
    """
    Flask-RESTful Resource für Benutzer.
    Bietet automatische Serialisierung und Validierung.
    """
    
    def __init__(self):
        # Request-Parser für Eingabevalidierung
        self.parser = reqparse.RequestParser()
        self.parser.add_argument('name', type=str, required=True, help='Name ist erforderlich')
        self.parser.add_argument('email', type=str, required=True, help='E-Mail ist erforderlich')
        
        # Simulierte Datenbank
        self.users_db = [
            {
                "id": 1, 
                "name": "Max Mustermann", 
                "email": "max@beispiel.de",
                "created_at": datetime(2024, 1, 15),
                "updated_at": None
            },
            {
                "id": 2, 
                "name": "Anna Schmidt", 
                "email": "anna@beispiel.de",
                "created_at": datetime(2024, 1, 20),
                "updated_at": None
            }
        ]
    
    @marshal_with(user_fields)
    def get(self, user_id):
        """
        Einzelnen Benutzer abrufen.
        @marshal_with sorgt für automatische JSON-Serialisierung.
        """
        user = next((u for u in self.users_db if u["id"] == user_id), None)
        if user is None:
            api.abort(404, message="Benutzer nicht gefunden")
        return user
    
    @marshal_with(user_fields)
    def put(self, user_id):
        """
        Benutzer aktualisieren mit automatischer Validierung.
        """
        args = self.parser.parse_args()
        
        user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
        if user_index is None:
            api.abort(404, message="Benutzer nicht gefunden")
        
        # Benutzer aktualisieren
        self.users_db[user_index].update({
            'name': args['name'],
            'email': args['email'],
            'updated_at': datetime.utcnow()
        })
        
        return self.users_db[user_index]
    
    def delete(self, user_id):
        """
        Benutzer löschen.
        """
        user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
        if user_index is None:
            api.abort(404, message="Benutzer nicht gefunden")
        
        self.users_db.pop(user_index)
        return {'message': 'Benutzer gelöscht'}, 204

class UserListResource(Resource):
    """
    Resource für Benutzer-Listen-Operationen.
    Trennt Collection- von Item-Operationen.
    """
    
    def __init__(self):
        self.parser = reqparse.RequestParser()
        self.parser.add_argument('name', type=str, required=True)
        self.parser.add_argument('email', type=str, required=True)
        self.parser.add_argument('page', type=int, default=1)
        self.parser.add_argument('per_page', type=int, default=10)
        
        # Geteilte Referenz zur Datenbank
        self.users_db = UserResource().users_db
    
    @marshal_with(user_list_fields)
    def get(self):
        """
        Benutzer-Liste mit Paginierung.
        """
        args = self.parser.parse_args()
        page = args['page']
        per_page = min(args['per_page'], 100)  # Maximum 100 pro Seite
        
        start = (page - 1) * per_page
        end = start + per_page
        
        return {
            'users': self.users_db[start:end],
            'total': len(self.users_db),
            'page': page,
            'per_page': per_page
        }
    
    @marshal_with(user_fields)
    def post(self):
        """
        Neuen Benutzer erstellen.
        """
        args = self.parser.parse_args()
        
        # E-Mail-Eindeutigkeit prüfen
        if any(u['email'] == args['email'] for u in self.users_db):
            api.abort(400, message="E-Mail-Adresse bereits vorhanden")
        
        new_id = max([u["id"] for u in self.users_db], default=0) + 1
        new_user = {
            "id": new_id,
            "name": args['name'],
            "email": args['email'],
            "created_at": datetime.utcnow(),
            "updated_at": None
        }
        
        self.users_db.append(new_user)
        return new_user, 201

# Resources registrieren
api.add_resource(UserListResource, '/api/users')
api.add_resource(UserResource, '/api/users/<int:user_id>')

9.3.2 Flask-Restx: APIs mit automatischer Dokumentation

Flask-Restx (Nachfolger von Flask-RESTPlus) erweitert Flask-RESTful um automatische API-Dokumentation mit Swagger/OpenAPI und erweiterte Validierungsmöglichkeiten.

from flask import Flask
from flask_restx import Api, Resource, fields, Namespace
from datetime import datetime

app = Flask(__name__)

# Flask-Restx API mit Swagger-Dokumentation
api = Api(app, 
          version='1.0', 
          title='Benutzer-API',
          description='Eine API für Benutzerverwaltung mit automatischer Dokumentation',
          doc='/api/docs/')

# Namespace für Organisation der API
users_ns = Namespace('users', description='Benutzer-Operationen')
api.add_namespace(users_ns)

# Model-Definitionen für automatische Dokumentation
user_model = api.model('User', {
    'id': fields.Integer(readonly=True, description='Eindeutige Benutzer-ID'),
    'name': fields.String(required=True, description='Vollständiger Name des Benutzers'),
    'email': fields.String(required=True, description='E-Mail-Adresse'),
    'created_at': fields.DateTime(readonly=True, description='Erstellungsdatum'),
    'updated_at': fields.DateTime(readonly=True, description='Letzte Aktualisierung')
})

user_input_model = api.model('UserInput', {
    'name': fields.String(required=True, description='Vollständiger Name'),
    'email': fields.String(required=True, description='E-Mail-Adresse')
})

user_list_model = api.model('UserList', {
    'users': fields.List(fields.Nested(user_model)),
    'total': fields.Integer(description='Gesamtanzahl der Benutzer'),
    'page': fields.Integer(description='Aktuelle Seite'),
    'per_page': fields.Integer(description='Benutzer pro Seite')
})

# Simulierte Datenbank
users_database = [
    {
        "id": 1,
        "name": "Max Mustermann",
        "email": "max@beispiel.de",
        "created_at": datetime(2024, 1, 15),
        "updated_at": None
    }
]

@users_ns.route('/')
class UserList(Resource):
    """
    Benutzer-Collection-Operationen
    """
    
    @users_ns.doc('list_users')
    @users_ns.marshal_list_with(user_model)
    @users_ns.param('page', 'Seitennummer', type=int, default=1)
    @users_ns.param('per_page', 'Einträge pro Seite', type=int, default=10)
    def get(self):
        """
        Alle Benutzer abrufen
        
        Gibt eine paginierte Liste aller Benutzer zurück.
        Die Dokumentation wird automatisch aus diesem Docstring generiert.
        """
        # Parameter aus Query-String extrahieren
        page = int(users_ns.payload.get('page', 1)) if users_ns.payload else 1
        per_page = int(users_ns.payload.get('per_page', 10)) if users_ns.payload else 10
        
        start = (page - 1) * per_page
        end = start + per_page
        
        return users_database[start:end]
    
    @users_ns.doc('create_user')
    @users_ns.expect(user_input_model)
    @users_ns.marshal_with(user_model, code=201)
    @users_ns.response(400, 'Validierungsfehler')
    @users_ns.response(409, 'E-Mail bereits vorhanden')
    def post(self):
        """
        Neuen Benutzer erstellen
        
        Erstellt einen neuen Benutzer mit den bereitgestellten Daten.
        Die E-Mail-Adresse muss eindeutig sein.
        """
        data = users_ns.payload
        
        # Validierung
        if not data.get('name') or not data.get('email'):
            users_ns.abort(400, 'Name und E-Mail sind erforderlich')
        
        # E-Mail-Eindeutigkeit prüfen
        if any(u['email'] == data['email'] for u in users_database):
            users_ns.abort(409, 'E-Mail-Adresse bereits vorhanden')
        
        new_id = max([u["id"] for u in users_database], default=0) + 1
        new_user = {
            "id": new_id,
            "name": data['name'],
            "email": data['email'],
            "created_at": datetime.utcnow(),
            "updated_at": None
        }
        
        users_database.append(new_user)
        return new_user, 201

@users_ns.route('/<int:user_id>')
@users_ns.param('user_id', 'Eindeutige Benutzer-ID')
class User(Resource):
    """
    Einzelne Benutzer-Operationen
    """
    
    @users_ns.doc('get_user')
    @users_ns.marshal_with(user_model)
    @users_ns.response(404, 'Benutzer nicht gefunden')
    def get(self, user_id):
        """
        Spezifischen Benutzer abrufen
        
        Gibt die Details eines einzelnen Benutzers anhand der ID zurück.
        """
        user = next((u for u in users_database if u["id"] == user_id), None)
        if user is None:
            users_ns.abort(404, 'Benutzer nicht gefunden')
        return user
    
    @users_ns.doc('update_user')
    @users_ns.expect(user_input_model)
    @users_ns.marshal_with(user_model)
    @users_ns.response(404, 'Benutzer nicht gefunden')
    @users_ns.response(400, 'Validierungsfehler')
    def put(self, user_id):
        """
        Benutzer vollständig aktualisieren
        
        Ersetzt alle Daten eines Benutzers mit den bereitgestellten Werten.
        """
        data = users_ns.payload
        
        user_index = next((i for i, u in enumerate(users_database) if u["id"] == user_id), None)
        if user_index is None:
            users_ns.abort(404, 'Benutzer nicht gefunden')
        
        # Validierung
        if not data.get('name') or not data.get('email'):
            users_ns.abort(400, 'Name und E-Mail sind erforderlich')
        
        # E-Mail-Eindeutigkeit prüfen (außer für aktuellen Benutzer)
        if any(u['email'] == data['email'] and u['id'] != user_id for u in users_database):
            users_ns.abort(409, 'E-Mail-Adresse bereits vorhanden')
        
        users_database[user_index].update({
            'name': data['name'],
            'email': data['email'],
            'updated_at': datetime.utcnow()
        })
        
        return users_database[user_index]
    
    @users_ns.doc('delete_user')
    @users_ns.response(204, 'Benutzer erfolgreich gelöscht')
    @users_ns.response(404, 'Benutzer nicht gefunden')
    def delete(self, user_id):
        """
        Benutzer löschen
        
        Entfernt einen Benutzer dauerhaft aus dem System.
        """
        user_index = next((i for i, u in enumerate(users_database) if u["id"] == user_id), None)
        if user_index is None:
            users_ns.abort(404, 'Benutzer nicht gefunden')
        
        users_database.pop(user_index)
        return '', 204

9.4 Serialisierung mit Marshmallow

Serialisierung ist der Prozess der Konvertierung von Python-Objekten in JSON (oder andere Formate) für die API-Ausgabe. Deserialisierung ist der umgekehrte Prozess: JSON-Eingaben in Python-Objekte umwandeln. Marshmallow ist eine leistungsstarke Bibliothek, die beide Prozesse mit Validierung und Datenverarbeitung kombiniert.

9.4.1 Grundlagen der Marshmallow-Serialisierung

Marshmallow verwendet Schemas zur Definition der Datenstruktur und Validierungsregeln. Ein Schema beschreibt, wie Daten aussehen sollen und welche Transformationen angewendet werden.

from marshmallow import Schema, fields, validate, validates, validates_schema, ValidationError, post_load
from datetime import datetime
from typing import Dict, Any

class UserSchema(Schema):
    """
    Schema für Benutzer-Serialisierung und -Validierung.
    Definiert die Struktur und Regeln für Benutzer-Daten.
    """
    
    # Felder mit verschiedenen Validierungsregeln
    id = fields.Int(dump_only=True)  # Nur für Ausgabe, nicht für Eingabe
    name = fields.Str(required=True, validate=validate.Length(min=2, max=100))
    email = fields.Email(required=True)
    age = fields.Int(validate=validate.Range(min=18, max=120), allow_none=True)
    created_at = fields.DateTime(dump_only=True, format='iso')
    updated_at = fields.DateTime(dump_only=True, format='iso', allow_none=True)
    
    # Berechnete Felder
    full_name = fields.Method("get_full_name", dump_only=True)
    
    def get_full_name(self, obj):
        """
        Methode für berechnete Felder.
        Wird bei der Serialisierung aufgerufen.
        """
        return f"{obj.get('title', '')} {obj.get('name', '')}".strip()
    
    @validates('email')
    def validate_email_domain(self, value):
        """
        Benutzerdefinierte E-Mail-Validierung.
        Prüft erlaubte Domains für Geschäfts-E-Mails.
        """
        allowed_domains = ['firma.de', 'unternehmen.de', 'beispiel.de']
        domain = value.split('@')[1] if '@' in value else ''
        
        if domain not in allowed_domains:
            raise ValidationError('Nur Geschäfts-E-Mail-Adressen sind erlaubt')
    
    @validates_schema
    def validate_schema(self, data, **kwargs):
        """
        Schema-weite Validierung.
        Prüft Beziehungen zwischen verschiedenen Feldern.
        """
        # Beispiel: Name und E-Mail müssen konsistent sein
        name = data.get('name', '').lower()
        email = data.get('email', '').lower()
        
        if 'admin' in name and 'admin' not in email:
            raise ValidationError({
                'email': 'Admin-Benutzer müssen eine Admin-E-Mail-Adresse haben'
            })
    
    @post_load
    def make_user(self, data, **kwargs):
        """
        Post-Load-Hook: wird nach erfolgreicher Validierung aufgerufen.
        Kann Daten transformieren oder Objekte erstellen.
        """
        # Automatisch Erstellungsdatum hinzufügen
        data['created_at'] = datetime.utcnow()
        return data

# Schema für verschiedene Anwendungsfälle
class UserCreateSchema(UserSchema):
    """Schema für Benutzer-Erstellung (strengere Validierung)."""
    
    name = fields.Str(required=True, validate=validate.Length(min=2, max=100))
    email = fields.Email(required=True)
    password = fields.Str(required=True, validate=validate.Length(min=8), load_only=True)
    
    @validates('password')
    def validate_password_strength(self, value):
        """Passwort-Stärke validieren."""
        import re
        
        if not re.search(r'[A-Z]', value):
            raise ValidationError('Passwort muss mindestens einen Großbuchstaben enthalten')
        
        if not re.search(r'[a-z]', value):
            raise ValidationError('Passwort muss mindestens einen Kleinbuchstaben enthalten')
        
        if not re.search(r'\d', value):
            raise ValidationError('Passwort muss mindestens eine Zahl enthalten')

class UserUpdateSchema(UserSchema):
    """Schema für Benutzer-Updates (flexiblere Validierung)."""
    
    # Alle Felder optional für PATCH-Updates
    name = fields.Str(validate=validate.Length(min=2, max=100), allow_none=True)
    email = fields.Email(allow_none=True)

# Verschachtelte Schemas für komplexe Datenstrukturen
class AddressSchema(Schema):
    """Schema für Adressen."""
    
    street = fields.Str(required=True)
    city = fields.Str(required=True)
    postal_code = fields.Str(required=True, validate=validate.Regexp(r'^\d{5}$'))
    country = fields.Str(missing='DE', validate=validate.OneOf(['DE', 'AT', 'CH']))

class UserWithAddressSchema(UserSchema):
    """Erweiterte Benutzer-Schema mit Adresse."""
    
    address = fields.Nested(AddressSchema, allow_none=True)
    addresses = fields.List(fields.Nested(AddressSchema))  # Mehrere Adressen

# Integration in Flask-API
from flask import Flask, request, jsonify

app = Flask(__name__)

# Schema-Instanzen
user_schema = UserSchema()
users_schema = UserSchema(many=True)  # Für Listen
user_create_schema = UserCreateSchema()
user_update_schema = UserUpdateSchema()

# Simulierte Datenbank
users_data = [
    {
        "id": 1,
        "name": "Max Mustermann",
        "email": "max@firma.de",
        "age": 30,
        "created_at": datetime(2024, 1, 15),
        "updated_at": None
    }
]

@app.route('/api/users', methods=['GET'])
def get_users():
    """
    Benutzer-Liste mit Marshmallow-Serialisierung.
    """
    try:
        # Daten serialisieren
        result = users_schema.dump(users_data)
        return jsonify({
            "users": result,
            "total": len(result)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/users', methods=['POST'])
def create_user():
    """
    Benutzer erstellen mit Marshmallow-Validierung.
    """
    try:
        # JSON-Daten validieren und deserialisieren
        user_data = user_create_schema.load(request.json)
        
        # Neue ID generieren
        new_id = max([u["id"] for u in users_data], default=0) + 1
        user_data["id"] = new_id
        
        # Benutzer speichern
        users_data.append(user_data)
        
        # Antwort serialisieren
        result = user_schema.dump(user_data)
        return jsonify(result), 201
        
    except ValidationError as err:
        # Validierungsfehler zurückgeben
        return jsonify({"errors": err.messages}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    """
    Benutzer mit Marshmallow-Validierung aktualisieren.
    """
    try:
        # Benutzer finden
        user_index = next((i for i, u in enumerate(users_data) if u["id"] == user_id), None)
        if user_index is None:
            return jsonify({"error": "Benutzer nicht gefunden"}), 404
        
        # Daten validieren
        updated_data = user_update_schema.load(request.json, partial=True)
        
        # Benutzer aktualisieren
        users_data[user_index].update(updated_data)
        users_data[user_index]["updated_at"] = datetime.utcnow()
        
        # Antwort serialisieren
        result = user_schema.dump(users_data[user_index])
        return jsonify(result)
        
    except ValidationError as err:
        return jsonify({"errors": err.messages}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500

9.4.2 Erweiterte Marshmallow-Features

Marshmallow bietet mächtige Features für komplexe Serialisierungsszenarien, einschließlich bedingter Felder, Datentransformation und Beziehungsmanagement.

from marshmallow import fields, post_dump, pre_load
from datetime import datetime, timedelta

class FlexibleUserSchema(Schema):
    """
    Erweiterte Schema mit bedingten Feldern und Transformationen.
    """
    
    id = fields.Int(dump_only=True)
    name = fields.Str(required=True)
    email = fields.Email(required=True)
    role = fields.Str(validate=validate.OneOf(['user', 'admin', 'moderator']))
    
    # Bedingte Felder basierend auf Benutzerrolle
    admin_notes = fields.Str(dump_only=True)  # Nur für Admins sichtbar
    last_login = fields.DateTime(dump_only=True)
    
    # Berechnete Felder
    days_since_creation = fields.Method("calculate_days_since_creation", dump_only=True)
    is_recent_user = fields.Method("is_recent_user", dump_only=True)
    
    def calculate_days_since_creation(self, obj):
        """Berechnet Tage seit Benutzer-Erstellung."""
        created_at = obj.get('created_at')
        if created_at:
            delta = datetime.utcnow() - created_at
            return delta.days
        return None
    
    def is_recent_user(self, obj):
        """Prüft, ob Benutzer in den letzten 30 Tagen erstellt wurde."""
        days = self.calculate_days_since_creation(obj)
        return days is not None and days <= 30
    
    @pre_load
    def preprocess_data(self, data, **kwargs):
        """
        Pre-Load-Hook: Daten vor Validierung verarbeiten.
        Normalisiert Eingabedaten.
        """
        # E-Mail normalisieren
        if 'email' in data:
            data['email'] = data['email'].lower().strip()
        
        # Name normalisieren
        if 'name' in data:
            data['name'] = data['name'].strip().title()
        
        return data
    
    @post_dump
    def postprocess_output(self, data, **kwargs):
        """
        Post-Dump-Hook: Ausgabe nach Serialisierung anpassen.
        """
        # Admin-spezifische Felder nur für Admins
        current_user_role = getattr(self.context.get('current_user', {}), 'role', None)
        
        if current_user_role != 'admin':
            data.pop('admin_notes', None)
            data.pop('last_login', None)
        
        # Metadaten hinzufügen
        data['_metadata'] = {
            'serialized_at': datetime.utcnow().isoformat(),
            'version': 'v1'
        }
        
        return data

# Schema mit Beziehungen und Validierung
class ProjectSchema(Schema):
    """Schema für Projekte mit Benutzer-Beziehungen."""
    
    id = fields.Int(dump_only=True)
    name = fields.Str(required=True, validate=validate.Length(min=3, max=100))
    description = fields.Str()
    status = fields.Str(validate=validate.OneOf(['planning', 'active', 'completed', 'cancelled']))
    
    # Beziehungen zu anderen Objekten
    owner_id = fields.Int(required=True, load_only=True)
    owner = fields.Nested(UserSchema, dump_only=True)
    
    # Liste von Mitarbeitern
    team_member_ids = fields.List(fields.Int(), load_only=True)
    team_members = fields.List(fields.Nested(UserSchema), dump_only=True)
    
    # Termine
    start_date = fields.Date()
    end_date = fields.Date()
    
    @validates_schema
    def validate_dates(self, data, **kwargs):
        """Validiert Datum-Konsistenz."""
        start_date = data.get('start_date')
        end_date = data.get('end_date')
        
        if start_date and end_date and start_date > end_date:
            raise ValidationError({
                'end_date': 'Enddatum muss nach dem Startdatum liegen'
            })
    
    @post_load
    def resolve_relationships(self, data, **kwargs):
        """Löst Beziehungen zu anderen Objekten auf."""
        # Owner auflösen
        if 'owner_id' in data:
            owner = next((u for u in users_data if u['id'] == data['owner_id']), None)
            if owner:
                data['owner'] = owner
        
        # Team-Mitglieder auflösen
        if 'team_member_ids' in data:
            team_members = [u for u in users_data if u['id'] in data['team_member_ids']]
            data['team_members'] = team_members
        
        return data

# API-Endpoint mit erweiterten Features
@app.route('/api/projects', methods=['POST'])
def create_project():
    """
    Projekt erstellen mit Beziehungsvalidierung.
    """
    try:
        # Aktueller Benutzer aus Authentifizierung
        current_user = {"id": 1, "role": "admin"}  # Vereinfacht
        
        # Schema mit Kontext für bedingte Serialisierung
        schema = ProjectSchema()
        schema.context = {'current_user': current_user}
        
        # Daten validieren und verarbeiten
        project_data = schema.load(request.json)
        
        # ID generieren und speichern
        new_id = max([p.get("id", 0) for p in projects_data], default=0) + 1
        project_data["id"] = new_id
        project_data["created_at"] = datetime.utcnow()
        
        projects_data.append(project_data)
        
        # Antwort mit vollständigen Beziehungen serialisieren
        result = schema.dump(project_data)
        return jsonify(result), 201
        
    except ValidationError as err:
        return jsonify({"errors": err.messages}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Simulierte Projekt-Datenbank
projects_data = []

9.5 API-Dokumentation

API-Dokumentation ist essentiell für die Verwendbarkeit einer API. Entwickler müssen verstehen, welche Endpoints verfügbar sind, welche Parameter erwartet werden und welche Antworten sie erhalten. Moderne APIs verwenden standardisierte Dokumentationsformate wie OpenAPI (früher Swagger), die sowohl von Menschen als auch von Tools gelesen werden können.

9.5.1 OpenAPI/Swagger-Integration

OpenAPI ist ein Standard zur Beschreibung von REST-APIs. Er definiert eine maschinenlesbare Spezifikation, die automatisch interaktive Dokumentation, Code-Generierung und Tests ermöglicht.

from flask import Flask
from flask_restx import Api, Resource, fields
from datetime import datetime

# Flask-App mit umfassender API-Dokumentation
app = Flask(__name__)

api = Api(
    app,
    version='1.0',
    title='Unternehmens-API',
    description='''
    Eine umfassende API für die Verwaltung von Unternehmensdaten.
    
    ## Authentifizierung
    Alle Endpoints (außer der Dokumentation) erfordern einen gültigen API-Token 
    im Authorization-Header: `Bearer <token>`
    
    ## Fehlercodes
    - 400: Ungültige Anfrage oder Validierungsfehler
    - 401: Authentifizierung erforderlich
    - 403: Keine Berechtigung für diese Aktion
    - 404: Ressource nicht gefunden
    - 409: Konflikt (z.B. E-Mail bereits vorhanden)
    - 500: Interner Serverfehler
    
    ## Versionierung
    Die API verwendet semantische Versionierung. Aktuelle Version: v1.0
    ''',
    doc='/api/docs/',  # Swagger-UI verfügbar unter dieser URL
    contact='api-support@firma.de',
    license='Proprietär',
    authorizations={
        'Bearer': {
            'type': 'apiKey',
            'in': 'header',
            'name': 'Authorization',
            'description': 'JWT Token im Format: Bearer <token>'
        }
    },
    security='Bearer'
)

# Namespaces für Organisation
users_ns = api.namespace('users', description='Benutzerverwaltung')
projects_ns = api.namespace('projects', description='Projektverwaltung')

# Detaillierte Model-Definitionen
user_model = api.model('User', {
    'id': fields.Integer(
        readonly=True, 
        description='Eindeutige Benutzer-ID',
        example=123
    ),
    'name': fields.String(
        required=True, 
        description='Vollständiger Name des Benutzers',
        example='Max Mustermann',
        min_length=2,
        max_length=100
    ),
    'email': fields.String(
        required=True, 
        description='E-Mail-Adresse (muss eindeutig sein)',
        example='max.mustermann@firma.de'
    ),
    'role': fields.String(
        description='Benutzerrolle im System',
        enum=['user', 'admin', 'moderator'],
        example='user'
    ),
    'department': fields.String(
        description='Abteilung des Benutzers',
        example='IT'
    ),
    'created_at': fields.DateTime(
        readonly=True,
        description='Zeitpunkt der Benutzer-Erstellung',
        example='2024-01-15T10:30:00Z'
    ),
    'last_login': fields.DateTime(
        readonly=True,
        description='Zeitpunkt der letzten Anmeldung',
        example='2024-03-10T14:25:00Z'
    ),
    'is_active': fields.Boolean(
        description='Ob der Benutzer aktiv ist',
        example=True
    )
})

user_input_model = api.model('UserInput', {
    'name': fields.String(
        required=True,
        description='Vollständiger Name',
        example='Anna Schmidt'
    ),
    'email': fields.String(
        required=True,
        description='E-Mail-Adresse',
        example='anna.schmidt@firma.de'
    ),
    'role': fields.String(
        description='Benutzerrolle',
        enum=['user', 'admin', 'moderator'],
        default='user'
    ),
    'department': fields.String(
        description='Abteilung',
        example='Marketing'
    )
})

# Pagination-Model
pagination_model = api.model('Pagination', {
    'page': fields.Integer(description='Aktuelle Seite', example=1),
    'per_page': fields.Integer(description='Einträge pro Seite', example=10),
    'total': fields.Integer(description='Gesamtanzahl der Einträge', example=150),
    'pages': fields.Integer(description='Gesamtanzahl der Seiten', example=15)
})

user_list_model = api.model('UserList', {
    'users': fields.List(fields.Nested(user_model)),
    'pagination': fields.Nested(pagination_model)
})

# Error-Models für konsistente Fehlerbehandlung
error_model = api.model('Error', {
    'error': fields.String(description='Fehlermeldung', example='Benutzer nicht gefunden'),
    'code': fields.String(description='Fehlercode', example='USER_NOT_FOUND'),
    'timestamp': fields.DateTime(description='Zeitpunkt des Fehlers'),
    'request_id': fields.String(description='Eindeutige Request-ID für Debugging')
})

validation_error_model = api.model('ValidationError', {
    'errors': fields.Raw(
        description='Validierungsfehler pro Feld',
        example={
            'email': ['Ungültige E-Mail-Adresse'],
            'name': ['Name ist erforderlich']
        }
    ),
    'message': fields.String(
        description='Allgemeine Fehlermeldung',
        example='Validierung fehlgeschlagen'
    )
})

@users_ns.route('/')
class UserList(Resource):
    
    @users_ns.doc('list_users')
    @users_ns.marshal_with(user_list_model)
    @users_ns.param('page', 'Seitennummer (beginnend bei 1)', type=int, default=1)
    @users_ns.param('per_page', 'Anzahl Einträge pro Seite (max 100)', type=int, default=10)
    @users_ns.param('search', 'Suchbegriff für Name oder E-Mail', type=str)
    @users_ns.param('department', 'Filter nach Abteilung', type=str)
    @users_ns.param('role', 'Filter nach Rolle', type=str, enum=['user', 'admin', 'moderator'])
    @users_ns.param('is_active', 'Filter nach Aktiv-Status', type=bool)
    @users_ns.response(200, 'Erfolg')
    @users_ns.response(401, 'Authentifizierung erforderlich', error_model)
    def get(self):
        """
        Benutzer-Liste abrufen
        
        Gibt eine paginierte Liste aller Benutzer zurück. Unterstützt verschiedene 
        Filter- und Suchoptionen für eine gezielte Abfrage.
        
        ### Suchfunktionalität
        Der `search`-Parameter durchsucht Name und E-Mail-Adresse mit partieller Übereinstimmung.
        
        ### Filteroptionen
        - `department`: Exakte Übereinstimmung mit Abteilungsname
        - `role`: Filterung nach Benutzerrolle
        - `is_active`: Nur aktive (true) oder inaktive (false) Benutzer
        
        ### Paginierung
        - Standard: 10 Einträge pro Seite
        - Maximum: 100 Einträge pro Seite
        - Links zu vorheriger/nächster Seite in Response-Headers
        """
        # Implementierung würde hier stehen
        pass
    
    @users_ns.doc('create_user')
    @users_ns.expect(user_input_model, validate=True)
    @users_ns.marshal_with(user_model, code=201, description='Benutzer erfolgreich erstellt')
    @users_ns.response(400, 'Validierungsfehler', validation_error_model)
    @users_ns.response(409, 'E-Mail bereits vorhanden', error_model)
    def post(self):
        """
        Neuen Benutzer erstellen
        
        Erstellt einen neuen Benutzer mit den bereitgestellten Daten.
        
        ### Validierungsregeln
        - **Name**: 2-100 Zeichen, erforderlich
        - **E-Mail**: Gültige E-Mail-Adresse, muss eindeutig sein
        - **Rolle**: Optional, Standard ist 'user'
        - **Abteilung**: Optional
        
        ### Automatische Felder
        - `id`: Wird automatisch generiert
        - `created_at`: Aktueller Zeitstempel
        - `is_active`: Standard ist `true`
        
        ### Benachrichtigungen
        Nach erfolgreicher Erstellung wird eine Willkommens-E-Mail an den Benutzer gesendet.
        """
        # Implementierung würde hier stehen
        pass

@users_ns.route('/<int:user_id>')
@users_ns.param('user_id', 'Eindeutige Benutzer-ID')
class User(Resource):
    
    @users_ns.doc('get_user')
    @users_ns.marshal_with(user_model)
    @users_ns.response(404, 'Benutzer nicht gefunden', error_model)
    def get(self, user_id):
        """
        Einzelnen Benutzer abrufen
        
        Gibt die vollständigen Details eines Benutzers anhand der ID zurück.
        
        ### Berechtigungen
        - Alle authentifizierten Benutzer können ihre eigenen Daten abrufen
        - Administratoren können alle Benutzerdaten abrufen
        - Moderatoren können Benutzerdaten ihrer Abteilung abrufen
        """
        pass
    
    @users_ns.doc('update_user')
    @users_ns.expect(user_input_model, validate=True)
    @users_ns.marshal_with(user_model)
    @users_ns.response(404, 'Benutzer nicht gefunden', error_model)
    @users_ns.response(400, 'Validierungsfehler', validation_error_model)
    def put(self, user_id):
        """
        Benutzer vollständig aktualisieren
        
        Ersetzt alle veränderbaren Felder eines Benutzers mit den bereitgestellten Werten.
        
        ### Unveränderliche Felder
        - `id`: Kann nicht geändert werden
        - `created_at`: Bleibt unverändert
        - `last_login`: Wird vom System verwaltet
        
        ### Berechtigungen
        - Benutzer können ihre eigenen Daten bearbeiten (außer `role`)
        - Administratoren können alle Felder aller Benutzer bearbeiten
        """
        pass
    
    @users_ns.doc('delete_user')
    @users_ns.response(204, 'Benutzer erfolgreich gelöscht')
    @users_ns.response(404, 'Benutzer nicht gefunden', error_model)
    @users_ns.response(403, 'Keine Berechtigung', error_model)
    def delete(self, user_id):
        """
        Benutzer löschen
        
        Entfernt einen Benutzer dauerhaft aus dem System.
        
        ### Vorsichtsmaßnahmen
        - Löschung ist irreversibel
        - Zugehörige Daten (Projekte, Kommentare) bleiben erhalten
        - Admin-Benutzer können nicht gelöscht werden, wenn sie der letzte Admin sind
        
        ### Berechtigungen
        Nur Administratoren können Benutzer löschen.
        """
        pass

9.6 Content Negotiation

Content Negotiation ermöglicht es APIs, verschiedene Datenformate zu unterstützen und automatisch das beste Format für jeden Client auszuwählen. Dies ist besonders wichtig in heterogenen Umgebungen, wo verschiedene Clients unterschiedliche Anforderungen haben.

9.6.1 Accept-Header und Mime-Types

HTTP-Clients verwenden den Accept-Header, um ihre bevorzugten Datenformate anzugeben. Server können diese Information nutzen, um die Antwort entsprechend zu formatieren.

from flask import Flask, request, jsonify, make_response
import json
import xml.etree.ElementTree as ET
from datetime import datetime

app = Flask(__name__)

class ContentNegotiator:
    """
    Klasse für Content Negotiation basierend auf Accept-Header.
    Unterstützt automatische Format-Auswahl und -Konvertierung.
    """
    
    SUPPORTED_FORMATS = {
        'application/json': 'json',
        'application/xml': 'xml',
        'text/xml': 'xml',
        'text/csv': 'csv',
        'text/plain': 'text'
    }
    
    def __init__(self, default_format='json'):
        self.default_format = default_format
    
    def get_best_format(self, accept_header):
        """
        Ermittelt das beste Format basierend auf Accept-Header.
        Berücksichtigt Quality-Values (q-parameter).
        """
        if not accept_header:
            return self.default_format
        
        # Accept-Header parsen
        accepted_types = []
        for media_range in accept_header.split(','):
            media_range = media_range.strip()
            
            # Quality-Value extrahieren (default: 1.0)
            if ';q=' in media_range:
                media_type, quality = media_range.split(';q=', 1)
                quality = float(quality.split(';')[0])  # Weitere Parameter ignorieren
            else:
                media_type, quality = media_range, 1.0
            
            media_type = media_type.strip()
            
            if media_type in self.SUPPORTED_FORMATS:
                accepted_types.append((media_type, quality))
        
        if not accepted_types:
            return self.default_format
        
        # Nach Quality-Value sortieren (höchste zuerst)
        accepted_types.sort(key=lambda x: x[1], reverse=True)
        
        return self.SUPPORTED_FORMATS[accepted_types[0][0]]
    
    def serialize_data(self, data, format_type):
        """
        Serialisiert Daten in das gewünschte Format.
        """
        if format_type == 'json':
            return self._to_json(data)
        elif format_type == 'xml':
            return self._to_xml(data)
        elif format_type == 'csv':
            return self._to_csv(data)
        elif format_type == 'text':
            return self._to_text(data)
        else:
            raise ValueError(f"Unsupported format: {format_type}")
    
    def _to_json(self, data):
        """Konvertiert Daten zu JSON."""
        def json_serializer(obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
        
        return json.dumps(data, indent=2, default=json_serializer, ensure_ascii=False)
    
    def _to_xml(self, data):
        """Konvertiert Daten zu XML."""
        def dict_to_xml(d, root_name='data'):
            root = ET.Element(root_name)
            
            def add_element(parent, key, value):
                if isinstance(value, dict):
                    child = ET.SubElement(parent, key)
                    for k, v in value.items():
                        add_element(child, k, v)
                elif isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict):
                            child = ET.SubElement(parent, key)
                            for k, v in item.items():
                                add_element(child, k, v)
                        else:
                            child = ET.SubElement(parent, key)
                            child.text = str(item)
                else:
                    child = ET.SubElement(parent, key)
                    child.text = str(value) if value is not None else ''
            
            for key, value in d.items():
                add_element(root, key, value)
            
            return ET.tostring(root, encoding='unicode')
        
        if isinstance(data, dict):
            return dict_to_xml(data)
        elif isinstance(data, list):
            return dict_to_xml({'items': data})
        else:
            return dict_to_xml({'value': data})
    
    def _to_csv(self, data):
        """Konvertiert Daten zu CSV (nur für Listen von Dictionaries)."""
        if not isinstance(data, list) or not data:
            return ""
        
        if not isinstance(data[0], dict):
            return "\n".join(str(item) for item in data)
        
        # Header erstellen
        headers = list(data[0].keys())
        csv_lines = [",".join(headers)]
        
        # Datenzeilen
        for item in data:
            row = []
            for header in headers:
                value = item.get(header, '')
                # CSV-Escaping für Kommas und Anführungszeichen
                if ',' in str(value) or '"' in str(value):
                    value = f'"{str(value).replace('"', '""')}"'
                row.append(str(value))
            csv_lines.append(",".join(row))
        
        return "\n".join(csv_lines)
    
    def _to_text(self, data):
        """Konvertiert Daten zu Plain Text."""
        def format_item(item, indent=0):
            prefix = "  " * indent
            if isinstance(item, dict):
                lines = []
                for key, value in item.items():
                    if isinstance(value, (dict, list)):
                        lines.append(f"{prefix}{key}:")
                        lines.append(format_item(value, indent + 1))
                    else:
                        lines.append(f"{prefix}{key}: {value}")
                return "\n".join(lines)
            elif isinstance(item, list):
                lines = []
                for i, value in enumerate(item):
                    lines.append(f"{prefix}- {format_item(value, indent)}")
                return "\n".join(lines)
            else:
                return f"{prefix}{item}"
        
        return format_item(data)

# Content Negotiator instanziieren
negotiator = ContentNegotiator()

def create_response(data, status_code=200):
    """
    Erstellt Response mit automatischer Content Negotiation.
    """
    accept_header = request.headers.get('Accept', 'application/json')
    format_type = negotiator.get_best_format(accept_header)
    
    try:
        content = negotiator.serialize_data(data, format_type)
        
        # Content-Type basierend auf Format setzen
        content_types = {
            'json': 'application/json; charset=utf-8',
            'xml': 'application/xml; charset=utf-8',
            'csv': 'text/csv; charset=utf-8',
            'text': 'text/plain; charset=utf-8'
        }
        
        response = make_response(content, status_code)
        response.headers['Content-Type'] = content_types[format_type]
        
        # Zusätzliche Headers für bessere API-Erfahrung
        response.headers['Vary'] = 'Accept'  # Caching-Hinweis
        response.headers['X-Content-Format'] = format_type
        
        return response
        
    except Exception as e:
        # Fallback auf JSON bei Serialisierungsfehlern
        error_data = {
            "error": "Serialization failed",
            "details": str(e),
            "requested_format": format_type
        }
        response = make_response(json.dumps(error_data), 500)
        response.headers['Content-Type'] = 'application/json'
        return response

# API-Endpoints mit Content Negotiation
@app.route('/api/users')
def get_users_negotiated():
    """
    Benutzer-Liste mit automatischer Format-Verhandlung.
    
    Unterstützte Formate:
    - application/json (Standard)
    - application/xml
    - text/csv
    - text/plain
    """
    users_data = [
        {
            "id": 1,
            "name": "Max Mustermann",
            "email": "max@firma.de",
            "department": "IT",
            "created_at": datetime(2024, 1, 15)
        },
        {
            "id": 2,
            "name": "Anna Schmidt",
            "email": "anna@firma.de",
            "department": "Marketing",
            "created_at": datetime(2024, 1, 20)
        }
    ]
    
    response_data = {
        "users": users_data,
        "total": len(users_data),
        "timestamp": datetime.utcnow()
    }
    
    return create_response(response_data)

@app.route('/api/users/<int:user_id>')
def get_user_negotiated(user_id):
    """Einzelner Benutzer mit Content Negotiation."""
    # Simulierte Datenbankabfrage
    user_data = {
        "id": user_id,
        "name": "Max Mustermann",
        "email": "max@firma.de",
        "department": "IT",
        "role": "user",
        "created_at": datetime(2024, 1, 15),
        "last_login": datetime(2024, 3, 10, 14, 30)
    }
    
    return create_response(user_data)

# Endpoint für verfügbare Formate
@app.route('/api/formats')
def get_supported_formats():
    """
    Gibt unterstützte Content-Types zurück.
    """
    formats_info = {
        "supported_formats": list(negotiator.SUPPORTED_FORMATS.keys()),
        "default_format": f"application/{negotiator.default_format}",
        "description": "Use Accept header to specify preferred format",
        "examples": {
            "json": "Accept: application/json",
            "xml": "Accept: application/xml",
            "csv": "Accept: text/csv",
            "text": "Accept: text/plain"
        }
    }
    
    return create_response(formats_info)

# Middleware für automatische Content Negotiation
@app.before_request
def log_content_negotiation():
    """Logging für Content Negotiation debugging."""
    accept_header = request.headers.get('Accept')
    if accept_header:
        preferred_format = negotiator.get_best_format(accept_header)
        app.logger.info(f"Request to {request.path} - Accept: {accept_header} - Format: {preferred_format}")

if __name__ == '__main__':
    app.run(debug=True)

Diese umfassende Behandlung von RESTful APIs zeigt, wie Flask von einfachen Dekoratoren zu ausgefeilten API-Frameworks skaliert werden kann. Die verschiedenen Ansätze - von grundlegenden Routes über MethodView bis hin zu Flask-RESTful und Flask-Restx - bieten jeweils spezifische Vorteile je nach Komplexität und Anforderungen der API. Marshmallow bringt robuste Serialisierung und Validierung, während automatische Dokumentation und Content Negotiation die Benutzerfreundlichkeit und Integration der API erheblich verbessern.