Consider the following example data:

```r
library(data.table)
dt <- data.table(src = LETTERS[1:10], dst = LETTERS[10:1],
                 src1 = letters[15:24], dst1 = letters[24:15])
# which looks like
#     src dst src1 dst1
#  1:   A   J    o    x
#  2:   B   I    p    w
#  3:   C   H    q    v
#  4:   D   G    r    u
#  5:   E   F    s    t
#  6:   F   E    t    s
#  7:   G   D    u    r
#  8:   H   C    v    q
#  9:   I   B    w    p
# 10:   J   A    x    o
```
The first goal is to group each row with its mirror image, i.e. the row in which both `src`/`dst` and `src1`/`dst1` are swapped. Building an order-independent key with `pmin`/`pmax` does this and yields 5 pairs:
```r
dt[, key := paste0(pmin(src, dst), pmax(src, dst),
                   pmin(src1, dst1), pmax(src1, dst1))][order(key)]
#     src dst src1 dst1  key
#  1:   A   J    o    x AJox
#  2:   J   A    x    o AJox
#  3:   B   I    p    w BIpw
#  4:   I   B    w    p BIpw
#  5:   C   H    q    v CHqv
#  6:   H   C    v    q CHqv
#  7:   D   G    r    u DGru
#  8:   G   D    u    r DGru
#  9:   E   F    s    t EFst
# 10:   F   E    t    s EFst
```
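On very large tables, pasting four values into one string per row can be expensive. A minimal sketch of an alternative, assuming it is acceptable to keep the four `pmin`/`pmax` values as separate columns (the names `k1`..`k4`, `pair_id` and `pair_size` are hypothetical): sort by reference with `setorder()` and number the groups with `.GRP` instead of building a single `key` string.

```r
library(data.table)

dt <- data.table(src = LETTERS[1:10], dst = LETTERS[10:1],
                 src1 = letters[15:24], dst1 = letters[24:15])

# Hypothetical order-independent endpoint columns: a row and its mirror
# image get identical k1..k4 values, so they fall into the same group.
dt[, `:=`(k1 = pmin(src, dst),   k2 = pmax(src, dst),
          k3 = pmin(src1, dst1), k4 = pmax(src1, dst1))]

# Sort by reference, then number the groups instead of pasting a string key.
setorder(dt, k1, k2, k3, k4)
dt[, pair_id := .GRP, by = .(k1, k2, k3, k4)]

# Group size per pair; a size of 1 would flag a row without a mirror row.
dt[, pair_size := .N, by = pair_id]
```

On this toy data every `pair_size` is 2; in the real data, rows with `pair_size == 1` would be candidates for the "incomplete" case described next.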
In practice, however, some rows have no matching reverse row (an "incomplete flow"). A real-life example looks like this:
```
#               cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
#  1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
#  2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
#  3: 2017-05-17 08:37:16    IP_1    IP_2   54793    8080 182808054793      3    409124
#  4: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
#  5: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
#  6: 2017-05-17 08:37:16    IP_1    IP_2   54813    8080 182808054813      3    519888
#  7: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
#  8: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
#  9: 2017-05-08 06:57:08    IP_1    IP_2   50794    8080 182808050794      5    518752
# 10: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
# 11: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
# 12: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
# 13: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
# 14: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
# 15: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0
# 16: 2017-05-08 07:22:47    IP_1    IP_2   51896    8080 182808051896      5    334821
# 17: 2017-05-15 05:56:00    IP_1    IP_2   62744     162  17016262744      3        NA
# 18: 2017-05-17 10:41:00    IP_1    IP_2   62744     162  17016262744      3    189900
# 19: 2017-05-18 09:31:00    IP_1    IP_2   62744     162  17016262744      3     82200
```
The second goal is to remove these incomplete flows. To identify them, we compute the time difference (`time_diff`, in seconds) between consecutive rows of the same key and use 30 seconds as the threshold. This is the hard part:

- If a key has only 2 rows and they are more than 30 seconds apart, both should be dropped.
- If a key has 3 rows, the one with a time difference > 30 should be dropped; if two of them have a time difference > 30, all 3 should be dropped.
- If a key has 4 or more rows, every row with a time difference > 30 that has no partner within 30 seconds should be dropped.

In the table above, rows 3, 6, 9, 16, 17, 18 and 19 should therefore be removed, because each of them is more than 30 seconds away from every other row with the same key. Looking at `cdatetime` makes it clear which rows belong to complete flows.
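A minimal sketch of the filtering step, assuming the rules above boil down to "keep a row if at least one other row with the same `key` lies within 30 seconds of it" (after sorting by time, the closest such row is always the previous or the next one). Only `cdatetime` and `key` from the sample are used; the `id` column and the helper columns `secs`, `gap_prev`, `gap_next` and `keep` are hypothetical additions, and `tz = "UTC"` is assumed only so the result does not depend on the local time zone. Under this interpretation `totals` is not needed.

```r
library(data.table)

# Sample from the question: row id, timestamp and key only.
# tz = "UTC" is an assumption; gaps in seconds do not depend on it.
dt <- data.table(
  id = 1:19,
  cdatetime = as.POSIXct(c(1494590312, 1494590312, 1494999436, 1494479548,
                           1494479548, 1494999436, 1493697076, 1493697076,
                           1494215828, 1494473569, 1494473569, 1493869925,
                           1493869925, 1493882546, 1493882546, 1494217367,
                           1494816960, 1495006860, 1495089060),
                         origin = "1970-01-01", tz = "UTC")
)
# `key` is added with := because data.table() treats a `key =` argument
# as the table key, not as a column.
dt[, key := rep(c(182808054793, 182808054813, 182808050794,
                  182808051896, 17016262744), times = c(3, 3, 5, 5, 3))]

threshold <- 30  # seconds

# Sort by key and time (by reference), then compute the gap to the previous
# and to the next row of the same key.
setorder(dt, key, cdatetime)
dt[, secs := as.numeric(cdatetime)]
dt[, `:=`(gap_prev = secs - shift(secs),
          gap_next = shift(secs, type = "lead") - secs), by = key]

# A row survives if its previous or next same-key row is within the
# threshold; NA gaps (first/last row of a key) count as "no partner".
dt[, keep := (!is.na(gap_prev) & gap_prev <= threshold) |
             (!is.na(gap_next) & gap_next <= threshold)]

res <- dt[keep == TRUE]
```

On the sample, `res$id` contains rows 1, 2, 4, 5, 7, 8, 10, 11, 12, 13, 14 and 15, i.e. the rows of the expected result (`res` is in key/time order rather than the original row order).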
The expected result is:
```
#               cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
#  1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
#  2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
#  3: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
#  4: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
#  5: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
#  6: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
#  7: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
#  8: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
#  9: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
# 10: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
# 11: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
# 12: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0
```
The real-life example data above, as `dput()` output:
```r
structure(list(
  cdatetime = structure(c(1494590312, 1494590312, 1494999436, 1494479548,
                          1494479548, 1494999436, 1493697076, 1493697076,
                          1494215828, 1494473569, 1494473569, 1493869925,
                          1493869925, 1493882546, 1493882546, 1494217367,
                          1494816960, 1495006860, 1495089060),
                        class = c("POSIXct", "POSIXt"), tzone = ""),
  srcaddr = structure(c(1L, 2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 2L,
                        1L, 2L, 1L, 1L, 1L, 1L),
                      .Label = c("IP_1", "IP_2"), class = "factor"),
  dstaddr = structure(c(2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 1L,
                        2L, 1L, 2L, 2L, 2L, 2L),
                      .Label = c("IP_1", "IP_2"), class = "factor"),
  srcport = c(54793L, 8080L, 54793L, 54813L, 8080L, 54813L, 50794L, 8080L,
              50794L, 50794L, 8080L, 51896L, 8080L, 51896L, 8080L, 51896L,
              62744L, 62744L, 62744L),
  dstport = c(8080L, 54793L, 8080L, 8080L, 54813L, 8080L, 8080L, 50794L,
              8080L, 8080L, 50794L, 8080L, 51896L, 8080L, 51896L, 8080L,
              162L, 162L, 162L),
  key = c(182808054793, 182808054793, 182808054793, 182808054813,
          182808054813, 182808054813, 182808050794, 182808050794,
          182808050794, 182808050794, 182808050794, 182808051896,
          182808051896, 182808051896, 182808051896, 182808051896,
          17016262744, 17016262744, 17016262744),
  totals = c(3L, 3L, 3L, 3L, 3L, 3L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L,
             3L, 3L, 3L),
  time_diff = c(NA, 0, 409124, NA, 0, 519888, NA, 0, 518752, 257741, 0, NA,
                0, 12621, 0, 334821, NA, 189900, 82200)),
  .Names = c("cdatetime", "srcaddr", "dstaddr", "srcport", "dstport", "key",
             "totals", "time_diff"),
  row.names = c(NA, -19L), class = c("data.table", "data.frame"))
```
All of this has to run on a table with roughly 180M rows, so efficiency is critical.
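At 180M rows, evaluating `shift()` per group with `by` may become a bottleneck when there are many small keys. One possible alternative, sketched under the assumption that a row is kept when another row with the same `key` lies within 30 seconds of it, is to sort once by reference and then compare each row with its neighbours across the whole table, accepting a neighbour only when its `key` matches. The `id`, `secs` and `keep` columns are hypothetical, and `tz = "UTC"` is assumed only to make the example independent of the local time zone. This has not been benchmarked here; timing it against a grouped version on a sample of the real data would be the way to decide.

```r
library(data.table)

# Sample from the question (id, timestamp, key); tz = "UTC" is an
# assumption that does not affect gaps measured in seconds.
dt <- data.table(
  id = 1:19,
  cdatetime = as.POSIXct(c(1494590312, 1494590312, 1494999436, 1494479548,
                           1494479548, 1494999436, 1493697076, 1493697076,
                           1494215828, 1494473569, 1494473569, 1493869925,
                           1493869925, 1493882546, 1493882546, 1494217367,
                           1494816960, 1495006860, 1495089060),
                         origin = "1970-01-01", tz = "UTC")
)
# Added with := because data.table() reserves the `key =` argument.
dt[, key := rep(c(182808054793, 182808054813, 182808050794,
                  182808051896, 17016262744), times = c(3, 3, 5, 5, 3))]

threshold <- 30  # seconds

# One sort by reference; no grouping afterwards.
setorder(dt, key, cdatetime)
dt[, secs := as.numeric(cdatetime)]

# Compare each row with its previous/next row in the whole table. A
# neighbour only counts if it has the same key; %in% TRUE turns the NAs
# produced at the table edges into FALSE.
dt[, keep := (key == shift(key) & secs - shift(secs) <= threshold) %in% TRUE |
             (key == shift(key, type = "lead") &
                shift(secs, type = "lead") - secs <= threshold) %in% TRUE]

res <- dt[(keep)]
```

Both `setorder()` and `:=` work by reference, so the only full copy made here is the final subset `res`.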