import hashlib
import ipaddress
import json
import logging
import pathlib
import re
from typing import Any, AnyStr, Callable, Dict, List, Optional, Union
import requests
from typing_extensions import Literal
from pyintelowl.version import __version__
from .exceptions import IntelOwlClientException
TLPType = Literal["WHITE", "GREEN", "AMBER", "RED", "CLEAR"]
[docs]class IntelOwl:
logger: logging.Logger
def __init__(
self,
token: str,
instance_url: str,
certificate: str = None,
proxies: dict = None,
logger: logging.Logger = None,
cli: bool = False,
):
self.token = token
self.instance = instance_url
self.certificate = certificate
if logger:
self.logger = logger
else:
self.logger = logging.getLogger(__name__)
if proxies and not isinstance(proxies, dict):
raise TypeError("proxies param must be a dictionary")
self.proxies = proxies
self.cli = cli
@property
def session(self) -> requests.Session:
"""
Internal use only.
"""
if not hasattr(self, "_session"):
session = requests.Session()
if self.certificate is not True:
session.verify = self.certificate
if self.proxies:
session.proxies = self.proxies
session.headers.update(
{
"Authorization": f"Token {self.token}",
"User-Agent": f"PyIntelOwl/{__version__}",
}
)
self._session = session
return self._session
def __make_request(
self,
method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = "GET",
*args,
**kwargs,
) -> requests.Response:
"""
For internal use only.
"""
response: requests.Response = None
requests_function_map: Dict[str, Callable] = {
"GET": self.session.get,
"POST": self.session.post,
"PUT": self.session.put,
"PATCH": self.session.patch,
"DELETE": self.session.delete,
}
func = requests_function_map.get(method, None)
if not func:
raise RuntimeError(f"Unsupported method name: {method}")
try:
response = func(*args, **kwargs)
self.logger.debug(
msg=(response.url, response.status_code, response.content)
)
response.raise_for_status()
except Exception as e:
raise IntelOwlClientException(e, response=response)
return response
[docs] def ask_analysis_availability(
self,
md5: str,
analyzers: List[str] = None,
check_reported_analysis_too: bool = False,
minutes_ago: int = None,
) -> Dict:
"""Search for already available analysis.\n
Endpoint: ``/api/ask_analysis_availability``
Args:
md5 (str): md5sum of the observable or file
analyzers (List[str], optional):
list of analyzers to trigger.
Defaults to `None` meaning automatically select all configured analyzers.
check_reported_analysis_too (bool, optional):
Check against all existing jobs. Defaults to ``False``.
minutes_ago (int, optional):
number of minutes to check back for analysis.
Default is None so the check does not have any time limits.
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Dict: JSON body
"""
if not analyzers:
analyzers = []
data = {"md5": md5, "analyzers": analyzers}
if not check_reported_analysis_too:
data["running_only"] = True
if minutes_ago:
data["minutes_ago"] = int(minutes_ago)
url = self.instance + "/api/ask_analysis_availability"
response = self.__make_request("POST", url=url, data=data)
answer = response.json()
status, job_id = answer.get("status", None), answer.get("job_id", None)
# check sanity cases
if not status:
raise IntelOwlClientException(
"API ask_analysis_availability gave result without status ?"
f" Response: {answer}"
)
if status != "not_available" and not job_id:
raise IntelOwlClientException(
"API ask_analysis_availability gave result without job_id ?"
f" Response: {answer}"
)
return answer
[docs] def send_file_analysis_request(
self,
filename: str,
binary: bytes,
tlp: TLPType = None,
analyzers_requested: List[str] = None,
connectors_requested: List[str] = None,
runtime_configuration: Dict = None,
tags_labels: List[str] = None,
) -> Dict:
"""Send analysis request for a file.\n
Endpoint: ``/api/analyze_file``
Args:
filename (str):
Filename
binary (bytes):
File contents as bytes
analyzers_requested (List[str], optional):
List of analyzers to invoke
Defaults to ``[]`` i.e. all analyzers.
connectors_requested (List[str], optional):
List of specific connectors to invoke.
Defaults to ``[]`` i.e. all connectors.
tlp (str, optional):
TLP for the analysis.
(options: ``WHITE, GREEN, AMBER, RED``). Defaults to ``WHITE``.
runtime_configuration (Dict, optional):
Overwrite configuration for analyzers. Defaults to ``{}``.
tags_labels (List[str], optional):
List of tag labels to assign (creates non-existing tags)
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Dict: JSON body
"""
try:
if not tlp:
tlp = "WHITE"
if not analyzers_requested:
analyzers_requested = []
if not connectors_requested:
connectors_requested = []
if not tags_labels:
tags_labels = []
if not runtime_configuration:
runtime_configuration = {}
data = {
"file_name": filename,
"analyzers_requested": analyzers_requested,
"connectors_requested": connectors_requested,
"tlp": tlp,
"tags_labels": tags_labels,
}
if runtime_configuration:
data["runtime_configuration"] = json.dumps(runtime_configuration)
files = {"file": (filename, binary)}
answer = self.__send_analysis_request(data=data, files=files)
except Exception as e:
raise IntelOwlClientException(e)
return answer
[docs] def send_file_analysis_playbook_request(
self,
filename: str,
binary: bytes,
tlp: TLPType = None,
playbooks_requested: List[str] = None,
runtime_configuration: Dict = None,
tags_labels: List[str] = None,
) -> Dict:
"""Send playbook analysis request for a file.\n
Endpoint: ``/api/playbook/analyze_multiple_files``
Args:
filename (str):
Filename
binary (bytes):
File contents as bytes
playbooks_requested (List[str], optional):
List of specific playbooks to invoke.
Defaults to ``[]`` i.e. all playbooks.
tlp (str, optional):
TLP for the analysis.
(options: ``WHITE, GREEN, AMBER, RED``). Defaults to ``WHITE``.
runtime_configuration (Dict, optional):
Overwrite configuration for analyzers. Defaults to ``{}``.
tags_labels (List[str], optional):
List of tag labels to assign (creates non-existing tags)
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Dict: JSON body
"""
try:
if not tlp:
tlp = "WHITE"
if not playbooks_requested:
playbooks_requested = []
if not tags_labels:
tags_labels = []
if not runtime_configuration:
runtime_configuration = {}
data = {
"playbooks_requested": playbooks_requested,
"tlp": tlp,
"tags_labels": tags_labels,
}
if runtime_configuration:
data["runtime_configuration"] = json.dumps(runtime_configuration)
# `files` is wanted to be different from the other
# /api/analyze_file endpoint
# because the server is using different serializers
files = {"files": (filename, binary)}
answer = self.__send_analysis_request(
data=data, files=files, playbook_mode=True
)
except Exception as e:
raise IntelOwlClientException(e)
return answer
[docs] def send_observable_analysis_request(
self,
observable_name: str,
tlp: TLPType = None,
analyzers_requested: List[str] = None,
connectors_requested: List[str] = None,
runtime_configuration: Dict = None,
tags_labels: List[str] = None,
observable_classification: str = None,
) -> Dict:
"""Send analysis request for an observable.\n
Endpoint: ``/api/analyze_observable``
Args:
observable_name (str):
Observable value
analyzers_requested (List[str], optional):
List of analyzers to invoke
Defaults to ``[]`` i.e. all analyzers.
connectors_requested (List[str], optional):
List of specific connectors to invoke.
Defaults to ``[]`` i.e. all connectors.
tlp (str, optional):
TLP for the analysis.
(options: ``WHITE, GREEN, AMBER, RED``). Defaults to ``WHITE``.
runtime_configuration (Dict, optional):
Overwrite configuration for analyzers. Defaults to ``{}``.
tags_labels (List[str], optional):
List of tag labels to assign (creates non-existing tags)
observable_classification (str):
Observable classification, Default to None.
By default launch analysis with an automatic classification.
(options: ``url, domain, hash, ip, generic``)
Raises:
IntelOwlClientException: on client/HTTP error
IntelOwlClientException: on wrong observable_classification
Returns:
Dict: JSON body
"""
try:
if not tlp:
tlp = "WHITE"
if not analyzers_requested:
analyzers_requested = []
if not connectors_requested:
connectors_requested = []
if not tags_labels:
tags_labels = []
if not runtime_configuration:
runtime_configuration = {}
if not observable_classification:
observable_classification = self._get_observable_classification(
observable_name
)
elif observable_classification not in [
"generic",
"hash",
"ip",
"domain",
"url",
]:
raise IntelOwlClientException(
"Observable classification only handle"
" 'generic', 'hash', 'ip', 'domain' and 'url' "
)
data = {
"observable_name": observable_name,
"observable_classification": observable_classification,
"analyzers_requested": analyzers_requested,
"connectors_requested": connectors_requested,
"tlp": tlp,
"tags_labels": tags_labels,
"runtime_configuration": runtime_configuration,
}
answer = self.__send_analysis_request(data=data, files=None)
except Exception as e:
raise IntelOwlClientException(e)
return answer
[docs] def send_observable_analysis_playbook_request(
self,
observable_name: str,
tlp: TLPType = None,
playbooks_requested: List[str] = None,
runtime_configuration: Dict = None,
tags_labels: List[str] = None,
observable_classification: str = None,
) -> Dict:
"""Send playbook analysis request for an observable.\n
Endpoint: ``/api/playbook/analyze_multiple_observables``
Args:
observable_name (str):
Observable value
playbooks_requested (List[str], optional):
List of specific playbooks to invoke.
Defaults to ``[]`` i.e. all playbooks.
tlp (str, optional):
TLP for the analysis.
(options: ``WHITE, GREEN, AMBER, RED``). Defaults to ``WHITE``.
runtime_configuration (Dict, optional):
Overwrite configuration for analyzers. Defaults to ``{}``.
tags_labels (List[str], optional):
List of tag labels to assign (creates non-existing tags)
observable_classification (str):
Observable classification, Default to None.
By default launch analysis with an automatic classification.
(options: ``url, domain, hash, ip, generic``)
Raises:
IntelOwlClientException: on client/HTTP error
IntelOwlClientException: on wrong observable_classification
Returns:
Dict: JSON body
"""
try:
if not tlp:
tlp = "WHITE"
if not playbooks_requested:
playbooks_requested = []
if not tags_labels:
tags_labels = []
if not runtime_configuration:
runtime_configuration = {}
if not observable_classification:
observable_classification = self._get_observable_classification(
observable_name
)
elif observable_classification not in [
"generic",
"hash",
"ip",
"domain",
"url",
]:
raise IntelOwlClientException(
"Observable classification only handle"
" 'generic', 'hash', 'ip', 'domain' and 'url' "
)
data = {
"observables": [[observable_classification, observable_name]],
"playbooks_requested": playbooks_requested,
"tlp": tlp,
"tags_labels": tags_labels,
"runtime_configuration": runtime_configuration,
}
answer = self.__send_analysis_request(
data=data, files=None, playbook_mode=True
)
except Exception as e:
raise IntelOwlClientException(e)
return answer
[docs] def send_analysis_batch(self, rows: List[Dict]):
"""
Send multiple analysis requests.
Can be mix of observable or file analysis requests.
Used by the pyintelowl CLI.
Args:
rows (List[Dict]):
Each row should be a dictionary with keys,
`value`, `type`, `check`, `tlp`,
`analyzers_list`, `connectors_list`, `runtime_config`
`tags_list`.
"""
for obj in rows:
try:
runtime_config = obj.get("runtime_config", {})
if runtime_config:
with open(runtime_config) as fp:
runtime_config = json.load(fp)
analyzers_list = obj.get("analyzers_list", [])
connectors_list = obj.get("connectors_list", [])
if isinstance(analyzers_list, str):
analyzers_list = analyzers_list.split(",")
if isinstance(connectors_list, str):
connectors_list = connectors_list.split(",")
self._new_analysis_cli(
obj["value"],
obj["type"],
obj.get("check", None),
obj.get("tlp", "WHITE"),
analyzers_list,
connectors_list,
runtime_config,
obj.get("tags_list", []),
obj.get("should_poll", False),
)
except IntelOwlClientException as e:
self.logger.fatal(str(e))
def __send_analysis_request(self, data=None, files=None, playbook_mode=False):
"""
Internal use only.
"""
response = None
answer = {}
if files is None:
url = self.instance + "/api/analyze_observable"
if playbook_mode:
url = self.instance + "/api/playbook/analyze_multiple_observables"
args = {"json": data}
else:
url = self.instance + "/api/analyze_file"
if playbook_mode:
url = self.instance + "/api/playbook/analyze_multiple_files"
args = {"data": data, "files": files}
try:
response = self.session.post(url, **args)
self.logger.debug(
msg={
"url": response.url,
"code": response.status_code,
"headers": response.headers,
"body": response.json(),
}
)
answer = response.json()
if playbook_mode:
# right now, we are only supporting single input result
answers = answer.get("results", [])
if answers:
answer = answers[0]
warnings = answer.get("warnings", [])
errors = answer.get("errors", {})
if self.cli:
info_log = f"""New Job running..
ID: {answer.get('job_id')} |
Status: [u blue]{answer.get('status')}[/].
Got {len(warnings)} warnings:
[i yellow]{warnings if warnings else None}[/]
Got {len(errors)} errors:
[i red]{errors if errors else None}[/]
"""
else:
info_log = (
f"New Job running.. ID: {answer.get('job_id')} "
f"| Status: {answer.get('status')}."
f" Got {len(warnings)} warnings:"
f" {warnings if warnings else None}"
f" Got {len(errors)} errors:"
f" {errors if errors else None}"
)
self.logger.info(info_log)
response.raise_for_status()
except Exception as e:
raise IntelOwlClientException(e, response=response)
return answer
[docs] def create_tag(self, label: str, color: str):
"""Creates new tag by sending a POST Request
Endpoint: ``/api/tags``
Args:
label ([str]): [Label of the tag to be created]
color ([str]): [Color of the tag to be created]
"""
url = self.instance + "/api/tags"
data = {"label": label, "color": color}
response = self.__make_request("POST", url=url, data=data)
return response.json()
[docs] def edit_tag(self, tag_id: Union[int, str], label: str, color: str):
"""Edits existing tag by sending PUT request
Endpoint: ``api/tags``
Args:
id ([int]): [Id of the existing tag]
label ([str]): [Label of the tag to be created]
color ([str]): [Color of the tag to be created]
"""
url = self.instance + "/api/tags/" + str(tag_id)
data = {"label": label, "color": color}
response = self.__make_request("PUT", url=url, data=data)
return response.json()
[docs] def get_analyzer_configs(self):
"""
Get current state of `analyzer_config.json` from the IntelOwl instance.\n
Endpoint: ``/api/get_analyzer_configs``
"""
url = self.instance + "/api/get_analyzer_configs"
response = self.__make_request("GET", url=url)
return response.json()
[docs] def get_connector_configs(self):
"""
Get current state of `connector_config.json` from the IntelOwl instance.\n
Endpoint: ``/api/get_connector_configs``
"""
url = self.instance + "/api/get_connector_configs"
response = self.__make_request("GET", url=url)
return response.json()
[docs] def get_playbook_configs(self):
"""
Get current state of `playbook_config.json` from the IntelOwl instance.\n
Endpoint: ``/api/get_playbook_configs``
"""
url = self.instance + "/api/get_playbook_configs"
response = self.__make_request("GET", url=url)
return response.json()
[docs] def get_all_jobs(self) -> List[Dict[str, Any]]:
"""
Fetch list of all jobs.\n
Endpoint: ``/api/jobs``
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Dict: Dict with 3 keys: "count", "total_pages", "results"
"""
url = self.instance + "/api/jobs"
response = self.__make_request("GET", url=url)
return response.json()
[docs] def get_tag_by_id(self, tag_id: Union[int, str]) -> Dict[str, str]:
"""Fetch tag info by ID.\n
Endpoint: ``/api/tag/{tag_id}``
Args:
tag_id (Union[int, str]): Tag ID
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Dict[str, str]: Dict with 3 keys: `id`, `label` and `color`.
"""
url = self.instance + "/api/tags/" + str(tag_id)
response = self.__make_request("GET", url=url)
return response.json()
[docs] def get_job_by_id(self, job_id: Union[int, str]) -> Dict[str, Any]:
"""Fetch job info by ID.
Endpoint: ``/api/jobs/{job_id}``
Args:
job_id (Union[int, str]): Job ID
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Dict[str, Any]: JSON body.
"""
url = self.instance + "/api/jobs/" + str(job_id)
response = self.__make_request("GET", url=url)
return response.json()
[docs] @staticmethod
def get_md5(
to_hash: AnyStr,
type_="observable",
) -> str:
"""Returns md5sum of given observable or file object.
Args:
to_hash (AnyStr):
either an observable string, file contents as bytes or path to a file
type_ (Union["observable", "binary", "file"], optional):
`observable`, `binary`, `file`. Defaults to "observable".
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
str: md5sum
"""
md5 = ""
if type_ == "observable":
md5 = hashlib.md5(str(to_hash).lower().encode("utf-8")).hexdigest()
elif type_ == "binary":
md5 = hashlib.md5(to_hash).hexdigest()
elif type_ == "file":
path = pathlib.Path(to_hash)
if not path.exists():
raise IntelOwlClientException(f"{to_hash} does not exists")
binary = path.read_bytes()
md5 = hashlib.md5(binary).hexdigest()
return md5
def _new_analysis_cli(
self,
obj: str,
type_: str,
check,
tlp: TLPType = "CLEAR",
analyzers_list: List[str] = None,
connectors_list: List[str] = None,
runtime_configuration: Dict = None,
tags_labels: List[str] = None,
should_poll: bool = False,
minutes_ago: int = None,
) -> None:
"""
For internal use by the pyintelowl CLI.
"""
if not analyzers_list:
analyzers_list = []
if not connectors_list:
connectors_list = []
if not runtime_configuration:
runtime_configuration = {}
if not tags_labels:
tags_labels = []
self.logger.info(
f"""Requesting analysis..
{type_}: [blue]{obj}[/]
analyzers: [i green]{analyzers_list if analyzers_list else 'all'}[/]
connectors: [i green]{connectors_list if connectors_list else 'all'}[/]
tags: [i green]{tags_labels}[/]
"""
)
# 1st step: ask analysis availability
if check != "force-new":
md5 = self.get_md5(obj, type_=type_)
resp = self.ask_analysis_availability(
md5,
analyzers_list,
True if check == "reported" else False,
minutes_ago,
)
status, job_id = resp.get("status", None), resp.get("job_id", None)
if status != "not_available":
self.logger.info(
f"""Found existing analysis!
Job: #{job_id}
status: [u blue]{status}[/]
[i]Hint: use [#854442]--check force-new[/] to perform new scan anyway[/]
"""
)
return
# 2nd step: send new analysis request
if type_ == "observable":
resp2 = self.send_observable_analysis_request(
observable_name=obj,
tlp=tlp,
analyzers_requested=analyzers_list,
connectors_requested=connectors_list,
runtime_configuration=runtime_configuration,
tags_labels=tags_labels,
)
else:
path = pathlib.Path(obj)
resp2 = self.send_file_analysis_request(
filename=path.name,
binary=path.read_bytes(),
tlp=tlp,
analyzers_requested=analyzers_list,
connectors_requested=connectors_list,
runtime_configuration=runtime_configuration,
tags_labels=tags_labels,
)
# 3rd step: poll for result
if should_poll:
if resp2["status"] != "accepted":
self.logger.fatal("Can't poll a failed job")
# import poll function
from .cli._jobs_utils import _poll_for_job_cli
job_id = resp2["job_id"]
_ = _poll_for_job_cli(self, job_id)
self.logger.info(
f"""
Polling finished.
Execute [i blue]pyintelowl jobs view {job_id}[/] to view the result
"""
)
def _new_analysis_playbook_cli(
self,
obj: str,
type_: str,
check,
tlp: TLPType = "CLEAR",
playbooks_list: List[str] = None,
runtime_configuration: Dict = None,
tags_labels: List[str] = None,
should_poll: bool = False,
minutes_ago: int = None,
) -> None:
"""
For internal use by the pyintelowl CLI.
"""
if not playbooks_list:
playbooks_list = []
if not runtime_configuration:
runtime_configuration = {}
if not tags_labels:
tags_labels = []
if len(playbooks_list) == 0:
print(("No Playbooks selected!\n"))
return
self.logger.info(
f"""Requesting analysis..
{type_}: [blue]{obj}[/]
playbooks: [i green]{playbooks_list}[/]
tags: [i green]{tags_labels}[/]
"""
)
# 1st step, make request
if type_ == "observable":
resp = self.send_observable_analysis_playbook_request(
observable_name=obj,
tlp=tlp,
playbooks_requested=playbooks_list,
runtime_configuration=runtime_configuration,
tags_labels=tags_labels,
)
else:
path = pathlib.Path(obj)
resp = self.send_file_analysis_playbook_request(
filename=path.name,
binary=path.read_bytes(),
tlp=tlp,
playbooks_requested=playbooks_list,
runtime_configuration=runtime_configuration,
tags_labels=tags_labels,
)
# 2nd step: poll for result
if should_poll:
if resp.get("status", "") != "accepted":
self.logger.fatal("Can't poll a failed job")
# import poll function
from .cli._jobs_utils import _poll_for_job_cli
job_id = resp.get("job_id", 0)
_ = _poll_for_job_cli(self, job_id)
self.logger.info(
f"""
Polling finished.
Execute [i blue]pyintelowl jobs view {job_id}[/] to view the result
"""
)
def _get_observable_classification(self, value: str) -> str:
"""Returns observable classification for the given value.\n
Only following types are supported:
ip, domain, url, hash (md5, sha1, sha256), generic (if no match)
Args:
value (str):
observable value
Raises:
IntelOwlClientException:
if value type is not recognized
Returns:
str: one of `ip`, `url`, `domain`, `hash` or 'generic'.
"""
try:
ipaddress.ip_address(value)
except ValueError:
if re.match(
r"^(?:htt|ft|tc)ps?://[a-z\d-]{1,63}(?:\.[a-z\d-]{1,63})+"
r"(?:/[a-z\d-]{1,63})*(?:\.\w+)?",
value,
):
classification = "url"
elif re.match(r"^(\.)?[a-z\d-]{1,63}(\.[a-z\d-]{1,63})+$", value):
classification = "domain"
elif (
re.match(r"^[a-f\d]{32}$", value)
or re.match(r"^[a-f\d]{40}$", value)
or re.match(r"^[a-f\d]{64}$", value)
or re.match(r"^[A-F\d]{32}$", value)
or re.match(r"^[A-F\d]{40}$", value)
or re.match(r"^[A-F\d]{64}$", value)
):
classification = "hash"
else:
classification = "generic"
self.logger.warning(
"Couldn't detect observable classification, setting as 'generic'..."
)
else:
# its a simple IP
classification = "ip"
return classification
[docs] def download_sample(self, job_id: int) -> bytes:
"""
Download file sample from job.\n
Method: GET
Endpoint: ``/api/jobs/{job_id}/download_sample``
Args:
job_id (int):
id of job to download sample from
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bytes: Raw file data.
"""
url = self.instance + f"/api/jobs/{job_id}/download_sample"
response = self.__make_request("GET", url=url)
return response.content
[docs] def kill_running_job(self, job_id: int) -> bool:
"""Send kill_running_job request.\n
Method: PATCH
Endpoint: ``/api/jobs/{job_id}/kill``
Args:
job_id (int):
id of job to kill
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: killed or not
"""
url = self.instance + f"/api/jobs/{job_id}/kill"
response = self.__make_request("PATCH", url=url)
killed = response.status_code == 204
return killed
[docs] def delete_job_by_id(self, job_id: int) -> bool:
"""Send delete job request.\n
Method: DELETE
Endpoint: ``/api/jobs/{job_id}``
Args:
job_id (int):
id of job to kill
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: deleted or not
"""
url = self.instance + "/api/jobs/" + str(job_id)
response = self.__make_request("DELETE", url=url)
deleted = response.status_code == 204
return deleted
[docs] def delete_tag_by_id(self, tag_id: int) -> bool:
"""Send delete tag request.\n
Method: DELETE
Endpoint: ``/api/tags/{tag_id}``
Args:
tag_id (int):
id of tag to delete
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: deleted or not
"""
url = self.instance + "/api/tags/" + str(tag_id)
response = self.__make_request("DELETE", url=url)
deleted = response.status_code == 204
return deleted
def __run_plugin_action(
self, job_id: int, plugin_type: str, plugin_name: str, plugin_action: str
) -> bool:
"""Internal method for kill/retry for analyzer/connector"""
response = None
url = (
self.instance
+ f"/api/jobs/{job_id}/{plugin_type}/{plugin_name}/{plugin_action}"
)
response = self.__make_request("PATCH", url=url)
success = response.status_code == 204
return success
[docs] def kill_analyzer(self, job_id: int, analyzer_name: str) -> bool:
"""Send kill running/pending analyzer request.\n
Method: PATCH
Endpoint: ``/api/jobs/{job_id}/analyzer/{analyzer_name}/kill``
Args:
job_id (int):
id of job
analyzer_name (str):
name of analyzer to kill
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: killed or not
"""
killed = self.__run_plugin_action(
job_id=job_id,
plugin_name=analyzer_name,
plugin_type="analyzer",
plugin_action="kill",
)
return killed
[docs] def kill_connector(self, job_id: int, connector_name: str) -> bool:
"""Send kill running/pending connector request.\n
Method: PATCH
Endpoint: ``/api/jobs/{job_id}/connector/{connector_name}/kill``
Args:
job_id (int):
id of job
connector_name (str):
name of connector to kill
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: killed or not
"""
killed = self.__run_plugin_action(
job_id=job_id,
plugin_name=connector_name,
plugin_type="connector",
plugin_action="kill",
)
return killed
[docs] def retry_analyzer(self, job_id: int, analyzer_name: str) -> bool:
"""Send retry failed/killed analyzer request.\n
Method: PATCH
Endpoint: ``/api/jobs/{job_id}/analyzer/{analyzer_name}/retry``
Args:
job_id (int):
id of job
analyzer_name (str):
name of analyzer to retry
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: success or not
"""
success = self.__run_plugin_action(
job_id=job_id,
plugin_name=analyzer_name,
plugin_type="analyzer",
plugin_action="retry",
)
return success
[docs] def retry_connector(self, job_id: int, connector_name: str) -> bool:
"""Send retry failed/killed connector request.\n
Method: PATCH
Endpoint: ``/api/jobs/{job_id}/connector/{connector_name}/retry``
Args:
job_id (int):
id of job
connector_name (str):
name of connector to retry
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: success or not
"""
success = self.__run_plugin_action(
job_id=job_id,
plugin_name=connector_name,
plugin_type="connector",
plugin_action="retry",
)
return success
[docs] def analyzer_healthcheck(self, analyzer_name: str) -> Optional[bool]:
"""Send analyzer(docker-based) health check request.\n
Method: GET
Endpoint: ``/api/analyzer/{analyzer_name}/healthcheck``
Args:
analyzer_name (str):
name of analyzer
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: success or not
"""
url = self.instance + f"/api/analyzer/{analyzer_name}/healthcheck"
response = self.__make_request("GET", url=url)
return response.json().get("status", None)
[docs] def connector_healthcheck(self, connector_name: str) -> Optional[bool]:
"""Send connector health check request.\n
Method: GET
Endpoint: ``/api/connector/{connector_name}/healthcheck``
Args:
connector_name (str):
name of connector
Raises:
IntelOwlClientException: on client/HTTP error
Returns:
Bool: success or not
"""
url = self.instance + f"/api/connector/{connector_name}/healthcheck"
response = self.__make_request("GET", url=url)
return response.json().get("status", None)