|     IEEE Xplore Digital Library     |     IEEE Standards     |     IEEE Spectrum     |     More Sites

Verified Commit d78e94b7 authored by Emi Simpson's avatar Emi Simpson
Browse files

Integrate new coordinator patch for per-repo jobs

parent 2947426f
from asyncio.streams import StreamReader, StreamWriter
from pymysql.cursors import Cursor
from mystic.sources import Source
from typing import Any, AsyncGenerator, Callable, List, NamedTuple, Optional, Tuple, TypeVar, TypedDict, Union
from typing import Any, AsyncGenerator, Callable, Dict, List, NamedTuple, Optional, Tuple, TypeVar, TypedDict, Union
from mystic.database import Project
from asyncio import BoundedSemaphore, open_unix_connection
import msgpack
......@@ -48,7 +48,7 @@ class CoordinatorConnection:
project: Project,
desc: str,
priority: int,
backends: List[str],
backends: Dict[str, List[str]],
) -> None:
Call "post_job" on the remote instance.
......@@ -103,10 +103,10 @@ class CoordinatorConnection:
project = source.project
res = 'Res' if rescan else 'S'
desc = f'{res}can {source.source_type}'
backends = [
for source in source.expand_source()
backends: Dict[str, List[str]] = dict()
for source in source.expand_source():
existing_sources = backends.setdefault(source.source_type, list())
await self.post_job(project, desc, priority, backends)
async def scan_project(
......@@ -129,11 +129,11 @@ class CoordinatorConnection:
A cursor must be provided to load any missing fields of the project
desc = f'Full scan of {project.get_display_name(c)}'
backends = [
for metasource in project.get_data_sources(c)
for source in metasource.expand_source()
backends = dict(
(sources[0].source_type, [source.url for source in sources])
for sources
in project.get_folded_data_sources(c, expanded = True).values()
await self.post_job(project, desc, priority, backends)
async def jobs_for_project(
......@@ -543,10 +543,14 @@ class Project:
self._owners = [User(uid) for (uid,) in c.fetchall()]
return self._owners
def get_data_sources(self, c: Cursor) -> List[Source]:
def get_data_sources(self, c: Cursor, expanded: bool = False) -> List[Source]:
Retrieve the data sources for this project.
If `expanded` is set, then :func:`mystic.sources.Source.expand_source` is called
once on each source before all the results are returned. Otherwise, the output is
a list of unexpanded (meta)sources.
This MAY trigger a database query if the sources are currently unknown.
This does NOT validate that the project id exists in the database, and
......@@ -586,9 +590,24 @@ class Project:
for (source_id, project, source_type, url, flagged)
in source_tuples
return self._data_sources
if not expanded:
return self._data_sources
return [
for metasource in self._data_sources
for source in metasource.expand_source()
def get_folded_data_sources(self, c: Cursor) -> Dict[Type[Source], List[Source]]:
def get_folded_data_sources(self, c: Cursor, expanded: bool = False) -> Dict[Type[Source], List[Source]]:
A wrapper around :func:`get_data_sources` to organize data sources by type
The result is a dict mapping each type of source to a list of all sources of that
type. The `expanded` parameter is passed to the wrapped method; if set, it causes
all returned sources to be expanded once. If not set, all sources are unexpanded
def fold_into_dict(d: Dict[Type[Source], List[Source]], s: Source) -> Dict[Type[Source], List[Source]]:
if type(s) not in d:
d[type(s)] = [s]
......@@ -597,7 +616,7 @@ class Project:
return d
sources: Dict[Type[Source], List[Source]] = dict()
return reduce(fold_into_dict, self.get_data_sources(c), sources)
return reduce(fold_into_dict, self.get_data_sources(c, expanded), sources)
def loaded(self) -> bool:
......@@ -169,7 +169,7 @@
<div class="loader"></div>
#{{job.queue_position - job.remaining_backends|length + 1}}
#{{job.queue_position - job.remaining_backends + 1}}
in queue
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment