|
1 |
| -import json |
2 | 1 | import logging
|
3 | 2 | from datetime import timedelta
|
4 |
| -from importlib.abc import Traversable |
5 |
| -from typing import Any |
| 3 | +from pathlib import Path |
6 | 4 |
|
7 | 5 | from databricks.labs.blueprint.installation import Installation
|
8 | 6 | from databricks.labs.blueprint.installer import InstallState
|
| 7 | +from databricks.labs.lsql.dashboards import DashboardMetadata, Dashboards |
9 | 8 | from databricks.sdk import WorkspaceClient
|
10 |
| -from databricks.sdk.errors import DatabricksError |
11 |
| -from databricks.sdk.errors import InvalidParameterValue |
| 9 | +from databricks.sdk.errors import ( |
| 10 | + InvalidParameterValue, |
| 11 | + NotFound, |
| 12 | + DeadlineExceeded, |
| 13 | + InternalError, |
| 14 | + ResourceAlreadyExists, |
| 15 | +) |
12 | 16 | from databricks.sdk.retries import retried
|
13 |
| -from databricks.sdk.service.dashboards import Dashboard |
| 17 | +from databricks.sdk.service.dashboards import LifecycleState, Dashboard |
| 18 | + |
| 19 | +from databricks.labs.remorph.config import ReconcileConfig, ReconcileMetadataConfig |
14 | 20 |
|
logger = logging.getLogger(__name__)


class DashboardDeployment:
    """Deploys Lakeview dashboards, defined by per-dashboard metadata folders, into a workspace.

    Deployed dashboard ids are tracked in the install state keyed by a folder-derived
    reference, so repeated deployments update rather than duplicate dashboards, and
    dashboards whose metadata folder disappeared are trashed.
    """

    def __init__(
        self,
        ws: WorkspaceClient,
        installation: Installation,
        install_state: InstallState,
    ):
        self._ws = ws
        self._installation = installation
        self._install_state = install_state

    def deploy(
        self,
        folder: Path,
        config: ReconcileConfig,
    ):
        """
        Create dashboards from Dashboard metadata files.
        The given folder is expected to contain subfolders each containing metadata for individual dashboards.

        :param folder: Path to the base folder.
        :param config: Configuration for reconciliation.
        """
        logger.info(f"Deploying dashboards from base folder {folder}")
        parent_path = f"{self._installation.install_folder()}/dashboards"
        try:
            self._ws.workspace.mkdirs(parent_path)
        except ResourceAlreadyExists:
            logger.info(f"Dashboard parent path already exists: {parent_path}")

        valid_dashboard_refs = set()
        for dashboard_folder in folder.iterdir():
            if not dashboard_folder.is_dir():
                continue
            valid_dashboard_refs.add(self._dashboard_reference(dashboard_folder))
            dashboard = self._update_or_create_dashboard(dashboard_folder, parent_path, config.metadata_config)
            logger.info(
                f"Dashboard deployed with URL: {self._ws.config.host}/sql/dashboardsv3/{dashboard.dashboard_id}"
            )
            # Persist state after each dashboard so a mid-run failure doesn't lose progress.
            self._install_state.save()

        self._remove_deprecated_dashboards(valid_dashboard_refs)

    def _dashboard_reference(self, folder: Path) -> str:
        """Return the install-state key for a dashboard folder (lower-cased folder stem)."""
        return f"{folder.stem}".lower()

    # InternalError and DeadlineExceeded are retried because of Lakeview internal issues
    # These issues have been reported to and are resolved by the Lakeview team
    # Keeping the retry for resilience
    @retried(on=[InternalError, DeadlineExceeded], timeout=timedelta(minutes=3))
    def _update_or_create_dashboard(
        self,
        folder: Path,
        ws_parent_path: str,
        config: ReconcileMetadataConfig,
    ) -> Dashboard:
        """Create or update a single dashboard from the metadata in *folder*.

        :param folder: Folder containing the dashboard metadata.
        :param ws_parent_path: Workspace path under which the dashboard file is placed.
        :param config: Metadata config supplying the catalog/schema substitutions.
        :return: The created or updated (and published) dashboard.
        """
        # BUGFIX: was `logging.info(...)`, which logs via the root logger instead of
        # this module's logger like every other message in the class.
        logger.info(f"Reading dashboard folder {folder}")
        metadata = DashboardMetadata.from_path(folder).replace_database(
            catalog=config.catalog,
            catalog_to_replace="remorph",
            database=config.schema,
            database_to_replace="reconcile",
        )

        metadata.display_name = self._name_with_prefix(metadata.display_name)
        reference = self._dashboard_reference(folder)
        dashboard_id = self._install_state.dashboards.get(reference)
        if dashboard_id is not None:
            try:
                dashboard_id = self._handle_existing_dashboard(dashboard_id, metadata.display_name)
            except (NotFound, InvalidParameterValue):
                logger.info(f"Recovering invalid dashboard: {metadata.display_name} ({dashboard_id})")
                try:
                    dashboard_path = f"{ws_parent_path}/{metadata.display_name}.lvdash.json"
                    self._ws.workspace.delete(dashboard_path)  # Cannot recreate dashboard if file still exists
                    logger.debug(
                        f"Deleted dangling dashboard {metadata.display_name} ({dashboard_id}): {dashboard_path}"
                    )
                except NotFound:
                    pass
                dashboard_id = None  # Recreate the dashboard if it's reference is corrupted (manually)

        dashboard = Dashboards(self._ws).create_dashboard(
            metadata,
            dashboard_id=dashboard_id,
            parent_path=ws_parent_path,
            warehouse_id=self._ws.config.warehouse_id,
            publish=True,
        )
        assert dashboard.dashboard_id is not None
        self._install_state.dashboards[reference] = dashboard.dashboard_id
        return dashboard

    def _name_with_prefix(self, name: str) -> str:
        """Prefix *name* with the upper-cased product name, e.g. ``[REMORPH] name``."""
        prefix = self._installation.product()
        return f"[{prefix.upper()}] {name}"

    def _handle_existing_dashboard(self, dashboard_id: str, display_name: str) -> str | None:
        """Return *dashboard_id* if it can be updated in place, or ``None`` to force recreation.

        :raises NotFound: when the dashboard has no lifecycle state (treated as invalid).
        """
        dashboard = self._ws.lakeview.get(dashboard_id)
        if dashboard.lifecycle_state is None:
            raise NotFound(f"Dashboard life cycle state: {display_name} ({dashboard_id})")
        if dashboard.lifecycle_state == LifecycleState.TRASHED:
            logger.info(f"Recreating trashed dashboard: {display_name} ({dashboard_id})")
            return None  # Recreate the dashboard if it is trashed (manually)
        return dashboard_id  # Update the existing dashboard

    def _remove_deprecated_dashboards(self, valid_dashboard_refs: set[str]):
        """Trash and forget dashboards whose reference is no longer present on disk.

        :param valid_dashboard_refs: References of all dashboards that were just deployed.
        """
        # BUGFIX: iterate over a snapshot of the items. The loop deletes entries from
        # the dict it iterates; iterating `.items()` directly raises
        # "RuntimeError: dictionary changed size during iteration" on the first removal.
        for ref, dashboard_id in list(self._install_state.dashboards.items()):
            if ref not in valid_dashboard_refs:
                try:
                    logger.info(f"Removing dashboard_id={dashboard_id}, as it is no longer needed.")
                    del self._install_state.dashboards[ref]
                    self._ws.lakeview.trash(dashboard_id)
                except (InvalidParameterValue, NotFound):
                    logger.warning(f"Dashboard `{dashboard_id}` doesn't exist anymore for some reason.")
                    continue
0 commit comments