X7ROOT File Manager
Current Path:
/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/integrations
opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sentry_sdk
/
integrations
/
📁
..
📄
__init__.py
(9.98 KB)
📁
__pycache__
📄
_asgi_common.py
(3.11 KB)
📄
_wsgi_common.py
(7.38 KB)
📄
aiohttp.py
(12.59 KB)
📄
anthropic.py
(9.21 KB)
📄
argv.py
(911 B)
📄
ariadne.py
(5.7 KB)
📄
arq.py
(7.67 KB)
📄
asgi.py
(12.48 KB)
📄
asyncio.py
(3.94 KB)
📄
asyncpg.py
(6.37 KB)
📄
atexit.py
(1.61 KB)
📄
aws_lambda.py
(17.53 KB)
📄
beam.py
(5.06 KB)
📄
boto3.py
(4.31 KB)
📄
bottle.py
(6.46 KB)
📁
celery
📄
chalice.py
(4.59 KB)
📄
clickhouse_driver.py
(5.12 KB)
📄
cloud_resource_context.py
(7.6 KB)
📄
cohere.py
(9.11 KB)
📄
dedupe.py
(1.38 KB)
📁
django
📄
dramatiq.py
(5.45 KB)
📄
excepthook.py
(2.35 KB)
📄
executing.py
(1.95 KB)
📄
falcon.py
(9.28 KB)
📄
fastapi.py
(4.61 KB)
📄
flask.py
(8.54 KB)
📄
gcp.py
(8.08 KB)
📄
gnu_backtrace.py
(2.83 KB)
📄
gql.py
(4.08 KB)
📄
graphene.py
(4.92 KB)
📁
grpc
📄
httpx.py
(5.73 KB)
📄
huey.py
(5.32 KB)
📄
huggingface_hub.py
(6.4 KB)
📄
langchain.py
(17.3 KB)
📄
launchdarkly.py
(1.89 KB)
📄
litestar.py
(11.3 KB)
📄
logging.py
(13.19 KB)
📄
loguru.py
(3.54 KB)
📄
modules.py
(820 B)
📄
openai.py
(15.22 KB)
📄
openfeature.py
(1.21 KB)
📁
opentelemetry
📄
pure_eval.py
(4.47 KB)
📄
pymongo.py
(6.23 KB)
📄
pyramid.py
(7.19 KB)
📄
quart.py
(7.26 KB)
📄
ray.py
(4.06 KB)
📁
redis
📄
rq.py
(5.18 KB)
📄
rust_tracing.py
(8.87 KB)
📄
sanic.py
(12.66 KB)
📄
serverless.py
(1.76 KB)
📄
socket.py
(3.09 KB)
📁
spark
📄
sqlalchemy.py
(4.27 KB)
📄
starlette.py
(25.79 KB)
📄
starlite.py
(10.37 KB)
📄
statsig.py
(1.2 KB)
📄
stdlib.py
(8.62 KB)
📄
strawberry.py
(13.79 KB)
📄
sys_exit.py
(2.43 KB)
📄
threading.py
(5.27 KB)
📄
tornado.py
(7.05 KB)
📄
trytond.py
(1.61 KB)
📄
typer.py
(1.77 KB)
📄
unleash.py
(1.03 KB)
📄
wsgi.py
(10.5 KB)
Editing: cloud_resource_context.py
import json import urllib3 from sentry_sdk.integrations import Integration from sentry_sdk.api import set_context from sentry_sdk.utils import logger from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Dict CONTEXT_TYPE = "cloud_resource" HTTP_TIMEOUT = 2.0 AWS_METADATA_HOST = "169.254.169.254" AWS_TOKEN_URL = "http://{}/latest/api/token".format(AWS_METADATA_HOST) AWS_METADATA_URL = "http://{}/latest/dynamic/instance-identity/document".format( AWS_METADATA_HOST ) GCP_METADATA_HOST = "metadata.google.internal" GCP_METADATA_URL = "http://{}/computeMetadata/v1/?recursive=true".format( GCP_METADATA_HOST ) class CLOUD_PROVIDER: # noqa: N801 """ Name of the cloud provider. see https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud/ """ ALIBABA = "alibaba_cloud" AWS = "aws" AZURE = "azure" GCP = "gcp" IBM = "ibm_cloud" TENCENT = "tencent_cloud" class CLOUD_PLATFORM: # noqa: N801 """ The cloud platform. see https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud/ """ AWS_EC2 = "aws_ec2" GCP_COMPUTE_ENGINE = "gcp_compute_engine" class CloudResourceContextIntegration(Integration): """ Adds cloud resource context to the Senty scope """ identifier = "cloudresourcecontext" cloud_provider = "" aws_token = "" http = urllib3.PoolManager(timeout=HTTP_TIMEOUT) gcp_metadata = None def __init__(self, cloud_provider=""): # type: (str) -> None CloudResourceContextIntegration.cloud_provider = cloud_provider @classmethod def _is_aws(cls): # type: () -> bool try: r = cls.http.request( "PUT", AWS_TOKEN_URL, headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"}, ) if r.status != 200: return False cls.aws_token = r.data.decode() return True except urllib3.exceptions.TimeoutError: logger.debug( "AWS metadata service timed out after %s seconds", HTTP_TIMEOUT ) return False except Exception as e: logger.debug("Error checking AWS metadata service: %s", str(e)) return False @classmethod def _get_aws_context(cls): # type: () -> Dict[str, str] ctx = { "cloud.provider": CLOUD_PROVIDER.AWS, "cloud.platform": CLOUD_PLATFORM.AWS_EC2, } try: r = cls.http.request( "GET", AWS_METADATA_URL, headers={"X-aws-ec2-metadata-token": cls.aws_token}, ) if r.status != 200: return ctx data = json.loads(r.data.decode("utf-8")) try: ctx["cloud.account.id"] = data["accountId"] except Exception: pass try: ctx["cloud.availability_zone"] = data["availabilityZone"] except Exception: pass try: ctx["cloud.region"] = data["region"] except Exception: pass try: ctx["host.id"] = data["instanceId"] except Exception: pass try: ctx["host.type"] = data["instanceType"] except Exception: pass except urllib3.exceptions.TimeoutError: logger.debug( "AWS metadata service timed out after %s seconds", HTTP_TIMEOUT ) except Exception as e: logger.debug("Error fetching AWS metadata: %s", str(e)) return ctx @classmethod def _is_gcp(cls): # type: () -> bool try: r = cls.http.request( "GET", GCP_METADATA_URL, headers={"Metadata-Flavor": "Google"}, ) if r.status != 200: return False cls.gcp_metadata = json.loads(r.data.decode("utf-8")) return True except urllib3.exceptions.TimeoutError: logger.debug( "GCP metadata service timed out after %s seconds", HTTP_TIMEOUT ) return False except Exception as e: logger.debug("Error checking GCP metadata service: %s", str(e)) return False @classmethod def _get_gcp_context(cls): # type: () -> Dict[str, str] ctx = { "cloud.provider": CLOUD_PROVIDER.GCP, "cloud.platform": CLOUD_PLATFORM.GCP_COMPUTE_ENGINE, } try: if cls.gcp_metadata is None: r = cls.http.request( "GET", GCP_METADATA_URL, headers={"Metadata-Flavor": "Google"}, ) if r.status != 200: return ctx cls.gcp_metadata = json.loads(r.data.decode("utf-8")) try: ctx["cloud.account.id"] = cls.gcp_metadata["project"]["projectId"] except Exception: pass try: ctx["cloud.availability_zone"] = cls.gcp_metadata["instance"][ "zone" ].split("/")[-1] except Exception: pass try: # only populated in google cloud run ctx["cloud.region"] = cls.gcp_metadata["instance"]["region"].split("/")[ -1 ] except Exception: pass try: ctx["host.id"] = cls.gcp_metadata["instance"]["id"] except Exception: pass except urllib3.exceptions.TimeoutError: logger.debug( "GCP metadata service timed out after %s seconds", HTTP_TIMEOUT ) except Exception as e: logger.debug("Error fetching GCP metadata: %s", str(e)) return ctx @classmethod def _get_cloud_provider(cls): # type: () -> str if cls._is_aws(): return CLOUD_PROVIDER.AWS if cls._is_gcp(): return CLOUD_PROVIDER.GCP return "" @classmethod def _get_cloud_resource_context(cls): # type: () -> Dict[str, str] cloud_provider = ( cls.cloud_provider if cls.cloud_provider != "" else CloudResourceContextIntegration._get_cloud_provider() ) if cloud_provider in context_getters.keys(): return context_getters[cloud_provider]() return {} @staticmethod def setup_once(): # type: () -> None cloud_provider = CloudResourceContextIntegration.cloud_provider unsupported_cloud_provider = ( cloud_provider != "" and cloud_provider not in context_getters.keys() ) if unsupported_cloud_provider: logger.warning( "Invalid value for cloud_provider: %s (must be in %s). Falling back to autodetection...", CloudResourceContextIntegration.cloud_provider, list(context_getters.keys()), ) context = CloudResourceContextIntegration._get_cloud_resource_context() if context != {}: set_context(CONTEXT_TYPE, context) # Map with the currently supported cloud providers # mapping to functions extracting the context context_getters = { CLOUD_PROVIDER.AWS: CloudResourceContextIntegration._get_aws_context, CLOUD_PROVIDER.GCP: CloudResourceContextIntegration._get_gcp_context, }
Upload File
Create Folder