# Extracted from a repository web view (file size 9.87 KB; last commit by
# Stelios Karozis). The viewer's line-number gutter (1-291) was captured
# together with the text and is not part of the module.
import os
import re

from parso import python_bytes_to_unicode

from jedi.debug import dbg
from jedi.file_io import KnownContentFileIO
from jedi.inference.imports import SubModuleName, load_module_from_path
from jedi.inference.filters import ParserTreeFilter
from jedi.inference.gradual.conversion import convert_names

# Folder base names that are never descended into while walking a project
# tree (checked in recurse_find_python_folders_and_files).
_IGNORE_FOLDERS = ('.tox', 'venv', '__pycache__')

# Stats from a 2016 Lenovo Notebook running Linux:
# With os.walk, it takes about 10s to scan 11'000 files (without filesystem
# caching). Once cached it only takes 5s. So it is expected that reading all
# those files might take a few seconds, but not a lot more.
# NOTE(review): the assignment itself is missing from this copy of the file
# although search_in_file_ios reads the name; 2000 is a restored value --
# confirm against upstream.
_OPENED_FILE_LIMIT = 2000

# For now we keep the amount of parsed files really low, since parsing might
# take easily 100ms for bigger files.
# NOTE(review): restored value, same caveat as for _OPENED_FILE_LIMIT.
_PARSED_FILE_LIMIT = 30

def _resolve_names(definition_names, avoid_names=()):
    """
    Yield the given names, following module names to their goto targets.

    :param definition_names: Iterable of name objects to resolve.
    :param avoid_names: Names that are skipped entirely. The recursive call
        passes the current ``definition_names`` here so they are not
        processed a second time.
    """
    for name in definition_names:
        if name in avoid_names:
            # Avoiding recursions here, because goto on a module name lands
            # on the same module.
            continue

        if not isinstance(name, SubModuleName):
            # SubModuleNames are not actually existing names but created
            # names when importing something like `import foo.bar`.
            yield name

        if name.api_type == 'module':
            for n in _resolve_names(name.goto(), definition_names):
                yield n

def _dictionarize(names):
    """
    Map each name to a lookup key.

    The key is the name's ``tree_name`` when it has one, otherwise the name
    object itself. Later names overwrite earlier ones with the same key.

    :param names: Iterable of name objects exposing a ``tree_name`` attribute.
    :return: A dict of ``key -> name``.
    """
    return {
        (n if n.tree_name is None else n.tree_name): n
        for n in names
    }

def _find_defining_names(module_context, tree_name):
    """
    Collect the set of names that ``tree_name`` may refer to.

    Starts from the names found for ``tree_name`` and widens the set with
    their stub/non-stub counterparts, matching global variables and names
    with the same string in the same context. The result is passed through
    ``_resolve_names`` once more.

    :param module_context: Context of the module that contains ``tree_name``.
    :param tree_name: The name leaf to search definitions for.
    :return: A set of name objects.
    """
    found_names = _find_names(module_context, tree_name)

    for name in list(found_names):
        # Convert from/to stubs, because those might also be usages.
        # NOTE(review): the argument list of this call was truncated in the
        # copy under review; `[name]` is restored as the names to convert.
        # Further keyword arguments may have been lost -- confirm upstream.
        found_names |= set(convert_names(
            [name],
            only_stubs=not name.get_root_context().is_stub(),
        ))

    found_names |= set(_find_global_variables(found_names, tree_name.value))
    for name in list(found_names):
        # Params, names without a tree node and attribute accesses (trailer)
        # are not expanded by a same-context search.
        if name.api_type == 'param' or name.tree_name is None \
                or name.tree_name.parent.type == 'trailer':
            continue
        found_names |= set(_add_names_in_same_context(name.parent_context, name.string_name))
    return set(_resolve_names(found_names))

def _find_names(module_context, tree_name):
    """
    Return the resolved names for ``tree_name``: the name itself plus the
    names its ``goto()`` leads to.

    :param module_context: Context used to create the name for ``tree_name``.
    :param tree_name: The name leaf to look up.
    :return: A set of name objects.
    """
    name = module_context.create_name(tree_name)
    found_names = set(name.goto())
    # The looked-up name is itself a reference. find_references merges the
    # result of this function into its answer, so without this line usages
    # would never be reported, only their goto targets.
    # NOTE(review): restored line -- confirm against upstream.
    found_names.add(name)

    return set(_resolve_names(found_names))

def _add_names_in_same_context(context, string_name):
    """
    Yield every name called ``string_name`` that a ``ParserTreeFilter`` finds
    in ``context``.

    The filter is queried repeatedly, each time limited to positions before
    the earliest name of the previous round, until it returns nothing.

    :param context: Context to search; nothing is yielded when it has no
        ``tree_node``.
    :param string_name: The name string to look for.
    """
    if context.tree_node is None:
        return

    until_position = None
    while True:
        # NOTE(review): the filter arguments were truncated in the copy under
        # review; restored from the two locals that are otherwise unused.
        filter_ = ParserTreeFilter(
            parent_context=context,
            until_position=until_position,
        )
        names = set(filter_.get(string_name))
        if not names:
            break
        for name in names:
            yield name
        # Continue the search before the earliest name found in this round.
        until_position = min(names, key=lambda x: x.start_pos).start_pos
def _find_global_variables(names, search_name):
    """
    Yield global names called ``search_name`` from the modules of ``names``.

    For every global name found, names with the same string in the context
    created for it are yielded as well.

    :param names: Iterable of name objects; names without a ``tree_name``
        are skipped.
    :param search_name: The name string to look up in the global filter.
    """
    for name in names:
        if name.tree_name is None:
            continue
        module_context = name.get_root_context()
        try:
            method = module_context.get_global_filter
        except AttributeError:
            # This root context offers no global filter.
            continue
        else:
            for global_name in method().get(search_name):
                yield global_name
                c = module_context.create_context(global_name.tree_name)
                for n in _add_names_in_same_context(c, global_name.string_name):
                    yield n

def find_references(module_context, tree_name):
    """
    Find the names that refer to the same thing as ``tree_name``.

    :param module_context: Context of the module that contains ``tree_name``.
    :param tree_name: The name leaf whose references are searched.
    :return: The values of a dict that maps lookup keys (see
        ``_dictionarize``) to the matching name objects.
    """
    inf = module_context.inference_state
    search_name = tree_name.value

    # We disable flow analysis, because if we have ifs that are only true in
    # certain cases, we want both sides.
    try:
        inf.flow_analysis_enabled = False
        found_names = _find_defining_names(module_context, tree_name)
    finally:
        inf.flow_analysis_enabled = True

    found_names_dct = _dictionarize(found_names)

    # The starting module always comes first, followed by the other modules
    # that the found names live in.
    module_contexts = set(d.get_root_context() for d in found_names)
    module_contexts = [module_context] + [m for m in module_contexts if m != module_context]
    # For param no search for other modules is necessary.
    if any(n.api_type == 'param' for n in found_names):
        potential_modules = module_contexts
    else:
        potential_modules = get_module_contexts_containing_name(
            inf,
            module_contexts,
            search_name,
        )

    # NOTE(review): the statements that merge into found_names_dct were lost
    # in the copy under review and are restored from the surrounding comments
    # and data flow -- confirm against upstream.
    non_matching_reference_maps = {}
    for potential_module in potential_modules:
        used_names = potential_module.tree_node.get_used_names()
        for name_leaf in used_names.get(search_name, []):
            new = _dictionarize(_find_names(potential_module, name_leaf))
            if any(key in found_names_dct for key in new):
                found_names_dct.update(new)
                for key in new:
                    # A reference that was previously searched for matches
                    # with a now found name. Merge.
                    for dct in non_matching_reference_maps.pop(key, []):
                        found_names_dct.update(dct)
            else:
                # No match yet; remember the map in case a later name
                # connects it to the found names.
                for name in new:
                    non_matching_reference_maps.setdefault(name, []).append(new)
    return found_names_dct.values()

def _check_fs(inference_state, file_io, regex):
    """
    Load the module behind ``file_io`` if its source matches ``regex``.

    :param inference_state: Passed on to ``load_module_from_path``.
    :param file_io: File object to read; its ``path`` is reused for the
        loaded module.
    :param regex: Compiled pattern that the decoded source has to contain.
    :return: The module's context, or ``None`` when the file is missing,
        does not match ``regex`` or turns out to be a compiled module.
    """
    try:
        # NOTE(review): the right-hand side was truncated in the copy under
        # review; `file_io.read()` is restored -- confirm against upstream.
        code = file_io.read()
    except FileNotFoundError:
        return None
    code = python_bytes_to_unicode(code, errors='replace')
    if not regex.search(code):
        return None
    # Reuse the content that was already read instead of reading the file
    # a second time when loading the module.
    new_file_io = KnownContentFileIO(file_io.path, code)
    m = load_module_from_path(inference_state, new_file_io)
    if m.is_compiled():
        return None
    return m.as_context()

def gitignored_lines(folder_io, file_io):
    """
    Split the entries of a ``.gitignore`` file into paths and names.

    Empty lines and lines starting with ``#`` are skipped. Entries with a
    leading ``/`` are joined onto ``folder_io.path`` (without a trailing
    separator); all other entries are returned unchanged as names.

    :param folder_io: Folder that contains the ignore file.
    :param file_io: The ignore file; its content is read as bytes.
    :return: A tuple ``(ignored_paths, ignored_names)`` of two sets.
    """
    ignored_paths = set()
    ignored_names = set()
    # NOTE(review): the iterable of this loop was truncated in the copy under
    # review; `file_io.read().splitlines()` is restored -- confirm upstream.
    for line in file_io.read().splitlines():
        if not line or line.startswith(b'#'):
            continue

        p = line.decode('utf-8', 'ignore')
        if p.startswith('/'):
            name = p[1:]
            # gitignore entries use '/' on every platform, so accept it in
            # addition to the native separator.
            if name.endswith(('/', os.path.sep)):
                name = name[:-1]
            ignored_paths.add(os.path.join(folder_io.path, name))
        else:
            ignored_names.add(p)
    return ignored_paths, ignored_names

def recurse_find_python_folders_and_files(folder_io, except_paths=()):
    """
    Walk ``folder_io`` and yield its Python files and sub folders.

    Yields ``(None, file_io)`` for every ``.py``/``.pyi`` file and
    ``(folder_io, None)`` for every sub folder that is going to be walked.
    Paths listed in a ``.gitignore`` with a leading ``/`` are added to the
    excluded paths while walking.

    :param folder_io: Folder to start from; has to provide ``walk()``.
    :param except_paths: Paths of files and folders to leave out.
    """
    except_paths = set(except_paths)
    for root_folder_io, folder_ios, file_ios in folder_io.walk():
        for file_io in file_ios:
            path = file_io.path
            if path.endswith(('.py', '.pyi')):
                if path not in except_paths:
                    yield None, file_io

            if path.endswith('.gitignore'):
                # Only the path entries are used here; the plain names are
                # currently ignored.
                ignored_paths, ignored_names = \
                    gitignored_lines(root_folder_io, file_io)
                except_paths |= ignored_paths

        # Delete folders that we don't want to iterate over. The list is
        # modified in place so that the walk itself skips them.
        folder_ios[:] = [
            sub_folder_io
            for sub_folder_io in folder_ios
            if sub_folder_io.path not in except_paths
            and sub_folder_io.get_base_name() not in _IGNORE_FOLDERS
        ]
        for sub_folder_io in folder_ios:
            yield sub_folder_io, None

def recurse_find_python_files(folder_io, except_paths=()):
    """
    Yield only the file entries of ``recurse_find_python_folders_and_files``.

    :param folder_io: Folder to start the search from.
    :param except_paths: Paths to leave out, passed through unchanged.
    """
    entries = recurse_find_python_folders_and_files(folder_io, except_paths)
    for _, python_file_io in entries:
        # Folder entries carry ``None`` in the file slot.
        if python_file_io is None:
            continue
        yield python_file_io

def _find_python_files_in_sys_path(inference_state, module_contexts):
    """
    Yield Python files located around the given modules.

    For every module the search starts in the folder of its file and moves
    up through the parent folders for as long as the folder path starts
    with one of the sys path entries. Files of the given modules themselves
    are not yielded.

    :param inference_state: Provides the sys path via ``get_sys_path()``.
    :param module_contexts: Module contexts to start from; modules without
        a ``file_io`` are skipped.
    """
    sys_path = inference_state.get_sys_path()
    except_paths = set()
    yielded_paths = {m.py__file__() for m in module_contexts}
    for module_context in module_contexts:
        file_io = module_context.get_value().file_io
        if file_io is None:
            continue

        folder_io = file_io.get_parent_folder()
        while True:
            path = folder_io.path
            if not any(path.startswith(p) for p in sys_path) or path in except_paths:
                break
            for file_io in recurse_find_python_files(folder_io, except_paths):
                if file_io.path not in yielded_paths:
                    yield file_io
            # Remember the folder so that walking its parent (or another
            # module in the same place) does not scan it a second time.
            # NOTE(review): restored line; without it except_paths would
            # stay empty -- confirm against upstream.
            except_paths.add(path)
            folder_io = folder_io.get_parent_folder()

def get_module_contexts_containing_name(inference_state, module_contexts, name,
                                        limit_reduction=1):
    """
    Search a name in the directories of modules.

    The given non-compiled module contexts are yielded first, followed by
    the contexts of other files in which ``name`` was found.

    :param inference_state: Passed on to the file search.
    :param module_contexts: Module contexts to start the search from.
    :param name: The name string to search for.
    :param limit_reduction: Divides the limits on opening/parsing files by this
        factor.
    """
    # Skip non python modules
    for module_context in module_contexts:
        if module_context.is_compiled():
            continue
        yield module_context

    # Very short names are not searched in other modules for now to avoid lots
    # of file lookups.
    if len(name) <= 2:
        return

    file_io_iterator = _find_python_files_in_sys_path(inference_state, module_contexts)
    for x in search_in_file_ios(inference_state, file_io_iterator, name,
                                limit_reduction=limit_reduction):
        yield x  # Python 2...

def search_in_file_ios(inference_state, file_io_iterator, name, limit_reduction=1):
    """
    Yield the module contexts of the files that contain ``name`` as a word.

    The search stops as soon as either the number of opened files or the
    number of parsed (i.e. yielded) files reaches its limit.

    :param inference_state: Passed on to ``_check_fs``.
    :param file_io_iterator: Iterable of file objects to check.
    :param name: The name string to search for; matched between word
        boundaries.
    :param limit_reduction: Divides the limits on opening/parsing files by this
        factor.
    """
    parse_limit = _PARSED_FILE_LIMIT / limit_reduction
    open_limit = _OPENED_FILE_LIMIT / limit_reduction
    file_io_count = 0
    parsed_file_count = 0
    regex = re.compile(r'\b' + re.escape(name) + r'\b')
    for file_io in file_io_iterator:
        file_io_count += 1
        m = _check_fs(inference_state, file_io, regex)
        if m is not None:
            parsed_file_count += 1
            yield m
            if parsed_file_count >= parse_limit:
                dbg('Hit limit of parsed files: %s', parse_limit)
                # Actually enforce the limit instead of only logging it.
                break

        if file_io_count >= open_limit:
            dbg('Hit limit of opened files: %s', open_limit)
            break