HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux spn-python 5.15.0-89-generic #99-Ubuntu SMP Mon Oct 30 20:42:41 UTC 2023 x86_64
User: arjun (1000)
PHP: 8.1.2-1ubuntu2.20
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.10/dist-packages/langsmith/_internal/_compressed_traces.py
import io
import threading

from zstandard import ZstdCompressor  # type: ignore[import]

from langsmith import utils as ls_utils

# zstd compression level used for every compressor created in this module.
# The fallback is the integer 3, but an override taken from the environment is
# presumably delivered as a string (NOTE(review): confirm against
# ls_utils.get_env_var), and ZstdCompressor needs an integer level. Coerce
# here, and fall back to the default rather than failing at import time when
# the configured value is not a valid integer.
_DEFAULT_COMPRESSION_LEVEL = 3
try:
    compression_level = int(
        ls_utils.get_env_var("RUN_COMPRESSION_LEVEL", _DEFAULT_COMPRESSION_LEVEL)
    )
except (TypeError, ValueError):
    compression_level = _DEFAULT_COMPRESSION_LEVEL


class CompressedTraces:
    """In-memory buffer paired with a zstd stream writer that feeds it.

    Attributes set on every instance:

    * ``buffer`` -- ``io.BytesIO`` that receives the compressed bytes.
    * ``compressor_writer`` -- zstd stream writer targeting ``buffer``.
    * ``trace_count`` / ``uncompressed_size`` -- counters that start at zero;
      nothing in this class updates them, so they are presumably maintained
      by the code that writes to ``compressor_writer``.
    * ``lock`` -- a ``threading.Lock`` created once per instance. The methods
      here never acquire it; callers are presumably expected to hold it while
      touching the other attributes (verify against callers).
    """

    def __init__(self):
        """Create the instance lock and start with an empty stream."""
        self.lock = threading.Lock()
        self._start_fresh_stream()

    def reset(self):
        """Discard the current buffer and counters and start a new stream.

        The lock object is kept. The previous writer is simply dropped -- it
        is neither flushed nor closed here.
        """
        self._start_fresh_stream()

    def _start_fresh_stream(self):
        """Install a new buffer, zeroed counters and a new zstd writer."""
        self.trace_count = 0
        self.uncompressed_size = 0
        self.buffer = io.BytesIO()

        # threads=-1 is forwarded to zstandard as the worker-thread setting;
        # closefd=False is passed so that closing the writer is not meant to
        # close ``buffer`` as well (see the zstandard stream_writer docs).
        compressor = ZstdCompressor(level=compression_level, threads=-1)
        self.compressor_writer = compressor.stream_writer(
            self.buffer, closefd=False
        )