Add a complete contact form system with the following changes: - Create ContactMessage model to store form submissions with tracking fields (is_read, is_responded) - Implement ContactMessage admin interface with custom actions, filters, and bulk operations - Add contact endpoint documentation to API root view - Update email configuration to use admin@attunehearttherapy.com as sender address This enables users to submit contact inquiries and allows administrators to track and manage these messages efficiently through the Django admin panel.
437 lines
16 KiB
Python
437 lines
16 KiB
Python
from rest_framework import status, generics
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAdminUser
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
from django.contrib.auth import authenticate
|
|
from .models import CustomUser, UserProfile, ContactMessage
|
|
from .serializers import UserRegistrationSerializer, UserSerializer, ResetPasswordSerializer, ForgotPasswordSerializer, VerifyPasswordResetOTPSerializer, ContactMessageSerializer
|
|
from .utils import send_otp_via_email, is_otp_expired, generate_otp,send_email_notifications
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
from rest_framework.reverse import reverse
|
|
from meetings.models import AppointmentRequest
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ContactMessageView(APIView):
|
|
def post(self, request):
|
|
serializer = ContactMessageSerializer(data=request.data)
|
|
|
|
if serializer.is_valid():
|
|
try:
|
|
contact_message = serializer.save()
|
|
|
|
send_email_notifications(contact_message)
|
|
|
|
return Response({
|
|
'success': True,
|
|
'message': 'Thank you for your message. We will get back to you soon!',
|
|
'data': serializer.data
|
|
}, status=status.HTTP_201_CREATED)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing contact form: {str(e)}")
|
|
return Response({
|
|
'success': False,
|
|
'message': 'There was an error processing your request. Please try again later.'
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
return Response({
|
|
'success': False,
|
|
'message': 'Please check your input and try again.',
|
|
'errors': serializer.errors
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
|
|
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def register_user(request):
|
|
serializer = UserRegistrationSerializer(data=request.data)
|
|
if serializer.is_valid():
|
|
user = serializer.save()
|
|
|
|
UserProfile.objects.create(user=user)
|
|
|
|
otp = generate_otp()
|
|
user.verify_otp = otp
|
|
user.verify_otp_expiry = timezone.now() + timedelta(minutes=10)
|
|
user.save()
|
|
|
|
user_name = f"{user.first_name} {user.last_name}".strip() or user.email
|
|
email_sent = send_otp_via_email(user.email, otp, user_name, 'registration')
|
|
|
|
if not email_sent:
|
|
return Response({
|
|
'error': 'Failed to send OTP. Please try again later.'
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
return Response({
|
|
'user': UserSerializer(user).data,
|
|
'otp_sent': email_sent,
|
|
'otp_expires_in': 10
|
|
}, status=status.HTTP_201_CREATED)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def verify_otp(request):
|
|
email = request.data.get('email')
|
|
otp = request.data.get('otp')
|
|
|
|
if not email or not otp:
|
|
return Response({
|
|
'error': 'Email and OTP are required'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
try:
|
|
user = CustomUser.objects.get(email=email)
|
|
|
|
if user.isVerified:
|
|
return Response({
|
|
'error': 'User is already verified'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
if (user.verify_otp == otp and
|
|
not is_otp_expired(user.verify_otp_expiry)):
|
|
|
|
user.isVerified = True
|
|
user.verify_otp = None
|
|
user.verify_otp_expiry = None
|
|
user.save()
|
|
|
|
refresh = RefreshToken.for_user(user)
|
|
|
|
return Response({
|
|
'message': 'Email verified successfully',
|
|
'verified': True,
|
|
}, status=status.HTTP_200_OK)
|
|
else:
|
|
return Response({
|
|
'error': 'Invalid or expired OTP'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
except CustomUser.DoesNotExist:
|
|
return Response({
|
|
'error': 'User not found'
|
|
}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def resend_otp(request):
|
|
email = request.data.get('email')
|
|
context = request.data.get('context', 'registration')
|
|
|
|
if not email:
|
|
return Response({
|
|
'error': 'Email is required'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
try:
|
|
user = CustomUser.objects.get(email=email)
|
|
|
|
if user.isVerified and context == 'registration':
|
|
return Response({
|
|
'error': 'Already verified',
|
|
'message': 'Your email is already verified. You can login now.'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
otp = generate_otp()
|
|
|
|
if context == 'password_reset':
|
|
user.forgot_password_otp = otp
|
|
user.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
|
|
else:
|
|
user.verify_otp = otp
|
|
user.verify_otp_expiry = timezone.now() + timedelta(minutes=10)
|
|
|
|
user.save()
|
|
|
|
user_name = f"{user.first_name} {user.last_name}".strip() or user.email
|
|
email_sent = send_otp_via_email(user.email, otp, user_name, context)
|
|
|
|
if not email_sent:
|
|
return Response({
|
|
'error': 'Failed to send OTP',
|
|
'message': 'Please try again later.'
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
return Response({
|
|
'message': f'OTP resent to your email successfully',
|
|
'otp_sent': email_sent,
|
|
'otp_expires_in': 10,
|
|
'context': context
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
except CustomUser.DoesNotExist:
|
|
return Response({
|
|
'error': 'User not found',
|
|
'message': 'No account found with this email address.'
|
|
}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def login_user(request):
|
|
email = request.data.get('email')
|
|
password = request.data.get('password')
|
|
|
|
user = authenticate(request, email=email, password=password)
|
|
|
|
if user is not None:
|
|
if not user.isVerified:
|
|
return Response({
|
|
'error': 'Email not verified',
|
|
'message': 'Please verify your email address before logging in.',
|
|
'email': user.email,
|
|
'can_resend_otp': True
|
|
}, status=status.HTTP_403_FORBIDDEN)
|
|
|
|
if not user.is_active:
|
|
return Response({
|
|
'error': 'Account deactivated',
|
|
'message': 'Your account has been deactivated. Please contact support.'
|
|
}, status=status.HTTP_403_FORBIDDEN)
|
|
|
|
refresh = RefreshToken.for_user(user)
|
|
|
|
return Response({
|
|
'user': UserSerializer(user).data,
|
|
'refresh': str(refresh),
|
|
'access': str(refresh.access_token),
|
|
'message': 'Login successful'
|
|
})
|
|
else:
|
|
return Response(
|
|
{'error': 'Invalid credentials'},
|
|
status=status.HTTP_401_UNAUTHORIZED
|
|
)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def forgot_password(request):
|
|
serializer = ForgotPasswordSerializer(data=request.data)
|
|
|
|
if serializer.is_valid():
|
|
email = serializer.validated_data['email']
|
|
|
|
try:
|
|
user = CustomUser.objects.get(email=email)
|
|
|
|
if not user.isVerified:
|
|
return Response({
|
|
'error': 'Email not verified',
|
|
'message': 'Please verify your email address first.'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
if not user.is_active:
|
|
return Response({
|
|
'error': 'Account deactivated',
|
|
'message': 'Your account has been deactivated.'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
otp = generate_otp()
|
|
user.forgot_password_otp = otp
|
|
user.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
|
|
user.save()
|
|
|
|
user_name = f"{user.first_name} {user.last_name}".strip() or user.email
|
|
email_sent = send_otp_via_email(user.email, otp, user_name, 'password_reset')
|
|
|
|
if not email_sent:
|
|
return Response({
|
|
'error': 'Failed to send OTP',
|
|
'message': 'Please try again later.'
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
return Response({
|
|
'message': 'Password reset OTP sent to your email',
|
|
'otp_sent': True,
|
|
'otp_expires_in': 10,
|
|
'email': user.email
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
except CustomUser.DoesNotExist:
|
|
return Response({
|
|
'message': 'If the email exists, a password reset OTP has been sent.'
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def verify_password_reset_otp(request):
|
|
serializer = VerifyPasswordResetOTPSerializer(data=request.data)
|
|
|
|
if serializer.is_valid():
|
|
email = serializer.validated_data['email']
|
|
otp = serializer.validated_data['otp']
|
|
|
|
try:
|
|
user = CustomUser.objects.get(email=email)
|
|
|
|
if (user.forgot_password_otp == otp and
|
|
not is_otp_expired(user.forgot_password_otp_expiry)):
|
|
|
|
return Response({
|
|
'message': 'OTP verified successfully',
|
|
'verified': True,
|
|
'email': user.email
|
|
}, status=status.HTTP_200_OK)
|
|
else:
|
|
return Response({
|
|
'error': 'Invalid or expired OTP'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
except CustomUser.DoesNotExist:
|
|
return Response({
|
|
'error': 'User not found'
|
|
}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def reset_password(request):
|
|
serializer = ResetPasswordSerializer(data=request.data)
|
|
|
|
if serializer.is_valid():
|
|
email = serializer.validated_data['email']
|
|
otp = serializer.validated_data['otp']
|
|
new_password = serializer.validated_data['new_password']
|
|
|
|
try:
|
|
user = CustomUser.objects.get(email=email)
|
|
|
|
if (user.forgot_password_otp == otp and
|
|
not is_otp_expired(user.forgot_password_otp_expiry)):
|
|
|
|
# Set new password
|
|
user.set_password(new_password)
|
|
|
|
user.forgot_password_otp = None
|
|
user.forgot_password_otp_expiry = None
|
|
user.save()
|
|
|
|
return Response({
|
|
'message': 'Password reset successfully',
|
|
'success': True
|
|
}, status=status.HTTP_200_OK)
|
|
else:
|
|
return Response({
|
|
'error': 'Invalid or expired OTP'
|
|
}, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
except CustomUser.DoesNotExist:
|
|
return Response({
|
|
'error': 'User not found'
|
|
}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def resend_password_reset_otp(request):
|
|
serializer = ForgotPasswordSerializer(data=request.data)
|
|
|
|
if serializer.is_valid():
|
|
email = serializer.validated_data['email']
|
|
|
|
try:
|
|
user = CustomUser.objects.get(email=email)
|
|
|
|
otp = generate_otp()
|
|
user.forgot_password_otp = otp
|
|
user.forgot_password_otp_expiry = timezone.now() + timedelta(minutes=10)
|
|
user.save()
|
|
|
|
user_name = f"{user.first_name} {user.last_name}".strip() or user.email
|
|
email_sent = send_otp_via_email(user.email, otp, user_name, 'password_reset')
|
|
|
|
if not email_sent:
|
|
return Response({
|
|
'error': 'Failed to send OTP',
|
|
'message': 'Please try again later.'
|
|
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
return Response({
|
|
'message': 'Password reset OTP resent to your email',
|
|
'otp_sent': True,
|
|
'otp_expires_in': 10
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
except CustomUser.DoesNotExist:
|
|
return Response({
|
|
'message': 'If the email exists, a password reset OTP has been sent.'
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@api_view(['GET'])
|
|
@permission_classes([IsAuthenticated])
|
|
def get_user_profile(request):
|
|
serializer = UserSerializer(request.user)
|
|
return Response(serializer.data)
|
|
|
|
@api_view(['PUT'])
|
|
@permission_classes([IsAuthenticated])
|
|
def update_user_profile(request):
|
|
serializer = UserSerializer(request.user, data=request.data, partial=True)
|
|
if serializer.is_valid():
|
|
serializer.save()
|
|
return Response(serializer.data)
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
class UserDetailView(generics.RetrieveAPIView):
|
|
serializer_class = UserSerializer
|
|
permission_classes = [IsAuthenticated]
|
|
|
|
def get_object(self):
|
|
return self.request.user
|
|
|
|
class GetAllUsersView(generics.ListAPIView):
|
|
serializer_class = UserSerializer
|
|
permission_classes = [IsAdminUser, IsAuthenticated]
|
|
|
|
def get_queryset(self):
|
|
return CustomUser.objects.filter(is_staff=False)
|
|
|
|
def list(self, request, *args, **kwargs):
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
return Response(serializer.data)
|
|
|
|
|
|
class DeleteUserView(generics.DestroyAPIView):
|
|
queryset = CustomUser.objects.all()
|
|
serializer_class = UserSerializer
|
|
permission_classes = [IsAdminUser, IsAuthenticated]
|
|
|
|
def destroy(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
self.perform_destroy(instance)
|
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
|
|
|
def perform_destroy(self, instance):
|
|
instance.delete()
|
|
|
|
UserProfile.objects.filter(user=instance).delete()
|
|
|
|
AppointmentRequest.objects.filter(email=instance.email).delete()
|
|
|
|
class ActivateOrDeactivateUserView(generics.UpdateAPIView):
|
|
queryset = CustomUser.objects.all()
|
|
serializer_class = UserSerializer
|
|
permission_classes = [IsAdminUser, IsAuthenticated]
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
instance.is_active = not instance.is_active
|
|
instance.save()
|
|
serializer = self.get_serializer(instance)
|
|
return Response(serializer.data) |