403Webshell
Server IP : 82.180.147.116  /  Your IP : 216.73.216.4
Web Server : Apache
System : Linux server.vsyshosting.com 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64
User : demoplace ( 1009)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model//tls_check.py
import logging
import threading
import time
import traceback

from playhouse.sqlite_ext import SqliteExtDatabase

from defence360agent.internals.global_scope import g


class OverridingReset(Exception):
    """
    Overriding reset could be a signal of logic error
    thus need to be explicitly handled in all places where
    this exception is expected to occur.
    """

    pass


logger = logging.getLogger(__name__)
_thread_local_storage = threading.local()

_SLOW_TXN_THRESHOLD_S = 5.0


class _TimedAtomic:
    def __init__(self, inner: object):
        self._inner = inner
        self._start: float = 0.0
        self._caller: str = ""

    def __enter__(self):
        self._start = time.monotonic()
        self._caller = "".join(traceback.format_stack(limit=4)[:-1])
        return self._inner.__enter__()

    def __exit__(self, *args):
        result = self._inner.__exit__(*args)
        elapsed = time.monotonic() - self._start
        if elapsed > _SLOW_TXN_THRESHOLD_S:
            logger.warning(
                "Slow transaction held for %.2fs\n%s",
                elapsed,
                self._caller,
            )
        return result


class SqliteDatabaseWrapper(SqliteExtDatabase):
    def execute_sql(self, *args, **kwargs):
        _validate(*args, **kwargs)
        return super().execute_sql(*args, **kwargs)

    def atomic(self, lock_type: str = "IMMEDIATE"):
        inner = super().atomic(lock_type)
        if g.get("DEBUG"):
            return _TimedAtomic(inner)
        return inner


def reset(new_value=None):
    if hasattr(_thread_local_storage, "thread_ident_memo"):
        raise OverridingReset()

    _thread_local_storage.thread_ident_memo = (
        new_value or threading.get_ident()
    )


def _validate(*args, **kwargs):
    thread_ident_memo = getattr(
        _thread_local_storage, "thread_ident_memo", None
    )

    if thread_ident_memo is None:
        logger.error("wrong thread or _validate() was not preceded by reset()")

    elif thread_ident_memo != threading.get_ident():
        logger.error(
            "thread_ident_memo check failed [%r != %r]\n"
            "context:\nargs: %s\nkwargs: %s",
            thread_ident_memo,
            threading.get_ident(),
            args,
            kwargs,
        )

Youez - 2016 - github.com/yon3zu
LinuXploit