RuntimeError on Windows when trying Python multiprocessing

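I am writing my first serious Python program that uses threading and multiprocessing, on a Windows machine. I cannot launch the processes, though: Python gives the message below. The thing is, I am not launching my threads in the main module. The threads are handled in a separate module, inside a class.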

EDIT: By the way, this code runs fine on Ubuntu. Not quite so on Windows.

RuntimeError:
Attempt to start a new process before the current process
has finished its bootstrapping phase.
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
    if __name__ == '__main__':
        freeze_support()
        ...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.

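My original code is pretty long, but I was able to reproduce the error in an abridged version. It is split into two files. The first is the main module, which does very little other than import the module that handles the processes/threads and call a method. The second module is where the meat of the code is.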


testMain.py:

import parallelTestModule


extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

parallelTestModule.py:

import multiprocessing
from multiprocessing import Process
import threading


class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print(self.name, '\n')


class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th)
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()


class ParallelExtractor:
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads))
            myprocs.append(pr)
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

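On Windows, each subprocess imports (i.e. executes) the main module when it starts. You need to put an if __name__ == '__main__': guard in the main module so that subprocesses are not created recursively.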

Modified testMain.py:

import parallelTestModule


if __name__ == '__main__':
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)
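
The question mentions that the same code runs on Ubuntu. A likely reason is that Linux has traditionally defaulted to the fork start method, where the child inherits the parent's state instead of re-importing the main module, while Windows only offers spawn. The sketch below requests spawn explicitly so the Windows behaviour can be observed on any platform. The names report and child_task are illustrative, and the "__mp_main__" module name is a CPython implementation detail rather than something the thread states.

```python
import multiprocessing as mp
import os


def report(label):
    """Build a line identifying which process ran and under which module name."""
    return f"{label}: pid={os.getpid()} module={__name__}"


def child_task():
    print(report("child"))


# Module-level code: under "spawn" this line runs again in every child,
# because the child re-imports the main module (as "__mp_main__" in CPython).
print(report("import"))

if __name__ == "__main__":
    # Ask for "spawn" explicitly so Linux/macOS behave like Windows here.
    ctx = mp.get_context("spawn")
    worker = ctx.Process(target=child_task)
    worker.start()
    worker.join()
```

Run as a script, the "import" line should appear once for the parent and once more for the child. If the guard is removed, the re-import in the child reaches the start() call again, which is the situation the RuntimeError is reporting.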

Try putting your code inside the main guard in testMain.py:

import parallelTestModule


if __name__ == '__main__':
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)

See the docs:

"For an explanation of why (on Windows) the if __name__ == '__main__'
part is necessary, see Programming guidelines."

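which say

"Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process)."

... by using if __name__ == '__main__'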
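
To make "safely imported" concrete, here is a rough sketch that runs the same source twice with runpy.run_path: once under the name "__main__", as when the file is launched directly, and once under "__mp_main__", which is the name CPython's spawn implementation uses when a child re-imports the main module (an implementation detail, assumed here). The SCRIPT text and the run_with_name helper are made up for illustration.

```python
import os
import runpy
import tempfile

# A tiny stand-in for a main module: one unguarded line, one guarded line.
SCRIPT = '''
events = ["top-level code ran"]
if __name__ == "__main__":
    events.append("guarded code ran")
'''


def run_with_name(source, run_name):
    """Execute source from a temporary file under run_name; return its events list."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo_main.py")
        with open(path, "w") as handle:
            handle.write(source)
        namespace = runpy.run_path(path, run_name=run_name)
    return namespace["events"]


print(run_with_name(SCRIPT, "__main__"))     # ['top-level code ran', 'guarded code ran']
print(run_with_name(SCRIPT, "__mp_main__"))  # ['top-level code ran']
```

Anything outside the guard runs in both cases, so process-launching code left at the top level would run again inside every child.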

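The earlier answers are correct, but there is one small complication worth pointing out.

If your main module imports another module in which global variables or class member variables are defined and initialized to (or using) some new objects, you may have to make that import conditional in the same way: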

if __name__ == '__main__':
    import my_module
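
A possible alternative to a conditional import, sketched here under the assumption that you can edit the imported module: build the expensive object lazily inside a function instead of at import time, so importing the module has no side effects. The names get_lookup_table and worker are hypothetical and do not come from the answer.

```python
import functools
from multiprocessing import Process


@functools.lru_cache(maxsize=None)
def get_lookup_table():
    """Build the (hypothetical) expensive object on first call, then reuse it."""
    return tuple(n * n for n in range(1000))


def worker(index):
    # Each process builds its own copy, and only when it actually needs one.
    print(f"square of {index} is {get_lookup_table()[index]}")


if __name__ == "__main__":
    procs = [Process(target=worker, args=(i,)) for i in (3, 7)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
```

With this layout the module can stay in a normal top-level import, because importing it no longer creates the object.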

As @Ofer said, when you are using other libraries or modules, you should import them inside the if __name__ == '__main__': block.

So, in my case, it ended up like this:

if __name__ == '__main__':
    import librosa
    import os
    import pandas as pd
    run_my_program()

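In my case it was a simple bug in the code: a variable was used before it was created. It is worth checking for that before trying the solutions above. Why I got this particular error message, Lord knows.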

Hello, here is my structure for multiprocessing:

from multiprocessing import Process
import time


start = time.perf_counter()


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')


p1 = Process(target=do_something, args=[1])
p2 = Process(target=do_something, args=[2])


if __name__ == '__main__':
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish-start,2 )} second(s)')

You don't have to put the imports inside if __name__ == '__main__':. Just run the part of the program you want to run inside it.
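
A variation on the same structure, offered as a sketch rather than as this answer's method: imports stay at the top level, while creating and starting the Process objects moves into a main() function called from the guard. The freeze_support() call comes from the idiom quoted in the error message. format_elapsed and main are illustrative names, and the sleep times are shortened.

```python
import time
from multiprocessing import Process, freeze_support


def format_elapsed(start, finish):
    """Render an elapsed time as a short message."""
    return f'Finished in {round(finish - start, 2)} second(s)'


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second(s)...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')


def main():
    start = time.perf_counter()
    # Process objects are created here, not at import time, so a spawned
    # child that re-imports this file does not rebuild them.
    workers = [Process(target=do_something, args=(seconds,)) for seconds in (0.1, 0.2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(format_elapsed(start, time.perf_counter()))


if __name__ == '__main__':
    freeze_support()  # no effect unless the program is frozen into a Windows executable
    main()
```

Keeping only function and class definitions at module level means the re-import in each child does as little work as possible.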

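The solution below should work for both Python multiprocessing and PyTorch multiprocessing.

As other answers mentioned, the fix is to have if __name__ == '__main__':, but I had trouble working out where to put it, because I am using several scripts and modules. When I only called my first function inside the main guard, everything before that call still started creating multiple processes (not sure why).

Putting the guard on the very first line (even before the imports) worked. Calling only the first function inside it resulted in a timeout error. Below is the first file of my code. Multiprocessing is used only after several function calls, but putting the main guard first seems to be the only fix here.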

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])

    # Get demonstration data if necessary and behavior clone
    if job_data['algorithm'] != 'NPG':
        print("========================================")
        print("Collecting expert demonstrations")
        print("========================================")
        demo_paths = pickle.load(open(job_data['demo_file'], 'rb'))

        ########################################################################################
        demo_paths = demo_paths[0:3]
        print(job_data['demo_file'], len(demo_paths))
        for d in range(len(demo_paths)):
            feats = demo_paths[d]['features']
            feats = np.vstack(feats)
            demo_paths[d]['observations'] = feats

        ########################################################################################

        bc_agent = BC(demo_paths, policy=policy, epochs=job_data['bc_epochs'], batch_size=job_data['bc_batch_size'],
                      lr=job_data['bc_learn_rate'], loss_type='MSE', set_transforms=False)

        in_shift, in_scale, out_shift, out_scale = bc_agent.compute_transformations()
        bc_agent.set_transformations(in_shift, in_scale, out_shift, out_scale)
        bc_agent.set_variance_with_data(out_scale)

        ts = timer.time()
        print("========================================")
        print("Running BC with expert demonstrations")
        print("========================================")
        bc_agent.train()
        print("========================================")
        print("BC training complete !!!")
        print("time taken = %f" % (timer.time() - ts))
        print("========================================")

        # if job_data['eval_rollouts'] >= 1:
        #     score = e.evaluate_policy(policy, num_episodes=job_data['eval_rollouts'], mean_action=True)
        #     print("Score with behavior cloning = %f" % score[0][0])

    if job_data['algorithm'] != 'DAPG':
        # We throw away the demo data when training from scratch or fine-tuning with RL without explicit augmentation
        demo_paths = None

    # ===============================================================================
    # RL Loop
    # ===============================================================================

    rl_agent = DAPG(e, policy, baseline, demo_paths,
                    normalized_step_size=job_data['rl_step_size'],
                    lam_0=job_data['lam_0'], lam_1=job_data['lam_1'],
                    seed=job_data['seed'], save_logs=True
                    )

    print("========================================")
    print("Starting reinforcement learning phase")
    print("========================================")

    ts = timer.time()
    train_agent(job_name=JOB_DIR,
                agent=rl_agent,
                seed=job_data['seed'],
                niter=job_data['rl_num_iter'],
                gamma=job_data['rl_gamma'],
                gae_lambda=job_data['rl_gae'],
                num_cpu=job_data['num_cpu'],
                sample_mode='trajectories',
                num_traj=job_data['rl_num_traj'],
                num_samples=job_data['rl_num_samples'],
                save_freq=job_data['save_freq'],
                evaluation_rollouts=job_data['eval_rollouts'])
    print("time taken = %f" % (timer.time()-ts))

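I ran into the same problem. @ofter's method is correct, but there are some details to pay attention to. Here is the debugging code that I modified successfully, for reference: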


if __name__ == '__main__':
    import matplotlib.pyplot as plt
    import numpy as np

    def imgshow(img):
        img = img / 2 + 0.5  # undo the normalization before display
        np_img = img.numpy()
        plt.imshow(np.transpose(np_img, (1, 2, 0)))
        plt.show()

    dataiter = iter(train_loader)
    images, labels = next(dataiter)  # built-in next() instead of the .next() method

    imgshow(torchvision.utils.make_grid(images))
    print(' '.join('%5s' % classes[labels[i]] for i in range(4)))

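To be clear, I have no subprogram, only a single main program, yet I ran into the same problem as you. This suggests that when Python library files are imported in the middle of a program, we should add: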

if __name__ == '__main__':

Using Python 3.8.5 with yolo v5:

if __name__ == '__main__':
    from yolov5 import train
    train.run()

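I tried the tricks mentioned above on the very simple code below, but I still cannot stop it from resetting on any of my Windows machines with Python 3.8/3.10. I would highly appreciate it if you could tell me where I am going wrong.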

print('script reset')


def do_something(inp):
    print('Done!')


if __name__ == '__main__':
    from multiprocessing import Process, get_start_method
    print('main reset')
    print(get_start_method())
    Process(target=do_something, args=[1]).start()
    print('Finished')

The output displays:

script reset
main reset
spawn
Finished
script reset
Done!

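Update:

As far as I understand, you are not preventing the script that contains __main__ and the .start() call from resetting (which does not happen on Linux). Instead, you are suggesting workarounds so that we do not see the reset. One has to keep all imports minimal and put them separately inside each function, and even then it is still slow compared to Linux.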
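
The "imports inside each function" workaround mentioned above might look like the sketch below. The statistics module is only a stand-in for a slow third-party import, and summarize and light_task are hypothetical names. Whether this saves noticeable startup time depends on the library involved, which is an assumption here, not a measurement.

```python
from multiprocessing import Process


def summarize(values):
    # Imported on first call, so a process that never calls summarize()
    # does not pay for this import when it re-imports the main module.
    import statistics  # stand-in for a heavy library
    return statistics.mean(values)


def light_task():
    # This task needs no heavy imports at all.
    print("light task done")


if __name__ == "__main__":
    child = Process(target=light_task)
    child.start()
    child.join()
    print("mean:", summarize([1, 2, 3]))
```

The top-level re-import in each spawned child still happens. This layout only reduces how much work that re-import does.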