Using Boost.MultiIndex like a relational database

Here is the situation I'm trying to simulate:

COL1              COL2  COL3
CBT.151.5.T.FEED  S1    t1
CBT.151.5.T.FEED  s2    t2
CBT.151.5.T.FEED  s3    t3
CBT.151.5.T.FEED  s4    t4
CBT.151.5.T.FEED  s5    t1
CBT.151.8.T.FEED  s7    t1
CBT.151.5.Q.FEED  s8    t3

COL1 - the identifier; a single identifier can have several symbols.
COL2 - the symbols; they are unique.
COL3 - the symbol's last update time; two different symbols can be updated at the same time, so these values are not unique.

My goal is to get the most active tickers, i.e. the symbols that have been updated within the last 60 seconds. For this, I used a Boost multi-index container.

Header file:

 #ifndef __TICKER_INFO_MANAGER_IMPL__ #define __TICKER_INFO_MANAGER_IMPL__ #include <boost/interprocess/containers/string.hpp> #include <boost/interprocess/shared_memory_object.hpp> #include <boost/multi_index_container.hpp> #include <boost/multi_index/member.hpp> #include <boost/multi_index/ordered_index.hpp> #include <TickerInfoManagerConstants.h> #include <TickerInfo.h> namespace bmi = boost::multi_index; namespace bip = boost::interprocess; struct id_index{}; struct symbol_index{}; struct last_update_time_index{}; struct Less { template<class T, class U> bool operator()(T const& t, U const& u) const { return t < u; } }; typedef bmi::multi_index_container< tickerUpdateInfoT, bmi::indexed_by< bmi::ordered_unique <bmi::tag<id_index>, BOOST_MULTI_INDEX_MEMBER( tickerUpdateInfo, shm_string, m_id), Less>, bmi::ordered_unique< bmi::tag<symbol_index>,BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, shm_string, m_symbol), Less>, bmi::ordered_non_unique <bmi::tag<last_update_time_index>, BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, int, m_last_update_time), Less> >, bip::managed_shared_memory::allocator<tickerUpdateInfo>::type > ticker_update_info_set; class tickerInfoMangerImplementation { public: tickerInfoMangerImplementation( const sharedMemoryNameT & name ); bool put_records( const tickerUpdateInfoT & record ); int get_active_ticker_count( const thresholdT seconds ); void print_contents(); bip::managed_shared_memory& get_managed_memory_segment() { return m_managed_memory_segment; } private: const sharedMemoryNameT m_name; bip::managed_shared_memory m_managed_memory_segment; ticker_update_info_set *p_ticker_info_set; }; #endif 

Cpp file

 // Implementation of tickerInfoMangerImplementation: record insertion/update,
 // the "active tickers for an identifier" query, and printing helpers.
 #include <TickerInfoMangerImplementation.h>
 #include <boost/interprocess/managed_shared_memory.hpp>
 #include <algorithm>
 #include <iostream>
 #include <iterator>
 #include <string>
 #include <utility>
 #include "basic_time.h"

 using namespace boost::interprocess;

 // Prints one record as "<id> <symbol> <time> \n".
 // Defined ahead of its first use so that the direct `std::cout << record`
 // calls below can see it.
 std::ostream& operator<<(std::ostream& os, const tickerUpdateInfoT & obj)
 {
     os << obj.m_id << " ";
     os << obj.m_symbol << " ";
     os << obj.m_last_update_time << " " << "\n";
     return os;
 }

 // Opens (or creates) the shared-memory segment and finds (or constructs) the
 // record container inside it.
 // NOTE(review): the segment is always opened under the literal name "test";
 // `name` is only stored in m_name. Confirm whether the segment should be
 // opened under `name` instead.
 tickerInfoMangerImplementation::tickerInfoMangerImplementation( const sharedMemoryNameT & name )
     : m_name(name),
       m_managed_memory_segment( open_or_create, "test", 65536 )
 {
     p_ticker_info_set = m_managed_memory_segment.find_or_construct<ticker_update_info_set>
         ("SetOfTickerUpdateInformation")                                   // container name in shared memory
         ( ticker_update_info_set::ctor_args_list(),
           m_managed_memory_segment.get_allocator<tickerUpdateInfoT>() );   // ctor parameters
 }

 // Inserts `record`. If the insertion is refused, looks the record up by
 // symbol and overwrites its last update time.
 // Returns true when a record was inserted or updated, false otherwise.
 bool tickerInfoMangerImplementation::put_records( const tickerUpdateInfoT & record )
 {
     const std::pair<ticker_update_info_set::iterator, bool> result_pair =
         p_ticker_info_set->insert( record );
     if( result_pair.second )
     {
         return true;
     }

     typedef ticker_update_info_set::index<symbol_index>::type ticker_update_info_set_by_symbol;
     ticker_update_info_set_by_symbol & sym_index = p_ticker_info_set->get<symbol_index>();

     ticker_update_info_set_by_symbol::iterator it = sym_index.find( record.m_symbol );
     if( it == sym_index.end() )
     {
         // The insert was blocked by a different unique index, so there is no
         // record with this symbol to update; dereferencing `it` would be
         // undefined behaviour.
         return false;
     }

     tickerUpdateInfoT ticker_info = *it;
     ticker_info.m_last_update_time = record.m_last_update_time;
     return sym_index.replace( it, ticker_info );
 }

 // Returns the result of basic_time::fullTime() after subtracting `seconds`
 // from a default-constructed basic_time.
 // NOTE(review): presumably a default-constructed basic_time is "now" and
 // fullTime() is in the same unit as m_last_update_time - confirm.
 int tickerInfoMangerImplementation::calculate_historical_time_using_threshold( const thresholdT seconds )
 {
     // The local must not reuse the parameter's name.
     basic_time::Secs_t threshold( seconds );
     basic_time tick_time;
     tick_time -= threshold;
     return ( tick_time.fullTime() );
 }

 // Prints every record stored under identifier `key` whose last update time
 // is strictly greater than the computed threshold, and returns how many such
 // records were found.
 int tickerInfoMangerImplementation::get_active_ticker_count( const thresholdT seconds, std::string key )
 {
     typedef ticker_update_info_set::index<id_index>::type ticker_update_info_set_by_id;

     // The local is deliberately not called `id_index`: that name would hide
     // the tag type inside its own initializer.
     const ticker_update_info_set_by_id & by_id = p_ticker_info_set->get<id_index>();
     const int tick_time = calculate_historical_time_using_threshold( seconds );

     // Look up with a const char*: the container's Less comparator forwards
     // to operator<, which is available between shm_string and const char*.
     const std::pair<ticker_update_info_set_by_id::iterator,
                     ticker_update_info_set_by_id::iterator> range =
         by_id.equal_range( key.c_str() );

     int active_count = 0;
     for( ticker_update_info_set_by_id::iterator it = range.first; it != range.second; ++it )
     {
         if( it->m_last_update_time > tick_time )
         {
             std::cout << *it;
             ++active_count;
         }
     }
     return active_count;
 }

 // Streams all records to std::cout, iterating index #1.
 void tickerInfoMangerImplementation::print_contents()
 {
     const ticker_update_info_set::nth_index<1>::type& name_index = (*p_ticker_info_set).get<1>();
     std::copy( name_index.begin(), name_index.end(),
                std::ostream_iterator<tickerUpdateInfoT>(std::cout) );
 }

The record structure that I would insert in boost multi index

 #ifndef __TICKER_INFO__ #define __TICKER_INFO__ #include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/allocators/allocator.hpp> #include <boost/interprocess/containers/string.hpp> typedef boost::interprocess::managed_shared_memory::allocator<char>::type char_allocator; typedef boost::interprocess::basic_string<char, std::char_traits<char>, char_allocator> shm_string; //Data to insert in shared memory typedef struct tickerUpdateInfo { shm_string m_id; shm_string m_symbol; int m_last_update_time; tickerUpdateInfo( const char * id, const char *symbol, int last_update_time, const char_allocator &a) : m_id( id, a), m_symbol( symbol, a), m_last_update_time( last_update_time) { } tickerUpdateInfo& operator=(const tickerUpdateInfo& other) { if (this != &other) { m_last_update_time = other.m_last_update_time; } return *this; } } tickerUpdateInfoT; #endif 

Now, in the get_active_ticker_count () function, I want to specify a key, such as CBT.151.5.T.FEED, and it should return:

  S1 t1 s2 t2 s3 t3 s4 t4 s5 t1 

Suppose that t1 > t2 > t3 > t4; then I would like to find the records whose times are greater than t3, and also the number of such symbols. How should I proceed? I was able to insert records, but I am stuck on the search part. Please help!

+1
source share
1 answer

I have simplified your (ridiculously complex¹) model to:

 // Simplified, self-contained model of the ticker records plus sample data.
 enum TimePoints {
     // Lets assume t1 > t2 > t3 > t4
     t1 = 100,
     t2 = 80,
     t3 = 70,
     t4 = 20,
 };

 using IdType = std::string;
 using Symbol = std::string;
 using TimeT  = unsigned int;

 // One update record: which feed id, which symbol, and when it last changed.
 struct tickerUpdateInfo {
     IdType m_id;
     Symbol m_symbol;
     TimeT  m_last_update_time;

     // Writes the record as "T[<id>,\t<symbol>,\t<time>]".
     friend std::ostream& operator<<(std::ostream& os, tickerUpdateInfo const& tui) {
         os << "T[" << tui.m_id;
         os << ",\t" << tui.m_symbol;
         os << ",\t" << tui.m_last_update_time;
         os << "]";
         return os;
     }
 };

 // Sample rows mirroring the table from the question.
 static const tickerUpdateInfo data[] = {
     { "CBT.151.5.T.FEED", "S1", t1 },
     { "CBT.151.5.T.FEED", "s2", t2 },
     { "CBT.151.5.T.FEED", "s3", t3 },
     { "CBT.151.5.T.FEED", "s4", t4 },
     { "CBT.151.5.T.FEED", "s5", t1 },
     { "CBT.151.8.T.FEED", "s7", t1 },
     { "CBT.151.5.Q.FEED", "s8", t3 },
 };

There. We can work with that. You need an index that is ordered primarily by time, but which you can refine by symbol / identifier later:

 // Tag naming the single index of the container below.
 struct most_active_index;

 // Container with one non-unique ordered index whose composite key is
 // (m_last_update_time, m_symbol, m_id), in that order.
 using ticker_update_info_set = bmi::multi_index_container<
     tickerUpdateInfo,
     bmi::indexed_by<
         bmi::ordered_non_unique<
             bmi::tag<most_active_index>,
             bmi::composite_key<
                 tickerUpdateInfo,
                 bmi::member<tickerUpdateInfo, TimeT,  &tickerUpdateInfo::m_last_update_time>,
                 bmi::member<tickerUpdateInfo, Symbol, &tickerUpdateInfo::m_symbol>,
                 bmi::member<tickerUpdateInfo, IdType, &tickerUpdateInfo::m_id>
             >
         >
     >
 >;

For our implementation, we don’t even need to use secondary key components, we can just write

 // Counts, per symbol, the records whose last update time is strictly greater
 // than `since`.
 //
 // @param tuis   container to scan through its most_active_index view
 // @param since  exclusive lower time bound
 // @return       map from symbol to number of matching records
 std::map<Symbol, size_t> activity_histo(ticker_update_info_set const& tuis, TimeT since) {
     auto const& by_activity = tuis.get<most_active_index>();
     std::map<Symbol, size_t> counts;

     // upper_bound skips records equal to `since`;
     // for greater-than-inclusive use lower_bound
     auto const last = by_activity.end();
     for (auto cur = by_activity.upper_bound(since); cur != last; ++cur) {
         counts[cur->m_symbol]++;
     }
     return counts;
 }

See Live On Coliru .

Now, if volumes become large, you may be tempted to optimize a bit by using the secondary index component:

 // Same result as counting every record newer than `since` per symbol, but
 // walks the index one (time, symbol) group at a time: for each group it jumps
 // to the group's end with upper_bound on the partial composite key and adds
 // the group's length.
 //
 // @param tuis   container to scan through its most_active_index view
 // @param since  exclusive lower time bound
 // @return       map from symbol to number of matching records
 std::map<Symbol, size_t> activity_histo_ex(ticker_update_info_set const& tuis, TimeT since) {
     std::map<Symbol, size_t> histo;
     auto const& index = tuis.get<most_active_index>();

     // Take the end iterator from the same index the loop iterates:
     // tuis.end() belongs to index #0 and only has a matching type while
     // most_active_index happens to be the first index.
     // For greater-than-inclusive use lower_bound instead of upper_bound.
     for (auto lb = index.upper_bound(since), end = index.end(); lb != end;) {
         // First record past the current (time, symbol) group.
         auto ub = index.upper_bound(boost::make_tuple(lb->m_last_update_time, lb->m_symbol));

         // ub is reachable from lb, so the distance is non-negative.
         histo[lb->m_symbol] += static_cast<size_t>(std::distance(lb, ub));
         lb = ub;
     }
     return histo;
 }

I'm not sure if this will be a faster approach (your profiler will know). See Live On Coliru .

Rethink the design?

To be honest, this whole multi-index approach is likely to slow you down, due to suboptimal insertion times and poor locality of reference when accessing records.

I suggest looking at

  • a single flat_multimap ordered by update time
  • or even a (fixed-size) ring buffer kept in linear time order. This would make a lot of sense, since you are likely to receive events in increasing time order anyway, so you can just keep appending at the end (and wrap around when the history window is full). This immediately eliminates all need for reallocation (given that you select an appropriate maximum capacity for the ring buffer), and also gives optimal cache-prefetch performance when walking the list to compute statistics.

The second approach really gains merit once you implement the ring buffer using Boost Lockfree spsc_queue. Why? Because you can place it in shared memory:

IPC sync with shared memory (no lock)


¹ The complexity would be justified if your code were self-contained. Unfortunately, it was not (at all). I had to trim it to make it work. This was, obviously, after deleting all the line numbers :)

+7
source

Source: https://habr.com/ru/post/1205097/


All Articles