Files
youtube-dl/lib/support/base/sub_process.py

345 lines
12 KiB
Python
Raw Normal View History

2022-10-12 01:32:51 +09:00
import io
2022-10-07 01:48:42 +09:00
import json
2025-12-25 19:42:32 +09:00
import locale
2022-10-07 01:48:42 +09:00
import os
import platform
2022-10-12 01:32:51 +09:00
import queue
2022-10-07 01:48:42 +09:00
import subprocess
2022-10-12 01:32:51 +09:00
import threading
import time
2022-10-07 01:48:42 +09:00
import traceback
2022-10-02 20:18:05 +09:00
2022-10-07 01:48:42 +09:00
from . import logger
2022-10-02 20:18:05 +09:00
2022-10-12 01:32:51 +09:00
def demote(user_uid, user_gid):
    """Build a ``preexec_fn`` that switches the child process to another user.

    Args:
        user_uid: Numeric user id to switch to, or ``None`` to keep the
            current user.
        user_gid: Numeric group id to switch to, or ``None`` to keep the
            current group.

    Returns:
        A zero-argument callable suitable for ``subprocess.Popen(preexec_fn=...)``.
        It raises whatever ``os.setgid`` / ``os.setuid`` raise (for example
        ``PermissionError`` when the caller lacks the privilege).
    """
    def result():
        # The group is changed before the user, as in the original code;
        # typically setgid is no longer permitted once the uid has been dropped.
        # Ids left as None are skipped so that passing only a uid (gid defaults
        # to None in the callers) no longer fails with a TypeError.
        if user_gid is not None:
            os.setgid(user_gid)
        if user_uid is not None:
            os.setuid(user_uid)
    return result
2022-10-07 01:48:42 +09:00
class SupportSubprocess(object):
    """Helpers for running external commands.

    Two usage styles are offered:

    * ``execute_command_return`` (classmethod): run a command, wait for it
      (optionally with a timeout) and return its captured output.
    * Instances: run a command on a background thread and stream its output
      to ``stdout_callback(call_id, mode, data)`` where ``mode`` is one of
      ``'START'``, ``'LOG'``, ``'END'`` or ``'ERROR'``.  Started instances are
      kept in a class-level list until they finish or are closed.
    """

    # Instances whose process has been started and not yet removed.
    __instance_list = []

    @classmethod
    def command_for_windows(cls, command: list):
        """Convert an argument list into a single command string on Windows.

        On Windows, a ``list`` command is joined with spaces and every argument
        containing a space is wrapped in double quotes.  On other platforms,
        or when ``command`` is not a list, it is returned unchanged.
        """
        if platform.system() == 'Windows' and isinstance(command, list):
            command = ' '.join(f'"{arg}"' if ' ' in arg else arg for arg in command)
        return command

    @staticmethod
    def _kill_process_tree(process):
        """Kill ``process`` and, when psutil is importable, its descendants.

        Args:
            process: A ``subprocess.Popen`` object.
        """
        try:
            import psutil
        except ImportError:
            psutil = None
        if psutil is not None:
            try:
                for child in psutil.Process(process.pid).children(recursive=True):
                    try:
                        child.kill()
                    except psutil.Error:
                        # The child may already have exited; nothing to do.
                        pass
            except psutil.Error:
                # The parent itself is already gone.
                pass
        process.kill()

    # Timeout support added 2021-10-25.
    @classmethod
    def execute_command_return(cls, command, format=None, log=False, shell=False, env=None, timeout=None, uid=None, gid=None):
        """Run ``command`` to completion and return its combined stdout/stderr.

        Args:
            command: Argument list (or a string, typically with ``shell=True``).
            format: ``None`` for newline-joined text, ``'json'`` to parse the
                output as JSON starting at the first line that begins with
                ``{`` or ``[``.  If JSON parsing fails, or the format is not
                recognised, the list of stripped output lines is returned.
            log: When true, every output line is also written to the debug log.
            shell: Passed to ``subprocess.Popen``.
            env: Environment mapping passed to ``subprocess.Popen``.
            timeout: Seconds to wait before the process (and its children) are
                killed; ``None`` waits indefinitely.
            uid: On non-Windows platforms, user id the child switches to.
            gid: On non-Windows platforms, group id the child switches to.

        Returns:
            ``{'status': 'finish' | 'timeout', 'log': <output>}``, or ``None``
            if an unexpected error occurred (the error is logged).
        """
        process = None
        try:
            printable = command if isinstance(command, str) else ' '.join(command)
            logger.debug(f"execute_command_return : {printable}")
            command = cls.command_for_windows(command)

            popen_kwargs = dict(
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                shell=shell,
                env=env,
                encoding='utf8',
            )
            if platform.system() == 'Windows':
                popen_kwargs['bufsize'] = 0
            elif uid is not None:
                popen_kwargs['preexec_fn'] = demote(uid, gid)
            process = subprocess.Popen(command, **popen_kwargs)

            new_ret = {'status': 'finish', 'log': None}
            lines = []

            def read_output():
                # Drain stdout on a separate thread so that a chatty child
                # cannot fill the pipe and block while we wait on it.
                with process.stdout:
                    try:
                        for line in iter(process.stdout.readline, ''):
                            lines.append(line.strip())
                            if log:
                                logger.debug(lines[-1])
                    except (OSError, ValueError):
                        # Pipe closed underneath us or undecodable output:
                        # keep what was collected so far (best effort).
                        pass

            thread = threading.Thread(target=read_output, daemon=True)
            thread.start()

            try:
                process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                # Keep `process` bound to the Popen object so the cleanup in
                # `finally` can still close its pipes.
                cls._kill_process_tree(process)
                process.wait()  # reap the killed child
                new_ret['status'] = "timeout"
            thread.join()

            if format is None:
                output = '\n'.join(lines)
            elif format == 'json':
                try:
                    # Skip any banner text printed before the JSON document.
                    start = next(
                        (idx for idx, text in enumerate(lines) if text.startswith(('{', '['))),
                        0,
                    )
                    output = json.loads(''.join(lines[start:]))
                except ValueError:
                    output = lines
            else:
                # Unknown format: hand back the raw lines instead of failing
                # after the command has already run.
                output = lines

            new_ret['log'] = output
            return new_ret
        except Exception as e:
            logger.error(f"Exception:{str(e)}")
            logger.error(traceback.format_exc())
            logger.error('command : %s', command)
            return None
        finally:
            if process is not None:
                for pipe in (process.stdout, process.stdin):
                    try:
                        if pipe:
                            pipe.close()
                    except (OSError, ValueError):
                        pass

    def __init__(self, command, print_log=False, shell=False, env=None, timeout=None, uid=None, gid=None, stdout_callback=None, call_id=None, callback_line=True):
        """Store the settings for a background command; nothing is started yet.

        Args:
            command: Argument list or command string.
            print_log: Stored on the instance; not used by this class itself.
            shell: Passed to ``subprocess.Popen``.
            env: Environment mapping passed to ``subprocess.Popen``.
            timeout: Seconds after which the process is killed, or ``None``.
            uid: On non-Windows platforms, user id the child switches to.
            gid: On non-Windows platforms, group id the child switches to.
            stdout_callback: ``callable(call_id, mode, data)`` receiving events.
            call_id: Caller-chosen identifier passed back to the callback.
            callback_line: When true, ``'LOG'`` events carry whole stripped
                lines; otherwise they carry raw output chunks.
        """
        self.command = command
        self.print_log = print_log
        self.shell = shell
        self.env = env
        self.timeout = timeout
        self.uid = uid
        self.gid = gid
        self.stdout_callback = stdout_callback
        self.process = None
        self.stdout_queue = None
        self.call_id = call_id
        self.timestamp = time.time()
        self.callback_line = callback_line

    def start(self, join=True):
        """Start the command on a daemon thread.

        Args:
            join: When true (default), block until the worker thread ends.
        """
        try:
            self.thread = threading.Thread(target=self.__execute_thread_function, args=(), daemon=True)
            self.thread.start()
            if join:
                self.thread.join()
        except Exception as e:
            logger.error(f'Exception:{str(e)}')
            logger.error(traceback.format_exc())

    def __execute_thread_function(self):
        """Worker body: spawn the process, start the output pumps and wait.

        Failures (including a timeout) are logged and reported to the callback
        as ``'ERROR'`` events.
        """
        try:
            self.command = self.command_for_windows(self.command)
            logger.debug(f"{self.command=}")

            popen_kwargs = dict(
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                shell=self.shell,
                env=self.env,
                encoding='utf8',
                bufsize=0,
            )
            if platform.system() != 'Windows' and self.uid is not None:
                popen_kwargs['preexec_fn'] = demote(self.uid, self.gid)
            self.process = subprocess.Popen(self.command, **popen_kwargs)

            SupportSubprocess.__instance_list.append(self)
            self.send_stdout_callback(self.call_id, 'START', None)
            self.__start_communicate()
            self.__start_send_callback()

            if self.timeout is not None:
                try:
                    self.process.wait(timeout=self.timeout)
                finally:
                    # Also runs when wait() raises TimeoutExpired, so a
                    # timed-out process is actually killed; the exception
                    # still reaches the handler below and is reported.
                    self.process_close()
            else:
                self.process.wait()
            self.remove_instance(self)
            logger.info(f"{self.command} END")
        except Exception as e:
            logger.error(f'Exception:{str(e)}')
            logger.error(traceback.format_exc())
            logger.warning(self.command)
            self.send_stdout_callback(self.call_id, 'ERROR', str(e))
            self.send_stdout_callback(self.call_id, 'ERROR', str(traceback.format_exc()))

    def __start_communicate(self):
        """Start two daemon threads that move process output into ``stdout_queue``.

        The reader pulls one character at a time from the process; the
        collector groups characters that arrive within 5 ms of each other into
        one chunk.  Each chunk is queued followed by ``'\\n'``, and when the
        stream ends ``'<END>'`` followed by ``'\\n'`` is queued.
        """
        self.stdout_queue = queue.Queue()
        char_queue = queue.Queue()

        def read_chars():
            while True:
                try:
                    buf = self.process.stdout.read(1)
                except UnicodeDecodeError:
                    # Skip undecodable output and keep reading.
                    continue
                except (OSError, ValueError):
                    # The pipe is closed or broken: treat it as end of stream
                    # instead of retrying forever in a busy loop.
                    buf = ''
                if not buf:
                    break
                char_queue.put(buf)
            char_queue.put(None)  # end-of-stream marker for the collector

        def collect_chunks():
            active = True
            while active:
                chunk = char_queue.get()
                if chunk is None:
                    break
                try:
                    while True:
                        more = char_queue.get(timeout=0.005)
                        if more is None:
                            active = False
                            break
                        chunk += more
                except queue.Empty:
                    # Output paused: flush what has been gathered so far.
                    pass
                self.stdout_queue.put(chunk)
                self.stdout_queue.put('\n')
            self.stdout_queue.put('<END>')
            self.stdout_queue.put('\n')

        for target in (read_chars, collect_chunks):
            threading.Thread(target=target, daemon=True).start()

    def __start_send_callback(self):
        """Start a daemon thread that forwards queued output to the callback.

        Items are sent as ``'LOG'`` events until the ``'<END>'`` marker is seen,
        which produces an ``'END'`` event.  The instance is then removed from
        the class-level list and the thread exits.
        """
        def forward_chunks():
            while self.stdout_queue:
                line = self.stdout_queue.get()
                if line == '<END>':
                    self.send_stdout_callback(self.call_id, 'END', None)
                    break
                self.send_stdout_callback(self.call_id, 'LOG', line)
            self.remove_instance(self)

        def forward_lines():
            previous = ''
            finished = False
            while self.stdout_queue and not finished:
                receive = previous + self.stdout_queue.get()
                lines = receive.split('\n')
                # The last element is an incomplete line; keep it for later.
                previous = lines[-1]
                for line in lines[:-1]:
                    line = line.strip()
                    if line == '<END>':
                        self.send_stdout_callback(self.call_id, 'END', None)
                        # Leave the outer loop as well; otherwise this thread
                        # would block forever on the now-idle queue.
                        finished = True
                        break
                    self.send_stdout_callback(self.call_id, 'LOG', line)
            self.remove_instance(self)

        target = forward_lines if self.callback_line else forward_chunks
        threading.Thread(target=target, args=(), daemon=True).start()

    def process_close(self):
        """Kill the process if it is still running and unregister the instance."""
        try:
            if self.process is not None and self.process.poll() is None:
                self.process.kill()
        except Exception as e:
            logger.error(f'Exception:{str(e)}')
            logger.error(traceback.format_exc())
        finally:
            self.remove_instance(self)

    def input_command(self, cmd):
        """Write ``cmd`` plus a newline to the process's stdin and flush it.

        Does nothing when no process has been started.
        """
        if self.process is not None:
            self.process.stdin.write(f'{cmd}\n')
            self.process.stdin.flush()

    def send_stdout_callback(self, call_id, mode, data):
        """Invoke ``stdout_callback`` if one is set; log callback failures.

        Args:
            call_id: Used only in the error log; the callback always receives
                ``self.call_id``.
            mode: Event name (``'START'``, ``'LOG'``, ``'END'``, ``'ERROR'``).
            data: Event payload.
        """
        try:
            if self.stdout_callback is not None:
                self.stdout_callback(self.call_id, mode, data)
        except Exception as e:
            logger.error(f'Exception:{str(e)}')
            logger.error(f"[{call_id}] [{mode}] [{data}]")

    @classmethod
    def all_process_close(cls):
        """Close every registered instance and clear the registry."""
        for instance in cls.__instance_list:
            instance.process_close()
        cls.__instance_list = []

    @classmethod
    def remove_instance(cls, remove_instance):
        """Remove ``remove_instance`` from the registry if it is present.

        Matching is by object identity, so another instance that happens to
        have been created at the same timestamp is left alone.
        """
        cls.__instance_list = [
            instance for instance in cls.__instance_list
            if instance is not remove_instance
        ]

    @classmethod
    def print(cls):
        """Log the command of every registered instance."""
        for instance in cls.__instance_list:
            logger.info(instance.command)

    @classmethod
    def get_instance_by_call_id(cls, call_id):
        """Return the first registered instance with ``call_id``, else ``None``."""
        for instance in cls.__instance_list:
            if instance.call_id == call_id:
                return instance
        return None

    @classmethod
    def get_list(cls):
        """Return the current list of registered instances."""
        return cls.__instance_list