Serverless on AWS with DynamoDB, SNS and CloudWatch

Serverless on AWS with DynamoDB, SNS and CloudWatch

In one of my previous posts Serverless on AWS Lambda I presented an example of creating REST API based on AWS Lambda functions. However, we should keep in mind that this mechanism is also used to exchange events between services (SaaS) provided by AWS. Now I will show such an example of using object database like DynamoDB, sending messages with Simple Notification Service (SNS) and monitoring logs with CloudWatch.

Let’s begin from our sample application. For our test purposes I designed simple system which grants some bonuses basing on incoming orders. First, we are invoking service which put order record into DynamoDB table. Basing on insert event which triggers Lambda function we are processing this event and perform transaction on customer account which id is stored in another DynamoDB table. Afterwards we are sending message to the topic with order information. This topic is created using Amazon SNS service and there are three Lambda functions listening for incoming messages. Each of them grants a bonus that recharges customer account basing on different input data. System architecture is visualized on the figure below. Sample application source code is available on GitHub.

aws

Every AWS Lambda function needs to implement RequestHandler interface. For more details about basic rules, deployment process and usable tools go to my first article about that subject Serverless on AWS Lambda. Coming back to our sample below you can see implementation of first lambda function PostOrder. It does nothing more saving incoming Order object in DynamoDB table. For storing data in DynamoDB we can use ORM mechanism available inside AWS Java libraries. How to use basic DynamoDB annotations you can also read in my first article about serverless.

[code language=”java”]
public class PostOrder implements RequestHandler<Order, Order> {

private DynamoDBMapper mapper;

public PostOrder() {
AmazonDynamoDBClient client = new AmazonDynamoDBClient();
mapper = new DynamoDBMapper(client);
}

@Override
public Order handleRequest(Order o, Context ctx) {
LambdaLogger logger = ctx.getLogger();
mapper.save(o);
Order r = o;
logger.log("Order saved: " + r.getId());
return r;
}

}
[/code]

Assuming we have our first function implemented and deployed on AWS we should configure API gateway which expose it outside. To achieve it go to Lambda Management Console on AWS, select PostOrder function. Then go to Triggers section and select API Gateway as a trigger for calling your function.

lambda-1

Unfortunately it’s not all we need to have our API gateway redirecting requests to Lambda function. Go to API Gateway section and select OrderService. We should remove ANY method and configure POST invoking our function as you see on the picture below.

lambda-3

Then you should see diagram visible below where all the steps of calling lambda function from API Gateway are visualized.

lambda-4

What’s worth doing is to create a model object in Model section. For Order class it should look like in the picture below, which is with compatible JSON schema notation. After creating model definition set it as a request body inside Method Request panel and response body inside Method Response panel.

lambda-5

Finally, deploy the resource using Deploy API action and try to call it on your which can be checked in Stages section.

lambda-6

Let’s see the second implementation of lambda function – ProcessOrderFunction. It is triggered by insert event received from DynamoDB order table. This function is responsible for reading data from incoming event, then create and send message to the target topic. DynamodbEvent stores data as a map, where the key is column name in order table. To get value from map we have to pass data type, for example string is collected using getS method and integer using getN method. The message send to SNS topic is serialized to JSON string with Jackson library.

[code language=”java”]
public class ProcessOrder implements RequestHandler<DynamodbEvent, String> {

private AmazonSNSClient client;
private ObjectMapper jsonMapper;

public ProcessOrder() {
client = new AmazonSNSClient();
jsonMapper = new ObjectMapper();
}

public String handleRequest(DynamodbEvent event, Context ctx) {
LambdaLogger logger = ctx.getLogger();
final List<DynamodbStreamRecord> records = event.getRecords();

for (DynamodbStreamRecord record : records) {
try {
logger.log(String.format("DynamoEvent: %s, %s", record.getEventName(), record.getDynamodb().getNewImage().values().toString()));
Map<String, AttributeValue> m = record.getDynamodb().getNewImage();
Order order = new Order(m.get("id").getS(), m.get("accountId").getS(), Integer.parseInt(m.get("amount").getN()));
String msg = jsonMapper.writeValueAsString(order);
logger.log(String.format("SNS message: %s", msg));
PublishRequest req = new PublishRequest("arn:aws:sns:us-east-1:658226682183:order", jsonMapper.writeValueAsString(new OrderMessage(msg)), "Order");
req.setMessageStructure("json");
PublishResult res = client.publish(req);
logger.log(String.format("SNS message sent: %s", res.getMessageId()));
} catch (JsonProcessingException e) {
logger.log(e.getMessage());
}
}

return "OK";
}
}
[/code]

Same as for PostOrder function we also should add trigger for ProcessOrder – but this time the trigger is DynamoDB table.

lambda-10

In the Simple Notification Service section create order topic. Amazon SNS client uses ARN address for identifying the right topic. As you can see on the picture below there is also topic for DynamoDB which was created with database trigger.

lambda-11

The last implementation step in the sample is to create lambda functions which are listening on SNS topic for incoming order messages. Here’s OrderAmountHandler function code. The logic is simple. After message receive it needs to perform deserialization from JSON, then check order amount and modify balance value in account table using accountId from order object.

[code language=”java”]
public class OrderAmountHandler implements RequestHandler<SNSEvent, Object> {

private final static int AMOUNT_THRESHOLD = 1500;
private final static int AMOUNT_BONUS_PERCENTAGE = 10;

private DynamoDBMapper mapper;
private ObjectMapper jsonMapper;

public OrderAmountHandler() {
AmazonDynamoDBClient client = new AmazonDynamoDBClient();
mapper = new DynamoDBMapper(client);
jsonMapper = new ObjectMapper();
}

@Override
public Object handleRequest(SNSEvent event, Context context) {
final LambdaLogger logger = context.getLogger();
final List<SNSRecord> records = event.getRecords();

for (SNSRecord record : records) {
logger.log(String.format("SNSEvent: %s, %s", record.getSNS().getMessageId(), record.getSNS().getMessage()));
try {
Order o = jsonMapper.readValue(record.getSNS().getMessage(), Order.class);
if (o.getAmount() >= AMOUNT_THRESHOLD) {
logger.log(String.format("Order allowed: id=%s, amount=%d", o.getId(), o.getAmount()));
Account a = mapper.load(Account.class, o.getId());
a.setBalance(a.getBalance() + o.getAmount() * AMOUNT_BONUS_PERCENTAGE);
mapper.save(a);
logger.log(String.format("Account balande update: id=%s, amount=%d", a.getId(), a.getBalance()));
}
} catch (IOException e) {
logger.log(e.getMessage());
}
}

return "OK";
}

}
[/code]

After creating and deploying all our three functions we have to subscribe them into the order topic.

lambda-7

All logs from your lambda functions can be inspected with CloudWatch service.

lambda-8

lambda-9

Don’t forget about permissions in My Security Credentials section. For my example I had to attach the following policies to my default execution role: AmazonAPIGatewayInvokeFullAccess, AmazonDynamoDBFullAccess, AWSLambdaDynamoDBExecutionRole and AmazonSNSFullAccess.

Leave a Reply