Question
Unable to understand why my program is using too much RAM
EDIT2: TL;DR Version
The following piece of code is causing a huge memory burden. Library: Parquet.NET
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
    string tableName = GetTableName(filePath);
    Table parquetTable = await reader.ReadAsTableAsync(); // Main culprit for huge RAM consumption
    DataTable dataTable = new DataTable();
    string sql = $"CREATE TABLE {tableName} (";
    foreach (Field field in parquetTable.Schema.Fields)
    {
        DataField? ptField = field as DataField;
        string columnName = ptField.Name;
        Type columnType = ptField.ClrType;
        dataTable.Columns.Add(columnName, columnType);
        sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
    }
    // ... (the full method is shown below in the Complete Story section)
I am currently looking for any experts who can tell me how I can read a parquet file without overburdening the RAM and dump the parquet data to SQL.
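From skimming the Parquet.NET docs, I believe the library also exposes a lower-level, row-group-based API (OpenRowGroupReader / ReadColumnAsync) that avoids materialising the whole table at once. The following is my untested understanding of how that would plug into my existing helpers (GetTableName and SQLConnection are my own, shown further down) — please correct me if I am holding the API wrong:

using System.Data;
using System.Threading.Tasks;
using Parquet;
using Parquet.Schema;
using ParquetColumn = Parquet.Data.DataColumn; // avoids clashing with System.Data.DataColumn

public static async Task StreamParquetFileAsync(string filePath, string databaseName)
{
    using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
    {
        string tableName = GetTableName(filePath);
        DataField[] dataFields = reader.Schema.GetDataFields();

        // Schema-only DataTable, cloned per batch just like before
        DataTable template = new DataTable();
        foreach (DataField field in dataFields)
        {
            template.Columns.Add(field.Name, field.ClrType);
        }

        for (int g = 0; g < reader.RowGroupCount; g++)
        {
            using (ParquetRowGroupReader groupReader = reader.OpenRowGroupReader(g))
            {
                // Only this row group's columns are held in memory
                ParquetColumn[] columns = new ParquetColumn[dataFields.Length];
                for (int c = 0; c < dataFields.Length; c++)
                {
                    columns[c] = await groupReader.ReadColumnAsync(dataFields[c]);
                }

                DataTable batch = template.Clone();
                for (long r = 0; r < groupReader.RowCount; r++)
                {
                    var dataRow = batch.NewRow();
                    for (int c = 0; c < columns.Length; c++)
                    {
                        dataRow[c] = columns[c].Data.GetValue(r) ?? DBNull.Value;
                    }
                    batch.Rows.Add(dataRow);
                }

                new SQLConnection().InsertTable(tableName, batch, databaseName, g);
            } // this row group's buffers can be collected before the next one is read
        }
    }
}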
Complete Story
To give an overview of my project, I am writing a C# program that reads Parquet files, copies them into a DataTable, then opens an SQL connection and dumps the data to an SQL server using SqlBulkCopy (SQL Server 2019, localdb located on the same machine).
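For context, the "dump to SQL" part is just SqlBulkCopy; simplified, my SQLConnection.InsertTable wrapper boils down to something like this (the connection string here is illustrative, and the currentPart parameter is omitted from the logic):

using System.Data;
using Microsoft.Data.SqlClient;

public void InsertTable(string tableName, DataTable table, string databaseName, int currentPart)
{
    string connectionString = $@"Server=(localdb)\MSSQLLocalDB;Database={databaseName};Integrated Security=true;";
    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connectionString))
    {
        bulkCopy.DestinationTableName = tableName;
        bulkCopy.WriteToServer(table); // bulk-inserts every row of the DataTable
    }
}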
I am using parallel processing, but I have to mention that I am new to C# as well as to parallel computing. Most of the code I have built here was with the help of ChatGPT and Googling.
Now, my program reads a directory, gathers all files with the extension ".parquet", and stores them in a string array.
string[] fileList = GetParquetFiles(activeDirectory[0]);
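where GetParquetFiles is essentially just a directory scan:

using System.IO;

public static string[] GetParquetFiles(string directory)
{
    // Gather every *.parquet file in the directory into a string array
    return Directory.GetFiles(directory, "*.parquet");
}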
These files will be read in parallel and I am using SemaphoreSlim to limit the number of active parallel threads.
public static async Task ProcessParquetFileAsync(string[] fileList, string databaseName)
{
    int numberOfConcurrentFiles = 2;
    using (SemaphoreSlim semaphore = new SemaphoreSlim(numberOfConcurrentFiles))
    {
        List<Task> tasks = new List<Task>();
        foreach (var file in fileList)
        {
            await semaphore.WaitAsync();
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await ReadAndDeployParquetFile(file, databaseName);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
}
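The whole thing is kicked off from my entry point roughly like this ("MyDatabase" is a placeholder name):

string[] fileList = GetParquetFiles(activeDirectory[0]);
await ProcessParquetFileAsync(fileList, "MyDatabase");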
Let's take the flow of one such thread. Inside this thread, I am reading the whole Parquet file as a Table (I am using the Parquet.NET library for reading).
In each thread, I am reading the ParquetTable completely and copying its schema to a DataTable (just the schema, no data).
Next, I am calculating a batchSize to split the ParquetTable into "chunks" for reading. These chunks of data are again processed in parallel using SemaphoreSlim.
public static async Task ReadAndDeployParquetFile(string filePath, string databasename)
{
    using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
    {
        string tableName = GetTableName(filePath);
        Table parquetTable = await reader.ReadAsTableAsync();
        DataTable dataTable = new DataTable();
        string sql = $"CREATE TABLE {tableName} (";
        foreach (Field field in parquetTable.Schema.Fields)
        {
            DataField? ptField = field as DataField;
            string columnName = ptField.Name;
            Type columnType = ptField.ClrType;
            dataTable.Columns.Add(columnName, columnType);
            sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
        }
        sql = sql.Trim(',') + ')';
        SQLConnection conn = new SQLConnection();
        conn.ExecuteSqlCommand(sql, tableName, databasename);
        int rowCount = parquetTable.Count;
        int batchSize = 100000;
        decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize);
        SemaphoreSlim semaphore = new SemaphoreSlim(Environment.ProcessorCount);
        List<Task> tasks = new List<Task>();
        Console.WriteLine($"File {tableName} has total batch {(int)parts}");
        for (int i = 0; i < (int)parts; i++)
        {
            await semaphore.WaitAsync();
            int currentPart = i;
            tasks.Add(Task.Run(() =>
            {
                try
                {
                    ProcessBatch(parquetTable, dataTable.Clone(), currentPart, batchSize, tableName, databasename);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
}
Finally, the data is added row by row into a new DataTable called partTable that each sub-thread is given (the schema of the main DataTable is cloned and passed across).
public static void ProcessBatch(Table parquetTable, DataTable partTable, int currentPart, int batchSize, string tableName, string databaseName)
{
    SQLConnection conn = new SQLConnection();
    int columnCount = parquetTable.Schema.Fields.Count;
    for (int i = currentPart * batchSize; (i < ((currentPart + 1) * batchSize)) && (i < parquetTable.Count); i++)
    {
        var row = parquetTable[i];
        var dataRow = partTable.NewRow();
        for (int j = 0; j < columnCount; j++)
        {
            if (row[j] != null)
            {
                dataRow[j] = row[j] ?? DBNull.Value;
            }
        }
        partTable.Rows.Add(dataRow);
    }
    conn.InsertTable(tableName, partTable, databaseName, currentPart);
    partTable.Dispose();
}
Now the issue is, there is a parquet file which has 2 million rows. The chunk size I have given is 100k, so it makes 20 batches and runs them in parallel, but keeps only 8 threads active at a time (Environment.ProcessorCount is 8 on my PC) and runs the remaining ones as any of the 8 frees up (correct me if I am wrong here).
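Spelling the batch math out with this file's numbers:

int rowCount = 2_000_000;   // rows in the problem file
int batchSize = 100_000;    // chunk size after the edit
decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize); // = 20 batches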
The file itself is 24 MB, but the RAM usage shoots up to 3 GB! How? My understanding of how the program works is: when one sub-thread is done, it should free up all of its memory. But it appears this is not happening.
I used the dotMemory profiler to check the memory usage, and the RAM consumption keeps going UP and never comes down at any point.
Can anyone help me understand why the memory is not being released after a sub-thread's job is done, and also help me fix the code to reduce RAM usage? Again, I am very new to C# and even newer to parallel computing, so please go easy on the explanation.
EDIT: Fixed the batchSize number; I had wrongly written it as 10k instead of 100k.