Question

Unable to insert array and map data into a Databricks table programmatically in Python

I am trying to insert array and map (dictionary/JSON) data into an Azure Databricks table using both SQLAlchemy and the databricks-sql-connector.

Each approach fails with a different error.

from sqlalchemy import create_engine, Column, String, text, Integer, JSON, BigInteger, Identity, ARRAY
from sqlalchemy.orm import sessionmaker, declarative_base

payload = {
    "schema_name": "catalog_schema",
    "table_name": "policy11",
    "column_names": ["CustomerId", "FirstName", "LastName", "Email", "DOB", "Gender", "AnnualIncome", "StreetAddress", "City", "State", "Country", "Zip", "CreatedDate", "UpdatedDate"],
    "choice": {"AnomalyDetection": "1", "BusinessContext": "1", "DQRules": "1", "Standardization": "0"},
    "standardization": []
}

Base = declarative_base()

class DataQuality(Base):
    __tablename__ = 'data_quality'

    id = Column(BigInteger, Identity(start=1, increment=1), primary_key=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    column_names = Column(ARRAY(String))
    choice = Column(JSON())
    standardization = Column(ARRAY(String))

engine = create_engine(
  url = f"databricks://token:{access_token}@{server_hostname}?" +
        f"http_path={http_path}&catalog={catalog}&schema={schema}"
)

Session = sessionmaker(bind=engine)
session = Session()

def insert_data(session, data):
    try:
        
        result = {}
        result["schema_name"]=data["schema_name"]
        result["table_name"]=data["table_name"]
        result["column_names"]=data["column_names"]
        result["choice"]=str(data["choice"])
        result["standardization"]=data.get("standardization", [])

        # Add and commit the new record
        dq_config = DataQuality(**result)
        session.add(dq_config)
        session.commit()

    except Exception as e:
        session.rollback()
        print(e)
        
    finally:
        session.close()

insert_data(session, payload)

This fails with the following error: (builtins.AttributeError) 'DatabricksDialect' object has no attribute '_json_serializer'

I also tried using the databricks-sql-connector directly:

from databricks import sql

connection = sql.connect(server_hostname = server_hostname,
                 http_path       = http_path,
                 access_token    = access_token)
cursor = connection.cursor()


cursor.execute("INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', ['CustomerId', 'FirstName'], {'AnomalyDetection': '1'})")

cursor.close()
connection.close()

This fails with the following error: databricks.sql.exc.ServerOperationError: [PARSE_SYNTAX_ERROR] Syntax error at or near '['. SQLSTATE: 42601 (line 1, pos 159)


Solution


The error

databricks.sql.exc.ServerOperationError: [PARSE_SYNTAX_ERROR] Syntax error at or near '['. SQLSTATE: 42601 (line 1, pos 159)

from the Databricks SQL connector is caused by incorrect syntax: Spark SQL does not accept Python-style [...] and {...} literals.

When inserting array and map data into a Databricks SQL table, build the values with the array() and map() constructor functions, which use round parentheses. The query then looks like this:

INSERT INTO tablename (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', array('CustomerId', 'FirstName'), map('AnomalyDetection', '1'))
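As a sketch, the same array()/map() literals can be assembled from the question's Python payload before passing the string to cursor.execute(); the quoting helpers below are illustrative, not part of the connector's API, and the payload is abbreviated:

```python
def sql_str(value):
    # Quote a Python string as a SQL single-quoted literal, doubling embedded quotes.
    return "'" + str(value).replace("'", "''") + "'"

def sql_array(items):
    # Render a Python list as a Databricks array(...) literal.
    return "array(" + ", ".join(sql_str(i) for i in items) + ")"

def sql_map(mapping):
    # Render a Python dict as a Databricks map(k1, v1, k2, v2, ...) literal.
    pairs = []
    for k, v in mapping.items():
        pairs.append(sql_str(k))
        pairs.append(sql_str(v))
    return "map(" + ", ".join(pairs) + ")"

payload = {
    "schema_name": "catalog_schema",
    "table_name": "policy11",
    "column_names": ["CustomerId", "FirstName"],
    "choice": {"AnomalyDetection": "1"},
}

values = ", ".join([
    sql_str(payload["schema_name"]),
    sql_str(payload["table_name"]),
    sql_array(payload["column_names"]),
    sql_map(payload["choice"]),
])
query = (
    "INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality "
    "(schema_name, table_name, column_names, choice) "
    f"VALUES ({values})"
)
# Pass the resulting string to cursor.execute(query) on a databricks-sql-connector cursor.
print(query)
```

Note that simple quote-doubling is only a sketch; for untrusted input, prefer the connector's parameterized queries over string concatenation.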

The query executed successfully, and the output shows the array and map values inserted into the table.

2024-07-24
Pratik Lad