Tuesday 31 October 2023

Supervisor (superuser) approval of root superuser login in Django

In my Django project with two superusers called root and supervisor, I'm trying to implement the following flow:

  • When root logs into the application, he is directed to a 'waiting for authorisation' page

  • In the meantime, an email is sent to the supervisor containing the OTP in plain text as well as a link to the page where the otp needs to be entered.

  • When the supervisor clicks this link, enters the OTP and clicks 'Approve', OTP verification for root occurs

  • If authentication is successful, the supervisor sees a message saying OTP authentication successful, and in the meantime, the root user is redirected from the waiting page to the landing page

  • If authentication fails, root and the supervisor are shown a message on the page saying that the OTP authorisation failed

The only user who needs OTP authentication is root and approval can only be granted by supervisor.

I have everything down except for the part where I login the root user.

Using debugging statements, I can see that I am able to access the root user's session details correctly and create a new request to associate with that session, but ultimately, supervisor ends up getting redirected to the landing page.

I am sharing the relevant parts of my code below:

in models.py:

class OTP(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    otp_secret = models.CharField()
    is_verified = models.BooleanField(default=False)

    def __str__(self):
        return self.user.email

class UserSession(models.Model):
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    session = models.ForeignKey(Session, on_delete=models.CASCADE)

in views.py:

User = get_user_model()

user = User.objects.get(username='supervisor')
SUPERVISOR_EMAIL = user.email

user2 = User.objects.get(username='root')
ROOT_EMAIL = user2.email

logger = logging.getLogger(__name__)

#Interval (in seconds) for OTP verification
otp_interval = 300 # 300 seconds (5 minutes)

@login_required(login_url='custom_login')
def logout(request):
    delete_user_sessions(request.user)
    django_logout(request)
    return redirect('custom_login')

def user_logged_in_handler(sender, request, user, **kwargs):
    UserSession.objects.get_or_create(
        user = user,
        session_id = request.session.session_key
    )

def delete_user_sessions(user):
    user_sessions = UserSession.objects.filter(user = user)
    for user_session in user_sessions:
        user_session.session.delete()


def get_supervisor_email():
    try:
        supervisor_user = User.objects.get(username='supervisor')
        return supervisor_user.email
    except User.DoesNotExist:
        # Handle the case where the 'supervisor' user does not exist
        raise User.DoesNotExist('The \'supervisor\' user does not exist.')

def get_root_email():
    try:
        root_user = User.objects.get(username='root')
        return root_user.email
    except User.DoesNotExist:
        # Handle the case where the 'supervisor' user does not exist
        raise User.DoesNotExist('The \'root\' user does not exist.')

def is_root_user(user):
    return user.is_authenticated and user.is_superuser and user.username == 'root'
    
def generate_otp(request, user):
    logger.debug('inside generate_otp method')
    
    if request is None:
        logger.error('generate_otp: The \'request\' object is missing for generating the OTP.')
        messages.error(request, 'Something went wrong. Please try again.')
        return render(request, 'something_went_wrong.html')

    elif request is not None and user.username=='root' and user.is_authenticated:
        try:
            logger.debug('generate_otp: request is not None and user.username==root and user.is_authenticated')
            logger.debug('generate_otp: inside the try block in generate_otp method')
            supervisor_email = get_supervisor_email()
            ROOT_EMAIL = get_root_email()
            

            if not request.session.session_key:
                logger.debug('generate_otp: saving session because session key is none')
                request.session.save()

            session_data = {
                "user_id": user.id,
                "role": "root",
                "session_key": request.session.session_key
            }

            logger.debug('generate_otp: created session_data dictionary')

            request.session['root_data'] = session_data
            request.session['root_target_url'] = reverse('landingpage')
            request.session.save()

            session_key = request.session.session_key
            session = Session.objects.get(session_key=session_key)

            
            user_session = UserSession(user=user, session=session)
            user_session.save()
            logger.debug('generate_otp: saved session data to main_usersession')

            otp_secret = pyotp.random_base32()
            logger.debug('inside generate_otp, otp_secret is: %s', otp_secret)
            otp = pyotp.TOTP(otp_secret, interval=otp_interval)
            otp_code = otp.now()
            logger.debug('otp_code generated from within generate_otp is %s', otp_code)
            
            # Save OTP to the database
            otp_obj, created = OTP.objects.get_or_create(user=user)
            otp_obj.otp_secret = str(otp_secret)
            logger.debug('otp_secret is %s', otp_secret)
            otp_obj.save()
            logger.debug('otp saved to db, in generate_otp method')
            
            # Prepare the verification link
            current_site = get_current_site(request)
            verification_link = f"http://{current_site}{reverse('enter_otp')}"
            
            # Prepare email
            subject = 'OTP for Root Super User Login Approval'
            message = f'The OTP to approve the superuser with email address {ROOT_EMAIL} is: {otp_code}. Navigate to this page to enter the OTP and approve this user\'s login {verification_link}'
            from_email = settings.DEFAULT_FROM_EMAIL
            recipient_list = [supervisor_email]
            send_mail(subject, message, from_email, recipient_list)
            logger.debug('Email message: %s %s', message, user.username)
            messages.success(request, f'OTP has been shared with your supervisor on email at {supervisor_email}')
            return render(request, 'login_redirect.html')
            
        except Exception as e:
            logger.error(f'An error occurred during OTP generation and email sending: {str(e)} %s', user.username)
            messages.error(request, 'An error occurred during login. Please try again.')
            return render(request, 'something_went_wrong.html')


@login_required(login_url='custom_login')
def verify_otp(request):
    logger.debug('inside verify_otp. request.user is %s', request.user)
    request.session.save()
    otp = request.POST.get('otp')
    logger.debug('inside verify_otp: otp is %s. Request.user is: %s', otp, request.user)

    try:
        latest_root_session = UserSession.objects.filter(user=user2, user__otp__is_verified=False).latest('session__expire_date')
        logger.debug('inside verify_otp, latest_root_session fetched. Request.user is %s', request.user)

        latest_root_session_user_id = latest_root_session.user.id
        logger.debug('inside verify_otp, latest_root_session_user_id fetched. Request.user is %s', request.user)

    except ObjectDoesNotExist:
        latest_root_session = None

    if latest_root_session: 
        if latest_root_session_user_id == user2.id:
            logger.debug('inside verify_otp, if latest_root_session and latest_root_session_user_id = user2.id is true. Request.user is %s', request.user)
            try:
                logger.debug('inside verify_otp, inside try block. Request.user is %s', request.user)
                session = latest_root_session.session
                session_key = session.session_key
                logger.debug('inside verify_otp, session and session key extracted from main_usersession. Request.user is %s', request.user)

                if otp is not None and otp != 'None':
                    try:
                        int_otp = int(otp)
                    except ValueError:
                        messages.error(request, 'Invalid OTP format.')
                        return render(request, 'something_went_wrong.html')
                else:
                    messages.error(request, 'OTP is missing.')
                    return render(request, 'something_went_wrong.html')

                otp_obj = OTP.objects.filter(user__pk=user2.id).first()
                otp_secret = otp_obj.otp_secret
                logger.debug('inside verify_otp, fetched otp_obj for user2 which is root. Request.user is %s', request.user)
                logger.debug('otp_obj: %s. Request.user is', otp_obj, request.user)
                logger.debug('otp_secret: %s. Request.user is %s', str(otp_secret), request.user)
                logger.debug('int_otp: %s. Request.user is %s', int_otp, request.user)
                
                if otp_obj and int_otp:
                    logger.debug('inside verify_otp, if otp_obj and int_otp is true. Request.user is %s', request.user)
                    otp = pyotp.TOTP(otp_secret, interval=otp_interval)
                    logger.debug('inside verify_otp, otp extracted from otp_obj otp secret: %s. Request.user is %s', otp, request.user)
                    logger.debug('inside verify_otp, int_otp: %s. Request.user is %s', int_otp, request.user)
                    verification_result = otp.verify(int_otp)
                    logger.debug('verification_result: %s. Request.otp is %s', verification_result, request.user)

                    if verification_result:
                        logger.debug('inside verify_otp, verification_result is true, OTP verification succeeded. Request.user is %s', request.user)
                        # Retrieve the target URL from the root user's session
                        session_data = latest_root_session.session.get_decoded()
                        root_target_url = session_data.get('root_target_url', None)
                        logger.debug('inside verify_otp, root_target_url with value %s has been fetched. Request.user is %s', root_target_url, request.user)

                        if root_target_url:
                            logger.debug('root_target_url exists. Request.user is %s', request.user)

                            session = latest_root_session.session
                            logger.debug('executed session = latest_root_session.session: %s. Request.user is %s', str(session), request.user)
                            
                            session_key = session.session_key
                            logger.debug('session_key = session.session_key: %s. Request.user is %s', str(session_key), request.user)
                            logger.debug('data type for session_key: %s. Request.user is %s', type(session_key), request.user)
                            
                            user = latest_root_session.user
                            logger.debug('user = latest_root_session.user: %s. Request.user is %s', str(user), request.user)
                            

                            # Create a new HttpRequest object based on the session
                            request = HttpRequest()
                            logger.debug('request = HttpRequest() executed.')
                            
                            request.session = SessionStore(session_key=session_key)
                            logger.debug('request.session = SessionStore(session_key=session_key) executed.')

                            request.user = user
                            logger.debug('request.user = user executed. Request.user is %s', request.user)
                            logger.debug('currently, request is %s and user is %s', request, user)
                            login(request, user)
                            return redirect('landingpage')
                        
                    else:
                        logger.debug('if verification_result evaluated to false')
                        messages.error(request, 'Root target URL is missing or user is not root.')
                        return render(request, 'something_went_wrong.html')

                else:
                    logger.debug('if otp_obj and int_otp is false')
                    messages.error(request, 'OTP authorization failed. Please try logging in again.')
                    return render(request, 'something_went_wrong.html')

            except signing.SignatureExpired:
                messages.error(request, 'The verification link has expired. Please try again.')
                return render(request, 'something_went_wrong.html')

            except signing.BadSignature:
                messages.error(request, 'Invalid or tampered verification link. Please try again.')
                return render(request, 'something_went_wrong.html')

        else:
            logger.debug('if latest_root_session_user_id == user2.id is false')
            messages.error(request, 'There are no sessions for the root user pending approval from supervisor.')
            return render(request, 'something_went_wrong.html')
    else:
        logger.debug('if latest_root_session condition failed')
        messages.error(request, 'There are no sessions for the root user pending approval from supervisor.')
        return render(request, 'something_went_wrong.html')

    messages.error(request, 'Something went wrong. Please try again after some time.')
    return render(request, 'something_went_wrong.html')

In the logs that are generated, at the end I see the following:

DEBUG 2023-10-31 16:04:08,275 views data type for session_key: <class 'str'>. Request.user is supervisor
DEBUG 2023-10-31 16:04:08,275 views user = latest_root_session.user: root. Request.user is supervisor
DEBUG 2023-10-31 16:04:08,279 views request = HttpRequest() executed.
DEBUG 2023-10-31 16:04:08,279 views request.session = SessionStore(session_key=session_key) executed.
DEBUG 2023-10-31 16:04:08,281 views request.user = user executed. Request.user is root
DEBUG 2023-10-31 16:04:08,281 views currently, request is <HttpRequest> and user is root

and the superuser is directed to the landing page, but nothing happens to the root user's page.

I would deeply appreciate any guidance or help. Thank you!



from Supervisor (superuser) approval of root superuser login in Django

No comments:

Post a Comment