Das Request-Objekt in Flask stellt alle Informationen über eingehende HTTP-Anfragen zur Verfügung. Es ermöglicht den Zugriff auf Headers, Parameter, Daten und Metainformationen der Client-Anfrage.
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/request-info', methods=['GET', 'POST'])
def request_info():
info = {
'methode': request.method,
'url': request.url,
'basis_url': request.base_url,
'pfad': request.path,
'query_string': request.query_string.decode('utf-8'),
'remote_addr': request.remote_addr,
'user_agent': request.headers.get('User-Agent'),
'content_type': request.content_type,
'content_length': request.content_length
}
return jsonify(info)@app.route('/headers')
def header_analysis():
headers_dict = {}
# Alle Headers durchlaufen
for header_name, header_value in request.headers:
headers_dict[header_name] = header_value
# Spezifische Headers abrufen
auth_header = request.headers.get('Authorization')
accept_header = request.headers.get('Accept', 'text/html')
# Custom Headers
api_key = request.headers.get('X-API-Key')
client_version = request.headers.get('X-Client-Version')
return jsonify({
'alle_headers': headers_dict,
'authorization': auth_header,
'accept': accept_header,
'api_key': api_key,
'client_version': client_version
})from flask import g, current_app
import time
@app.before_request
def before_request():
g.start_time = time.time()
g.request_id = str(uuid.uuid4())
current_app.logger.info(f'Request {g.request_id} gestartet')
@app.after_request
def after_request(response):
duration = time.time() - g.start_time
current_app.logger.info(f'Request {g.request_id} beendet in {duration:.3f}s')
response.headers['X-Request-ID'] = g.request_id
response.headers['X-Response-Time'] = f'{duration:.3f}s'
return response@app.route('/request-details', methods=['POST'])
def request_details():
details = {
'is_json': request.is_json,
'is_multipart': bool(request.files),
'mimetype': request.mimetype,
'charset': request.charset,
'encoding_errors': request.encoding_errors,
'max_content_length': request.max_content_length,
'stream_available': hasattr(request, 'stream')
}
# Rohe Daten (nur bei kleinen Requests)
if request.content_length and request.content_length < 1024:
details['raw_data'] = request.get_data(as_text=True)
return jsonify(details)HTML-Formulare übertragen Daten in verschiedenen Formaten. Flask bietet umfassende Unterstützung für die Verarbeitung von Formulardaten aus GET- und POST-Requests.
from flask import request, render_template, redirect, url_for, flash
@app.route('/kontakt', methods=['GET', 'POST'])
def contact_form():
if request.method == 'POST':
# Formulardaten abrufen
name = request.form.get('name', '').strip()
email = request.form.get('email', '').strip()
nachricht = request.form.get('nachricht', '').strip()
# Validierung
errors = []
if not name:
errors.append('Name ist erforderlich')
if not email or '@' not in email:
errors.append('Gültige E-Mail-Adresse ist erforderlich')
if not nachricht:
errors.append('Nachricht ist erforderlich')
if errors:
for error in errors:
flash(error, 'error')
return render_template('kontakt.html',
name=name, email=email, nachricht=nachricht)
# Verarbeitung erfolgreich
flash('Nachricht wurde gesendet', 'success')
return redirect(url_for('contact_form'))
return render_template('kontakt.html')@app.route('/einstellungen', methods=['GET', 'POST'])
def user_settings():
if request.method == 'POST':
# Einzelne Checkbox
newsletter = request.form.get('newsletter') == 'on'
# Mehrere Checkboxen mit getlist
interessen = request.form.getlist('interessen')
# Radio Button
benachrichtigung = request.form.get('benachrichtigung', 'email')
# Select-Box
sprache = request.form.get('sprache', 'de')
settings = {
'newsletter': newsletter,
'interessen': interessen,
'benachrichtigung': benachrichtigung,
'sprache': sprache
}
return jsonify(settings)
return render_template('einstellungen.html')import os
from werkzeug.utils import secure_filename
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'doc', 'docx'}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
# Einzelne Datei
if 'datei' not in request.files:
flash('Keine Datei ausgewählt', 'error')
return redirect(request.url)
file = request.files['datei']
if file.filename == '':
flash('Keine Datei ausgewählt', 'error')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
# Eindeutigen Dateinamen generieren
timestamp = int(time.time())
filename = f"{timestamp}_{filename}"
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
# Dateiinformationen speichern
file_info = {
'original_name': file.filename,
'saved_name': filename,
'size': os.path.getsize(filepath),
'mimetype': file.content_type
}
return jsonify(file_info)
else:
flash('Dateityp nicht erlaubt', 'error')
return render_template('upload.html')
@app.route('/multi-upload', methods=['POST'])
def multi_upload():
uploaded_files = []
# Mehrere Dateien verarbeiten
files = request.files.getlist('dateien')
for file in files:
if file and file.filename != '' and allowed_file(file.filename):
filename = secure_filename(file.filename)
timestamp = int(time.time())
unique_filename = f"{timestamp}_{filename}"
filepath = os.path.join(UPLOAD_FOLDER, unique_filename)
file.save(filepath)
uploaded_files.append({
'original': file.filename,
'saved': unique_filename,
'size': os.path.getsize(filepath)
})
return jsonify({'uploaded_files': uploaded_files})import re
from datetime import datetime
def validate_form_data(form_data):
errors = {}
# Name validieren
name = form_data.get('name', '').strip()
if not name:
errors['name'] = 'Name ist erforderlich'
elif len(name) < 2:
errors['name'] = 'Name muss mindestens 2 Zeichen lang sein'
# E-Mail validieren
email = form_data.get('email', '').strip()
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not email:
errors['email'] = 'E-Mail ist erforderlich'
elif not re.match(email_pattern, email):
errors['email'] = 'Ungültige E-Mail-Adresse'
# Telefonnummer validieren
telefon = form_data.get('telefon', '').strip()
if telefon:
telefon_pattern = r'^[\d\s\-\+\(\)]+$'
if not re.match(telefon_pattern, telefon):
errors['telefon'] = 'Ungültige Telefonnummer'
# Geburtsdatum validieren
geburtsdatum = form_data.get('geburtsdatum')
if geburtsdatum:
try:
datum = datetime.strptime(geburtsdatum, '%Y-%m-%d')
if datum > datetime.now():
errors['geburtsdatum'] = 'Geburtsdatum darf nicht in der Zukunft liegen'
except ValueError:
errors['geburtsdatum'] = 'Ungültiges Datumsformat'
return errors
@app.route('/registrierung', methods=['POST'])
def registration():
errors = validate_form_data(request.form)
if errors:
return jsonify({'erfolg': False, 'fehler': errors}), 400
# Registrierung verarbeiten
return jsonify({'erfolg': True, 'nachricht': 'Registrierung erfolgreich'})JSON ist das Standardformat für API-Kommunikation. Flask bietet integrierte Unterstützung für das Parsen und Generieren von JSON-Daten.
from flask import request, jsonify
@app.route('/api/benutzer', methods=['POST'])
def create_user():
# JSON-Daten abrufen
if not request.is_json:
return jsonify({'fehler': 'Content-Type muss application/json sein'}), 400
data = request.get_json()
if data is None:
return jsonify({'fehler': 'Ungültiges JSON'}), 400
# Erforderliche Felder prüfen
required_fields = ['username', 'email', 'password']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return jsonify({
'fehler': 'Fehlende Felder',
'fehlende_felder': missing_fields
}), 400
# Daten verarbeiten
user_data = {
'id': 123,
'username': data['username'],
'email': data['email'],
'erstellt': datetime.utcnow().isoformat()
}
return jsonify(user_data), 201
@app.route('/api/benutzer/<int:user_id>', methods=['PUT'])
def update_user(user_id):
data = request.get_json(force=True) # JSON erzwingen
# Aktualisierbare Felder definieren
updateable_fields = ['username', 'email', 'first_name', 'last_name']
updates = {}
for field in updateable_fields:
if field in data:
updates[field] = data[field]
if not updates:
return jsonify({'fehler': 'Keine aktualisierbaren Felder gefunden'}), 400
return jsonify({
'user_id': user_id,
'aktualisierte_felder': updates
})@app.route('/api/bestellung', methods=['POST'])
def create_order():
data = request.get_json()
# Verschachtelte Struktur validieren
required_structure = {
'kunde': ['name', 'email'],
'adresse': ['strasse', 'stadt', 'plz'],
'artikel': [] # Liste von Artikeln
}
errors = []
# Hauptobjekte prüfen
for key, required_fields in required_structure.items():
if key not in data:
errors.append(f'Feld "{key}" fehlt')
continue
if isinstance(required_fields, list) and required_fields:
# Verschachtelte Objekte prüfen
for field in required_fields:
if field not in data[key]:
errors.append(f'Feld "{key}.{field}" fehlt')
# Artikel-Liste validieren
if 'artikel' in data:
if not isinstance(data['artikel'], list):
errors.append('Artikel muss eine Liste sein')
elif len(data['artikel']) == 0:
errors.append('Mindestens ein Artikel erforderlich')
else:
for i, artikel in enumerate(data['artikel']):
if 'produkt_id' not in artikel:
errors.append(f'Artikel[{i}].produkt_id fehlt')
if 'menge' not in artikel:
errors.append(f'Artikel[{i}].menge fehlt')
if errors:
return jsonify({'fehler': errors}), 400
# Bestellung verarbeiten
bestellung = {
'bestellnummer': 'B-2024-001',
'kunde': data['kunde'],
'adresse': data['adresse'],
'artikel_anzahl': len(data['artikel']),
'status': 'eingegangen'
}
return jsonify(bestellung), 201from datetime import datetime, date
import json
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, date):
return obj.isoformat()
elif hasattr(obj, '__dict__'):
return obj.__dict__
return super().default(obj)
class User:
def __init__(self, id, name, email, created_at):
self.id = id
self.name = name
self.email = email
self.created_at = created_at
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'email': self.email,
'created_at': self.created_at.isoformat() if self.created_at else None
}
@app.route('/api/benutzer/<int:user_id>')
def get_user(user_id):
# Beispiel-Benutzer erstellen
user = User(
id=user_id,
name='Max Mustermann',
email='max@beispiel.de',
created_at=datetime.utcnow()
)
# Option 1: Manuelle Serialisierung
return jsonify(user.to_dict())
@app.route('/api/benutzer-json/<int:user_id>')
def get_user_json(user_id):
user = User(user_id, 'Max Mustermann', 'max@beispiel.de', datetime.utcnow())
# Option 2: Custom JSON Encoder verwenden
app.json_encoder = DateTimeEncoder
return jsonify(user.__dict__)Flask bietet verschiedene Möglichkeiten zur Erstellung von HTTP-Responses mit unterschiedlichen Inhalten, Headers und Statuscodes.
from flask import Response, make_response, jsonify
@app.route('/text-response')
def text_response():
return 'Einfache Textantwort'
@app.route('/html-response')
def html_response():
html_content = '''
<html>
<head><title>Dynamische Seite</title></head>
<body>
<h1>Dynamisch generierte HTML-Seite</h1>
<p>Aktuelle Zeit: {}</p>
</body>
</html>
'''.format(datetime.now().strftime('%H:%M:%S'))
return html_content
@app.route('/json-response')
def json_response():
data = {
'status': 'success',
'timestamp': datetime.utcnow().isoformat(),
'data': [1, 2, 3, 4, 5]
}
return jsonify(data)@app.route('/custom-response')
def custom_response():
response = make_response('Custom Response Content')
response.headers['X-Custom-Header'] = 'Mein Wert'
response.headers['Cache-Control'] = 'no-cache'
response.status_code = 200
return response
@app.route('/json-custom-response')
def json_custom_response():
data = {'nachricht': 'Erfolg'}
response = make_response(jsonify(data))
response.headers['X-API-Version'] = '1.0'
response.headers['Access-Control-Allow-Origin'] = '*'
return responseimport time
@app.route('/stream')
def stream_response():
def generate():
for i in range(10):
yield f"Nachricht {i+1}\n"
time.sleep(1)
return Response(generate(), mimetype='text/plain')
@app.route('/csv-download')
def csv_download():
def generate_csv():
yield 'Name,Email,Alter\n'
users = [
('Max Mustermann', 'max@beispiel.de', 30),
('Anna Schmidt', 'anna@beispiel.de', 25),
('Tom Weber', 'tom@beispiel.de', 35)
]
for name, email, alter in users:
yield f'{name},{email},{alter}\n'
response = Response(generate_csv(), mimetype='text/csv')
response.headers['Content-Disposition'] = 'attachment; filename=benutzer.csv'
return responseimport io
from PIL import Image
@app.route('/generate-image')
def generate_image():
# Einfaches Bild erstellen
img = Image.new('RGB', (200, 100), color='lightblue')
# In Memory speichern
img_io = io.BytesIO()
img.save(img_io, 'PNG')
img_io.seek(0)
return Response(img_io.getvalue(), mimetype='image/png')
@app.route('/download-file')
def download_file():
# Datei-Content aus Datenbank oder Dateisystem
file_content = b'Dies ist der Inhalt einer Datei'
response = Response(file_content, mimetype='application/octet-stream')
response.headers['Content-Disposition'] = 'attachment; filename=download.txt'
response.headers['Content-Length'] = len(file_content)
return responseHTTP-Statuscodes kommunizieren das Ergebnis einer Anfrage. Die korrekte Verwendung ist essentiell für APIs und Benutzerfreundlichkeit.
@app.route('/api/artikel', methods=['POST'])
def create_article():
data = request.get_json()
# Artikel erstellen
article = {
'id': 123,
'titel': data.get('titel'),
'inhalt': data.get('inhalt')
}
# 201 Created für erfolgreiche Erstellung
return jsonify(article), 201
@app.route('/api/artikel/<int:id>', methods=['PUT'])
def update_article(id):
data = request.get_json()
# Artikel aktualisieren
return jsonify({'id': id, 'status': 'aktualisiert'}), 200
@app.route('/api/artikel/<int:id>', methods=['DELETE'])
def delete_article(id):
# Artikel löschen
# 204 No Content für erfolgreiche Löschung ohne Response-Body
return '', 204
@app.route('/api/status')
def api_status():
# 200 OK für erfolgreiche Abfrage
return jsonify({'status': 'online', 'version': '1.0'}), 200@app.route('/api/benutzer/<int:user_id>')
def get_user_by_id(user_id):
# Simulation: Benutzer nicht gefunden
if user_id > 1000:
return jsonify({'fehler': 'Benutzer nicht gefunden'}), 404
return jsonify({'id': user_id, 'name': 'Benutzer'})
@app.route('/api/protected', methods=['POST'])
def protected_endpoint():
auth_header = request.headers.get('Authorization')
if not auth_header:
return jsonify({'fehler': 'Authorization Header erforderlich'}), 401
if not auth_header.startswith('Bearer '):
return jsonify({'fehler': 'Ungültiger Authorization Header'}), 401
# Token validieren
token = auth_header.split(' ')[1]
if token != 'gültiger-token':
return jsonify({'fehler': 'Ungültiger Token'}), 403
return jsonify({'nachricht': 'Zugriff gewährt'})
@app.route('/api/validation-demo', methods=['POST'])
def validation_demo():
data = request.get_json()
if not data:
return jsonify({'fehler': 'JSON-Daten erforderlich'}), 400
if 'email' not in data:
return jsonify({
'fehler': 'Validierungsfehler',
'details': ['Email ist erforderlich']
}), 422 # Unprocessable Entity
return jsonify({'status': 'validiert'})@app.route('/api/fehler-simulation')
def error_simulation():
try:
# Simulation eines Datenbankfehlers
result = 1 / 0
except ZeroDivisionError:
return jsonify({
'fehler': 'Interner Serverfehler',
'code': 'DIVISION_BY_ZERO'
}), 500
@app.route('/api/service-unavailable')
def service_unavailable():
# Service temporär nicht verfügbar
return jsonify({
'fehler': 'Service temporär nicht verfügbar',
'retry_after': 300 # 5 Minuten
}), 503
@app.errorhandler(500)
def internal_error(error):
return jsonify({
'fehler': 'Interner Serverfehler',
'timestamp': datetime.utcnow().isoformat()
}), 500
@app.errorhandler(404)
def not_found(error):
return jsonify({
'fehler': 'Ressource nicht gefunden',
'pfad': request.path
}), 404@app.route('/api/artikel/<int:id>/status', methods=['PATCH'])
def update_article_status(id):
data = request.get_json()
new_status = data.get('status')
# Aktuellen Status simulieren
current_status = 'draft'
if current_status == new_status:
# 304 Not Modified wenn keine Änderung
return '', 304
# Status aktualisieren
return jsonify({
'id': id,
'alter_status': current_status,
'neuer_status': new_status
}), 200
@app.route('/api/health')
def health_check():
# Verschiedene Service-Checks
database_ok = True # Datenbankverbindung prüfen
cache_ok = True # Cache-Verbindung prüfen
if database_ok and cache_ok:
return jsonify({'status': 'healthy'}), 200
elif database_ok:
return jsonify({
'status': 'degraded',
'details': 'Cache nicht verfügbar'
}), 207 # Multi-Status
else:
return jsonify({
'status': 'unhealthy',
'details': 'Datenbankverbindung fehlgeschlagen'
}), 503Die korrekte Behandlung von Requests und Responses bildet die Grundlage für robuste und benutzerfreundliche Flask-Anwendungen.