Question

ThreadPool.RegisterWaitForSingleObject leaks RegisteredWaitHandle objects (and memory) over time

I build an extension method for WaitHandles (especially ManualResetEventSlim) to make them usable in async code. Usage looks like this:

public class WaitHandleExampleClass : IDisposable
{
    private readonly ManualResetEventSlim _mre;
    public WaitHandleExampleClass()
    {
        _mre = new ManualResetEventSlim(false);
    }

    public async Task WaitForHandle()
    {
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
        {
            var cancelled = await _mre.WaitHandle.WaitForSignalOrCancelAsync(cts.Token)
.ConfigureAwait(false);
            
            if (cancelled)
            {
                DoCancelAction();
                return;
            }

            Proceed();
            // later, _mre.Reset() is called
        }
    }

    public void TriggerWaitHandleFromOtherThread()
    {
        _mre.Set();
    }

    public void Dispose()
    {
        _mre.Dispose();
    }
}

My Extension method WaitForSignalOrCancelAsync() looks like this:

public static Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken ct)
{
    if (waitHandle == null)
        throw new ArgumentNullException(nameof(waitHandle));
    
    var tcs = new TaskCompletionSource<bool>();
    var task = tcs.Task;
    var container = new CleanupContainer();
    // dont pass ct to ContinueWith - otherwise rwh is not unregistered (even with TaskContinuationOptions.None)
    task.ContinueWith(TaskContinueWith, container, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    container.Ctr = ct.Register(SetCancel, tcs, false);
    container.Rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, RwhCallback, tcs, -1, true);
    
    return task;
}
private static void SetCancel(object state) => ((TaskCompletionSource<bool>)state).TrySetResult(true);

private static void RwhCallback(object state, bool o) => ((TaskCompletionSource<bool>)state).TrySetResult(false);

private static void TaskContinueWith(Task t, object state)
{
    try
    {
        ((CleanupContainer)state).Cleanup();            
    }
    catch (Exception e)
    {
        DebugAssert.ShouldNotBeCalled();
    }
}

private class CleanupContainer
{
    public RegisteredWaitHandle Rwh { get; set; }
    public CancellationTokenRegistration Ctr { get; set; }

    public void Cleanup()
    {
        // so this just waits until the condition is true (returns true) or timeout is reached (returns false)
        var conditionOk = SpinWait.WaitFor(() => Rwh != null && Ctr != default, timeoutMs: 200);
        DebugAssert.MustBeTrue(conditionOk); // always true in debug -> Rwh and Ctr are set

        var rwhUnregisterOkay = Rwh?.Unregister(null);
        DebugAssert.MustBeTrue(rwhUnregisterOkay == true); // also always true
        Ctr.Dispose();

        Rwh = null;
        Ctr = default;
    }
}

Atfer deployment and running this code as Windows service for multiple hours, i can see that memory usage increases. Also CPU usages increases but i am not sure if this is related. So i attached dotmemory. Over time, i get more and more objects of:

  • TaskCompletionSource<bool> -> 241,890
  • CancellationCallbackInfo-> 241,984
  • ThreadPoolWaitOrTimerCallback -> 241,890

Dotmemory screenshot

It does not help if i force GC with dotmemory

I guess this is related to the extension method above, but i dont get why. I took this MSDN code as base ("From Wait Handles to TAP"): https://learn.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/interop-with-other-asynchronous-patterns-and-types#see-also

I also wrote my first unit test with dotmemory integration, not sure if its correct or helpful. But with that i know that Rwh.Unregister() makes a different as the object count different if its commented out in the Cleanup method. Its red because registeredWaitHandles.ObjectsCount == 1, not 0

[TestMethod]
[DotMemoryUnit(FailIfRunWithoutSupport = false)]
public async Task WaitForSignalOrCancelAsync_ResourcesAreReleased()
{
using (var waitHandle = new ManualResetEventSlim(false))
{
var cancellationTokenSource = new CancellationTokenSource();
var task = waitHandle.WaitHandle.WaitForSignalOrCancelAsync(cancellationTokenSource.Token);

        cancellationTokenSource.Cancel();
        await task;
        
        for (var i = 0; i < 10; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }
        
        Thread.Sleep(200);
    
        for (var i = 0; i < 10; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }
        
        dotMemory.Check(memory =>
        {
            var registeredWaitHandles = memory.GetObjects(where => where.Type.Is<RegisteredWaitHandle>());
            // if i comment out the Rwh.Unregister() in CleanupContainer.Cleanup() i get 2 as count here
            // with Rwh.Unregister(), i get 1 here
            registeredWaitHandles.ObjectsCount.MustBeEqualTo(0, "rwh > 1");
    
            var cancellationTokenRegistrations = memory.GetObjects(where => where.Type.Is<CancellationTokenRegistration>());
            cancellationTokenRegistrations.ObjectsCount.MustBeEqualTo(0, "ctr > 0");
        });
    }

}

Edit: I tried the solution from Ivan (stackoverflow link) but i run into the same problem. My unit test is still red with 2 remaining references I briefly checked it on the server, after 20 minutes i see increasing new objects of the 3 types (TaskCompletionSource, ...)

I quickly tried AsyncEx, but with that my unit test is also red with that

Code i tried:

public static async Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
    try
    {
        await waitHandle.WaitOneAsync(cancellationToken, timeoutMilliseconds).ConfigureAwait(false);
        return false; // no cancel
    }
    catch (OperationCanceledException)
    {
        return true; // cancel
    }
}

private static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
{
    if (waitHandle == null)
        throw new ArgumentNullException(nameof(waitHandle));

    TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
    CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
    TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

    RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
        (_, timedOut) =>
        {
            if (timedOut)
            {
                tcs.TrySetCanceled();
            }
            else
            {
                tcs.TrySetResult(true);
            }
        },
        null, timeout, true);

    Task<bool> task = tcs.Task;

    _ = task.ContinueWith(_ =>
    {
        var ok = rwh.Unregister(null);
        var ok2 = rwh.Unregister(waitHandle);
        ctr.Dispose();
    }, CancellationToken.None);

    return task;
}
 3  59  3
1 Jan 1970

Solution

 2

Try a bit different approach - do not rely on task.ContinueWith.

Also SpinWait.WaitFor in your code is a bit odd, the need for it shows that you have race condition issues.

Try the following code:

public static class WaitHandleExtensions
{
    public static Task<bool> WaitForSignalOrCancelAsync(this WaitHandle waitHandle, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(waitHandle);

        if (ct.IsCancellationRequested)
            return Task.FromResult(true);

        if (waitHandle.WaitOne(0))
            return Task.FromResult(false);

        var op = new WaitForSignalOrCancelOperation(waitHandle, ct);
        return op.Task;
    }

    private sealed class WaitForSignalOrCancelOperation
    {
        private const int State_Awaiting = 0, State_Completed = 1;
        private static readonly WaitOrTimerCallback _onWaitCompleteCallback;
        private static readonly Action<object?> _onCancelCallback;
        private readonly TaskCompletionSource<bool> _tcs = new();
        private volatile int _state = State_Awaiting;
        private CancellationTokenRegistration _ctr;
        private RegisteredWaitHandle? _rwh;

        public Task<bool> Task
        {
            get => _tcs.Task;
        }

        static WaitForSignalOrCancelOperation()
        {
            _onWaitCompleteCallback = static (object? state, bool timedOut) =>
            {
                var self = (WaitForSignalOrCancelOperation)state!;
                self.HandleWaitComplete();
            };

            _onCancelCallback = static (object? state) =>
            {
                var self = (WaitForSignalOrCancelOperation)state!;
                self.HandleCanceled();
            };
        }

        public WaitForSignalOrCancelOperation(WaitHandle waitHandle, CancellationToken ct)
        {
            if (ct.CanBeCanceled)
            {
                _ctr = ct.Register(_onCancelCallback, this, useSynchronizationContext: false);

                if (_state != State_Awaiting)
                {
                    // CancellationToken is already canceled.
                    // _onCancelCallback has been called already, the result is already set.
                    UnregisterAll();
                    return;
                }
            }

            _rwh = ThreadPool.RegisterWaitForSingleObject(
                waitHandle,
                _onWaitCompleteCallback,
                state: this,
                Timeout.Infinite,
                executeOnlyOnce: true);

            if (_state != State_Awaiting && _rwh != null)
            {
                // waitHandle is already signaled.
                // _onWaitCompleteCallback has been called already, the result is already set.
                UnregisterAll();
            }
        }

        private void HandleWaitComplete()
        {
            // Check if already called and mark as called if not.
            if (Interlocked.Exchange(ref _state, State_Completed) == State_Awaiting)
            {
                UnregisterAll();
                _tcs.TrySetResult(false);
            }
        }

        private void HandleCanceled()
        {
            // Check if already called and mark as called if not.
            if (Interlocked.Exchange(ref _state, State_Completed) == State_Awaiting)
            {
                UnregisterAll();
                _tcs.TrySetResult(true);
            }
        }

        private void UnregisterAll()
        {
            // Unregister and clear CancellationTokenRegistration
            try
            {
                _ctr.Unregister();
                _ctr = default;
            }
            catch (ObjectDisposedException)
            {
                // Ignore.
            }
            catch (Exception)
            {
                // Ignore or log, whatever...
            }

            // Unregister and clear RegisteredWaitHandle
            if (_rwh != null)
            {
                try
                {
                    _rwh.Unregister(null);
                    _rwh = null;
                }
                catch (ObjectDisposedException)
                {
                    // Ignore.
                }
                catch (Exception)
                {
                    // Ignore or log, whatever...
                }
            }
        }
    }
}

Pay attention that testing framework may also use RegisteredWaitHandle by itself, so registeredWaitHandles.ObjectsCount may be grater than zero even if you don't use WaitForSignalOrCancelAsync at all.

2024-07-14
Sinus32