HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/.pm2/modules/pm2-logrotate/node_modules/deep-metrics/probes/mqtt-probe.js
/*******************************************************************************
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
'use strict';
var Probe = require('../lib/probe.js');
var aspect = require('../lib/aspect.js');
var request = require('../lib/request.js');
var util = require('util');
var am = require('../');

/**
 * Constructor for the MQTT probe.
 *
 * Invokes the base Probe constructor on the new instance with 'mqtt' as its
 * single argument.
 */
function MqttProbe() {
  Probe.apply(this, ['mqtt']);
}

// Link MqttProbe.prototype to Probe.prototype so instances inherit Probe's methods.
util.inherits(MqttProbe, Probe);

/**
 * Instruments the 'mqtt' module.
 *
 * Registers an `aspect.after` hook on `target.connect`. For every client that
 * hook receives, it registers:
 *   - an `aspect.around` hook on `publish` that starts the metrics/request
 *     probes and ends them either from the publish callback or, when no
 *     callback argument is found, from the trailing hook;
 *   - an `aspect.before` hook on `on` / `addListener` that, for 'message'
 *     listeners only, wraps the listener so the probes start and end around it.
 *
 * NOTE(review): the exact timing of these hooks is defined by ../lib/aspect.js,
 * which is not visible here; the description above assumes they run as their
 * names suggest.
 *
 * The module object is tagged with `__ddProbeAttached__` and is only ever
 * instrumented once: a repeated call for an already-tagged target returns it
 * untouched, so `connect` is not wrapped twice.
 *
 * @param {string} name - name of the module being attached to
 * @param {Object} target - the module's exports
 * @returns {Object} `target` (instrumented when `name` is 'mqtt')
 */
MqttProbe.prototype.attach = function(name, target) {
  var that = this;
  if (name !== 'mqtt') return target;
  // Guard against double instrumentation: without this check a second attach
  // would stack another set of hooks on connect and on every client.
  if (target.__ddProbeAttached__) return target;
  target.__ddProbeAttached__ = true;

  aspect.after(target, 'connect', {}, function(target, methodName, methodArgs, context, client) {
    aspect.around(
      client,
      'publish',
      function(target, methodName, methodArgs, context) {
        that.metricsProbeStart(context, methodName, methodArgs);
        that.requestProbeStart(context, methodName, methodArgs);
        // When publish was given a callback, the probes end from that callback.
        aspect.aroundCallback(methodArgs, context, function(target, args, context) {
          that.metricsProbeEnd(context, methodName, methodArgs);
          that.requestProbeEnd(context, methodName, methodArgs);
        });
      },
      function(target, methodName, methodArgs, context, rc) {
        // No callback argument: end the probes here instead.
        if (aspect.findCallbackArg(methodArgs) == undefined) {
          that.metricsProbeEnd(context, methodName, methodArgs);
          that.requestProbeEnd(context, methodName, methodArgs);
        }
        return rc;
      }
    );

    var methods = ['on', 'addListener'];
    aspect.before(client, methods, function(target, methodName, methodArgs, context) {
      var eventName = 'message';
      // Only 'message' listeners are instrumented.
      if (methodArgs[0] !== eventName) return;
      if (aspect.findCallbackArg(methodArgs) != undefined) {
        aspect.aroundCallback(
          methodArgs,
          context,
          function(target, args, context) {
            // NOTE(review): this forwards the on()/addListener() arguments
            // (outer methodArgs), not this hook's own `args`.
            that.metricsProbeStart(context, eventName, methodArgs);
            that.requestProbeStart(context, eventName, methodArgs);
          },
          function(target, methodArgs, context, rc) {
            // Here methodArgs is this hook's own second parameter, shadowing
            // the outer on()/addListener() arguments.
            that.metricsProbeEnd(context, eventName, methodArgs);
            that.requestProbeEnd(context, eventName, methodArgs);
            return rc;
          }
        );
      }
    });
    return client;
  });
  return target;
};

/*
 * Lightweight metrics probe for MQTT messaging
 *
 * Stops the timer held on the context and emits an 'mqtt' event with:
 *		time:		context.timer.startTimeMillis
 *		method:		the methodName passed in (a received 'message' or a 'publish')
 *		topic:		methodArgs[0]
 *		qos:		the quality of service (QoS) read from methodArgs[2].qos,
 *				or 0 when that argument is missing, is a function, or
 *				carries no qos value
 *		duration:	context.timer.timeDelta
 *
 * Does nothing when context is missing or has no timer.
 */
MqttProbe.prototype.metricsEnd = function(context, methodName, methodArgs) {
  if (!context || !context.timer) return;

  context.timer.stop();

  // default to quality of service (qos) 0, as that's what the mqtt module does
  var qos = 0;
  var options = methodArgs[2];
  // The third argument may be a callback rather than options, and an options
  // object may omit qos; in both cases keep the default instead of emitting
  // undefined.
  if (options && typeof options !== 'function' && options.qos != null) {
    qos = options.qos;
  }

  am.emit('mqtt', {
    time: context.timer.startTimeMillis,
    method: methodName,
    topic: methodArgs[0],
    qos: qos,
    duration: context.timer.timeDelta,
  });
};

/*
 * Heavyweight request probes for MQTT messages
 *
 * Starts a request through request.startRequest and stores the result on
 * context.req. The third argument passed to startRequest is true when
 * methodName is 'message' and false for any other method name.
 */
MqttProbe.prototype.requestStart = function(context, methodName, methodArgs) {
  var isMessageEvent = methodName === 'message';
  context.req = request.startRequest('mqtt', methodName, isMessageEvent, context.timer);
};

/*
 * Stops the request stored on context.req, passing it the topic
 * (methodArgs[0]) and the quality of service (qos).
 *
 * qos is read from methodArgs[2].qos and falls back to 0 when that argument
 * is missing, is a function, or carries no qos value.
 *
 * Does nothing when context is missing or has no req.
 */
MqttProbe.prototype.requestEnd = function(context, methodName, methodArgs) {
  if (!context || !context.req) return;

  var qos = 0;
  var options = methodArgs[2];
  // The third argument may be a callback rather than options, and an options
  // object may omit qos; keep the default rather than reporting undefined.
  if (options && typeof options !== 'function' && options.qos != null) {
    qos = options.qos;
  }

  context.req.stop({ topic: methodArgs[0], qos: qos });
};

// Expose the MqttProbe constructor as this module's export.
module.exports = MqttProbe;