HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //home/arjun/python-apt/utils/mirrortest
#!/usr/bin/env python3
import os
import queue
import random
import re
import socket
import threading
import time
import urllib.error
import urllib.parse
import urllib.request

import aptsources.distro
import aptsources.sourceslist

socket.setdefaulttimeout(2)


class MirrorTest:
    class PingWorker(threading.Thread):
        def __init__(self, jobs, results, id):
            self.id = id
            self.jobs = jobs
            self.results = results
            self.match_result = re.compile(r"^rtt .* = [\.\d]+/([\.\d]+)/.*")
            threading.Thread.__init__(self)

        def run(self):
            result = None
            while MirrorTest.completed_pings < MirrorTest.todo:
                try:
                    mirror = self.jobs.get(True, 1)
                    host = mirror.hostname
                except:
                    continue
                print(
                    "Pinging (Worker %s) %s (%s) ..."
                    % (self.id, host, MirrorTest.completed_pings)
                )
                commando = os.popen("ping -q -c 4 -W 2 -i 0.3 %s" % host, "r")
                while True:
                    line = commando.readline()
                    if not line:
                        break
                    result = re.findall(self.match_result, line)
                MirrorTest.completed_pings_lock.acquire()
                MirrorTest.completed_pings += 1
                if result:
                    self.results.append([float(result[0]), host, mirror])
                MirrorTest.completed_pings_lock.release()

    def speed_test(self, mirror):
        url = f"{mirror.get_repo_urls()[0]}/{self.test_file}"
        print("Downloading %s ..." % url)
        start = time.time()
        try:
            urllib.request.urlopen(url).read(51200)
            return 50 / (time.time() - start)
        except:
            return 0

    def __init__(self, hosts, test_file):
        self.test_file = test_file
        jobs = queue.Queue()
        results = []
        for h in hosts:
            jobs.put(h)
        self.threads = []
        MirrorTest.completed_pings = 0
        MirrorTest.completed_pings_lock = threading.Lock()
        MirrorTest.todo = len(hosts)

        for i in range(10):
            t = MirrorTest.PingWorker(jobs, results, i)
            self.threads.append(t)
            t.start()

        for t in self.threads:
            t.join()

        results.sort()
        print("\n\nTop ten RTTs:")
        for r in results[0:10]:
            print(f"{r[1]}: {r[0]}")
        print("\n\n")

        results.insert(0, [0, "rand", hosts[random.randint(1, len(hosts))]])
        results.insert(0, [0, "rand", hosts[random.randint(1, len(hosts))]])

        final = [(self.speed_test(r[2]), r[2]) for r in results[0:12]]
        final.sort()
        final.reverse()
        print("\n\nBest mirrors:")
        for f in final:
            print(f"{f[1].hostname}: {int(f[0])} KByte/s")


if __name__ == "__main__":
    distro = aptsources.distro.get_distro()
    distro.get_sources(aptsources.sourceslist.SourcesList())
    pipe = os.popen("dpkg --print-architecture")
    arch = pipe.read().strip()
    test_file = "dists/{}/{}/binary-{}/Packages.gz".format(
        distro.source_template.name,
        distro.source_template.components[0].name,
        arch,
    )
    app = MirrorTest(distro.source_template.mirror_set.values(), test_file)