Removes the parallelism in file conversion (during download)

- Parallelism increases memory usage: each worker has to convert a
  ~10GB file on average, meaning roughly 20GB resident per worker
  (the file plus its converted copy), so up to ~100GB for 5 workers
  (see the sketch after this list).
- Parallelism might not speed the process up (still looking into it):
  since everything should be limited by disk speed, running the
  conversions concurrently shouldn't really help.
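
A minimal back-of-the-envelope sketch of that memory estimate (the
~10GB average and the 2x per-worker factor are the assumptions stated
above, not measured values):

    # Rough peak-memory estimate for the old parallel conversion.
    AVG_FILE_GB = 10                 # average file size assumed above
    PER_WORKER_GB = 2 * AVG_FILE_GB  # the file plus its converted copy
    WORKERS = 5                      # old ThreadPoolExecutor(max_workers=5)
    print(f"Worst case: ~{WORKERS * PER_WORKER_GB}GB resident")  # ~100GB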

Updated the timings from ETA to elapsed (the ETA was imprecise
anyway).
Also using `datetime` directly for simpler code IMO.
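
As a rough sketch of the timing change (placeholder names: `files` and
`convert` stand in for the real file list and per-file conversion), the
old loop extrapolated an ETA from the average time per completed file,
which is noisy when file sizes vary; the new one just logs the time
elapsed so far.

    import datetime

    files = ["a.bin", "b.bin", "c.bin"]  # placeholder inputs

    def convert(f):                      # placeholder for the real conversion
        pass

    start = datetime.datetime.now()
    for i, f in enumerate(files):
        convert(f)
        # Subtracting two datetimes yields a timedelta that formats
        # readably (H:MM:SS.ffffff) without manual seconds bookkeeping.
        elapsed = datetime.datetime.now() - start
        print(f"Convert: [{i + 1}/{len(files)}] -- Took: {elapsed}")
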
Nicolas Patry 2023-05-04 10:58:20 +02:00
parent b67908e0cf
commit 43f3055331


@@ -1,5 +1,6 @@
 import concurrent
 import time
+import datetime
 import torch
 from concurrent.futures import ThreadPoolExecutor
@@ -78,17 +79,9 @@ def convert_file(pt_file: Path, st_file: Path):
 def convert_files(pt_files: List[Path], st_files: List[Path]):
     assert len(pt_files) == len(st_files)
-    executor = ThreadPoolExecutor(max_workers=5)
-    futures = [
-        executor.submit(convert_file, pt_file=pt_file, st_file=st_file)
-        for pt_file, st_file in zip(pt_files, st_files)
-    ]
+    N = len(pt_files)
     # We do this instead of using tqdm because we want to parse the logs with the launcher
-    start_time = time.time()
-    for i, future in enumerate(concurrent.futures.as_completed(futures)):
-        elapsed = timedelta(seconds=int(time.time() - start_time))
-        remaining = len(futures) - (i + 1)
-        eta = (elapsed / (i + 1)) * remaining if remaining > 0 else 0
-        logger.info(f"Convert: [{i + 1}/{len(futures)}] -- ETA: {eta}")
+    start = datetime.datetime.now()
+    for i, (pt_file, sf_file) in enumerate(zip(pt_files, st_files)):
+        convert_file(pt_file=pt_file, st_file=sf_file)
+        elapsed = datetime.datetime.now() - start
+        logger.info(f"Convert: [{i + 1}/{N}] -- Took: {elapsed}")