Question
Unable to insert array and map data into a Databricks table programmatically in Python
I am trying to insert array and map (dictionary/JSON) data into an Azure Databricks table using both SQLAlchemy and databricks-sql-connector, and I get a different error with each approach.
from sqlalchemy import create_engine, Column, String, text, Integer, JSON, BigInteger, Identity, ARRAY
from sqlalchemy.orm import sessionmaker, declarative_base
payload = {
    "schema_name": "catalog_schema",
    "table_name": "policy11",
    "column_names": ["CustomerId", "FirstName", "LastName", "Email", "DOB", "Gender", "AnnualIncome", "StreetAddress", "City", "State", "Country", "Zip", "CreatedDate", "UpdatedDate"],
    "choice": {"AnomalyDetection": "1", "BusinessContext": "1", "DQRules": "1", "Standardization": "0"},
    "standardization": []
}
Base = declarative_base()
class DataQuality(Base):
    __tablename__ = 'data_quality'
    id = Column(BigInteger, Identity(start=1, increment=1), primary_key=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    column_names = Column(ARRAY(String))
    choice = Column(JSON())
    standardization = Column(ARRAY(String))

engine = create_engine(
    url=f"databricks://token:{access_token}@{server_hostname}?" +
        f"http_path={http_path}&catalog={catalog}&schema={schema}"
)
Session = sessionmaker(bind=engine)
session = Session()
def insert_data(session, data):
    try:
        result = {}
        result["schema_name"] = data["schema_name"]
        result["table_name"] = data["table_name"]
        result["column_names"] = data["column_names"]
        result["choice"] = str(data["choice"])
        result["standardization"] = data.get("standardization", [])
        # Add and commit the new record
        dq_config = DataQuality(**result)
        session.add(dq_config)
        session.commit()
    except Exception as e:
        session.rollback()
        print(e)
    finally:
        session.close()
insert_data(session, payload)
I get the following error: (builtins.AttributeError) 'DatabricksDialect' object has no attribute '_json_serializer'
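If the Databricks SQLAlchemy dialect simply does not support ARRAY/JSON column types, the fallback I am considering is to declare those columns as plain strings and serialize the values with json.dumps before inserting, which I would rather avoid. A rough, untested sketch of that idea (the class name, table name, and helper below are placeholders, not my real model):

import json
from sqlalchemy import Column, String, BigInteger, Identity

class DataQualityAsStrings(Base):  # placeholder model for the string-based fallback
    __tablename__ = 'data_quality_strings'  # hypothetical table name
    id = Column(BigInteger, Identity(start=1, increment=1), primary_key=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    column_names = Column(String)     # JSON-encoded list instead of ARRAY(String)
    choice = Column(String)           # JSON-encoded dict instead of JSON()
    standardization = Column(String)  # JSON-encoded list instead of ARRAY(String)

def to_string_row(data):
    # Serialize the complex fields to JSON text so only plain strings reach the dialect.
    return DataQualityAsStrings(
        schema_name=data["schema_name"],
        table_name=data["table_name"],
        column_names=json.dumps(data["column_names"]),
        choice=json.dumps(data["choice"]),
        standardization=json.dumps(data.get("standardization", [])),
    )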
I also tried using databricks-sql-connector directly:
from databricks import sql

connection = sql.connect(server_hostname=server_hostname,
                         http_path=http_path,
                         access_token=access_token)
cursor = connection.cursor()
cursor.execute("INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality (schema_name, table_name, column_names, choice) VALUES ('schema_name', 'table_name', ['CustomerId', 'FirstName'], {'AnomalyDetection': '1'})")
cursor.close()
connection.close()
I get the following error: databricks.sql.exc.ServerOperationError: [PARSE_SYNTAX_ERROR] Syntax error at or near '['. SQLSTATE: 42601 (line 1, pos 159)
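Is the fix on the SQL side to build the values with Spark SQL's array() and map() constructor functions instead of the Python-style [...] and {...} literals? An untested sketch of what I mean (same table and connection variables as above):

from databricks import sql

# Untested sketch: build the array<string> and map<string,string> values with
# Spark SQL's array() and map() functions inside the INSERT statement.
connection = sql.connect(server_hostname=server_hostname,
                         http_path=http_path,
                         access_token=access_token)
cursor = connection.cursor()
cursor.execute("""
    INSERT INTO dq_config_driven_execution.dq_configuration_test.data_quality
        (schema_name, table_name, column_names, choice)
    VALUES (
        'schema_name',
        'table_name',
        array('CustomerId', 'FirstName'),
        map('AnomalyDetection', '1')
    )
""")
cursor.close()
connection.close()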