Question

Unable to understand why my program is using too much RAM

EDIT2: TL;DR Version

The following piece of code is causing a huge memory burden. Library: Parquet.NET

using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
    string tableName = GetTableName(filePath);
    Table parquetTable = await reader.ReadAsTableAsync(); //Main culprit for huge RAM consumption
    DataTable dataTable = new DataTable();

    string sql = $"CREATE TABLE {tableName} (";
    foreach (Field field in parquetTable.Schema.Fields)
    {
        DataField? ptField = field as DataField;
        string columnName = ptField.Name;
        Type columnType = ptField.ClrType;
        dataTable.Columns.Add(columnName, columnType);
        sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
    }
    // ... (continues; full listing in "Complete Story" below)

I am looking for anyone who can tell me how to read a parquet file without overburdening RAM and still dump the parquet data to SQL.
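For reference, Parquet.NET can also read a file one row group at a time instead of materializing everything with ReadAsTableAsync. A minimal sketch of that reading pattern (reading only, no SQL side; namespaces are from Parquet.NET v4 and may differ in other versions):

using System;
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

// Sketch: stream a Parquet file one row group at a time, so only the
// current row group's columns are held in memory at once.
public static async Task ReadByRowGroupAsync(string filePath)
{
    using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
    {
        DataField[] dataFields = reader.Schema.GetDataFields();
        for (int g = 0; g < reader.RowGroupCount; g++)
        {
            using (ParquetRowGroupReader groupReader = reader.OpenRowGroupReader(g))
            {
                foreach (DataField field in dataFields)
                {
                    // One column of one row group; copy the values out and let the rest go.
                    DataColumn column = await groupReader.ReadColumnAsync(field);
                    Console.WriteLine($"Row group {g}, {field.Name}: {column.Data.Length} values");
                }
            }
        }
    }
}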

Complete Story

To give an overview of my project: I am writing a C# program that reads Parquet files, copies them into a DataTable, then opens a SQL connection and uses SqlBulkCopy to dump the data into SQL Server (SQL Server 2019, localdb on the same machine).

I am using parallel processing, but I should mention that I am new to C# as well as to parallel computing. Most of the code here was built with help from ChatGPT and Googling.

Now, my program reads a directory, gathers all files with the ".parquet" extension, and stores them in a string array.

string[] fileList = GetParquetFiles(activeDirectory[0]);

These files are read in parallel, and I am using SemaphoreSlim to limit the number of files processed concurrently.

public static async Task ProcessParquetFileAsync(string[] fileList, string databaseName)
{
    int numberOfConcurrentFiles = 2;
    using (SemaphoreSlim semaphore = new SemaphoreSlim(numberOfConcurrentFiles))
    {
        List<Task> tasks = new List<Task>();
        foreach (var file in fileList)
        {
            await semaphore.WaitAsync();
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await ReadAndDeployParquetFile(file, databaseName);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
}
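For reference, on .NET 6 and later the same throttling can be written with Parallel.ForEachAsync instead of a hand-rolled semaphore; a minimal sketch of that alternative (not the code I am actually running):

using System.Threading.Tasks;

// Sketch: cap concurrency with ParallelOptions.MaxDegreeOfParallelism
// instead of SemaphoreSlim.
public static Task ProcessParquetFilesAsync(string[] fileList, string databaseName)
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
    return Parallel.ForEachAsync(fileList, options, async (file, cancellationToken) =>
    {
        await ReadAndDeployParquetFile(file, databaseName);
    });
}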

Let's follow the flow of one such task. Inside it, I am reading the whole Parquet file as a Table (I am using the Parquet.NET library to read).

In each task, I read the Parquet Table completely and copy its schema to a DataTable (just the schema, no data).

Next, I calculate the number of batches needed to split and read the Parquet Table in "chunks" of batchSize rows. These chunks of data are again processed in parallel using SemaphoreSlim:

public static async Task ReadAndDeployParquetFile(string filePath, string databasename)
{
    using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
    {
        string tableName = GetTableName(filePath);
        Table parquetTable = await reader.ReadAsTableAsync();
        DataTable dataTable = new DataTable();

        string sql = $"CREATE TABLE {tableName} (";
        foreach (Field field in parquetTable.Schema.Fields)
        {
            DataField? ptField = field as DataField;
            string columnName = ptField.Name;
            Type columnType = ptField.ClrType;
            dataTable.Columns.Add(columnName, columnType);
            sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
        }
        sql = sql.Trim(',') + ')';
        SQLConnection conn = new SQLConnection();
        conn.ExecuteSqlCommand(sql, tableName, databasename);

        int rowCount = parquetTable.Count;
        int batchSize = 100000;
        decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize);
        
        SemaphoreSlim semaphore = new SemaphoreSlim(Environment.ProcessorCount);
        List<Task> tasks = new List<Task>();
        Console.WriteLine($"File {tableName} has total batch {(int)parts}");
        for (int i= 0; i < (int)parts; i++)
        {
            await semaphore.WaitAsync();
            int currentPart = i;
            tasks.Add(Task.Run (() =>
            {
                try
                {
                    ProcessBatch(parquetTable, dataTable.Clone(), currentPart, batchSize, tableName, databasename);
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
}

Finally, each batch is added row by row into a new DataTable called partTable that each sub-task is given (the schema of the main DataTable is cloned and passed across).

public static void ProcessBatch(Table parquetTable, DataTable partTable, int currentPart, int batchSize, string tableName, string databaseName)
{
    SQLConnection conn = new SQLConnection();
    int columnCount = parquetTable.Schema.Fields.Count;
    for (int i = currentPart * batchSize; (i < ((currentPart + 1) * batchSize)) && (i < parquetTable.Count); i++)
    {
        var row = parquetTable[i];
        var dataRow = partTable.NewRow();
        for (int j = 0; j < columnCount; j++)
        {
            // Cells of a new DataRow default to DBNull, so only non-null values need copying.
            if (row[j] != null)
            {
                dataRow[j] = row[j];
            }
        }
        partTable.Rows.Add(dataRow);
    }
    conn.InsertTable(tableName, partTable, databaseName, currentPart);
    partTable.Dispose();
}
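SQLConnection above is my own wrapper class. For context, a minimal sketch of what an InsertTable-style method over SqlBulkCopy might look like; the connection string and names here are placeholders, not my actual wrapper:

using System.Data;
using Microsoft.Data.SqlClient; // System.Data.SqlClient works the same way

// Hypothetical sketch of a bulk-insert wrapper; not the real SQLConnection class.
public static void BulkInsert(string tableName, DataTable table, string databaseName)
{
    string connectionString =
        $@"Server=(localdb)\MSSQLLocalDB;Database={databaseName};Integrated Security=true";

    using (var bulkCopy = new SqlBulkCopy(connectionString))
    {
        bulkCopy.DestinationTableName = tableName;
        bulkCopy.BatchSize = 10000; // rows per round trip to the server
        bulkCopy.WriteToServer(table); // streams the DataTable's rows to SQL Server
    }
}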

Now the issue is, there is a Parquet file which has 2 million rows. The chunk size I have given is 100k, so it makes 20 batches and runs them in parallel, but keeps only 8 tasks active at a time (Environment.ProcessorCount is 8 on my PC) and runs the remaining ones as any of the 8 frees up (correct me if I am wrong here).

The file itself is 24 MB, but the RAM usage shoots up to 3 GB! How? My understanding of how the program works is that when one sub-task is done, it should free up all of its memory. But it appears this is not happening.

I used the dotMemory application to check the memory usage: RAM consumption keeps going up and never comes down at any point.

Can anyone help me understand why the memory is not clearing up after a sub-task's job is done, and also help me fix the code to reduce RAM usage? Again, I am very new to C# and even newer to parallel computing, so please go easy on the explanation.

EDIT: Fixed the batchSize value; I had wrongly written it as 10k instead of 100k.


Solution


I used dotMemory

That's the only pertinent thing in your entire question: using a memory profiler to track down your problem. There's also one built into Visual Studio itself; you can find it under Debug > Performance Profiler > Memory Usage.

However, you didn't focus on the important part: what's rooting your objects. In .NET, a rooted object is one that is still reachable from a root (a static field, a local variable on a running thread's stack, and so on), which means the GC is never allowed to clean it up. Its fields, and its fields' fields, and so on recursively, are kept alive for as long as the root holds on.

So if you ever have a question as to what's keeping things in memory (which is your entire question), follow that instance's references up to its root and you'll find out exactly what's not getting cleaned up and why.
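For illustration, here is a contrived sketch (not taken from the question's code) of the kind of rooting a profiler will surface:

using System.Collections.Generic;

// Contrived example of rooting: the static list is a GC root, so every
// buffer ever added to it stays reachable for the life of the process,
// long after the method that created it has returned.
public static class Cache
{
    public static readonly List<byte[]> Buffers = new List<byte[]>();

    public static void Process()
    {
        byte[] buffer = new byte[1_000_000];
        Buffers.Add(buffer); // rooted via Cache.Buffers
    } // 'buffer' goes out of scope here, but the array is still reachable, so it is never collected
}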

2024-06-29
Blindy

Solution


There are some things you should consider:

  1. Use a Release build (and publish) when measuring your RAM usage.
  2. Use a StringBuilder instead of repeated string concatenation when you build big strings (see the sketch after this list).
  3. Set references to null once you no longer need the values; this helps the GC.
  4. If you use event handlers and delegates, make sure you unregister them when you no longer need them, and check that you do not register the same event multiple times.
  5. Use using blocks for classes that implement IDisposable.
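For point 2, here is the question's CREATE TABLE loop sketched with a StringBuilder, reusing the question's own identifiers (illustrative only):

using System.Text;

// Sketch: every += on a string allocates a brand-new string;
// StringBuilder appends into one growing buffer instead.
StringBuilder sql = new StringBuilder($"CREATE TABLE {tableName} (");
foreach (Field field in parquetTable.Schema.Fields)
{
    DataField ptField = (DataField)field;
    sql.Append($"[{ptField.Name}] {GetSqlDataType(ptField.ClrType, field)},");
}
sql.Length--;           // drop the trailing comma
sql.Append(')');
string createTableSql = sql.ToString();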
2024-06-28
Behtash