Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions backend/app/deps/authorization_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from app.models.datasets import DatasetDB, DatasetStatus
from app.models.files import FileDB, FileStatus
from app.models.groups import GroupDB
from app.models.listeners import EventListenerDB
from app.models.metadata import MetadataDB
from app.routers.authentication import get_admin, get_admin_mode
from beanie import PydanticObjectId
Expand Down Expand Up @@ -389,6 +390,41 @@ async def __call__(
raise HTTPException(status_code=404, detail=f"Group {group_id} not found")


class ListenerAuthorization:
"""We use class dependency so that we can provide the `permission` parameter to the dependency.
For more info see https://fastapi.tiangolo.com/advanced/advanced-dependencies/.
Regular users are not allowed to run non-active listeners"""

# def __init__(self, optional_arg: str = None):
# self.optional_arg = optional_arg

async def __call__(
self,
listener_id: str,
current_user: str = Depends(get_current_username),
admin_mode: bool = Depends(get_admin_mode),
admin: bool = Depends(get_admin),
):
# If the current user is admin and has turned on admin_mode, user has access irrespective of any role assigned
if admin and admin_mode:
return True

# Else check if listener is active or current user is the creator of the extractor
if (
listener := await EventListenerDB.get(PydanticObjectId(listener_id))
) is not None:
if listener.active is True or (
listener.creator and listener.creator.email == current_user
):
return True
else:
raise HTTPException(
status_code=403,
detail=f"User `{current_user} does not have permission on listener `{listener_id}`",
)
raise HTTPException(status_code=404, detail=f"Listener {listener_id} not found")


class CheckStatus:
"""We use class dependency so that we can provide the `permission` parameter to the dependency.
For more info see https://fastapi.tiangolo.com/advanced/advanced-dependencies/."""
Expand Down
1 change: 1 addition & 0 deletions backend/app/models/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class EventListenerDB(Document, EventListenerBase):
modified: datetime = Field(default_factory=datetime.now)
lastAlive: datetime = None
alive: Optional[bool] = None # made up field to indicate if extractor is alive
active: bool = False
properties: Optional[ExtractorInfo] = None

class Settings:
Expand Down
29 changes: 23 additions & 6 deletions backend/app/routers/feeds.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import List, Optional

from app.deps.authorization_deps import ListenerAuthorization
from app.keycloak_auth import get_current_user, get_current_username
from app.models.feeds import FeedDB, FeedIn, FeedOut
from app.models.files import FileOut
from app.models.listeners import EventListenerDB, FeedListener
from app.models.users import UserOut
from app.rabbitmq.listeners import submit_file_job
from app.routers.authentication import get_admin, get_admin_mode
from app.search.connect import check_search_result
from beanie import PydanticObjectId
from fastapi import APIRouter, Depends, HTTPException
Expand All @@ -23,7 +25,7 @@ async def disassociate_listener_db(feed_id: str, listener_id: str):
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
new_listeners = []
for feed_listener in feed.listeners:
if feed_listener.listener_id != listener_id:
if feed_listener.listener_id != PydanticObjectId(listener_id):
new_listeners.append(feed_listener)
feed.listeners = new_listeners
await feed.save()
Expand Down Expand Up @@ -122,6 +124,8 @@ async def associate_listener(
feed_id: str,
listener: FeedListener,
user=Depends(get_current_user),
admin=Depends(get_admin),
admin_mode=Depends(get_admin_mode),
):
"""Associate an existing Event Listener with a Feed, e.g. so it will be triggered on new Feed results.

Expand All @@ -131,22 +135,35 @@ async def associate_listener(
"""
if (feed := await FeedDB.get(PydanticObjectId(feed_id))) is not None:
if (
await EventListenerDB.get(PydanticObjectId(listener.listener_id))
listener_db := await EventListenerDB.get(
PydanticObjectId(listener.listener_id)
)
) is not None:
feed.listeners.append(listener)
await feed.save()
return feed.dict()
if (
(admin and admin_mode)
or (listener_db.creator and listener_db.creator.email == user.email)
or listener_db.active
):
feed.listeners.append(listener)
await feed.save()
return feed.dict()
else:
raise HTTPException(
status_code=403,
detail=f"User {user} doesn't have permission to submit job to listener {listener.listener_id}",
)
raise HTTPException(
status_code=404, detail=f"listener {listener.listener_id} not found"
)
raise HTTPException(status_code=404, detail=f"feed {feed_id} not found")


@router.delete("/{feed_id}/listeners/{listener_id}", response_model=FeedOut)
@router.delete("/{feed_id}/listeners/{listener_id}")
async def disassociate_listener(
feed_id: str,
listener_id: str,
user=Depends(get_current_user),
allow: bool = Depends(ListenerAuthorization()),
):
"""Disassociate an Event Listener from a Feed.

Expand Down
73 changes: 72 additions & 1 deletion backend/app/routers/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import List, Optional

from app.config import settings
from app.deps.authorization_deps import ListenerAuthorization
from app.keycloak_auth import get_current_user, get_current_username, get_user
from app.models.config import ConfigEntryDB
from app.models.feeds import FeedDB, FeedListener
Expand All @@ -18,6 +19,7 @@
from app.models.pages import Paged, _construct_page_metadata, _get_page_query
from app.models.search import SearchCriteria
from app.models.users import UserOut
from app.routers.authentication import get_admin, get_admin_mode
from app.routers.feeds import disassociate_listener_db
from beanie import PydanticObjectId
from beanie.operators import Or, RegEx
Expand Down Expand Up @@ -198,6 +200,8 @@ async def search_listeners(
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
user=Depends(get_current_username),
process: Optional[str] = None,
admin=Depends(get_admin),
admin_mode=Depends(get_admin_mode),
):
"""Search all Event Listeners in the db based on text.

Expand All @@ -221,6 +225,8 @@ async def search_listeners(
aggregation_pipeline.append(
{"$match": {"properties.process.dataset": {"$exists": True}}}
)
if not admin or not admin_mode:
aggregation_pipeline.append({"$match": {"active": True}})
# Add pagination
aggregation_pipeline.append(
_get_page_query(skip, limit, sort_field="name", ascending=True)
Expand Down Expand Up @@ -260,7 +266,11 @@ async def list_default_labels(user=Depends(get_current_username)):


@router.get("/{listener_id}", response_model=EventListenerOut)
async def get_listener(listener_id: str, user=Depends(get_current_username)):
async def get_listener(
listener_id: str,
user=Depends(get_current_username),
allow: bool = Depends(ListenerAuthorization()),
):
"""Return JSON information about an Event Listener if it exists."""
if (
listener := await EventListenerDB.get(PydanticObjectId(listener_id))
Expand All @@ -274,6 +284,7 @@ async def check_listener_livelihood(
listener_id: str,
heartbeat_interval: Optional[int] = settings.listener_heartbeat_interval,
user=Depends(get_current_username),
allow: bool = Depends(ListenerAuthorization()),
):
"""Return JSON information about an Event Listener if it exists."""
if (
Expand All @@ -293,6 +304,9 @@ async def get_listeners(
label: Optional[str] = None,
alive_only: Optional[bool] = False,
process: Optional[str] = None,
all: Optional[bool] = False,
admin=Depends(get_admin),
admin_mode=Depends(get_admin_mode),
):
"""Get a list of all Event Listeners in the db.

Expand All @@ -303,6 +317,7 @@ async def get_listeners(
category -- filter by category has to be exact match
label -- filter by label has to be exact match
alive_only -- filter by alive status
all -- boolean stating if we want to show all listeners irrespective of admin and admin_mode
"""
# First compute alive flag for all listeners
aggregation_pipeline = [
Expand All @@ -325,6 +340,9 @@ async def get_listeners(
aggregation_pipeline.append(
{"$match": {"properties.process.dataset": {"$exists": True}}}
)
# Non admin users can access only active listeners unless all is turned on for Extractor page
if not all and (not admin or not admin_mode):
aggregation_pipeline.append({"$match": {"active": True}})
# Add pagination
aggregation_pipeline.append(
_get_page_query(skip, limit, sort_field="name", ascending=True)
Expand All @@ -351,6 +369,7 @@ async def edit_listener(
listener_id: str,
listener_in: EventListenerIn,
user_id=Depends(get_user),
allow: bool = Depends(ListenerAuthorization()),
):
"""Update the information about an existing Event Listener..

Expand All @@ -374,10 +393,62 @@ async def edit_listener(
raise HTTPException(status_code=404, detail=f"listener {listener_id} not found")


@router.put("/{listener_id}/enable", response_model=EventListenerOut)
async def enable_listener(
listener_id: str,
user_id=Depends(get_user),
allow: bool = Depends(ListenerAuthorization()),
):
"""Enable an Event Listener. Only admins can enable listeners.

Arguments:
listener_id -- UUID of the listener to be enabled
"""
return await _set_active_flag(listener_id, True)


@router.put("/{listener_id}/disable", response_model=EventListenerOut)
async def disable_listener(
listener_id: str,
user_id=Depends(get_user),
allow: bool = Depends(ListenerAuthorization()),
):
"""Disable an Event Listener. Only admins can enable listeners.

Arguments:
listener_id -- UUID of the listener to be enabled
"""
return await _set_active_flag(listener_id, False)


async def _set_active_flag(
listener_id: str,
active: bool,
allow: bool = Depends(ListenerAuthorization()),
):
"""Set the active flag of an Event Listener. Only admins can enable/disable listeners.

Arguments:
listener_id -- UUID of the listener to be enabled/disabled
"""
listener = await EventListenerDB.find_one(
EventListenerDB.id == ObjectId(listener_id)
)
if listener:
try:
listener.active = active
await listener.save()
return listener.dict()
except Exception as e:
raise HTTPException(status_code=500, detail=e.args[0])
raise HTTPException(status_code=404, detail=f"listener {listener_id} not found")


@router.delete("/{listener_id}")
async def delete_listener(
listener_id: str,
user=Depends(get_current_username),
allow: bool = Depends(ListenerAuthorization()),
):
"""Remove an Event Listener from the database. Will not clear event history for the listener."""
listener = await EventListenerDB.find_one(
Expand Down
20 changes: 20 additions & 0 deletions backend/app/tests/test_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,23 @@ def test_v1_mime_trigger(client: TestClient, headers: dict):
)
assert response.status_code == 200
assert len(response.json()) > 0


def test_enable_disable_extractor(client: TestClient, headers: dict):
# create a new extractor
ext_name = "test.v1_extractor"
extractor_id = register_v1_extractor(client, headers, ext_name).get("id")

# enable the extractor
response = client.put(
f"{settings.API_V2_STR}/listeners/{extractor_id}/enable", headers=headers
)
assert response.status_code == 200
assert response.json()["active"] is True

# disable the extractor
response = client.put(
f"{settings.API_V2_STR}/listeners/{extractor_id}/disable", headers=headers
)
assert response.status_code == 200
assert response.json()["active"] is False
45 changes: 43 additions & 2 deletions frontend/src/actions/listeners.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export function fetchListeners(
category = null,
label = null,
aliveOnly = false,
process = null
process = null,
all = false
) {
return (dispatch) => {
// TODO: Parameters for dates? paging?
Expand All @@ -21,7 +22,8 @@ export function fetchListeners(
category,
label,
aliveOnly,
process
process,
all
)
.then((json) => {
dispatch({
Expand Down Expand Up @@ -49,6 +51,45 @@ export function fetchListeners(
};
}

export const TOGGLE_ACTIVE_FLAG_LISTENER = "TOGGLE_ACTIVE_FLAG_LISTENER";
export function enableListener(id) {
return (dispatch) => {
return V2.ListenersService.enableListenerApiV2ListenersListenerIdEnablePut(
id
)
.then((json) => {
// We could have called fetchListeners but it would be an overhead since we are just toggling the active flag for one listener.
// Hence we create a separate action to update the particular listener in state
dispatch({
type: TOGGLE_ACTIVE_FLAG_LISTENER,
listener: json,
});
})
.catch((reason) => {
dispatch(handleErrors(reason, enableListener(id)));
});
};
}

export function disableListener(id) {
return (dispatch) => {
return V2.ListenersService.disableListenerApiV2ListenersListenerIdDisablePut(
id
)
.then((json) => {
// We could have called fetchListeners but it would be an overhead since we are just toggling the active flag for one listener.
// Hence we create a separate action to update the particular listener in state
dispatch({
type: TOGGLE_ACTIVE_FLAG_LISTENER,
listener: json,
});
})
.catch((reason) => {
dispatch(handleErrors(reason, disableListener(id)));
});
};
}

export const SEARCH_LISTENERS = "SEARCH_LISTENERS";

export function queryListeners(
Expand Down
Loading