File: //lib/python3/dist-packages/compose/metrics/client.py
import os
from enum import Enum
import requests
from docker import ContextAPI
from docker.transport import UnixHTTPAdapter
from compose.const import IS_WINDOWS_PLATFORM
if IS_WINDOWS_PLATFORM:
from docker.transport import NpipeHTTPAdapter
class Status(Enum):
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"
class MetricsSource:
CLI = "docker-compose"
if IS_WINDOWS_PLATFORM:
METRICS_SOCKET_FILE = 'npipe://\\\\.\\pipe\\docker_cli'
else:
METRICS_SOCKET_FILE = 'http+unix:///var/run/docker-cli.sock'
class MetricsCommand(requests.Session):
"""
Representation of a command in the metrics.
"""
def __init__(self, command,
context_type=None, status=Status.SUCCESS,
source=MetricsSource.CLI, uri=None):
super().__init__()
self.command = ("compose " + command).strip() if command else "compose --help"
self.context = context_type or ContextAPI.get_current_context().context_type or 'moby'
self.source = source
self.status = status.value
self.uri = uri or os.environ.get("METRICS_SOCKET_FILE", METRICS_SOCKET_FILE)
if IS_WINDOWS_PLATFORM:
self.mount("http+unix://", NpipeHTTPAdapter(self.uri))
else:
self.mount("http+unix://", UnixHTTPAdapter(self.uri))
def send_metrics(self):
try:
return self.post("http+unix://localhost/usage",
json=self.to_map(),
timeout=.05,
headers={'Content-Type': 'application/json'})
except Exception as e:
return e
def to_map(self):
return {
'command': self.command,
'context': self.context,
'source': self.source,
'status': self.status,
}