Help with the vector merge algorithm

I need a very fast algorithm for the following task. I have already implemented several algorithms that solve it, but they are all too slow for the performance I need. It must be fast enough that the algorithm can run at least 100,000 times per second on a modern processor. It will be implemented in C++.

I work with spans (ranges): a structure that has a start and an end coordinate on a line.

I have two vectors (dynamic arrays) of spans, and I need to merge them. One vector is src and the other is dst. Both vectors are sorted by the start coordinate of their spans, and spans do not overlap within the same vector.

The spans in the src vector must be merged with the spans in the dst vector, so that the resulting vector is still sorted and has no overlaps. I.e., if overlaps are detected during the merge, the two spans are merged into one. (Merging two spans is simply a matter of changing the coordinates in the structure.)

Now there is another catch: the spans in the src vector must be "widened" during the merge. This means that one constant is added to the start coordinate, and another (larger) constant to the end coordinate, of every span in src. As a result, after widening, the src spans may overlap each other.


So far, I have come to the conclusion that it cannot be done fully in place; some kind of temporary storage is required. I think it should be doable in time linear in the combined number of src and dst elements.

, , .

, , , :

  • src dst, . . , , "" "", , , . ( ) dst .

  • . , , src dst . , dst, .

, O ((m + n) * log (m + n)) O (m + n) . , dst , .

/ .

, / /, , , .

: , . - 4 30 , dst , src dst.

+3
9

, - O (m + n), , , , , . , .

, , ? , , , , . - - , . . , .

, stl swap, temp , .

, 2 , , , ( , ). , , .

, , . , , .

, .

+8

- , ? , ( ), alloca malloc.

, , CMOV , , :

// Element-wise selection: dst[x] receives the smaller of src1[x] and src2[x].
if(src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

50% , . , , , , , , .

0
0

, , src ( , )

, ; , , .

:

"" , Intel Core 2 Extreme QX9770, 3,2 , 59 455 MIPS

100 000 594 550 . .

ref: wikipedia MIPS

, , src- - , src- , dst;

0

1 - , .

, 2 ( - ).

, .

, node, , , .

, . , .

, , , , , dst , node . , "", , . , , , , node.

10 - n m...

0

, .

?

0

, . , .

, STL, . , ... .

, . MSVC, -, "" ( ), .

, , , , .

/**
 * Growable flat buffer of ints that can be used in two ways:
 *  - as a list of single values ("intersections"), added with AddIntersection();
 *  - as a list of spans, where each span occupies two consecutive ints
 *    (start, end), added with AddSpan() / MergeOrAddSpan().
 *
 * The span accessors assert that the element count is even.
 * The buffer owns its storage; copying performs a deep copy.
 */
class SpanBuffer {
private:
    int *data;              // owned storage, allocated_size ints long
    size_t allocated_size;  // capacity, in ints
    size_t count;           // number of ints currently in use

    // Guarantee room for `extra` more ints, growing geometrically when needed.
    // The previous check (count == allocated_size) only ever guaranteed ONE
    // free slot, so AddSpan() could write past the end after Reserve() had
    // been called with an odd size.
    inline void EnsureSpace(size_t extra = 1)
    {
        const size_t needed = count + extra;
        if (needed > allocated_size)
        {
            size_t newsize = allocated_size * 2;
            if (newsize < needed)
                newsize = needed;
            Reserve(newsize);
        }
    }

public:
    // A span as seen through the GetSpanIterator*() accessors.
    // NOTE(review): those accessors reinterpret the int array as Span objects,
    // which relies on Span being exactly two ints with no padding.
    struct Span {
        int start, end;
    };

public:
    // Create an empty buffer with a small initial capacity.
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    // Deep copy: same capacity and contents as src.
    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    // Deep-copy assignment (copy-and-swap). Without this the compiler-generated
    // operator would copy the raw pointer, leaking the old storage and causing
    // a double delete when both objects are destroyed.
    SpanBuffer & operator=(const SpanBuffer &src)
    {
        if (this != &src)
        {
            SpanBuffer tmp(src);
            Swap(tmp);
        }
        return *this;
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    // Append a single value.
    inline void AddIntersection(int x)
    {
        EnsureSpace(1);
        data[count++] = x;
    }

    // Append the span [s, e] unconditionally (no merging with the previous span).
    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        EnsureSpace(2); // two ints are written below
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    // Forget all contents; the allocated storage is kept for reuse.
    inline void Clear()
    {
        count = 0;
    }

    // Number of ints stored (twice the number of spans when used as a span list).
    inline size_t GetCount() const
    {
        return count;
    }

    // Read the i-th stored int; i is not range-checked.
    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    // Pointer to the first span (read-only view).
    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    // Pointer to the first span (mutable view).
    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    // Pointer one past the last span (read-only view).
    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    // Pointer one past the last span (mutable view).
    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    // Append the span [s, e], or fold it into the LAST stored span when it
    // does not start after that span's end. Only the last span is examined,
    // so callers must feed spans in ascending start order to get a sorted,
    // non-overlapping result.
    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            // Starts strictly after the last span ends: a new, separate span
            AddSpan(s, e);
        }
        else
        {
            // Touches or overlaps the last span: grow it to cover both
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    // Grow the capacity to at least minsize ints; never shrinks.
    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    // Sort all stored ints in ascending order.
    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    // Exchange storage and contents with another buffer in constant time.
    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


// Merges the spans of a source scanline into a destination scanline while
// shifting and widening each source span (see OverlayScanline).
struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    // widen_by used to be left uninitialized here although OverlayScanline
    // reads it; it now defaults to 0 (no widening) unless given explicitly.
    // Existing single-argument call sites keep working and may still assign
    // widen_by afterwards.
    ShapeWidener(int _xofs, int _widen_by = 0) : widen_by(_widen_by), xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;
    if (src.GetCount() + dst.GetCount() == 0) return;

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}
0

. LOT - .

, :

  • span

.

std:: set_union ( , ).

( ).

Then you need to merge your spans. This should be quite doable now, and possibly in linear time.

OK, here's the trick: do not try to do it in place. Use one or more temporary vectors (and reserve enough space ahead of time). Then, at the end, call std::vector::swap to put the results into the input vector of your choice.

I hope this is enough to get you going.

0
source

What is your target system? Is it multi-core? If so, you could consider multithreading this algorithm.

0
source

Source: https://habr.com/ru/post/1697092/


All Articles