|     IEEE Xplore Digital Library     |     IEEE Standards     |     IEEE Spectrum     |     More Sites

Verified Commit cf52cee4 authored by Emi Simpson
Browse files

Implement multiple-tag system

parent 32d02697
from asyncio.streams import StreamReader, StreamWriter
from pymysql.cursors import Cursor
from mystic.sources import Source
from typing import Any, AsyncGenerator, Callable, Dict, List, NamedTuple, Optional, Tuple, TypeVar, TypedDict, Union
from typing import Any, AsyncGenerator, Callable, Dict, List, NamedTuple, Optional, Set, Tuple, TypeVar, TypedDict, Union
from mystic.database import Project
from asyncio import BoundedSemaphore, open_unix_connection
import msgpack
......@@ -45,7 +45,7 @@ class CoordinatorConnection:
async def post_job(
project: Project,
tags: List[str],
desc: str,
priority: int,
backends: Dict[str, List[str]],
......@@ -53,9 +53,6 @@ class CoordinatorConnection:
Call "post_job" on the remote instance.
Note that this takes a :class:`mystic.database.Project` instead of a project ID,
like the underlying procedure uses.
This will not wait for or attempt to receive a response before exiting. Errors
should be picked up through the notifications channel. TODO: Elaborate
......@@ -68,7 +65,7 @@ class CoordinatorConnection:
self.msg_count, # message_id
project.alphaid, # project_id
tags, # tags
desc, # description
priority, # priority
backends, # backends
......@@ -107,7 +104,7 @@ class CoordinatorConnection:
for source in source.expand_source():
existing_sources = backends.setdefault(source.source_type, list())
await self.post_job(project, desc, priority, backends)
await self.post_job([project.alphaid, f's-{source.origin_id:x}'], desc, priority, backends)
async def scan_project(
......@@ -134,7 +131,10 @@ class CoordinatorConnection:
for sources
in project.get_folded_data_sources(c, expanded = True).values()
await self.post_job(project, desc, priority, backends)
await self.post_job(
[project.alphaid] + [f's-{s.origin_id:x}' for s in project.get_data_sources(c)],
desc, priority, backends
async def jobs_by_tag(
......@@ -180,21 +180,34 @@ class CoordinatorConnection:
return (await self.jobs_by_tag([project.alphaid]))[project.alphaid]
async def jobs_for_projects(self, projects: List[Project]) -> Dict[Project, List['JobDict']]:
async def pending_sources_for_projects(
c: Cursor,
projects: List[Project]
) -> Set[int]:
A wrapper for :func:`jobs_by_tag` that uses projects instead of tags
For a list of projects, find all sources that are still being processed
Returns a list of source `origin_id`s, where each origin ID represents a source
which is still in the processing queue.
Assumes that jobs are tagged with the alphaid of the project that they're running
for. Results are returned mapped by project
Wraps :func:`jobs_by_tag`
Assumes that jobs are tagged with the string "s-{origin_id_as_hex}" for each
source that the job effects
results = await self.jobs_by_tag([project.alphaid for project in projects])
return dict(
source_tags = {
f's-{source.origin_id:x}': source.origin_id
for project in projects
for source in project.get_data_sources(c)
results = await self.jobs_by_tag(list(source_tags.keys()))
return {
for tag, matches in results.items()
if len(matches) > 0
def subscribe_to_notification(self, method_name: str) -> AsyncGenerator[List[Any], None]:
......@@ -25,7 +25,7 @@ from mystic.sources import SOURCE_PROCESSORS
from flask import request, session, g
from mystic.database import MalformedId, Project, User
from mystic.config import connect_coordinator, service_provider, save_projects_json
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, cast
from typing import Any, Callable, Coroutine, Dict, List, Optional, Set, Tuple, cast
class ErrorTuple(Exception):
......@@ -406,29 +406,20 @@ def get_pending_jobs(p: Project) -> List[JobDict]:
return await coordinator.jobs_for_project(p)
def get_many_pending_sources(projects: List[Project]) -> Dict[Project, List[str]]:
def get_many_pending_sources(projects: List[Project], c: Cursor) -> Set[int]:
Gets pending sources for all provided projects
Results are formatted in a dict maping each project to its pending sources. Pending
sources are represented only as the Repo URL.
Results are formatted as per
This will be changed tomorrow when I switch to a multi-tag system so don't get too
This will be changed in like two commits when I stop using the multi-tag system here
so don't get too attached
A pending source is a source who MAY be flagged as invalid by a pending job running
in the coordinator.
async def get_jobs() -> Dict[Project, List[JobDict]]:
async def get_jobs() -> Set[int]:
coordinator = await connect_coordinator()
return await coordinator.jobs_for_projects(projects)
return {
for job_dict in jobs
for _, urls in job_dict["remaining_backends"].items()
for repo_url in urls
for project, jobs in
return await coordinator.pending_sources_for_projects(c, projects)
......@@ -55,7 +55,7 @@
{% for source in sources %}
<a href="{{source.url}}">{{source.url}}</a>
<div class="delete-and-warning-container">
{% if source.url is in(pending_sources) %}
{% if source.origin_id is in(pending_sources) %}
<div class="loader icon"></div>
{% elif source.flagged %}
{{utils.show_icon("alert-triangle", "problems occured while scanning", 16)}}
......@@ -2,7 +2,6 @@
{% import "utils.html" as utils %}
{% import "projects-common.html" as project_utils %}
{% set selected = "home" %}
{% set projects_and_pending_sources = jobs.items() %}
{% block title %} Projects {% endblock %}
{% block head %}
......@@ -41,7 +40,7 @@ header {
<a href=#new-project><button class=big> + Add </button></a>
<div id=projects>
{% for project, pending_sources in projects_and_pending_sources %}
{% for project in user.get_projects(cursor) %}
{% set id = project.alphaid %}
<div class=project>
......@@ -42,7 +42,7 @@ def main() -> str:
user=user, cursor=cursor,
pending_sources=get_many_pending_sources(user.get_projects(cursor), cursor)
return render_template(
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment