Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Use the official Python image
FROM python:3.10-slim

# Set the working directory inside the container
WORKDIR /usr/src/mylib

# Install system build/runtime dependencies FIRST: gcc/g++/swig are required
# to compile native extensions pulled in by pip (e.g. Box2D builds via swig),
# so this layer must come before the pip install below. It also changes less
# often than requirements.txt, which keeps the Docker layer cache effective.
# --no-install-recommends keeps the image slim; the apt lists are removed in
# the same layer so they never persist in the image.
RUN apt-get update && apt-get install -y --no-install-recommends --fix-missing \
    libgeos-dev \
    gcc \
    g++ \
    swig \
    git \
    libgl1-mesa-dev \
    libglu1-mesa-dev \
    freeglut3-dev \
    xvfb \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies (tqdm folded into the same pip invocation so it
# shares one layer with the requirements install).
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt tqdm

# Clone the multi_car_racing repository and install it (editable install so
# the cloned sources remain the importable package).
RUN git clone https://github.com/igilitschenski/multi_car_racing.git \
    && cd multi_car_racing \
    && pip install -e .

# Copy the Syllabus directory
COPY . .

WORKDIR /usr/src/mylib/syllabus/examples/experimental
# Make the project root importable regardless of the current working directory.
ENV PYTHONPATH="${PYTHONPATH}:/usr/src/mylib"

CMD ["/bin/bash"]

# docker build -t syllabus-image .
# docker run -it --rm -p 4000:4000 syllabus-image
# docker run -it --rm -p 4000:4000 -v ${PWD}:/usr/src/mylib syllabus-image
# xvfb-run -s "-screen 0 1400x900x24" python -u multi_car_racing.py
# xvfb-run -s "-screen 0 1400x900x24" python -u multi_car_racing_v2.py
1 change: 1 addition & 0 deletions multi_car_racing
Submodule multi_car_racing added at 013190
11 changes: 7 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pettingzoo==1.19.0
#supersuit==3.5.0
gym==0.23.1
pettingzoo==1.23.0 # upgraded from 1.19.0 to solve BaseParallelWrapper import (typo)
# upgraded from 3.5.0 to 3.7.2 (as pettingZoo < 1.23 is a dependency,
# the typo error is also present here)
supersuit==3.7.2
gym==0.24.1
numpy==1.23.3
wandb>=0.15.3
grpcio<=1.48.2
Expand All @@ -9,7 +11,8 @@ torch>=2.0.1
ray==2.2.0
tabulate==0.9.0
tensorflow_probability
gymnasium

# documentation
pip install sphinx-tabs
# pip install sphinx-tabs

36 changes: 25 additions & 11 deletions syllabus/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
# flake8: noqa
# Environment Code
from .task_interface import TaskWrapper, SubclassTaskWrapper, ReinitTaskWrapper, TaskEnv, PettingZooTaskWrapper, PettingZooTaskEnv

# Curriculum Code
from .utils import decorate_all_functions, UsageError, enumerate_axes
from .curriculum_base import Curriculum
from .curriculum_sync_wrapper import (CurriculumWrapper,
MultiProcessingCurriculumWrapper,
RayCurriculumWrapper,
make_multiprocessing_curriculum,
make_ray_curriculum)

from .environment_sync_wrapper import MultiProcessingSyncWrapper, RaySyncWrapper, PettingZooMultiProcessingSyncWrapper, PettingZooRaySyncWrapper
from .curriculum_sync_wrapper import (
CurriculumWrapper,
MultiProcessingCurriculumWrapper,
RayCurriculumWrapper,
make_multiprocessing_curriculum,
make_ray_curriculum,
)
from .environment_sync_wrapper import (
MultiProcessingSyncWrapper,
PettingZooMultiProcessingSyncWrapper,
PettingZooRaySyncWrapper,
RaySyncWrapper,
)
from .multivariate_curriculum_wrapper import MultitaskWrapper
from .task_interface import (
PettingZooTaskEnv,
PettingZooTaskWrapper,
ReinitTaskWrapper,
SubclassTaskWrapper,
TaskEnv,
TaskWrapper,
)

# Curriculum Code
from .utils import UsageError, decorate_all_functions, enumerate_axes
62 changes: 42 additions & 20 deletions syllabus/core/curriculum_sync_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import threading
import time
from functools import wraps
from typing import List, Tuple

import ray
import time
from torch.multiprocessing import SimpleQueue

from syllabus.core import Curriculum, decorate_all_functions
from syllabus.core import Curriculum

from .utils import decorate_all_functions # fixed circular import


class CurriculumWrapper:
"""Wrapper class for adding multiprocessing synchronization to a curriculum.
"""
"""Wrapper class for adding multiprocessing synchronization to a curriculum."""

def __init__(self, curriculum: Curriculum) -> None:
self.curriculum = curriculum
self.task_space = curriculum.task_space
Expand All @@ -26,7 +28,7 @@ def count_tasks(self, task_space=None):

@property
def tasks(self):
return self.task_space.tasks
return self.task_space.tasks

def get_tasks(self, task_space=None):
return self.task_space.get_tasks(gym_space=task_space)
Expand Down Expand Up @@ -57,13 +59,15 @@ def add_task(self, task):


class MultiProcessingCurriculumWrapper(CurriculumWrapper):
"""Wrapper which sends tasks and receives updates from environments wrapped in a corresponding MultiprocessingSyncWrapper.
"""
def __init__(self,
curriculum: Curriculum,
task_queue: SimpleQueue,
update_queue: SimpleQueue,
sequential_start: bool = True):
"""Wrapper which sends tasks and receives updates from environments wrapped in a corresponding MultiprocessingSyncWrapper."""

def __init__(
self,
curriculum: Curriculum,
task_queue: SimpleQueue,
update_queue: SimpleQueue,
sequential_start: bool = True,
):
super().__init__(curriculum)
self.task_queue = task_queue
self.update_queue = update_queue
Expand All @@ -78,7 +82,9 @@ def start(self):
"""
Start the thread that reads the complete_queue and reads the task_queue.
"""
self.update_thread = threading.Thread(name='update', target=self._update_queues, daemon=True)
self.update_thread = threading.Thread(
name="update", target=self._update_queues, daemon=True
)
self.should_update = True
self.update_thread.start()

Expand Down Expand Up @@ -106,18 +112,27 @@ def _update_queues(self):
requested_tasks += 1
# Decode task and task progress
if update["update_type"] == "task_progress":
update["metrics"] = (self.task_space.decode(update["metrics"][0]), update["metrics"][1])
update["metrics"] = (
self.task_space.decode(update["metrics"][0]),
update["metrics"][1],
)
self.batch_update(batch_updates)

# Sample new tasks
if requested_tasks > 0:
# TODO: Move this to curriculum, not sync wrapper
# Sequentially sample task_space before using curriculum method
if (self.sequential_start and
self.task_space.num_tasks is not None and
self.num_assigned_tasks + requested_tasks < self.task_space.num_tasks):
if (
self.sequential_start
and self.task_space.num_tasks is not None
and self.num_assigned_tasks + requested_tasks
< self.task_space.num_tasks
):
# Sample unseen tasks sequentially before using curriculum method
new_tasks = self.task_space.list_tasks()[self.num_assigned_tasks:self.num_assigned_tasks + requested_tasks]
new_tasks = self.task_space.list_tasks()[
self.num_assigned_tasks : self.num_assigned_tasks
+ requested_tasks
]
else:
new_tasks = self.curriculum.sample(k=requested_tasks)
for i, task in enumerate(new_tasks):
Expand Down Expand Up @@ -149,6 +164,7 @@ def remote_call(func):

Note that this causes functions to block, and should be only used for operations that do not require parallelization.
"""

@wraps(func)
def wrapper(self, *args, **kw):
f_name = func.__name__
Expand All @@ -159,6 +175,7 @@ def wrapper(self, *args, **kw):
if child_func == parent_func:
curriculum_func = getattr(self.curriculum, f_name)
return ray.get(curriculum_func.remote(*args, **kw))

return wrapper


Expand All @@ -168,7 +185,9 @@ def make_multiprocessing_curriculum(curriculum, **kwargs):
"""
task_queue = SimpleQueue()
update_queue = SimpleQueue()
mp_curriculum = MultiProcessingCurriculumWrapper(curriculum, task_queue, update_queue, **kwargs)
mp_curriculum = MultiProcessingCurriculumWrapper(
curriculum, task_queue, update_queue, **kwargs
)
mp_curriculum.start()
return mp_curriculum, task_queue, update_queue

Expand All @@ -190,6 +209,7 @@ class RayCurriculumWrapper(CurriculumWrapper):
for convenience.
# TODO: Implement the Curriculum methods explicitly
"""

def __init__(self, curriculum, actor_name="curriculum") -> None:
super().__init__(curriculum)
self.curriculum = RayWrapper.options(name=actor_name).remote(curriculum)
Expand All @@ -202,7 +222,9 @@ def __init__(self, curriculum, actor_name="curriculum") -> None:
def sample(self, k: int = 1):
return ray.get(self.curriculum.sample.remote(k=k))

def update_on_step_batch(self, step_results: List[Tuple[int, int, int, int]]) -> None:
def update_on_step_batch(
self, step_results: List[Tuple[int, int, int, int]]
) -> None:
ray.get(self.curriculum._on_step_batch.remote(step_results))

def add_task(self, task):
Expand Down
Loading