"""Helpers for running k6 load-test benchmarks: executor definitions, configuration and the benchmark runner."""
import json
import os
import subprocess
import tempfile
from enum import Enum
from typing import Any, Dict, List

import numpy as np
from loguru import logger
from transformers import LlamaTokenizerFast

from benchmarks.utils import kill


class ExecutorInputType(Enum):
    CONSTANT_TOKENS = "constant_tokens"
    SHAREGPT_CONVERSATIONS = "sharegpt_conversations"


class K6Executor:
    def __init__(self, name, executor_input_type=ExecutorInputType.SHAREGPT_CONVERSATIONS):
        self.variables = {}
        self.rendered_file = None
        self.name = name
        self.executor_input_type = executor_input_type
        if executor_input_type == ExecutorInputType.CONSTANT_TOKENS:
            self.input_filename = "inputs_constant_tokens.json"
        elif executor_input_type == ExecutorInputType.SHAREGPT_CONVERSATIONS:
            self.input_filename = "inputs_variable_tokens.json"

    def __str__(self):
        # returns an underscore-separated string of the variables for filename generation
        params = "_".join([f"{k}_{v}" for k, v in sorted(self.variables.items()) if isinstance(v, (str, int))])
        return f"{self.executor_input_type.value}_{params}"


class K6ConstantArrivalRateExecutor(K6Executor):
    def __init__(self, pre_allocated_vus: int, rate_per_second: int, duration: str,
                 executor_input_type: ExecutorInputType):
        super().__init__("constant_arrival_rate", executor_input_type)
        self.variables = {
            "pre_allocated_vus": pre_allocated_vus,  # it's also the max VUs
            "rate": rate_per_second,
            "duration": duration
        }


class K6ConstantVUsExecutor(K6Executor):
    def __init__(self, vus: int, duration: str, executor_input_type: ExecutorInputType):
        super().__init__("constant_vus", executor_input_type)
        self.variables = {
            "vus": vus,
            "duration": duration
        }


class K6Config:
    def __init__(self, name: str, executor: K6Executor,
                 tokenizer=LlamaTokenizerFast.from_pretrained("hf-internal-testing/llama-tokenizer"),
                 conversations_input_file=None,
                 input_num_tokens=200,
                 max_new_tokens=200,
                 extra_info=None):
        self.executor = executor
        # max_new_tokens will be set in the k6 template
        self.executor.variables["max_new_tokens"] = max_new_tokens
        self.name = name
        self.tokenizer = tokenizer
        self.extra_info = extra_info
        if conversations_input_file is None:
            self.conversation_input_file = "benchmarks/ShareGPT_V3_unfiltered_cleaned_split.json"
        else:
            self.conversation_input_file = conversations_input_file
        self.input_num_tokens = input_num_tokens

    def __str__(self):
        return f"K6Config(name={self.name} executor={self.executor})"


class K6Benchmark:
    def __init__(self, k6_config: K6Config, output_dir: str):
        self.process = None
        self.k6_config = k6_config
        self.output_dir = output_dir
        self.input_tokens_len = k6_config.input_num_tokens
        self._prepare_inputs()

    def _prepare_inputs(self):
        get_tokens_count = lambda txt: len(self.k6_config.tokenizer.encode(txt))
        MAX_SAMPLES = 5000
        # create a first input file with a constant number of tokens,
        # skipping the work if the file already exists
        if not os.path.exists("inputs_constant_tokens.json"):
            logger.info(f"Preparing input file with {self.input_tokens_len} input tokens")
            outputs = []
            with open(self.k6_config.conversation_input_file, "r") as f:
                data = json.load(f)
                for doc in data:
                    if len(outputs) >= MAX_SAMPLES:  # limit the number of inputs
                        break
                    for conversation in doc["conversations"]:
                        if conversation["from"] != "human":
                            continue
                        if get_tokens_count(conversation["value"]) < self.input_tokens_len:
                            continue
                        # encode the message, truncated to the target number of tokens
                        encoding = self.k6_config.tokenizer(conversation["value"], truncation=True,
                                                            max_length=self.input_tokens_len)
                        # find the characters spanned by the last encoded token
                        span = encoding.token_to_chars(len(encoding["input_ids"]) - 1)
                        outputs.append({"message": conversation["value"][0:span.end],
                                        "num_tokens": len(encoding["input_ids"])})
                        if len(outputs) >= MAX_SAMPLES:  # limit the number of inputs
                            break
            with open("inputs_constant_tokens.json", "w") as f:
                f.write(json.dumps(outputs))
        # create a second input file with a random sampling of inputs,
        # skipping the work if the file already exists
        if not os.path.exists("inputs_variable_tokens.json"):
            logger.info(f'Preparing input file by randomly sampling ShareGPT conversations at '
                        f'"{self.k6_config.conversation_input_file}"')
            outputs = []
            with open(self.k6_config.conversation_input_file, "r") as f:
                data = json.load(f)
                num_docs = len(data)
                # generate random indexes to sample the data
                indexes = np.random.choice(num_docs, 200, replace=False)
                for i in indexes:
                    doc = data[i]
                    if len(outputs) >= MAX_SAMPLES:  # limit the number of inputs
                        break
                    for conversation in doc["conversations"]:
                        if conversation["from"] != "human":
                            continue
                        # encode the message without truncation
                        encoding = self.k6_config.tokenizer(conversation["value"])
                        outputs.append({"message": conversation["value"],
                                        "num_tokens": len(encoding["input_ids"])})
                        if len(outputs) >= MAX_SAMPLES:  # limit the number of inputs
                            break
            with open("inputs_variable_tokens.json", "w") as f:
                f.write(json.dumps(outputs))

    def run(self):
        env_vars = []
        for key, val in self.k6_config.executor.variables.items():
            env_vars += ["-e", f"{key.upper()}={val}"]
        env_vars += ["-e", f"INPUT_FILENAME={self.k6_config.executor.input_filename}"]
        env_vars += ["-e", f"TEST_EXECUTOR={self.k6_config.executor.name}"]
        args = ["k6", "run", "--out", "json=results.json"] + env_vars + ["main.js"]
        logger.info(f"Running k6 with parameters: {args}")
        logger.info(f"K6Config is: {self.k6_config}")
        # start a k6 subprocess and stream its output without buffering on newlines
        self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        while buffer := os.read(self.process.stdout.fileno(), 2048):
            print(buffer.decode(), end='')
        self.process.wait()
        logger.info(f"K6 process finished with return code {self.process.returncode}")
        logger.info(f"Writing results to {self.get_results_path()}")
        self.add_config_to_summary()
        self.add_config_to_results()

    def stop(self):
        if self.process:
            kill(self.process.pid)

    def add_config_to_summary(self):
        with open("summary.json", "r") as f:
            summary = json.load(f)
        summary["k6_config"] = {
            "name": self.k6_config.name,
            "input_type": self.k6_config.executor.executor_input_type.value,
            "extra_info": self.k6_config.extra_info,
            **self.k6_config.executor.variables
        }
        # create the output directory if it doesn't exist
        os.makedirs(self._get_output_dir(), exist_ok=True)
        with open(self.get_summary_path(), "w") as f2:
            json.dump(summary, f2)

    def add_config_to_results(self):
        with open("results.json", "r") as f:
            results = f.readlines()
        # append the k6 config to the results in jsonlines format
        results.append("\n")
        results.append(json.dumps({
            "name": self.k6_config.name,
            "input_type": self.k6_config.executor.executor_input_type.value,
            "extra_info": self.k6_config.extra_info,
            **self.k6_config.executor.variables
        }))
        # create the output directory if it doesn't exist
        os.makedirs(self._get_output_dir(), exist_ok=True)
        with open(self.get_results_path(), "w") as f2:
            f2.writelines(results)

    def _get_output_dir(self):
        # handle both relative and absolute output_dir
        if self.output_dir.startswith("/"):
            return f"{self.output_dir}/{self.k6_config.executor.name}"
        else:
            return f"{os.getcwd()}/{self.output_dir}/{self.k6_config.executor.name}"

    def _get_output_path(self):
        return f"{self._get_output_dir()}/{self.k6_config.name}_{self.k6_config.executor}"

    def get_results_path(self):
        return f"{self._get_output_path()}.json"

    def get_summary_path(self):
        return f"{self._get_output_path()}.summary.json"
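

# Example usage (a minimal sketch, not part of the original module): wires a
# K6ConstantArrivalRateExecutor into a K6Config and runs the benchmark once.
# The rate, duration, output directory and the explicit ShareGPT path below
# are illustrative assumptions, not values mandated by this module.
if __name__ == "__main__":
    executor = K6ConstantArrivalRateExecutor(
        pre_allocated_vus=100,  # also the maximum number of VUs
        rate_per_second=10,     # requests started per second
        duration="60s",         # k6 duration string
        executor_input_type=ExecutorInputType.SHAREGPT_CONVERSATIONS,
    )
    config = K6Config(
        name="example_run",
        executor=executor,
        conversations_input_file="benchmarks/ShareGPT_V3_unfiltered_cleaned_split.json",
        max_new_tokens=200,
    )
    benchmark = K6Benchmark(config, output_dir="results")
    try:
        benchmark.run()
    finally:
        benchmark.stop()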