Question
pyarrow ipc stream - how to communicate using it?
After reading the pyarrow IPC docs, I was under the impression that `RecordBatchStreamReader` reads the stream until it is finished, i.e. until the 0-length end-of-stream marker that is written when the stream writer is closed. But the reader stops much earlier than that, which is not what I expect.
The example below starts writing to a file stream in one process and, after a delay, starts reading that file stream in another process. I expect the reader to read all the batches the writer writes, but it reads only the first few and exits.
How can I make `reader()` read everything that `writer()` wrote? The `sink.flush()` call was added to check whether it helps; it doesn't, the code behaves the same with and without it.
```python
import pyarrow as pa
import time
import multiprocessing as mp
import os

schema = pa.schema([("value", pa.int32())])

if os.path.exists("/tmp/stream.arrow"):
    os.remove("/tmp/stream.arrow")

def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                print(f"writer wrote {i}")
                time.sleep(0.25)
                sink.flush()
    print("writer done")

def reader():
    print("reader started")
    with pa.OSFile("/tmp/stream.arrow", "r") as source:
        with pa.ipc.open_stream(source) as reader:
            try:
                while True:
                    batch = reader.read_next_batch()
                    print(batch)
            except StopIteration:
                print("reader done")
                return

write_process = mp.Process(target=writer)
read_process = mp.Process(target=reader)
write_process.start()
time.sleep(2)
read_process.start()
write_process.join()
read_process.join()
```
Example output:

```
writer started
writer wrote 0
writer wrote 1
writer wrote 2
writer wrote 3
reader started
pyarrow.RecordBatch
value: int32
----
value: [0]
pyarrow.RecordBatch
value: int32
----
value: [1]
pyarrow.RecordBatch
value: int32
----
value: [2]
pyarrow.RecordBatch
value: int32
----
value: [3]
reader done
writer wrote 4
writer wrote 5
writer wrote 6
writer wrote 7
```