Skip to content

Commit de2d52b

Browse files
author
Yuriy Natarov
committed
Sample DAG with SDK
1 parent d7a52c1 commit de2d52b

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

dags/sample_sdk.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import pathlib
2+
3+
import doublecloud
4+
from doublecloud.clickhouse.v1.cluster_service_pb2 import ListClustersRequest
5+
from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub
6+
7+
from airflow.decorators import dag, task
8+
from airflow.hooks.base import BaseHook
9+
10+
11+
@dag(
12+
dag_id=pathlib.Path(__file__).stem,
13+
dag_display_name="List ClickHouse clusters using SDK and passed service account",
14+
tags=["sample", "clickhouse", "sdk", "service_account"],
15+
schedule=None,
16+
catchup=False,
17+
)
18+
def sample_list_ch_clusters():
19+
@task
20+
def get_project_id():
21+
'''
22+
What project to use?
23+
'''
24+
return "cloud"
25+
26+
27+
@task
28+
def display_clusters(project_id):
29+
'''
30+
Lists CH clusters using the SDK
31+
'''
32+
# Fetch the connection using Airflow's connection management system
33+
# To use the functionality, go to Cluster Settings and specify a Service Account
34+
connection = BaseHook.get_connection('doublecloud_api_private_key')
35+
36+
# Setup SDK using data from the connection
37+
sdk = doublecloud.SDK(service_account_key={
38+
"id": connection.extra_dejson.get('kid'),
39+
"service_account_id": connection.login,
40+
"private_key": connection.password,
41+
})
42+
43+
cluster_service = sdk.client(ClusterServiceStub)
44+
response = cluster_service.List(ListClustersRequest(
45+
project_id=project_id,
46+
))
47+
print("Your CH clusters are:")
48+
for cluster in response.clusters:
49+
print(cluster)
50+
51+
display_clusters(project_id=get_project_id())
52+
53+
54+
my_dag = sample_list_ch_clusters()
55+
56+
57+
if __name__ == '__main__':
58+
my_dag.test()

0 commit comments

Comments
 (0)