Snowflake Openflow in Action – Part 3 – Setting up the Openflow Connector for MySQL

Introduction:

Here is the next blog in the Snowflake Openflow series, where we dive into the implementation of the Openflow Connector for MySQL. This post explores the connector’s workflow and illustrates how Snowflake streamlines replication by abstracting the underlying processors and controllers. The connector is ideal if you’re looking to enable continuous CDC replication of MySQL tables into Snowflake for centralized and comprehensive reporting. In this blog, we’ll use a standard MySQL database (version 8.0) to demonstrate how to replicate an entire table to Snowflake, how incremental loads are managed, and how the connector handles schema changes.

Prerequisites

  1. Ensure that you have MySQL 8.0 or a later version to synchronize data with Snowflake.
  2. Ensure that Openflow is set up on your Snowflake account. You can refer to my previous blog to set it up.
  3. Perform the following settings in MySQL as an administrator:
  • Add the following entries in your MySQL configuration file (a verification sketch follows at the end of these prerequisites):
log_bin = on
binlog_format = row
binlog_row_metadata = full
binlog_row_image = full
binlog_row_value_options =
  • Increase the value of sort_buffer_size
sort_buffer_size = 4194304
  • Create a user for the connector and grant the REPLICATION SLAVE and REPLICATION CLIENT privileges to this user:
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%';
GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%';
  • Grant the SELECT privilege on every replicated table to this user:
GRANT SELECT ON <schema>.* TO '<username>'@'%';
GRANT SELECT ON <schema>.<table> TO '<username>'@'%';
  4. Perform the following settings in Snowflake as an administrator:
    • Create a user with type = ‘SERVICE’. Create a database to store the replicated data and grant the service user appropriate privileges to create objects in that database. Snowflake recommends starting with a MEDIUM warehouse and scaling up based on the workload. We can use multi-cluster warehouses if we have a large number of tables.
CREATE DATABASE OPENFLOW_MYSQL;
CREATE USER OPENFLOW_MYSQL_USER TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
CREATE ROLE OPENFLOW_ROLE_MYSQL;
GRANT ROLE OPENFLOW_ROLE_MYSQL TO USER OPENFLOW_MYSQL_USER;
GRANT USAGE ON DATABASE OPENFLOW_MYSQL TO ROLE OPENFLOW_ROLE_MYSQL;
GRANT CREATE SCHEMA ON DATABASE OPENFLOW_MYSQL TO ROLE OPENFLOW_ROLE_MYSQL;
CREATE WAREHOUSE OPENFLOW_WH_MYSQL WITH 
          WAREHOUSE_SIZE = 'MEDIUM'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
GRANT USAGE, OPERATE ON WAREHOUSE OPENFLOW_WH_MYSQL TO ROLE OPENFLOW_ROLE_MYSQL;
GRANT ROLE OPENFLOW_ROLE_MYSQL TO USER <your_user_name>;
    • Create a public/private key pair and assign the public key to this user:
ALTER USER OPENFLOW_MYSQL_USER SET RSA_PUBLIC_KEY='MIIBI…';
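
Before moving on, it is worth verifying that the settings above actually took effect. Below is a minimal verification sketch; it assumes the MySQL connector user is named snowflakeuser (the name used later in this post) and that the Snowflake objects were created with the names shown above.

-- On MySQL: confirm the binlog settings are active (run after restarting the server)
SHOW VARIABLES WHERE Variable_name IN
  ('log_bin', 'binlog_format', 'binlog_row_metadata', 'binlog_row_image', 'binlog_row_value_options');
-- On MySQL: confirm the connector user holds the replication and SELECT privileges
SHOW GRANTS FOR 'snowflakeuser'@'%';
-- On Snowflake: confirm the public key is registered for the service user and review the role's grants
DESC USER OPENFLOW_MYSQL_USER;
SHOW GRANTS TO ROLE OPENFLOW_ROLE_MYSQL;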

Usage of a Tunneling Tool

Snowflake does not currently offer a native option for connecting directly to an on-premises MySQL server. To address this limitation, I employed Ngrok, a cross-platform tunneling tool, as a workaround. Ngrok allows you to expose a local server to the internet with minimal configuration, providing a publicly accessible TCP endpoint. This endpoint acts as a secure gateway to the locally hosted MySQL server and is specified in the MySQL connector’s configuration parameters to enable connectivity from Snowflake.

Here are the steps to get the TCP Endpoint from Ngrok:

  • Sign up for Ngrok using your email ID and card details. Please note that your card won’t be charged unless you subscribe to a paid Ngrok plan.
  • Click the Setup & Installation option under Getting Started and choose the Download menu to download Ngrok for Windows to your local machine.
  • Unzip the downloaded file and launch the ngrok application.
  • Copy your authtoken from Your Authtoken (under the Getting Started menu on the left), substitute it in the command below, and run it:
ngrok config add-authtoken <your_authtoken>
  • Once the authtoken is successfully updated in the config file, run the command below to get a TCP endpoint for the MySQL server installed on your local machine. Replace <port> with the port on which MySQL is listening; in my case, it was 3306.
ngrok tcp <port>

Steps to set up the connector:

  • Launch Openflow from Data -> Openflow. Create a new Runtime named MySQL-Runtime using the same Deployment created earlier (refer to the previous blog to learn how to create a Deployment). Once the Runtime is Active, navigate to the Openflow Overview page, go to the Featured Connections section, and click View more Connectors. Select the MySQL connector and click Add to Runtime.
  • Select your runtime from the Available runtimes drop-down list. Click Add.
  • Once the connector installation is complete, Openflow Canvas will appear with the connector process group added to it.
  • Right-click on an open area of the canvas and select Enable All Controller Services.
  • Right-click on mysql-connector and select Controller Services to check whether all the services are enabled.
  • Right-click on mysql-connector again and select Parameters to set the flow parameters.
  • There are three parameter contexts to be set to replicate MySQL tables in real time:
  • Source Database Parameters:
Parameter Name | Parameter Value | Description
MySQL Connection URL | jdbc:mariadb://0.tcp.in.ngrok.io:17445?allowPublicKeyRetrieval=true | Give the JDBC URL for the MySQL database. Set allowPublicKeyRetrieval to true if SSL is disabled. Prefix the URL with jdbc:mariadb since this connector uses the MariaDB driver to connect to MySQL
MySQL JDBC Driver | | Download the latest jar file from https://dlm.mariadb.com/4234102/Connectors/java/connector-java-3.5.3/mariadb-java-client-3.5.3.jar and upload it by checking the Reference Assets box
MySQL Username | snowflakeuser | Give the username of the new user you created in MySQL
MySQL Password | <password for the above user> | Give the password for the above user
  • Snowflake Parameters:
Parameter Name | Parameter Value | Description
Merge Task Schedule CRON | * * * * * ? | CRON expression defining the periods when merge operations from the journal tables to the destination table will be triggered. Set it to * * * * * ? if you want continuous merge
Snowflake Account | <Organization_Name>-<Account_Name> | Give your Snowflake account identifier copied from your Account Details
Snowflake Database | OPENFLOW_MYSQL | Give the Snowflake database where the data should be stored
Snowflake Private Key | | RSA private key value with header and footer. Note that either Snowflake Private Key or Snowflake Private Key File should be defined
Snowflake Private Key File | | Upload the RSA private key file by checking the Reference Assets box
Snowflake Private Key Password | | Give the password if your RSA private key file is encrypted
Snowflake User Role | OPENFLOW_ROLE_MYSQL | Give the Snowflake role to be used during execution
Snowflake Username | OPENFLOW_MYSQL_USER | Give the Snowflake service username created earlier
Snowflake Warehouse | OPENFLOW_WH_MYSQL | Give the Snowflake virtual warehouse to be used for execution
  • Replication Parameters:
Parameter Name | Parameter Value | Description
Filter JSON | [] | A JSON containing a list of fully qualified table names and a regex pattern for column names that should be included in replication. We will keep this as an empty array for now
Included Table Names | world.country | A comma-separated list of fully qualified table names to be replicated
Included Table Regex | | A regular expression to match against table names. New tables matching the pattern that are created later will also be included automatically. We will keep it empty for now
  • Run the flow by right-clicking on the mysql-connector process group and then clicking Start. The connector will start the data ingestion task.
  • Navigate to the database and check whether the table has been created. Query the table to check whether the records got loaded from MySQL, as shown in the sketch below.
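
As a quick sanity check, you can compare row counts between the source and the destination. The sketch below assumes the connector created the destination table under the OPENFLOW_MYSQL database with the source schema and table names preserved; depending on how identifiers are cased in your environment, you may need to adjust the quoting.

-- On MySQL: row count of the source table
SELECT COUNT(*) FROM world.country;
-- On Snowflake: row count of the replicated table (identifier quoting is an assumption)
SELECT COUNT(*) FROM OPENFLOW_MYSQL."world"."country";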

Snowflake Table:

MySQL Table:

Change Data Capture (CDC):

  • Now let’s see how this connector handles change data capture (CDC). Let’s perform some DML operations on the MySQL table and see whether the changes get captured in the Snowflake table. We will insert two records, update a record, and delete a record.
INSERT INTO `world`.`country` (`Code`, `Name`, `Continent`, `Region`, `SurfaceArea`, `IndepYear`, `Population`, `LocalName`) VALUES ('ABC', 'NEW_COUNTRY', 'Africa', 'Eastern Africa', '390757.00', '2020', '1000', 'NEW_COUNTRY');
INSERT INTO `world`.`country` (`Code`, `Name`, `Continent`, `Region`, `LocalName`) VALUES ('DEF', 'NEW_COUNTRY_1', 'Asia', 'Middle East', 'NEW_COUNTRY_1');
UPDATE `world`.`country` SET `Code` = 'AAD' WHERE (`Code` = 'DEF');
DELETE FROM `world`.`country` WHERE (`Code` = 'ABC');

Since we have set the Merge Task Schedule CRON for continuous merge, the source changes are automatically picked up and stored in journal tables created in the same schema as the destination table. These changes are then merged into the destination table using the MergeSnowflakeJournalTable processor.
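
Once the next merge cycle has run, you can confirm the result by checking the affected keys in the destination table. A minimal sketch, again assuming the same destination identifiers as before and that column names kept their source casing:

-- On Snowflake: only the 'AAD' row should remain, since 'DEF' was updated to 'AAD'
-- and 'ABC' was deleted at the source
SELECT "Code", "Name", "Continent", "Region"
FROM OPENFLOW_MYSQL."world"."country"
WHERE "Code" IN ('ABC', 'DEF', 'AAD');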

Schema Changes:

  • In this step, we will check the connector’s capability to handle schema changes in the source table. We will add a column to the MySQL table and see whether it gets reflected in Snowflake in real time.
ALTER TABLE WORLD.COUNTRY ADD COLUMN TIMEZONE VARCHAR(3);

These changes are picked up by Snowflake automatically and the column is now available in the Snowflake table.
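
To verify this on the Snowflake side, you can describe the destination table and confirm that the new column appears (assuming the same destination identifiers as before):

-- On Snowflake: the TIMEZONE column should now be listed among the table's columns
DESC TABLE OPENFLOW_MYSQL."world"."country";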

Note that another journal table is created with the suffix _2; this suffix is an integer that increases with every schema change on the source table. This means a source table that undergoes schema changes will have multiple journal tables. Snowflake always uses the latest generation of journal tables to merge with the destination table.
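
If you want to see the journal table generations yourself, you can list the tables in the destination schema. The exact journal table naming is managed by the connector, so the pattern below is only an illustrative assumption:

-- On Snowflake: list the destination table and its journal table generations
SHOW TABLES LIKE '%country%' IN SCHEMA OPENFLOW_MYSQL."world";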

Conclusion:

In this blog, we explored the implementation of the Openflow Connector for MySQL, highlighting its workflow and how it leverages Snowflake’s architecture to simplify data replication. The connector currently supports MySQL version 8 or higher and works with username/password authentication. For on-premises MySQL servers, a tunneling tool may be required to establish connectivity. We demonstrated full table replication, incremental CDC handling, and schema change support. This connector serves as a powerful tool for enabling real-time, centralized analytics in Snowflake.

Look out for upcoming posts where we’ll delve deeper into other Openflow connectors, share best practices, and explore real-world implementation scenarios.

Reference Links: https://docs.snowflake.com/en/user-guide/data-integration/openflow/connectors/mysql/setup

Please feel free to reach out to us for your Snowflake solution needs. Cittabase is a Premier partner with Snowflake.


