AWS: Kinesis Data Firehose with Lambda and ElasticSearch
In this post, we'll learn how Kinesis Data Firehose captures streaming data, transforms it, and then sends it to the Amazon Elasticsearch Service.
We'll do the following:
- Generate streaming data containing stock quote information
- Send the data to a Kinesis Firehose delivery stream
- Kinesis Firehose will then call a Lambda function to transform the data
- Kinesis Firehose will then collect the data into batches and send the batches to an Elasticsearch Service cluster
- We will use Kibana to visualize the streaming data stored in the Elasticsearch cluster
Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near real-time analytics with existing business intelligence tools and dashboards you’re already using today. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security. - from Amazon Kinesis Data Firehose
With Firehose, we do not need to write any applications or manage any resources. We just configure our data producers to send data to Firehose and it automatically delivers the data to the specified destination.
As mentioned earlier, in this post, we will be sending streaming data to Kinesis Firehose, which will capture, transform and batch the data before sending it to Elasticsearch:
Data producers will send records to our stream, which we will transform using a Lambda function created in this section. After that, the transformed records will be sent to the Elasticsearch Service via Kinesis Firehose.
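Although we will use the console's built-in demo-data generator later in this post, a producer can also push records itself. Below is a minimal sketch using boto3, assuming the "stocks-stream" delivery stream created later in this post already exists and that AWS credentials and a default region are configured:

import json
import boto3

# Minimal producer sketch: send one stock quote record to the delivery stream.
# Assumes the "stocks-stream" delivery stream already exists and that
# credentials/region are configured in the environment.
firehose = boto3.client('firehose')

record = {"ticker_symbol": "QXZ", "sector": "HEALTHCARE", "change": -0.05, "price": 84.51}

firehose.put_record(
    DeliveryStreamName='stocks-stream',
    Record={'Data': (json.dumps(record) + '\n').encode('utf-8')}
)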
Here, we'll create a Lambda function that will transform the incoming stock data into a format suitable for visualization.
It is quite common for data to require modification prior to processing, such as adding/removing fields, combining fields, converting formats and dropping irrelevant records. AWS Lambda enables data transformation on-the-fly when the streaming data arrives for processing in Amazon Kinesis Firehose.
In this section, incoming stock data will be in JSON format, such as:
{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}
To visualize this data, we need to add a timestamp identifying when each record was received. We will configure an AWS Lambda function that will transform the data into:
{'timestamp': '2019-02-26T04:33:19.581522', 'ticker_symbol': 'QXZ', 'price': 84.51}
Let's create the function:
where the role has the following policy:
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "cloudwatch:*" ], "Resource": [ "*" ], "Effect": "Allow" } ] }
The role gives execution permissions to our Lambda function so it can send log output to Amazon CloudWatch Logs.
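Note that the policy above only governs what the function may do at run time; the role also needs a trust policy that lets the Lambda service assume it. The standard trust relationship (created automatically when Lambda is chosen as the trusted service) looks like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}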
lambda_function.lambda_handler:
import base64
import json
from datetime import datetime


def lambda_handler(event, context):
    output = []
    now = datetime.utcnow().isoformat()

    # Loop through the records in the incoming event
    for record in event['records']:
        # Extract the message (Firehose delivers it base64-encoded)
        message = json.loads(base64.b64decode(record['data']))

        # Construct the output: timestamp plus only the fields needed for visualization
        data_field = {
            'timestamp': now,
            'ticker_symbol': message['ticker_symbol'],
            'price': message['price']
        }
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(data_field).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
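To sanity-check the transformation before wiring it into Firehose, the handler can be exercised locally with a hand-built event. The snippet below is only a sketch: the recordId is a made-up placeholder (Firehose supplies real record IDs), and the payload is the sample stock quote from above, base64-encoded the way Firehose hands it to Lambda.

import base64
import json

# Hypothetical local test event; 'test-record-1' is a placeholder recordId.
sample_event = {
    'records': [
        {
            'recordId': 'test-record-1',
            'data': base64.b64encode(
                json.dumps({"ticker_symbol": "QXZ", "sector": "HEALTHCARE",
                            "change": -0.05, "price": 84.51}).encode('utf-8')
            ).decode('utf-8')
        }
    ]
}

# Call the handler defined above and print the transformed records.
print(lambda_handler(sample_event, None))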
In this task, we will create a Kinesis Firehose delivery stream. It will transform incoming data by using the Lambda function we just created and will then send the output to Elasticsearch. Here, we assume we've already created an Elasticsearch Service cluster.
On the Kinesis service, click "Get started" and "Create delivery stream".
For Delivery stream name, enter "stocks-stream".
The information on the screen explains options for accepting incoming streaming data.
Scroll to the bottom of the screen, then click "Next".
The Transform source records page allows a Lambda function to be specified for transforming incoming data. We will be transforming the content of the incoming data to add a timestamp.
For Record transformation, click Enabled. For Lambda function, select Add-Stock-Timestamp.
Then, scroll to the bottom of the screen, click "Next"
For Destination, select Amazon Elasticsearch Service, then configure: Domain: stocks, Index: "stock", Type: "stock".
An Amazon S3 bucket should have already been created. It will be used to store any records that fail to be processed. In the S3 backup section, for Backup S3 bucket, select the bucket, and click "Next".
The Configure settings page will be displayed. The Elasticsearch buffer conditions can be used to specify when Kinesis Firehose should send data to the Elasticsearch cluster. We will configure data to be sent whenever there is 1 MB of data or when 60 seconds has passed.
We will now specify the permissions assigned to our Firehose delivery stream. It will be given permission to use Amazon S3, AWS Lambda, Amazon Elasticsearch Service and Amazon CloudWatch Logs.
Scroll down to IAM role and then click "Create new", then configure: IAM Role: Demo-Firehose-Role, Policy Name: Demo-Firehose-Policy
where the Demo-Firehose-Policy attached to Demo-Firehose-Role is:
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "s3:Get*", "s3:List*", "s3:PutObject", "lambda:*", "es:*", "logs:PutLogEvents" ], "Resource": [ "*" ], "Effect": "Allow", "Sid": "S3" } ] }
Click "Create delivery stream".
On the Elasticsearch Service, from the list of My Elasticsearch domains, click stocks.
Examine the Domain status. We will need to wait for the status to be Active, then Click the Kibana link.
A new tab opens with Kibana. Kibana is an open-source data visualization and exploration tool that has tight integration with Elasticsearch, which makes it the default choice for visualizing data stored in Elasticsearch.
We are now ready to send data to the Firehose delivery stream.
We will use a built-in testing function. In the Kinesis service, click the name of our "stocks-stream". Expand Test with demo data section at the top of the page, and click "Start sending demo data"
This will now start sending random stock data to the delivery stream, such as:
{"ticker_symbol":"QXZ", "sector":"HEALTHCARE", "change":-0.05, "price":84.51}
The Lambda function will then transform the data by adding a timestamp and only including fields necessary for visualization, such as:
{'timestamp': '2017-07-30T04:33:19.581522', 'ticker_symbol': 'QXZ', 'price': 84.51}
The data will then be sent to the Elasticsearch cluster, where it will be available for visualization in Kibana.
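If we want to confirm documents are actually landing in the "stock" index before building anything in Kibana, we can also query the domain endpoint directly. The endpoint below is a placeholder, and this sketch assumes the domain's access policy permits the request from our client:

import requests

# Sketch: fetch one document from the "stock" index.
# Replace the placeholder endpoint with the domain's actual endpoint.
es_endpoint = 'https://search-stocks-xxxxxxxxxx.us-east-1.es.amazonaws.com'
resp = requests.get(es_endpoint + '/stock/_search', params={'size': 1})
print(resp.json())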
Keep the Kinesis Firehose tab open so that it continues to send data.
Switch back to the Kibana tab in our web browser. For Index name or pattern, replace logstash-* with "stock". In the Time-field name pull-down, select timestamp.
Click "Create", then a page showing the stock configuration should appear, in the left navigation pane, click Visualize, and click "Create a visualization". For Select visualization type, click Line chart, click stock.
We will now configure a chart to show stock prices over time. Under the metrics heading, click the arrow beside Y-Axis, then configure: Aggregation: Average, Field: price. Under buckets, click X-Axis, then configure: Aggregation: Date Histogram, Field: timestamp.
At the bottom of the screen, click "Add sub-buckets", click Split Lines, then configure: Sub Aggregation: Terms, Field: ticker_symbol.keyword.
Click the play button at the top of the screen to view the chart. The chart will now display stock prices over time. This data is being received in real-time from Amazon Kinesis Firehose. Click Refresh above the chart every 30 seconds to update the display.
You can monitor information about the Firehose delivery stream by viewing metrics captured by Amazon CloudWatch. Scroll to the bottom of the Kinesis console, then click the Monitoring tab:
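The same metrics are available programmatically from the AWS/Firehose CloudWatch namespace. A sketch, assuming boto3 and credentials are configured:

import boto3
from datetime import datetime, timedelta

# Sketch: pull the IncomingRecords metric for the delivery stream over the last hour.
cloudwatch = boto3.client('cloudwatch')
stats = cloudwatch.get_metric_statistics(
    Namespace='AWS/Firehose',
    MetricName='IncomingRecords',
    Dimensions=[{'Name': 'DeliveryStreamName', 'Value': 'stocks-stream'}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Sum']
)
print(stats['Datapoints'])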