Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def versions_from_parentdir(parentdir_prefix, root, verbose):
"""
rootdirs = []

for i in range(3):
for _ in range(3):
dirname = os.path.basename(root)
if dirname.startswith(parentdir_prefix):
return {
Expand Down Expand Up @@ -520,7 +520,7 @@ def get_versions():
# versionfile_source is the relative path from the top of the source
# tree (where the .git directory might live) to this file. Invert
# this to find the root from __file__.
for i in cfg.versionfile_source.split("/"):
for _ in cfg.versionfile_source.split("/"):
root = os.path.dirname(root)
except NameError:
return {
Expand Down
2 changes: 1 addition & 1 deletion distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ async def _start(self, timeout=no_default, **kwargs):
elif self.scheduler_file is not None:
while not os.path.exists(self.scheduler_file):
await asyncio.sleep(0.01)
for i in range(10):
for _ in range(10):
try:
with open(self.scheduler_file) as f:
cfg = json.load(f)
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ async def start(self):
self.tcp_server = TCPServer(max_buffer_size=MAX_BUFFER_SIZE, **self.server_args)
self.tcp_server.handle_stream = self._handle_stream
backlog = int(dask.config.get("distributed.comm.socket-backlog"))
for i in range(5):
for _ in range(5):
try:
# When shuffling data between workers, there can
# really be O(cluster size) connection requests
Expand Down
8 changes: 4 additions & 4 deletions distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ async def test_comm_failure_threading(tcp):

async def sleep_for_60ms():
max_thread_count = 0
for x in range(60):
for _ in range(60):
await asyncio.sleep(0.001)
thread_count = threading.active_count()
if thread_count > max_thread_count:
Expand Down Expand Up @@ -379,7 +379,7 @@ async def handle_comm(comm):
try:
assert comm.peer_address.startswith("inproc://" + addr_head)
client_addresses.add(comm.peer_address)
for i in range(N_MSGS):
for _ in range(N_MSGS):
msg = await comm.read()
msg["op"] = "pong"
await comm.write(msg)
Expand All @@ -399,7 +399,7 @@ async def client_communicate(key, delay=0):
comm = await connect(listener.contact_address)
try:
assert comm.peer_address == "inproc://" + listener_addr
for i in range(N_MSGS):
for _ in range(N_MSGS):
await comm.write({"op": "ping", "data": key})
if delay:
await asyncio.sleep(delay)
Expand Down Expand Up @@ -1020,7 +1020,7 @@ async def handle_comm(comm):
listeners = []
N = 100

for i in range(N):
for _ in range(N):
listener = await listen(addr, handle_comm)
listeners.append(listener)

Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tests/test_ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ async def test_stress(
x = x.persist()
await wait(x)

for i in range(10):
for _ in range(10):
x = x.rechunk((chunksize, -1))
x = x.rechunk((-1, chunksize))
x = x.persist()
Expand Down
2 changes: 1 addition & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,7 @@ def collect(self):
self.active,
len(self._connecting),
)
for addr, comms in self.available.items():
for comms in self.available.values():
for comm in comms:
IOLoop.current().add_callback(comm.close)
self.semaphore.release()
Expand Down
8 changes: 4 additions & 4 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ def __init__(self, scheduler, **kwargs):
def update(self):
agg_times = defaultdict(float)

for key, ts in self.scheduler.task_prefixes.items():
for ts in self.scheduler.task_prefixes.values():
for action, t in ts.all_durations.items():
agg_times[action] += t

Expand Down Expand Up @@ -2539,7 +2539,7 @@ def update(self):

durations = set()
nbytes = set()
for key, tg in self.scheduler.task_groups.items():
for tg in self.scheduler.task_groups.values():

if tg.duration and tg.nbytes_total:
durations.add(tg.duration)
Expand Down Expand Up @@ -3495,8 +3495,8 @@ def __init__(self, scheduler, width=800, **kwargs):
@without_property_validation
def update(self):
data = {name: [] for name in self.names + self.extra_names}
for i, (addr, ws) in enumerate(
sorted(self.scheduler.workers.items(), key=lambda kv: str(kv[1].name))
for i, ws in enumerate(
sorted(self.scheduler.workers.values(), key=lambda ws: str(ws.name))
):
minfo = ws.memory

Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/old_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def __init__(

# Start worker nodes
self.workers = []
for i, addr in enumerate(worker_addrs):
for addr in worker_addrs:
self.add_worker(addr)

@gen.coroutine
Expand Down
4 changes: 2 additions & 2 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def test_adaptive_local_cluster_multi_workers():
await asyncio.sleep(0.01)

# no workers for a while
for i in range(10):
for _ in range(10):
assert not cluster.scheduler.workers
await asyncio.sleep(0.05)

Expand Down Expand Up @@ -236,7 +236,7 @@ async def test_adapt_quickly():

# Don't scale up for large sequential computations
x = await client.scatter(1)
for i in range(100):
for _ in range(100):
x = client.submit(slowinc, x)

await asyncio.sleep(0.1)
Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ async def test_MultiWorker():

adapt = cluster.adapt(minimum=0, maximum=4)

for i in range(adapt.wait_count): # relax down to 0 workers
for _ in range(adapt.wait_count): # relax down to 0 workers
await adapt.adapt()
await cluster
assert not s.workers
Expand Down
2 changes: 1 addition & 1 deletion distributed/diagnostics/tests/test_progress_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def test_progress_stream(c, s, a, b):
futures = c.map(div, [1] * 10, range(10))

x = 1
for i in range(5):
for _ in range(5):
x = delayed(inc)(x)
future = c.compute(x)

Expand Down
2 changes: 1 addition & 1 deletion distributed/http/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def _descend_routes(router, routers=None, out=None):
if issubclass(rule.target, tornado.web.StaticFileHandler):
prefix = rule.matcher.regex.pattern.rstrip("(.*)$").rstrip("/")
path = rule.target_kwargs["path"]
for d, dirs, files in os.walk(path):
for d, _, files in os.walk(path):
for fn in files:
fullpath = d + "/" + fn
ourpath = fullpath.replace(path, prefix).replace("\\", "/")
Expand Down
2 changes: 1 addition & 1 deletion distributed/protocol/tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ async def test_profile_nested_sizeof():
original = outer = {}
inner = {}

for i in range(n):
for _ in range(n):
outer["children"] = inner
outer, inner = inner, {}

Expand Down
2 changes: 1 addition & 1 deletion distributed/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def process(record):
"integer batch sizes and timeouts"
)
raise NotImplementedError(msg)
for i in range(batch):
for _ in range(batch):
record = await q.get()
out.append(record)
out = [process(o) for o in out]
Expand Down
6 changes: 3 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6724,7 +6724,7 @@ def add_resources(self, worker: str, resources=None):

def remove_resources(self, worker):
ws: WorkerState = self.workers[worker]
for resource, quantity in ws.resources.items():
for resource in ws.resources:
dr: dict = self.resources.get(resource, None)
if dr is None:
self.resources[resource] = dr = {}
Expand Down Expand Up @@ -6849,7 +6849,7 @@ async def get_profile_metadata(
tt = t // dt * dt
if tt > last:
last = tt
for k, v in keys.items():
for v in keys.values():
v.append([tt, 0])
for k, v in d.items():
keys[k][-1][1] += v
Expand Down Expand Up @@ -7107,7 +7107,7 @@ async def reevaluate_occupancy(self, worker_index: int = 0):
workers: list = list(self.workers.values())
nworkers: int = len(workers)
i: int
for i in range(nworkers):
for _ in range(nworkers):
ws: WorkerState = workers[worker_index % nworkers]
worker_index += 1
try:
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_multi_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def test_many(tmp_path, count):
with MultiFile(directory=tmp_path, dump=dump, load=load) as mf:
d = {i: [str(i).encode() * 100] for i in range(count)}

for i in range(10):
for _ in range(10):
await mf.put(d)

await mf.flush()
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def test_processing_chain():

filesystem = defaultdict(io.BytesIO)

for worker, partitions in splits_by_worker.items():
for partitions in splits_by_worker.values():
for partition, batches in partitions.items():
for batch in batches:
dump_batch(batch, filesystem[partition], schema)
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/make_tls_certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
def make_cert_key(hostname, sign=False):
print("creating cert for " + hostname)
tempnames = []
for i in range(3):
for _ in range(3):
with tempfile.NamedTemporaryFile(delete=False) as f:
tempnames.append(f.name)
req_file, cert_file, key_file = tempnames
Expand Down
12 changes: 6 additions & 6 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ async def test_failed_worker(c, s, a, b):
async def bench(c, s, a, b):
counter = await c.submit(Counter, actor=True)

for i in range(1000):
for _ in range(1000):
await counter.increment()


Expand Down Expand Up @@ -357,7 +357,7 @@ async def test_many_computations(c, s, a, b):
counter = await c.submit(Counter, actor=True)

def add(n, counter):
for i in range(n):
for _ in range(n):
counter.increment().result()

futures = c.map(add, range(10), counter=counter)
Expand All @@ -383,7 +383,7 @@ def f(self):
assert self.n == 0
self.n += 1

for i in range(20):
for _ in range(20):
sleep(0.002)
assert self.n == 1
self.n = 0
Expand Down Expand Up @@ -484,7 +484,7 @@ async def test_compute(c, s, a, b):
@dask.delayed
def f(n, counter):
assert isinstance(counter, Actor)
for i in range(n):
for _ in range(n):
counter.increment().result()

@dask.delayed
Expand All @@ -506,7 +506,7 @@ def test_compute_sync(client):
@dask.delayed
def f(n, counter):
assert isinstance(counter, Actor), type(counter)
for i in range(n):
for _ in range(n):
counter.increment().result()

@dask.delayed
Expand Down Expand Up @@ -544,7 +544,7 @@ def sleep(self, time):

sleeper = await c.submit(Sleeper, actor=True)

for i in range(5):
for _ in range(5):
await sleeper.sleep(0.200)
if (
list(a.profile_recent["children"])[0].startswith("sleep")
Expand Down
Loading