Question

Optimal multiplication of two 3D arrays having a variable dimension

I would like to multiply tensors R = {R_1, R_2, ..., R_M} and X = {X_1, X_2, ..., X_M} where R_i and X_i are 3×3 and 3×N_i matrices, respectively. How can I make maximum use of NumPy functionalities during the formation of the R_i × X_i arrays?

My MWE is the following:

import numpy as np

np.random.seed(0)

M = 5
R = [np.random.rand(3, 3) for _ in range(M)]
X = []
for i in range(M):
    N_i = np.random.randint(1, 6)
    X_i = np.random.rand(3, N_i)
    X.append(X_i)
    
result = np.zeros((3, 0))
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    result = np.hstack((result, np.dot(R_i, X_i)))

print(result)

Edit #1:

Thanks for everyone who helped me with his valuable comments. Meanwhile I was thinking about the role of N_is in my real problem and came to the conclusion that the number of unique N_is is in fact small (1 to 5; 2 is the most common one, but 1 is also very frequent). In this case, would there be a more efficient solution to the treatment of multiplications?

Another aspect which would be important: in practice, I store a 3 × N matrix X, not the individual X_i blocks. The columns of X are not ordered w.r.t. the R list. Instead, I store only an index vector p which provides the correct ordering for the X columns. In this case, an einsum version would be the following (in comparison with the "direct" multiplication):

import numpy as np

M = 30
N = 100

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

result_einsum = np.einsum('ijk,ki->ji', R[p], X)

result_direct = np.zeros((3, N))
for i in range(N):
    result_direct[:, i] = np.dot(R[p[i]], X[:, i])

print(np.allclose(result_einsum, result_direct))

Edit #2:

It seems that Numba helps quite a lot:

import numpy as np
import numba
from timeit import Timer

M = 30
N = 100

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

@numba.njit
def numba_direct(R, p, X, result_direct, N):
    for i in range(N):
        p_i = p[i]
        for j in range(3):
            res = 0.0
            for k in range(3):
                res += R[p_i, j, k] * X[k, i]
            result_direct[j, i] = res

result_direct = np.zeros((3, N))
numba_direct(R, p, X, result_direct, N)
result_einsum = np.einsum('ijk,ki->ji', R[p], X)
print(np.allclose(result_einsum, result_direct))

ntimes = 10000

einsum_timer = Timer(lambda: np.einsum('ijk,ki->ji', R[p], X))
einsum_time = einsum_timer.timeit(number=ntimes)

numba_direct_timer = Timer(lambda: numba_direct(R, p, X, result_direct, N))
numba_direct_time = numba_direct_timer.timeit(number=ntimes)

print(f'Einsum runtime: {einsum_time:.4f} seconds')
print(f'Numba direct runtime: {numba_direct_time:.4f} seconds')

The execution times are the following for the above code:

Einsum runtime: 0.0979 seconds
Numba direct runtime: 0.0129 seconds
 4  112  4
1 Jan 1970

Solution

 4

I know I am neither @mozway nor @hpaulj (this is referring to @chrslg's comment), but indeed there seems to be a feasible solution with einsum:

np.einsum("ijk,ki->ji",
          np.repeat(R, [x.shape[1] for x in X], axis=0),
          np.hstack(X))

Here is the full code with which I tested:

import numpy as np
from timeit import Timer

np.random.seed(0)

L, M, timeit_n_times = 3, 5, 10_000
R = [np.random.rand(L, L) for _ in range(M)]
X = []
for i in range(M):
    N_i = np.random.randint(1, 6)
    X_i = np.random.rand(L, N_i)
    X.append(X_i)

def original(R, X):    
    result = np.zeros((L, 0))
    for i in range(M):
        R_i = R[i]
        X_i = X[i]
        result = np.hstack((result, np.dot(R_i, X_i)))
    return result

def list_stack(R, X):
    result = []
    for i in range(M):
        R_i = R[i]
        X_i = X[i]
        result.append(np.dot(R_i, X_i))
    return np.hstack(result)

def preallocate(R, X):
    result=np.empty((L, sum(x.shape[1] for x in X)))
    k=0
    for i in range(M):
        R_i = R[i]
        X_i = X[i]
        result[:, k:k+X_i.shape[1]] = np.dot(R_i, X_i)
        k += X_i.shape[1]
    return result

def with_einsum(R, X):
    return np.einsum("ijk,ki->ji",
                     np.repeat(R, [x.shape[1] for x in X], axis=0),
                     np.hstack(X))
    
assert np.allclose(original(R, X), list_stack(R, X))
assert np.allclose(original(R, X), preallocate(R, X))
assert np.allclose(original(R, X), with_einsum(R, X))

print("original", Timer(lambda: original(R, X)).timeit(timeit_n_times))
print("list_stack", Timer(lambda: list_stack(R, X)).timeit(timeit_n_times))
print("preallocate", Timer(lambda: preallocate(R, X)).timeit(timeit_n_times))
print("with_einsum", Timer(lambda: with_einsum(R, X)).timeit(timeit_n_times))

Some observations:

  • The calculation times of list_stack() and preallocate() are marginally different, but are always faster than original(), which is also what is suggested and implied in @chrslg's answer.
  • Whether or not with_einsum() is faster or slower than the others depends pretty much on the shape of the problem:
    • For small, but many L×L matrices (L==3 in the question), with_einsum() wins.
      Here are the results for L, M, timeit_n_times = 3, 500, 10_000:
      original 19.07941191700229
      list_stack 6.437358160997974
      preallocate 7.638774587001535
      with_einsum 3.6944152869982645
      
    • For large, but few L×L matrices, with_einsum() loses.
      Here are the results for L, M, timeit_n_times = 100, 5, 100_000:
      original 6.661783802999707
      list_stack 4.67112236200046
      preallocate 4.94899292899936
      with_einsum 28.233751856001618
      

I did not check the influence of the size and variation of N_i.

Update

Based on the update to the question and @chrslg's thoughts, I tried whether zero-padding would also be a viable way to go. Bottom line is: probably not (at least not for the given example).

Here is the new testing code:

from timeit import Timer
import numpy as np

M, N, timeit_n_times = 30, 100, 10_000

np.random.seed(0)
p = np.random.randint(M, size=N)
R = np.random.rand(M, 3, 3)
X = np.random.rand(3, N)

einsum = lambda: np.einsum('ijk,ki->ji', R[p], X)

def direct():
    res = np.zeros((3, N))
    for i in range(N):
        res[:, i] = np.dot(R[p[i]], X[:, i])
    return res

# Rearrange data for padding
max_count = np.max(np.unique(p, return_counts=True)[1])
counts = np.zeros(M, dtype=int)

X_padded = np.zeros((M, max_count, 3))
for i, idx in enumerate(p):
    X_padded[idx, counts[idx]] = X[:, i]
    counts[idx] += 1

padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)
    
result_einsum = einsum()
result_direct = direct()
result_padded_raw = padded()

# Extract padding result (reverse steps of getting from X to X_padded)
result_padded = np.zeros((3, N))
counts = np.zeros(M, dtype=int)
for i, idx in enumerate(p):
    result_padded[:, i] = result_padded_raw[idx, counts[idx]]
    counts[idx] += 1

assert np.allclose(result_einsum, result_direct)
assert np.allclose(result_padded, result_direct)

print("einsum", Timer(einsum).timeit(timeit_n_times))
print("direct", Timer(direct).timeit(timeit_n_times))
print("padded", Timer(padded).timeit(timeit_n_times))

Here, we first rearrange the data into X_padded, which is M×max_count×3-shaped, where max_count is the maximum number of references to one of the indices in R from p. We iteratively fill up X_padded for each index, leaving unused space zero-filled. In the end, we can again use a version of einsum to calculate the result (padded = lambda: np.einsum('ijk,ilk->ilj', R, X_padded)). If we want to compare the result to the results of the other methods, then we need to rearrange it back again, basically inverting the steps of getting from X to X_padded.

Observations:

  • Memory-wise, we have:

    • Saved on the side of R, as we don't need to replicate the individual 3×3 matrices, any more.
    • Paid on the side of X, as we need zero-padding.
  • Speed-wise, we have lost a bit, it seems;
    here are the results for M, N, timeit_n_times = 30, 100, 100_000:

    einsum 0.7587459350002064
    direct 13.305958173999898
    padded 1.502786388000004
    

    Note that this does not even include the times for rearranging the data. So probably padding won't help here – at least not in the way that I tried.

2024-06-27
simon

Solution

 2

I don't think there is much more to do.
Depending on how the N_i are distributed, for example, if half of N_i are 5, and other half 10, it might help to have two X (one M×5 X for the 5 1st columns, and one (M/2)×5 X for the others). That is, generally speaking, to work column-wise rather than "X-wise"
Or, in some other cases, to fill X with 0s (for example, if N_i is 10000 for 99% of the i, and 9999 for the remaining %).

But, roughly, numpy is to handle homogeneous dimensions array.

One point tho: never ever use hstack stack vstack, append or things like that in a for loop. That is very slow. That implies reallocating a new array and copying all the content of the former array into the new one.

Either use plain simple python lists to accumulate the results. And do the stacking at the end only

res=[]
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    res.append(np.dot(R_i, X_i))

np.hstack(res)

Or allocate in advance the the result numpy array, and store the result as you iterate.

result=np.empty((3,sum(x.shape[1] for x in X)))
k=0
for i in range(M):
    R_i = R[i]
    X_i = X[i]
    result[:,k:k+X_i.shape[1]]=np.dot(R_i,X_i)
    k+=X_i.shape[1]
2024-06-27
chrslg