Quick sort / filter based on variable values

Consider the following example data.table:

 dt <- data.table(src = LETTERS[1:10], dst = LETTERS[10:1],
                  src1 = letters[15:24], dst1 = letters[24:15])
 # which looks like
 #     src dst src1 dst1
 #  1:   A   J    o    x
 #  2:   B   I    p    w
 #  3:   C   H    q    v
 #  4:   D   G    r    u
 #  5:   E   F    s    t
 #  6:   F   E    t    s
 #  7:   G   D    u    r
 #  8:   H   C    v    q
 #  9:   I   B    w    p
 # 10:   J   A    x    o

The first goal is to arrange it on the basis of the reversed-role pair elements (src - dst and src1 - dst1), which can be achieved as follows, creating 5 pairs:

 dt[, key := paste0(pmin(src, dst), pmax(src, dst),
                    pmin(src1, dst1), pmax(src1, dst1))][order(key)]
 #     src dst src1 dst1  key
 #  1:   A   J    o    x AJox
 #  2:   J   A    x    o AJox
 #  3:   B   I    p    w BIpw
 #  4:   I   B    w    p BIpw
 #  5:   C   H    q    v CHqv
 #  6:   H   C    v    q CHqv
 #  7:   D   G    r    u DGru
 #  8:   G   D    u    r DGru
 #  9:   E   F    s    t EFst
 # 10:   F   E    t    s EFst
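
As a side note (added here for clarity, not part of the original question): pmin() and pmax() work element-wise on character vectors too, which is why the key comes out identical for both directions of a pair. With illustrative values:

 pmin(c("A", "J"), c("J", "A"))
 #[1] "A" "A"
 pmax(c("A", "J"), c("J", "A"))
 #[1] "J" "J"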

However, in real life there can be lines that do not have a pair (an "incomplete flow"). So a real-life example is

 #              cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
 #  1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
 #  2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
 #  3: 2017-05-17 08:37:16    IP_1    IP_2   54793    8080 182808054793      3    409124
 #  4: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
 #  5: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
 #  6: 2017-05-17 08:37:16    IP_1    IP_2   54813    8080 182808054813      3    519888
 #  7: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
 #  8: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
 #  9: 2017-05-08 06:57:08    IP_1    IP_2   50794    8080 182808050794      5    518752
 # 10: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
 # 11: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
 # 12: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
 # 13: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
 # 14: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
 # 15: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0
 # 16: 2017-05-08 07:22:47    IP_1    IP_2   51896    8080 182808051896      5    334821
 # 17: 2017-05-15 05:56:00    IP_1    IP_2   62744     162  17016262744      3        NA
 # 18: 2017-05-17 10:41:00    IP_1    IP_2   62744     162  17016262744      3    189900
 # 19: 2017-05-18 09:31:00    IP_1    IP_2   62744     162  17016262744      3     82200

The second goal is to remove those incomplete flows. To identify them, we calculate the time difference between the lines and set 30 seconds as the threshold. Here is the hard part: if we have only 2 lines for a key and they are more than 30 seconds apart, filter out both of them; for 3 lines (of a specific key), the one with a time difference > 30 should go, or, if two of them are > 30 seconds apart, delete all 3. However, when we have 4 or more lines, we need to drop every line with a time difference > 30 that has no partner line within < 30 seconds. From the table above, lines 3, 6, 9, 16, 17, 18, 19 should be removed, since each of them is more than 30 seconds away from any other line. By looking at cdatetime you can tell which ones are complete flows.

The expected result will be

 #              cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
 #  1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
 #  2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
 #  3: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
 #  4: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
 #  5: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
 #  6: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
 #  7: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
 #  8: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
 #  9: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
 # 10: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
 # 11: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
 # 12: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0

A reproducible version (dput() output) of the real-life example data above:

 structure(list(
   cdatetime = structure(c(1494590312, 1494590312, 1494999436, 1494479548,
                           1494479548, 1494999436, 1493697076, 1493697076,
                           1494215828, 1494473569, 1494473569, 1493869925,
                           1493869925, 1493882546, 1493882546, 1494217367,
                           1494816960, 1495006860, 1495089060),
                         class = c("POSIXct", "POSIXt"), tzone = ""),
   srcaddr = structure(c(1L, 2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 2L,
                         1L, 2L, 1L, 1L, 1L, 1L),
                       .Label = c("IP_1", "IP_2"), class = "factor"),
   dstaddr = structure(c(2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 1L,
                         2L, 1L, 2L, 2L, 2L, 2L),
                       .Label = c("IP_1", "IP_2"), class = "factor"),
   srcport = c(54793L, 8080L, 54793L, 54813L, 8080L, 54813L, 50794L, 8080L,
               50794L, 50794L, 8080L, 51896L, 8080L, 51896L, 8080L, 51896L,
               62744L, 62744L, 62744L),
   dstport = c(8080L, 54793L, 8080L, 8080L, 54813L, 8080L, 8080L, 50794L,
               8080L, 8080L, 50794L, 8080L, 51896L, 8080L, 51896L, 8080L,
               162L, 162L, 162L),
   key = c(182808054793, 182808054793, 182808054793, 182808054813,
           182808054813, 182808054813, 182808050794, 182808050794,
           182808050794, 182808050794, 182808050794, 182808051896,
           182808051896, 182808051896, 182808051896, 182808051896,
           17016262744, 17016262744, 17016262744),
   totals = c(3L, 3L, 3L, 3L, 3L, 3L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L,
              3L, 3L, 3L),
   time_diff = c(NA, 0, 409124, NA, 0, 519888, NA, 0, 518752, 257741, 0, NA,
                 0, 12621, 0, 334821, NA, 189900, 82200)),
   .Names = c("cdatetime", "srcaddr", "dstaddr", "srcport", "dstport", "key",
              "totals", "time_diff"),
   row.names = c(NA, -19L), class = c("data.table", "data.frame"))

All of the above will have to run on a dataset with approx. 180M lines, so efficiency is the key word here.

2 answers

Here is my suggestion to identify pairs of complete flows (however, note the caveat at the end).

 library(data.table)   # CRAN version 1.10.4 used

 # set keys to avoid using order() repeatedly
 setkey(DT, key, cdatetime)

 # compute time diff again, grouped by key,
 # using shift() with default "lag" type and a useful fill value
 DT[, time_diff := difftime(cdatetime, shift(cdatetime, fill = -Inf), units = "secs"), by = key]

 # now find the beginning of a potentially new pair where time_diff > 30 secs
 # and count the jumps within each key group using cumsum()
 DT[, pair.id := cumsum(time_diff > 30), by = key]

 # count the number of partners within each supposed pair
 DT[, count.pairs := .N, .(key, pair.id)]

Note that in R, logical (boolean) values are coerced to numeric:

 as.integer(FALSE)
 #[1] 0
 as.integer(TRUE)
 #[1] 1

So, each time cumsum() finds time_diff > 30 equal to TRUE, the pair.id counter is incremented by one. Otherwise, i.e. if the time difference is less than 30 seconds, pair.id remains unchanged. In this way, pairs (or groups) of events lying within a 30-second time window are identified.
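
A tiny illustration of this grouping trick with made-up time differences (not the real data):

 td <- c(Inf, 0, 409124, 0, 12)   # hypothetical time_diff values (seconds) within one key
 cumsum(td > 30)
 #[1] 1 1 2 2 2                   # rows 1-2 form group 1, rows 3-5 form group 2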

Now DT has two extra columns (note that setkey() has reordered the rows):

               cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
  1: 2017-05-15 04:56:00    IP_1    IP_2   62744     162  17016262744      3    Inf secs       1           1
  2: 2017-05-17 09:41:00    IP_1    IP_2   62744     162  17016262744      3 189900 secs       2           1
  3: 2017-05-18 08:31:00    IP_1    IP_2   62744     162  17016262744      3  82200 secs       3           1
  4: 2017-05-02 05:51:16    IP_1    IP_2   50794    8080 182808050794      5    Inf secs       1           2
  5: 2017-05-02 05:51:16    IP_2    IP_1    8080   50794 182808050794      5      0 secs       1           2
  6: 2017-05-08 05:57:08    IP_1    IP_2   50794    8080 182808050794      5 518752 secs       2           1
  7: 2017-05-11 05:32:49    IP_1    IP_2   50794    8080 182808050794      5 257741 secs       3           2
  8: 2017-05-11 05:32:49    IP_2    IP_1    8080   50794 182808050794      5      0 secs       3           2
  9: 2017-05-04 05:52:05    IP_1    IP_2   51896    8080 182808051896      5    Inf secs       1           2
 10: 2017-05-04 05:52:05    IP_2    IP_1    8080   51896 182808051896      5      0 secs       1           2
 11: 2017-05-04 09:22:26    IP_1    IP_2   51896    8080 182808051896      5  12621 secs       2           2
 12: 2017-05-04 09:22:26    IP_2    IP_1    8080   51896 182808051896      5      0 secs       2           2
 13: 2017-05-08 06:22:47    IP_1    IP_2   51896    8080 182808051896      5 334821 secs       3           1
 14: 2017-05-12 13:58:32    IP_1    IP_2   54793    8080 182808054793      3    Inf secs       1           2
 15: 2017-05-12 13:58:32    IP_2    IP_1    8080   54793 182808054793      3      0 secs       1           2
 16: 2017-05-17 07:37:16    IP_1    IP_2   54793    8080 182808054793      3 409124 secs       2           1
 17: 2017-05-11 07:12:28    IP_1    IP_2   54813    8080 182808054813      3    Inf secs       1           2
 18: 2017-05-11 07:12:28    IP_2    IP_1    8080   54813 182808054813      3      0 secs       1           2
 19: 2017-05-17 07:37:16    IP_1    IP_2   54813    8080 182808054813      3 519888 secs       2           1

Now you need to decide which lines to keep. In this small sample, there are only groups with two members (pairs) or singles.

 DT[count.pairs > 1] 

shows only pairs

               cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
  1: 2017-05-02 05:51:16    IP_1    IP_2   50794    8080 182808050794      5    Inf secs       1           2
  2: 2017-05-02 05:51:16    IP_2    IP_1    8080   50794 182808050794      5      0 secs       1           2
  3: 2017-05-11 05:32:49    IP_1    IP_2   50794    8080 182808050794      5 257741 secs       3           2
  4: 2017-05-11 05:32:49    IP_2    IP_1    8080   50794 182808050794      5      0 secs       3           2
  5: 2017-05-04 05:52:05    IP_1    IP_2   51896    8080 182808051896      5    Inf secs       1           2
  6: 2017-05-04 05:52:05    IP_2    IP_1    8080   51896 182808051896      5      0 secs       1           2
  7: 2017-05-04 09:22:26    IP_1    IP_2   51896    8080 182808051896      5  12621 secs       2           2
  8: 2017-05-04 09:22:26    IP_2    IP_1    8080   51896 182808051896      5      0 secs       2           2
  9: 2017-05-12 13:58:32    IP_1    IP_2   54793    8080 182808054793      3    Inf secs       1           2
 10: 2017-05-12 13:58:32    IP_2    IP_1    8080   54793 182808054793      3      0 secs       1           2
 11: 2017-05-11 07:12:28    IP_1    IP_2   54813    8080 182808054813      3    Inf secs       1           2
 12: 2017-05-11 07:12:28    IP_2    IP_1    8080   54813 182808054813      3      0 secs       1           2

while

 DT[count.pairs <= 1] 

shows the singles to be deleted:

              cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
 1: 2017-05-15 04:56:00    IP_1    IP_2   62744     162  17016262744      3    Inf secs       1           1
 2: 2017-05-17 09:41:00    IP_1    IP_2   62744     162  17016262744      3 189900 secs       2           1
 3: 2017-05-18 08:31:00    IP_1    IP_2   62744     162  17016262744      3  82200 secs       3           1
 4: 2017-05-08 05:57:08    IP_1    IP_2   50794    8080 182808050794      5 518752 secs       2           1
 5: 2017-05-08 06:22:47    IP_1    IP_2   51896    8080 182808051896      5 334821 secs       3           1
 6: 2017-05-17 07:37:16    IP_1    IP_2   54793    8080 182808054793      3 409124 secs       2           1
 7: 2017-05-17 07:37:16    IP_1    IP_2   54813    8080 182808054813      3 519888 secs       2           1

Caveat: In production data it may happen that more than two events with the same key fall into the 30-second time window. There are several ways to handle this:

  • Keep only exact pairs: DT[count.pairs == 2] ignoring all other cases.
  • Keep only pairs with an even number of partners: DT[count.pairs %% 2L == 0L] (although this may include cases where the number of incoming connections does not equal the number of outgoing connections).
  • Other options that require additional work, for example, counting incoming and outgoing connections separately (a rough sketch follows this list).
  • Finally, the time window of 30 seconds can be reduced.
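
A rough sketch of that separate-counting option (this is an addition, not part of the original answer; it assumes the pair.id column created above, and the dir / n.dir helper columns are illustrative names only):

 # tag each event's direction and count distinct directions within a candidate pair
 DT[, dir := paste(srcaddr, dstaddr, sep = ">")]
 DT[, n.dir := uniqueN(dir), by = .(key, pair.id)]
 # keep only groups that contain both directions of the flow
 DT[n.dir == 2L]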

Statistics can be created using

 DT[, .N, count.pairs]
 #   count.pairs  N
 #1:           1  7
 #2:           2 12

It would be interesting to see this for a production dataset.


Second answer: This seems to work:

 DT[, keep := (!is.na(time_diff) & time_diff < 30) |
              shift(time_diff, type = "lead", fill = 999) < 30,
    by = key]

 DT[(keep), !"keep"]
               cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
  1: 2017-05-12 07:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
  2: 2017-05-12 07:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
  3: 2017-05-11 01:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
  4: 2017-05-11 01:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
  5: 2017-05-01 23:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
  6: 2017-05-01 23:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
  7: 2017-05-10 23:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
  8: 2017-05-10 23:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
  9: 2017-05-03 23:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
 10: 2017-05-03 23:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
 11: 2017-05-04 03:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
 12: 2017-05-04 03:22:26    IP_2    IP_1    8080   51896 182808051896      5         0
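
For illustration only (made-up values, not from the original answer): within one key group of three lines where the third has no partner, the keep flag evaluates like this:

 td <- c(NA, 0, 409124)   # hypothetical time_diff values within one key
 (!is.na(td) & td < 30) | shift(td, type = "lead", fill = 999) < 30   # shift() from data.table
 #[1]  TRUE  TRUE FALSE    # the unpaired third line is dropped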

There are many ways to make this faster, I think, but it's probably more important to find out whether it does what the OP wants (given that the code is much simpler than the OP's rules).


Source: https://habr.com/ru/post/1268596/

