Azure DevOps Pipelines for Snowflake

One of the best resources I’ve found on setting up an Azure DevOps pipeline to build and release changes to a Snowflake database:

https://jeremiahhansen.medium.com/building-snowflake-ci-cd-pipelines-with-azure-devops-and-snowchange-c9fe19ea7d67

More information on schemachange is available here: https://github.com/Snowflake-Labs/schemachange

My Python script (modified from an existing source to allow spaces in object names by enclosing them in double quotes) splits Snowflake objects — tables, views, procedures, etc. — into separate .sql files, which can get you started on the initial commit to the Azure Repo.

Until I figure out how to set up a download link for a .txt file (apparently not easy in WordPress), here is a zip file containing the text file:

Or here’s the full text:

import os
import shutil
import subprocess
import time
from datetime import datetime
import snowflake.connector
import logging

# Run this on Py 3.7 shell: Sharma 5/5/2023
# Information for Snowflake.
# Each setting can be overridden by an environment variable (the same names the
# SnowSQL CLI uses; the password comes from SNOWSQL_PWD), so real credentials
# never need to be written into this file or committed to the repo. The literal
# values below are only placeholders/defaults.
SNOWSQL_ACCOUNT = os.environ.get("SNOWSQL_ACCOUNT", "youracct.us-east-1")
SNOWSQL_USER = os.environ.get("SNOWSQL_USER", "sharma.vaibhav@#####.com")
SNOWSQL_PASSWORD = os.environ.get("SNOWSQL_PWD", "###")
SNOWSQL_WAREHOUSE = os.environ.get("SNOWSQL_WAREHOUSE", "DEMO_WH")
SNOWSQL_SCHEMA = os.environ.get("SNOWSQL_SCHEMA", "public")
# NOTE(review): ACCOUNTADMIN is the most privileged role; a role that can read
# the database's objects and grants may be sufficient — confirm and narrow.
SNOWSQL_ROLE = os.environ.get("SNOWSQL_ROLE", "ACCOUNTADMIN")

# Parameter
# Database to script out. It is compared as a string literal against
# INFORMATION_SCHEMA.DATABASES.DATABASE_NAME, so give the name as stored
# (normally upper case). BASE_PATH is the root folder for the generated files.
SNOWSQL_DATABASE = os.environ.get("SNOWSQL_DATABASE", "###")
BASE_PATH = "/docs/snow"

# Lists every object to script for one database, one row per object, with the
# columns: seq, catalog_name, schema_name, object_type, object_name,
# ARGUMENT_SIGNATURE, script.
# - `seq` fixes the output order: database ('01'), schemas ('02'), tables,
#   views, sequences, file formats, pipes, functions, procedures ('10');
#   there is no '05'.
# - Only the DATABASE and SCHEMA rows carry a pre-built CREATE statement in
#   `script`; every other row has an empty script and is scripted with GET_DDL
#   by ScriptOut. ARGUMENT_SIGNATURE is only filled for functions/procedures.
# - The single '{}' placeholder (filled with str.format) is the database name,
#   compared as a string literal, so it must match the stored name exactly.
# - TABLES only excludes TABLE_TYPE 'VIEW', so any other table type is listed
#   as 'TABLE'. SCHEMATA is not filtered (INFORMATION_SCHEMA itself is probably
#   listed too — confirm).
# - The database name is emitted unquoted in its CREATE statement, while the
#   schema name is double-quoted.
# - '\n' in this (non-raw) Python string is a real newline inside the SQL
#   string literals, putting each clause of the generated DDL on its own line.
# NOTE(review): `||` with a NULL operand yields NULL, so a NULL RETENTION_TIME
# would make the whole DATABASE/SCHEMA script NULL.
OBJECT_LIST_QUERY ="""\
SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
FROM (
    select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE
           , 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
           'DATABASE ' || D.DATABASE_NAME || ' ' ||
           '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
           CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script
      from INFORMATION_SCHEMA.DATABASES D
      WHERE DATABASE_NAME = '{}' 
      UNION ALL
    select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE
           , 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
           'SCHEMA "' || S.SCHEMA_NAME || '" ' ||
           CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
           '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
           CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script
      from INFORMATION_SCHEMA.SCHEMATA S
      UNION ALL
    select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
      from INFORMATION_SCHEMA.TABLES
      WHERE TABLE_TYPE != 'VIEW'
      UNION ALL
    select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
      from INFORMATION_SCHEMA.VIEWS 
      WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA'
      UNION ALL
    select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
      from INFORMATION_SCHEMA.SEQUENCES
      UNION ALL
    select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
      from INFORMATION_SCHEMA.FILE_FORMATS
      UNION ALL
    select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
      from INFORMATION_SCHEMA.PIPES 
      UNION ALL
    select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script
      from INFORMATION_SCHEMA.FUNCTIONS 
      UNION ALL
    select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script
      from "INFORMATION_SCHEMA"."PROCEDURES"
  ) T
ORDER BY seq, catalog_name, schema_name, object_name"""

# Stand-alone versions of the DATABASE ('01') and SCHEMA ('02') script
# expressions in OBJECT_LIST_QUERY. Neither constant is referenced by ScriptOut.
# '\n' in these (non-raw) Python strings is a real newline inside the SQL
# string literals, so each clause of the generated DDL lands on its own line.

# Takes the database name as its single '{}' placeholder (str.format) and
# returns one CREATE OR REPLACE DATABASE statement.
DDL_DATABASE_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
       'DATABASE ' || D.DATABASE_NAME || ' ' ||
       '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
       CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.DATABASES D
WHERE D.DATABASE_NAME = '{}'"""

# No placeholder: returns one CREATE OR REPLACE SCHEMA statement per row of
# INFORMATION_SCHEMA.SCHEMATA (schema name double-quoted).
DDL_SCHEMA_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
       'SCHEMA "' || S.SCHEMA_NAME || '" ' ||
       CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
       '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
       CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.SCHEMATA S"""

# GET_DDL for one object. Placeholders (str.format): object type, object name.
# Both are substituted inside single-quoted SQL literals, so a name that
# contains a single quote must be escaped ('' ) by the caller.
GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script"


# All grants in the current database, fetched in one pass, one row per object:
#   OBJECT_NAME - dot-joined CATALOG.SCHEMA.NAME (NULL parts are skipped); this
#                 is the lookup key used for each scripted object.
#   GRANTED     - every "GRANT <priv> ON <type> <path> TO ROLE|SHARE <grantee>;"
#                 statement for that object.
# The statements are collected with ARRAY_AGG; the REPLACE chain then flattens
# the array's text form by stripping '["' and '"]' and turning each ';","'
# separator into ';' + newline. The object name inside each GRANT is wrapped
# in '##' markers; ScriptOut turns those into double quotes before writing.
# NOTE(review): '_' is a single-character wildcard in LIKE, so '%_SHARE'
# matches any grantee ending in "SHARE" (after at least one character), not
# only names ending in "_SHARE".
# NOTE(review): the key ignores OBJECT_TYPE, so two objects sharing a path
# would have their grants merged into one entry.
ALL_PERMISSION_QUERY = """\
SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
       CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME,
       REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
       CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
       CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || '##' || A.OBJECT_NAME || '##' ||
       ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
       ' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
         CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME"""

# Alternative per-object form (not referenced by ScriptOut): the object's
# GET_DDL text UNION ALL one row per GRANT on it. Placeholders, in order:
# object type, object name (for GET_DDL), a flag that must format as 'True'
# for the grant rows to be returned, object schema, object name.
# Unlike ALL_PERMISSION_QUERY, the object name here is not wrapped in '##'.
GET_DDL_WITH_PERMISSION_QUERY = """\
SELECT GET_DDL('{}','{}') script
UNION ALL
SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
       CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
       CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
       ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
       ' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE 'True' = '{}'
AND A.OBJECT_SCHEMA = '{}' 
AND A.OBJECT_NAME = '{}'"""

# STREAM - SHOW - GET_DDL
# Lists the database's streams; each one is then scripted with GET_DDL.
# The single '{}' placeholder (str.format) is the database name.
STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}"

# TASK - SHOW
# Lists the database's tasks. Tasks are not scripted with GET_DDL here: the
# CREATE OR REPLACE TASK statement is assembled in Python from the SHOW TASKS
# columns (warehouse, schedule, predecessor(s), comment, condition, definition).
# Session parameters are not reproduced; no SHOW TASKS column was found for them.
# The single '{}' placeholder (str.format) is the database name.
TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}"

# Configure the root logger once for the whole script: timestamp, level name
# left-aligned in 8 characters, then the message; INFO and above are emitted.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)
    
def _load_permissions(cs):
    """Fetch every object's GRANT script in a single query.

    Returns a dict keyed by the dot-joined object path produced by
    ALL_PERMISSION_QUERY (``CATALOG.SCHEMA.NAME`` with NULL parts omitted).
    Each value is that object's GRANT statements separated by newlines, with
    the object name still wrapped in ``##`` markers.
    """
    logging.info("Get permission at once")
    return {row[0]: row[1] for row in cs.execute(ALL_PERMISSION_QUERY).fetchall()}


def _append_permissions(ddl_stmt, perms, key):
    """Return *ddl_stmt* followed by the GRANT script stored under *key*.

    *perms* is the dict from ``_load_permissions``, or None when permissions
    were not requested; in that case the DDL is returned unchanged. The ``##``
    markers around object names in the GRANT text become double quotes here.
    Only the GRANT text is rewritten, so a literal ``##`` inside a view or
    procedure body is preserved.
    """
    if perms is None:
        return ddl_stmt
    logging.info("get permission")
    perm_stmt = perms.get(key)
    if perm_stmt:
        logging.debug(perm_stmt)
        ddl_stmt += '\n\n' + perm_stmt.replace("##", "\"")
    return ddl_stmt


def _get_ddl(cs, object_type, object_name):
    """Return the GET_DDL text for one object.

    Single quotes are doubled because GET_DDL_QUERY embeds both arguments in
    single-quoted SQL string literals.
    """
    query = GET_DDL_QUERY.format(object_type, object_name.replace("'", "''"))
    return cs.execute(query).fetchone()[0]


def _argument_types(argument_signature):
    """Reduce an ARGUMENT_SIGNATURE such as ``(A NUMBER, B VARCHAR)`` to ``(NUMBER, VARCHAR)``.

    GET_DDL identifies a function or procedure by its argument types only.
    NOTE(review): a type that itself contains a comma (e.g. ``NUMBER(38,0)``)
    would be split incorrectly; this assumes bare type names in the signature.
    """
    signature = str(argument_signature)
    if signature == "()":
        return signature
    args = signature.replace("(", "").replace(")", "").split(",")
    return '(' + ', '.join(arg.strip().split()[1] for arg in args) + ')'


def _object_script(cs, rec, base_path, perms):
    """Return ``(out_file, ddl)`` for one OBJECT_LIST_QUERY row.

    *rec* is (seq, catalog_name, schema_name, object_type, object_name,
    ARGUMENT_SIGNATURE, script).
    """
    catalog, schema, object_type, name = rec[1], rec[2], rec[3], rec[4]

    if object_type == 'DATABASE':
        out_file = os.path.join(base_path, catalog, name + '.sql')
        # `script` is NULL if any concatenated column was NULL.
        return out_file, _append_permissions(rec[6] or '', perms, name)

    if object_type == 'SCHEMA':
        out_file = os.path.join(base_path, catalog, object_type + '_DDL', name + '.sql')
        return out_file, _append_permissions(rec[6] or '', perms, catalog + '.' + name)

    # Everything else goes to <base>/<db>/<schema>/<TYPE>S/<schema>.<name>.sql
    # and is scripted with GET_DDL. Identifiers are double-quoted so names
    # containing spaces resolve; the file name itself stays unquoted.
    out_file = os.path.join(base_path, catalog, schema, object_type + 'S',
                            schema + '.' + name + '.sql')

    if object_type in ('FUNCTION', 'PROCEDURE'):
        object_name = '"{}"."{}"."{}"{}'.format(
            catalog, schema, name, _argument_types(rec[5]))
        # Grants are not appended for functions and procedures.
        return out_file, _get_ddl(cs, object_type, object_name)

    ddl = _get_ddl(cs, object_type, '"{}"."{}"'.format(schema, name))
    # Permission keys are unquoted CATALOG.SCHEMA.NAME paths.
    return out_file, _append_permissions(ddl, perms, catalog + '.' + schema + '.' + name)


def _write_ddl(out_file, ddl_stmt, written):
    """Write *ddl_stmt* to *out_file*, creating parent folders as needed.

    *written* holds the (normalised) paths already written in this run. The
    first write to a path replaces the file. A repeat write, such as an
    overloaded function or procedure that maps to the same file name, is
    appended instead of overwriting the earlier DDL.
    """
    logging.info("write to a file: {}".format(out_file))
    directory = os.path.dirname(out_file)
    os.makedirs(directory, exist_ok=True)
    logging.info("directory: {}".format(directory))
    key = os.path.normcase(os.path.abspath(out_file))
    if key in written:
        with open(out_file, "a", encoding="utf-8") as f:
            f.write('\n\n' + ddl_stmt)
    else:
        with open(out_file, "w", encoding="utf-8") as f:
            f.write(ddl_stmt)
        written.add(key)


def _show_rows(cs, query):
    """Run a SHOW command and return its rows as dicts keyed by lower-case column name.

    Looking columns up by name rather than position keeps this working if
    Snowflake adds or reorders SHOW output columns.
    """
    rows = cs.execute(query).fetchall()
    names = [desc[0].lower() for desc in cs.description]
    return [dict(zip(names, row)) for row in rows]


def _script_streams(cs, database, base_path, perms, written):
    """Script every stream in *database* to <base>/<db>/STREAM/<schema>.<name>.sql."""
    logging.info("stream")
    for stream in _show_rows(cs, STREAM_LIST_QUERY.format(database)):
        object_name = stream['schema_name'] + '.' + stream['name']
        out_file = os.path.join(base_path, stream['database_name'], 'STREAM',
                                object_name + '.sql')
        ddl = _get_ddl(cs, 'STREAM',
                       '"{}"."{}"'.format(stream['schema_name'], stream['name']))
        ddl = _append_permissions(ddl, perms, stream['database_name'] + '.' + object_name)
        _write_ddl(out_file, ddl, written)


def _task_predecessors(value):
    """Return predecessor task names from the SHOW TASKS predecessor column.

    Accepts either a single name or a JSON array of names (newer SHOW TASKS
    output appears to report ``predecessors`` as a JSON array — confirm for
    your account). Empty values, including ``[]``, yield an empty list.
    """
    if not value:
        return []
    text = str(value).strip()
    if text.startswith('['):
        import json
        try:
            return [str(p) for p in json.loads(text)]
        except ValueError:
            pass
    return [text]


def _task_ddl(task):
    """Build a CREATE OR REPLACE TASK statement from one SHOW TASKS row dict.

    Clauses are emitted in the documented CREATE TASK order (WAREHOUSE,
    SCHEDULE, COMMENT, AFTER, WHEN, AS). Empty values are skipped so they do
    not produce invalid clauses. Session parameters are not reproduced.
    """
    ddl = 'CREATE OR REPLACE TASK ' + task['schema_name'] + '.' + task['name']
    if task.get('warehouse'):  # presumably empty for serverless tasks
        ddl += '\n  WAREHOUSE = ' + task['warehouse']
    if task.get('schedule'):
        ddl += "\n  SCHEDULE = '" + task['schedule'] + "'"
    if task.get('comment'):
        ddl += "\n  COMMENT = '" + task['comment'].replace("'", "''") + "'"
    predecessors = _task_predecessors(task.get('predecessors', task.get('predecessor')))
    if predecessors:
        ddl += "\n  AFTER " + ', '.join(predecessors)
    if task.get('condition'):
        ddl += "\n  WHEN " + task['condition']
    ddl += '\nAS\n' + task['definition'] + ';'
    return ddl


def _script_tasks(cs, database, base_path, perms, written):
    """Script every task in *database* to <base>/<db>/TASK/<schema>.<name>.sql."""
    logging.info("taskResults")
    for task in _show_rows(cs, TASK_LIST_QUERY.format(database)):
        logging.info('processing {}, {}'.format(task['schema_name'], task['name']))
        object_name = task['schema_name'] + '.' + task['name']
        out_file = os.path.join(base_path, task['database_name'], 'TASK',
                                object_name + '.sql')
        ddl = _append_permissions(_task_ddl(task), perms,
                                  task['database_name'] + '.' + object_name)
        _write_ddl(out_file, ddl, written)


def ScriptOut(database, base_path, includePermission=True):
    """Script every object of a Snowflake database into per-object .sql files.

    Connects with the module-level SNOWSQL_* settings, then writes, under
    *base_path*:

        <db>/<db>.sql                              database DDL
        <db>/SCHEMA_DDL/<schema>.sql               schema DDL
        <db>/<schema>/<TYPE>S/<schema>.<name>.sql  tables, views, sequences,
                                                   file formats, pipes,
                                                   functions, procedures
        <db>/STREAM/<schema>.<name>.sql            streams
        <db>/TASK/<schema>.<name>.sql              tasks

    Args:
        database: database name as stored in Snowflake (compared against
            INFORMATION_SCHEMA.DATABASES.DATABASE_NAME).
        base_path: root folder for the generated files.
        includePermission: when true, append each object's GRANT statements
            (functions and procedures excepted).

    A Snowflake ProgrammingError is logged and ends the run early; files
    written so far are kept. The cursor and connection are always closed.
    """
    ctx = snowflake.connector.connect(
        account=SNOWSQL_ACCOUNT,
        warehouse=SNOWSQL_WAREHOUSE,
        database=database,
        user=SNOWSQL_USER,
        password=SNOWSQL_PASSWORD,
        schema=SNOWSQL_SCHEMA,
        role=SNOWSQL_ROLE,
    )
    cs = ctx.cursor()
    logging.info("Connected to Snowflake")
    written = set()

    try:
        # Change database
        cs.execute("USE database {}".format(database))

        perms = _load_permissions(cs) if includePermission else None

        # Objects listed through information_schema.
        for rec in cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall():
            logging.info('processing {}, {}'.format(rec[3], rec[4]))
            out_file, ddl = _object_script(cs, rec, base_path, perms)
            _write_ddl(out_file, ddl, written)

        # Objects only visible through SHOW commands.
        _script_streams(cs, database, base_path, perms, written)
        _script_tasks(cs, database, base_path, perms, written)

    except snowflake.connector.errors.ProgrammingError as e:
        # default error message
        logging.error(e)
    finally:
        cs.close()
        ctx.close()

def main():
    """Script out SNOWSQL_DATABASE (with grants) under BASE_PATH and log the run time."""
    # perf_counter is monotonic, so the elapsed time is not skewed by
    # wall-clock adjustments during a long export.
    start_time = time.perf_counter()
    ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True)
    elapsed_time = time.perf_counter() - start_time
    logging.info("Completed after: {}".format(str(elapsed_time)))


if __name__ == '__main__':
    main()

By:

Posted in:


Leave a comment