Question

pyarrow ipc stream - how to communicate using it?

After reading the pyarrow IPC docs, I had the impression that RecordBatchStreamReader would read the stream until it is finished (the end-of-stream marker of zeros written when the stream writer is closed).

But I see that the reader stops much sooner, which is not what I expect.

The example below starts writing into a file stream in one process and, after a delay, starts reading that file stream in another process. I expect the reader to read all the batches written by the writer, but it reads only the first few and exits.

How can I make reader() read everything that writer() wrote? The sink.flush() call was added to check whether it helps - it doesn't; the code behaves the same with and without it.

import pyarrow as pa
import time
import multiprocessing as mp
import os

schema = pa.schema([("value", pa.int32())])

if os.path.exists("/tmp/stream.arrow"):
    os.remove("/tmp/stream.arrow")


def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                print(f"writer wrote {i}")
                time.sleep(0.25)
                sink.flush()
    print("writer done")


def reader():
    print("reader started")
    with pa.OSFile("/tmp/stream.arrow", "r") as source:
        with pa.ipc.open_stream(source) as reader:
            try:
                while True:
                    batch = reader.read_next_batch()
                    print(batch)
            except StopIteration:
                print("reader done")
                return


write_process = mp.Process(target=writer)
read_process = mp.Process(target=reader)

write_process.start()
time.sleep(2)
read_process.start()

write_process.join()
read_process.join()

Example output:

writer started
writer wrote 0
writer wrote 1
writer wrote 2
writer wrote 3
reader started
pyarrow.RecordBatch
value: int32
----
value: [0]
pyarrow.RecordBatch
value: int32
----
value: [1]
pyarrow.RecordBatch
value: int32
----
value: [2]
pyarrow.RecordBatch
value: int32
----
value: [3]
reader done
writer wrote 4
writer wrote 5
writer wrote 6
writer wrote 7

Solution


When a PyArrow IPC reader is reading RecordBatches from a file, it will raise StopIteration when it tries to read the next RecordBatch and detects that it's at the end of the file.

If an Arrow IPC writer is concurrently appending RecordBatches to this file, you must include wait-and-retry logic in your reader application code if you want it to continue reading newly appended RecordBatches.

As a basic example, you could modify your reader function like this:

def reader():
    print("reader started")
    with pa.OSFile("/tmp/stream.arrow", "r") as source:
        with pa.ipc.open_stream(source) as reader:
            while True:
                try:
                    batch = reader.read_next_batch()
                    print(batch)
                except StopIteration:
                    # Reached the current end of the file; wait briefly and
                    # retry once in case the writer has appended more batches.
                    time.sleep(0.50)
                    try:
                        batch = reader.read_next_batch()
                        print(batch)
                    except StopIteration:
                        # Still nothing new after the wait; treat this as the
                        # end of the stream.
                        print("reader done")
                        return

For a real use case, you would want to make this more robust, for example by retrying multiple times with successively longer waits.
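As a rough sketch of that idea (the function name reader_with_backoff, the max_retries parameter, and the specific delays are illustrative choices, not part of the PyArrow API):

def reader_with_backoff(path="/tmp/stream.arrow", max_retries=5):
    # Like reader(), but on StopIteration it retries with successively
    # longer waits before concluding that the writer has finished.
    print("reader started")
    with pa.OSFile(path, "r") as source:
        with pa.ipc.open_stream(source) as reader:
            retries = 0
            while True:
                try:
                    batch = reader.read_next_batch()
                    print(batch)
                    retries = 0  # reset the backoff after a successful read
                except StopIteration:
                    if retries >= max_retries:
                        print("reader done")
                        return
                    # Wait 0.25 s, 0.5 s, 1 s, ... before trying again.
                    time.sleep(0.25 * (2 ** retries))
                    retries += 1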

We recommend using a FIFO queue instead of a file for applications like this. When a PyArrow IPC reader is reading RecordBatches from a queue, I believe it will block waiting for new RecordBatches if the queue is still open for writing, so you will not need to include wait-and-retry logic.
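
As a minimal sketch of that approach, assuming a POSIX named pipe created with os.mkfifo (the path /tmp/stream.fifo is just a placeholder, and the blocking behavior comes from the pipe itself rather than from PyArrow):

import os
import multiprocessing as mp
import pyarrow as pa

schema = pa.schema([("value", pa.int32())])
fifo_path = "/tmp/stream.fifo"  # placeholder path for this sketch

def fifo_writer():
    # Opening a FIFO for writing blocks until a reader opens the other end.
    # buffering=0 makes each write visible to the reader immediately.
    with open(fifo_path, "wb", buffering=0) as sink:
        with pa.ipc.new_stream(sink, schema) as out:
            for i in range(100):
                out.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))

def fifo_reader():
    # Reads block until the writer sends more data or closes its end,
    # so no wait-and-retry logic should be needed here.
    with open(fifo_path, "rb") as source:
        with pa.ipc.open_stream(source) as stream:
            for batch in stream:
                print(batch)

if __name__ == "__main__":
    if not os.path.exists(fifo_path):
        os.mkfifo(fifo_path)
    write_process = mp.Process(target=fifo_writer)
    read_process = mp.Process(target=fifo_reader)
    write_process.start()
    read_process.start()
    write_process.join()
    read_process.join()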

2024-07-25
ianmcook