Add comprehensive API documentation for user management endpoints including profile updates, user listing, and admin user management features. Update appointment model to include additional status options (completed, cancelled) and add max_length constraint to email field. Change appointment creation endpoint to require user authentication instead of being public. Changes: - Add API docs for update_profile, get_profile, all-users endpoints - Add API docs for activate-deactivate-user and delete-user admin endpoints - Update appointment creation to require authentication - Add 'completed' and 'cancelled' status options to Appointment model - Add max_length constraint to EncryptedEmailField - Regenerate initial migration with updated model definitions
408 lines
14 KiB
Python
408 lines
14 KiB
Python
from rest_framework import status, generics
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from rest_framework.response import Response
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
from django.contrib.auth import authenticate
|
|
from .models import CustomUser, UserProfile
|
|
from .serializers import UserRegistrationSerializer, UserSerializer, ResetPasswordSerializer, ForgotPasswordSerializer, VerifyPasswordResetOTPSerializer
|
|
from .utils import send_otp_via_email, is_otp_expired, generate_otp
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from rest_framework.reverse import reverse
|
|
from meetings.models import AppointmentRequest
|
|
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def register_user(request):
    """Register a new user and email them a verification OTP.

    Validates the request payload with ``UserRegistrationSerializer``. On
    success the user is saved, a ``UserProfile`` row is created for them, and
    a freshly generated OTP (expiring 10 minutes from now) is stored on the
    user and sent by email.

    Responses:
        201: serialized user plus ``otp_sent`` / ``otp_expires_in``.
        400: serializer validation errors.
        500: the OTP email could not be sent (the user has already been
            saved at that point).
    """
    registration = UserRegistrationSerializer(data=request.data)
    if not registration.is_valid():
        return Response(registration.errors, status=status.HTTP_400_BAD_REQUEST)

    new_user = registration.save()
    UserProfile.objects.create(user=new_user)

    # Store the OTP on the user record so it can be checked later.
    code = generate_otp()
    new_user.verify_otp = code
    new_user.verify_otp_expiry = timezone.now() + timedelta(minutes=10)
    new_user.save()

    # Address the user by full name, falling back to the email when blank.
    full_name = f"{new_user.first_name} {new_user.last_name}".strip()
    delivered = send_otp_via_email(
        new_user.email, code, full_name or new_user.email, 'registration'
    )
    if not delivered:
        return Response(
            {'error': 'Failed to send OTP. Please try again later.'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    body = {
        'user': UserSerializer(new_user).data,
        'otp_sent': delivered,
        # Presumably minutes, mirroring the 10-minute expiry set above.
        'otp_expires_in': 10,
    }
    return Response(body, status=status.HTTP_201_CREATED)
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def verify_otp(request):
    """Mark a user's email as verified when the submitted OTP is valid.

    Expects ``email`` and ``otp`` in the request body. When the OTP equals
    the one stored on the user and has not expired, the user is flagged as
    verified and the stored OTP and its expiry are cleared.

    Responses:
        200: email verified.
        400: missing fields, user already verified, or invalid/expired OTP.
        404: no user with the given email.
    """
    email = request.data.get('email')
    otp = request.data.get('otp')

    if not email or not otp:
        return Response(
            {'error': 'Email and OTP are required'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Keep the try body down to the lookup so that only a missing user is
    # reported as "User not found".
    try:
        user = CustomUser.objects.get(email=email)
    except CustomUser.DoesNotExist:
        return Response(
            {'error': 'User not found'},
            status=status.HTTP_404_NOT_FOUND,
        )

    if user.isVerified:
        return Response(
            {'error': 'User is already verified'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # The expiry is only consulted when the code itself matches.
    if user.verify_otp != otp or is_otp_expired(user.verify_otp_expiry):
        return Response(
            {'error': 'Invalid or expired OTP'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    # Clear the OTP so the same code cannot be submitted again.
    user.isVerified = True
    user.verify_otp = None
    user.verify_otp_expiry = None
    user.save()

    # No JWT is issued here: the response carries no tokens, so the client
    # obtains them through the login endpoint.
    return Response(
        {
            'message': 'Email verified successfully',
            'verified': True,
        },
        status=status.HTTP_200_OK,
    )
|
|
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def resend_otp(request):
    """Generate a new OTP for a user and email it again.

    Reads ``email`` and an optional ``context`` (default ``'registration'``)
    from the request body. With ``context == 'password_reset'`` the new code
    is stored in the forgot-password fields; for any other value it is stored
    in the verification fields. Either way it expires 10 minutes from now.

    Responses:
        200: OTP stored and emailed.
        400: email missing, or a registration resend for a verified user.
        404: no user with the given email.
        500: the email could not be sent.
    """
    payload = request.data
    email = payload.get('email')
    context = payload.get('context', 'registration')

    if not email:
        return Response(
            {'error': 'Email is required'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        account = CustomUser.objects.get(email=email)

        if context == 'registration' and account.isVerified:
            return Response(
                {
                    'error': 'Already verified',
                    'message': 'Your email is already verified. You can login now.',
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        code = generate_otp()
        expires_at = timezone.now() + timedelta(minutes=10)

        # Choose which pair of OTP fields on the user receives the new code.
        if context == 'password_reset':
            otp_attr, expiry_attr = 'forgot_password_otp', 'forgot_password_otp_expiry'
        else:
            otp_attr, expiry_attr = 'verify_otp', 'verify_otp_expiry'
        setattr(account, otp_attr, code)
        setattr(account, expiry_attr, expires_at)
        account.save()

        # Address the user by full name, falling back to the email when blank.
        full_name = f"{account.first_name} {account.last_name}".strip()
        delivered = send_otp_via_email(
            account.email, code, full_name or account.email, context
        )

        if not delivered:
            return Response(
                {
                    'error': 'Failed to send OTP',
                    'message': 'Please try again later.',
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

        return Response(
            {
                'message': 'OTP resent to your email successfully',
                'otp_sent': delivered,
                'otp_expires_in': 10,
                'context': context,
            },
            status=status.HTTP_200_OK,
        )

    except CustomUser.DoesNotExist:
        return Response(
            {
                'error': 'User not found',
                'message': 'No account found with this email address.',
            },
            status=status.HTTP_404_NOT_FOUND,
        )
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def login_user(request):
    """Authenticate by email and password and return a JWT pair.

    Responses:
        200: serialized user with ``refresh`` and ``access`` tokens.
        401: ``authenticate`` returned no user for the credentials.
        403: the user's email is not verified, or the account is inactive.
    """
    credentials = request.data
    user = authenticate(
        request,
        email=credentials.get('email'),
        password=credentials.get('password'),
    )

    if user is None:
        return Response(
            {'error': 'Invalid credentials'},
            status=status.HTTP_401_UNAUTHORIZED,
        )

    # Verification is checked before the active flag, so an unverified and
    # inactive account gets the "not verified" response.
    if not user.isVerified:
        return Response(
            {
                'error': 'Email not verified',
                'message': 'Please verify your email address before logging in.',
                'email': user.email,
                'can_resend_otp': True,
            },
            status=status.HTTP_403_FORBIDDEN,
        )

    if not user.is_active:
        return Response(
            {
                'error': 'Account deactivated',
                'message': 'Your account has been deactivated. Please contact support.',
            },
            status=status.HTTP_403_FORBIDDEN,
        )

    tokens = RefreshToken.for_user(user)
    return Response({
        'user': UserSerializer(user).data,
        'refresh': str(tokens),
        'access': str(tokens.access_token),
        'message': 'Login successful',
    })
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def forgot_password(request):
    """Start a password reset by emailing a reset OTP.

    The payload is validated with ``ForgotPasswordSerializer``. For a
    verified, active user a new OTP (expiring 10 minutes from now) is stored
    in the forgot-password fields and emailed.

    Responses:
        200: OTP sent, or a generic message when no user has that email.
        400: serializer errors, unverified email, or deactivated account.
        500: the email could not be sent.
    """
    form = ForgotPasswordSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    try:
        account = CustomUser.objects.get(email=form.validated_data['email'])

        if not account.isVerified:
            return Response(
                {
                    'error': 'Email not verified',
                    'message': 'Please verify your email address first.',
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        if not account.is_active:
            return Response(
                {
                    'error': 'Account deactivated',
                    'message': 'Your account has been deactivated.',
                },
                status=status.HTTP_400_BAD_REQUEST,
            )

        reset_code = generate_otp()
        account.forgot_password_otp = reset_code
        account.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
        account.save()

        # Address the user by full name, falling back to the email when blank.
        full_name = f"{account.first_name} {account.last_name}".strip()
        delivered = send_otp_via_email(
            account.email, reset_code, full_name or account.email, 'password_reset'
        )

        if not delivered:
            return Response(
                {
                    'error': 'Failed to send OTP',
                    'message': 'Please try again later.',
                },
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

        return Response(
            {
                'message': 'Password reset OTP sent to your email',
                'otp_sent': True,
                'otp_expires_in': 10,
                'email': account.email,
            },
            status=status.HTTP_200_OK,
        )

    except CustomUser.DoesNotExist:
        # An unknown email gets a 200 with a generic message, not a 404.
        return Response(
            {'message': 'If the email exists, a password reset OTP has been sent.'},
            status=status.HTTP_200_OK,
        )
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def verify_password_reset_otp(request):
    """Check a password-reset OTP without changing anything on the user.

    The payload is validated with ``VerifyPasswordResetOTPSerializer``. The
    stored forgot-password OTP is compared with the submitted one and its
    expiry is checked; the stored code is left in place.

    Responses:
        200: OTP is valid.
        400: serializer errors, or invalid/expired OTP.
        404: no user with the given email.
    """
    form = VerifyPasswordResetOTPSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    submitted = form.validated_data
    email = submitted['email']
    otp = submitted['otp']

    try:
        account = CustomUser.objects.get(email=email)

        # The expiry is only consulted when the code itself matches.
        if account.forgot_password_otp != otp or is_otp_expired(
            account.forgot_password_otp_expiry
        ):
            return Response(
                {'error': 'Invalid or expired OTP'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        return Response(
            {
                'message': 'OTP verified successfully',
                'verified': True,
                'email': account.email,
            },
            status=status.HTTP_200_OK,
        )

    except CustomUser.DoesNotExist:
        return Response(
            {'error': 'User not found'},
            status=status.HTTP_404_NOT_FOUND,
        )
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def reset_password(request):
    """Set a new password once the password-reset OTP has been validated.

    The payload is validated with ``ResetPasswordSerializer``. When the
    stored forgot-password OTP matches and has not expired, the new password
    is set and the stored OTP and its expiry are cleared.

    Responses:
        200: password changed.
        400: serializer errors, or invalid/expired OTP.
        404: no user with the given email.
    """
    form = ResetPasswordSerializer(data=request.data)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)

    submitted = form.validated_data
    email = submitted['email']
    otp = submitted['otp']
    new_password = submitted['new_password']

    try:
        account = CustomUser.objects.get(email=email)

        # The expiry is only consulted when the code itself matches.
        if account.forgot_password_otp != otp or is_otp_expired(
            account.forgot_password_otp_expiry
        ):
            return Response(
                {'error': 'Invalid or expired OTP'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        account.set_password(new_password)
        # Clear the OTP so the same code cannot be used for another reset.
        account.forgot_password_otp = None
        account.forgot_password_otp_expiry = None
        account.save()

        return Response(
            {
                'message': 'Password reset successfully',
                'success': True,
            },
            status=status.HTTP_200_OK,
        )

    except CustomUser.DoesNotExist:
        return Response(
            {'error': 'User not found'},
            status=status.HTTP_404_NOT_FOUND,
        )
|
|
|
|
@api_view(['POST'])
@permission_classes([AllowAny])
def resend_password_reset_otp(request):
    """Generate a new password-reset OTP and email it again.

    The payload is validated with ``ForgotPasswordSerializer``. The same
    account-state guards as ``forgot_password`` are applied, so calling this
    endpoint directly cannot produce a reset OTP for an unverified or
    deactivated account. The new OTP expires 10 minutes from now.

    Responses:
        200: OTP resent, or a generic message when no user has that email.
        400: serializer errors, unverified email, or deactivated account.
        500: the email could not be sent.
    """
    serializer = ForgotPasswordSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    email = serializer.validated_data['email']

    # Keep the try body down to the lookup so that only a missing user
    # produces the generic response.
    try:
        user = CustomUser.objects.get(email=email)
    except CustomUser.DoesNotExist:
        return Response(
            {'message': 'If the email exists, a password reset OTP has been sent.'},
            status=status.HTTP_200_OK,
        )

    # Same guards as forgot_password: without them this endpoint would be a
    # way around that view's checks.
    if not user.isVerified:
        return Response(
            {
                'error': 'Email not verified',
                'message': 'Please verify your email address first.',
            },
            status=status.HTTP_400_BAD_REQUEST,
        )

    if not user.is_active:
        return Response(
            {
                'error': 'Account deactivated',
                'message': 'Your account has been deactivated.',
            },
            status=status.HTTP_400_BAD_REQUEST,
        )

    otp = generate_otp()
    user.forgot_password_otp = otp
    user.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
    user.save()

    # Address the user by full name, falling back to the email when blank.
    user_name = f"{user.first_name} {user.last_name}".strip() or user.email
    email_sent = send_otp_via_email(user.email, otp, user_name, 'password_reset')

    if not email_sent:
        return Response(
            {
                'error': 'Failed to send OTP',
                'message': 'Please try again later.',
            },
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    return Response(
        {
            'message': 'Password reset OTP resent to your email',
            'otp_sent': True,
            'otp_expires_in': 10,
        },
        status=status.HTTP_200_OK,
    )
|
|
|
|
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_user_profile(request):
    """Return the authenticated user serialized with ``UserSerializer``."""
    return Response(UserSerializer(request.user).data)
|
|
|
|
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def update_user_profile(request):
    """Apply a partial update to the authenticated user.

    The serializer is built with ``partial=True``, so only the fields present
    in the request body are validated and saved.

    Responses:
        200: the updated, serialized user.
        400: serializer validation errors.
    """
    updater = UserSerializer(request.user, data=request.data, partial=True)
    if not updater.is_valid():
        return Response(updater.errors, status=status.HTTP_400_BAD_REQUEST)

    updater.save()
    return Response(updater.data)
|
|
|
|
class UserDetailView(generics.RetrieveAPIView):
    """Retrieve endpoint that always returns the requesting user."""

    permission_classes = [IsAuthenticated]
    serializer_class = UserSerializer

    def get_object(self):
        """Return the user attached to the request; no URL lookup is used."""
        current_user = self.request.user
        return current_user
|
|
|
|
|
|
# Sentinel that tells "called with no argument" apart from an explicit value.
_NO_USER = object()


class _StaffOnlyPermission:
    """Permission object that grants access only to staff users.

    It exposes the two methods DRF calls on permission instances,
    ``has_permission`` and ``has_object_permission``.
    """

    def has_permission(self, request, view):
        """Return True when ``request.user`` exists and has ``is_staff`` set."""
        user = getattr(request, 'user', None)
        return bool(user and user.is_staff)

    def has_object_permission(self, request, view, obj):
        """Apply the same staff check to object-level access."""
        return self.has_permission(request, view)


def IsAdminUser(user=_NO_USER):
    """Staff check usable as a predicate and as a ``permission_classes`` entry.

    DRF calls every ``permission_classes`` entry with no arguments and then
    calls ``has_permission(request, view)`` on the result. The previous
    one-argument function raised ``TypeError`` in that path, so every view
    listing it failed before any permission was evaluated.

    Args:
        user: Optional user object. When given, the legacy behaviour is kept
            and ``user.is_staff`` is returned unchanged.

    Returns:
        ``user.is_staff`` when ``user`` is passed; otherwise a permission
        object whose ``has_permission`` checks ``request.user.is_staff``.
    """
    if user is _NO_USER:
        return _StaffOnlyPermission()
    return user.is_staff
|
|
|
|
class GetAllUsersView(generics.ListAPIView):
    """List every non-staff user, serialized with ``UserSerializer``."""

    permission_classes = [IsAdminUser, IsAuthenticated]
    serializer_class = UserSerializer

    def get_queryset(self):
        """Return all users whose ``is_staff`` flag is False."""
        non_staff = CustomUser.objects.filter(is_staff=False)
        return non_staff

    def list(self, request, *args, **kwargs):
        """Serialize the whole filtered queryset in a single response.

        This override makes no pagination call, so every matching user is
        returned at once.
        """
        users = self.filter_queryset(self.get_queryset())
        return Response(self.get_serializer(users, many=True).data)
|
|
|
|
|
|
class DeleteUserView(generics.DestroyAPIView):
    """Delete a user together with their profile and appointment requests."""

    queryset = CustomUser.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAdminUser, IsAuthenticated]

    def destroy(self, request, *args, **kwargs):
        """Delete the looked-up user and answer with 204 No Content."""
        instance = self.get_object()
        self.perform_destroy(instance)
        return Response(status=status.HTTP_204_NO_CONTENT)

    def perform_destroy(self, instance):
        """Remove the user's dependent rows, then the user, atomically.

        Dependents go first. Once ``instance.delete()`` has run the instance
        has no primary key, so filtering ``UserProfile`` by it matches
        nothing on older Django versions and raises ``ValueError`` on
        Django 5.0+, after the user row is already gone.
        """
        # Imported locally so this block adds no new module-level import.
        from django.db import transaction

        # One transaction: a failure part-way leaves no half-deleted user.
        with transaction.atomic():
            UserProfile.objects.filter(user=instance).delete()
            # Appointment requests are matched to the user by email address.
            AppointmentRequest.objects.filter(email=instance.email).delete()
            instance.delete()
|
|
|
|
class ActivateOrDeactivateUserView(generics.UpdateAPIView):
    """Flip a user's ``is_active`` flag; the request body is not read."""

    permission_classes = [IsAdminUser, IsAuthenticated]
    serializer_class = UserSerializer
    queryset = CustomUser.objects.all()

    def update(self, request, *args, **kwargs):
        """Invert ``is_active`` on the target user and return the user."""
        target = self.get_object()
        currently_active = target.is_active
        target.is_active = not currently_active
        target.save()
        return Response(self.get_serializer(target).data)