ETL MySQL data to AWS Redshift

We’ll use the Rails framework and a Java service for this task. Ruby on Rails, or Rails, is a server-side web application framework written in Ruby and released under the MIT License.

Now before I explain how to automate the DDL (Data Definition Language) statements, let me explain the data flow.

MySQL -> Kafka -> AWS S3 -> AWS Redshift.

So let’s break it into steps:

• MySQL -> Kafka

We’ll use a Java service to read MySQL binary logs and push the data into Kafka. This open-source Java library, developed by Stanley Shyiko, comes in handy.

Why do we use Kafka?

One of the many uses of Kafka is stream processing. Kafka acts as a buffer between RDS and our OLAP (Online Analytical Processing) system, i.e. Redshift.

Why do we use AWS S3?

AWS S3 (Simple Storage Service) is a storage service offered by Amazon. It provides cloud object storage for a wide range of applications. S3 is easily scalable and, per the AWS S3 service level agreement, guarantees 99.9% monthly uptime. As a result, it serves well as a backup and recovery service.

• Kafka -> AWS S3

There are many Kafka clients for Ruby, but in this example we’ll use the ruby-kafka client. For this you have to install the ruby-kafka gem, using the following command:

gem install ruby-kafka 

Now, to consume messages from a Kafka topic, we’ll do:

require "kafka"

# Point the client at your Kafka brokers.
kafka = Kafka.new(["server-ip:9092"], client_id: "my-application")

consumer = kafka.consumer(group_id: "my_group")
consumer.subscribe("my_topic")

consumer.each_message do |message|
  puts "#{message.topic}, #{message.partition}, #{message.offset}, #{message.key}, #{message.value}"
end

We’ll use text/CSV format to save the data into S3, and each table will have its own S3 folder with subdirectories in chronological order. To copy data to S3 we’ll use the aws-sdk-s3 gem.
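As a minimal sketch of the layout above, here is one way to build a chronological S3 key for a table and serialize rows to CSV in a fixed column order. The key layout (`<table>/<year>/<month>/<day>/<epoch>.csv`) and the helper names are illustrative assumptions, not part of the original setup; the column-order idea is what matters, since the same order is reused later in the Redshift COPY step.

```ruby
require 'csv'
require 'time'

# Build a chronological S3 key for a table's CSV dump.
# Assumed layout: <table>/<YYYY>/<MM>/<DD>/<epoch>.csv
def s3_key_for(table, time = Time.now.utc)
  format('%s/%04d/%02d/%02d/%d.csv', table, time.year, time.month, time.day, time.to_i)
end

# Serialize a batch of row hashes to CSV using a fixed column order,
# so the same order can later be used in the Redshift COPY command.
def to_csv(rows, column_order)
  CSV.generate do |csv|
    rows.each { |row| csv << column_order.map { |col| row[col] } }
  end
end

rows = [{ 'id' => 1, 'name' => 'alice' }, { 'id' => 2, 'name' => 'bob' }]
puts s3_key_for('users', Time.utc(2019, 5, 7, 12, 0, 0))
# => users/2019/05/07/1557230400.csv
puts to_csv(rows, %w[id name])
```

The CSV string produced here is what would be uploaded as the S3 object body.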

require 'aws-sdk-s3'

source_bucket_name = '*** Provide bucket name ***'
target_bucket_name = '*** Provide bucket name ***'
source_key = '*** Provide source key ***'
target_key = '*** Provide target key ***'

begin
  s3 = Aws::S3::Client.new(region: 'us-west-2')
  s3.copy_object(bucket: target_bucket_name,
                 copy_source: source_bucket_name + '/' + source_key,
                 key: target_key)
  puts "Copied #{source_key} from bucket #{source_bucket_name} to bucket #{target_bucket_name} as #{target_key}"
rescue StandardError => ex
  puts "Caught exception copying object #{source_key} from bucket #{source_bucket_name} to bucket #{target_bucket_name} as #{target_key}:"
  puts ex.message
end


• AWS S3 -> AWS Redshift

For the final step, all you need is the AWS Redshift COPY command to copy data from S3 into Redshift.
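A sketch of what that COPY statement might look like, built from the table configuration’s column order. The table name, S3 path, IAM role ARN, and the (commented-out) use of the pg gem to issue the statement are all assumptions for illustration; Redshift speaks the PostgreSQL wire protocol, so the pg gem is one common way to run it.

```ruby
# require 'pg'  # uncomment when executing against a live Redshift cluster

# Build a Redshift COPY statement that loads a CSV file from S3, using the
# explicit column list from the table configuration so the column order
# matches the order the data was written to S3.
def copy_sql(table, columns, s3_path, iam_role)
  <<~SQL
    COPY #{table} (#{columns.join(', ')})
    FROM '#{s3_path}'
    IAM_ROLE '#{iam_role}'
    FORMAT AS CSV;
  SQL
end

sql = copy_sql('users', %w[id name],
               's3://my-etl-bucket/users/2019/05/07/1557230400.csv',
               'arn:aws:iam::123456789012:role/RedshiftCopyRole')

# conn = PG.connect(host: 'my-cluster.example.redshift.amazonaws.com',
#                   port: 5439, dbname: 'analytics', user: 'etl', password: '...')
# conn.exec(sql)
puts sql
```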

How do we manipulate the schema and manage DDL statements?

We’ll read the MySQL binary logs using the aforementioned Java library and push each ALTER statement into Kafka with an ALTER flag (so that we can identify it while consuming the Kafka logs).

Now, for consuming the logs and copying them into Redshift, you need to maintain table configurations that specify the column order to be used in the COPY command.

We’ll move the configurations (table schemas) into a meta database as JSON files.
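For instance, such a table configuration stored as JSON might look like the following. The exact shape of the file is an assumption; only the column-order idea comes from the setup above.

```ruby
require 'json'

# Hypothetical table configuration: the column order recorded here is what
# both the S3 CSV writer and the Redshift COPY command must follow.
config_json = <<~JSON
  {
    "table": "users",
    "columns": ["id", "name", "email", "created_at"]
  }
JSON

config = JSON.parse(config_json)
puts config['columns'].join(', ')
# => id, name, email, created_at
```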

Process

So, with each DDL statement you encounter while consuming the Kafka logs, your configuration needs to be updated. You can get the table schema from the MySQL information_schema.columns table and dump it into a text file on the server where your Ruby code is running. You then have to parse this schema (a bit of string manipulation) and build your table configuration (the column order), to be used later for copying S3 file data into Redshift (the same column order is used to push data into S3).
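The parsing step might be sketched like this. The dump format (tab-separated `column_name` and `ordinal_position` per line) is an assumption; adjust the string manipulation to however you export the schema.

```ruby
# Turn a dumped information_schema.columns result into a column order.
# A dump like this could come from e.g.:
#   SELECT column_name, ordinal_position FROM information_schema.columns
#   WHERE table_name = 'users';
def column_order(dump_text)
  dump_text.lines
           .map { |line| line.strip.split("\t") }
           .reject { |fields| fields.size < 2 }
           .sort_by { |(_name, pos)| pos.to_i }
           .map { |(name, _pos)| name }
end

dump = "name\t2\nid\t1\nemail\t3\n"
puts column_order(dump).inspect
# => ["id", "name", "email"]
```

The resulting array is exactly the column list you would write into the JSON table configuration.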

So, for example, if the Kafka logs look like: