Hi AB29, here are some results for tier 2 latency. This code was shared by a forum member. It runs in a Colab notebook for me, but you can tweak it for a local machine if you want to experiment.
import openai
import jsonschema
import time
import re
import os
import tiktoken
import httpx
from openai import OpenAI

# Single shared API client. OpenAI() picks up the key from the OPENAI_API_KEY
# environment variable; pass api_key=... to set it explicitly instead.
# Timeouts (seconds): 3 to connect, 5 per read, 10 per write, and 15 as the
# default for anything not listed (here: acquiring a pooled connection).
client = OpenAI(timeout=httpx.Timeout(15.0, read=5.0, write=10.0, connect=3.0))
class Printer:
    """Print streamed text to stdout with soft word wrapping and indentation.

    Attributes:
        max_len (int): Maximum line length before a chunk is wrapped.
        indent (int): Number of spaces printed at the start of every new line.
        breaks (list[str]): Characters at which a line may be broken; a chunk
            is only moved to a new line when it starts with one of them.
        line_length (int): Characters on the current line (-1 until the first
            chunk has been printed).

    Methods:
        word(word): Print one chunk, wrapping to a new line first if needed.
        document(text, delay): Print a whole text word by word.
        reset(): Mark the current line as empty without printing anything.
    """

    # Words (with apostrophes) or single sentence-punctuation characters.
    _WORD_PATTERN = re.compile(r"[\w']+|[.,!?;]")
    _PUNCTUATION = frozenset(".,!?;")

    def __init__(self, max_len=80, indent=0, breaks=None):
        self.max_len = max_len
        self.indent = indent
        # Build the default per instance: a list literal in the signature
        # would be one object shared (and mutable) across all Printers.
        self.breaks = [" ", "-"] if breaks is None else breaks
        self.line_length = -1

    def reset(self):
        """Treat the current line as empty; nothing is printed."""
        self.line_length = 0

    def document(self, text, delay=0.1):
        """Print ``text`` one word at a time, pausing ``delay`` seconds each.

        The text is split into words and the punctuation marks ``. , ! ? ;``;
        any other characters (including the original whitespace) are dropped.
        Words are re-joined with single spaces so that the output stays
        readable and ``word()`` can wrap at those spaces.
        """
        for chunk in self._WORD_PATTERN.findall(text):
            line_has_text = self.line_length > self.indent
            if line_has_text and chunk not in self._PUNCTUATION:
                chunk = " " + chunk
            self.word(chunk)
            if delay:
                time.sleep(delay)

    def word(self, word):
        """Print one chunk of text, wrapping and indenting as configured.

        A new line is started when the chunk would exceed ``max_len`` and
        begins with a break character, or when nothing has been printed yet.
        Leading whitespace of a wrapped chunk is removed.
        """
        overflows = len(word) + self.line_length > self.max_len
        breakable = bool(word) and word[0] in self.breaks
        if (overflows and breakable) or self.line_length == -1:
            print("")  # new line
            self.line_length = 0
            word = word.lstrip()
        if self.line_length == 0:  # Indent new lines
            print(" " * self.indent, end="")
            self.line_length = self.indent
        print(word, end="")
        if word.endswith("\n"):  # Indent after AI's line feed
            print(" " * self.indent, end="")
            # The cursor is on a fresh line: only the indentation counts.
            self.line_length = self.indent
        else:
            self.line_length += len(word)
class Tokenizer:
    """Token counter backed by a tiktoken encoding.

    Requires ``tiktoken`` and ``re`` to be imported.

    Usage example:
        cl100 = Tokenizer()
        number_of_tokens = cl100.count("my string")
    """

    def __init__(self, model="cl100k_base"):
        # ``model`` is handed to tiktoken.get_encoding() as the encoding name.
        self.tokenizer = tiktoken.get_encoding(model)
        # Matches ``<|...|>`` style markers so count() can drop them.
        self.chat_strip_match = re.compile(r'<\|.*?\|>')
        self.intype = None

    def ucount(self, text):
        """Return the number of tokens in ``text`` exactly as given."""
        return len(self.tokenizer.encode(text))

    def count(self, text):
        """Return the number of tokens in ``text`` with ``<|...|>`` markers removed."""
        cleaned = self.chat_strip_match.sub('', text)
        return len(self.tokenizer.encode(cleaned))
class BotDate:
    """Formatted wall-clock timestamps plus a simple stopwatch.

    .start() / .now() : formatted object-creation time / current time
    .set() / .get()   : start (or restart) the stopwatch / elapsed seconds
    .print(seconds)   : formatted date/time from epoch seconds
    """

    def __init__(self, format_spec="%Y-%m-%d %H:%M%p"):
        # NOTE(review): the default pairs the 24-hour %H with the AM/PM
        # marker %p; kept as-is so existing output does not change.
        self.format_spec = format_spec
        self.created_time = time.time()
        # None means "stopwatch not started"; get() relies on this.
        self.start_time = None
        self.stats1 = []
        self.stats2 = []

    def stats_reset(self):
        """Empty both statistics lists."""
        self.stats1 = []
        self.stats2 = []

    def start(self):
        """Return the formatted time at which this object was created."""
        return self.format_time(self.created_time)

    def now(self):
        """Return the formatted current time."""
        return self.format_time(time.time())

    def print(self, epoch_seconds):
        """Return ``epoch_seconds`` formatted with ``format_spec``."""
        return self.format_time(epoch_seconds)

    def format_time(self, epoch_seconds):
        """Format epoch seconds as local time using ``format_spec``."""
        return time.strftime(self.format_spec, time.localtime(epoch_seconds))

    def set(self):
        """Start or restart the stopwatch."""
        self.start_time = time.perf_counter()

    def get(self):
        """Return seconds elapsed since set() as a float.

        Returns the placeholder string "X.XX" if set() was never called.
        """
        if self.start_time is None:
            return "X.XX"
        return time.perf_counter() - self.start_time
# Benchmark: stream the same prompt to each model `trials` times and record
# total time, time to first content, token count and token rates.
bdate = BotDate()
tok = Tokenizer()
p = Printer()
latency = 0
user = """Write an article about kittens""".strip()
models = ['gpt-3.5-turbo-1106', 'gpt-3.5-turbo-0613']
trials = 3
stats = {model: {"total response time": [],
                 "latency (s)": [],
                 "response tokens": [],
                 "total rate": [],
                 "stream rate": [],
                 } for model in models}

for _ in range(trials):
    for model in models:
        print(f"\n[{model}]")
        time.sleep(.2)
        bdate.set()
        # call the chat API using the openai package and model parameters
        try:
            response = client.chat.completions.create(
                messages=[
                    # {"role": "system", "content": "You are a helpful assistant"},
                    {"role": "user", "content": user}],
                model=model,
                top_p=0.0, stream=True, max_tokens=256)
        except openai.APIConnectionError as e:
            print("The server could not be reached")
            print(e.__cause__)  # an underlying Exception, likely raised within httpx.
            continue  # no response to read; skip this trial
        except openai.RateLimitError as e:
            print(f"OpenAI rate error {e.status_code}: {e.response}")
            continue
        except openai.APIStatusError as e:
            print(f"OpenAI error {e.status_code}: {e.response}")
            continue

        # capture the words emitted by the response generator
        reply = ""
        for part in response:
            # Keep refreshing until the first non-empty content arrives, so
            # latency ends up as the time to the first visible text.
            if reply == "":
                latency = bdate.get()
            # Skip chunks with no choices and the final finish_reason chunk.
            if not part.choices or part.choices[0].finish_reason:
                continue
            word = part.choices[0].delta.content or ""
            if reply == "" and word == "\n":
                word = ""  # drop a leading bare newline
            reply += word
            p.word(word)
        total = bdate.get()

        # extend model stats lists with total, latency, tokens for model
        tokens = tok.count(reply)
        stream_time = total - latency
        stats[model]["total response time"].append(total)
        stats[model]["latency (s)"].append(latency)
        stats[model]["response tokens"].append(tokens)
        stats[model]["total rate"].append(tokens / total)
        # The first token is attributed to latency, hence tokens - 1; a zero
        # streaming time falls back to 1 to avoid dividing by zero.
        stats[model]["stream rate"].append(
            (tokens - 1) / (1 if stream_time == 0 else stream_time))

print("\n\n")
for key in stats:
    print(f"Report for {trials} trials of {key}:")
    for sub_key, values in stats[key].items():
        label = f"- {sub_key.ljust(20, '.')}"
        if not values:
            # Every trial for this model failed; min()/max() would raise.
            print(f"{label}no data")
            continue
        avg_value = sum(values) / len(values)
        print(f"{label}"
              f"Min:{min(values):07.3f} "
              f"Max:{max(values):07.3f} "
              f"Avg:{avg_value:07.3f}")
    print()