REST (Representational State Transfer) ist ein Architekturstil für die Entwicklung von Web-Services, der auf den Grundprinzipien des World Wide Web basiert. Um REST wirklich zu verstehen, müssen wir zunächst die fundamentalen Konzepte betrachten, die diesen Ansatz von anderen API-Designs unterscheiden.
Das wichtigste Prinzip von REST ist die Zustandslosigkeit. Jede Anfrage an eine REST-API muss alle Informationen enthalten, die der Server zur Verarbeitung benötigt. Der Server speichert keinen Kontext zwischen verschiedenen Anfragen desselben Clients. Dies unterscheidet REST fundamental von traditionellen Web-Anwendungen, die Session-Zustand verwenden.
Ein zweites zentrales Konzept ist die Ressourcen-orientierte
Architektur. In REST repräsentiert jede URL eine Ressource. Anstatt
Aktionen in URLs zu kodieren wie /getUserById?id=123,
verwenden REST-APIs URLs wie /users/123, die direkt auf die
Ressource verweisen. Die gewünschte Aktion wird durch die HTTP-Methode
bestimmt: GET zum Lesen, POST zum Erstellen, PUT zum Aktualisieren,
DELETE zum Löschen.
Die HTTP-Methoden bilden das Vokabular von REST-APIs. Jede Methode hat eine spezifische Semantik und erwartete Seiteneffekte.
from flask import Flask, request, jsonify
from datetime import datetime
app = Flask(__name__)
# Simulierte Datenbank
users_db = [
{"id": 1, "name": "Max Mustermann", "email": "max@beispiel.de", "created_at": "2024-01-15"},
{"id": 2, "name": "Anna Schmidt", "email": "anna@beispiel.de", "created_at": "2024-01-20"}
]
# GET - Ressourcen abrufen (idempotent, keine Seiteneffekte)
@app.route('/api/users', methods=['GET'])
def get_users():
"""
Gibt alle Benutzer zurück. GET-Requests sollten niemals Daten ändern.
Idempotenz bedeutet: mehrfache Ausführung hat dasselbe Ergebnis.
"""
return jsonify({
"users": users_db,
"total": len(users_db),
"timestamp": datetime.utcnow().isoformat()
})
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""
Gibt einen spezifischen Benutzer zurück.
Verwendet HTTP-Statuscodes zur Kommunikation des Ergebnisses.
"""
user = next((u for u in users_db if u["id"] == user_id), None)
if user is None:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
return jsonify(user)
# POST - Neue Ressourcen erstellen (nicht idempotent)
@app.route('/api/users', methods=['POST'])
def create_user():
"""
Erstellt einen neuen Benutzer. POST ist nicht idempotent:
jede Ausführung erstellt eine neue Ressource.
"""
data = request.get_json()
# Validierung der Eingabedaten
if not data or 'name' not in data or 'email' not in data:
return jsonify({
"error": "Name und E-Mail sind erforderlich",
"required_fields": ["name", "email"]
}), 400
# Neue ID generieren (in echter Anwendung: Datenbank-Auto-Increment)
new_id = max([u["id"] for u in users_db], default=0) + 1
new_user = {
"id": new_id,
"name": data["name"],
"email": data["email"],
"created_at": datetime.utcnow().isoformat()
}
users_db.append(new_user)
# 201 Created mit Location-Header zur neuen Ressource
response = jsonify(new_user)
response.status_code = 201
response.headers['Location'] = f'/api/users/{new_id}'
return response
# PUT - Ressourcen vollständig ersetzen (idempotent)
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""
Ersetzt einen Benutzer vollständig. PUT ist idempotent:
mehrfache Ausführung mit denselben Daten hat dasselbe Ergebnis.
"""
data = request.get_json()
if not data or 'name' not in data or 'email' not in data:
return jsonify({"error": "Name und E-Mail sind erforderlich"}), 400
user_index = next((i for i, u in enumerate(users_db) if u["id"] == user_id), None)
if user_index is None:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Vollständige Ersetzung (PUT-Semantik)
users_db[user_index] = {
"id": user_id,
"name": data["name"],
"email": data["email"],
"created_at": users_db[user_index]["created_at"], # Erstellungsdatum beibehalten
"updated_at": datetime.utcnow().isoformat()
}
return jsonify(users_db[user_index])
# PATCH - Ressourcen teilweise aktualisieren (idempotent)
@app.route('/api/users/<int:user_id>', methods=['PATCH'])
def patch_user(user_id):
"""
Aktualisiert Teile eines Benutzers. PATCH erlaubt partielle Updates
und ist ebenfalls idempotent.
"""
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten zum Aktualisieren"}), 400
user_index = next((i for i, u in enumerate(users_db) if u["id"] == user_id), None)
if user_index is None:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Nur bereitgestellte Felder aktualisieren
user = users_db[user_index]
if 'name' in data:
user['name'] = data['name']
if 'email' in data:
user['email'] = data['email']
user['updated_at'] = datetime.utcnow().isoformat()
return jsonify(user)
# DELETE - Ressourcen entfernen (idempotent)
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""
Löscht einen Benutzer. DELETE ist idempotent:
mehrfache Löschung derselben Ressource hat dasselbe Ergebnis.
"""
user_index = next((i for i, u in enumerate(users_db) if u["id"] == user_id), None)
if user_index is None:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
deleted_user = users_db.pop(user_index)
# 204 No Content - erfolgreiche Löschung ohne Antwortinhalt
return '', 204Während die Dekorator-Syntax mit @app.route für einfache
APIs ausreicht, bietet die MethodView-Klasse einen objektorientierten
Ansatz, der bei komplexeren APIs Vorteile bringt. Verstehen wir
zunächst, warum diese Alternative existiert und wann sie sinnvoll
ist.
Der traditionelle Dekorator-Ansatz funktioniert gut für einfache APIs, zeigt aber bei komplexeren Szenarien Schwächen. Wenn eine Ressource mehrere HTTP-Methoden unterstützt, führt dies zu Code-Duplikation und erschwert die Wartung.
# Traditioneller Ansatz: eine Funktion pro Methode
@app.route('/api/articles', methods=['GET'])
def get_articles():
# GET-Logik für Artikel-Liste
pass
@app.route('/api/articles', methods=['POST'])
def create_article():
# POST-Logik für Artikel-Erstellung
pass
@app.route('/api/articles/<int:article_id>', methods=['GET'])
def get_article(article_id):
# GET-Logik für einzelnen Artikel
pass
@app.route('/api/articles/<int:article_id>', methods=['PUT'])
def update_article(article_id):
# PUT-Logik für Artikel-Update
pass
@app.route('/api/articles/<int:article_id>', methods=['DELETE'])
def delete_article(article_id):
# DELETE-Logik für Artikel-Löschung
passDieser Ansatz führt zu mehreren Problemen: Code für gemeinsame Funktionalitäten (wie Authentifizierung oder Validierung) muss in jeder Funktion wiederholt werden. Die Logik für eine Ressource ist über mehrere Funktionen verteilt. Gemeinsame Hilfsmethoden sind schwer zu organisieren.
MethodView löst diese Probleme durch Gruppierung der HTTP-Methoden in einer Klasse. Jede HTTP-Methode wird als Klassenmethode implementiert.
from flask.views import MethodView
from flask import request, jsonify
class UserAPI(MethodView):
"""
MethodView für Benutzer-Ressourcen.
Jede HTTP-Methode wird als separate Klassenmethode implementiert.
"""
def __init__(self):
# Gemeinsame Initialisierung für alle Methoden
self.users_db = [
{"id": 1, "name": "Max Mustermann", "email": "max@beispiel.de"},
{"id": 2, "name": "Anna Schmidt", "email": "anna@beispiel.de"}
]
def get(self, user_id=None):
"""
GET-Methode: Benutzer abrufen.
Behandelt sowohl Einzelabfragen (/users/1) als auch Listen (/users).
"""
if user_id is None:
# Liste aller Benutzer
return jsonify({
"users": self.users_db,
"total": len(self.users_db)
})
# Einzelner Benutzer
user = self._find_user(user_id)
if user is None:
return self._not_found_response()
return jsonify(user)
def post(self):
"""
POST-Methode: Neuen Benutzer erstellen.
Nur für Collection-Endpoints (/users), nicht für spezifische Ressourcen.
"""
data = request.get_json()
# Validierung mit Hilfsmethode
validation_errors = self._validate_user_data(data)
if validation_errors:
return jsonify({"errors": validation_errors}), 400
new_user = self._create_user(data)
response = jsonify(new_user)
response.status_code = 201
response.headers['Location'] = f'/api/users/{new_user["id"]}'
return response
def put(self, user_id):
"""
PUT-Methode: Benutzer vollständig ersetzen.
Benötigt eine spezifische Benutzer-ID.
"""
if user_id is None:
return jsonify({"error": "Benutzer-ID erforderlich"}), 400
data = request.get_json()
validation_errors = self._validate_user_data(data, required_all=True)
if validation_errors:
return jsonify({"errors": validation_errors}), 400
user = self._find_user(user_id)
if user is None:
return self._not_found_response()
updated_user = self._update_user(user_id, data, full_replace=True)
return jsonify(updated_user)
def patch(self, user_id):
"""
PATCH-Methode: Benutzer teilweise aktualisieren.
Erlaubt partielle Updates.
"""
if user_id is None:
return jsonify({"error": "Benutzer-ID erforderlich"}), 400
data = request.get_json()
if not data:
return jsonify({"error": "Keine Daten bereitgestellt"}), 400
user = self._find_user(user_id)
if user is None:
return self._not_found_response()
updated_user = self._update_user(user_id, data, full_replace=False)
return jsonify(updated_user)
def delete(self, user_id):
"""
DELETE-Methode: Benutzer löschen.
"""
if user_id is None:
return jsonify({"error": "Benutzer-ID erforderlich"}), 400
user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
if user_index is None:
return self._not_found_response()
self.users_db.pop(user_index)
return '', 204
# Hilfsmethoden - ein großer Vorteil von MethodView
def _find_user(self, user_id):
"""Findet einen Benutzer nach ID."""
return next((u for u in self.users_db if u["id"] == user_id), None)
def _validate_user_data(self, data, required_all=False):
"""Validiert Benutzerdaten."""
errors = []
if not data:
return ["Keine Daten bereitgestellt"]
if required_all or 'name' in data:
if not data.get('name', '').strip():
errors.append("Name ist erforderlich")
if required_all or 'email' in data:
email = data.get('email', '').strip()
if not email:
errors.append("E-Mail ist erforderlich")
elif '@' not in email:
errors.append("Ungültige E-Mail-Adresse")
return errors
def _create_user(self, data):
"""Erstellt einen neuen Benutzer."""
new_id = max([u["id"] for u in self.users_db], default=0) + 1
new_user = {
"id": new_id,
"name": data["name"],
"email": data["email"],
"created_at": datetime.utcnow().isoformat()
}
self.users_db.append(new_user)
return new_user
def _update_user(self, user_id, data, full_replace=False):
"""Aktualisiert einen Benutzer."""
user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
user = self.users_db[user_index]
if full_replace:
# PUT: Vollständige Ersetzung
self.users_db[user_index] = {
"id": user_id,
"name": data["name"],
"email": data["email"],
"created_at": user.get("created_at"),
"updated_at": datetime.utcnow().isoformat()
}
else:
# PATCH: Partielle Aktualisierung
if 'name' in data:
user['name'] = data['name']
if 'email' in data:
user['email'] = data['email']
user['updated_at'] = datetime.utcnow().isoformat()
return self.users_db[user_index]
def _not_found_response(self):
"""Standardisierte 404-Antwort."""
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# MethodView registrieren
user_view = UserAPI.as_view('user_api')
# Routen für Collection (alle Benutzer)
app.add_url_rule('/api/users', view_func=user_view, methods=['GET', 'POST'])
# Routen für spezifische Ressourcen (einzelne Benutzer)
app.add_url_rule('/api/users/<int:user_id>', view_func=user_view, methods=['GET', 'PUT', 'PATCH', 'DELETE'])MethodView ermöglicht ausgefeilte Patterns für API-Design, einschließlich Vererbung und Middleware-ähnlicher Funktionalität.
class BaseResourceAPI(MethodView):
"""
Basis-Klasse für alle Ressourcen-APIs.
Implementiert gemeinsame Funktionalitäten.
"""
def dispatch_request(self, *args, **kwargs):
"""
Wird vor jeder HTTP-Methode aufgerufen.
Ähnlich wie Middleware - perfekt für Authentifizierung.
"""
# Authentifizierung prüfen
auth_result = self._check_authentication()
if auth_result is not None:
return auth_result
# Autorisierung prüfen
permission_result = self._check_permissions()
if permission_result is not None:
return permission_result
# Normale Verarbeitung fortsetzen
return super().dispatch_request(*args, **kwargs)
def _check_authentication(self):
"""Prüft Authentifizierung für alle Methoden."""
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({"error": "Authentifizierung erforderlich"}), 401
token = auth_header.split(' ')[1]
if not self._validate_token(token):
return jsonify({"error": "Ungültiger Token"}), 401
return None # Authentifizierung erfolgreich
def _check_permissions(self):
"""Prüft spezifische Berechtigungen (überschreibbar)."""
return None # Standard: alle authentifizierten Benutzer haben Zugriff
def _validate_token(self, token):
"""Validiert Authentifizierungs-Token."""
# Vereinfachte Token-Validierung
return token == "valid-token-123"
def _standard_error_response(self, message, status_code=400):
"""Standardisierte Fehlerantworten."""
return jsonify({
"error": message,
"timestamp": datetime.utcnow().isoformat(),
"status": status_code
}), status_code
class ArticleAPI(BaseResourceAPI):
"""
Artikel-API mit Vererbung von gemeinsamen Funktionalitäten.
"""
def __init__(self):
self.articles_db = [
{"id": 1, "title": "Erste Nachricht", "content": "Inhalt...", "author_id": 1},
{"id": 2, "title": "Zweite Nachricht", "content": "Mehr Inhalt...", "author_id": 2}
]
def _check_permissions(self):
"""Artikel-spezifische Berechtigungen."""
method = request.method
# Jeder kann Artikel lesen
if method == 'GET':
return None
# Nur Autoren können Artikel erstellen/bearbeiten
user_role = self._get_user_role()
if user_role not in ['author', 'admin']:
return self._standard_error_response("Keine Berechtigung für diese Aktion", 403)
return None
def _get_user_role(self):
"""Ermittelt Benutzerrolle aus Token (vereinfacht)."""
return "author" # In echter Anwendung: aus Token extrahieren
def get(self, article_id=None):
"""Artikel abrufen."""
if article_id is None:
return jsonify({"articles": self.articles_db})
article = next((a for a in self.articles_db if a["id"] == article_id), None)
if article is None:
return self._standard_error_response("Artikel nicht gefunden", 404)
return jsonify(article)
def post(self):
"""Artikel erstellen."""
data = request.get_json()
if not data or not data.get('title') or not data.get('content'):
return self._standard_error_response("Titel und Inhalt sind erforderlich")
new_id = max([a["id"] for a in self.articles_db], default=0) + 1
new_article = {
"id": new_id,
"title": data["title"],
"content": data["content"],
"author_id": 1, # In echter Anwendung: aus Token
"created_at": datetime.utcnow().isoformat()
}
self.articles_db.append(new_article)
response = jsonify(new_article)
response.status_code = 201
return response
# Artikel-API registrieren
article_view = ArticleAPI.as_view('article_api')
app.add_url_rule('/api/articles', view_func=article_view, methods=['GET', 'POST'])
app.add_url_rule('/api/articles/<int:article_id>', view_func=article_view, methods=['GET', 'PUT', 'DELETE'])Während MethodView bereits eine Verbesserung gegenüber einfachen Dekoratoren darstellt, bieten spezialisierte Extensions wie Flask-RESTful und Flask-Restx zusätzliche Funktionalitäten für die API-Entwicklung. Diese Extensions verstehen die speziellen Anforderungen von REST-APIs und bieten Lösungen für häufige Probleme.
Flask-RESTful erweitert MethodView um API-spezifische Funktionalitäten wie automatische Content-Type-Verhandlung, Eingabevalidierung und standardisierte Fehlerbehandlung.
from flask import Flask
from flask_restful import Api, Resource, reqparse, fields, marshal_with
from datetime import datetime
app = Flask(__name__)
api = Api(app)
# Ausgabe-Serialisierung mit marshal_with
user_fields = {
'id': fields.Integer,
'name': fields.String,
'email': fields.String,
'created_at': fields.DateTime(dt_format='iso8601'),
'updated_at': fields.DateTime(dt_format='iso8601')
}
user_list_fields = {
'users': fields.List(fields.Nested(user_fields)),
'total': fields.Integer,
'page': fields.Integer,
'per_page': fields.Integer
}
class UserResource(Resource):
"""
Flask-RESTful Resource für Benutzer.
Bietet automatische Serialisierung und Validierung.
"""
def __init__(self):
# Request-Parser für Eingabevalidierung
self.parser = reqparse.RequestParser()
self.parser.add_argument('name', type=str, required=True, help='Name ist erforderlich')
self.parser.add_argument('email', type=str, required=True, help='E-Mail ist erforderlich')
# Simulierte Datenbank
self.users_db = [
{
"id": 1,
"name": "Max Mustermann",
"email": "max@beispiel.de",
"created_at": datetime(2024, 1, 15),
"updated_at": None
},
{
"id": 2,
"name": "Anna Schmidt",
"email": "anna@beispiel.de",
"created_at": datetime(2024, 1, 20),
"updated_at": None
}
]
@marshal_with(user_fields)
def get(self, user_id):
"""
Einzelnen Benutzer abrufen.
@marshal_with sorgt für automatische JSON-Serialisierung.
"""
user = next((u for u in self.users_db if u["id"] == user_id), None)
if user is None:
api.abort(404, message="Benutzer nicht gefunden")
return user
@marshal_with(user_fields)
def put(self, user_id):
"""
Benutzer aktualisieren mit automatischer Validierung.
"""
args = self.parser.parse_args()
user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
if user_index is None:
api.abort(404, message="Benutzer nicht gefunden")
# Benutzer aktualisieren
self.users_db[user_index].update({
'name': args['name'],
'email': args['email'],
'updated_at': datetime.utcnow()
})
return self.users_db[user_index]
def delete(self, user_id):
"""
Benutzer löschen.
"""
user_index = next((i for i, u in enumerate(self.users_db) if u["id"] == user_id), None)
if user_index is None:
api.abort(404, message="Benutzer nicht gefunden")
self.users_db.pop(user_index)
return {'message': 'Benutzer gelöscht'}, 204
class UserListResource(Resource):
"""
Resource für Benutzer-Listen-Operationen.
Trennt Collection- von Item-Operationen.
"""
def __init__(self):
self.parser = reqparse.RequestParser()
self.parser.add_argument('name', type=str, required=True)
self.parser.add_argument('email', type=str, required=True)
self.parser.add_argument('page', type=int, default=1)
self.parser.add_argument('per_page', type=int, default=10)
# Geteilte Referenz zur Datenbank
self.users_db = UserResource().users_db
@marshal_with(user_list_fields)
def get(self):
"""
Benutzer-Liste mit Paginierung.
"""
args = self.parser.parse_args()
page = args['page']
per_page = min(args['per_page'], 100) # Maximum 100 pro Seite
start = (page - 1) * per_page
end = start + per_page
return {
'users': self.users_db[start:end],
'total': len(self.users_db),
'page': page,
'per_page': per_page
}
@marshal_with(user_fields)
def post(self):
"""
Neuen Benutzer erstellen.
"""
args = self.parser.parse_args()
# E-Mail-Eindeutigkeit prüfen
if any(u['email'] == args['email'] for u in self.users_db):
api.abort(400, message="E-Mail-Adresse bereits vorhanden")
new_id = max([u["id"] for u in self.users_db], default=0) + 1
new_user = {
"id": new_id,
"name": args['name'],
"email": args['email'],
"created_at": datetime.utcnow(),
"updated_at": None
}
self.users_db.append(new_user)
return new_user, 201
# Resources registrieren
api.add_resource(UserListResource, '/api/users')
api.add_resource(UserResource, '/api/users/<int:user_id>')Flask-Restx (Nachfolger von Flask-RESTPlus) erweitert Flask-RESTful um automatische API-Dokumentation mit Swagger/OpenAPI und erweiterte Validierungsmöglichkeiten.
from flask import Flask
from flask_restx import Api, Resource, fields, Namespace
from datetime import datetime
app = Flask(__name__)
# Flask-Restx API mit Swagger-Dokumentation
api = Api(app,
version='1.0',
title='Benutzer-API',
description='Eine API für Benutzerverwaltung mit automatischer Dokumentation',
doc='/api/docs/')
# Namespace für Organisation der API
users_ns = Namespace('users', description='Benutzer-Operationen')
api.add_namespace(users_ns)
# Model-Definitionen für automatische Dokumentation
user_model = api.model('User', {
'id': fields.Integer(readonly=True, description='Eindeutige Benutzer-ID'),
'name': fields.String(required=True, description='Vollständiger Name des Benutzers'),
'email': fields.String(required=True, description='E-Mail-Adresse'),
'created_at': fields.DateTime(readonly=True, description='Erstellungsdatum'),
'updated_at': fields.DateTime(readonly=True, description='Letzte Aktualisierung')
})
user_input_model = api.model('UserInput', {
'name': fields.String(required=True, description='Vollständiger Name'),
'email': fields.String(required=True, description='E-Mail-Adresse')
})
user_list_model = api.model('UserList', {
'users': fields.List(fields.Nested(user_model)),
'total': fields.Integer(description='Gesamtanzahl der Benutzer'),
'page': fields.Integer(description='Aktuelle Seite'),
'per_page': fields.Integer(description='Benutzer pro Seite')
})
# Simulierte Datenbank
users_database = [
{
"id": 1,
"name": "Max Mustermann",
"email": "max@beispiel.de",
"created_at": datetime(2024, 1, 15),
"updated_at": None
}
]
@users_ns.route('/')
class UserList(Resource):
"""
Benutzer-Collection-Operationen
"""
@users_ns.doc('list_users')
@users_ns.marshal_list_with(user_model)
@users_ns.param('page', 'Seitennummer', type=int, default=1)
@users_ns.param('per_page', 'Einträge pro Seite', type=int, default=10)
def get(self):
"""
Alle Benutzer abrufen
Gibt eine paginierte Liste aller Benutzer zurück.
Die Dokumentation wird automatisch aus diesem Docstring generiert.
"""
# Parameter aus Query-String extrahieren
page = int(users_ns.payload.get('page', 1)) if users_ns.payload else 1
per_page = int(users_ns.payload.get('per_page', 10)) if users_ns.payload else 10
start = (page - 1) * per_page
end = start + per_page
return users_database[start:end]
@users_ns.doc('create_user')
@users_ns.expect(user_input_model)
@users_ns.marshal_with(user_model, code=201)
@users_ns.response(400, 'Validierungsfehler')
@users_ns.response(409, 'E-Mail bereits vorhanden')
def post(self):
"""
Neuen Benutzer erstellen
Erstellt einen neuen Benutzer mit den bereitgestellten Daten.
Die E-Mail-Adresse muss eindeutig sein.
"""
data = users_ns.payload
# Validierung
if not data.get('name') or not data.get('email'):
users_ns.abort(400, 'Name und E-Mail sind erforderlich')
# E-Mail-Eindeutigkeit prüfen
if any(u['email'] == data['email'] for u in users_database):
users_ns.abort(409, 'E-Mail-Adresse bereits vorhanden')
new_id = max([u["id"] for u in users_database], default=0) + 1
new_user = {
"id": new_id,
"name": data['name'],
"email": data['email'],
"created_at": datetime.utcnow(),
"updated_at": None
}
users_database.append(new_user)
return new_user, 201
@users_ns.route('/<int:user_id>')
@users_ns.param('user_id', 'Eindeutige Benutzer-ID')
class User(Resource):
"""
Einzelne Benutzer-Operationen
"""
@users_ns.doc('get_user')
@users_ns.marshal_with(user_model)
@users_ns.response(404, 'Benutzer nicht gefunden')
def get(self, user_id):
"""
Spezifischen Benutzer abrufen
Gibt die Details eines einzelnen Benutzers anhand der ID zurück.
"""
user = next((u for u in users_database if u["id"] == user_id), None)
if user is None:
users_ns.abort(404, 'Benutzer nicht gefunden')
return user
@users_ns.doc('update_user')
@users_ns.expect(user_input_model)
@users_ns.marshal_with(user_model)
@users_ns.response(404, 'Benutzer nicht gefunden')
@users_ns.response(400, 'Validierungsfehler')
def put(self, user_id):
"""
Benutzer vollständig aktualisieren
Ersetzt alle Daten eines Benutzers mit den bereitgestellten Werten.
"""
data = users_ns.payload
user_index = next((i for i, u in enumerate(users_database) if u["id"] == user_id), None)
if user_index is None:
users_ns.abort(404, 'Benutzer nicht gefunden')
# Validierung
if not data.get('name') or not data.get('email'):
users_ns.abort(400, 'Name und E-Mail sind erforderlich')
# E-Mail-Eindeutigkeit prüfen (außer für aktuellen Benutzer)
if any(u['email'] == data['email'] and u['id'] != user_id for u in users_database):
users_ns.abort(409, 'E-Mail-Adresse bereits vorhanden')
users_database[user_index].update({
'name': data['name'],
'email': data['email'],
'updated_at': datetime.utcnow()
})
return users_database[user_index]
@users_ns.doc('delete_user')
@users_ns.response(204, 'Benutzer erfolgreich gelöscht')
@users_ns.response(404, 'Benutzer nicht gefunden')
def delete(self, user_id):
"""
Benutzer löschen
Entfernt einen Benutzer dauerhaft aus dem System.
"""
user_index = next((i for i, u in enumerate(users_database) if u["id"] == user_id), None)
if user_index is None:
users_ns.abort(404, 'Benutzer nicht gefunden')
users_database.pop(user_index)
return '', 204Serialisierung ist der Prozess der Konvertierung von Python-Objekten in JSON (oder andere Formate) für die API-Ausgabe. Deserialisierung ist der umgekehrte Prozess: JSON-Eingaben in Python-Objekte umwandeln. Marshmallow ist eine leistungsstarke Bibliothek, die beide Prozesse mit Validierung und Datenverarbeitung kombiniert.
Marshmallow verwendet Schemas zur Definition der Datenstruktur und Validierungsregeln. Ein Schema beschreibt, wie Daten aussehen sollen und welche Transformationen angewendet werden.
from marshmallow import Schema, fields, validate, validates, validates_schema, ValidationError, post_load
from datetime import datetime
from typing import Dict, Any
class UserSchema(Schema):
"""
Schema für Benutzer-Serialisierung und -Validierung.
Definiert die Struktur und Regeln für Benutzer-Daten.
"""
# Felder mit verschiedenen Validierungsregeln
id = fields.Int(dump_only=True) # Nur für Ausgabe, nicht für Eingabe
name = fields.Str(required=True, validate=validate.Length(min=2, max=100))
email = fields.Email(required=True)
age = fields.Int(validate=validate.Range(min=18, max=120), allow_none=True)
created_at = fields.DateTime(dump_only=True, format='iso')
updated_at = fields.DateTime(dump_only=True, format='iso', allow_none=True)
# Berechnete Felder
full_name = fields.Method("get_full_name", dump_only=True)
def get_full_name(self, obj):
"""
Methode für berechnete Felder.
Wird bei der Serialisierung aufgerufen.
"""
return f"{obj.get('title', '')} {obj.get('name', '')}".strip()
@validates('email')
def validate_email_domain(self, value):
"""
Benutzerdefinierte E-Mail-Validierung.
Prüft erlaubte Domains für Geschäfts-E-Mails.
"""
allowed_domains = ['firma.de', 'unternehmen.de', 'beispiel.de']
domain = value.split('@')[1] if '@' in value else ''
if domain not in allowed_domains:
raise ValidationError('Nur Geschäfts-E-Mail-Adressen sind erlaubt')
@validates_schema
def validate_schema(self, data, **kwargs):
"""
Schema-weite Validierung.
Prüft Beziehungen zwischen verschiedenen Feldern.
"""
# Beispiel: Name und E-Mail müssen konsistent sein
name = data.get('name', '').lower()
email = data.get('email', '').lower()
if 'admin' in name and 'admin' not in email:
raise ValidationError({
'email': 'Admin-Benutzer müssen eine Admin-E-Mail-Adresse haben'
})
@post_load
def make_user(self, data, **kwargs):
"""
Post-Load-Hook: wird nach erfolgreicher Validierung aufgerufen.
Kann Daten transformieren oder Objekte erstellen.
"""
# Automatisch Erstellungsdatum hinzufügen
data['created_at'] = datetime.utcnow()
return data
# Schema für verschiedene Anwendungsfälle
class UserCreateSchema(UserSchema):
"""Schema für Benutzer-Erstellung (strengere Validierung)."""
name = fields.Str(required=True, validate=validate.Length(min=2, max=100))
email = fields.Email(required=True)
password = fields.Str(required=True, validate=validate.Length(min=8), load_only=True)
@validates('password')
def validate_password_strength(self, value):
"""Passwort-Stärke validieren."""
import re
if not re.search(r'[A-Z]', value):
raise ValidationError('Passwort muss mindestens einen Großbuchstaben enthalten')
if not re.search(r'[a-z]', value):
raise ValidationError('Passwort muss mindestens einen Kleinbuchstaben enthalten')
if not re.search(r'\d', value):
raise ValidationError('Passwort muss mindestens eine Zahl enthalten')
class UserUpdateSchema(UserSchema):
"""Schema für Benutzer-Updates (flexiblere Validierung)."""
# Alle Felder optional für PATCH-Updates
name = fields.Str(validate=validate.Length(min=2, max=100), allow_none=True)
email = fields.Email(allow_none=True)
# Verschachtelte Schemas für komplexe Datenstrukturen
class AddressSchema(Schema):
"""Schema für Adressen."""
street = fields.Str(required=True)
city = fields.Str(required=True)
postal_code = fields.Str(required=True, validate=validate.Regexp(r'^\d{5}$'))
country = fields.Str(missing='DE', validate=validate.OneOf(['DE', 'AT', 'CH']))
class UserWithAddressSchema(UserSchema):
"""Erweiterte Benutzer-Schema mit Adresse."""
address = fields.Nested(AddressSchema, allow_none=True)
addresses = fields.List(fields.Nested(AddressSchema)) # Mehrere Adressen
# Integration in Flask-API
from flask import Flask, request, jsonify
app = Flask(__name__)
# Schema-Instanzen
user_schema = UserSchema()
users_schema = UserSchema(many=True) # Für Listen
user_create_schema = UserCreateSchema()
user_update_schema = UserUpdateSchema()
# Simulierte Datenbank
users_data = [
{
"id": 1,
"name": "Max Mustermann",
"email": "max@firma.de",
"age": 30,
"created_at": datetime(2024, 1, 15),
"updated_at": None
}
]
@app.route('/api/users', methods=['GET'])
def get_users():
"""
Benutzer-Liste mit Marshmallow-Serialisierung.
"""
try:
# Daten serialisieren
result = users_schema.dump(users_data)
return jsonify({
"users": result,
"total": len(result)
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/users', methods=['POST'])
def create_user():
"""
Benutzer erstellen mit Marshmallow-Validierung.
"""
try:
# JSON-Daten validieren und deserialisieren
user_data = user_create_schema.load(request.json)
# Neue ID generieren
new_id = max([u["id"] for u in users_data], default=0) + 1
user_data["id"] = new_id
# Benutzer speichern
users_data.append(user_data)
# Antwort serialisieren
result = user_schema.dump(user_data)
return jsonify(result), 201
except ValidationError as err:
# Validierungsfehler zurückgeben
return jsonify({"errors": err.messages}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""
Benutzer mit Marshmallow-Validierung aktualisieren.
"""
try:
# Benutzer finden
user_index = next((i for i, u in enumerate(users_data) if u["id"] == user_id), None)
if user_index is None:
return jsonify({"error": "Benutzer nicht gefunden"}), 404
# Daten validieren
updated_data = user_update_schema.load(request.json, partial=True)
# Benutzer aktualisieren
users_data[user_index].update(updated_data)
users_data[user_index]["updated_at"] = datetime.utcnow()
# Antwort serialisieren
result = user_schema.dump(users_data[user_index])
return jsonify(result)
except ValidationError as err:
return jsonify({"errors": err.messages}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500Marshmallow bietet mächtige Features für komplexe Serialisierungsszenarien, einschließlich bedingter Felder, Datentransformation und Beziehungsmanagement.
from marshmallow import fields, post_dump, pre_load
from datetime import datetime, timedelta
class FlexibleUserSchema(Schema):
"""
Erweiterte Schema mit bedingten Feldern und Transformationen.
"""
id = fields.Int(dump_only=True)
name = fields.Str(required=True)
email = fields.Email(required=True)
role = fields.Str(validate=validate.OneOf(['user', 'admin', 'moderator']))
# Bedingte Felder basierend auf Benutzerrolle
admin_notes = fields.Str(dump_only=True) # Nur für Admins sichtbar
last_login = fields.DateTime(dump_only=True)
# Berechnete Felder
days_since_creation = fields.Method("calculate_days_since_creation", dump_only=True)
is_recent_user = fields.Method("is_recent_user", dump_only=True)
def calculate_days_since_creation(self, obj):
"""Berechnet Tage seit Benutzer-Erstellung."""
created_at = obj.get('created_at')
if created_at:
delta = datetime.utcnow() - created_at
return delta.days
return None
def is_recent_user(self, obj):
"""Prüft, ob Benutzer in den letzten 30 Tagen erstellt wurde."""
days = self.calculate_days_since_creation(obj)
return days is not None and days <= 30
@pre_load
def preprocess_data(self, data, **kwargs):
"""
Pre-Load-Hook: Daten vor Validierung verarbeiten.
Normalisiert Eingabedaten.
"""
# E-Mail normalisieren
if 'email' in data:
data['email'] = data['email'].lower().strip()
# Name normalisieren
if 'name' in data:
data['name'] = data['name'].strip().title()
return data
@post_dump
def postprocess_output(self, data, **kwargs):
"""
Post-Dump-Hook: Ausgabe nach Serialisierung anpassen.
"""
# Admin-spezifische Felder nur für Admins
current_user_role = getattr(self.context.get('current_user', {}), 'role', None)
if current_user_role != 'admin':
data.pop('admin_notes', None)
data.pop('last_login', None)
# Metadaten hinzufügen
data['_metadata'] = {
'serialized_at': datetime.utcnow().isoformat(),
'version': 'v1'
}
return data
# Schema mit Beziehungen und Validierung
class ProjectSchema(Schema):
"""Schema für Projekte mit Benutzer-Beziehungen."""
id = fields.Int(dump_only=True)
name = fields.Str(required=True, validate=validate.Length(min=3, max=100))
description = fields.Str()
status = fields.Str(validate=validate.OneOf(['planning', 'active', 'completed', 'cancelled']))
# Beziehungen zu anderen Objekten
owner_id = fields.Int(required=True, load_only=True)
owner = fields.Nested(UserSchema, dump_only=True)
# Liste von Mitarbeitern
team_member_ids = fields.List(fields.Int(), load_only=True)
team_members = fields.List(fields.Nested(UserSchema), dump_only=True)
# Termine
start_date = fields.Date()
end_date = fields.Date()
@validates_schema
def validate_dates(self, data, **kwargs):
"""Validiert Datum-Konsistenz."""
start_date = data.get('start_date')
end_date = data.get('end_date')
if start_date and end_date and start_date > end_date:
raise ValidationError({
'end_date': 'Enddatum muss nach dem Startdatum liegen'
})
@post_load
def resolve_relationships(self, data, **kwargs):
"""Löst Beziehungen zu anderen Objekten auf."""
# Owner auflösen
if 'owner_id' in data:
owner = next((u for u in users_data if u['id'] == data['owner_id']), None)
if owner:
data['owner'] = owner
# Team-Mitglieder auflösen
if 'team_member_ids' in data:
team_members = [u for u in users_data if u['id'] in data['team_member_ids']]
data['team_members'] = team_members
return data
# API-Endpoint mit erweiterten Features
@app.route('/api/projects', methods=['POST'])
def create_project():
"""
Projekt erstellen mit Beziehungsvalidierung.
"""
try:
# Aktueller Benutzer aus Authentifizierung
current_user = {"id": 1, "role": "admin"} # Vereinfacht
# Schema mit Kontext für bedingte Serialisierung
schema = ProjectSchema()
schema.context = {'current_user': current_user}
# Daten validieren und verarbeiten
project_data = schema.load(request.json)
# ID generieren und speichern
new_id = max([p.get("id", 0) for p in projects_data], default=0) + 1
project_data["id"] = new_id
project_data["created_at"] = datetime.utcnow()
projects_data.append(project_data)
# Antwort mit vollständigen Beziehungen serialisieren
result = schema.dump(project_data)
return jsonify(result), 201
except ValidationError as err:
return jsonify({"errors": err.messages}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
# Simulierte Projekt-Datenbank
projects_data = []API-Dokumentation ist essentiell für die Verwendbarkeit einer API. Entwickler müssen verstehen, welche Endpoints verfügbar sind, welche Parameter erwartet werden und welche Antworten sie erhalten. Moderne APIs verwenden standardisierte Dokumentationsformate wie OpenAPI (früher Swagger), die sowohl von Menschen als auch von Tools gelesen werden können.
OpenAPI ist ein Standard zur Beschreibung von REST-APIs. Er definiert eine maschinenlesbare Spezifikation, die automatisch interaktive Dokumentation, Code-Generierung und Tests ermöglicht.
from flask import Flask
from flask_restx import Api, Resource, fields
from datetime import datetime
# Flask-App mit umfassender API-Dokumentation
app = Flask(__name__)
api = Api(
app,
version='1.0',
title='Unternehmens-API',
description='''
Eine umfassende API für die Verwaltung von Unternehmensdaten.
## Authentifizierung
Alle Endpoints (außer der Dokumentation) erfordern einen gültigen API-Token
im Authorization-Header: `Bearer <token>`
## Fehlercodes
- 400: Ungültige Anfrage oder Validierungsfehler
- 401: Authentifizierung erforderlich
- 403: Keine Berechtigung für diese Aktion
- 404: Ressource nicht gefunden
- 409: Konflikt (z.B. E-Mail bereits vorhanden)
- 500: Interner Serverfehler
## Versionierung
Die API verwendet semantische Versionierung. Aktuelle Version: v1.0
''',
doc='/api/docs/', # Swagger-UI verfügbar unter dieser URL
contact='api-support@firma.de',
license='Proprietär',
authorizations={
'Bearer': {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization',
'description': 'JWT Token im Format: Bearer <token>'
}
},
security='Bearer'
)
# Namespaces für Organisation
users_ns = api.namespace('users', description='Benutzerverwaltung')
projects_ns = api.namespace('projects', description='Projektverwaltung')
# Detaillierte Model-Definitionen
user_model = api.model('User', {
'id': fields.Integer(
readonly=True,
description='Eindeutige Benutzer-ID',
example=123
),
'name': fields.String(
required=True,
description='Vollständiger Name des Benutzers',
example='Max Mustermann',
min_length=2,
max_length=100
),
'email': fields.String(
required=True,
description='E-Mail-Adresse (muss eindeutig sein)',
example='max.mustermann@firma.de'
),
'role': fields.String(
description='Benutzerrolle im System',
enum=['user', 'admin', 'moderator'],
example='user'
),
'department': fields.String(
description='Abteilung des Benutzers',
example='IT'
),
'created_at': fields.DateTime(
readonly=True,
description='Zeitpunkt der Benutzer-Erstellung',
example='2024-01-15T10:30:00Z'
),
'last_login': fields.DateTime(
readonly=True,
description='Zeitpunkt der letzten Anmeldung',
example='2024-03-10T14:25:00Z'
),
'is_active': fields.Boolean(
description='Ob der Benutzer aktiv ist',
example=True
)
})
user_input_model = api.model('UserInput', {
'name': fields.String(
required=True,
description='Vollständiger Name',
example='Anna Schmidt'
),
'email': fields.String(
required=True,
description='E-Mail-Adresse',
example='anna.schmidt@firma.de'
),
'role': fields.String(
description='Benutzerrolle',
enum=['user', 'admin', 'moderator'],
default='user'
),
'department': fields.String(
description='Abteilung',
example='Marketing'
)
})
# Pagination-Model
pagination_model = api.model('Pagination', {
'page': fields.Integer(description='Aktuelle Seite', example=1),
'per_page': fields.Integer(description='Einträge pro Seite', example=10),
'total': fields.Integer(description='Gesamtanzahl der Einträge', example=150),
'pages': fields.Integer(description='Gesamtanzahl der Seiten', example=15)
})
user_list_model = api.model('UserList', {
'users': fields.List(fields.Nested(user_model)),
'pagination': fields.Nested(pagination_model)
})
# Error-Models für konsistente Fehlerbehandlung
error_model = api.model('Error', {
'error': fields.String(description='Fehlermeldung', example='Benutzer nicht gefunden'),
'code': fields.String(description='Fehlercode', example='USER_NOT_FOUND'),
'timestamp': fields.DateTime(description='Zeitpunkt des Fehlers'),
'request_id': fields.String(description='Eindeutige Request-ID für Debugging')
})
validation_error_model = api.model('ValidationError', {
'errors': fields.Raw(
description='Validierungsfehler pro Feld',
example={
'email': ['Ungültige E-Mail-Adresse'],
'name': ['Name ist erforderlich']
}
),
'message': fields.String(
description='Allgemeine Fehlermeldung',
example='Validierung fehlgeschlagen'
)
})
@users_ns.route('/')
class UserList(Resource):
@users_ns.doc('list_users')
@users_ns.marshal_with(user_list_model)
@users_ns.param('page', 'Seitennummer (beginnend bei 1)', type=int, default=1)
@users_ns.param('per_page', 'Anzahl Einträge pro Seite (max 100)', type=int, default=10)
@users_ns.param('search', 'Suchbegriff für Name oder E-Mail', type=str)
@users_ns.param('department', 'Filter nach Abteilung', type=str)
@users_ns.param('role', 'Filter nach Rolle', type=str, enum=['user', 'admin', 'moderator'])
@users_ns.param('is_active', 'Filter nach Aktiv-Status', type=bool)
@users_ns.response(200, 'Erfolg')
@users_ns.response(401, 'Authentifizierung erforderlich', error_model)
def get(self):
"""
Benutzer-Liste abrufen
Gibt eine paginierte Liste aller Benutzer zurück. Unterstützt verschiedene
Filter- und Suchoptionen für eine gezielte Abfrage.
### Suchfunktionalität
Der `search`-Parameter durchsucht Name und E-Mail-Adresse mit partieller Übereinstimmung.
### Filteroptionen
- `department`: Exakte Übereinstimmung mit Abteilungsname
- `role`: Filterung nach Benutzerrolle
- `is_active`: Nur aktive (true) oder inaktive (false) Benutzer
### Paginierung
- Standard: 10 Einträge pro Seite
- Maximum: 100 Einträge pro Seite
- Links zu vorheriger/nächster Seite in Response-Headers
"""
# Implementierung würde hier stehen
pass
@users_ns.doc('create_user')
@users_ns.expect(user_input_model, validate=True)
@users_ns.marshal_with(user_model, code=201, description='Benutzer erfolgreich erstellt')
@users_ns.response(400, 'Validierungsfehler', validation_error_model)
@users_ns.response(409, 'E-Mail bereits vorhanden', error_model)
def post(self):
"""
Neuen Benutzer erstellen
Erstellt einen neuen Benutzer mit den bereitgestellten Daten.
### Validierungsregeln
- **Name**: 2-100 Zeichen, erforderlich
- **E-Mail**: Gültige E-Mail-Adresse, muss eindeutig sein
- **Rolle**: Optional, Standard ist 'user'
- **Abteilung**: Optional
### Automatische Felder
- `id`: Wird automatisch generiert
- `created_at`: Aktueller Zeitstempel
- `is_active`: Standard ist `true`
### Benachrichtigungen
Nach erfolgreicher Erstellung wird eine Willkommens-E-Mail an den Benutzer gesendet.
"""
# Implementierung würde hier stehen
pass
@users_ns.route('/<int:user_id>')
@users_ns.param('user_id', 'Eindeutige Benutzer-ID')
class User(Resource):
@users_ns.doc('get_user')
@users_ns.marshal_with(user_model)
@users_ns.response(404, 'Benutzer nicht gefunden', error_model)
def get(self, user_id):
"""
Einzelnen Benutzer abrufen
Gibt die vollständigen Details eines Benutzers anhand der ID zurück.
### Berechtigungen
- Alle authentifizierten Benutzer können ihre eigenen Daten abrufen
- Administratoren können alle Benutzerdaten abrufen
- Moderatoren können Benutzerdaten ihrer Abteilung abrufen
"""
pass
@users_ns.doc('update_user')
@users_ns.expect(user_input_model, validate=True)
@users_ns.marshal_with(user_model)
@users_ns.response(404, 'Benutzer nicht gefunden', error_model)
@users_ns.response(400, 'Validierungsfehler', validation_error_model)
def put(self, user_id):
"""
Benutzer vollständig aktualisieren
Ersetzt alle veränderbaren Felder eines Benutzers mit den bereitgestellten Werten.
### Unveränderliche Felder
- `id`: Kann nicht geändert werden
- `created_at`: Bleibt unverändert
- `last_login`: Wird vom System verwaltet
### Berechtigungen
- Benutzer können ihre eigenen Daten bearbeiten (außer `role`)
- Administratoren können alle Felder aller Benutzer bearbeiten
"""
pass
@users_ns.doc('delete_user')
@users_ns.response(204, 'Benutzer erfolgreich gelöscht')
@users_ns.response(404, 'Benutzer nicht gefunden', error_model)
@users_ns.response(403, 'Keine Berechtigung', error_model)
def delete(self, user_id):
"""
Benutzer löschen
Entfernt einen Benutzer dauerhaft aus dem System.
### Vorsichtsmaßnahmen
- Löschung ist irreversibel
- Zugehörige Daten (Projekte, Kommentare) bleiben erhalten
- Admin-Benutzer können nicht gelöscht werden, wenn sie der letzte Admin sind
### Berechtigungen
Nur Administratoren können Benutzer löschen.
"""
passContent Negotiation ermöglicht es APIs, verschiedene Datenformate zu unterstützen und automatisch das beste Format für jeden Client auszuwählen. Dies ist besonders wichtig in heterogenen Umgebungen, wo verschiedene Clients unterschiedliche Anforderungen haben.
HTTP-Clients verwenden den Accept-Header, um ihre bevorzugten Datenformate anzugeben. Server können diese Information nutzen, um die Antwort entsprechend zu formatieren.
from flask import Flask, request, jsonify, make_response
import json
import xml.etree.ElementTree as ET
from datetime import datetime
app = Flask(__name__)
class ContentNegotiator:
"""
Klasse für Content Negotiation basierend auf Accept-Header.
Unterstützt automatische Format-Auswahl und -Konvertierung.
"""
SUPPORTED_FORMATS = {
'application/json': 'json',
'application/xml': 'xml',
'text/xml': 'xml',
'text/csv': 'csv',
'text/plain': 'text'
}
def __init__(self, default_format='json'):
self.default_format = default_format
def get_best_format(self, accept_header):
"""
Ermittelt das beste Format basierend auf Accept-Header.
Berücksichtigt Quality-Values (q-parameter).
"""
if not accept_header:
return self.default_format
# Accept-Header parsen
accepted_types = []
for media_range in accept_header.split(','):
media_range = media_range.strip()
# Quality-Value extrahieren (default: 1.0)
if ';q=' in media_range:
media_type, quality = media_range.split(';q=', 1)
quality = float(quality.split(';')[0]) # Weitere Parameter ignorieren
else:
media_type, quality = media_range, 1.0
media_type = media_type.strip()
if media_type in self.SUPPORTED_FORMATS:
accepted_types.append((media_type, quality))
if not accepted_types:
return self.default_format
# Nach Quality-Value sortieren (höchste zuerst)
accepted_types.sort(key=lambda x: x[1], reverse=True)
return self.SUPPORTED_FORMATS[accepted_types[0][0]]
def serialize_data(self, data, format_type):
"""
Serialisiert Daten in das gewünschte Format.
"""
if format_type == 'json':
return self._to_json(data)
elif format_type == 'xml':
return self._to_xml(data)
elif format_type == 'csv':
return self._to_csv(data)
elif format_type == 'text':
return self._to_text(data)
else:
raise ValueError(f"Unsupported format: {format_type}")
def _to_json(self, data):
"""Konvertiert Daten zu JSON."""
def json_serializer(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
return json.dumps(data, indent=2, default=json_serializer, ensure_ascii=False)
def _to_xml(self, data):
"""Konvertiert Daten zu XML."""
def dict_to_xml(d, root_name='data'):
root = ET.Element(root_name)
def add_element(parent, key, value):
if isinstance(value, dict):
child = ET.SubElement(parent, key)
for k, v in value.items():
add_element(child, k, v)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
child = ET.SubElement(parent, key)
for k, v in item.items():
add_element(child, k, v)
else:
child = ET.SubElement(parent, key)
child.text = str(item)
else:
child = ET.SubElement(parent, key)
child.text = str(value) if value is not None else ''
for key, value in d.items():
add_element(root, key, value)
return ET.tostring(root, encoding='unicode')
if isinstance(data, dict):
return dict_to_xml(data)
elif isinstance(data, list):
return dict_to_xml({'items': data})
else:
return dict_to_xml({'value': data})
def _to_csv(self, data):
"""Konvertiert Daten zu CSV (nur für Listen von Dictionaries)."""
if not isinstance(data, list) or not data:
return ""
if not isinstance(data[0], dict):
return "\n".join(str(item) for item in data)
# Header erstellen
headers = list(data[0].keys())
csv_lines = [",".join(headers)]
# Datenzeilen
for item in data:
row = []
for header in headers:
value = item.get(header, '')
# CSV-Escaping für Kommas und Anführungszeichen
if ',' in str(value) or '"' in str(value):
value = f'"{str(value).replace('"', '""')}"'
row.append(str(value))
csv_lines.append(",".join(row))
return "\n".join(csv_lines)
def _to_text(self, data):
"""Konvertiert Daten zu Plain Text."""
def format_item(item, indent=0):
prefix = " " * indent
if isinstance(item, dict):
lines = []
for key, value in item.items():
if isinstance(value, (dict, list)):
lines.append(f"{prefix}{key}:")
lines.append(format_item(value, indent + 1))
else:
lines.append(f"{prefix}{key}: {value}")
return "\n".join(lines)
elif isinstance(item, list):
lines = []
for i, value in enumerate(item):
lines.append(f"{prefix}- {format_item(value, indent)}")
return "\n".join(lines)
else:
return f"{prefix}{item}"
return format_item(data)
# Content Negotiator instanziieren
negotiator = ContentNegotiator()
def create_response(data, status_code=200):
"""
Erstellt Response mit automatischer Content Negotiation.
"""
accept_header = request.headers.get('Accept', 'application/json')
format_type = negotiator.get_best_format(accept_header)
try:
content = negotiator.serialize_data(data, format_type)
# Content-Type basierend auf Format setzen
content_types = {
'json': 'application/json; charset=utf-8',
'xml': 'application/xml; charset=utf-8',
'csv': 'text/csv; charset=utf-8',
'text': 'text/plain; charset=utf-8'
}
response = make_response(content, status_code)
response.headers['Content-Type'] = content_types[format_type]
# Zusätzliche Headers für bessere API-Erfahrung
response.headers['Vary'] = 'Accept' # Caching-Hinweis
response.headers['X-Content-Format'] = format_type
return response
except Exception as e:
# Fallback auf JSON bei Serialisierungsfehlern
error_data = {
"error": "Serialization failed",
"details": str(e),
"requested_format": format_type
}
response = make_response(json.dumps(error_data), 500)
response.headers['Content-Type'] = 'application/json'
return response
# API-Endpoints mit Content Negotiation
@app.route('/api/users')
def get_users_negotiated():
"""
Benutzer-Liste mit automatischer Format-Verhandlung.
Unterstützte Formate:
- application/json (Standard)
- application/xml
- text/csv
- text/plain
"""
users_data = [
{
"id": 1,
"name": "Max Mustermann",
"email": "max@firma.de",
"department": "IT",
"created_at": datetime(2024, 1, 15)
},
{
"id": 2,
"name": "Anna Schmidt",
"email": "anna@firma.de",
"department": "Marketing",
"created_at": datetime(2024, 1, 20)
}
]
response_data = {
"users": users_data,
"total": len(users_data),
"timestamp": datetime.utcnow()
}
return create_response(response_data)
@app.route('/api/users/<int:user_id>')
def get_user_negotiated(user_id):
"""Einzelner Benutzer mit Content Negotiation."""
# Simulierte Datenbankabfrage
user_data = {
"id": user_id,
"name": "Max Mustermann",
"email": "max@firma.de",
"department": "IT",
"role": "user",
"created_at": datetime(2024, 1, 15),
"last_login": datetime(2024, 3, 10, 14, 30)
}
return create_response(user_data)
# Endpoint für verfügbare Formate
@app.route('/api/formats')
def get_supported_formats():
"""
Gibt unterstützte Content-Types zurück.
"""
formats_info = {
"supported_formats": list(negotiator.SUPPORTED_FORMATS.keys()),
"default_format": f"application/{negotiator.default_format}",
"description": "Use Accept header to specify preferred format",
"examples": {
"json": "Accept: application/json",
"xml": "Accept: application/xml",
"csv": "Accept: text/csv",
"text": "Accept: text/plain"
}
}
return create_response(formats_info)
# Middleware für automatische Content Negotiation
@app.before_request
def log_content_negotiation():
"""Logging für Content Negotiation debugging."""
accept_header = request.headers.get('Accept')
if accept_header:
preferred_format = negotiator.get_best_format(accept_header)
app.logger.info(f"Request to {request.path} - Accept: {accept_header} - Format: {preferred_format}")
if __name__ == '__main__':
app.run(debug=True)Diese umfassende Behandlung von RESTful APIs zeigt, wie Flask von einfachen Dekoratoren zu ausgefeilten API-Frameworks skaliert werden kann. Die verschiedenen Ansätze - von grundlegenden Routes über MethodView bis hin zu Flask-RESTful und Flask-Restx - bieten jeweils spezifische Vorteile je nach Komplexität und Anforderungen der API. Marshmallow bringt robuste Serialisierung und Validierung, während automatische Dokumentation und Content Negotiation die Benutzerfreundlichkeit und Integration der API erheblich verbessern.