import os
import json
import pandas as pd
import snowflake.connector
class SnowflakeHandler:
    """
    A wrapper class to upload local CSV files to a Snowflake stage, load the data into a table,
    and download data from a Snowflake table to a local file.
    """

    def __init__(self, local_file_path=None, table_name=None, stage_name="@~", config_path="config.json"):
        """
        Initialize the SnowflakeHandler with file paths, table name, and connection details.

        Args:
            local_file_path (str): Local path to the CSV file for upload/download.
            table_name (str): Target Snowflake table name.
            stage_name (str): Snowflake stage name. Defaults to '@~'.
            config_path (str): Path to the JSON configuration file with Snowflake credentials.
        """
        self.local_file_path = local_file_path
        self.table_name = table_name
        self.stage_name = stage_name
        self.snowflake_stage_file = os.path.basename(local_file_path) if local_file_path else None

        # Load Snowflake connection parameters from the config file
        with open(config_path, 'r') as f:
            self.conn_params = json.load(f)

        # Connect to Snowflake
        self.conn = snowflake.connector.connect(**self.conn_params)
        self.cur = self.conn.cursor()

    def map_dtype_to_snowflake(self, dtype):
        """
        Map pandas data types to Snowflake SQL types.

        Args:
            dtype (dtype): Pandas data type.

        Returns:
            str: Corresponding Snowflake SQL type.
        """
        if pd.api.types.is_integer_dtype(dtype):
            return "INTEGER"
        elif pd.api.types.is_float_dtype(dtype):
            return "FLOAT"
        elif pd.api.types.is_bool_dtype(dtype):
            return "BOOLEAN"
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            return "TIMESTAMP"
        else:
            return "STRING"

    def generate_create_table_command(self, df):
        """
        Generate a CREATE TABLE SQL statement based on a DataFrame's schema.

        Args:
            df (pd.DataFrame): DataFrame for inferring table schema.

        Returns:
            str: SQL command to create the table.
        """
        column_definitions = []
        for col in df.columns:
            col_type = self.map_dtype_to_snowflake(df[col].dtype)
            column_definitions.append(f'"{col}" {col_type}')
        column_definitions_str = ",\n    ".join(column_definitions)

        return f"""
        CREATE OR REPLACE TABLE {self.table_name} (
            {column_definitions_str}
        );
        """

    def upload_file_to_stage(self):
        """
        Upload the local CSV file to the Snowflake stage.
        """
        put_command = f"PUT 'file://{self.local_file_path}' {self.stage_name}/{self.snowflake_stage_file} AUTO_COMPRESS=FALSE"
        self.cur.execute(put_command)
        print(f"File '{self.snowflake_stage_file}' uploaded to Snowflake stage '{self.stage_name}'.")

    def load_data_into_table(self):
        """
        Copy the staged file into the Snowflake table.
        """
        copy_command = f"""
        COPY INTO {self.table_name}
        FROM {self.stage_name}/{self.snowflake_stage_file}
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
        """
        self.cur.execute(copy_command)
        print(f"Data from '{self.snowflake_stage_file}' successfully loaded into table '{self.table_name}'.")

    def download_table(self):
        """
        Download data from a Snowflake table and save it as a local CSV file.
        """
        query = f"SELECT * FROM {self.table_name}"
        self.cur.execute(query)

        # Fetch all rows and column names
        data = self.cur.fetchall()
        column_names = [desc[0] for desc in self.cur.description]

        # Save to CSV using pandas
        df = pd.DataFrame(data, columns=column_names)
        df.to_csv(self.local_file_path, index=False)
        print(f"Data from table '{self.table_name}' successfully downloaded to '{self.local_file_path}'.")

    def upload_table(self):
        """
        Full process: Upload file, generate table schema, create table, and load data.
        """
        # Read the CSV file to infer schema
        df = pd.read_csv(self.local_file_path)

        # Step 1: Upload file to stage
        self.upload_file_to_stage()

        # Step 2: Generate and execute CREATE TABLE command
        create_table_sql = self.generate_create_table_command(df)
        self.cur.execute(create_table_sql)
        print(f"Table '{self.table_name}' created successfully.")

        # Step 3: Load data into table
        self.load_data_into_table()

    def close_connection(self):
        """
        Close the Snowflake connection.
        """
        self.cur.close()
        self.conn.close()
        print("Snowflake connection closed.")
I know—sketchy title. But bear with me for a second. Today, I want to dive into the world of data warehousing and explain why Snowflake, as a data platform, stands out as the “special snowflake” in this space. So…
What Is Snowflake?
Snowflake is a cloud-native data platform that provides a fully managed service for data warehousing, data lakes, and data analytics. Unlike traditional data warehouses, which often require heavy infrastructure management and scaling considerations, Snowflake was built from the ground up to leverage the power and flexibility of the cloud.
But what exactly makes Snowflake so special? Let’s break it down.
Traditional data warehouses often couple storage (where the data is saved) with compute resources (how the data is processed). This means if your queries suddenly require more processing power, you might have to scale up everything, including your storage capacity, even if it isn’t needed. Snowflake solves this by decoupling compute from storage.
- Storage: Your data is stored in scalable cloud storage (e.g., Amazon S3, Azure Blob Storage, or Google Cloud Storage). You only pay for the storage you use.
- Compute: Compute resources are handled via “virtual warehouses.” These can scale up or down independently of the storage layer, allowing you to allocate resources based on your query load.

This separation means you can handle huge amounts of data without bottlenecking your compute resources—or breaking the bank.
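To make this concrete: scaling compute is just a SQL statement against a virtual warehouse, and the storage layer is never touched. Here is a minimal sketch using the Python connector (the warehouse name COMPUTE_WH, the config file, and the query are placeholders of my own, not anything prescribed by Snowflake):

import json
import snowflake.connector

with open("config.json") as f:
    conn = snowflake.connector.connect(**json.load(f))
cur = conn.cursor()

# Scale compute up for a heavy query, then back down; the stored data never moves.
cur.execute("ALTER WAREHOUSE COMPUTE_WH SET WAREHOUSE_SIZE = 'LARGE'")
cur.execute("SELECT COUNT(*) FROM TRAIN")  # stand-in for some expensive workload
cur.execute("ALTER WAREHOUSE COMPUTE_WH SET WAREHOUSE_SIZE = 'XSMALL'")

cur.close()
conn.close()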
Snowflake is designed to run on multiple cloud providers, including AWS, Azure, and Google Cloud. This multi-cloud capability ensures you’re not locked into a single cloud provider. You can choose the one that best aligns with your organization’s needs—or even operate across clouds for redundancy and performance optimization.
For example:
- You might store your data in AWS for its storage cost benefits.
- You might analyze your data in Google Cloud to take advantage of specialized ML tools.
- Snowflake abstracts the complexity of managing these environments and provides a unified experience.
Snowflake can scale elastically to meet your demands. If you’re running a massive data query, you can temporarily scale up your compute resources and scale them back down when the query completes. This auto-scaling happens on the fly, ensuring high performance even under heavy workloads.
For businesses, this translates to cost efficiency and operational flexibility:
- No need to maintain oversized infrastructure for peak loads.
- Pay only for the compute resources you use, when you use them.
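You can even bake this pay-as-you-go behaviour into the warehouse itself. A small sketch (warehouse name and settings are illustrative, and assume your role is allowed to create warehouses):

import json
import snowflake.connector

with open("config.json") as f:
    conn = snowflake.connector.connect(**json.load(f))
cur = conn.cursor()

# A warehouse that sleeps when idle and wakes up on the next query.
cur.execute("""
    CREATE WAREHOUSE IF NOT EXISTS ADHOC_WH
    WAREHOUSE_SIZE = 'XSMALL'
    AUTO_SUSPEND = 60          -- suspend after 60 seconds of inactivity
    AUTO_RESUME = TRUE         -- resume automatically when a query arrives
    INITIALLY_SUSPENDED = TRUE
""")

cur.close()
conn.close()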
One of Snowflake’s most innovative features is its “zero-copy cloning.” This allows you to create a copy of a database without duplicating the underlying data. Imagine needing to run analytics on production data but not wanting to interfere with live operations. With zero-copy cloning, you can spin up a clone of your database in seconds, run your queries, and discard it—all without consuming additional storage.
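From Python this is a single statement. A hedged sketch (the database names are made up):

import json
import snowflake.connector

with open("config.json") as f:
    conn = snowflake.connector.connect(**json.load(f))
cur = conn.cursor()

# The clone shares storage with the source; only subsequent changes take up new space.
cur.execute("CREATE DATABASE ANALYTICS_SANDBOX CLONE PRODUCTION_DB")
cur.execute("USE DATABASE ANALYTICS_SANDBOX")
# ... run your experiments against the clone here ...
cur.execute("DROP DATABASE ANALYTICS_SANDBOX")

cur.close()
conn.close()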
Unlike traditional databases that struggle with semi-structured data formats like JSON, Avro, or Parquet, Snowflake handles them seamlessly. Using its “VARIANT” data type, Snowflake can ingest semi-structured data and make it queryable with SQL—no need for complex transformations.
This makes it ideal for modern businesses that deal with mixed data formats, such as:
- Log data from web servers (semi-structured)
- Transactional data from databases (structured)
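Here is a small, illustrative sketch of the VARIANT type from Python (the table and JSON fields are invented for the example):

import json
import snowflake.connector

with open("config.json") as f:
    conn = snowflake.connector.connect(**json.load(f))
cur = conn.cursor()

# A single VARIANT column is enough to hold arbitrary JSON documents.
cur.execute("CREATE OR REPLACE TABLE WEB_LOGS (RAW VARIANT)")
cur.execute("""INSERT INTO WEB_LOGS SELECT PARSE_JSON('{"username": "ada", "status": 200, "path": "/home"}')""")

# Drill into the document with the colon operator and cast with ::
cur.execute("SELECT RAW:username::STRING, RAW:status::INT FROM WEB_LOGS")
print(cur.fetchall())

cur.close()
conn.close()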
Snowflake’s design philosophy—decoupling compute and storage, supporting multi-cloud environments, and enabling seamless scalability—sets it apart from traditional data warehouses. Add in features like zero-copy cloning, seamless handling of semi-structured data, and real-time data sharing, and you have a platform that’s not only efficient but also innovative. So, next time someone asks you why Snowflake is so special, you’ll know exactly what to say. Alright, alright…but how do we actually handle the snowflake (upload/download files) using our favorite programming language, Python? This is what I will show you today using a simple wrapper class.
Handle the snowflake from Python
Now we can upload or download data from the snowflake however we want. Bear in mind that the wrapper currently only supports uploading CSV files; likewise, downloaded tables are saved as CSV files. Given that Snowflake also handles semi-structured data, I’d like to see whether we can upload Parquet or Feather files as well, which would drastically enhance the usability (a rough sketch of what that could look like is at the end of this post).
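One practical note before the examples: config.json is just a JSON object of keyword arguments that is passed straight through to snowflake.connector.connect. A minimal example with placeholder values might look like this:

{
    "account": "xy12345.eu-central-1",
    "user": "ADAM",
    "password": "***",
    "warehouse": "COMPUTE_WH",
    "database": "PLAYGROUND",
    "schema": "PUBLIC",
    "role": "SYSADMIN"
}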
# Parameters for file upload
= "/Users/adaml9/Private/kaggle/playground-series/s4e11/train.csv"
local_csv_path = "/Users/adaml9/Private/snowflake/upload/config.json"
config_path = "@~"
stage_name = "TRAIN"
table_name
# Initialize and execute
= SnowflakeHandler(
handler =local_csv_path,
local_file_path=table_name,
table_name=stage_name,
stage_name=config_path
config_path
)
handler.upload_table() handler.close_connection()
# Parameters for file download
local_csv_path = "/Users/adaml9/Private/kaggle/playground-series/s4e11/train2.csv"
config_path = "/Users/adaml9/Private/snowflake/upload/config.json"
stage_name = "@~"
table_name = "TRAIN"

# Initialize and execute
handler = SnowflakeHandler(
    local_file_path=local_csv_path,
    table_name=table_name,
    stage_name=stage_name,
    config_path=config_path
)
handler.download_table()
handler.close_connection()
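Finally, picking up the Parquet idea from above: here is a rough, untested sketch of how a Parquet-capable variant of the wrapper might look. The subclass, the use of pandas.read_parquet, and the MATCH_BY_COLUMN_NAME copy option are my own assumptions, not something the class above already supports:

import pandas as pd

class ParquetSnowflakeHandler(SnowflakeHandler):
    """Hypothetical extension (sketch): upload a Parquet file instead of a CSV."""

    def upload_table(self):
        # pandas reads Parquet via pyarrow or fastparquet; used here only to infer the schema.
        df = pd.read_parquet(self.local_file_path)
        self.cur.execute(self.generate_create_table_command(df))

        # Parquet is already compressed, so skip auto-compression when staging.
        self.cur.execute(
            f"PUT 'file://{self.local_file_path}' {self.stage_name}/{self.snowflake_stage_file} AUTO_COMPRESS=FALSE"
        )

        # COPY with a Parquet file format; MATCH_BY_COLUMN_NAME maps file columns to table columns by name.
        self.cur.execute(f"""
            COPY INTO {self.table_name}
            FROM {self.stage_name}/{self.snowflake_stage_file}
            FILE_FORMAT = (TYPE = 'PARQUET')
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
        """)
        print(f"Parquet file loaded into table '{self.table_name}'.")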