X7ROOT File Manager
Current Path:
/opt/imunify360/venv/lib/python3.11/site-packages/imav/plugins
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
imav
/
plugins
/
π
..
π
__init__.py
(0 B)
π
__pycache__
π
check_license.py
(9.47 KB)
π
conflicts.py
(3.18 KB)
π
event_hook_executor.py
(3.72 KB)
π
event_hooks.py
(3.47 KB)
π
generic_sensor.py
(6.11 KB)
π
im360_register.py
(2.8 KB)
π
imunify_patch_id.py
(2.12 KB)
π
inotify.py
(1.75 KB)
π
malware_filters.py
(3.68 KB)
π
mr_proper.py
(2.67 KB)
π
plesk_notifications.py
(3.82 KB)
π
post_action.py
(1.72 KB)
π
restore_from_backup.py
(3.37 KB)
π
server_pull.py
(2.56 KB)
π
service_manager.py
(3.93 KB)
π
wordpress.py
(11.43 KB)
Editing: plesk_notifications.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.Β See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.Β If not, see <https://www.gnu.org/licenses/>. CopyrightΒ Β© 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging from functools import lru_cache from pathlib import Path from defence360agent.contracts.plugins import MessageSink from defence360agent.contracts.hooks import HooksConfig from defence360agent.subsys import notifier logger = logging.getLogger(__name__) SCRIPT_PATH = ( "/opt/psa/admin/plib/modules/imunify360/scripts/send-notifications.php" ) HOOK_PATH = "/opt/imunify360/venv/share/imunify360/scripts/send-notifications" EVENTS = [ "CUSTOM_SCAN_MALWARE_FOUND", "USER_SCAN_MALWARE_FOUND", "REALTIME_MALWARE_FOUND", ] class PleskNotificationsHooks(MessageSink): async def create_sink(self, loop): """MessageSink method""" if self.is_supported(): if not self.is_applied(): await self.add_hooks() else: await self.remove_hooks() @lru_cache(maxsize=1) def is_supported(self) -> bool: return Path(SCRIPT_PATH).exists() and Path(HOOK_PATH).exists() def is_applied(self) -> bool: config = HooksConfig().get() return all( [ HOOK_PATH in rule["SCRIPT"]["scripts"] for event, rule in config.get("rules", {}).items() if event in EVENTS ] ) async def add_hooks(self): config = HooksConfig().get() data = { "rules": { event: rule for event, rule in config.get("rules", {}).items() if event in EVENTS } } updated = False for event, rule in data["rules"].items(): if HOOK_PATH not in rule["SCRIPT"]["scripts"]: rule["SCRIPT"]["enabled"] = True rule["SCRIPT"]["scripts"].append(HOOK_PATH) updated = True if updated: HooksConfig().update(data) try: await notifier.config_updated() except ConnectionRefusedError: logger.warning( "Notifier is not running, cannot send CONFIG_UPDATED event" ) else: logger.info("Hooks added and configuration updated") async def remove_hooks(self): config = HooksConfig().get() data = { "rules": { event: rule for event, rule in config.get("rules", {}).items() if event in EVENTS } } updated = False for event, rule in data["rules"].items(): if HOOK_PATH in rule["SCRIPT"]["scripts"]: rule["SCRIPT"]["scripts"].remove(HOOK_PATH) rule["SCRIPT"]["enabled"] = len(rule["SCRIPT"]["scripts"]) != 0 updated = True if updated: HooksConfig().update(data) try: await notifier.config_updated() except ConnectionRefusedError: logger.warning( "Notifier is not running, cannot send CONFIG_UPDATED event" ) else: logger.info("Hooks removed and configuration updated")
Upload File
Create Folder