How to detect that a burst of related events has finished, and then emit a single event

My first question! I really want to ask it correctly, so please help me improve it if I could have asked it better.

Here is the only question I found that seemed remotely related, but I couldn't figure out how to apply it to what I'm trying to do (their question is jQuery-specific, mine is Node.js-specific-ish [although I found a browser version of EventEmitter and was able to test in the browser]): Run a function once per event burst using jQuery


Question

I have a process that I know will generate bursts of events over a period of time.

To simulate this process, I wrote this code:

    /*******************************************************/
    /* This part taken directly from                       */
    /* https://nodejs.org/api/events.html                  */
    /* (with addition of "burstID")                        */
    /*******************************************************/
    const EventEmitter = require('events');

    class MyEmitter extends EventEmitter {}

    const myEmitter = new MyEmitter();

    myEmitter.on('event', (burstID) => {
      console.log('an event occurred!', burstID);
    });

    const millisecondsToSustainBurst = 3000 ;
    const millisecondsBetweenPossibleEventEmissions = 200 ;
    const millisecondsUntilStartNextBurst = 5000 ;
    const millisecondsUntilNoMoreBursts = 23000 ;

    const now = new Date() ;
    console.log('Time now: ' + now + '; should run until about ' + new Date(now.getTime() + millisecondsUntilNoMoreBursts)) ;

    // Emits 'event' at random (50% chance per tick) until the burst duration has elapsed
    const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => {
      if (Math.random() > 0.5) myEmitter.emit('event', burstID) ;
      if ( !((new Date()) - startTimestamp > millisecondsToSustainBurst) )
        setTimeout(() => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID), millisecondsBetweenPossibleEventEmissions) ;
    }

    // Starts a new burst at a fixed interval until the overall run time has elapsed
    const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => {
      if ( !((new Date()) - startTimestamp > millisecondsUntilNoMoreBursts) ) {
        const now = new Date() ;
        console.log('Time now: ' + now + '; starting random-event burst which will run for ' + (millisecondsToSustainBurst/1000) + ' seconds.') ;
        setTimeout(() => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback), millisecondsUntilStartNextBurst) ;
        doRandomEmitBurst(new Date(), millisecondsToSustainBurst, 'burstThatStartedAt' + now.getHours() + 'h' + now.getMinutes() + 'm' + now.getSeconds() + 's') ;
      }
      else callback() ;
    }

    doRegularRandomBursts(new Date(), millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, () => console.log('Done at ' + (new Date()))) ;

    const myBurstDetectedEmitter = new MyEmitter() ;

    // NOW, what do I do HERE to say:
    //   I've seen a number of events occur in a 5-second period
    //   Now they've stopped
    //   Therefore I'm going to emit a different kind of event

Now let's say that I want to listen for these bursts.

I want to make sure that a burst has ended before taking further action.

How can I do it?


What I tried so far

To start, I can create a global "var" (yuck, I'd like to avoid mutation) as follows:

    var timeLastUpdated = {} ;

... and then...

    // Re-checks every 5 seconds; calls back once the key's timestamp has stopped changing
    function keepCheckingTimeLastUpdated(keyForUpdateCheck, callback) {
      const timestampToCheckInOneSecond = (typeof timeLastUpdated[keyForUpdateCheck] !== 'undefined' ? timeLastUpdated[keyForUpdateCheck] : (new Date())) ;
      setTimeout(() => {
        console.log('checking if modifications to "' + keyForUpdateCheck + '" have occurred since ' + timestampToCheckInOneSecond) ;
        if (timeLastUpdated[keyForUpdateCheck] === timestampToCheckInOneSecond) {
          delete timeLastUpdated[keyForUpdateCheck] ;
          callback() ;
        }
        else keepCheckingTimeLastUpdated(keyForUpdateCheck, callback) ;
      }, 5000) ;
    }

    // Records the time of each event per key and starts the checker on first sight of a key
    const makeNotificationHandler = () => (keyForUpdateCheck) => {
      const timeNow = new Date() ;
      if (typeof timeLastUpdated[keyForUpdateCheck] === 'undefined')
        keepCheckingTimeLastUpdated(keyForUpdateCheck, () => console.log(keyForUpdateCheck + ' changed')) ;
      timeLastUpdated[keyForUpdateCheck] = timeNow ;
    } ;

    myEmitter.on('event', makeNotificationHandler()) ;

This just feels like an anti-pattern (I hope I'm using that term correctly). My gut says that having a global object is the wrong approach here, and that there is a more functional-programming-style solution.


FOR THE INTERESTED ONLY:

(feel free to ignore this when answering the question)

Added complication: in my example code, "burstID" will never be the same across bursts, but in the real world it could be. I want to wait until a certain amount of time has passed since the last appearance of a given "burstID" before concluding that a burst of changes has really ended.

For context, in the real application I am setting up "LISTEN" on a Postgres database using node-postgres. "burstID" is the primary key in one table and is also used as a foreign key in several other tables. I listen to all the tables that use the shared key, and the message I send back contains that key.

1 answer

Answering my own question, using the comments above.

A big "Thank you very much!" to Scott Soyet for his help and support.


I decided to create something that I call an "Accumulomator": an automatic accumulator. Messages are routed to these accumulomators, which I think of as people in a warehouse, each with a bin and a stopwatch.

As soon as an Accumulomator is instantiated, it starts watching its stopwatch. Every time the stopwatch runs out, the Accumulomator checks whether it has the same pile of messages as last time. If it does, the Accumulomator packs up its bin of messages, hands it to the warehouse, and retires to somewhere sunny.
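The stopwatch-and-bin idea described above can be sketched for a single bin as follows. This is only a reduced illustration, not the full code: the names makeQuietDetector, check and startPolling are hypothetical, and it compares message counts between checks rather than the timestamp of the last message.

```javascript
// Hypothetical, reduced sketch of one "person with a bin and a stopwatch".
// onQuiet is called once, with the collected messages, when two consecutive
// checks see the same non-empty pile.
const makeQuietDetector = (onQuiet) => {
  const messages = [];
  let countAtLastCheck = -1; // no check has happened yet
  let done = false;

  return {
    // Drop a message into the bin (ignored once the bin has been handed over)
    receive: (message) => {
      if (!done) messages.push(message);
    },
    // Called each time the stopwatch runs out; returns true once finished
    check: () => {
      if (done) return true;
      if (messages.length > 0 && messages.length === countAtLastCheck) {
        done = true;
        onQuiet(messages.slice());
      }
      countAtLastCheck = messages.length;
      return done;
    },
  };
};

// Drives a detector with a real timer and stops polling once it has finished.
const startPolling = (detector, millisecondsBetweenChecks) => {
  const timer = setInterval(() => {
    if (detector.check()) clearInterval(timer);
  }, millisecondsBetweenChecks);
  return timer;
};
```

With the emitter from the question, one detector per "burstID" could be wired up with something like myEmitter.on('event', detector.receive) followed by startPolling(detector, 2000). Keeping check() separate from the timer is a choice made here so that the logic can be exercised without waiting on real time.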

The full updated code is given below. I welcome any changes to improve the code.


    // To test in a browser, use:
    // https://github.com/Olical/EventEmitter
    // …in place of const EventEmitter = require('events');

    /*******************************************************/
    /* This part taken directly from                       */
    /* https://nodejs.org/api/events.html                  */
    /* (with addition of "burstID")                        */
    /*******************************************************/
    const EventEmitter = require('events');

    class MyEmitter extends EventEmitter {}

    const myEmitter = new MyEmitter();

    myEmitter.on('event', (burstID) => {
      /* console.log('an event occurred!', burstID); */
    });

    const millisecondsToSustainBurst = 3000 ;
    const millisecondsBetweenPossibleEventEmissions = 200 ;
    const millisecondsUntilStartNextBurst = 5000 ;
    const millisecondsUntilNoMoreBursts = 23000 ;

    const now = new Date() ;
    console.log('Bursts starting. Time now: ' + now + '; should run until about ' + new Date(now.getTime() + millisecondsUntilNoMoreBursts)) ;

    const doRandomEmitBurst = (startTimestamp, millisecondsToSustainBurst, burstID) => {
      if (Math.random() > 0.5) myEmitter.emit('event', burstID) ;
      if ( !((new Date()) - startTimestamp > millisecondsToSustainBurst) )
        setTimeout(() => doRandomEmitBurst(startTimestamp, millisecondsToSustainBurst, burstID), millisecondsBetweenPossibleEventEmissions) ;
    }

    const doRegularRandomBursts = (startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback) => {
      if ( !((new Date()) - startTimestamp > millisecondsUntilNoMoreBursts) ) {
        const now = new Date() ;
        console.log('Time now: ' + now + '; starting random-event burst which will run for ' + (millisecondsToSustainBurst/1000) + ' seconds.') ;
        setTimeout(() => doRegularRandomBursts(startTimestamp, millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, callback), millisecondsUntilStartNextBurst) ;
        doRandomEmitBurst(new Date(), millisecondsToSustainBurst, 'burstThatStartedAt' + now.getHours() + 'h' + now.getMinutes() + 'm' + now.getSeconds() + 's') ;
      }
      else callback() ;
    }

    doRegularRandomBursts(new Date(), millisecondsUntilStartNextBurst, millisecondsUntilNoMoreBursts, () => console.log('Done at ' + (new Date()))) ;

    // One accumulomator collects the messages for a single key and calls back
    // once a check finds no new message since the previous check.
    const makeAccumulomator = (config) => {
      if (typeof config !== 'object') throw new Error('Must specify configuration object.') ;
      if (typeof config.callback !== 'function') throw new Error('Must specify callback function for when the end of new messages is reached.') ;
      if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ;
      if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ;
      if (typeof config.onStop !== 'function' && typeof config.onStop !== 'undefined') throw new Error('If defined at all, onStop must be a function.') ;

      const accumulomator = {} ;
      var accumulatedMessages = [] ;
      var stop = false ;

      const keepCheckingTimeLastUpdated = (callback) => {
        // Remember the newest timestamp now; compare against it when the timer fires
        const timestampToCheckInOneSecond = (accumulatedMessages.length > 0 ? accumulatedMessages[accumulatedMessages.length - 1].timestamp : (new Date())) ;
        setTimeout(() => {
          if (stop) {
            if (typeof config.onStop === 'function') config.onStop() ;
          }
          else if (accumulatedMessages.length < 1) keepCheckingTimeLastUpdated(callback) ;
          else if (accumulatedMessages[accumulatedMessages.length - 1].timestamp === timestampToCheckInOneSecond) {
            stop = true ;
            callback() ;
          }
          else keepCheckingTimeLastUpdated(callback) ;
        }, config.millisecondsBetweenChecks) ;
      } ;
      keepCheckingTimeLastUpdated(config.callback) ;

      accumulomator.receiveMessage = (message) => accumulatedMessages.push({ message: message, timestamp: (new Date())}) ;
      accumulomator.stopOnNextCheck = () => {
        if (stop === true) throw new Error('Accumulomator is already stopped.') ;
        else stop = true ;
      }
      accumulomator.isActive = () => stop === false ;
      accumulomator.getAccumulatedMessages = () => accumulatedMessages ;
      return accumulomator ;
    }

    // The warehouse routes each message to the accumulomator for its key,
    // creating accumulomators on demand and retiring stopped ones.
    const makeAccumulomatorWarehouse = (config) => {
      if (typeof config !== 'object') throw new Error('Must specify configuration object.') ;
      if (typeof config.callback !== 'function') throw new Error('Must specify callback function.') ;
      if (typeof config.millisecondsBetweenChecks !== 'number') throw new Error('Must specify milliseconds between checks.') ;
      if (Number.isInteger(config.millisecondsBetweenChecks) === false) throw new Error('Must specify milliseconds between checks as an integer.') ;
      if (typeof config.messageRouter !== 'function') throw new Error('Must specify message router function.') ;
      if (typeof config.sendCallbackAccumulatedMessages !== 'undefined')
        if (typeof config.sendCallbackAccumulatedMessages !== 'boolean') throw new Error('Must specify whether or not to send callback accumulated messages as a boolean (if unspecified, accumulated messages will not be included).') ;
      if (typeof config.onAccumulomatorStop !== 'function' && typeof config.onAccumulomatorStop !== 'undefined') throw new Error('If defined at all, onAccumulomatorStop must be a function.') ;

      var sendCallbackAccumulatedMessages = false ;
      if (typeof config.sendCallbackAccumulatedMessages !== 'undefined') sendCallbackAccumulatedMessages = config.sendCallbackAccumulatedMessages ;

      const accumulomatorWarehouse = {} ;
      const accumulomators = {} ;
      var warehouseIsShuttingDown = false ;

      accumulomatorWarehouse.receiveMessage = (message) => {
        // Declared with const so that the callbacks below close over this message's key,
        // not over a shared variable that a later message could overwrite
        const accumulomatorName = config.messageRouter(message) ;
        if (typeof accumulomatorName !== 'string') throw new Error('The value returned from messageRouter must be a string with a unique identifier.') ;
        if (typeof accumulomators[accumulomatorName] === 'object')
          if (accumulomators[accumulomatorName].isActive() === false) delete accumulomators[accumulomatorName] ;
        if (typeof accumulomators[accumulomatorName] === 'undefined') {
          if (warehouseIsShuttingDown === false)
            accumulomators[accumulomatorName] = makeAccumulomator({
              callback: () => config.callback(
                (() => {
                  const objectToReturn = {} ;
                  objectToReturn.key = accumulomatorName ;
                  if (sendCallbackAccumulatedMessages === true) objectToReturn.messages = accumulomators[accumulomatorName].getAccumulatedMessages() ;
                  return objectToReturn ;
                })()
              )
              , millisecondsBetweenChecks: config.millisecondsBetweenChecks
              // onAccumulomatorStop is optional, so only call it when it was supplied
              , onStop: () => { if (typeof config.onAccumulomatorStop === 'function') config.onAccumulomatorStop(accumulomatorName) ; }
            }) ;
        }
        if (typeof accumulomators[accumulomatorName] === 'object') accumulomators[accumulomatorName].receiveMessage(message) ;
      }

      // Every 10 seconds, drop accumulomators that have stopped
      const periodicallyRetireAccumulomators = () => {
        Object.keys(accumulomators).forEach((accumulomator) => {
          if (accumulomators[accumulomator].isActive() === false) delete accumulomators[accumulomator] ;
        }) ;
        if (!(warehouseIsShuttingDown === true && Object.keys(accumulomators).length === 0)) setTimeout(periodicallyRetireAccumulomators, 10000) ;
      } ;
      periodicallyRetireAccumulomators() ;

      accumulomatorWarehouse.shutDownWarehouse = () => {
        Object.keys(accumulomators).forEach((accumulomator) => {
          if (accumulomators[accumulomator].isActive() === true) accumulomators[accumulomator].stopOnNextCheck() ;
        }) ;
        warehouseIsShuttingDown = true ;
      }

      return accumulomatorWarehouse ;
    }

    const myAccumulomatorWarehouse = makeAccumulomatorWarehouse({
      callback: (accumulomatorWarehousePackage) => console.log('Done with accumulomator.', accumulomatorWarehousePackage.key, accumulomatorWarehousePackage.messages)
      , millisecondsBetweenChecks: 2000
      , messageRouter: (message) => message
      , sendCallbackAccumulatedMessages: true
      , onAccumulomatorStop: (accumulomatorName) => console.log('Accumulomator for ' + accumulomatorName + ' manually stopped')
    }) ;

    myEmitter.on('event', myAccumulomatorWarehouse.receiveMessage) ;

    setTimeout(myAccumulomatorWarehouse.shutDownWarehouse, 10000) ;
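For comparison, here is a more compact alternative to the polling approach in the code above: a per-key debounce, where every message for a key restarts that key's timer and the callback fires once the timer is allowed to run out. This is a different technique from the Accumulomator, offered only as a sketch. The name makeKeyedDebouncer and its options (quietMs, onQuiet, setTimer, clearTimer) are hypothetical; the timer functions are injectable purely so the logic can be tried without waiting on real time.

```javascript
// Hypothetical helper: returns receive(key, message). onQuiet(key, messages)
// is called once no message for `key` has arrived for quietMs milliseconds.
const makeKeyedDebouncer = ({ quietMs, onQuiet, setTimer = setTimeout, clearTimer = clearTimeout }) => {
  const pending = new Map(); // key -> { timer, messages }

  return (key, message) => {
    const entry = pending.get(key) || { timer: null, messages: [] };
    // A new message for this key restarts its stopwatch
    if (entry.timer !== null) clearTimer(entry.timer);
    entry.messages.push(message);
    entry.timer = setTimer(() => {
      // Quiet period elapsed: forget the key and hand over its messages
      pending.delete(key);
      onQuiet(key, entry.messages);
    }, quietMs);
    pending.set(key, entry);
  };
};
```

With the emitter from the question, this could be wired up as const receive = makeKeyedDebouncer({ quietMs: 2000, onQuiet: (key, messages) => console.log('Burst ended for', key, messages.length) }) followed by myEmitter.on('event', (burstID) => receive(burstID, burstID)). State lives in a Map inside the closure rather than in a global object, and a key that reappears after its burst has ended simply starts a fresh entry.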

Source: https://habr.com/ru/post/1263521/

