22
22
from .user_app_loader import load_user_app
23
23
from .runtime import execution_context
24
24
import logging
25
+ import multiprocessing as mp
25
26
26
27
WATCHDOG_INTERVAL_SECONDS = 10.0
27
28
@@ -43,6 +44,7 @@ def _get_pool() -> ProcessPoolExecutor:
43
44
max_workers = 1 ,
44
45
initializer = _subprocess_init ,
45
46
initargs = (_user_apps , os .getpid ()),
47
+ mp_context = mp .get_context ("spawn" ),
46
48
)
47
49
return _pool
48
50
@@ -69,6 +71,7 @@ def _restart_pool(old_pool: ProcessPoolExecutor | None = None) -> None:
69
71
max_workers = 1 ,
70
72
initializer = _subprocess_init ,
71
73
initargs = (_user_apps , os .getpid ()),
74
+ mp_context = mp .get_context ("spawn" ),
72
75
)
73
76
if prev_pool is not None :
74
77
# Best-effort shutdown of previous pool; letting exceptions bubble up
@@ -124,8 +127,19 @@ def _watch() -> None:
124
127
125
128
def _subprocess_init (user_apps : list [str ], parent_pid : int ) -> None :
126
129
_start_parent_watchdog (parent_pid )
130
+
131
+ # In case any user app is already in this subprocess, e.g. the subprocess is forked, we need to avoid loading it again.
132
+ with _pool_lock :
133
+ already_loaded_apps = set (_user_apps )
134
+
135
+ loaded_apps = []
127
136
for app_target in user_apps :
128
- load_user_app (app_target )
137
+ if app_target not in already_loaded_apps :
138
+ load_user_app (app_target )
139
+ loaded_apps .append (app_target )
140
+
141
+ with _pool_lock :
142
+ _user_apps .extend (loaded_apps )
129
143
130
144
131
145
class _OnceResult :
0 commit comments