Asynchronous actions with AWS Step Functions

AWS Step Functions is the core service our team uses. It has changed the way we look at our service architecture and process orchestration, and it has also shown us how humans and 3rd party APIs can interact with our products.

Several of our internal processes (e.g. client registration) require an action "from the outside". That could be filling in a form and clicking a button in the frontend of the application, a human checking and approving a KYC document, or just waiting for a response from a 3rd party API.

In the past, we used the Activities feature of AWS Step Functions, which relies on activity workers that poll for waiting tasks and then process them. Such an implementation can get hard to understand and overcomplicated, and for our use cases it was also a poor fit for an event-driven architecture. That changed when AWS announced support for the callback pattern in Step Functions.

The ability to pause an execution and wait for an external service to report success (or failure) back lets us build a pattern that handles asynchronous actions while staying flexible enough to be adapted to various business requirements, as shown below.

The state machine execution stops at the Wait for external action step, invokes the createAsyncAction Lambda function and remains in this state until sendTaskSuccess() is called.

createAsyncAction saves the execution information to a DynamoDB table:

  • execution ID as a unique identifier
  • taskToken, which is passed from the state machine and serves as a pointer to the step where the execution was paused

The record stays in place until the "External service" triggers the continuation of the execution. The service invokes the deleteAsyncAction Lambda function, passing the execution ID as a parameter. That function reads the relevant taskToken from the table using the execution ID, so that sendTaskSuccess() (or sendTaskFailure()) can be called and the state machine can continue. The record can then be deleted, as the execution is no longer paused.
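The bookkeeping behind this pattern is small enough to sketch without AWS at all. The following is a minimal, dependency-free illustration, assuming a Map in place of the DynamoDB table and a fake object in place of the Step Functions client; the success flag and the 'Rejected' error name are hypothetical. It only shows how the execution ID and the task token flow between the two functions, not the real handlers built later in this article.

```javascript
'use strict'

// In-memory stand-ins (assumptions for illustration): a Map plays the role of
// the DynamoDB table and a small fake plays the role of the Step Functions API.
const actionsTable = new Map()

const fakeStepFunctions = {
  outcomes: [],
  sendTaskSuccess ({ taskToken, output }) {
    this.outcomes.push({ taskToken, result: 'success', output })
  },
  sendTaskFailure ({ taskToken, error }) {
    this.outcomes.push({ taskToken, result: 'failure', error })
  }
}

// Called by the paused ".waitForTaskToken" step: remember where we paused.
function createAsyncAction ({ executionId, taskToken }) {
  actionsTable.set(executionId, { executionId, taskToken })
}

// Called by the external service: resume the paused execution and clean up.
function deleteAsyncAction ({ executionId, success }, stepFunctions) {
  const record = actionsTable.get(executionId)
  if (!record) {
    throw new Error(`No paused execution found for ${executionId}`)
  }
  if (success) {
    stepFunctions.sendTaskSuccess({ taskToken: record.taskToken, output: '{}' })
  } else {
    stepFunctions.sendTaskFailure({ taskToken: record.taskToken, error: 'Rejected' })
  }
  actionsTable.delete(executionId)
  return record.taskToken
}
```

Calling createAsyncAction({ executionId: 'exec-1', taskToken: 'token-abc' }) and then deleteAsyncAction({ executionId: 'exec-1', success: true }, fakeStepFunctions) sends the stored token back and leaves the table empty; a second call for the same ID fails, because nothing is waiting anymore.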

To demonstrate this pattern on a real-life example, I will implement a simplified version of an e-commerce order processing system using the Serverless Framework and Node.js. For simplicity's sake, the system is triggered by simple API endpoints, and the UI part is not implemented.

Example

A customer places an order by calling the POST /order API endpoint, which starts an execution of the processOrder state machine. The order is created with the status PENDING, and the execution pauses until the online store staff update its status to PROCESSING using the POST /order/update API call. Once this is done, the staff can update it again, via the same endpoint, to the status SENT. The customer can check the current status with GET /order/{id} at any time.
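The lifecycle above can be summarized as a small transition table. Here is a hypothetical sketch (nextStatus and TRANSITIONS are illustrative names, not part of the demo repository) that already includes the ERROR status handled at the end of this article:

```javascript
'use strict'

// For each status in which the process waits, where a successful or a failed
// update from the store staff leads.
const TRANSITIONS = {
  PENDING: { success: 'PROCESSING', failure: 'ERROR' },
  PROCESSING: { success: 'SENT', failure: 'ERROR' }
}

function nextStatus (current, success) {
  const transition = TRANSITIONS[current]
  if (!transition) {
    // SENT and ERROR are final: the execution is no longer waiting.
    throw new Error(`Order in status ${current} cannot be updated`)
  }
  return success ? transition.success : transition.failure
}
```

Only PENDING and PROCESSING have entries, which matches the fact that the state machine waits for the staff exactly twice.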

Demo repository

serverless-cz/5-step-functions-async-actions at master · purple-technology/serverless-cz
💜🚀 Materials for a tutorial series about Serverless.

Let's start by defining the base of our Serverless service.

service: step-functions-async-actions

plugins:
  - serverless-step-functions

provider:
  name: aws
  runtime: nodejs14.x
  region: eu-central-1

We define two DynamoDB tables: orders tracks each order and its status, and orderProcessingActions keeps information about paused executions. Both tables use a simple primary key consisting of a partition key only:

  • The orders primary key is orderId. For this example, I am using the state machine's unique execution ID. However, any ID that uniquely identifies an order can be used (e.g. one generated by the uuid npm package).
  • The orderProcessingActions primary key is executionId. For this example, it is also the state machine's unique execution ID. It is important to note that the store staff will need to use this ID to update the status of the order.
resources:
  Resources:
    OrdersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: orders
        AttributeDefinitions:
          - AttributeName: orderId
            AttributeType: S
        KeySchema:
          - AttributeName: orderId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
    OrderProcessingActionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: orderProcessingActions
        AttributeDefinitions:
          - AttributeName: executionId
            AttributeType: S
        KeySchema:
          - AttributeName: executionId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
serverless.yml

We'll need to create, read and delete records in these tables, so this is also the right time to set the permissions properly. Let's also set the table names as environment variables for later use in the Lambda functions.

provider:
  ...
  environment:
    ORDERS_TABLE_NAME: { "Ref": "OrdersTable" }
    ORDER_PROCESSING_ACTIONS_TABLE_NAME: { "Ref": "OrderProcessingActionsTable" }
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - dynamodb:PutItem
        - dynamodb:GetItem
        - dynamodb:DeleteItem
      Resource:
        [
          { "Fn::GetAtt": ["OrdersTable", "Arn"] },
          { "Fn::GetAtt": ["OrderProcessingActionsTable", "Arn"] }
        ]
serverless.yml

Now we can proceed with the state machine definition. Let's keep it simple and define an HTTP POST event on the state machine, which starts an execution.

stepFunctions:
  validate: true
  stateMachines:
    processOrder:
      events:
        - http:
            path: order
            method: POST
serverless.yml

An execution can then be started by calling POST /order with a JSON body:

{
  "items": ["Really nice hat", "Cool jeans"]
}
POST /order

The order status is set and tracked in the state machine. A Pass state named Set PENDING status is used here; its Parameters property builds the state's output from the constant status PENDING and the items taken from the input.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          Set PENDING status:
            Type: Pass
            Parameters:
              status: PENDING
              "items.$": $.items
            ResultPath: $
serverless.yml
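To see what the Parameters block actually produces, here is a simplified model of how it is resolved: keys ending in .$ are replaced by the value at the given path ($. reads the state input, $$. reads the context object), and any input field not mentioned is dropped. This is an illustration assuming plain dotted paths only, not the real JSONPath engine of Step Functions; resolveParameters is a hypothetical name.

```javascript
'use strict'

// Read a plain dotted path such as "$.items" or "$$.Execution.Id".
function readPath (path, input, context) {
  const [root, ...keys] = path.split('.')
  let value = root === '$$' ? context : input
  for (const key of keys) {
    value = value == null ? undefined : value[key]
  }
  return value
}

// Build a new object from the template: "key.$" entries are looked up by path,
// nested objects are resolved recursively, everything else is copied as-is.
function resolveParameters (template, input, context) {
  const result = {}
  for (const [key, value] of Object.entries(template)) {
    if (key.endsWith('.$')) {
      result[key.slice(0, -2)] = readPath(value, input, context)
    } else if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
      result[key] = resolveParameters(value, input, context)
    } else {
      result[key] = value
    }
  }
  return result
}
```

Resolving { status: 'PENDING', 'items.$': '$.items' } against the POST /order body yields { status: 'PENDING', items: [...] }. The same rules explain the "S.$": $$.Execution.Id entries used in the DynamoDB steps.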

Next, we want to create a record in the orders table. For this straightforward operation, the Create order record step can call the DynamoDB API directly, without a Lambda function.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          Set PENDING status:
            ...
            Next: Create order record
          Create order record:
            Type: Task
            Resource: "arn:aws:states:::dynamodb:putItem"
            Parameters:
              TableName: { "Ref": "OrdersTable" }
              Item:
                orderId:
                  "S.$": $$.Execution.Id
                items:
                  "SS.$": $.items
                status:
                  "S.$": $.status
            ResultPath: null
serverless.yml

The execution ID from the context object ($$.Execution.Id) is used as the orderId. Also, notice that the ResultPath property is set to null. It indicates that we don't want to pass the result of the DynamoDB putItem operation to the step output: the result is discarded and the step's input is passed through as its output. Check the ResultPath documentation for detailed information.
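The ResultPath variants can be modelled in a few lines. This is a simplified sketch assuming plain dotted paths (the real service accepts more JSONPath syntax); applyResultPath is a hypothetical name.

```javascript
'use strict'

// Decide what a state passes on, given its input and the result of its work.
function applyResultPath (input, result, resultPath) {
  if (resultPath === null) {
    return input // result discarded, input passed through unchanged
  }
  if (resultPath === '$') {
    return result // result replaces the input entirely (the default)
  }
  // e.g. "$.dbResult": copy the input and attach the result under that key
  const keys = resultPath.split('.').slice(1)
  const output = JSON.parse(JSON.stringify(input))
  let target = output
  for (const key of keys.slice(0, -1)) {
    target[key] = target[key] ?? {}
    target = target[key]
  }
  target[keys[keys.length - 1]] = result
  return output
}
```

With the default ResultPath ($), the putItem result would replace the order data, and the following steps would lose status and items; that is why null is used in Create order record.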

Let's also create a simple API endpoint for checking the current status of an order.

functions:
  getOrder:
    handler: src/getOrder.handler
    events:
      - http:
          path: order/{id}
          method: GET
          request:
            parameters:
              paths:
                id: true
serverless.yml

The handler reads the order from the orders table using the id path parameter.

'use strict'

const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()

module.exports.handler = async (event) => {
	const result = await db.get({
		TableName: process.env.ORDERS_TABLE_NAME,
		Key: {
			orderId: event.pathParameters.id,
		}
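	}).promise()

	// DocumentClient.get() resolves without an Item when the key does not exist
	if (!result.Item) {
		return {
			statusCode: 404,
			body: JSON.stringify({ message: 'Order not found' })
		}
	}

	return {
		statusCode: 200,
		body: JSON.stringify(result.Item)
	}
}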
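The handler above creates its DynamoDB client at module level, which makes it hard to unit-test. One option is to pass the client in. A sketch, assuming a client with the same get(params).promise() shape as the v2 DocumentClient; makeGetOrderHandler is a hypothetical name.

```javascript
'use strict'

// The DocumentClient is injected, so tests can supply a fake one.
function makeGetOrderHandler (db, tableName) {
  return async (event) => {
    const result = await db.get({
      TableName: tableName,
      Key: { orderId: event.pathParameters.id }
    }).promise()

    if (!result.Item) {
      return { statusCode: 404, body: JSON.stringify({ message: 'Order not found' }) }
    }
    return { statusCode: 200, body: JSON.stringify(result.Item) }
  }
}

// In src/getOrder.js this could be wired up as:
// module.exports.handler = makeGetOrderHandler(new AWS.DynamoDB.DocumentClient(), process.env.ORDERS_TABLE_NAME)
```

The production wiring stays a one-liner, while a test can pass an object whose get() returns a canned item.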

It would be nice to let the customer know that something is happening with their order. Let's notify them in the next step, Notify customer PENDING.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          Set PENDING status:
            ...
            Next: Create order record
          Create order record:
            ...
            Next: Notify customer PENDING
          Notify customer PENDING:
            Type: Task
            Resource: { "Fn::GetAtt": ["notifyCustomer", "Arn"] }
serverless.yml

The resource needed for this step is the notifyCustomer Lambda function, so let's add it to the stack.

functions:
  getOrder:
    ...
  notifyCustomer:
    handler: src/notifyCustomer.handler
serverless.yml

In reality, the function would send a notification email or a message to Amazon SQS (to be processed by a different service). For the sake of simplicity, the message is just logged to the console here.

'use strict'

module.exports.handler = async (input) => {
	switch (input.status) {
		case 'PENDING':
			console.log('Thank you for using our services. Your order has been set.')
			break
		case 'PROCESSING':
			console.log('Your order is being processed.')
			break
		case 'SENT':
			console.log('Yay your order has been sent.')
			break
	}
	return input
}
src/notifyCustomer.js
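If the notification should be handled by a different service, the handler could publish to SQS instead of logging. A sketch, assuming an injected client with the v2 SDK shape sendMessage(params).promise(); the message format and the factory name makeNotifyCustomerHandler are hypothetical. Note that the step input only contains status and items, so a real message would also need the order ID, which would have to be added to the step's input (not shown).

```javascript
'use strict'

// Publish the status change to a queue instead of logging it.
function makeNotifyCustomerHandler (sqs, queueUrl) {
  return async (input) => {
    await sqs.sendMessage({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify({
        type: 'ORDER_STATUS_CHANGED',
        status: input.status,
        items: input.items
      })
    }).promise()
    // The state machine still receives its input unchanged, as before
    return input
  }
}
```

Returning the input keeps the contract the following steps rely on, since the Notify customer steps have no ResultPath.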

Now is the right time to pause the process until a store employee updates the status. That happens in the following Wait for status update from PENDING step.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          Set PENDING status:
            ...
            Next: Create order record
          Create order record:
            ...
            Next: Notify customer PENDING
          Notify customer PENDING:
            ...
            Next: Wait for status update from PENDING
          Wait for status update from PENDING:
            Type: Task
            Resource: "arn:aws:states:::lambda:invoke.waitForTaskToken"
            Parameters:
              FunctionName: { "Ref": "createAction" }
              Payload:
                "executionId.$": $$.Execution.Id
                "taskToken.$": $$.Task.Token
                "status.$": $.status
            ResultPath: null
serverless.yml

The createAction Lambda function is invoked with the Payload object as its input. The .waitForTaskToken suffix in the Resource definition indicates that this step pauses the execution and waits until another part of the system calls sendTaskSuccess or sendTaskFailure with the token that points to this exact step. That token is read from the context object ($$.Task.Token) and passed as taskToken in the Payload. Let's check what the createAction handler does. After defining the function in serverless.yml

functions:
  getOrder:
    ...
  notifyCustomer:
    ...
  createAction:
    handler: src/createAction.handler
serverless.yml

we create a record in the OrderProcessingActionsTable.

'use strict'

const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()

module.exports.handler = async (event) => {
	// Remember where the execution paused and which order status it is waiting in
	await db.put({
		TableName: process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME,
		Item: {
			executionId: event.executionId,
			status: event.status,
			taskToken: event.taskToken
		}
	}).promise()
}

So what does this record mean? It says that:

  • a state machine execution with ID executionId
  • is on hold at step defined with the unique taskToken
  • and the status of the order is status (PENDING or PROCESSING, the two statuses in which the process waits)
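Because each record also stores the status the execution is waiting in, the table can serve as a simple work list for the staff. A sketch of reading it, assuming an injected client with the v2 DocumentClient scan(params).promise() shape; listWaitingActions is a hypothetical helper. A Scan reads the whole table, which is fine for a small demo; a larger table would call for an index on status.

```javascript
'use strict'

// List executions that are waiting in a given order status, page by page.
async function listWaitingActions (db, tableName, status) {
  const actions = []
  let lastKey
  do {
    const page = await db.scan({
      TableName: tableName,
      // "status" is a DynamoDB reserved word, so it goes through a placeholder
      FilterExpression: '#s = :s',
      ExpressionAttributeNames: { '#s': 'status' },
      ExpressionAttributeValues: { ':s': status },
      // Continue where the previous page ended (omitted on the first call)
      ...(lastKey ? { ExclusiveStartKey: lastKey } : {})
    }).promise()
    actions.push(...(page.Items || []))
    lastKey = page.LastEvaluatedKey
  } while (lastKey)
  // Return only what the staff need; the task token stays on the server side
  return actions.map((item) => ({ executionId: item.executionId, status: item.status }))
}
```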

From this point on, an online store employee can check the table and see the orders waiting to be processed. To resume the process and update the status of an order, they need an API endpoint, so let's define POST /order/update, which invokes the finishAction Lambda function.

functions:
  getOrder:
    ...
  notifyCustomer:
    ...
  createAction:
    ...
  finishAction:
    handler: src/finishAction.handler
    events:
      - http:
          path: order/update
          method: POST
serverless.yml

This function checks whether the given execution is waiting to be resumed; if it is, it calls sendTaskSuccess with the taskToken from the record, which lets the execution continue. The output parameter is set to a stringified empty object, as nothing needs to be returned to the execution in this example (the waiting step's ResultPath: null would discard it anyway).

Afterwards, the record can be deleted.

'use strict'

const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()
const stepFunctions = new AWS.StepFunctions()

module.exports.handler = async (event) => {
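	const body = JSON.parse(event.body)
	const executionId = body.executionId

	// Look up the paused execution by its ID
	const action = await db.get({
		TableName: process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME,
		Key: {
			executionId
		}
	}).promise()

	// No record means nothing is waiting for this execution
	if (!action.Item) {
		return {
			statusCode: 404,
			body: JSON.stringify({ message: 'No paused execution found' })
		}
	}

	// Resume the execution at the step identified by the stored task token
	await stepFunctions.sendTaskSuccess({
		taskToken: action.Item.taskToken,
		output: '{}'
	}).promise()

	// The execution is no longer paused at this step, so the record can go
	await db.delete({
		TableName: process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME,
		Key: {
			executionId
		}
	}).promise()

	return {
		statusCode: 200
	}
}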
finishAction.js
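One subtle point: once sendTaskSuccess() returns, the execution moves on and, after Set PROCESSING status and the following steps, reaches the next wait step, which stores a new record under the same executionId. If that happens before the delete runs, the delete removes the new record. A possible safeguard, sketched here assuming the v2 DocumentClient delete(params).promise() shape and its ConditionalCheckFailedException error code, is to delete only if the stored token is still the one that was just used; deleteActionIfUnchanged is a hypothetical name.

```javascript
'use strict'

// Delete the action record only if it still holds the token we just used.
// Returns true if deleted, false if the record already belongs to a newer wait step.
async function deleteActionIfUnchanged (db, tableName, executionId, usedToken) {
  try {
    await db.delete({
      TableName: tableName,
      Key: { executionId },
      ConditionExpression: 'taskToken = :token',
      ExpressionAttributeValues: { ':token': usedToken }
    }).promise()
    return true
  } catch (err) {
    if (err.code === 'ConditionalCheckFailedException') {
      return false // a newer record (or none) is stored: keep it untouched
    }
    throw err
  }
}
```

In finishAction, the unconditional db.delete() would be replaced by deleteActionIfUnchanged(db, process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME, executionId, action.Item.taskToken).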

The permissions have to be updated as well.

provider:
  ...
  iamRoleStatements:
    ...
    - Effect: "Allow"
      Action:
        - states:SendTaskSuccess
      Resource: "*"
serverless.yml

The execution then continues to Set PROCESSING status, which sets the status property the same way as the Set PENDING status step. It is followed by Update order record PROCESSING, which updates the order in the orders table by calling the DynamoDB API directly, then by Notify customer PROCESSING, and finally by Wait for status update from PROCESSING, which works the same as the previous waiting step: it waits for sendTaskSuccess() to be called by the finishAction handler.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          Set PENDING status:
            ...
            Next: Create order record
          Create order record:
            ...
            Next: Notify customer PENDING
          Notify customer PENDING:
            ...
            Next: Wait for status update from PENDING
          Wait for status update from PENDING:
            ...
            Next: Set PROCESSING status
          Set PROCESSING status:
            Type: Pass
            Parameters:
              status: PROCESSING
              "items.$": $.items
            ResultPath: $
            Next: Update order record PROCESSING
          Update order record PROCESSING:
            Type: Task
            Resource: "arn:aws:states:::dynamodb:updateItem"
            Parameters:
              TableName: { "Ref": "OrdersTable" }
              Key:
                orderId:
                  "S.$": $$.Execution.Id
              ExpressionAttributeValues:
                ":s":
                  "S.$": $.status
              ExpressionAttributeNames:
                "#s": "status"
              UpdateExpression: "set #s = :s"
            ResultPath: null
            Next: Notify customer PROCESSING
          Notify customer PROCESSING:
            Type: Task
            Resource: { "Fn::GetAtt": ["notifyCustomer", "Arn"] }
            Next: Wait for status update from PROCESSING
          Wait for status update from PROCESSING:
            Type: Task
            Resource: "arn:aws:states:::lambda:invoke.waitForTaskToken"
            Parameters:
              FunctionName: { "Ref": "createAction" }
              Payload:
                "executionId.$": $$.Execution.Id
                "taskToken.$": $$.Task.Token
                "status.$": $.status
            ResultPath: null
serverless.yml

So this part does pretty much the same as the PENDING part of the state machine. The only difference is that the order record is not created here, just updated.

We can reuse the same approach to cover the last order status, SENT.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          Set PENDING status:
            ...
            Next: Create order record
          Create order record:
            ...
            Next: Notify customer PENDING
          Notify customer PENDING:
            ...
            Next: Wait for status update from PENDING
          Wait for status update from PENDING:
            ...
            Next: Set PROCESSING status
          Set PROCESSING status:
            ...
            Next: Update order record PROCESSING
          Update order record PROCESSING:
            ...
            Next: Notify customer PROCESSING
          Notify customer PROCESSING:
            ...
            Next: Wait for status update from PROCESSING
          Wait for status update from PROCESSING:
            ...
            Next: Set SENT status
          Set SENT status:
            ...
            Next: Update order record SENT
          Update order record SENT:
            ...
            Next: Notify customer SENT
          Notify customer SENT:
            ...
            End: true
serverless.yml

This state machine now covers the complete lifecycle of an order, from the PENDING status to PROCESSING and finally SENT.
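The Set ... status and Update order record ... pairs for PROCESSING and SENT differ only in the status value. If the repetition becomes a burden, one option (an assumption, not what the demo repository does) is to generate such pairs in JavaScript and build the whole States map there; the Serverless Framework can resolve values from JS files through its ${file(...)} variables, but check the variables documentation of your Framework version for how exports are resolved. statusStates is a hypothetical helper.

```javascript
'use strict'

// Build the Pass + DynamoDB updateItem pair for one order status.
function statusStates (status, nextState) {
  return {
    [`Set ${status} status`]: {
      Type: 'Pass',
      Parameters: { status, 'items.$': '$.items' },
      ResultPath: '$',
      Next: `Update order record ${status}`
    },
    [`Update order record ${status}`]: {
      Type: 'Task',
      Resource: 'arn:aws:states:::dynamodb:updateItem',
      Parameters: {
        TableName: { Ref: 'OrdersTable' },
        Key: { orderId: { 'S.$': '$$.Execution.Id' } },
        ExpressionAttributeValues: { ':s': { 'S.$': '$.status' } },
        ExpressionAttributeNames: { '#s': 'status' },
        UpdateExpression: 'set #s = :s'
      },
      ResultPath: null,
      Next: nextState
    }
  }
}
```

For example, statusStates('SENT', 'Notify customer SENT') produces the two SENT states that are elided in the definition above.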

One last point to cover is the ERROR status, which applies when something is wrong with the order: for example, it is out of stock or just cannot be sent to the customer.

Let's update the finishAction handler so that it also accepts a success property in the request body.

'use strict'

const AWS = require('aws-sdk')
const db = new AWS.DynamoDB.DocumentClient()
const stepFunctions = new AWS.StepFunctions()

module.exports.handler = async (event) => {
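	const body = JSON.parse(event.body)
	const executionId = body.executionId
	// Treat a missing "success" as true, so callers that only send executionId keep working
	const success = body.success !== false

	// Look up the paused execution by its ID
	const action = await db.get({
		TableName: process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME,
		Key: {
			executionId
		}
	}).promise()

	// No record means nothing is waiting for this execution
	if (!action.Item) {
		return {
			statusCode: 404,
			body: JSON.stringify({ message: 'No paused execution found' })
		}
	}

	if (success) {
		// Resume the execution normally
		await stepFunctions.sendTaskSuccess({
			taskToken: action.Item.taskToken,
			output: '{}'
		}).promise()
	} else {
		// Make the waiting step fail with an error that its Catch can handle
		await stepFunctions.sendTaskFailure({
			taskToken: action.Item.taskToken
		}).promise()
	}

	// The execution is no longer paused at this step, so the record can go
	await db.delete({
		TableName: process.env.ORDER_PROCESSING_ACTIONS_TABLE_NAME,
		Key: {
			executionId
		}
	}).promise()

	return {
		statusCode: 200
	}
}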
finishAction.js

The permissions have to be updated once again.

provider:
  ...
  iamRoleStatements:
    ...
    - Effect: "Allow"
      Action:
        - states:SendTaskSuccess
        - states:SendTaskFailure
      Resource: "*"
serverless.yml

Now, if the store employee is for some reason unable to process the order, they can set success to false in the POST /order/update payload, and the handler will call sendTaskFailure() instead of sendTaskSuccess().

{
  "executionId": "<execution ID of the order>",
  "success": false
}
POST /order/update

This failure, of course, has to be handled in the state machine definition.

We add the Catch field to each "wait" task.

stepFunctions:
  ...
  stateMachines:
    processOrder:
      ...
      definition:
        StartAt: Set PENDING status
        States:
          ...
          Wait for status update from PENDING:
            ...
            Catch:
              - ErrorEquals: ["States.TaskFailed"]
                ResultPath: null
                Next: Set ERROR status
            Next: Set PROCESSING status
          ...
          Wait for status update from PROCESSING:
            ...
            Catch:
              - ErrorEquals: ["States.TaskFailed"]
                ResultPath: null
                Next: Set ERROR status
            Next: Set SENT status
          ...
          Set ERROR status:
            Type: Pass
            Parameters:
              status: ERROR
              "items.$": $.items
            ResultPath: $
            Next: Notify customer ERROR
          Notify customer ERROR:
            Type: Task
            Resource: { "Fn::GetAtt": ["notifyCustomer", "Arn"] }
            End: true
serverless.yml

Once sendTaskFailure() is called, the "wait" step fails with an error that is caught according to the ErrorEquals definition, in this case States.TaskFailed. Read more about other error names in the official documentation.

Catch also defines the step that the state machine should continue with in case of an error. In this case it's Set ERROR status followed by the last Notify customer ERROR. The switch in notifyCustomer handler has to be updated as well, so the ERROR status also has its own message.
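A minimal sketch of the updated handler, assuming a lookup table in place of the switch; the wording of the ERROR message is a hypothetical example. It keeps the same contract as before: log the message and return the input unchanged.

```javascript
'use strict'

// One message per order status, including the new ERROR status.
const MESSAGES = {
  PENDING: 'Thank you for using our services. Your order has been set.',
  PROCESSING: 'Your order is being processed.',
  SENT: 'Yay your order has been sent.',
  ERROR: 'We are sorry, there was a problem with your order.'
}

// In src/notifyCustomer.js this would be exported as module.exports.handler.
async function handler (input) {
  const message = MESSAGES[input.status]
  if (message) {
    console.log(message)
  } else {
    console.warn(`No notification defined for status ${input.status}`)
  }
  return input
}
```

Because Set ERROR status copies items from the wait step's input (the Catch uses ResultPath: null), the handler receives the same input shape as for the other statuses.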

That's it. This is what the state machine looks like after these final changes.

Wrap up

I hope you now have a better understanding of how other services can dynamically interact with AWS Step Functions. This pattern is flexible enough to cover more complex requirements as well.

For example, the action record doesn't have to be deleted from the orderProcessingActions table. Instead, a flag can be set on it to indicate that the action was already handled; together with a timestamp and an identifier of the external service, this gives you a history of who interacted with the state machine and when.
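A sketch of that idea, assuming an injected client with the v2 DocumentClient update(params).promise() shape; the function name markActionHandled and the attribute names handled, handledAt, handledBy and outcome are hypothetical. Keep in mind that with executionId as the only key, the next wait step's put would overwrite the handled record, so a real history would need a composite key (for example executionId plus status as a sort key); the key is therefore passed in whole.

```javascript
'use strict'

// Mark an action record as handled instead of deleting it.
function markActionHandled (db, tableName, key, { handledBy, success, now = new Date() }) {
  return db.update({
    TableName: tableName,
    Key: key,
    // All attribute names go through placeholders, so none can clash with reserved words
    UpdateExpression: 'SET #handled = :handled, #handledAt = :at, #handledBy = :by, #outcome = :outcome',
    // Only touch an existing record that has not been handled yet
    ConditionExpression: 'attribute_exists(#token) AND attribute_not_exists(#handled)',
    ExpressionAttributeNames: {
      '#handled': 'handled',
      '#handledAt': 'handledAt',
      '#handledBy': 'handledBy',
      '#outcome': 'outcome',
      '#token': 'taskToken'
    },
    ExpressionAttributeValues: {
      ':handled': true,
      ':at': now.toISOString(),
      ':by': handledBy,
      ':outcome': success ? 'SUCCESS' : 'FAILURE'
    }
  }).promise()
}
```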

In a more complex service, I'd also add Catch error handling to more steps, especially those that call the DynamoDB API.

That's it for now. If you have any questions, feel free to contact me on Twitter @fekecrad.