Sorting an array in parallel using threads

I wrote a program that sorts an array with multiple threads by splitting the array into equal pieces and sorting in separate threads with sorting bubbles. Then I used the merge algorithm to combine the two arrays.

I would like to compare this program with the one that sorts the array using Streams. My question is whether I am passing an array to a thread, how would I go about splitting, sorting and merging to sort in parallel, but using parallel threads, rather than creating my own threads / runnables, etc.

Any ideas?

+4
source share
2 answers

, - , Java. Stream API , spliterator, , .

spliterator:

static class BubbleSpliterator<T> implements Spliterator<T> {
    private final Comparator<? super T> cmp;
    private final Spliterator<T> source;
    private T[] data;
    private int offset;

    public BubbleSpliterator(Spliterator<T> source, Comparator<? super T> cmp) {
        this.source = source;
        this.cmp = cmp;
    }

    @SuppressWarnings("unchecked")
    private void init() {
        if (data != null)
            return;
        Stream.Builder<T> buf = Stream.builder();
        source.forEachRemaining(buf);
        data = (T[]) buf.build().toArray();
        bubble(data, cmp);
    }

    private static <T> void bubble(T[] data, Comparator<? super T> cmp) {
        for (int i = 0; i < data.length - 1; i++)
            for (int j = i + 1; j < data.length; j++) {
                if (cmp.compare(data[i], data[j]) > 0) {
                    T tmp = data[i];
                    data[i] = data[j];
                    data[j] = tmp;
                }
            }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        init();
        if (offset >= data.length)
            return false;
        action.accept(data[offset++]);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        init();
        for (int i = offset; i < data.length; i++)
            action.accept(data[i]);
        offset = data.length;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (data != null)
            return null;
        Spliterator<T> prefix = source.trySplit();
        return prefix == null ? null : new BubbleSpliterator<>(prefix, cmp);
    }

    @Override
    public long estimateSize() {
        if (data != null)
            return data.length - offset;
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source.characteristics();
    }

    public static <T> Stream<T> stream(Stream<T> source, 
                                       Comparator<? super T> comparator) {
        Spliterator<T> spltr = source.spliterator();
        return StreamSupport.stream(new BubbleSpliterator<>(spltr, comparator), 
               source.isParallel()).onClose(source::close);
    }
}

, , , . :

int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
    Collectors.toList());
System.out.println(list);

:

[254, 313, 588, 847, 904, 985, 434, 473, 569, 606, 748, 978, 234, 262, 263, 317, 562, 592, 99, 189, 310,...]

, . , Stream API.

, , , :

static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
    List<T> result = new ArrayList<>(l1.size()+l2.size());
    int i=0, j=0;
    while(i < l1.size() && j < l2.size()) {
        if(cmp.compare(l1.get(i), l2.get(j)) <= 0) {
            result.add(l1.get(i++));
        } else {
            result.add(l2.get(j++));
        }
    }
    result.addAll(l1.subList(i, l1.size()));
    result.addAll(l2.subList(j, l2.size()));
    return result;
}

static <T> Collector<T, ?, List<T>> mergeSorting(Comparator<? super T> cmp) {
    return Collector.<T, List<T>> of(ArrayList::new, List::add, 
                                     (l1, l2) -> merge(l1, l2, cmp));
}

, Collectors.toList(), , , . mergeSorting, , , - .

, Stream API, BubbleSpliterator mergeSorting:

int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
    mergeSorting(comparator));
System.out.println(list);

.

, , bubble + merge .

+2

Java 8 Stream API, :

IntStream randomIntegers = ThreadLocalRandom.current().ints(100, 0, 100);
int[] sortedArray = randomIntegers
        .parallel() // (1)
        .sorted() // (2)
        .toArray();
System.out.println(Arrays.toString(sortedArray));

, Stream , parallel(), sorted(). ( )

, , :

final class SortedOps {

    private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
        //...

        @Override
        public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<Integer[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
                return helper.evaluate(spliterator, false, generator);
            } else {
                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

                int[] content = n.asPrimitiveArray();
                Arrays.parallelSort(content); // <== this

                return Nodes.node(content);
            }
        }
    }

    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        //...

        @Override
        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                                 Spliterator<P_IN> spliterator,
                                                 IntFunction<T[]> generator) {
            // If the input is already naturally sorted and this operation
            // naturally sorts then collect the output
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
                Arrays.parallelSort(flattenedData, comparator); // <== this
                return Nodes.node(flattenedData);
            }
        }
    }

}

Arrays.parallelSort() .

+1

Source: https://habr.com/ru/post/1630075/


All Articles