Question

Why does iterating break up my text file lines while a generator doesn't?

For each line of a text file I want to do heavy calculations. The number of lines can be in the millions, so I'm using multiprocessing:

import multiprocessing.pool

num_workers = 1
with open(my_file, 'r') as f:
    with multiprocessing.pool.ThreadPool(num_workers) as pool:
        for data in pool.imap(my_func, f, 100):
            print(data)

I'm testing interactively, hence ThreadPool() (it will be replaced in the final version). For map and imap the documentation says:

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks

Since my opened file is an iterable (each iteration yields a line), I expected this to work, but it breaks lines up in the middle. I wrote a generator that yields lines as expected, but I want to understand why the generator is needed at all. Why isn't the chunking happening on line boundaries?

UPDATE:

To clarify: if I use this generator:

def chunked_reader(file, chunk_size=100):
    with open(file, 'r') as f:
        chunk = []
        i = 0
        for line in f:
            if i == chunk_size:
                yield chunk
                chunk = []
                i = 0
            i += 1
            chunk.append(line)
    # yield the final (possibly partial) chunk
    yield chunk

the calculation works and returns the expected values. If I use the file object directly, I get errors indicating the chunking is not splitting on line boundaries.
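For reference, this is roughly how the generator is wired in (the body of my_func here is just a stand-in that strips each line; the real function does the heavy calculations):

import multiprocessing.pool

def my_func(chunk):
    # chunk is a list of lines, so this loop sees whole lines
    return [line.strip() for line in chunk]

with multiprocessing.pool.ThreadPool(num_workers) as pool:
    for data in pool.imap(my_func, chunked_reader(my_file, 100)):
        print(data)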

UPDATE 2:

It's chunking each line into individual characters rather than keeping lines intact. Another environment with Python 3.9 shows the same behavior, so it has been like this for some time and seems to be "as designed", though not very intuitive.

UPDATE 3:

To clarify the marked solution: my misunderstanding was that chunk_size would make imap send a list of items to my_func, which internally iterates over the data it is passed. Since that assumption was wrong and each line gets sent to my_func separately regardless of chunk_size, the internal iteration was iterating over a string (character by character), not, as I expected, over a list of strings.
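In other words, the confusion boils down to this minimal illustration:

line = "abc\n"
chunk = ["abc\n", "def\n"]

print([item for item in line])   # ['a', 'b', 'c', '\n'] -- iterating a str yields characters
print([item for item in chunk])  # ['abc\n', 'def\n']    -- iterating a list yields whole lines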


Solution


imap chunking is an implementation detail; your function will be called with a single element of the iterable regardless of the chunk size.

for line in file:
    result = my_func(line)

Becomes

for result in pool.imap(my_func, file, some_chunk_size):
    pass

Without the chunk size argument, it is syntactically the same as builtins.map (which people don't really use much either).
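A quick sketch of that equivalence, with a throwaway square function:

import multiprocessing.pool

def square(x):
    return x * x

with multiprocessing.pool.ThreadPool(1) as pool:
    # the same elements reach the function either way; the chunk size
    # only changes how they are batched behind the scenes
    assert list(pool.imap(square, range(10), 3)) == list(map(square, range(10)))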

The chunk size controls how many elements are packed into a list before being pickled and sent over a pipe. The chunking process is transparent to your code: your function will always be called with a single line regardless of the chunk size. You should experiment with the chunk size and see if it improves performance in your case.

Note that imap internally has a for loop, so it should behave exactly like a simple for loop over the iterated object.
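To see the whole thing end to end, here is a minimal sketch that uses io.StringIO in place of a real file:

import io
import multiprocessing.pool

def my_func(data):
    # data is one line (a str); iterating a str yields characters,
    # which is exactly the "broken up lines" from the question
    return [item for item in data]

f = io.StringIO("abc\ndef\n")
with multiprocessing.pool.ThreadPool(1) as pool:
    for out in pool.imap(my_func, f, 100):
        print(out)  # ['a', 'b', 'c', '\n'], then ['d', 'e', 'f', '\n']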

2024-07-15
Ahmed AEK