123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import os
- import json
- import logging
- import distutils.spawn
- from ulauncher.api.client.Extension import Extension
- from ulauncher.api.client.EventListener import EventListener
- from ulauncher.api.shared.event import KeywordQueryEvent, ItemEnterEvent
- from ulauncher.api.shared.item.ExtensionResultItem import ExtensionResultItem
- from ulauncher.api.shared.item.SmallResultItem import SmallResultItem
- from ulauncher.api.shared.action.RenderResultListAction import RenderResultListAction
- from ulauncher.api.shared.action.RunScriptAction import RunScriptAction
- from ulauncher.api.shared.action.ExtensionCustomAction import ExtensionCustomAction
- logging.basicConfig()
- logger = logging.getLogger(__name__)
- global usage_cache
- usage_cache = {}
- # Usage tracking
- script_directory = os.path.dirname(os.path.realpath(__file__))
- usage_db = os.path.join(script_directory, "usage.json")
- if os.path.exists(usage_db):
- with open(usage_db, 'r') as db:
- # Read JSON string
- raw = db.read()
- # JSON to dict
- usage_cache = json.loads(raw)
- # Initialize items cache and x2goclient sessions file path
- x2goclient_bin = ""
- # Locate x2goclient profiles and binary
- x2go_sessions_path = "{}/.x2goclient/sessions".format(os.environ.get('HOME'))
- x2goclient_bin = distutils.spawn.find_executable('x2goclient')
- # This extension is useless without x2goclient
- if x2goclient_bin is None or x2goclient_bin == "":
- logger.error("x2goclient executable path could not be determined")
- exit()
- # Check if x2goclient sessions file exists
- x2go_sessions_path_exists = None
- # Check default paths first
- if os.path.exists(x2go_sessions_path):
- x2go_sessions_path_exists = True
- class x2goclientExtension(Extension):
- def __init__(self):
- super(x2goclientExtension, self).__init__()
- self.subscribe(KeywordQueryEvent, KeywordQueryEventListener())
- self.subscribe(ItemEnterEvent, ItemEnterEventListener())
- def list_sessions(self, query):
- items = []
- items_cache = []
- with open('/home/bryan/.x2goclient/sessions') as lines:
- for line in lines:
- if line.startswith('['):
- item = line.rstrip()
- items.append(item[1:-1])
- if line.startswith('host=') or line.startswith('name='):
- line = line.rstrip()
- line = line.split('=',1)
- item = line[1]
- items.append(item)
- it_items = iter(items)
- sessions = list(zip(it_items, it_items, it_items))
- for session in sessions:
- seid = session[0]
- host = session[1]
- name = session[2]
- # Search for query inside filename and profile description
- keywords = query.split(" ")
- # if (query in base.lower()) or (query in desc.lower()):
- if (query.lower() in host.lower()) or (query.lower() in name.lower()):
- items_cache.append(create_item(seid, host, name))
- items_cache = sorted(items_cache, key=sort_by_usage, reverse=True)
- return items_cache
- class KeywordQueryEventListener(EventListener):
- def on_event(self, event, extension):
- global x2go_sessions_path
- if extension.preferences["sessions"] is not "" or not x2go_sessions_path:
- # Tilde (~) won't work alone, need expanduser()
- x2go_sessions_path = os.path.expanduser(extension.preferences["sessions"])
- # pref_sessions_path = extension.preferences['sessions']
- logger.debug("x2goclient sessions path: {}".format(x2go_sessions_path))
- # Get query
- term = (event.get_argument() or "").lower()
- # Display all items when query empty
- sessions_list = extension.list_sessions(term)
- return RenderResultListAction(sessions_list[:8])
- class ItemEnterEventListener(EventListener):
- def on_event(self, event, extension):
- global usage_cache
- # Get query
- data = event.get_data()
- on_enter = data["id"]
- # The profilefile name is the ID
- base = os.path.basename(on_enter)
- b = os.path.splitext(base)[0]
- # Check usage and increment
- if b in usage_cache:
- usage_cache[b] = usage_cache[b]+1
- else:
- usage_cache[b] = 1
- # Update usage JSON
- with open(usage_db, 'w') as db:
- db.write(json.dumps(usage_cache, indent=2))
- return RunScriptAction('#!/usr/bin/env bash\n{} --sessionid={}\n'.format(x2goclient_bin, on_enter), None).run()
- def create_item(seid, host, name):
- return ExtensionResultItem(
- name=name,
- description=host,
- icon="images/x2goclient.svg",
- on_enter=ExtensionCustomAction(
- {"id": seid})
- )
- def sort_by_usage(i):
- global usage_cache
- # Convert item name to ID format
- j = i._name.lower()
- # Return score according to usage
- if j in usage_cache:
- return usage_cache[j]
- # Default is 0 (no usage rank / unused)
- return 0
- if __name__ == "__main__":
- x2goclientExtension().run()
|