Python: how to make this adaptive thresholding function more efficient

I wrote an adaptive thresholding function in Python (since OpenCV's cv2.adaptiveThreshold did not fit my needs), and it is too slow. I wrote it as efficiently as I could, but it still takes almost 500 ms on a 1280x720 image.

I would really appreciate any suggestions that would make this function more efficient!

Here's what the function does: it uses a cross shape of single-pixel thickness as the structuring element. For each pixel in the image, it computes the average of the ksize neighboring pixels in four directions independently (i.e., the average of the ksize pixels in the same row to the left, in the same column above, in the same row to the right, and in the same column below). This gives four averages, one per direction. A pixel satisfies the threshold criterion only if it is brighter than both the left and right averages, or both the upper and lower averages (plus a constant C).

I compute these averages incrementally for all pixels at once using numpy.roll(), but I still need to do this ksize times. ksize will usually be 20-50.

Here is the code; the relevant part is really what happens inside the for loop:

    def bilateral_adaptive_threshold(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):
        mask = np.full(img.shape, false_value, dtype=np.int16)

        left_thresh = np.zeros_like(img, dtype=np.float32)   # Store the left-side average of each pixel here
        right_thresh = np.zeros_like(img, dtype=np.float32)  # Store the right-side average of each pixel here
        up_thresh = np.zeros_like(img, dtype=np.float32)     # Store the top-side average of each pixel here
        down_thresh = np.zeros_like(img, dtype=np.float32)   # Store the bottom-side average of each pixel here

        for i in range(1, ksize+1):
            roll_left = np.roll(img, -i, axis=1)
            roll_right = np.roll(img, i, axis=1)
            roll_up = np.roll(img, -i, axis=0)
            roll_down = np.roll(img, i, axis=0)

            roll_left[:,-i:] = 0
            roll_right[:,:i] = 0
            roll_up[-i:,:] = 0
            roll_down[:i,:] = 0

            left_thresh += roll_right
            right_thresh += roll_left
            up_thresh += roll_down
            down_thresh += roll_up

        left_thresh /= ksize
        right_thresh /= ksize
        up_thresh /= ksize
        down_thresh /= ksize

        if mode == 'floor':
            mask[((img > left_thresh+C) & (img > right_thresh+C)) | ((img > up_thresh+C) & (img > down_thresh+C))] = true_value
        elif mode == 'ceil':
            mask[((img < left_thresh-C) & (img < right_thresh-C)) | ((img < up_thresh-C) & (img < down_thresh-C))] = true_value
        else:
            raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")

        return mask
1 answer

As you hint at in your question, the dominant part of the function is obtaining the 4 arrays of sums needed to calculate the averages - here it takes on average 190 ms out of 210 ms for the whole function. So let me focus on that.

First, the necessary imports and a convenient timing function.

    from timeit import default_timer as timer
    import numpy as np
    import cv2

    ## ===========================================================================

    def time_fn(fn, img, ksize=20, iters=16):
        start = timer()
        for i in range(iters):
            fn(img, ksize)
        end = timer()
        return ((end - start) / iters) * 1000

    ## ===========================================================================

    # Our test image
    img = np.uint8(np.random.random((720,1280)) * 256)

Original implementation

We can strip your function down so that it simply calculates and returns the 4 arrays of sums. We can then use it to verify that the optimized versions return the same result.

    # Original code
    def windowed_sum_v1(img, ksize=20):
        left_thresh = np.zeros_like(img, dtype=np.float32)
        right_thresh = np.zeros_like(img, dtype=np.float32)
        up_thresh = np.zeros_like(img, dtype=np.float32)
        down_thresh = np.zeros_like(img, dtype=np.float32)

        for i in range(1, ksize+1):
            roll_left = np.roll(img, -i, axis=1)
            roll_right = np.roll(img, i, axis=1)
            roll_up = np.roll(img, -i, axis=0)
            roll_down = np.roll(img, i, axis=0)

            roll_left[:,-i:] = 0
            roll_right[:,:i] = 0
            roll_up[-i:,:] = 0
            roll_down[:i,:] = 0

            left_thresh += roll_right
            right_thresh += roll_left
            up_thresh += roll_down
            down_thresh += roll_up

        return (left_thresh, right_thresh, up_thresh, down_thresh)

Now we can find out how long this function takes on my machine:

 >>> print "V1: %f ms" % time_fn(windowed_sum_v1, img, 20, 16) V1: 188.572077 ms 

Improvement # 1

numpy.roll necessarily involves some overhead, but there is no need to dive into that. Note that after you roll the array, you zero out the rows or columns that wrapped around the edge of the array. Then you add this to the accumulator. Adding zeros does not change the result, so we may as well avoid it. Instead, we can add progressively smaller and appropriately offset slices of the whole image, avoiding the roll and (slightly) reducing the total number of additions needed.

    # Summing up ROIs
    def windowed_sum_v2(img, ksize=20):
        h, w = (img.shape[0], img.shape[1])

        left_thresh = np.zeros_like(img, dtype=np.float32)
        right_thresh = np.zeros_like(img, dtype=np.float32)
        up_thresh = np.zeros_like(img, dtype=np.float32)
        down_thresh = np.zeros_like(img, dtype=np.float32)

        for i in range(1, ksize+1):
            left_thresh[:,i:] += img[:,:w-i]
            right_thresh[:,:w-i] += img[:,i:]
            up_thresh[i:,:] += img[:h-i,:]
            down_thresh[:h-i,:] += img[i:,:]

        return (left_thresh, right_thresh, up_thresh, down_thresh)

Let's check correctness and timing:

 >>> print "Results equal (V1 vs V2): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v2(img))) Results equal (V1 vs V2): True >>> print "V2: %f ms" % time_fn(windowed_sum_v2, img, 20, 16) V2: 110.861794 ms 

This implementation takes only 60% of the time spent by the original. Can we do better?

Improvement # 2

We still have a loop. It would be nice if we could replace the repeated additions with a single call to some optimized function. One such function is cv2.filter2D, which computes the following:

    dst(x,y) = sum over (x',y') of kernel(x',y') * src(x + x' - anchor.x, y + y' - anchor.y)

We can construct a kernel so that the points we want to add up have a weight of 1.0, and the point that the kernel is anchored at has a weight of 0.0.

For example, when ksize=8 , we could use the following kernels and anchor positions.

[Figure: the four kernels and their anchor positions for ksize = 8]
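
As a minimal sketch of what that figure shows (using the same kernel layout as the code below), the kernels for ksize=8 could be built like this:

    # Sketch: the four kernels for ksize=8; the anchor is the position holding the 0.0.
    # Anchor coordinates are given as (x, y), as expected by cv2.filter2D.
    ksize = 8
    kernel_l = np.array([[1.0] * ksize + [0.0]])    # 1x9 row kernel, anchor (ksize, 0): sums the 8 pixels to the left
    kernel_r = np.array([[0.0] + [1.0] * ksize])    # 1x9 row kernel, anchor (0, 0):     sums the 8 pixels to the right
    kernel_u = np.array([[1.0]] * ksize + [[0.0]])  # 9x1 column kernel, anchor (0, ksize): sums the 8 pixels above
    kernel_d = np.array([[0.0]] + [[1.0]] * ksize)  # 9x1 column kernel, anchor (0, 0):     sums the 8 pixels below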

Then the function will be as follows:

    # Using filter2D
    def windowed_sum_v3(img, ksize=20):
        kernel_l = np.array([[1.0] * (ksize) + [0.0]])
        kernel_r = np.array([[0.0] + [1.0] * (ksize)])
        kernel_u = np.array([[1.0]] * (ksize) + [[0.0]])
        kernel_d = np.array([[0.0]] + [[1.0]] * (ksize))

        left_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
        right_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
        up_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
        down_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)

        return (left_thresh, right_thresh, up_thresh, down_thresh)

Again, let's check and time this function:

 >>> print "Results equal (V1 vs V3): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v3(img))) Results equal (V1 vs V3): True >>> print "V2: %f ms" % time_fn(windowed_sum_v3, img, 20, 16) V3: 46.652996 ms 

We are down to 25% of the original time.

Improvement # 3

We are working in floating point, but we no longer do any divisions, and the kernels contain only ones and zeros. That means we could work with integers. You state that the maximum window size is 50, so the sums stay within 50 * 255 = 12750, which means we are safe with signed 16-bit integers. Integer math tends to be faster, and if the code is properly vectorized, we can potentially process twice as much per instruction. Let's give it a shot, and also provide a wrapper that returns the result as floats, like the previous versions.

    # Integer only
    def windowed_sum_v4(img, ksize=20):
        kernel_l = np.array([[1] * (ksize) + [0]], dtype=np.int16)
        kernel_r = np.array([[0] + [1] * (ksize)], dtype=np.int16)
        kernel_u = np.array([[1]] * (ksize) + [[0]], dtype=np.int16)
        kernel_d = np.array([[0]] + [[1]] * (ksize), dtype=np.int16)

        left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
        right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
        up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
        down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)

        return (left_thresh, right_thresh, up_thresh, down_thresh)

    # Integer only, but returning floats
    def windowed_sum_v5(img, ksize=20):
        result = windowed_sum_v4(img, ksize)
        return map(np.float32, result)

Let's check and time these as well:

 >>> print "Results equal (V1 vs V4): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v4(img))) Results equal (V1 vs V4): True >>> print "Results equal (V1 vs V5): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v5(img))) Results equal (V1 vs V5): True >>> print "V4: %f ms" % time_fn(windowed_sum_v4, img, 20, 16) V4: 14.712223 ms >>> print "V5: %f ms" % time_fn(windowed_sum_v5, img, 20, 16) V5: 20.859744 ms 

We are down to 7% of the original time if we are fine with 16-bit integers, or 10% if we want floats.

Further improvements

Let's return to the full thresholding function you wrote. Instead of dividing the sums in a separate step to get the averages, we can scale the kernels so that filter2D returns the averages directly. This is only a slight improvement (~3%).

Similarly, the addition or subtraction of C can be folded in by passing an appropriate delta to the filter2D call. This again shaves off a few percent.
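
As an illustration of both tweaks, here is a minimal sketch for the left direction only, in 'floor' mode; the names kernel_l_mean and left_mean are just for this example and are not from the code above:

    # Sketch: fold the division and the +C offset into the filter2D call itself.
    # Weights of 1/ksize make filter2D return the windowed mean directly,
    # and delta=C adds the constant, so left_mean = (mean of the ksize left neighbors) + C.
    kernel_l_mean = np.array([[1.0 / ksize] * ksize + [0.0]], dtype=np.float32)
    left_mean = cv2.filter2D(img, cv2.CV_32F, kernel_l_mean, anchor=(ksize, 0),
                             delta=C, borderType=cv2.BORDER_CONSTANT)
    # The original test 'img > left_thresh + C' then simplifies to 'img > left_mean'.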

NB: If you implement both of the above changes, you may encounter slight differences due to floating point representation.

Another possibility is to turn the comparisons needed to determine the mask into matrix vs. scalar comparisons:

    input < threshold
    input - input < threshold - input
    0 < threshold - input
    0 < adjusted_threshold    # determined using adjusted kernel

We can achieve this by modifying the kernels to subtract the value of the anchor pixel scaled by the corresponding weight (ksize). With numpy this seems to make only a marginal difference, although as I understand it, we could save half the reads in this part of the algorithm (filter2D presumably still reads and multiplies the corresponding values even when the weight is 0).

Fast threshold function implementation

Taking all this into account, we can rewrite your function as follows, getting the same result in ~12.5% of the time of the original:

    def bilateral_adaptive_threshold5(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):
        mask = np.full(img.shape, false_value, dtype=np.uint8)

        kernel_l = np.array([[1] * (ksize) + [-ksize]], dtype=np.int16)
        kernel_r = np.array([[-ksize] + [1] * (ksize)], dtype=np.int16)
        kernel_u = np.array([[1]] * (ksize) + [[-ksize]], dtype=np.int16)
        kernel_d = np.array([[-ksize]] + [[1]] * (ksize), dtype=np.int16)

        if mode == 'floor':
            delta = C * ksize
        elif mode == 'ceil':
            delta = -C * ksize
        else:
            raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")

        left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
        right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
        up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), delta=delta, borderType=cv2.BORDER_CONSTANT)
        down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)

        if mode == 'floor':
            mask[((0 > left_thresh) & (0 > right_thresh)) | ((0 > up_thresh) & (0 > down_thresh))] = true_value
        elif mode == 'ceil':
            mask[((0 < left_thresh) & (0 < right_thresh)) | ((0 < up_thresh) & (0 < down_thresh))] = true_value

        return mask

Source: https://habr.com/ru/post/1264900/

