In my Django project with two superusers called root and supervisor, I'm trying to implement the following flow:
-
When root logs into the application, he is directed to a 'waiting for authorisation' page
-
In the meantime, an email is sent to the supervisor containing the OTP in plain text as well as a link to the page where the otp needs to be entered.
-
When the supervisor clicks this link, enters the OTP and clicks 'Approve', OTP verification for root occurs
-
If authentication is successful, the supervisor sees a message saying OTP authentication successful, and in the meantime, the root user is redirected from the waiting page to the landing page
-
If authentication fails, root and the supervisor are shown a message on the page saying that the OTP authorisation failed
The only user who needs OTP authentication is root and approval can only be granted by supervisor.
I have everything down except for the part where I login the root user.
Using debugging statements, I can see that I am able to access the root user's session details correctly and create a new request to associate with that session, but ultimately, supervisor ends up getting redirected to the landing page.
I am sharing the relevant parts of my code below:
in models.py:
class OTP(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
otp_secret = models.CharField()
is_verified = models.BooleanField(default=False)
def __str__(self):
return self.user.email
class UserSession(models.Model):
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
session = models.ForeignKey(Session, on_delete=models.CASCADE)
in views.py:
User = get_user_model()
user = User.objects.get(username='supervisor')
SUPERVISOR_EMAIL = user.email
user2 = User.objects.get(username='root')
ROOT_EMAIL = user2.email
logger = logging.getLogger(__name__)
#Interval (in seconds) for OTP verification
otp_interval = 300 # 300 seconds (5 minutes)
@login_required(login_url='custom_login')
def logout(request):
delete_user_sessions(request.user)
django_logout(request)
return redirect('custom_login')
def user_logged_in_handler(sender, request, user, **kwargs):
UserSession.objects.get_or_create(
user = user,
session_id = request.session.session_key
)
def delete_user_sessions(user):
user_sessions = UserSession.objects.filter(user = user)
for user_session in user_sessions:
user_session.session.delete()
def get_supervisor_email():
try:
supervisor_user = User.objects.get(username='supervisor')
return supervisor_user.email
except User.DoesNotExist:
# Handle the case where the 'supervisor' user does not exist
raise User.DoesNotExist('The \'supervisor\' user does not exist.')
def get_root_email():
try:
root_user = User.objects.get(username='root')
return root_user.email
except User.DoesNotExist:
# Handle the case where the 'supervisor' user does not exist
raise User.DoesNotExist('The \'root\' user does not exist.')
def is_root_user(user):
return user.is_authenticated and user.is_superuser and user.username == 'root'
def generate_otp(request, user):
logger.debug('inside generate_otp method')
if request is None:
logger.error('generate_otp: The \'request\' object is missing for generating the OTP.')
messages.error(request, 'Something went wrong. Please try again.')
return render(request, 'something_went_wrong.html')
elif request is not None and user.username=='root' and user.is_authenticated:
try:
logger.debug('generate_otp: request is not None and user.username==root and user.is_authenticated')
logger.debug('generate_otp: inside the try block in generate_otp method')
supervisor_email = get_supervisor_email()
ROOT_EMAIL = get_root_email()
if not request.session.session_key:
logger.debug('generate_otp: saving session because session key is none')
request.session.save()
session_data = {
"user_id": user.id,
"role": "root",
"session_key": request.session.session_key
}
logger.debug('generate_otp: created session_data dictionary')
request.session['root_data'] = session_data
request.session['root_target_url'] = reverse('landingpage')
request.session.save()
session_key = request.session.session_key
session = Session.objects.get(session_key=session_key)
user_session = UserSession(user=user, session=session)
user_session.save()
logger.debug('generate_otp: saved session data to main_usersession')
otp_secret = pyotp.random_base32()
logger.debug('inside generate_otp, otp_secret is: %s', otp_secret)
otp = pyotp.TOTP(otp_secret, interval=otp_interval)
otp_code = otp.now()
logger.debug('otp_code generated from within generate_otp is %s', otp_code)
# Save OTP to the database
otp_obj, created = OTP.objects.get_or_create(user=user)
otp_obj.otp_secret = str(otp_secret)
logger.debug('otp_secret is %s', otp_secret)
otp_obj.save()
logger.debug('otp saved to db, in generate_otp method')
# Prepare the verification link
current_site = get_current_site(request)
verification_link = f"http://{current_site}{reverse('enter_otp')}"
# Prepare email
subject = 'OTP for Root Super User Login Approval'
message = f'The OTP to approve the superuser with email address {ROOT_EMAIL} is: {otp_code}. Navigate to this page to enter the OTP and approve this user\'s login {verification_link}'
from_email = settings.DEFAULT_FROM_EMAIL
recipient_list = [supervisor_email]
send_mail(subject, message, from_email, recipient_list)
logger.debug('Email message: %s %s', message, user.username)
messages.success(request, f'OTP has been shared with your supervisor on email at {supervisor_email}')
return render(request, 'login_redirect.html')
except Exception as e:
logger.error(f'An error occurred during OTP generation and email sending: {str(e)} %s', user.username)
messages.error(request, 'An error occurred during login. Please try again.')
return render(request, 'something_went_wrong.html')
@login_required(login_url='custom_login')
def verify_otp(request):
logger.debug('inside verify_otp. request.user is %s', request.user)
request.session.save()
otp = request.POST.get('otp')
logger.debug('inside verify_otp: otp is %s. Request.user is: %s', otp, request.user)
try:
latest_root_session = UserSession.objects.filter(user=user2, user__otp__is_verified=False).latest('session__expire_date')
logger.debug('inside verify_otp, latest_root_session fetched. Request.user is %s', request.user)
latest_root_session_user_id = latest_root_session.user.id
logger.debug('inside verify_otp, latest_root_session_user_id fetched. Request.user is %s', request.user)
except ObjectDoesNotExist:
latest_root_session = None
if latest_root_session:
if latest_root_session_user_id == user2.id:
logger.debug('inside verify_otp, if latest_root_session and latest_root_session_user_id = user2.id is true. Request.user is %s', request.user)
try:
logger.debug('inside verify_otp, inside try block. Request.user is %s', request.user)
session = latest_root_session.session
session_key = session.session_key
logger.debug('inside verify_otp, session and session key extracted from main_usersession. Request.user is %s', request.user)
if otp is not None and otp != 'None':
try:
int_otp = int(otp)
except ValueError:
messages.error(request, 'Invalid OTP format.')
return render(request, 'something_went_wrong.html')
else:
messages.error(request, 'OTP is missing.')
return render(request, 'something_went_wrong.html')
otp_obj = OTP.objects.filter(user__pk=user2.id).first()
otp_secret = otp_obj.otp_secret
logger.debug('inside verify_otp, fetched otp_obj for user2 which is root. Request.user is %s', request.user)
logger.debug('otp_obj: %s. Request.user is', otp_obj, request.user)
logger.debug('otp_secret: %s. Request.user is %s', str(otp_secret), request.user)
logger.debug('int_otp: %s. Request.user is %s', int_otp, request.user)
if otp_obj and int_otp:
logger.debug('inside verify_otp, if otp_obj and int_otp is true. Request.user is %s', request.user)
otp = pyotp.TOTP(otp_secret, interval=otp_interval)
logger.debug('inside verify_otp, otp extracted from otp_obj otp secret: %s. Request.user is %s', otp, request.user)
logger.debug('inside verify_otp, int_otp: %s. Request.user is %s', int_otp, request.user)
verification_result = otp.verify(int_otp)
logger.debug('verification_result: %s. Request.otp is %s', verification_result, request.user)
if verification_result:
logger.debug('inside verify_otp, verification_result is true, OTP verification succeeded. Request.user is %s', request.user)
# Retrieve the target URL from the root user's session
session_data = latest_root_session.session.get_decoded()
root_target_url = session_data.get('root_target_url', None)
logger.debug('inside verify_otp, root_target_url with value %s has been fetched. Request.user is %s', root_target_url, request.user)
if root_target_url:
logger.debug('root_target_url exists. Request.user is %s', request.user)
session = latest_root_session.session
logger.debug('executed session = latest_root_session.session: %s. Request.user is %s', str(session), request.user)
session_key = session.session_key
logger.debug('session_key = session.session_key: %s. Request.user is %s', str(session_key), request.user)
logger.debug('data type for session_key: %s. Request.user is %s', type(session_key), request.user)
user = latest_root_session.user
logger.debug('user = latest_root_session.user: %s. Request.user is %s', str(user), request.user)
# Create a new HttpRequest object based on the session
request = HttpRequest()
logger.debug('request = HttpRequest() executed.')
request.session = SessionStore(session_key=session_key)
logger.debug('request.session = SessionStore(session_key=session_key) executed.')
request.user = user
logger.debug('request.user = user executed. Request.user is %s', request.user)
logger.debug('currently, request is %s and user is %s', request, user)
login(request, user)
return redirect('landingpage')
else:
logger.debug('if verification_result evaluated to false')
messages.error(request, 'Root target URL is missing or user is not root.')
return render(request, 'something_went_wrong.html')
else:
logger.debug('if otp_obj and int_otp is false')
messages.error(request, 'OTP authorization failed. Please try logging in again.')
return render(request, 'something_went_wrong.html')
except signing.SignatureExpired:
messages.error(request, 'The verification link has expired. Please try again.')
return render(request, 'something_went_wrong.html')
except signing.BadSignature:
messages.error(request, 'Invalid or tampered verification link. Please try again.')
return render(request, 'something_went_wrong.html')
else:
logger.debug('if latest_root_session_user_id == user2.id is false')
messages.error(request, 'There are no sessions for the root user pending approval from supervisor.')
return render(request, 'something_went_wrong.html')
else:
logger.debug('if latest_root_session condition failed')
messages.error(request, 'There are no sessions for the root user pending approval from supervisor.')
return render(request, 'something_went_wrong.html')
messages.error(request, 'Something went wrong. Please try again after some time.')
return render(request, 'something_went_wrong.html')
In the logs that are generated, at the end I see the following:
DEBUG 2023-10-31 16:04:08,275 views data type for session_key: <class 'str'>. Request.user is supervisor
DEBUG 2023-10-31 16:04:08,275 views user = latest_root_session.user: root. Request.user is supervisor
DEBUG 2023-10-31 16:04:08,279 views request = HttpRequest() executed.
DEBUG 2023-10-31 16:04:08,279 views request.session = SessionStore(session_key=session_key) executed.
DEBUG 2023-10-31 16:04:08,281 views request.user = user executed. Request.user is root
DEBUG 2023-10-31 16:04:08,281 views currently, request is <HttpRequest> and user is root
and the superuser is directed to the landing page, but nothing happens to the root user's page.
I would deeply appreciate any guidance or help. Thank you!
from Supervisor (superuser) approval of root superuser login in Django
No comments:
Post a Comment