Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

### 1.1.0

- **Important Note and Breaking Changes:**
- This is a performance oriented patch, no new features will be added.
- **BREAKING**: TraceID/SegmentID generated by Python agent is now in the same format as SkyWalking Java agent.

- Fixes:
- Optimized agent TraceID/SegmentID logic to a simplified snowflake algorithm in sync with SkyWalking Java agent (#311)
- This change speeds up ID generation by 4x, tracing core thus benefit from a ~30% performance gain.
- Before v1.1.0: UUID1, Since v1.1.0: PROCESS_ID(UUID4).THREAD_ID.TIMESTAMP_SEQ(0-9999)

### 1.0.1

- Feature:
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ install: gen-basic

.PHONY: lint
# flake8 configurations should go to the file setup.cfg
# pylint is installed here due to it requires python3.7.2+
lint: clean
poetry run flake8 .
poetry run pip install -U pylint==2.17.4
poetry run pylint --disable=all --enable=E0602,E0603,E1101 skywalking tests

.PHONY: fix
Expand Down
1,785 changes: 696 additions & 1,089 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ httpx = "^0.23.3"
confluent-kafka = "^2.0.2"

[tool.poetry.group.lint.dependencies]
pylint = '2.13.9'
# Pylint is directly installed in makefile due to incompatible python version (3.7.2+)
# pylint = '^2.16.4'
flake8 = "^5.0.4"
# isort = "^5.10.1"
unify = "^0.5"
Expand Down
14 changes: 11 additions & 3 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from skywalking.profile.snapshot import TracingThreadSnapshot
from skywalking.protocol.language_agent.Meter_pb2 import MeterData
from skywalking.protocol.logging.Logging_pb2 import LogData

from skywalking.trace.global_id import global_id_generator
from skywalking.utils.singleton import Singleton

if TYPE_CHECKING:
Expand Down Expand Up @@ -192,11 +194,14 @@ def __fork_after_in_child(self) -> None:
logger.info('New process detected, re-initializing SkyWalking Python agent')
# Note: this is for experimental change, default config should never reach here
# Fork support is controlled by config.agent_fork_support :default: False
# Important: This does not impact pre-forking server support (uwsgi, gunicorn, etc...)
# This is only for explicit long-running fork() calls.
# Important: This only impacts generic Python os.forks and gunicorn, etc...) but not uwsgi
# any postfork fixups below are also done inside skywalking.bootstrap.hooks.uwsgi_hook.py
config.agent_instance_name = f'{config.agent_instance_name}-child({os.getpid()})'
# Regenerate PROCESS_ID, uwsgi is again undetected due to uncaught call to os.fork()
global_id_generator.refresh_process_id()
self.start()
logger.info(f'Agent spawned as {config.agent_instance_name} for service {config.agent_name}.')
logger.info(f'SkyWalking Python agent successfully respawned in forked process as {config.agent_instance_name} '
f'for service {config.agent_name}.')

def start(self) -> None:
"""
Expand Down Expand Up @@ -275,6 +280,9 @@ def start(self) -> None:
os.register_at_fork(before=self.__fork_before, after_in_parent=self.__fork_after_in_parent,
after_in_child=self.__fork_after_in_child)

logger.info(f'SkyWalking Python agent successfully spawned in main process as {config.agent_instance_name} '
f'for service {config.agent_name}.')

def __fini(self):
"""
This method is called when the agent is shutting down.
Expand Down
8 changes: 2 additions & 6 deletions skywalking/bootstrap/hooks/uwsgi_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,12 @@
"""
from uwsgidecorators import postfork
from skywalking import agent, config
from skywalking.trace.global_id import global_id_generator
import os

from skywalking.loggings import logger


#
# config.init(collector_address='localhost:12800', protocol='http', service_name='test-fastapi-service',
# log_reporter_active=True, service_instance=f'test_instance-{os.getpid()} forkfork',
# logging_level='CRITICAL')


@postfork
def setup_skywalking():
"""
Expand All @@ -54,6 +49,7 @@ def setup_skywalking():
(e.g. the new agent_instance_name with the new PID)
"""
config.agent_instance_name = f'{config.agent_instance_name}-child({os.getpid()})'
global_id_generator.refresh_process_id()

agent.start()
# append pid-suffix to instance name
Expand Down
14 changes: 0 additions & 14 deletions skywalking/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import uuid

from skywalking.utils.counter import AtomicCounter

_id = AtomicCounter()


class ID(object):
def __init__(self, raw_id: str = None):
self.value = raw_id or str(uuid.uuid1()).replace('-', '')

def __str__(self):
return self.value
2 changes: 1 addition & 1 deletion skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from skywalking import profile
from skywalking.agent import agent
from skywalking.profile.profile_status import ProfileStatusReference
from skywalking.trace import ID
from skywalking.trace.global_id import ID
from skywalking.trace.carrier import Carrier
from skywalking.trace.segment import Segment, SegmentRef
from skywalking.trace.snapshot import Snapshot
Expand Down
105 changes: 105 additions & 0 deletions skywalking/trace/global_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import uuid
import threading
import time
import contextvars
from typing import Optional


class GlobalIdGenerator:
"""
A fast alternative to uuid
"""

# The PROCESS_ID must be regenerated upon fork to avoid collision
PROCESS_ID = uuid.uuid1().hex

@staticmethod
def refresh_process_id():
"""
Upon fork, a new processid will be given
os.register_at_fork() does not consider uwsgi, so additional handling is done on the hook side
This method is called from both agent.__init__.py and bootstrap.hooks.uwsgi_hook
"""
GlobalIdGenerator.PROCESS_ID = uuid.uuid1().hex

class IDContext:
def __init__(self, last_timestamp, thread_seq):
self.last_timestamp = last_timestamp
self.thread_seq = thread_seq
self.last_shift_timestamp = 0
self.last_shift_value = 0

def next_seq(self) -> int:
"""
Concatenate timestamp(ms) and thread sequence to generate a unique id
"""
return self.timestamp() * 10000 + self.next_thread_seq()

def timestamp(self) -> int:
"""
Simple solution to time shift back problems, originally implemented by Snoyflake
"""
current_time_millis = time.time_ns() // 1_000_000
if current_time_millis >= self.last_timestamp:
self.last_timestamp = current_time_millis
return self.last_timestamp
else:
if self.last_shift_timestamp != current_time_millis:
self.last_shift_value += 1
self.last_shift_timestamp = current_time_millis
return self.last_shift_value

def next_thread_seq(self) -> int:
"""
Generate sequence 0-9999 for the current thread
"""
if self.thread_seq == 10000:
self.thread_seq = 0
self.thread_seq += 1
return self.thread_seq

THREAD_ID_SEQUENCE = contextvars.ContextVar(
'thread_id_sequence', default=IDContext(time.time_ns() // 1_000_000, 0))

@staticmethod
def generate() -> str:
"""
Concatenate uids
"""
return f'{GlobalIdGenerator.PROCESS_ID}.{threading.get_ident()}.' \
f'{GlobalIdGenerator.THREAD_ID_SEQUENCE.get().next_seq()}'


class ID(object):
"""
This class is kept to maintain backward compatibility
"""
def __init__(self, raw_id: Optional[str] = None):
if raw_id is None:
self.value = GlobalIdGenerator.generate()
else:
self.value = raw_id

def __str__(self):
return self.value


# Import alias to bypass pylint error
global_id_generator = GlobalIdGenerator
2 changes: 1 addition & 1 deletion skywalking/trace/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import List, TYPE_CHECKING

from skywalking import config
from skywalking.trace import ID
from skywalking.trace.global_id import ID
from skywalking.utils.lang import tostring

if TYPE_CHECKING:
Expand Down
2 changes: 1 addition & 1 deletion skywalking/trace/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
if TYPE_CHECKING:
from skywalking.trace.context import SpanContext

from skywalking.trace import ID
from skywalking.trace.global_id import ID


class Snapshot:
Expand Down
2 changes: 1 addition & 1 deletion skywalking/trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import TYPE_CHECKING

from skywalking import Kind, Layer, Log, Component, LogItem, config
from skywalking.trace import ID
from skywalking.trace.global_id import ID
from skywalking.trace.carrier import Carrier
from skywalking.trace.segment import SegmentRef, Segment
from skywalking.trace.tags import Tag
Expand Down
9 changes: 7 additions & 2 deletions skywalking/utils/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
# limitations under the License.
#

""" TODO: call time.time_ns() // 1_000_000 directly"""

import time


def current_milli_time():
return round(time.time() * 1000)
def current_milli_time() -> int:
"""
Returning current millisecond in int
"""
return time.time_ns() // 1_000_000
84 changes: 84 additions & 0 deletions tests/unit/test_global_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the 'License'); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import multiprocessing
import unittest
import time
import os
from skywalking.trace.global_id import global_id_generator, ID


class TestGlobalIdGenerator(unittest.TestCase):
def test_refresh_process_id(self):
old_id = global_id_generator.PROCESS_ID
global_id_generator.refresh_process_id()
new_id = global_id_generator.PROCESS_ID
self.assertNotEqual(old_id, new_id, 'Process ID should change after refresh')

def test_id_context(self):
id_context = global_id_generator.IDContext(time.time_ns() // 1_000_000, 0)
old_id = id_context.next_seq()
new_id = id_context.next_seq()
self.assertNotEqual(old_id, new_id, 'IDContext should generate different sequences')

def test_generate(self):
old_id = global_id_generator.generate()
new_id = global_id_generator.generate()
self.assertNotEqual(old_id, new_id, 'GlobalIdGenerator should generate different IDs')

def test_id(self):
id_obj = ID()
self.assertIsInstance(id_obj.value, str, 'ID should contain a string value')
self.assertEqual(id_obj.value, str(id_obj), '__str__ should return the ID value')

def test_multiprocessing_behavior(self):
# create a separate process to test refresh_process_id
# it will call os.fork() to create a child process in linux
os.register_at_fork(after_in_child=global_id_generator.refresh_process_id)

def worker(q):
q.put(global_id_generator.PROCESS_ID)

q = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(q,))
p.start()
p.join()

process_id_in_child_process = q.get()

self.assertNotEqual(process_id_in_child_process, global_id_generator.PROCESS_ID,
'Process ID should change in child process after fork')

@unittest.skipIf(not hasattr(os, 'fork'), 'os.fork is not available on this system')
def test_fork_behavior(self):
parent_id = global_id_generator.PROCESS_ID
os.register_at_fork(after_in_child=global_id_generator.refresh_process_id)

newpid = os.fork()
if newpid == 0:
# This is the child process.
child_id = global_id_generator.PROCESS_ID
assert parent_id != child_id
os._exit(0) # It's important to terminate the child process!
else:
# This is the parent process.
# Wait for the child process to exit.
os.waitpid(newpid, 0)


if __name__ == '__main__':
unittest.main()