
ETL MySQL data to AWS Redshift

We’ll use the Rails framework and a Java service for this task. Ruby on Rails, or Rails, is a server-side web application framework written in Ruby and released under the MIT License.


Now before I explain how to automate the DDL (Data Definition Language) statements, let me explain the data flow.

The data pipeline looks like this:

MySQL -> Kafka -> AWS S3 -> AWS Redshift.

So let’s break it into steps:

  • MySQL -> Kafka

We’ll use a Java service to read MySQL binary logs and push the data into Kafka. The open-source Java library mysql-binlog-connector-java, developed by Stanley Shyiko, comes in handy here.

Why do we use Kafka?

One of the many uses of Kafka is stream processing. Kafka acts as a buffer between RDS and our OLAP (Online Analytical Processing) system, i.e. Redshift.

Why do we use AWS S3?

AWS S3 is a storage service offered by Amazon. It stands for Simple Storage Service and provides cloud storage for various types of web applications. S3 is easily scalable and, per AWS, guarantees 99.9% monthly uptime. As a result, it also serves well as a backup and recovery service.

  • Kafka -> AWS S3

There are many Kafka clients for Ruby, but in this example we’ll be using the ruby-kafka client. For this, you have to install the ruby-kafka gem using the following command:

gem install ruby-kafka 

Now, to consume messages from a Kafka topic, we’ll do the following:

require "kafka" 
 
kafka = Kafka.new( [server-ip], 
sasl_scram_username: 'username', 
sasl_scram_password: 'password') 
 
consumer = kafka.consumer(group_id: "my_group") 
consumer.subscribe("my_topic") 
 
consumer.each_message do |message|  
	puts "#{message.topic}, #{message.partition}, #{message.offset},       #{message.key}, #{message.value}" 
end 

We’ll use text/CSV format to save data into S3, and each table will have its own S3 folder with subdirectories in chronological order. To copy data to S3 we’ll use the ‘aws-sdk-s3’ gem.

require 'aws-sdk-s3'

source_bucket_name = '*** Provide bucket name ***'
target_bucket_name = '*** Provide bucket name ***'
source_key = '*** Provide source key ***'
target_key = '*** Provide target key ***'

begin
  s3 = Aws::S3::Client.new(region: 'us-west-2')
  # Copy the object from the source bucket to the target bucket
  s3.copy_object(bucket: target_bucket_name,
                 copy_source: source_bucket_name + '/' + source_key,
                 key: target_key)
  puts "Copied #{source_key} from bucket #{source_bucket_name} to bucket #{target_bucket_name} as #{target_key}"
rescue StandardError => ex
  puts "Caught exception copying object #{source_key} from bucket #{source_bucket_name} to bucket #{target_bucket_name} as #{target_key}:"
  puts ex.message
end
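The copy_object call above copies an existing object between S3 buckets. If you instead want to upload the rows consumed from Kafka as a new CSV object, here is a minimal sketch using put_object from the same aws-sdk-s3 gem; the bucket, table, and key names below are hypothetical:

require 'aws-sdk-s3'
require 'csv'

# Hypothetical values -- replace with your own bucket and table names
bucket_name = 'my-etl-bucket'
table_name  = 'data'

# Rows collected from the Kafka consumer, in the table configuration's column order
rows = [[1, 'Name1', 100, 'Delhi'], [2, 'Name2', 200, 'Bangalore']]

# Each table gets its own folder, with subdirectories in chronological order
key = "#{table_name}/#{Time.now.utc.strftime('%Y/%m/%d/%H%M%S')}.csv"

# Build the CSV body in memory and upload it as a single S3 object
csv_body = CSV.generate do |csv|
  rows.each { |row| csv << row }
end

s3 = Aws::S3::Client.new(region: 'us-west-2')
s3.put_object(bucket: bucket_name, key: key, body: csv_body)

The key encodes the table name and a timestamp, which matches the per-table folders with chronological subdirectories described above.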


  • AWS S3 -> AWS Redshift

For the final step, all you need is the AWS Redshift COPY command to copy data from S3 into Redshift.
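As a minimal sketch, the COPY command can be issued from Ruby with the pg gem, since Redshift speaks the PostgreSQL protocol; the connection details, table, column list, S3 path, and IAM role below are placeholders:

require 'pg'

# Hypothetical connection details -- replace with your Redshift cluster's values
conn = PG.connect(
  host:     'my-cluster.abc123.us-west-2.redshift.amazonaws.com',
  port:     5439,
  dbname:   'analytics',
  user:     'redshift_user',
  password: 'password'
)

# The column list must match the column order used when the CSV was written to S3
copy_sql = <<~SQL
  COPY data (id, name, amount, city)
  FROM 's3://my-etl-bucket/data/2020/01/01/000000.csv'
  IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
  FORMAT AS CSV;
SQL

conn.exec(copy_sql)
conn.close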

How to handle schema changes and manage DDL statements?

We’ll read the MySQL binary logs using the aforementioned Java library and push each ALTER statement into Kafka with an ALTER flag (so that we can identify it while consuming the Kafka logs).

Now, for consuming the logs and copying them into Redshift, you need to maintain table configurations that specify the column order to be used in the COPY command.

We’ll move the configurations (table schemas) to a meta database as JSON documents.

Process

So with each DDL statement you encounter while consuming Kafka logs, your configuration needs to be updated. You can get the table schema from the MySQL table information_schema.columns and dump it into a text file on the server where your Ruby code is running. You then have to parse this schema (a bit of string manipulation) and build your table configuration (column order), which is later used for copying the S3 file data into Redshift (the same column order is used to push data into S3).
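As a minimal sketch (assuming the mysql2 gem and hypothetical connection, schema, and table names), the column-order configuration can also be built by querying information_schema.columns directly instead of parsing a text dump:

require 'mysql2'
require 'json'

# Hypothetical connection details, schema, and table name
client = Mysql2::Client.new(
  host:     'mysql-host',
  username: 'user',
  password: 'password',
  database: 'mydb'
)

# Fetch the table's columns in their defined order
results = client.query(<<~SQL)
  SELECT column_name AS name
  FROM information_schema.columns
  WHERE table_schema = 'mydb' AND table_name = 'data'
  ORDER BY ordinal_position
SQL

# Build the table configuration (column order) and store it as JSON
config = {
  'table'   => 'data',
  'columns' => results.map { |row| row['name'] }
}

File.write('data_table_config.json', JSON.pretty_generate(config))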

So, for example, if the Kafka logs look like this:

ADD 1, Name1, 100, Delhi

ADD 2, Name2, 200, Bangalore

ADD 3, Name3, 300, Mumbai

ALTER ALTER TABLE data ADD COLUMN newcolumn varchar(100)

ADD 4, Name4, 400, Delhi, India gate

Now, while reading (consuming) the Kafka logs, the consumer will just write the columnar table data to S3 for the first three lines. But when it encounters the fourth line, i.e. the ALTER statement, it will update and reload the table configuration and from then on read (consume) data according to the new configuration. A similar ALTER flag will also be written to the S3 file, so that while copying data (iteratively) from S3 to Redshift, the respective (latest) configuration can be loaded.
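Here is a minimal sketch of that dispatch inside the ruby-kafka consumer loop shown earlier; reload_table_configuration and write_row_to_s3 are hypothetical helpers standing in for the steps described above:

# Inside the ruby-kafka consumer loop shown earlier
consumer.each_message do |message|
  flag, payload = message.value.split(' ', 2)

  case flag
  when 'ALTER'
    # A DDL statement: update and reload the table configuration,
    # then write the ALTER flag to the current S3 file as well
    reload_table_configuration(payload)   # hypothetical helper
    write_row_to_s3(['ALTER', payload])   # hypothetical helper
  when 'ADD'
    # Regular row data: write it to S3 in the configured column order
    write_row_to_s3(payload.split(',').map(&:strip))
  end
end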

However, to facilitate backfilling of data, you can maintain versions of the configuration (MySQL schema) and add the version number to your S3 files, so that for each folder (table) you select the respective version.
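For example (with hypothetical file and key names), the version number can be embedded in both the configuration and the S3 key, so the copy step can load the matching schema:

require 'json'

# Hypothetical example: tag each configuration and each S3 file with a schema version
config_version = 3
s3_key = "data/2020/01/01/000000_v#{config_version}.csv"

# While copying from S3 to Redshift, pick the configuration whose version
# matches the one embedded in the file's key
version_in_key = s3_key[/_v(\d+)\.csv\z/, 1]
config = JSON.parse(File.read("data_table_config_v#{version_in_key}.json"))
column_list = config['columns'].join(', ') # column order for the COPY command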