Email MFA using Cognito User Pool Custom Authentication Challenges

Takahiro Iwasa (岩佐 孝浩)
Cognito Custom Authentication Lambda Triggers

Cognito has built-in MFA support, but the built-in options are limited to SMS and TOTP. Many websites also offer email-based verification, and developers can implement it with Cognito User Pool custom authentication challenges.

Prerequisites

You need to clone an example from GitHub in your local environment. Then, install the following:

Directory Structure

./
|-- src/
|   |-- create_auth_challenge/
|   |   `-- app.py
|   |-- define_auth_challenge/
|   |   `-- app.py
|   |-- example/
|   |   |-- main.py
|   |   `-- requirements.txt
|   |-- layers/
|   |   `-- python/
|   |       `-- layers/
|   |           `-- cognito_custom_challenge_helper.py
|   `-- verify_auth_challenge/
|       `-- app.py
|-- tests/
|-- samconfig.toml
`-- template.yaml

Writing Python Scripts

Lambda Layer

To make it easy to handle custom challenge requests and responses, create src/layers/python/layers/cognito_custom_challenge_helper.py with the following code.

import copy
from enum import Enum


class CustomChallengeName(Enum):
    SRP_A = 'SRP_A'
    PASSWORD_VERIFIER = 'PASSWORD_VERIFIER'
    CUSTOM_CHALLENGE = 'CUSTOM_CHALLENGE'


class Session:
    def __init__(self, session: dict):
        _session = copy.deepcopy(session)
        self.challenge_name = _session['challengeName']
        self.challenge_result = _session['challengeResult']
        self.challenge_metadata = _session.get('challengeMetadata', '')

    def is_srp_a(self) -> bool:
        return self.challenge_name == CustomChallengeName.SRP_A.value \
            and self.challenge_result is True

    def is_password_verifier(self) -> bool:
        return self.challenge_name == CustomChallengeName.PASSWORD_VERIFIER.value \
            and self.challenge_result is True

    def is_custom_challenge(self) -> bool:
        return self.challenge_name == CustomChallengeName.CUSTOM_CHALLENGE.value

    def can_issue_tokens(self) -> bool:
        return self.is_custom_challenge() and self.challenge_result is True


class CustomChallengeRequest:
    def __init__(self, event: dict):
        _request = copy.deepcopy(event['request'])
        _session = _request.get('session', [])
        self.last_session = Session(_session[-1]) if _session else None
        self.user_attributes = _request['userAttributes']
        self.challenge_answer = _request.get('challengeAnswer', '')
        self.private_challenge_parameters = _request.get('privateChallengeParameters', {})

    def verify_answer(self) -> bool:
        return self.private_challenge_parameters.get('answer') == self.challenge_answer


class CustomChallengeResponse:
    def __init__(self, event: dict):
        _response = copy.deepcopy(event['response'])
        self._response = _response

    def set_answer(self, answer: str) -> None:
        self._response['privateChallengeParameters'] = {}
        self._response['privateChallengeParameters']['answer'] = answer

    def set_metadata(self, data: str) -> None:
        self._response['challengeMetadata'] = data

    def set_next_challenge(self, name: CustomChallengeName) -> None:
        self._response['challengeName'] = name.value
        self._response['issueTokens'] = False
        self._response['failAuthentication'] = False

    def set_answer_correct(self, correct: bool) -> None:
        self._response['answerCorrect'] = correct

    def issue_tokens(self) -> None:
        self._response['challengeName'] = ''
        self._response['issueTokens'] = True
        self._response['failAuthentication'] = False

    def fail(self) -> None:
        self._response['issueTokens'] = False
        self._response['failAuthentication'] = True

    def __dict__(self) -> dict:
        # NOTE: Defined as a method, so callers use response.__dict__() to get the response payload.
        return self._response
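
For orientation, the sketch below shows roughly which parts of a trigger event the helper above reads, and how the most recent session entry is picked. The email address and event values are made up for illustration, and `last_session_of` is a hypothetical stand-in for the `last_session` logic in `CustomChallengeRequest`; only the key names follow the fields used in the helper.

```python
# Hypothetical trigger event, trimmed to the fields the helper reads.
sample_event = {
    'request': {
        'userAttributes': {'email': 'user@example.com'},  # made-up address
        'session': [
            {'challengeName': 'SRP_A', 'challengeResult': True},
            {'challengeName': 'PASSWORD_VERIFIER', 'challengeResult': True},
        ],
    },
    'response': {},
}


def last_session_of(event: dict):
    """Return the most recent session entry, or None when there is no history yet."""
    session = event['request'].get('session', [])
    return session[-1] if session else None


last = last_session_of(sample_event)
print(last['challengeName'], last['challengeResult'])
```

Each trigger sees the whole history in `session`, but the helper only inspects the last entry, which is enough to decide the next step in this flow.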

Define Auth Challenge

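When the custom authentication flow starts, Cognito invokes the Define Auth Challenge Lambda trigger, implemented in src/define_auth_challenge/app.py. This function decides which challenge comes next, or whether to issue tokens or fail the authentication.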

from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse, CustomChallengeName


def lambda_handler(event: dict, context: dict) -> dict:
    # Parse the event to create a request and response object.
    request = CustomChallengeRequest(event)
    response = CustomChallengeResponse(event)
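    last_session = request.last_session

    if last_session is None:
        # Without any previous session (e.g. the client did not start with SRP_A), fail the authentication.
        response.fail()

    elif last_session.is_srp_a():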
        # When the last session is SRP_A, require the client to authenticate with a password.
        response.set_next_challenge(CustomChallengeName.PASSWORD_VERIFIER)

    elif last_session.is_password_verifier():
        # When the last session is PASSWORD_VERIFIER, initiate the custom challenge.
        response.set_next_challenge(CustomChallengeName.CUSTOM_CHALLENGE)

    elif last_session.is_custom_challenge():
        if last_session.can_issue_tokens():
            # When the last session is CUSTOM_CHALLENGE and authentication has been completed, issue tokens.
            response.issue_tokens()
        else:
            # When the last session is CUSTOM_CHALLENGE and the answer was not correct,
            # require the client to answer the custom challenge again.
            response.set_next_challenge(CustomChallengeName.CUSTOM_CHALLENGE)
    else:
        # If the client is in an unexpected flow, the current authentication must fail.
        response.fail()

    event['response'] = response.__dict__()
    return event
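
To make the decision logic easier to follow, here is a minimal sketch that replays a session history through a simplified stand-in for the handler above. `next_step` is a hypothetical function written only for this illustration; it works on plain dictionaries instead of the helper classes, and treating an empty history as a failure is this sketch's own choice.

```python
def _respond(name: str = '', issue: bool = False, fail: bool = False) -> dict:
    """Build a response in the same shape the helper class produces."""
    return {'challengeName': name, 'issueTokens': issue, 'failAuthentication': fail}


def next_step(session: list) -> dict:
    """Map the session history to the next Define Auth Challenge response."""
    if not session:
        return _respond(fail=True)  # assumption of this sketch: no history means failure
    last = session[-1]
    name = last['challengeName']
    passed = last['challengeResult'] is True
    if name == 'SRP_A' and passed:
        return _respond('PASSWORD_VERIFIER')
    if name == 'PASSWORD_VERIFIER' and passed:
        return _respond('CUSTOM_CHALLENGE')
    if name == 'CUSTOM_CHALLENGE':
        # A correct answer issues tokens; a wrong one repeats the challenge.
        return _respond(issue=True) if passed else _respond('CUSTOM_CHALLENGE')
    return _respond(fail=True)


history = []
steps = [('SRP_A', True), ('PASSWORD_VERIFIER', True),
         ('CUSTOM_CHALLENGE', False), ('CUSTOM_CHALLENGE', True)]
for challenge_name, result in steps:
    history.append({'challengeName': challenge_name, 'challengeResult': result})
    print(challenge_name, result, '->', next_step(history))
```

The third step shows the retry path: a wrong code leads to another CUSTOM_CHALLENGE rather than an immediate failure.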

Create Auth Challenge

To create the OTP code that is emailed to the user during the challenge flow, Cognito invokes the Create Auth Challenge Lambda trigger, implemented in src/create_auth_challenge/app.py. The environment variables CODE_LENGTH and EMAIL_SENDER are set by the AWS SAM template described later.

import os
import random

import boto3

from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse

client = boto3.client('ses')

CODE_LENGTH = int(os.environ.get('CODE_LENGTH', 6))
EMAIL_SENDER = os.environ.get('EMAIL_SENDER')


def lambda_handler(event: dict, context: dict) -> dict:
    # Parse the event to create a request object.
    request = CustomChallengeRequest(event)
    last_session = request.last_session

    if last_session.is_custom_challenge():
        # When the last session is a custom challenge (a retry), reuse the OTP code stored in its metadata.
        code = last_session.challenge_metadata.replace('challenge-', '')
    else:
        # When the last session is not a custom challenge, generate an OTP code and email it to the user.
        code = generate_code()
        message = create_message(code)
        send_mail_to(request.user_attributes['email'], message)

    # Create a response
    response = CustomChallengeResponse(event)
    response.set_answer(code)
    response.set_metadata(f'challenge-{code}')
    event['response'] = response.__dict__()
    return event


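def generate_code(length=CODE_LENGTH) -> str:
    # Use the OS-backed secure generator; the default generator is not intended for security codes.
    return str(random.SystemRandom().randrange(10 ** length)).zfill(length)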


def create_message(code: str) -> str:
    return f'Your authentication code: {code}'


def send_mail_to(email: str, body: str) -> None:
    client.send_email(
        Source=EMAIL_SENDER,
        Destination={
            'ToAddresses': [email],
        },
        Message={
            'Subject': {
                'Charset': 'UTF-8',
                'Data': 'Authentication Code',
            },
            'Body': {
                'Text': {
                    'Charset': 'UTF-8',
                    'Data': body,
                },
            },
        },
    )

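The OTP code can only be used while the authentication flow session is valid. The session duration is configurable per app client in Cognito (3 to 15 minutes; the default is 3 minutes).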
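
The handler above reuses the code on a retry by storing it in the challenge metadata with a `challenge-` prefix. The round trip can be sketched as follows; `to_metadata`, `from_metadata`, and `code_for_attempt` are hypothetical names for this illustration, and the session dictionaries are made up.

```python
from typing import Callable, Tuple

PREFIX = 'challenge-'


def to_metadata(code: str) -> str:
    """Encode the code the same way the handler writes challengeMetadata."""
    return f'{PREFIX}{code}'


def from_metadata(metadata: str) -> str:
    """Recover the code from metadata written by to_metadata."""
    return metadata.removeprefix(PREFIX)


def code_for_attempt(last_session: dict, generate: Callable[[], str]) -> Tuple[str, bool]:
    """Return (code, send_email): a new code is generated only on the first attempt."""
    if last_session['challengeName'] == 'CUSTOM_CHALLENGE':
        return from_metadata(last_session.get('challengeMetadata') or ''), False
    return generate(), True


# First attempt: the password step just succeeded, so a code is generated and emailed.
first_code, send = code_for_attempt(
    {'challengeName': 'PASSWORD_VERIFIER', 'challengeResult': True},
    lambda: '012345',
)
# Retry: the previous custom challenge failed, so the stored code is reused and no email is sent.
retry_code, resend = code_for_attempt(
    {'challengeName': 'CUSTOM_CHALLENGE', 'challengeResult': False,
     'challengeMetadata': to_metadata(first_code)},
    lambda: '999999',
)
print(first_code, send, retry_code, resend)
```

Reusing the code this way means a mistyped answer does not trigger a second email, and the user can retry with the code already received.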

Verify Auth Challenge

To verify the OTP code submitted by the user, Cognito invokes the Verify Auth Challenge Response Lambda trigger, implemented in src/verify_auth_challenge/app.py.

from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse


def lambda_handler(event: dict, context: dict) -> dict:
    # Parse the event to create a request and response object.
    request = CustomChallengeRequest(event)
    response = CustomChallengeResponse(event)

    # Create a response.
    correct = request.verify_answer()
    response.set_answer_correct(correct)
    event['response'] = response.__dict__()
    return event
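
The helper compares the expected and submitted codes with `==`. As an optional hardening, the comparison could be done in constant time with `hmac.compare_digest` from the standard library. This is a sketch of an alternative, not what the example repository does, and `answers_match` is a hypothetical name.

```python
import hmac


def answers_match(expected: str, provided: str) -> bool:
    """Compare two codes in constant time; empty or missing values never match."""
    if not expected or not provided:
        return False
    return hmac.compare_digest(expected.encode('utf-8'), provided.encode('utf-8'))


print(answers_match('012345', '012345'))
print(answers_match('012345', '12345'))
```

Rejecting empty values explicitly avoids treating a missing expected answer and an empty submission as a match.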

Creating AWS Resources

AWS SAM Template

To enable custom authentication challenges, include ALLOW_CUSTOM_AUTH in ExplicitAuthFlows of the user pool client. Each AWS::Serverless::Function defines a CognitoEvent so that the function is attached to the user pool as the corresponding Lambda trigger. In addition, the CreateAuthChallenge function needs Policies that allow it to send emails through Amazon SES.

AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: Email MFA using Cognito User Pool custom authentication challenges

Globals:
  Function:
    Timeout: 3
    MemorySize: 128

Parameters:
  CodeLength:
    Type: Number
    Default: '6'
  EmailSender:
    Type: String

Resources:
  CognitoUserPool:
    Type: AWS::Cognito::UserPool
    Properties:
      UserPoolName: cognito-custom-auth-email-mfa
      Policies:
        PasswordPolicy:
          MinimumLength: 8
      UsernameAttributes:
        - email
      Schema:
        - Name: email
          AttributeDataType: String
          Required: true

  CognitoUserPoolClient:
    Type: AWS::Cognito::UserPoolClient
    Properties:
      UserPoolId: !Ref CognitoUserPool
      ClientName: client
      GenerateSecret: false
      ExplicitAuthFlows:
        - ALLOW_CUSTOM_AUTH
        - ALLOW_REFRESH_TOKEN_AUTH
        - ALLOW_USER_SRP_AUTH

  LambdaLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: cognito_email_mfa_layer
      ContentUri: src/layers
      CompatibleRuntimes:
        - python3.11
      RetentionPolicy: Delete

  CreateAuthChallenge:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/create_auth_challenge
      Handler: app.lambda_handler
      Runtime: python3.11
      Layers:
        - !Ref LambdaLayer
      Environment:
        Variables:
          CODE_LENGTH: !Ref CodeLength
          EMAIL_SENDER: !Ref EmailSender
      Events:
        CognitoEvent:
          Type: Cognito
          Properties:
            Trigger: CreateAuthChallenge
            UserPool: !Ref CognitoUserPool
      Policies:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action: ses:SendEmail
            Resource: "*"

  DefineAuthChallenge:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/define_auth_challenge
      Handler: app.lambda_handler
      Runtime: python3.11
      Layers:
        - !Ref LambdaLayer
      Events:
        CognitoEvent:
          Type: Cognito
          Properties:
            Trigger: DefineAuthChallenge
            UserPool: !Ref CognitoUserPool

  VerifyAuthChallenge:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/verify_auth_challenge
      Handler: app.lambda_handler
      Runtime: python3.11
      Layers:
        - !Ref LambdaLayer
      Events:
        CognitoEvent:
          Type: Cognito
          Properties:
            Trigger: VerifyAuthChallengeResponse
            UserPool: !Ref CognitoUserPool

Build and Deploy

Replace <YOUR_SES_EMAIL_SENDER> with the sender email address, which must be a verified identity in Amazon SES, and then build and deploy with the following commands. To change the length of the OTP codes, also specify the CodeLength parameter.

sam build
sam deploy --parameter-overrides EmailSender=<YOUR_SES_EMAIL_SENDER>
# sam deploy --parameter-overrides EmailSender=<YOUR_SES_EMAIL_SENDER> CodeLength=10

Testing

Replace <YOUR_USER_POOL_ID> and <YOUR_EMAIL> with your values, and create a Cognito test user with the following commands.

POOL_ID=<YOUR_USER_POOL_ID>
EMAIL=<YOUR_EMAIL>

# Add a Cognito user.
aws cognito-idp admin-create-user \
  --user-pool-id $POOL_ID \
  --username $EMAIL

# Set a permanent password so that the user status becomes "Confirmed".
echo -n 'Password: '
read -r password
aws cognito-idp admin-set-user-password \
  --user-pool-id "$POOL_ID" \
  --username "$EMAIL" \
  --password "$password" \
  --permanent

Run the example client with the following commands to walk through the authentication flow.

cd src/example
pip install -r requirements.txt
python main.py \
  --pool-id <YOUR_USER_POOL_ID> \
  --client-id <YOUR_CLIENT_ID> \
  --username <YOUR_EMAIL> \
  --password <YOUR_PASSWORD>

main.py uses the following libraries to calculate the values required by SRP (Secure Remote Password).

You may also refer to the AWS official code example.
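
Once the SRP and password steps have completed, the remaining client-side step is to answer the CUSTOM_CHALLENGE with the emailed code. The sketch below shows one way to do that with the boto3 `respond_to_auth_challenge` call; it assumes an app client without a client secret, `answer_custom_challenge` is a hypothetical helper, and main.py in the repository may be structured differently.

```python
def answer_custom_challenge(client, client_id: str, username: str, session: str, code: str) -> dict:
    """Send the emailed code as the answer to a CUSTOM_CHALLENGE.

    `client` is expected to be a boto3 'cognito-idp' client, and `session` is the
    Session value returned by the previous step of the authentication flow.
    """
    return client.respond_to_auth_challenge(
        ClientId=client_id,
        ChallengeName='CUSTOM_CHALLENGE',
        Session=session,
        ChallengeResponses={
            'USERNAME': username,
            'ANSWER': code,
        },
    )
```

With `client = boto3.client('cognito-idp')`, a correct code should produce a response containing `AuthenticationResult` with the tokens. With the Define Auth Challenge logic shown earlier, a wrong code returns another CUSTOM_CHALLENGE together with a new `Session` value to use for the retry.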

Cleaning Up

Clean up the provisioned AWS resources with the following command.

sam delete

Takahiro Iwasa (岩佐 孝浩)

Software Developer at iret, Inc.
Architecting and developing cloud native applications mainly with AWS. Japan AWS Top Engineers 2020-2023