Deep Into NodeJs

Node's Event-driven Architecture

Posted by 許敲敲 on April 5, 2021

Adapted from jscomplete.com, by Samer Buna

Callbacks, Promises, and Async/Await

Node originally handled asynchronous calls with callbacks. This was long before JavaScript had native promise support and the async/await feature. We saw how asynchronous callbacks work with the event loop, but in the general sense callbacks are just functions that you pass to other functions, which is possible in JavaScript because functions are first-class objects. It’s important to understand that a callback does not by itself indicate an asynchronous call. For details about each, refer to MDN or to Callbacks, Promises and Async/Await.
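
To make that last point concrete, here is a minimal sketch (the order array is only an illustrative name) that contrasts a callback invoked synchronously with one deferred through the event loop:

```javascript
// A callback does not make a call asynchronous by itself.
const order = [];

// Array.prototype.forEach invokes its callback synchronously.
[1, 2].forEach((n) => order.push(`sync callback ${n}`));
order.push('after forEach');

// setTimeout schedules its callback to run later, via the event loop.
setTimeout(() => {
  order.push('async callback');
  console.log(order);
}, 0);
order.push('after setTimeout');
```

Both forEach callbacks are recorded before 'after forEach', while the setTimeout callback is recorded only after 'after setTimeout', once the synchronous code has finished.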

Event Emitter

The EventEmitter is a module that facilitates communication between objects in Node. It is at the core of Node’s asynchronous event-driven architecture, and many of Node’s built-in modules inherit from it. The concept is simple: emitter objects emit named events that cause listeners to be called. So an emitter object has two main features: emitting named events and registering listener functions. To work with the EventEmitter, we just create a class that extends EventEmitter. Emitter objects are what we instantiate from EventEmitter-based classes. At any point in their lifecycle, we can use the emit function to emit any named event we want. Emitting an event is the signal that some condition has occurred, usually a state change in the emitting object. We can add listener functions using the on method, and those listener functions will be executed every time the emitter object emits their associated named event.

Let’s take a look at an example. The WithLog class is an event emitter. It defines one instance method, execute, which receives a task function and wraps its execution with log statements. It fires events before and after the execution. To see the sequence of what happens, we register listeners on both named events and execute a sample task. What I want you to notice about the output is that it all happens synchronously; there is nothing async about this code. We get the Before executing line first, the begin named event causes the About to execute log line, then the task executes, the end named event causes the Done with execute line, and then we get the After executing line.

// sync event
const EventEmitter = require('events');

class WithLog extends EventEmitter {
  execute(taskFunc) {
    console.log('Before executing');
    this.emit('begin');
    taskFunc();
    this.emit('end');
    console.log('After executing');
  }
}

const withLog = new WithLog();

withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));

withLog.execute(() => console.log('*** Executing task ***'));

>node sync-events.js 
Before executing
About to execute      
*** Executing task ***
Done with execute     
After executing       

So, just like plain old callbacks, events do not by themselves mean that code is synchronous or asynchronous. This is important: suppose the taskFunc passed to execute were actually asynchronous, which we can simulate with a setTimeout call instead of a direct function. Now the task itself runs asynchronously, and the Done with execute and After executing lines are no longer accurate, because they are logged before the task has run.

const EventEmitter = require('events');

class WithLog extends EventEmitter {
  execute(taskFunc) {
    console.log('Before executing');
    this.emit('begin');
    taskFunc();
    this.emit('end');
    console.log('After executing');
  }
}

const withLog = new WithLog();

withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));

withLog.execute(() =>
  setTimeout(() => console.log('*** Executing task ***'), 500)
);

>node sync-events.js 
Before executing
About to execute      
Done with execute     
After executing 
*** Executing task ***     

To emit an event after an asynchronous function is done, we’ll probably need to combine callbacks or promises with this event-based communication. One benefit of using events instead of regular callbacks is that we can react to the same signal multiple times by defining multiple listeners. To accomplish the same with callbacks, we would have to write more logic inside the single available callback. Events are a great way for applications to allow multiple external plugins to build functionality on top of the application’s core. You can think of them as hook points that allow for customizing the story around a state change.

Let’s convert this synchronous example into something asynchronous and a little more useful. The WithTime class executes an asyncFunc and reports the time taken by that asyncFunc using console.time and console.timeEnd calls. It emits the right sequence of events before and after the execution, and it also emits error and data events to match the usual signature of an asynchronous callback. We call it by passing the async function followed by the arguments that function expects. And instead of handling data with callbacks, we can now listen to the data event. When we execute this code, we get the right sequence of events as expected, and we get a reported time for the execution, which is helpful.

// async event
const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    this.emit('begin');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    });
  }
}

const withTime = new WithTime();

withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));

withTime.execute(fs.readFile, __filename);

>node async-events.js 
About to execute
execute: 8.616ms 
Done with execute
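
The same idea can be combined with promises instead of callbacks. The following is only a sketch under stated assumptions: WithTimePromise and slowGreeting are hypothetical names that do not appear in the original example, and slowGreeting is a stand-in for any promise-returning function.

```javascript
const EventEmitter = require('events');

// Hypothetical promise-based counterpart of WithTime.
class WithTimePromise extends EventEmitter {
  async execute(asyncFunc, ...args) {
    console.time('execute');
    this.emit('begin');
    let data;
    try {
      data = await asyncFunc(...args);
    } catch (err) {
      // Needs an 'error' listener; otherwise emit() itself throws.
      this.emit('error', err);
      return;
    }
    this.emit('data', data);
    console.timeEnd('execute');
    this.emit('end');
  }
}

// Stand-in async task (an assumption): resolves after a short delay.
const slowGreeting = (name) =>
  new Promise((resolve) => setTimeout(() => resolve(`hello ${name}`), 50));

const withTime = new WithTimePromise();
const events = [];

withTime.on('begin', () => events.push('begin'));
withTime.on('data', (data) => events.push(`data: ${data}`));
withTime.on('end', () => events.push('end'));

// 'begin' fires synchronously; 'data' and 'end' fire after the promise resolves.
const finished = withTime.execute(slowGreeting, 'node');
finished.then(() => console.log(events));
```

Only the call into asyncFunc is wrapped in try/catch, so an exception thrown by a data or end listener is not misreported as an error event from the task.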

Arguments, Errors, and Order of Listeners

In the previous section, two events were emitted with extra arguments. The error event is emitted with an error object, and the data event is emitted with a data object. We can pass as many arguments as we want after the event name, and all of these arguments will be available inside the listener functions. For example, to work with the data event, the listener function that we register gets access to the data argument that was passed to the emitted event. And that data is exactly what the fs.readFile callback exposes.

const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
    });
  }
}

const withTime = new WithTime();

withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

withTime.execute(fs.readFile, __filename);

// output
// Length: 529
// execute: 8.370ms
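
As a small illustration of passing several arguments, here is a sketch that uses a made-up progress event (the event name, file name, and numbers are assumptions for the example only):

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const received = [];

// Every value passed to emit() after the event name reaches the listener.
emitter.on('progress', (fileName, bytesDone, bytesTotal) => {
  received.push({ fileName, bytesDone, bytesTotal });
  console.log(`${fileName}: ${bytesDone}/${bytesTotal} bytes`);
});

// emit() returns true when the event had at least one listener.
const hadListeners = emitter.emit('progress', 'report.csv', 512, 2048);
```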

The other event emitted with an argument was the error event. It is a special one, because if we don’t handle it with a listener, the node process will actually exit. Let me show you what I mean. If we have two calls to the execute method, and the first call triggers an error, the node process is going to crash and exit. To prevent this behavior, we need to register a listener for the special error event. If we do so, the error will be reported and both execute calls will run, so the node process does not crash and exit in that case.

The other way to handle exceptions from emitted errors is to register a listener for the uncaughtException process event. This is essentially the same, but remember the advice that on an uncaught exception we should let the process exit anyway. So when we use uncaughtException, we usually do some kind of cleanup and then have the process exit.

However, imagine that multiple error events happen. The uncaughtException listener would be triggered multiple times, which might be a problem for any cleanup code we run there. EventEmitter exposes a once method as an alternative to on. The once method invokes the listener just once, not every time the event happens. This is a practical fit for uncaughtException, because on the first uncaught exception we should start the cleanup and exit the process anyway.

const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
    });
  }
}

const withTime = new WithTime();

withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

process.once('uncaughtException', (err) => {
  console.log(err);
  // do some cleanup
  process.exit(1); // exit anyway
});

withTime.execute(fs.readFile, '');
withTime.execute(fs.readFile, __filename);


// ********* output *********
// (node:5736) Warning: Label 'execute' already exists for console.time()
// [Error: ENOENT: no such file or directory, open ''] {
//   errno: -4058,
//   code: 'ENOENT',
//   syscall: 'open',
//   path: ''
// }
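
The mechanism behind the crash can be seen in isolation. When emit('error', ...) finds no listener, it throws the error; inside an asynchronous callback nothing catches that throw, so the process exits. The sketch below uses a bare emitter and illustrative variable names (threw, seen) rather than the WithTime class, and shows how an error listener changes the outcome:

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const seen = [];

// Without an 'error' listener, emit('error', err) throws err.
let threw = false;
try {
  emitter.emit('error', new Error('no listener yet'));
} catch (err) {
  threw = true;
  console.log(`Thrown: ${err.message}`);
}

// With a listener registered, the error is delivered to it instead.
emitter.on('error', (err) => {
  seen.push(err.message);
  console.log(`Handled: ${err.message}`);
});
emitter.emit('error', new Error('handled'));
```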

If we register multiple listeners for the same event, they are invoked in order: the first listener that we register is the first listener that gets invoked. With two listeners registered through on, the Length listener would be invoked before the Characters listener. If you need to define a new listener but have it invoked first, you can use the prependListener method, as the example does for the Characters listener, and if you need to remove a listener you can use the removeListener method.

const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
    });
  }
}

const withTime = new WithTime();

// Registered first
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// Registered second, but prepended so it runs first
withTime.prependListener('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

// withTime.removeListener ...

withTime.execute(fs.readFile, __filename);

// ********* output*********
// Characters: 694
// Length: 714
// execute: 8.723ms
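
The ordering rules, together with once and removeListener, can be checked with a short synchronous sketch. The listener labels and the calls array are illustrative only:

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const calls = [];

// Keep a reference to the function so it can be removed later.
const second = () => calls.push('second');

emitter.on('data', second);
emitter.prependListener('data', () => calls.push('first')); // jumps the queue
emitter.once('data', () => calls.push('once')); // runs for one emit only

emitter.emit('data'); // first, second, once

// removeListener needs the same function reference that was registered.
emitter.removeListener('data', second);

emitter.emit('data'); // first

console.log(calls);
```

Because removeListener matches by function reference, a listener written inline as an anonymous function cannot be removed this way.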

Practical Example: Task List Manager

Let’s create a practical example using Node’s event emitter. Let’s create a simple task list manager. We’ll keep the structure simple. We’ll have a client file and a server file. The client emits a command event for the server, and the server emits a response event for the client. We’ll make it support four commands: help to display the list of commands, ls to list the current tasks, add to add a new task, and delete to delete a task.

// server.js
const EventEmitter = require('events');

class Server extends EventEmitter {
  constructor(client) {
    super();
    this.tasks = {};
    this.taskId = 1;
    process.nextTick(() => {
      this.emit(
        'response',
        'Type a command (help to list commands)'
      );
    });
    client.on('command', (command, args) => {
      switch (command) {
      case 'help':
      case 'add':
      case 'ls':
      case 'delete':
        this[command](args);
        break;
      default:
        this.emit('response', 'Unknown command...');
      }
    });
  }

  tasksString() {
    return Object.keys(this.tasks).map(key => {
      return `${key}: ${this.tasks[key]}`;
    }).join('\n');
  }

  help() {
    this.emit('response', `Available Commands:
  add task
  ls
  delete :id`
    );
  }
  add(args) {
    this.tasks[this.taskId] = args.join(' ');
    this.emit('response', `Added task ${this.taskId}`);
    this.taskId++;
  }
  ls() {
    this.emit('response', `Tasks:\n${this.tasksString()}`);
  }
  delete(args) {
    delete this.tasks[args[0]];
    this.emit('response', `Deleted task ${args[0]}`);
  }
}

module.exports = (client) => new Server(client);

// client.js
const EventEmitter = require('events');
const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

const client = new EventEmitter();
const server = require('./server')(client);

server.on('response', (resp) => {
  process.stdout.write('\u001B[2J\u001B[0;0f');
  process.stdout.write(resp);
  process.stdout.write('\n> ');
});

let command, args;
rl.on('line', (input) => {
  [command, ...args] = input.split(' ');
  client.emit('command', command, args);
});

Summary

Enjoy the demo!