
Verified Commit 7adbecd2 authored by Emi Simpson

Flag sources as invalid after encountering an error

parent 23ad7268
Pipeline #235 failed with stages in 30 seconds
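In outline (a reading aid for the diff below, not part of the commit): the coordinator emits a 'repository_error' notification naming a backend and a repository; a background thread started during app setup flags every matching row in data_sources and regenerates projects.json, whose query now skips flagged sources. Condensed, using the commit's own names:

    # Condensed from flag_errors_daemon below; cursor/db stand in for its arguments.
    for backend, repo in coordinator.subscribe_to_notification('repository_error'):
        database.flag_sources_invalid(cursor, backend, repo)  # UPDATE ... SET flagged = 1
        db.commit()
        config.save_projects_json(cursor)  # the report now excludes flagged sources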
@@ -21,6 +21,10 @@ A more complete list of responsibilities:
 from __future__ import annotations
 from base64 import b64encode
 from hashlib import sha1
+import json
+from mystic.error_flagging import flag_errors_daemon
+from threading import Thread
+from mystic.projects import produce_project_report
 from mystic.coordination import CoordinatorConnection
 from os import path, getcwd
 from elasticsearch.client import Elasticsearch
@@ -114,6 +118,13 @@ def static_versioned(filename: str) -> str:
         file_hashes[filename] = output
     return url_for('static', filename=filename, v=file_hashes[filename])

+def save_projects_json(c: Cursor, path: Optional[str] = None) -> None:
+    report = produce_project_report(c)
+    if path is None:
+        path = cast(Dict[str, str], current_app.config)['PROJECTS_FILE']
+    with open(path, 'w') as output_file:
+        json.dump(report, output_file, indent=2)
+
 def create_app(test_config: Optional[Dict[str, Any]] = None) -> Flask:
     app = Flask(__name__)
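A minimal usage sketch for the now-shared helper (hypothetical call sites, not in the commit): with no path argument it reads current_app.config['PROJECTS_FILE'], so it must run inside a Flask app context; passing an explicit path, as the error-flagging daemon does, avoids that requirement.

    save_projects_json(cursor)                              # needs an app context
    save_projects_json(cursor, path='/tmp/projects.json')   # context-free, as in the daemon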
@@ -153,6 +164,23 @@ def create_app(test_config: Optional[Dict[str, Any]] = None) -> Flask:
             exit(1001)
     app.before_first_request(configure_database)

+    def set_up_error_flagging() -> None:
+        db_config: Dict[str, str] = current_app.config['DATABASE']
+        coordinator = connect_coordinator()
+        projects_json: str = current_app.config['PROJECTS_FILE']
+        def run_daemon() -> None:
+            db = pymysql.connect(
+                host = db_config['host'],
+                user = db_config['user'],
+                password = db_config['password'],
+                database = db_config['database'],
+                charset='utf8mb4',
+                cursorclass=Cursor,
+            )
+            flag_errors_daemon(coordinator, db, projects_json)
+        Thread(target=run_daemon).start()
+    app.before_first_request(set_up_error_flagging)
+
     def close_database(_: Optional[BaseException]) -> Response:
         db = getattr(g, 'database', None)
         if db is not None:
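One hedged note on the block above: Thread(...).start() launches a non-daemon thread, and flag_errors_daemon never returns, so the worker can keep the interpreter alive at shutdown. If that matters for a deployment, marking it as a daemon thread is a one-line variant (a suggestion, not what the commit does):

    # Variant sketch: let the interpreter exit even while the worker blocks.
    Thread(target=run_daemon, daemon=True).start()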
......
 from pymysql.cursors import Cursor
 from mystic.sources import Source
-from typing import Any, List, NamedTuple, Optional, TypedDict, Union, cast
+from typing import Any, Callable, Generator, List, NamedTuple, Optional, TypeVar, TypedDict, Union
 from mystic.database import Project
 from socket import socket, AF_UNIX, SOCK_STREAM
 from threading import BoundedSemaphore
@@ -161,10 +161,41 @@ class CoordinatorConnection:
         assert msg_packed is not None
         self.msg_count += 1
         self.connection.sendall(msg_packed)
-        response = self._recieve_response(self.msg_count - 1)
+        def matching_response(msg: 'RecievedMessage') -> Optional[Response]:
+            if isinstance(msg, Response):
+                if msg.msg_id == self.msg_count - 1:
+                    return msg
+            return None
+        # Default to None so a closed stream trips the assert below rather than
+        # leaking StopIteration out of next()
+        response = next(self._recieve_specific_messages(matching_response), None)
         assert response is not None, "Connection to coordinator closed unexpectedly"
         return response.response

+    def subscribe_to_notification(self, method_name: str) -> Generator[List[Any], None, None]:
+        """
+        Produces a generator yielding the parameters of any notification-type
+        messages sent to the given method.
+
+        The generator yields lists of arbitrary parameters. Each list represents the
+        parameters passed to a single call of the notification.
+
+        Any irrelevant notifications will be appended to the inbox to be retrieved by
+        another method.
+
+        The generator returns when the stream closes.
+        """
+        def is_matching_notification(msg: 'RecievedMessage') -> Optional['Notification']:
+            if isinstance(msg, Notification):
+                if msg.method == method_name:
+                    return msg
+            return None
+        return (
+            n.params
+            for n
+            in self._recieve_specific_messages(is_matching_notification)
+        )
+
     def _receive_single_message(self, already_has_lock: bool = False) -> Optional['RecievedMessage']:
         """
         Blocks until a single message is read from the connection
@@ -201,36 +232,53 @@ class CoordinatorConnection:
         else:
             raise AssertionError(f"Invalid msgtype recieved from coordinator: {resp[0]}")

-    def _recieve_response(
+    M = TypeVar('M', 'Notification', 'Response')
+
+    def _recieve_specific_messages(
         self,
-        msgid: int,
-    ) -> Optional['Response']:
+        check: Callable[['RecievedMessage'], Optional[M]]
+    ) -> Generator[M, None, None]:
         """
-        Repeatedly call :func:`_recieve_single_message` until a response matching the criteria is recieved
+        Repeatedly call :func:`_recieve_single_message` until a matching message is found.
+
+        The passed `check` argument should verify whether a given message meets the
+        criteria, and return it if so. Otherwise, it should return `None`.

         Includes logic to ensure that the message will be received even if it wasn't this
         thread that received it, making it compatible with listening on the notification
-        channel.
+        channel. Non-matching messages will be added to the inbox to be received by a
+        different call.

-        Returns `None` if the stream closes before a valid response is recieved
+        The generator returns if the stream closes before a matching message is received.
+
+        Calls to the generator will block while waiting for a message. It is okay to
+        consume the generator non-exhaustively in order to receive only a single message.
         """
         while True:
-            if not self.receive_lock.acquire(blocking = False):
-                self.receive_lock.acquire()
-                # While we were waiting for the lock, something might've been recieved
-                for i in range(len(self.inbox)):
-                    msg = self.inbox[i]
-                    if msg.msg_type == 1 and msg[1] == msgid:
-                        del self.inbox[i]
-                        self.receive_lock.release()
-                        return cast(Response, msg)
-            msg = self._receive_single_message(already_has_lock = True)
-            # Check if the new message was the one we want
-            if msg is None or msg.msg_type == 1 and msg[1] == msgid:
-                self.receive_lock.release()
-                return cast(Optional[Response], msg)
-            else:
-                self.inbox.append(msg)
-                self.receive_lock.release()
+            self.receive_lock.acquire()
+            # While we were waiting for the lock, something might've been received
+            for i in range(len(self.inbox)):
+                msg = check(self.inbox[i])
+                if msg is not None:
+                    del self.inbox[i]
+                    self.receive_lock.release()
+                    yield msg
+                    break
+            else:
+                # The for/else branch runs when no inbox entry matched
+                msg = self._receive_single_message(already_has_lock = True)
+                # If the message is None, the connection closed
+                if msg is None:
+                    self.receive_lock.release()
+                    return None
+                else:
+                    msgcheck = check(msg)
+                    if msgcheck is not None:
+                        self.receive_lock.release()
+                        yield msgcheck
+                    else:
+                        self.inbox.append(msg)
+                        self.receive_lock.release()

 class JobDict(TypedDict):
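Two consumption patterns hang off this generator in the commit: send() pulls exactly one matching Response and abandons the generator, which is safe because the lock is released before every yield, while subscribe_to_notification iterates it indefinitely. A sketch of the single-message pattern (names reuse the send() example above):

    # Non-exhaustive consumption, mirroring send(): take one message and stop.
    gen = self._recieve_specific_messages(matching_response)
    first = next(gen, None)  # None means the coordinator closed the stream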
......
@@ -1066,6 +1066,26 @@ class User:
         else:
             return False

+def flag_sources_invalid(c: Cursor, backend: str, repo: str) -> int:
+    """
+    Flag sources matching the given backend and repository as invalid.
+
+    This should be called when an error was encountered while processing a backend. This
+    marks the backend as invalid and prevents it from being included in `projects.json`
+    until the source is recreated.
+
+    This will also indicate to the user that a problem occurred that needs to be fixed.
+
+    Returns the number of sources which matched the criteria and were flagged.
+    """
+    print(f"b:{backend} r:{repo}")
+    return c.execute('''
+        UPDATE data_sources
+        SET flagged = 1
+        WHERE data_type = %s
+        AND data_url = %s;
+    ''', (backend, repo))
+
 class InvalidId(Exception):
     """An ID was invalid in some way"""
     def __init__(self, item_id: Union[int, str], message: Optional[str] = None) -> None:
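A hypothetical call sketch for flag_sources_invalid (the backend name and URL are made up): the return value counts the rows the UPDATE matched, so zero means no source currently points at the failing repository.

    n = flag_sources_invalid(c, 'git', 'https://example.com/broken.git')
    if n == 0:
        print('no matching sources; repo may have been deleted or recreated')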
......
import logging
from typing import NoReturn
from pymysql.connections import Connection
from .coordination import CoordinatorConnection
from . import database, config

def flag_errors_daemon(connection: CoordinatorConnection, conn: Connection, projects_json: str) -> NoReturn:
    c = conn.cursor()
    for notification in connection.subscribe_to_notification('repository_error'):
        assert len(notification) == 2, "Incorrect number of arguments passed to 'repository_error' by coordinator"
        backend = notification[0]
        repo = notification[1]
        assert isinstance(backend, str), "Invalid parameter passed by coordinator"
        assert isinstance(repo, str), "Invalid parameter passed by coordinator"
        matched_sources = database.flag_sources_invalid(c, backend, repo)
        conn.commit()
        print(f"{matched_sources} sources flagged as invalid ({backend}, {repo})")
        config.save_projects_json(c, path = projects_json)
    c.close()
    raise AssertionError('Connection to coordinator closed, no longer receiving error notifications')
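Because the daemon only needs a coordinator connection, a pymysql connection, and an output path, it can also be exercised outside Flask. A hypothetical standalone runner, with placeholder credentials mirroring the DATABASE config used in create_app:

    import pymysql
    from pymysql.cursors import Cursor
    from mystic.config import connect_coordinator
    from mystic.error_flagging import flag_errors_daemon

    # Placeholder credentials; substitute the real DATABASE config values.
    db = pymysql.connect(host='localhost', user='mystic', password='...',
                         database='mystic', charset='utf8mb4', cursorclass=Cursor)
    flag_errors_daemon(connect_coordinator(), db, 'projects.json')  # never returns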
@@ -12,7 +12,6 @@ receiving an invalid request, or handing processing off to another module.
 This module also includes methods like `get_user` and `get_project`, which can be used to
 determine the active user/project.
 """
-import json
 from mystic.coordination import JobDict
 from mystic.analytics.queries import QueryMachine
 from mystic.analytics.dashboards import DASHBOARDS, Dashboard
@@ -22,10 +21,9 @@ from flask.helpers import flash
 from pymysql.cursors import Cursor
 from pymysql.err import IntegrityError
 from mystic.sources import SOURCE_PROCESSORS
-from flask import request, current_app, session, g
+from flask import request, session, g
 from mystic.database import MalformedId, Project, User
-from mystic.config import connect_coordinator, service_provider
-from mystic.projects import produce_project_report
+from mystic.config import connect_coordinator, service_provider, save_projects_json
 from typing import Callable, Dict, List, Optional, cast

 class ErrorTuple(Exception):
@@ -165,7 +163,7 @@ def subaction_add_source(cursor: Cursor) -> None:
             f"add_source",
             "This data source has already been added"
         )
-    _save_projects_json(cursor)
+    save_projects_json(cursor)
     if not project.is_draft(cursor):
         connect_coordinator().rescan_source(source)
@@ -204,7 +202,7 @@ def action_delete_source(cursor: Cursor) -> None:
     except ValueError:
         raise ErrorTuple("add_source", "Source ID is not in a valid integer format")
     if get_user_force_login(cursor).delete_source_if_owned(cursor, source_id):
-        _save_projects_json(cursor)
+        save_projects_json(cursor)
     else:
         raise ErrorTuple(
             "add_source",
@@ -293,7 +291,7 @@ def subaction_edit_properties(cursor: Cursor) -> None:
                 'Tell us a little bit about your project'
             )
             project.finalize_draft(cursor, name, desc)
-            _save_projects_json(cursor)
+            save_projects_json(cursor)
             connect_coordinator().scan_project(project, cursor)
         else:
             needs_update = False
@@ -304,7 +302,7 @@ def subaction_edit_properties(cursor: Cursor) -> None:
             project.set_description(request.form['desc'])
             project.save(cursor)
         if needs_update:
-            _save_projects_json(cursor)
+            save_projects_json(cursor)

 def action_delete_project(c: Cursor) -> None:
     """
@@ -400,8 +398,3 @@ def get_analytics(c: Cursor, es: Elasticsearch, project: Project) -> List[Dashboard]:
 def get_pending_jobs(p: Project) -> List[JobDict]:
     coordinator = connect_coordinator()
     return coordinator.jobs_for_project(p)
-
-def _save_projects_json(c: Cursor) -> None:
-    report = produce_project_report(c)
-    with open(cast(Dict[str, str], current_app.config)['PROJECTS_FILE'], 'w') as output_file:
-        json.dump(report, output_file, indent=2)
@@ -46,6 +46,7 @@ def setup_database(c: Cursor) -> None:
             project_id INTEGER,
             data_type VARCHAR(50) NOT NULL,
             data_url VARCHAR(255) NOT NULL,
+            flagged BOOLEAN NOT NULL DEFAULT 0,
             FOREIGN KEY (project_id)
                 REFERENCES projects(project_id)
                 ON DELETE CASCADE
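The new column only takes effect for fresh installs via setup_database; an existing deployment would need a matching migration, roughly (hypothetical, not part of the commit):

    c.execute('ALTER TABLE data_sources ADD COLUMN flagged BOOLEAN NOT NULL DEFAULT 0;')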
......
@@ -22,6 +22,7 @@ def produce_project_report(c: Cursor) -> Dict[str, Dict[str, List[str]]]:
         FROM data_sources
         JOIN projects
             ON projects.project_id = data_sources.project_id
+        WHERE data_sources.flagged = 0
     ''')

 def fold_into_report(
......