Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions apps/api/plane/bgtasks/issue_activities_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,67 @@ def extract_ids(data: dict | None, primary_key: str, fallback_key: str) -> set[s
return {str(x) for x in data.get(fallback_key, [])}


def _deserialize_for_broadcast(raw_value):
if raw_value is None:
return None
if isinstance(raw_value, (dict, list)):
return raw_value
if isinstance(raw_value, str):
try:
return json.loads(raw_value)
except ValueError:
return raw_value
return str(raw_value)


def broadcast_issue_event(
*,
project_id,
issue_id,
event_type,
actor_id,
epoch,
requested_data,
current_instance,
):
if not issue_id or not project_id:
return

try:
redis_client = redis_instance()
except Exception as redis_error: # pragma: no cover - defensive guard
log_exception(redis_error)
return

if not redis_client:
return

channel_id = str(project_id)
payload = {
"type": event_type,
"issue_id": str(issue_id),
"project_id": channel_id,
"actor_id": str(actor_id) if actor_id else None,
"timestamp": int(epoch) if isinstance(epoch, (int, float)) else None,
}

requested_payload = _deserialize_for_broadcast(requested_data)
if requested_payload is not None:
payload["requested_data"] = requested_payload

current_payload = _deserialize_for_broadcast(current_instance)
if current_payload is not None:
payload["current_instance"] = current_payload

try:
redis_client.publish(
f"issue_events:{channel_id}",
json.dumps(payload, default=str),
)
except Exception as publish_error: # pragma: no cover - defensive guard
log_exception(publish_error)


# Track Changes in name
def track_name(
requested_data,
Expand Down Expand Up @@ -1579,6 +1640,17 @@ def issue_activity(
# Save all the values to database
issue_activities_created = IssueActivity.objects.bulk_create(issue_activities)

# Broadcast project issue updates for realtime listeners
broadcast_issue_event(
project_id=project_id,
issue_id=issue_id,
event_type=type,
actor_id=actor_id,
epoch=epoch,
requested_data=requested_data,
current_instance=current_instance,
)

if notification:
notifications.delay(
type=type,
Expand Down
3 changes: 2 additions & 1 deletion apps/live/src/controllers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CollaborationController } from "./collaboration.controller";
import { ConvertDocumentController } from "./convert-document.controller";
import { HealthController } from "./health.controller";
import { IssueEventsController } from "./issue-events.controller";

export const CONTROLLERS = [CollaborationController, ConvertDocumentController, HealthController];
export const CONTROLLERS = [CollaborationController, ConvertDocumentController, HealthController, IssueEventsController];
178 changes: 178 additions & 0 deletions apps/live/src/controllers/issue-events.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import type Redis from "ioredis";
import type { Request } from "express";
import type WebSocket from "ws";
// plane imports
import { Controller, WebSocket as WSDecorator } from "@plane/decorators";
import { logger } from "@plane/logger";
// redis
import { redisManager } from "@/redis";
// auth
import { handleAuthentication } from "@/lib/auth";

type TokenPayload = {
id?: string;
cookie?: string;
};

type ConnectionParams = {
projectId: string;
workspaceSlug: string;
token: string;
};

const getFirstQueryValue = (value?: string | string[]) => (Array.isArray(value) ? value[0] : value);

const extractConnectionParams = (req: Request): ConnectionParams | null => {
const query = req.query as Record<string, string | string[]>;
const projectId = getFirstQueryValue(query.projectId);
const workspaceSlug = getFirstQueryValue(query.workspaceSlug);
const token = getFirstQueryValue(query.token);

if (!projectId || !workspaceSlug || !token) {
return null;
}

return { projectId, workspaceSlug, token };
};

const parseToken = (rawToken: string): TokenPayload | null => {
try {
const parsed: unknown = JSON.parse(rawToken);
if (!parsed || typeof parsed !== "object") {
return null;
}
return parsed as TokenPayload;
} catch (error) {
logger.error("Invalid token payload for issue events", error);
return null;
}
};

const closeSocket = (ws: WebSocket, code: number, reason: string) => {
if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
return;
}

try {
ws.close(code, reason);
} catch (error) {
logger.error("Issue events websocket close failure", error);
}
};

const ensureAuthenticated = async (ws: WebSocket, token: TokenPayload, req: Request) => {
const cookie = token?.cookie || req.headers.cookie || "";
if (cookie) {
try {
await handleAuthentication({
cookie,
userId: token?.id ?? "",
});
return true;
} catch (error) {
logger.error("Failed to authenticate issue events connection", error);
closeSocket(ws, 4003, "Unauthorized");
return false;
}
}

if (!token?.id) {
closeSocket(ws, 4003, "Unauthorized");
return false;
}

return true;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: WebSocket Authentication Bypass

The ensureAuthenticated function allows WebSocket connections to proceed without proper authentication when a token.id is present but no cookie is provided. This bypasses the handleAuthentication check, potentially allowing unauthorized access.

Fix in Cursor Fix in Web

};

@Controller("/issues")
export class IssueEventsController {
@WSDecorator("/")
async handleConnection(ws: WebSocket, req: Request) {
const params = extractConnectionParams(req);
if (!params) {
closeSocket(ws, 4001, "Missing required parameters");
return;
}

const tokenPayload = parseToken(params.token);
if (!tokenPayload) {
closeSocket(ws, 4002, "Invalid token");
return;
}

const authenticated = await ensureAuthenticated(ws, tokenPayload, req);
if (!authenticated) {
return;
}

const redisClient = redisManager.getClient();
if (!redisClient) {
closeSocket(ws, 1011, "Realtime service unavailable");
return;
}

let subscriber: Redis;
try {
subscriber = redisClient.duplicate();
} catch (error) {
logger.error("Failed to create issue events redis subscriber", error);
closeSocket(ws, 1011, "Realtime service unavailable");
return;
}

const channel = `issue_events:${params.projectId}`;
let cleanupStarted = false;

const cleanup = async () => {
if (cleanupStarted) return;
cleanupStarted = true;

subscriber.removeAllListeners("message");
subscriber.removeAllListeners("error");

try {
await subscriber.unsubscribe(channel);
} catch (error) {
logger.error("Failed to unsubscribe issue events channel", error);
}

try {
subscriber.disconnect();
} catch (error) {
logger.error("Failed to disconnect issue events subscriber", error);
}
};

try {
subscriber.on("error", (error) => {
logger.error("Issue events redis subscriber error", error);
closeSocket(ws, 1011, "Realtime service unavailable");
void cleanup();
});

await subscriber.connect();

subscriber.on("message", (incomingChannel, message) => {
if (incomingChannel === channel && ws.readyState === ws.OPEN) {
ws.send(message);
}
});

await subscriber.subscribe(channel);

ws.on("close", () => {
void cleanup();
});

ws.on("error", (error) => {
logger.error("Issue events websocket error", error);
closeSocket(ws, 1011, "Issue events websocket error");
void cleanup();
});
} catch (error) {
logger.error("Failed to subscribe to issue events channel", error);
closeSocket(ws, 1011, "Subscription failure");
void cleanup();
}
}
}
Loading