import ipaddress
import logging
import pathlib
import json
import re
import requests
import hashlib
from typing import List, Dict, Any, Union, AnyStr
from .exceptions import IntelOwlClientException
[docs]class IntelOwl:
logger: logging.Logger
def __init__(
    self,
    token: str,
    instance_url: str,
    certificate: str = None,
    logger: logging.Logger = None,
    cli: bool = False,
):
    """Store the connection settings for an IntelOwl instance.

    Args:
        token (str):
            API token; sent as ``Authorization: Token <token>`` by ``session``.
        instance_url (str):
            Base URL of the instance. Endpoint paths such as ``/api/jobs``
            are appended to it verbatim.
        certificate (str, optional):
            Assigned to ``session.verify`` unless it is ``True``.
            Defaults to ``None``.
        logger (logging.Logger, optional):
            Logger to use. Defaults to ``logging.getLogger(__name__)``.
        cli (bool, optional):
            When True, log/error messages include CLI markup tags.
            Defaults to ``False``.
    """
    self.token = token
    self.instance = instance_url
    self.certificate = certificate
    self.cli = cli
    # Fall back to the module logger when the caller supplies none.
    self.logger = logger if logger else logging.getLogger(__name__)
@property
def session(self) -> requests.Session:
    """
    Internal use only.

    Returns the ``requests.Session`` cached on ``self._session``, building
    it on first access with the auth token and User-Agent headers set.
    """
    if hasattr(self, "_session"):
        return self._session

    new_session = requests.Session()
    # NOTE(review): with the default ``certificate=None`` this sets
    # ``session.verify = None``, which requests appears to treat as falsy
    # (no TLS verification) -- confirm this is intended.
    if self.certificate is not True:
        new_session.verify = self.certificate
    auth_headers = {
        "Authorization": f"Token {self.token}",
        "User-Agent": "IntelOwlClient/3.1.3",
    }
    new_session.headers.update(auth_headers)
    self._session = new_session
    return self._session
def ask_analysis_availability(
    self,
    md5: str,
    analyzers_needed: List[str],
    run_all_available_analyzers: bool = False,
    check_reported_analysis_too: bool = False,
) -> Dict:
    """Look up whether an analysis already exists for the given md5.

    Endpoint: ``/api/ask_analysis_availability``

    Args:
        md5 (str): md5sum of the observable or file
        analyzers_needed (List[str]): list of analyzers to invoke
        run_all_available_analyzers (bool, optional):
            If True, ``run_all_available_analyzers`` is added to the query.
            Defaults to ``False``.
        check_reported_analysis_too (bool, optional):
            If False, ``running_only`` is added to the query.
            Defaults to ``False``.

    Raises:
        IntelOwlClientException: on client/HTTP error, or when the response
            lacks ``status`` (or lacks ``job_id`` while status is anything
            other than ``not_available``)

    Returns:
        Dict: JSON body
    """
    try:
        query = {"md5": md5, "analyzers_needed": analyzers_needed}
        if run_all_available_analyzers:
            query["run_all_available_analyzers"] = True
        if not check_reported_analysis_too:
            query["running_only"] = True

        endpoint = self.instance + "/api/ask_analysis_availability"
        response = self.session.get(endpoint, params=query)
        self.logger.debug(msg=(response.url, response.status_code))
        response.raise_for_status()

        answer = response.json()
        status = answer.get("status", None)
        job_id = answer.get("job_id", None)

        # Sanity checks: name the first field the response is missing.
        missing = None
        if not status:
            missing = "status"
        elif status != "not_available" and not job_id:
            missing = "job_id"
        if missing is not None:
            raise IntelOwlClientException(
                f"API ask_analysis_availability gave result without {missing} ?"
                f" Response: {answer}"
            )
    except Exception as exc:
        raise IntelOwlClientException(exc)
    return answer
def send_file_analysis_request(
    self,
    analyzers_requested: List[str],
    filename: str,
    binary: bytes,
    force_privacy: bool = False,
    private_job: bool = False,
    disable_external_analyzers: bool = False,
    run_all_available_analyzers: bool = False,
    runtime_configuration: Dict = None,
    tags: List[int] = None,
) -> Dict:
    """Submit a file for analysis.

    Endpoint: ``/api/send_analysis_request``

    Args:
        analyzers_requested (List[str]):
            List of analyzers to invoke
        filename (str):
            Filename
        binary (bytes):
            File contents as bytes
        force_privacy (bool, optional):
            Disable analyzers that can leak info. Defaults to ``False``.
        private_job (bool, optional):
            Limit view permissions to your groups. Defaults to ``False``.
        disable_external_analyzers (bool, optional):
            Disable analyzers that use external services.
            Defaults to ``False``.
        run_all_available_analyzers (bool, optional):
            If True, runs all compatible analyzers. Defaults to ``False``.
        runtime_configuration (Dict, optional):
            Overwrite configuration for analyzers; sent JSON-encoded and
            only when non-empty. Defaults to ``None``.
        tags (List[int], optional):
            List of tags associated with this job. Defaults to ``None``,
            which is sent as an empty list.

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Dict: JSON body
    """
    try:
        tag_ids = tags if tags else []
        form = {
            "is_sample": True,
            "md5": self.get_md5(binary, type_="binary"),
            "analyzers_requested": analyzers_requested,
            "tags_id": tag_ids,
            "run_all_available_analyzers": run_all_available_analyzers,
            "force_privacy": force_privacy,
            "private": private_job,
            "disable_external_analyzers": disable_external_analyzers,
            "file_name": filename,
        }
        # An empty/None configuration is simply left out of the request.
        if runtime_configuration:
            form["runtime_configuration"] = json.dumps(runtime_configuration)
        upload = {"file": (filename, binary)}
        answer = self.__send_analysis_request(data=form, files=upload)
    except Exception as exc:
        raise IntelOwlClientException(exc)
    return answer
def send_observable_analysis_request(
    self,
    analyzers_requested: List[str],
    observable_name: str,
    force_privacy: bool = False,
    private_job: bool = False,
    disable_external_analyzers: bool = False,
    run_all_available_analyzers: bool = False,
    runtime_configuration: Dict = None,
    tags: List[int] = None,
) -> Dict:
    """Submit an observable for analysis.

    Endpoint: ``/api/send_analysis_request``

    Args:
        analyzers_requested (List[str]):
            List of analyzers to invoke
        observable_name (str):
            Observable value; its classification is derived with
            ``_get_observable_classification``.
        force_privacy (bool, optional):
            Disable analyzers that can leak info. Defaults to ``False``.
        private_job (bool, optional):
            Limit view permissions to your groups. Defaults to ``False``.
        disable_external_analyzers (bool, optional):
            Disable analyzers that use external services.
            Defaults to ``False``.
        run_all_available_analyzers (bool, optional):
            If True, runs all compatible analyzers. Defaults to ``False``.
        runtime_configuration (Dict, optional):
            Overwrite configuration for analyzers; sent JSON-encoded and
            only when non-empty. Defaults to ``None``.
        tags (List[int], optional):
            List of tags associated with this job. Defaults to ``None``,
            which is sent as an empty list.

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Dict: JSON body
    """
    try:
        tag_ids = tags if tags else []
        form = {
            "is_sample": False,
            "md5": self.get_md5(observable_name, type_="observable"),
            "analyzers_requested": analyzers_requested,
            "tags_id": tag_ids,
            "run_all_available_analyzers": run_all_available_analyzers,
            "force_privacy": force_privacy,
            "private": private_job,
            "disable_external_analyzers": disable_external_analyzers,
            "observable_name": observable_name,
            "observable_classification": self._get_observable_classification(
                observable_name
            ),
        }
        # An empty/None configuration is simply left out of the request.
        if runtime_configuration:
            form["runtime_configuration"] = json.dumps(runtime_configuration)
        answer = self.__send_analysis_request(data=form, files=None)
    except Exception as exc:
        raise IntelOwlClientException(exc)
    return answer
def send_analysis_batch(self, rows: List[Dict]):
    """
    Send multiple analysis requests.
    Can be mix of observable or file analysis requests.

    Used by the pyintelowl CLI.

    Each row is forwarded to ``_new_analysis_cli`` using keyword arguments,
    so every value lands on the parameter it is meant for. An
    ``IntelOwlClientException`` raised for one row is logged and the batch
    continues with the next row. The input rows are not modified.

    Args:
        rows (List[Dict]):
            Each row should be a dictionary with keys,
            `value`, `type`, `analyzers_list`, `run_all`
            `force_privacy`, `private_job`, `disable_external_analyzers`,
            `check`. Optional keys: `runtime_config` (path to a JSON file)
            and `tags_list`. `analyzers_list` may be a comma-separated
            string or an already-split list.
    """
    for row in rows:
        try:
            # `runtime_config`, when given, is a path to a JSON file.
            runtime_config = row.get("runtime_config", {})
            if runtime_config:
                with open(runtime_config) as fp:
                    runtime_config = json.load(fp)

            run_all = row.get("run_all", False)
            analyzers_list = row.get("analyzers_list", None)
            if not run_all and isinstance(analyzers_list, str):
                # Split into a new list (the caller's row stays untouched),
                # dropping blank entries and surrounding whitespace.
                analyzers_list = [
                    name.strip()
                    for name in analyzers_list.split(",")
                    if name.strip()
                ]

            self._new_analysis_cli(
                obj=row["value"],
                type_=row["type"],
                analyzers_list=analyzers_list,
                # NOTE(review): key name `tags_list` is assumed -- confirm
                # against what the CLI puts into each row.
                tags_list=row.get("tags_list", None),
                run_all=run_all,
                force_privacy=row.get("force_privacy", False),
                private_job=row.get("private_job", False),
                disable_external_analyzers=row.get(
                    "disable_external_analyzers", False
                ),
                check=row.get("check", None),
                runtime_configuration=runtime_config,
            )
        except IntelOwlClientException as e:
            self.logger.fatal(str(e))
def __send_analysis_request(self, data=None, files=None):
    """
    Internal use only.

    POSTs to ``/api/send_analysis_request`` and returns the decoded JSON
    body.

    Order of checks:
      1. the body is decoded once, tolerating a non-JSON body;
      2. error code ``"814"`` (no analyzers left after filtering) is turned
         into an ``IntelOwlClientException``;
      3. ``raise_for_status()`` runs *before* the success message is
         logged, so an HTTP error is reported as such instead of as a
         JSON-decode error or a ``KeyError`` on ``job_id``.

    Args:
        data: form fields passed to ``session.post``.
        files: file payload passed to ``session.post``.

    Raises:
        IntelOwlClientException: on error ``"814"``, or when a successful
            response does not carry a JSON object.
        Exception: whatever ``response.raise_for_status()`` raises on an
            HTTP error status.

    Returns:
        The decoded JSON body of the response.
    """
    url = self.instance + "/api/send_analysis_request"
    response = self.session.post(url, data=data, files=files)
    try:
        answer = response.json()
    except ValueError:
        # Body is not JSON (e.g. an HTML error page); let
        # raise_for_status() below report the HTTP failure.
        answer = None
    self.logger.debug(
        msg={
            "url": response.url,
            "code": response.status_code,
            "headers": response.headers,
            "body": answer,
        }
    )
    if isinstance(answer, dict) and answer.get("error", "") == "814":
        if self.cli:
            err = """
Request failed..
Error: [i yellow]After the filter, no analyzers can be run.
Try with other analyzers.[/]
"""
        else:
            err = "Request failed. After the filter, no analyzers can be run"
        raise IntelOwlClientException(err)
    response.raise_for_status()
    if not isinstance(answer, dict):
        raise IntelOwlClientException(
            "API send_analysis_request gave a response without a JSON object"
        )
    warnings = answer.get("warnings", [])
    job_id, status = answer.get("job_id"), answer.get("status")
    if self.cli:
        info_log = f"""New Job running..
ID: {job_id} | Status: [u blue]{status}[/].
Got {len(warnings)} warnings:
[i yellow]{warnings if warnings else None}[/]
"""
    else:
        info_log = f"""New Job running..
ID: {job_id} | Status: {status}.
Got {len(warnings)} warnings:
{warnings if warnings else None}
"""
    self.logger.info(info_log)
    return answer
def create_tag(self, label: str, color: str):
    """Create a new tag with a POST request.

    Endpoint: ``/api/tags``

    Args:
        label (str): Label of the tag to be created
        color (str): Color of the tag to be created

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        JSON body of the response
    """
    try:
        endpoint = self.instance + "/api/tags"
        payload = {"label": label, "color": color}
        resp = self.session.post(endpoint, data=payload)
        self.logger.debug(msg=(resp.url, resp.status_code))
        resp.raise_for_status()
        created = resp.json()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return created
def edit_tag(self, tag_id: Union[int, str], label: str, color: str):
    """Edit an existing tag with a PUT request.

    Endpoint: ``/api/tags/{tag_id}``

    Args:
        tag_id (Union[int, str]): Id of the existing tag
        label (str): New label of the tag
        color (str): New color of the tag

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        JSON body of the response
    """
    try:
        endpoint = self.instance + "/api/tags/" + str(tag_id)
        payload = {"label": label, "color": color}
        resp = self.session.put(endpoint, data=payload)
        self.logger.debug(resp.url)
        resp.raise_for_status()
        updated = resp.json()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return updated
def get_analyzer_configs(self):
    """
    Fetch the analyzer configuration served by the IntelOwl instance
    (described upstream as the state of `analyzer_config.json`).

    Endpoint: ``/api/get_analyzer_configs``

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        JSON body of the response
    """
    try:
        endpoint = self.instance + "/api/get_analyzer_configs"
        resp = self.session.get(endpoint)
        self.logger.debug(msg=(resp.url, resp.status_code))
        resp.raise_for_status()
        configs = resp.json()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return configs
def get_all_jobs(self) -> List[Dict[str, Any]]:
    """
    Retrieve the list of all jobs.

    Endpoint: ``/api/jobs``

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        List[Dict[str, Any]]: List of jobs
    """
    try:
        endpoint = self.instance + "/api/jobs"
        resp = self.session.get(endpoint)
        self.logger.debug(msg=(resp.url, resp.status_code))
        resp.raise_for_status()
        jobs = resp.json()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return jobs
def get_tag_by_id(self, tag_id: Union[int, str]) -> Dict[str, str]:
    """Retrieve a single tag by its ID.

    Endpoint: ``/api/tags/{tag_id}``

    Args:
        tag_id (Union[int, str]): Tag ID

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Dict[str, str]: Dict with 3 keys: `id`, `label` and `color`.
    """
    try:
        base = self.instance + "/api/tags/"
        resp = self.session.get(base + str(tag_id))
        self.logger.debug(msg=(resp.url, resp.status_code))
        resp.raise_for_status()
        tag = resp.json()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return tag
def get_job_by_id(self, job_id: Union[int, str]) -> Dict[str, Any]:
    """Retrieve a single job by its ID.

    Endpoint: ``/api/jobs/{job_id}``

    Args:
        job_id (Union[int, str]): Job ID

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Dict[str, Any]: JSON body.
    """
    try:
        endpoint = self.instance + "/api/jobs/" + str(job_id)
        resp = self.session.get(endpoint)
        self.logger.debug(msg=(resp.url, resp.status_code))
        resp.raise_for_status()
        job = resp.json()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return job
@staticmethod
def get_md5(
    to_hash: AnyStr,
    type_="observable",
) -> str:
    """Returns md5sum of given observable or file object.

    Args:
        to_hash (AnyStr):
            either an observable string, file contents as bytes or path to a file
        type_ (Union["observable", "binary", "file"], optional):
            `observable`: hash the lower-cased, UTF-8 encoded string form;
            `binary`: hash the bytes as given;
            `file`: hash the contents of the file at that path.
            Defaults to "observable".

    Raises:
        IntelOwlClientException: if ``type_`` is `file` and the path does
            not exist, or if ``type_`` is not one of the supported values
            (previously an empty string was returned silently).

    Returns:
        str: md5sum
    """
    if type_ == "observable":
        return hashlib.md5(str(to_hash).lower().encode("utf-8")).hexdigest()
    if type_ == "binary":
        return hashlib.md5(to_hash).hexdigest()
    if type_ == "file":
        path = pathlib.Path(to_hash)
        if not path.exists():
            raise IntelOwlClientException(f"{to_hash} does not exists")
        return hashlib.md5(path.read_bytes()).hexdigest()
    # An unknown type must not yield "" as if it were a valid md5.
    raise IntelOwlClientException(
        f"Unsupported type_ {type_!r}: expected 'observable', 'binary' or 'file'"
    )
def _new_analysis_cli(
    self,
    obj: str,
    type_: str,
    analyzers_list: List[str],
    tags_list: List[int],
    run_all: bool,
    force_privacy,
    private_job,
    disable_external_analyzers,
    check,
    runtime_configuration: Dict = None,
    should_poll: bool = False,
) -> None:
    """
    For internal use by the pyintelowl CLI.

    Steps:
      1. unless ``check`` is ``"force-new"``, ask the server whether an
         analysis already exists and stop if one does;
      2. send a new observable or file analysis request;
      3. if ``should_poll`` is set, poll the new job. A job whose status
         is not ``accepted`` is reported and NOT polled.

    Args:
        obj (str): observable value, or a file path when ``type_`` is not
            ``"observable"``.
        type_ (str): ``"observable"``; any other value is treated as a
            file path to read and upload.
        analyzers_list (List[str]): analyzers to run; mutually exclusive
            with ``run_all``.
        tags_list (List[int]): tags forwarded to the analysis request.
        run_all (bool): run all available analyzers.
        force_privacy: forwarded to the analysis request.
        private_job: forwarded to the analysis request.
        disable_external_analyzers: forwarded to the analysis request.
        check: ``"force-new"`` skips the availability check; ``"reported"``
            makes the check include reported analysis too.
        runtime_configuration (Dict, optional): forwarded to the analysis
            request. Defaults to ``{}``.
        should_poll (bool, optional): poll the job after submission.
            Defaults to ``False``.
    """
    if not runtime_configuration:
        runtime_configuration = {}
    # CLI sanity checks
    if analyzers_list and run_all:
        self.logger.warning(
            """
Can't use -al and -aa options together. See usage with -h.
"""
        )
        return
    if not (analyzers_list or run_all):
        self.logger.warning(
            """
Either one of -al, -aa must be specified. See usage with -h.
""",
        )
        return
    analyzers = analyzers_list if analyzers_list else "all available analyzers"
    self.logger.info(
        f"""Requesting analysis..
{type_}: [blue]{obj}[/]
analyzers: [i green]{analyzers}[/]
"""
    )
    # 1st step: ask analysis availability
    if check != "force-new":
        md5 = self.get_md5(obj, type_=type_)
        resp = self.ask_analysis_availability(
            md5, analyzers_list, run_all, check == "reported"
        )
        status, job_id = resp.get("status", None), resp.get("job_id", None)
        if status != "not_available":
            self.logger.info(
                f"""Found existing analysis!
Job: #{job_id}
status: [u blue]{status}[/]
[i]Hint: use [#854442]--check force-new[/] to perform new scan anyway[/]
"""
            )
            return
    # 2nd step: send new analysis request
    if type_ == "observable":
        resp2 = self.send_observable_analysis_request(
            analyzers_requested=analyzers_list,
            observable_name=obj,
            force_privacy=force_privacy,
            private_job=private_job,
            disable_external_analyzers=disable_external_analyzers,
            tags=tags_list,
            run_all_available_analyzers=run_all,
            runtime_configuration=runtime_configuration,
        )
    else:
        path = pathlib.Path(obj)
        resp2 = self.send_file_analysis_request(
            analyzers_requested=analyzers_list,
            filename=path.name,
            binary=path.read_bytes(),
            force_privacy=force_privacy,
            private_job=private_job,
            disable_external_analyzers=disable_external_analyzers,
            tags=tags_list,
            run_all_available_analyzers=run_all,
            runtime_configuration=runtime_configuration,
        )
    # 3rd step: poll for result
    if should_poll:
        if resp2["status"] != "accepted":
            self.logger.fatal("Can't poll a failed job")
            # Stop here: there is nothing to poll for a rejected job.
            return
        # Imported lazily so the CLI helpers are only needed when polling.
        from .cli._jobs_utils import _poll_for_job_cli

        job_id = resp2["job_id"]
        _ = _poll_for_job_cli(self, job_id)
        self.logger.info(
            f"""
Polling finished.
Execute [i blue]pyintelowl jobs view {job_id}[/] to view the result
"""
        )
def _get_observable_classification(self, value: str) -> str:
    """Returns observable classification for the given value.

    Only following types are supported:
    ip, domain, url, hash (md5, sha1, sha256), generic (if no match)

    Checks run in this order: IP address (via ``ipaddress.ip_address``),
    URL, domain, hash. Hashes are 32, 40 or 64 hex characters in any
    letter case. If nothing matches, a warning is logged and ``generic``
    is returned; no exception is raised.

    Args:
        value (str):
            observable value

    Returns:
        str: one of `ip`, `url`, `domain`, `hash` or 'generic'.
    """
    try:
        ipaddress.ip_address(value)
    except ValueError:
        pass
    else:
        # its a simple IP
        return "ip"

    # Patterns are raw strings so that `\d`, `\.` and `\w` reach the regex
    # engine as written instead of being invalid string escapes.
    if re.match(
        r"^(?:ht|f)tps?://[a-z\d-]{1,63}(?:\.[a-z\d-]{1,63})+"
        r"(?:/[a-z\d-]{1,63})*(?:\.\w+)?",
        value,
    ):
        return "url"
    if re.match(r"^(\.)?[a-z\d-]{1,63}(\.[a-z\d-]{1,63})+$", value):
        return "domain"
    # md5 / sha1 / sha256 lengths; case-insensitive so mixed-case hex
    # digests are recognised too.
    if re.match(
        r"^(?:[0-9a-f]{32}|[0-9a-f]{40}|[0-9a-f]{64})$", value, re.IGNORECASE
    ):
        return "hash"

    self.logger.warning(
        "Couldn't detect observable classification, setting as 'generic'..."
    )
    return "generic"
def kill_running_job(self, job_id: int) -> bool:
    """Ask the server to kill a running job.

    Method: PATCH
    Endpoint: ``/api/jobs/{job_id}/kill``

    Args:
        job_id (int):
            id of job to kill

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Bool: True when the response status code is 200
    """
    try:
        endpoint = self.instance + f"/api/jobs/{job_id}/kill"
        resp = self.session.patch(endpoint)
        self.logger.debug(msg=(resp.url, resp.status_code))
        was_killed = resp.status_code == 200
        resp.raise_for_status()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return was_killed
def delete_job_by_id(self, job_id: int) -> bool:
    """Ask the server to delete a job.

    Method: DELETE
    Endpoint: ``/api/jobs/{job_id}``

    Args:
        job_id (int):
            id of job to delete

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Bool: True when the response status code is 204
    """
    try:
        endpoint = self.instance + "/api/jobs/" + str(job_id)
        resp = self.session.delete(endpoint)
        self.logger.debug(msg=(resp.url, resp.status_code))
        was_deleted = resp.status_code == 204
        resp.raise_for_status()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return was_deleted
def delete_tag_by_id(self, tag_id: int) -> bool:
    """Ask the server to delete a tag.

    Method: DELETE
    Endpoint: ``/api/tags/{tag_id}``

    Args:
        tag_id (int):
            id of tag to delete

    Raises:
        IntelOwlClientException: on client/HTTP error

    Returns:
        Bool: True when the response status code is 204
    """
    try:
        endpoint = self.instance + "/api/tags/" + str(tag_id)
        resp = self.session.delete(endpoint)
        self.logger.debug(msg=(resp.url, resp.status_code))
        was_deleted = resp.status_code == 204
        resp.raise_for_status()
    except Exception as exc:
        raise IntelOwlClientException(exc)
    else:
        return was_deleted