Skip to content

Self-fish/light-control

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

63 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Light Control

The purpose of this code is to be able to handle de lights in two different ways:

From remote preferences

It handles the light by reading the starting and finishing hour from the preferences-api by asking each minute what is the correct status and updating a local cache:

def handle_lights(use_case: HandleLightsUseCase):
    while True:
        use_case.check_from_preferences()
        time.sleep(60)

If for whatever reason the preferences-api is not available, it gets the preferences from the local cache:

def get_light_preferences():
    try:
        preferences = ApiDataSource.get_light_preferences()
        update_local_light_preferences(preferences)
        return LightPreferences(preferences.starting_hour, 
                                preferences.finishing_hour, preferences.light_mode,
                                LightPreferencesSource.API)
    except NoApiPreferenceException:
        try:
            preferences = LocalDataSource.get_light_preferences()
            return LightPreferences(preferences.starting_hour, 
                                    preferences.finishing_hour, preferences.light_mode,
                                    LightPreferencesSource.LOCAL)
        except NoLocalPreferencesException:
            raise HandleLightsException

From remote actions

We are able to send remote actions by using sockets. The messages are received by the actions-api who acts as a client sending those in the port 2021

def check_from_action(self):
    self.__socket_server.listen_messages(self.__handle_action)

def __handle_action(self, action):
    if action.decode("UTF-8") == LIGHTS_ON:
        self.__turn_on_lights()
    elif action.decode("UTF-8") == LIGHTS_OFF:
        self.__turn_off_lights()

It's done by creating a socket service listening in the port 2001

class SocketServer:

    def __init__(self, port):
        self.__service = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__service.bind(("", port))
        self.__service.listen(1)

    def listen_messages(self, handle_message):
        while True:
            client, address = self.__service.accept()
            try:
                while True:
                    message = client.recv(1024)
                    if message:
                        handle_message(message)
                    else:
                        break
            finally:
                client.close()

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages