Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/de.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "Primäre Motivation",
"Print date": "Druckdatum",
"Priority": "Priorität",
"Priority group": "Vorrangige Gruppe",
"Private key usage not after": "Verwendung des privaten Schlüssels nicht nach",
"Private key usage not before": "Verwendung privater Schlüssel nicht vor",
"Privileged": "Privilegiert",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "Primary motivation",
"Print date": "Print date",
"Priority": "Priority",
"Priority group": "Priority group",
"Private key usage not after": "Private key usage not after",
"Private key usage not before": "Private key usage not before",
"Privileged": "Privileged",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/es.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "Motivación principal",
"Print date": "Fecha de impresión",
"Priority": "Prioridad",
"Priority group": "Grupo prioritario",
"Private key usage not after": "Uso de clave privada no después",
"Private key usage not before": "Uso de clave privada no antes",
"Privileged": "Privilegiado",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/fr.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "Motivation primaire",
"Print date": "Date d'impression",
"Priority": "Priorité",
"Priority group": "Groupe de priorité",
"Private key usage not after": "Utilisation de la clé privée pas après",
"Private key usage not before": "Utilisation de la clé privée pas avant",
"Privileged": "Privilégié",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/it.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "Motivazione primaria",
"Print date": "Data di stampa",
"Priority": "Priorità",
"Priority group": "Gruppo prioritario",
"Private key usage not after": "Utilizzo della chiave privata non oltre",
"Private key usage not before": "Utilizzo della chiave privata non prima",
"Privileged": "Privilegiato",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/ja.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "二次的動機",
"Print date": "アップロード日",
"Priority": "優先度",
"Priority group": "優先グループ",
"Private key usage not after": "秘密鍵の使用は",
"Private key usage not before": "秘密鍵の使用",
"Privileged": "特権",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/ko.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "주요 동기",
"Print date": "인쇄 날짜",
"Priority": "우선순위",
"Priority group": "우선순위 그룹",
"Private key usage not after": "개인 키 사용 종료 날짜",
"Private key usage not before": "개인 키 사용 시작 날짜",
"Privileged": "특권",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/ru.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "Основная мотивация",
"Print date": "Дата печати",
"Priority": "Приоритет",
"Priority group": "Приоритетная группа",
"Private key usage not after": "Использование закрытого ключа не после",
"Private key usage not before": "Использование закрытого ключа не ранее",
"Privileged": "Привилегированный",
Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-front/lang/back/zh.json
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@
"Primary motivation": "主要动机",
"Print date": "打印日期",
"Priority": "优先级",
"Priority group": "优先群体",
"Private key usage not after": "之后不允许使用私钥",
"Private key usage not before": "之前不允许使用私钥",
"Privileged": "特权",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2300,6 +2300,11 @@ type ManagerContractExcerpt {
slug: String!
}

enum ConnectorPriorityGroup {
REALTIME
DEFAULT
}

type Connector implements BasicObject & InternalObject {
id: ID!
standard_id: String!
Expand All @@ -2324,6 +2329,7 @@ type Connector implements BasicObject & InternalObject {
connector_user: User
connector_queue_details: ConnectorQueueDetails!
connector_info: ConnectorInfo
connector_priority_group: ConnectorPriorityGroup!
updated_at: DateTime
refreshed_at: DateTime
created_at: DateTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2204,6 +2204,11 @@ type ManagerContractExcerpt {
slug: String!
}

enum ConnectorPriorityGroup {
REALTIME
DEFAULT
}

type Connector implements BasicObject & InternalObject {
id: ID! # internal_id
standard_id: String!
Expand All @@ -2229,6 +2234,7 @@ type Connector implements BasicObject & InternalObject {
connector_user: User
connector_queue_details: ConnectorQueueDetails!
connector_info: ConnectorInfo
connector_priority_group: ConnectorPriorityGroup!
updated_at: DateTime
refreshed_at: DateTime
created_at: DateTime
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { AuthContext, AuthUser } from '../types/user';
import { BasicStoreEntityCsvMapper } from '../modules/internal/csvMapper/csvMapper-types';
import type { ConnectorPriorityGroup } from '../generated/graphql';

/**
* This is the type for internal connector like import CSV.
Expand All @@ -22,6 +23,7 @@ export interface Connector {
auto: boolean
connector_scope: string;
connector_type: string;
connector_priority_group?: ConnectorPriorityGroup
name: string;
built_in: boolean;
connector_schema_runtime_fn?: <T extends BasicStoreEntityCsvMapper> (context: AuthContext, user: AuthUser) => Promise<T[]>;
Expand Down
3 changes: 3 additions & 0 deletions opencti-platform/opencti-graphql/src/database/repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import { ENTITY_TYPE_PIR } from '../modules/pir/pir-types';
import { getEntitiesMapFromCache } from './cache';
import { SYSTEM_USER } from '../utils/access';
import { logApp } from '../config/conf';
import { ConnectorPriorityGroup } from '../generated/graphql';

export const CONNECTOR_PRIORITY_GROUP_VALUES = Object.values(ConnectorPriorityGroup);

export const completeConnector = (connector) => {
if (connector) {
Expand Down
24 changes: 14 additions & 10 deletions opencti-platform/opencti-graphql/src/domain/connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,28 @@ import { isEmptyField, READ_INDEX_HISTORY } from '../database/utils';
import { ABSTRACT_INTERNAL_OBJECT, CONNECTOR_INTERNAL_EXPORT_FILE, OPENCTI_NAMESPACE } from '../schema/general';
import { isUserHasCapability, SETTINGS_SET_ACCESSES, SYSTEM_USER } from '../utils/access';
import {
type ConnectorHealthMetrics,
delEditContext,
notify,
redisGetConnectorHealthMetrics,
redisGetWork,
redisSetConnectorLogs,
setEditContext,
redisSetConnectorHealthMetrics,
redisGetConnectorHealthMetrics,
type ConnectorHealthMetrics
redisSetConnectorLogs,
setEditContext
} from '../database/redis';
import { internalLoadById, fullEntitiesList, pageEntitiesConnection, storeLoadById } from '../database/middleware-loader';
import { fullEntitiesList, internalLoadById, pageEntitiesConnection, storeLoadById } from '../database/middleware-loader';
import { completeContextDataForEntity, publishUserAction, type UserImportActionContextData } from '../listener/UserActionListener';
import type { AuthContext, AuthUser } from '../types/user';
import type { BasicStoreEntityConnector, BasicStoreEntityConnectorManager, BasicStoreEntitySynchronizer, ConnectorInfo } from '../types/connector';
import {
type AddManagedConnectorInput,
ConnectorPriorityGroup,
ConnectorType,
type CurrentConnectorStatusInput,
type EditContext,
type EditInput,
type EditManagedConnectorInput,
type HealthConnectorStatusInput,
IngestionAuthType,
type LogsConnectorStatusInput,
type MutationSynchronizerTestArgs,
Expand All @@ -42,8 +44,7 @@ import {
type SynchronizerAddInput,
type SynchronizerFetchInput,
type UpdateConnectorManagerStatusInput,
type HealthConnectorStatusInput,
ValidationMode,
ValidationMode
} from '../generated/graphql';
import { BUS_TOPICS, logApp } from '../config/conf';
import { deleteWorkForConnector } from './work';
Expand All @@ -53,7 +54,7 @@ import { controlUserConfidenceAgainstElement } from '../utils/confidence-level';
import { extractEntityRepresentativeName } from '../database/entity-representative';
import type { BasicStoreCommon } from '../types/store';
import type { Connector } from '../connector/internalConnector';
import { addWorkbenchDraftConvertionCount, addWorkbenchValidationCount, addConnectorDeployedCount } from '../manager/telemetryManager';
import { addConnectorDeployedCount, addWorkbenchDraftConvertionCount, addWorkbenchValidationCount } from '../manager/telemetryManager';
import { computeConnectorTargetContract, getSupportedContractsByImage } from '../modules/catalog/catalog-domain';
import { getEntitiesMapFromCache } from '../database/cache';
import { removeAuthenticationCredentials } from '../modules/ingestion/ingestion-common';
Expand Down Expand Up @@ -160,7 +161,8 @@ export const resetStateConnector = async (context: AuthContext, user: AuthUser,
interface RegisterOptions {
built_in?: boolean
active?: boolean
connector_user_id?: string | null
connector_user_id?: string | null,
connector_priority_group?: ConnectorPriorityGroup,
}

export const registerConnectorsManager = async (context: AuthContext, user: AuthUser, input: RegisterConnectorsManagerInput) => {
Expand Down Expand Up @@ -344,6 +346,7 @@ export const registerConnector = async (
connector_user_id: opts.connector_user_id ?? user.id,
connector_state_timestamp: now(),
built_in: opts.built_in ?? false,
connector_priority_group: opts.connector_priority_group ?? ConnectorPriorityGroup.Default,
};
if (opts.active !== undefined) {
connectorToCreate.active = opts.active;
Expand Down Expand Up @@ -474,7 +477,8 @@ interface ConnectorIngestionInput {
type: 'RSS' | 'CSV' | 'TAXII' | 'TAXII-PUSH' | 'JSON' | 'FORM',
name: string,
connector_user_id?: string | null,
is_running: boolean
is_running: boolean,
connector_priority_group?: ConnectorPriorityGroup,
}
export const connectorIdFromIngestId = (id: string) => uuidv5(id, OPENCTI_NAMESPACE);
export const registerConnectorForIngestion = async (context: AuthContext, input: ConnectorIngestionInput) => {
Expand Down
8 changes: 8 additions & 0 deletions opencti-platform/opencti-graphql/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3956,6 +3956,7 @@ export type Connector = BasicObject & InternalObject & {
config?: Maybe<ConnectorConfig>;
configurations?: Maybe<Array<ConnectorConfiguration>>;
connector_info?: Maybe<ConnectorInfo>;
connector_priority_group: ConnectorPriorityGroup;
connector_queue_details: ConnectorQueueDetails;
connector_schema?: Maybe<Scalars['String']['output']>;
connector_schema_ui?: Maybe<Scalars['String']['output']>;
Expand Down Expand Up @@ -4076,6 +4077,11 @@ export type ConnectorMetadata = {
configuration: Scalars['String']['output'];
};

export enum ConnectorPriorityGroup {
Default = 'DEFAULT',
Realtime = 'REALTIME'
}

export type ConnectorQueueDetails = {
__typename?: 'ConnectorQueueDetails';
messages_number: Scalars['Float']['output'];
Expand Down Expand Up @@ -34060,6 +34066,7 @@ export type ResolversTypes = ResolversObject<{
ConnectorInfoInput: ConnectorInfoInput;
ConnectorManager: ResolverTypeWrapper<ConnectorManager>;
ConnectorMetadata: ResolverTypeWrapper<ConnectorMetadata>;
ConnectorPriorityGroup: ConnectorPriorityGroup;
ConnectorQueueDetails: ResolverTypeWrapper<ConnectorQueueDetails>;
ConnectorRequestStatus: ConnectorRequestStatus;
ConnectorType: ConnectorType;
Expand Down Expand Up @@ -37122,6 +37129,7 @@ export type ConnectorResolvers<ContextType = any, ParentType extends ResolversPa
config?: Resolver<Maybe<ResolversTypes['ConnectorConfig']>, ParentType, ContextType>;
configurations?: Resolver<Maybe<Array<ResolversTypes['ConnectorConfiguration']>>, ParentType, ContextType>;
connector_info?: Resolver<Maybe<ResolversTypes['ConnectorInfo']>, ParentType, ContextType>;
connector_priority_group?: Resolver<ResolversTypes['ConnectorPriorityGroup'], ParentType, ContextType>;
connector_queue_details?: Resolver<ResolversTypes['ConnectorQueueDetails'], ParentType, ContextType>;
connector_schema?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
connector_schema_ui?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { EVENT_ACCESS_VALUES, EVENT_SCOPE_VALUES, EVENT_STATUS_VALUES, EVENT_TYP
import { RETENTION_SCOPE_VALUES, RETENTION_UNIT_VALUES } from '../../manager/retentionManager';
import { ENTITY_TYPE_PIR } from '../pir/pir-types';
import { computeAccountStatusChoices } from '../../config/conf';
import { CONNECTOR_PRIORITY_GROUP_VALUES } from '../../database/repository';

const HistoryDefinition: AttributeDefinition[] = [
{ name: 'event_type', label: 'Event type', type: 'string', format: 'enum', values: EVENT_TYPE_VALUES, editDefault: false, mandatoryType: 'internal', multiple: false, upsert: false, isFilterable: true },
Expand Down Expand Up @@ -410,6 +411,7 @@ const internalObjectsAttributes: { [k: string]: Array<AttributeDefinition> } = {
{ name: 'connector_scope', label: 'Connector scope', type: 'string', format: 'short', mandatoryType: 'external', editDefault: false, multiple: false, upsert: false, isFilterable: true },
{ name: 'connector_state', label: 'Connector state', type: 'string', format: 'json', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: true },
{ name: 'connector_state_reset', label: 'State reset', type: 'boolean', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: true },
{ name: 'connector_priority_group', label: 'Priority group', type: 'string', format: 'enum', values: CONNECTOR_PRIORITY_GROUP_VALUES, mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: false },
{ name: 'connector_state_timestamp', label: 'State reset timestamp', type: 'date', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: true },
{ name: 'connector_trigger_filters', label: 'Connector trigger filters', type: 'string', format: 'text', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: false },
{ name: 'connector_user_id', label: 'Connector user id', type: 'string', format: 'short', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: false },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Connector, ConnectorConfig } from '../../connector/internalConnect
import { CONNECTOR_INTERNAL_INGESTION } from '../../schema/general';
import { logApp } from '../../config/conf';
import { registerConnectorQueues } from '../../database/rabbitmq';
import { ConnectorPriorityGroup } from '../../generated/graphql';

export const DRAFT_VALIDATION_CONNECTOR_ID = 'c194e700-afb6-4c4e-ad1b-d4a00590e735';

Expand All @@ -12,6 +13,7 @@ export const DRAFT_VALIDATION_CONNECTOR: Connector = {
auto: false,
connector_scope: 'draft',
connector_type: CONNECTOR_INTERNAL_INGESTION,
connector_priority_group: ConnectorPriorityGroup.Realtime,
name: '[DRAFT] Draft validation',
built_in: true,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { generateStandardId } from '../../schema/identifier';
import { logApp } from '../../config/conf';
import { pushToWorkerForConnector } from '../../database/rabbitmq';
import { createWork, updateExpectationsNumber } from '../../domain/work';
import { ConnectorType, FilterMode, type FormSubmissionInput } from '../../generated/graphql';
import { ConnectorPriorityGroup, ConnectorType, FilterMode, type FormSubmissionInput } from '../../generated/graphql';
import { now, nowTime } from '../../utils/format';
import { SYSTEM_USER } from '../../utils/access';
import { convertStoreToStix } from '../../database/stix-2-1-converter';
Expand Down Expand Up @@ -90,7 +90,8 @@ export const addForm = async (
type: 'FORM',
name: element.name,
is_running: element.active ?? false,
connector_user_id: user.id
connector_user_id: user.id,
connector_priority_group: ConnectorPriorityGroup.Realtime,
});

await publishUserAction({
Expand Down
2 changes: 2 additions & 0 deletions opencti-platform/opencti-graphql/src/resolvers/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
import { getConnectorQueueSize } from '../database/rabbitmq';
import { redisGetConnectorLogs } from '../database/redis';
import pjson from '../../package.json';
import { ConnectorPriorityGroup } from '../generated/graphql';

export const PLATFORM_VERSION = pjson.version;

Expand Down Expand Up @@ -92,6 +93,7 @@ const connectorResolvers = {
Connector: {
works: (cn, args, context) => worksForConnector(context, context.user, cn.id, args),
connector_queue_details: (cn) => queueDetails(cn.id),
connector_priority_group: (cn) => { return cn.connector_priority_group ?? ConnectorPriorityGroup.Default; },
connector_user: (cn, _, context) => connectorUser(context, context.user, cn.connector_user_id),
manager_connector_logs: (cn) => redisGetConnectorLogs(cn.id),
manager_health_metrics: (cn, _, context) => connectorGetHealth(context, context.user, cn.id),
Expand Down
19 changes: 18 additions & 1 deletion opencti-worker/src/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
)


def is_priority_connector(connector_priority_group: str) -> bool:
return connector_priority_group == "REALTIME"


@dataclass(unsafe_hash=True)
class Worker: # pylint: disable=too-few-public-methods, too-many-instance-attributes
consumers: Dict[str, MessageQueueConsumer] = field(default_factory=dict, hash=False)
Expand Down Expand Up @@ -79,6 +83,13 @@ def __post_init__(self) -> None:
True,
default=5,
)
self.opencti_realtime_pool_size = get_config_variable(
"OPENCTI_REALTIME_EXECUTION_POOL_SIZE",
["opencti", "realtime_execution_pool_size"],
config,
True,
default=5,
)
self.listen_pool_size = get_config_variable(
"WORKER_LISTEN_POOL_SIZE",
["worker", "listen_pool_size"],
Expand Down Expand Up @@ -206,6 +217,9 @@ def stop(self) -> None:
# Start the main loop
def start(self) -> None:
push_execution_pool = ThreadPoolExecutor(max_workers=self.opencti_pool_size)
realtime_push_execution_pool = ThreadPoolExecutor(
max_workers=self.opencti_realtime_pool_size
)
listen_execution_pool = ThreadPoolExecutor(max_workers=self.listen_pool_size)

while not self.exit_event.is_set():
Expand Down Expand Up @@ -250,12 +264,15 @@ def start(self) -> None:
bundles_processing_time_gauge,
self.objects_max_refs,
)
execution_pool = push_execution_pool
if is_priority_connector(connector["connector_priority_group"]):
execution_pool = realtime_push_execution_pool
self.consumers[push_queue] = MessageQueueConsumer(
self.worker_logger,
"push",
push_queue,
pika_parameters,
push_execution_pool,
execution_pool,
push_handler.handle_message,
)

Expand Down