from flask import Flask, render_template, redirect, url_for, request, jsonify, session, send_file from flask_login import LoginManager, login_user, logout_user, login_required, current_user from authlib.integrations.flask_client import OAuth from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy.orm import joinedload, selectinload from config import Config from models import db, User, Poll, PollResponse, PollParticipant, TimeSlot from datetime import datetime from io import BytesIO from PIL import Image from urllib.parse import urlparse import requests import base64 import secrets import json import ipaddress app = Flask(__name__) app.config.from_object(Config) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) # Session configuration for OAuth CSRF protection app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' app.config['SESSION_COOKIE_HTTPONLY'] = True app.config['PERMANENT_SESSION_LIFETIME'] = 86400 # 24 hours db.init_app(app) login_manager = LoginManager(app) login_manager.login_view = 'login' oauth = OAuth(app) oauth.register( name='oidc', client_id=app.config['OIDC_CLIENT_ID'], client_secret=app.config['OIDC_CLIENT_SECRET'], server_metadata_url=app.config['OIDC_DISCOVERY_URL'], client_kwargs={'scope': 'openid email profile'} ) @login_manager.user_loader def load_user(user_id): return db.session.get(User, user_id) def is_safe_url(target): """Validate that a redirect URL is safe (same host).""" if not target: return False ref_url = urlparse(request.host_url) test_url = urlparse(target) # Allow relative URLs and same-host URLs only return test_url.scheme in ('http', 'https', '') and \ (not test_url.netloc or ref_url.netloc == test_url.netloc) def fetch_and_scale_profile_picture(url): """Fetch and scale profile picture with safety checks.""" if not url: return None try: # Validate URL scheme parsed = urlparse(url) if parsed.scheme not in ('http', 'https'): return None # Prevent SSRF to local/private addresses # Extract hostname (remove port if present) hostname = parsed.netloc.split(':')[0] try: ip = ipaddress.ip_address(hostname) if ip.is_private or ip.is_loopback: return None except ValueError: # Not an IP address, likely a domain name - allow it pass resp = requests.get(url, timeout=10, allow_redirects=True, max_redirects=3) if resp.status_code == 200: # Validate content type content_type = resp.headers.get('Content-Type', '') if not content_type.startswith('image/'): return None img = Image.open(BytesIO(resp.content)) img = img.convert('RGB') img = img.resize((256, 256), Image.Resampling.LANCZOS) output = BytesIO() img.save(output, format='JPEG', quality=85) return output.getvalue() except: pass return None @app.route('/') def index(): return render_template('index.html') @app.route('/login') def login(): # Store the page to return to after login (validate to prevent open redirect) next_url = request.args.get('next') or request.referrer redirect_uri = url_for('auth_callback', _external=True) if next_url and is_safe_url(next_url): # Generate a unique state for this OAuth flow to support multi-tab scenarios # Each tab will have its own state, preventing overwrites import secrets state = secrets.token_urlsafe(32) # Store redirect URL keyed by this state session[f'oauth_redirect_{state}'] = next_url session.modified = True # Pass our custom state to the OAuth provider return oauth.oidc.authorize_redirect(redirect_uri, state=state) return oauth.oidc.authorize_redirect(redirect_uri) @app.route('/auth/callback') def auth_callback(): token = oauth.oidc.authorize_access_token() userinfo = token.get('userinfo') or oauth.oidc.userinfo() oauth_id = userinfo['sub'] user = User.query.filter_by(oauth_id=oauth_id).first() username = userinfo.get('preferred_username') or userinfo.get('name') or userinfo.get('email', '').split('@')[0] if app.config.get('OIDC_USE_GIVEN_NAME'): first_name = userinfo.get('firstName') or userinfo.get('given_name') last_name = userinfo.get('lastName') or userinfo.get('family_name') else: first_name = None last_name = None email = userinfo.get('email') picture_url = userinfo.get('picture') profile_pic = fetch_and_scale_profile_picture(picture_url) if picture_url else None if user: user.username = username user.first_name = first_name user.last_name = last_name user.email = email if profile_pic: user.profile_picture = profile_pic db.session.commit() else: user = User( oauth_id=oauth_id, username=username, first_name=first_name, last_name=last_name, email=email, profile_picture=profile_pic ) db.session.add(user) db.session.commit() login_user(user) # Retrieve redirect URL using the OAuth state parameter (supports multi-tab) state = request.args.get('state') next_url = None if state: next_url = session.pop(f'oauth_redirect_{state}', None) # Fallback to old single-redirect key for backwards compatibility if not next_url: next_url = session.pop('login_next', None) # Re-validate for extra safety if next_url and not is_safe_url(next_url): next_url = None return redirect(next_url or url_for('dashboard')) @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('index')) @app.route('/create') @login_required def dashboard(): return render_template('dashboard.html') @app.route('/poll/new', methods=['POST']) @login_required def create_poll(): data = request.json allowed_dates = data.get('allowed_dates', []) poll = Poll( title=data['title'], description=data.get('description', ''), creator_id=current_user.id, allow_anonymous=data.get('allow_anonymous', False), is_day_wise=data.get('is_day_wise', False), allowed_dates=json.dumps(allowed_dates) if allowed_dates else None ) db.session.add(poll) db.session.flush() # Auto-add creator as participant participant = PollParticipant(poll_id=poll.id, user_id=current_user.id) db.session.add(participant) db.session.commit() return jsonify({'id': poll.id}) def get_poll_data(poll_id): poll = db.get_or_404(Poll, poll_id) allowed_dates = json.loads(poll.allowed_dates) if poll.allowed_dates else [] chosen_slots = json.loads(poll.chosen_slots) if poll.chosen_slots else [] # Eager load all related data in single queries participants_list = PollParticipant.query.options( joinedload(PollParticipant.user) ).filter_by(poll_id=poll_id).all() responses_list = PollResponse.query.options( joinedload(PollResponse.user) ).filter_by(poll_id=poll_id).all() # Load all time slots for this poll's responses in one query response_ids = [r.id for r in responses_list] all_time_slots = TimeSlot.query.filter(TimeSlot.response_id.in_(response_ids)).all() if response_ids else [] time_slots_by_response = {} for ts in all_time_slots: time_slots_by_response.setdefault(ts.response_id, []).append(ts) # Determine current user/guest participation has_joined = False guest_participant = None if current_user.is_authenticated: has_joined = any(p.user_id == current_user.id for p in participants_list) else: session_key = request.cookies.get(f'poll_session_{poll_id}') if session_key: guest_participant = next((p for p in participants_list if p.session_key == session_key), None) has_joined = guest_participant is not None participants = [] for p in participants_list: is_current = False if current_user.is_authenticated and p.user_id == current_user.id: is_current = True elif guest_participant and p.id == guest_participant.id: is_current = True participants.append({ 'id': p.id, 'name': p.display_name, 'user_id': p.user_id, 'is_current_user': is_current }) responses = [] for resp in responses_list: slots = [{'date': ts.date.isoformat(), 'start': ts.start_minutes, 'end': ts.end_minutes, 'note': ts.note} for ts in time_slots_by_response.get(resp.id, [])] is_owner = False if current_user.is_authenticated: is_owner = resp.user_id == current_user.id elif guest_participant: is_owner = resp.guest_name == guest_participant.guest_name responses.append({ 'id': resp.id, 'name': resp.display_name, 'email': resp.display_email, 'is_owner': is_owner, 'has_picture': bool(resp.user.profile_picture if resp.user else resp.guest_profile_picture), 'slots': slots }) return { 'id': poll.id, 'title': poll.title, 'description': poll.description, 'creator_id': poll.creator_id, 'allow_anonymous': poll.allow_anonymous, 'is_day_wise': poll.is_day_wise, 'allowed_dates': allowed_dates, 'is_locked': poll.is_locked, 'chosen_slots': chosen_slots, 'participants': participants, 'has_joined': has_joined, 'guest_name': guest_participant.guest_name if guest_participant else None, 'is_creator': current_user.is_authenticated and poll.creator_id == current_user.id, 'responses': responses } @app.route('/poll/') def view_poll(poll_id): poll = db.get_or_404(Poll, poll_id) if not poll.allow_anonymous and not current_user.is_authenticated: return redirect(url_for('login', next=request.path)) return render_template('poll.html', poll=poll) @app.route('/api/poll/') def get_poll(poll_id): return jsonify(get_poll_data(poll_id)) @app.route('/api/poll//join', methods=['POST']) def join_poll(poll_id): poll = db.get_or_404(Poll, poll_id) data = request.json or {} if current_user.is_authenticated: existing = PollParticipant.query.filter_by(poll_id=poll_id, user_id=current_user.id).first() if not existing: participant = PollParticipant(poll_id=poll_id, user_id=current_user.id) db.session.add(participant) db.session.commit() return jsonify({'success': True}) else: name = data.get('name', '').strip() if not name: return jsonify({'error': 'Name required'}), 400 # Check if name matches an OAuth user's username or display name existing_user = User.query.filter( db.or_( db.func.lower(User.username) == name.lower(), db.func.lower(User.first_name + ' ' + User.last_name) == name.lower() ) ).first() if existing_user: return jsonify({'error': 'This name belongs to a registered user. Please login or use a different name.'}), 400 # Check if name already used by a guest in this poll - allow reclaiming identity existing_participant = PollParticipant.query.filter_by(poll_id=poll_id, guest_name=name).first() if existing_participant: # Allow joining as this guest (reclaim identity) resp = jsonify({'success': True, 'participant_id': existing_participant.id, 'session_key': existing_participant.session_key}) resp.set_cookie(f'poll_session_{poll_id}', existing_participant.session_key, max_age=60*60*24*365, httponly=True, samesite='Lax') return resp # Create new participant with session key new_session_key = secrets.token_hex(32) participant = PollParticipant(poll_id=poll_id, guest_name=name, session_key=new_session_key) db.session.add(participant) db.session.commit() resp = jsonify({'success': True, 'participant_id': participant.id, 'session_key': new_session_key}) resp.set_cookie(f'poll_session_{poll_id}', new_session_key, max_age=60*60*24*365, httponly=True, samesite='Lax') return resp @app.route('/api/poll//respond', methods=['POST']) def respond_to_poll(poll_id): poll = db.get_or_404(Poll, poll_id) data = request.json dates_to_update = [datetime.strptime(d, '%Y-%m-%d').date() for d in data.get('dates', [])] if current_user.is_authenticated: response = PollResponse.query.filter_by(poll_id=poll_id, user_id=current_user.id).first() if response: if poll.is_day_wise: # Day-wise: replace all slots TimeSlot.query.filter(TimeSlot.response_id == response.id).delete(synchronize_session=False) elif dates_to_update: TimeSlot.query.filter( TimeSlot.response_id == response.id, TimeSlot.date.in_(dates_to_update) ).delete(synchronize_session=False) else: response = PollResponse(poll_id=poll_id, user_id=current_user.id) db.session.add(response) else: guest_name = data['name'] response = PollResponse.query.filter_by(poll_id=poll_id, guest_name=guest_name).first() if response: if poll.is_day_wise: # Day-wise: replace all slots TimeSlot.query.filter(TimeSlot.response_id == response.id).delete(synchronize_session=False) elif dates_to_update: TimeSlot.query.filter( TimeSlot.response_id == response.id, TimeSlot.date.in_(dates_to_update) ).delete(synchronize_session=False) else: response = PollResponse(poll_id=poll_id, guest_name=guest_name) db.session.add(response) db.session.flush() note = data.get('note', '').strip() or None for slot in data['slots']: ts = TimeSlot( response_id=response.id, date=datetime.strptime(slot['date'], '%Y-%m-%d').date(), start_minutes=slot['start'], end_minutes=slot['end'], note=note ) db.session.add(ts) db.session.commit() return jsonify({'success': True, 'response_id': response.id}) @app.route('/api/poll//lock', methods=['POST']) @login_required def lock_poll(poll_id): poll = db.get_or_404(Poll, poll_id) if poll.creator_id != current_user.id: return jsonify({'error': 'Not authorized'}), 403 poll.is_locked = not poll.is_locked if not poll.is_locked: poll.chosen_slots = None db.session.commit() return jsonify({'success': True, 'is_locked': poll.is_locked}) @app.route('/api/poll//choose', methods=['POST']) @login_required def choose_slot(poll_id): poll = db.get_or_404(Poll, poll_id) if poll.creator_id != current_user.id: return jsonify({'error': 'Not authorized'}), 403 if not poll.is_locked: return jsonify({'error': 'Poll must be locked'}), 400 data = request.json date = data.get('date') start = data.get('start') chosen = json.loads(poll.chosen_slots) if poll.chosen_slots else [] slot = {'date': date, 'start': start} existing = next((i for i, s in enumerate(chosen) if s['date'] == date and s['start'] == start), None) if existing is not None: chosen.pop(existing) else: chosen.append(slot) poll.chosen_slots = json.dumps(chosen) db.session.commit() return jsonify({'success': True, 'chosen_slots': chosen}) @app.route('/api/poll//delete', methods=['POST']) @login_required def delete_poll(poll_id): poll = db.get_or_404(Poll, poll_id) if poll.creator_id != current_user.id: return jsonify({'error': 'Not authorized'}), 403 db.session.delete(poll) db.session.commit() return jsonify({'success': True}) @app.route('/api/poll//edit', methods=['POST']) @login_required def edit_poll(poll_id): poll = db.get_or_404(Poll, poll_id) if poll.creator_id != current_user.id: return jsonify({'error': 'Not authorized'}), 403 data = request.json poll.title = data.get('title', poll.title) poll.description = data.get('description', poll.description) poll.allow_anonymous = data.get('allow_anonymous', poll.allow_anonymous) new_dates = set(data.get('allowed_dates', [])) old_dates = set(json.loads(poll.allowed_dates)) if poll.allowed_dates else set() # Find dates that were removed removed_dates = old_dates - new_dates if removed_dates: # Delete time slots for removed dates removed_date_objs = [datetime.strptime(d, '%Y-%m-%d').date() for d in removed_dates] for response in poll.responses: TimeSlot.query.filter( TimeSlot.response_id == response.id, TimeSlot.date.in_(removed_date_objs) ).delete(synchronize_session=False) poll.allowed_dates = json.dumps(list(new_dates)) if new_dates else None db.session.commit() return jsonify({'success': True}) @app.route('/api/response//picture') def get_response_picture(response_id): resp = db.get_or_404(PollResponse, response_id) pic = resp.user.profile_picture if resp.user else resp.guest_profile_picture if pic: return send_file(BytesIO(pic), mimetype='image/jpeg') return '', 404 @app.route('/api/user/picture') @login_required def get_user_picture(): if current_user.profile_picture: return send_file(BytesIO(current_user.profile_picture), mimetype='image/jpeg') return '', 404 @app.route('/api/polls') @login_required def list_polls(): polls = Poll.query.filter_by(creator_id=current_user.id).order_by(Poll.created_at.desc()).all() return jsonify([{'id': p.id, 'title': p.title, 'created_at': p.created_at.isoformat()} for p in polls]) @app.route('/api/my-polls') @login_required def my_polls(): participations = PollParticipant.query.filter_by(user_id=current_user.id).all() polls = [] for p in participations: poll = p.poll polls.append({ 'id': poll.id, 'title': poll.title, 'participant_count': poll.participants.count() }) return jsonify(polls) with app.app_context(): db.create_all() if __name__ == '__main__': app.run(debug=True, port=5000, host="0.0.0.0")