Tuesday, 31 October 2023

Supervisor (superuser) approval of root superuser login in Django

In my Django project with two superusers called root and supervisor, I'm trying to implement the following flow:

  • When root logs into the application, he is directed to a 'waiting for authorisation' page

  • In the meantime, an email is sent to the supervisor containing the OTP in plain text as well as a link to the page where the otp needs to be entered.

  • When the supervisor clicks this link, enters the OTP and clicks 'Approve', OTP verification for root occurs

  • If authentication is successful, the supervisor sees a message saying OTP authentication successful, and in the meantime, the root user is redirected from the waiting page to the landing page

  • If authentication fails, root and the supervisor are shown a message on the page saying that the OTP authorisation failed

The only user who needs OTP authentication is root and approval can only be granted by supervisor.

I have everything down except for the part where I login the root user.

Using debugging statements, I can see that I am able to access the root user's session details correctly and create a new request to associate with that session, but ultimately, supervisor ends up getting redirected to the landing page.

I am sharing the relevant parts of my code below:

in models.py:

class OTP(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    otp_secret = models.CharField()
    is_verified = models.BooleanField(default=False)

    def __str__(self):
        return self.user.email

class UserSession(models.Model):
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    session = models.ForeignKey(Session, on_delete=models.CASCADE)

in views.py:

User = get_user_model()

user = User.objects.get(username='supervisor')
SUPERVISOR_EMAIL = user.email

user2 = User.objects.get(username='root')
ROOT_EMAIL = user2.email

logger = logging.getLogger(__name__)

#Interval (in seconds) for OTP verification
otp_interval = 300 # 300 seconds (5 minutes)

@login_required(login_url='custom_login')
def logout(request):
    delete_user_sessions(request.user)
    django_logout(request)
    return redirect('custom_login')

def user_logged_in_handler(sender, request, user, **kwargs):
    UserSession.objects.get_or_create(
        user = user,
        session_id = request.session.session_key
    )

def delete_user_sessions(user):
    user_sessions = UserSession.objects.filter(user = user)
    for user_session in user_sessions:
        user_session.session.delete()


def get_supervisor_email():
    try:
        supervisor_user = User.objects.get(username='supervisor')
        return supervisor_user.email
    except User.DoesNotExist:
        # Handle the case where the 'supervisor' user does not exist
        raise User.DoesNotExist('The \'supervisor\' user does not exist.')

def get_root_email():
    try:
        root_user = User.objects.get(username='root')
        return root_user.email
    except User.DoesNotExist:
        # Handle the case where the 'supervisor' user does not exist
        raise User.DoesNotExist('The \'root\' user does not exist.')

def is_root_user(user):
    return user.is_authenticated and user.is_superuser and user.username == 'root'
    
def generate_otp(request, user):
    logger.debug('inside generate_otp method')
    
    if request is None:
        logger.error('generate_otp: The \'request\' object is missing for generating the OTP.')
        messages.error(request, 'Something went wrong. Please try again.')
        return render(request, 'something_went_wrong.html')

    elif request is not None and user.username=='root' and user.is_authenticated:
        try:
            logger.debug('generate_otp: request is not None and user.username==root and user.is_authenticated')
            logger.debug('generate_otp: inside the try block in generate_otp method')
            supervisor_email = get_supervisor_email()
            ROOT_EMAIL = get_root_email()
            

            if not request.session.session_key:
                logger.debug('generate_otp: saving session because session key is none')
                request.session.save()

            session_data = {
                "user_id": user.id,
                "role": "root",
                "session_key": request.session.session_key
            }

            logger.debug('generate_otp: created session_data dictionary')

            request.session['root_data'] = session_data
            request.session['root_target_url'] = reverse('landingpage')
            request.session.save()

            session_key = request.session.session_key
            session = Session.objects.get(session_key=session_key)

            
            user_session = UserSession(user=user, session=session)
            user_session.save()
            logger.debug('generate_otp: saved session data to main_usersession')

            otp_secret = pyotp.random_base32()
            logger.debug('inside generate_otp, otp_secret is: %s', otp_secret)
            otp = pyotp.TOTP(otp_secret, interval=otp_interval)
            otp_code = otp.now()
            logger.debug('otp_code generated from within generate_otp is %s', otp_code)
            
            # Save OTP to the database
            otp_obj, created = OTP.objects.get_or_create(user=user)
            otp_obj.otp_secret = str(otp_secret)
            logger.debug('otp_secret is %s', otp_secret)
            otp_obj.save()
            logger.debug('otp saved to db, in generate_otp method')
            
            # Prepare the verification link
            current_site = get_current_site(request)
            verification_link = f"http://{current_site}{reverse('enter_otp')}"
            
            # Prepare email
            subject = 'OTP for Root Super User Login Approval'
            message = f'The OTP to approve the superuser with email address {ROOT_EMAIL} is: {otp_code}. Navigate to this page to enter the OTP and approve this user\'s login {verification_link}'
            from_email = settings.DEFAULT_FROM_EMAIL
            recipient_list = [supervisor_email]
            send_mail(subject, message, from_email, recipient_list)
            logger.debug('Email message: %s %s', message, user.username)
            messages.success(request, f'OTP has been shared with your supervisor on email at {supervisor_email}')
            return render(request, 'login_redirect.html')
            
        except Exception as e:
            logger.error(f'An error occurred during OTP generation and email sending: {str(e)} %s', user.username)
            messages.error(request, 'An error occurred during login. Please try again.')
            return render(request, 'something_went_wrong.html')


@login_required(login_url='custom_login')
def verify_otp(request):
    logger.debug('inside verify_otp. request.user is %s', request.user)
    request.session.save()
    otp = request.POST.get('otp')
    logger.debug('inside verify_otp: otp is %s. Request.user is: %s', otp, request.user)

    try:
        latest_root_session = UserSession.objects.filter(user=user2, user__otp__is_verified=False).latest('session__expire_date')
        logger.debug('inside verify_otp, latest_root_session fetched. Request.user is %s', request.user)

        latest_root_session_user_id = latest_root_session.user.id
        logger.debug('inside verify_otp, latest_root_session_user_id fetched. Request.user is %s', request.user)

    except ObjectDoesNotExist:
        latest_root_session = None

    if latest_root_session: 
        if latest_root_session_user_id == user2.id:
            logger.debug('inside verify_otp, if latest_root_session and latest_root_session_user_id = user2.id is true. Request.user is %s', request.user)
            try:
                logger.debug('inside verify_otp, inside try block. Request.user is %s', request.user)
                session = latest_root_session.session
                session_key = session.session_key
                logger.debug('inside verify_otp, session and session key extracted from main_usersession. Request.user is %s', request.user)

                if otp is not None and otp != 'None':
                    try:
                        int_otp = int(otp)
                    except ValueError:
                        messages.error(request, 'Invalid OTP format.')
                        return render(request, 'something_went_wrong.html')
                else:
                    messages.error(request, 'OTP is missing.')
                    return render(request, 'something_went_wrong.html')

                otp_obj = OTP.objects.filter(user__pk=user2.id).first()
                otp_secret = otp_obj.otp_secret
                logger.debug('inside verify_otp, fetched otp_obj for user2 which is root. Request.user is %s', request.user)
                logger.debug('otp_obj: %s. Request.user is', otp_obj, request.user)
                logger.debug('otp_secret: %s. Request.user is %s', str(otp_secret), request.user)
                logger.debug('int_otp: %s. Request.user is %s', int_otp, request.user)
                
                if otp_obj and int_otp:
                    logger.debug('inside verify_otp, if otp_obj and int_otp is true. Request.user is %s', request.user)
                    otp = pyotp.TOTP(otp_secret, interval=otp_interval)
                    logger.debug('inside verify_otp, otp extracted from otp_obj otp secret: %s. Request.user is %s', otp, request.user)
                    logger.debug('inside verify_otp, int_otp: %s. Request.user is %s', int_otp, request.user)
                    verification_result = otp.verify(int_otp)
                    logger.debug('verification_result: %s. Request.otp is %s', verification_result, request.user)

                    if verification_result:
                        logger.debug('inside verify_otp, verification_result is true, OTP verification succeeded. Request.user is %s', request.user)
                        # Retrieve the target URL from the root user's session
                        session_data = latest_root_session.session.get_decoded()
                        root_target_url = session_data.get('root_target_url', None)
                        logger.debug('inside verify_otp, root_target_url with value %s has been fetched. Request.user is %s', root_target_url, request.user)

                        if root_target_url:
                            logger.debug('root_target_url exists. Request.user is %s', request.user)

                            session = latest_root_session.session
                            logger.debug('executed session = latest_root_session.session: %s. Request.user is %s', str(session), request.user)
                            
                            session_key = session.session_key
                            logger.debug('session_key = session.session_key: %s. Request.user is %s', str(session_key), request.user)
                            logger.debug('data type for session_key: %s. Request.user is %s', type(session_key), request.user)
                            
                            user = latest_root_session.user
                            logger.debug('user = latest_root_session.user: %s. Request.user is %s', str(user), request.user)
                            

                            # Create a new HttpRequest object based on the session
                            request = HttpRequest()
                            logger.debug('request = HttpRequest() executed.')
                            
                            request.session = SessionStore(session_key=session_key)
                            logger.debug('request.session = SessionStore(session_key=session_key) executed.')

                            request.user = user
                            logger.debug('request.user = user executed. Request.user is %s', request.user)
                            logger.debug('currently, request is %s and user is %s', request, user)
                            login(request, user)
                            return redirect('landingpage')
                        
                    else:
                        logger.debug('if verification_result evaluated to false')
                        messages.error(request, 'Root target URL is missing or user is not root.')
                        return render(request, 'something_went_wrong.html')

                else:
                    logger.debug('if otp_obj and int_otp is false')
                    messages.error(request, 'OTP authorization failed. Please try logging in again.')
                    return render(request, 'something_went_wrong.html')

            except signing.SignatureExpired:
                messages.error(request, 'The verification link has expired. Please try again.')
                return render(request, 'something_went_wrong.html')

            except signing.BadSignature:
                messages.error(request, 'Invalid or tampered verification link. Please try again.')
                return render(request, 'something_went_wrong.html')

        else:
            logger.debug('if latest_root_session_user_id == user2.id is false')
            messages.error(request, 'There are no sessions for the root user pending approval from supervisor.')
            return render(request, 'something_went_wrong.html')
    else:
        logger.debug('if latest_root_session condition failed')
        messages.error(request, 'There are no sessions for the root user pending approval from supervisor.')
        return render(request, 'something_went_wrong.html')

    messages.error(request, 'Something went wrong. Please try again after some time.')
    return render(request, 'something_went_wrong.html')

In the logs that are generated, at the end I see the following:

DEBUG 2023-10-31 16:04:08,275 views data type for session_key: <class 'str'>. Request.user is supervisor
DEBUG 2023-10-31 16:04:08,275 views user = latest_root_session.user: root. Request.user is supervisor
DEBUG 2023-10-31 16:04:08,279 views request = HttpRequest() executed.
DEBUG 2023-10-31 16:04:08,279 views request.session = SessionStore(session_key=session_key) executed.
DEBUG 2023-10-31 16:04:08,281 views request.user = user executed. Request.user is root
DEBUG 2023-10-31 16:04:08,281 views currently, request is <HttpRequest> and user is root

and the superuser is directed to the landing page, but nothing happens to the root user's page.

I would deeply appreciate any guidance or help. Thank you!



from Supervisor (superuser) approval of root superuser login in Django

No comments:

Post a Comment