X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
utils
/
📁
..
📄
__init__.py
(52.55 KB)
📁
__pycache__
📄
_shutil.py
(795 B)
📄
antivirus_mode.py
(497 B)
📄
async_utils.py
(718 B)
📄
benchmark.py
(538 B)
📄
buffer.py
(1.24 KB)
📄
check_db.py
(7.35 KB)
📄
check_lock.py
(636 B)
📄
cli.py
(7.08 KB)
📄
common.py
(14.41 KB)
📄
config.py
(999 B)
📄
cronjob.py
(902 B)
📄
doctor.py
(1 KB)
📄
hyperscan.py
(149 B)
📄
importer.py
(2.67 KB)
📄
ipecho.py
(1.9 KB)
📄
json.py
(953 B)
📄
kwconfig.py
(1.56 KB)
📄
parsers.py
(11.12 KB)
📄
resource_limits.py
(2.29 KB)
📄
safe_fileops.py
(7.99 KB)
📄
safe_sequence.py
(363 B)
📄
serialization.py
(1.72 KB)
📄
sshutil.py
(7.94 KB)
📄
subprocess.py
(1.53 KB)
📄
support.py
(5.2 KB)
📄
threads.py
(1005 B)
📄
validate.py
(4.27 KB)
📄
whmcs.py
(7.6 KB)
📄
wordpress_mu_plugin.py
(1.41 KB)
Editing: whmcs.py
import json
import os
import urllib.parse
import defence360agent.subsys.panels.hosting_panel as hp
from logging import getLogger

from defence360agent.contracts import config
from defence360agent.utils.config import update_config
from defence360agent.myimunify.model import update_users_protection, MyImunify
from defence360agent.utils.wordpress_mu_plugin import (
    MU_PLUGIN_INSTALLATION,
    ADVICE_EMAIL_NOTIFICATION,
    WordPressMuPlugin,
)

logger = getLogger(__name__)

# Request keys that update_configs() stores under the "CONTROL_PANEL" section.
MU_PLUGIN_KEYS = [MU_PLUGIN_INSTALLATION, ADVICE_EMAIL_NOTIFICATION]


class WhmcsConf:
    """
    Read/write the data passed by WHMCS.

    Internal use, for commands called from WHMCS only.
    It deliberately saves ALL data that came from WHMCS without any
    validation, in order to simplify compatibility with the currently
    installed WHMCS plugin.
    """

    # JSON file the WHMCS data is persisted in.
    path = "/var/imunify360/whmcs_data.json"

    def read(self):
        """
        Return the stored WHMCS data as a dict.

        Returns an empty dict when the file does not exist, cannot be read,
        is not valid JSON, or holds JSON that is not an object.
        """
        if not os.path.exists(self.path):
            return {}
        try:
            with open(self.path, "r") as f:
                raw_data = f.read()
        except IOError as e:
            logger.error("Failed to read whmcs data file: %s", str(e))
            return {}
        try:
            data = json.loads(raw_data)
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}
        if not isinstance(data, dict):
            # save() merges new values with dict.update(); anything other
            # than a JSON object would make it crash, so treat it as malformed
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}
        return data

    def save(self, data):
        """
        Merge *data* into the stored WHMCS data and write it back.

        Saves ALL data passed by WHMCS; it should not have any validations,
        deliberately, to be as compatible as possible with the currently
        installed WHMCS plugin.
        """
        current_data = self.read()
        # no validation needed
        current_data.update(data)
        # Serialize before opening the file: open(..., "w") truncates it, so
        # a serialization error must not leave an empty/partial file behind.
        serialized = json.dumps(current_data, indent=4)
        try:
            with open(self.path, "w") as file:
                file.write(serialized)
        except IOError as e:
            logger.error("Failed to write whmcs data to file: %s", str(e))


async def sync_billing_data(sink, data):
    """
    Apply the MyImunify part of the billing *data*.

    Takes the value stored under config.MY_IMUNIFY_KEY and passes it to
    mi_update(); returns whatever mi_update() returns.
    """
    my_imunify_updates = data.get(config.MY_IMUNIFY_KEY)
    return await mi_update(sink, my_imunify_updates)


def convert_to_config_key_value(key, value):
    """
    Convert several keys to config keys, otherwise return the same pair.

    Any key is acceptable. For "status" and "protection" the value must be
    one of the known strings ("active"/"inactive", "enabled"/"disabled"),
    otherwise KeyError is raised.
    """
    if key == "status":
        return (
            "enable",
            {
                "active": True,
                "inactive": False,
            }[value],
        )
    elif key == "protection":
        return (
            "protection",
            {
                "enabled": True,
                "disabled": False,
            }[value],
        )
    elif key == "mu_plugin_installation":
        return "smart_advice_allowed", value
    return key, value


def convert_from_config_key_value(key, value):
    """
    Convert several keys from config format, otherwise return the same pair.

    Any key is acceptable; this is the reverse of
    convert_to_config_key_value().
    """
    if key == "enable":
        return "status", ("active" if value else "inactive")
    elif key == "protection":
        return "protection", ("enabled" if value else "disabled")
    elif key == "smart_advice_allowed":
        return "mu_plugin_installation", value
    return key, value


async def get_users():
    """Return the users reported by the hosting panel."""
    return await hp.HostingPanel().get_users()


async def mi_update(sink, requested_myimunify_data):
    """
    Update supported parameters if passed, otherwise do nothing.

    Updates 2 config parameters (if specified): status and purchase_page_url.
    Updates protection status for users (if specified).

    Returns None when nothing was requested, otherwise the result of
    get_current_whmcs_data().
    """
    if not requested_myimunify_data:
        logger.info("Nothing to update for MyImunify")
        return

    whmcs_activation_status = requested_myimunify_data.get("status")
    if whmcs_activation_status:
        # no validation needed
        WhmcsConf().save({"status": whmcs_activation_status})

    await update_configs(sink, requested_myimunify_data)
    WordPressMuPlugin().prepare_for_mu_plugin_installation(
        whmcs_activation_status,
        requested_myimunify_data.get(MU_PLUGIN_INSTALLATION),
    )

    if not requested_myimunify_data.get("protection"):
        return await get_current_whmcs_data([])

    all_users = await get_users()
    # an empty/missing "users" list means "apply to every panel user"
    target_users = requested_myimunify_data.get("users", []) or all_users
    # keep only the users that are reported by the hosting panel
    filtered_passed_users = [
        user for user in target_users if user in all_users
    ]
    if filtered_passed_users:
        logger.info(
            "Updating protection status for users=%s",
            str(filtered_passed_users),
        )
        await update_users_protection(
            sink,
            filtered_passed_users,
            convert_to_config_key_value(
                "protection", requested_myimunify_data["protection"]
            )[1],
        )
    else:
        logger.warning("No users to update protection for")
    return await get_current_whmcs_data(filtered_passed_users)


async def update_configs(sink, requested_myimunify_data):
    """
    Store the config-backed parameters from the request.

    purchase_page_url (and status, unless config.is_mi_freemium_license()
    is true) go under config.MY_IMUNIFY_KEY; keys listed in MU_PLUGIN_KEYS
    go under "CONTROL_PANEL". Does nothing when none of them was passed.
    """
    # those params are stored in config
    mi_config_parameters = (
        ["purchase_page_url"]
        if config.is_mi_freemium_license()
        else ["purchase_page_url", "status"]
    )
    mi_config_data = dict(
        convert_to_config_key_value(param, value)
        for param, value in requested_myimunify_data.items()
        if param in mi_config_parameters
    )
    mu_plugin_data = dict(
        convert_to_config_key_value(param, value)
        for param, value in requested_myimunify_data.items()
        if param in MU_PLUGIN_KEYS
    )

    config_dict = {}
    if mi_config_data:
        config_dict[config.MY_IMUNIFY_KEY] = mi_config_data
    if mu_plugin_data:
        config_dict["CONTROL_PANEL"] = mu_plugin_data

    if config_dict:
        logger.info("Updating config with data: %s", str(config_dict))
        # updates only 2 supported keys: purchase_page_url and status
        await update_config(sink, config_dict)


async def get_users_info(users):
    """
    Return information from the database based on the passed users.

    If no users are passed, returns it for all users. Each item is a dict
    with "user" and "protection" ("enabled"/"disabled") keys.
    """
    result = (
        MyImunify.select().where(MyImunify.user.in_(users)).dicts()
        if users
        else MyImunify.select().dicts()
    )
    return [
        {
            "user": item["user"],
            "protection": convert_from_config_key_value(
                "protection", item["protection"]
            )[1],
        }
        for item in result
    ]


async def get_current_whmcs_data(users):
    """
    Return the current configuration and user protection status.

    {MY_IMUNIFY: {'status': 'active/inactive',
                  'purchase_page_url': 'SOMEURL',
                  'protection': []}}
    """
    conf_data = config.ConfigFile().config_to_dict()
    current_config = dict(
        convert_from_config_key_value(param, value)
        for param, value in conf_data.get(config.MY_IMUNIFY_KEY, {}).items()
    )
    # Tolerate a missing/empty section the same way as for MY_IMUNIFY_KEY
    # above, instead of failing with AttributeError on None.
    cp_data = conf_data.get("CONTROL_PANEL") or {}
    current_config[MU_PLUGIN_INSTALLATION] = convert_from_config_key_value(
        "smart_advice_allowed", cp_data.get("smart_advice_allowed")
    )[1]
    current_config[ADVICE_EMAIL_NOTIFICATION] = cp_data.get(
        ADVICE_EMAIL_NOTIFICATION
    )
    current_config["protection"] = await get_users_info(users)
    return current_config


def get_upgrade_url_link(username, domain):
    """
    Build the purchase page link for *username* / *domain*.

    The query string is appended to
    config.MyImunifyConfig.PURCHASE_PAGE_URL (trailing "/" stripped) and
    includes the server IP reported by the hosting panel.
    """
    purchase_url_link = (
        config.MyImunifyConfig.PURCHASE_PAGE_URL.rstrip("/")
        + "/?"
        + urllib.parse.urlencode(
            {
                "m": "cloudlinux_advantage",
                "action": "provisioning",
                "suite": "my_imunify_account_protection",
                "username": username,
                "domain": domain,
                "server_ip": hp.HostingPanel().get_server_ip(),
            }
        )
    )
    return purchase_url_link
Upload File
Create Folder