Skip to content

Port numbers management is improved (#164) #165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 15, 2024
99 changes: 82 additions & 17 deletions testgres/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@

from .standby import First

from . import utils

from .utils import \
PgVer, \
eprint, \
get_bin_path, \
get_pg_version, \
reserve_port, \
release_port, \
execute_utility, \
options_string, \
clean_on_error
Expand Down Expand Up @@ -128,6 +128,9 @@ def __repr__(self):


class PostgresNode(object):
# a max number of node start attempts
_C_MAX_START_ATEMPTS = 5

def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionParams = ConnectionParams(), bin_dir=None, prefix=None):
"""
PostgresNode constructor.
Expand Down Expand Up @@ -158,7 +161,7 @@ def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionP
self.os_ops = LocalOperations(conn_params)

self.host = self.os_ops.host
self.port = port or reserve_port()
self.port = port or utils.reserve_port()

self.ssh_key = self.os_ops.ssh_key

Expand Down Expand Up @@ -471,6 +474,28 @@ def _collect_special_files(self):

return result

def _collect_log_files(self):
    """
    Take a snapshot of the node's log files.

    Returns:
        A dict that maps the path of every existing log file
        to its current size in bytes. Missing files are left out.
    """

    candidates = [
        self.pg_log_file
    ]  # yapf: disable

    sizes = {}

    for path in candidates:
        if self.os_ops.path_exists(path):
            size = self.os_ops.get_file_size(path)
            assert type(size) == int  # noqa: E721
            assert size >= 0
            sizes[path] = size

    return sizes

def init(self, initdb_params=None, cached=True, **kwargs):
"""
Perform initdb for this node.
Expand Down Expand Up @@ -722,6 +747,22 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
OperationalError},
max_attempts=max_attempts)

def _detect_port_conflict(self, log_files0, log_files1):
    """
    Check whether log output written between two snapshots reports a port conflict.

    Args:
        log_files0: dict {path: size in bytes} taken before the start attempt.
        log_files1: dict {path: size in bytes} taken after the failed attempt.

    Returns:
        True if the part of any file from log_files1 that lies beyond its
        size in log_files0 contains the text
        "Is another postmaster already running on port", otherwise False.
    """
    assert type(log_files0) == dict  # noqa: E721
    assert type(log_files1) == dict  # noqa: E721

    # The marker is pure ASCII, so it is searched for directly in the raw
    # bytes. Decoding the log first would raise UnicodeDecodeError on any
    # byte sequence that is not valid UTF-8 and hide the real start failure.
    marker = b'Is another postmaster already running on port'

    for file in log_files1:
        # A file that is absent from the first snapshot is read from its beginning.
        read_pos = log_files0.get(file, 0)

        file_content = self.os_ops.read_binary(file, read_pos)
        if marker in file_content:
            return True
    return False

def start(self, params=[], wait=True):
"""
Starts the PostgreSQL node using pg_ctl if node has not been started.
Expand All @@ -736,6 +777,9 @@ def start(self, params=[], wait=True):
Returns:
This instance of :class:`.PostgresNode`.
"""

assert __class__._C_MAX_START_ATEMPTS > 1

if self.is_started:
return self

Expand All @@ -745,27 +789,46 @@ def start(self, params=[], wait=True):
"-w" if wait else '-W', # --wait or --no-wait
"start"] + params # yapf: disable

startup_retries = 5
log_files0 = self._collect_log_files()
assert type(log_files0) == dict # noqa: E721

nAttempt = 0
timeout = 1
while True:
assert nAttempt >= 0
assert nAttempt < __class__._C_MAX_START_ATEMPTS
nAttempt += 1
try:
exit_status, out, error = execute_utility(_params, self.utils_log_file, verbose=True)
if error and 'does not exist' in error:
raise Exception
except Exception as e:
files = self._collect_special_files()
if any(len(file) > 1 and 'Is another postmaster already '
'running on port' in file[1].decode() for
file in files):
logging.warning("Detected an issue with connecting to port {0}. "
"Trying another port after a 5-second sleep...".format(self.port))
self.port = reserve_port()
options = {'port': str(self.port)}
self.set_auto_conf(options)
startup_retries -= 1
time.sleep(5)
continue
assert nAttempt > 0
assert nAttempt <= __class__._C_MAX_START_ATEMPTS
if self._should_free_port and nAttempt < __class__._C_MAX_START_ATEMPTS:
log_files1 = self._collect_log_files()
if self._detect_port_conflict(log_files0, log_files1):
log_files0 = log_files1
logging.warning(
"Detected an issue with connecting to port {0}. "
"Trying another port after a {1}-second sleep...".format(self.port, timeout)
)
time.sleep(timeout)
timeout = min(2 * timeout, 5)
cur_port = self.port
new_port = utils.reserve_port() # can raise
try:
options = {'port': str(new_port)}
self.set_auto_conf(options)
except: # noqa: E722
utils.release_port(new_port)
raise
self.port = new_port
utils.release_port(cur_port)
continue

msg = 'Cannot start node'
files = self._collect_special_files()
raise_from(StartNodeException(msg, files), e)
break
self._maybe_start_logger()
Expand Down Expand Up @@ -930,8 +993,10 @@ def free_port(self):
"""

if self._should_free_port:
port = self.port
self._should_free_port = False
release_port(self.port)
self.port = None
utils.release_port(port)

def cleanup(self, max_attempts=3, full=False):
"""
Expand Down
16 changes: 16 additions & 0 deletions testgres/operations/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,28 @@ def readlines(self, filename, num_lines=0, binary=False, encoding=None):
buffers * max(2, int(num_lines / max(cur_lines, 1)))
) # Adjust buffer size

def read_binary(self, filename, start_pos):
    """
    Read a local file in binary mode from the byte offset start_pos to its end.

    Returns:
        The bytes that were read (empty if start_pos is at or beyond the end).
    """
    assert type(filename) == str  # noqa: E721
    assert type(start_pos) == int  # noqa: E721
    assert start_pos >= 0

    with open(filename, 'rb') as stream:  # binary mode, no decoding
        stream.seek(start_pos, os.SEEK_SET)
        data = stream.read()

    assert type(data) == bytes  # noqa: E721
    return data

def isfile(self, remote_file):
    """Tell whether remote_file names an existing regular file (via os.path.isfile)."""
    is_regular_file = os.path.isfile(remote_file)
    return is_regular_file

def isdir(self, dirname):
    """Tell whether dirname names an existing directory (via os.path.isdir)."""
    is_directory = os.path.isdir(dirname)
    return is_directory

def get_file_size(self, filename):
    """
    Return the size of a local file in bytes (via os.path.getsize).
    """
    assert filename is not None
    assert type(filename) == str  # noqa: E721

    size_in_bytes = os.path.getsize(filename)
    return size_in_bytes

def remove_file(self, filename):
    """Delete a local file and return the result of os.remove."""
    outcome = os.remove(filename)
    return outcome

Expand Down
9 changes: 9 additions & 0 deletions testgres/operations/os_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,18 @@ def read(self, filename, encoding, binary):
def readlines(self, filename):
    """Abstract operation: always raises NotImplementedError here."""
    raise NotImplementedError()

def read_binary(self, filename, start_pos):
    """
    Abstract operation: validates the arguments, then raises NotImplementedError.

    Args:
        filename: a str.
        start_pos: a non-negative int.
    """
    assert type(filename) == str  # noqa: E721
    assert type(start_pos) == int  # noqa: E721
    assert start_pos >= 0

    raise NotImplementedError()

def isfile(self, remote_file):
    """Abstract operation: always raises NotImplementedError here."""
    raise NotImplementedError()

def get_file_size(self, filename):
    """Abstract operation: always raises NotImplementedError here."""
    raise NotImplementedError()

# Processes control
def kill(self, pid, signal):
# Kill the process
Expand Down
83 changes: 83 additions & 0 deletions testgres/operations/remote_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ def readlines(self, filename, num_lines=0, binary=False, encoding=None):

return lines

def read_binary(self, filename, start_pos):
    """
    Read a file through exec_command from the byte offset start_pos to its end.

    Runs `tail -c +<N> <quoted filename>` and returns its output,
    which is asserted to be bytes.
    """
    assert type(filename) == str  # noqa: E721
    assert type(start_pos) == int  # noqa: E721
    assert start_pos >= 0

    # start_pos is 0-based while the number passed to "tail -c +N" is one larger
    first_byte = start_pos + 1
    quoted_name = __class__._escape_path(filename)

    data = self.exec_command("tail -c +{} {}".format(first_byte, quoted_name))
    assert type(data) == bytes  # noqa: E721
    return data

def isfile(self, remote_file):
stdout = self.exec_command("test -f {}; echo $?".format(remote_file))
result = int(stdout.strip())
Expand All @@ -350,6 +360,70 @@ def isdir(self, dirname):
response = self.exec_command(cmd)
return response.strip() == b"True"

def get_file_size(self, filename):
    """
    Return the size in bytes of a file, obtained through exec_command.

    Runs `du -b <quoted filename>` and parses the decimal number at the
    start of its output. The number must be followed by a whitespace
    character.

    Args:
        filename: path of the file (a non-None str).

    Returns:
        The parsed size as an int.

    Raises:
        Exception: a "[BUG CHECK]" error when the command output is empty
            (#001), does not start with a digit (#002), consists of digits
            only (#003) or has a non-space character right after the
            digits (#004).
    """
    C_ERR_SRC = "RemoteOpertions::get_file_size"

    assert filename is not None
    assert type(filename) == str  # noqa: E721

    # NOTE(review): "-b" looks like a GNU du option - confirm that every
    # target host provides it.
    cmd = "du -b " + __class__._escape_path(filename)

    s = self.exec_command(cmd, encoding=get_default_encoding())
    assert type(s) == str  # noqa: E721

    def bug_check(check_point, what):
        # Builds (does not raise) the error for one of the check points.
        return Exception(
            "[BUG CHECK] Can't get size of file [{2}]. Remote operation returned {3}. Check point [{0}][{1}].".format(
                C_ERR_SRC,
                check_point,
                filename,
                what
            )
        )

    if len(s) == 0:
        raise bug_check("#001", "an empty string")

    # Length of the leading run of ASCII digits. str.isdigit() is not used
    # because it also accepts non-ASCII digits (for example superscripts)
    # that can't be converted to a number below.
    i = 0

    while i < len(s) and '0' <= s[i] <= '9':
        i += 1

    if i == 0:
        raise bug_check("#002", "a bad formatted string")

    if i == len(s):
        raise bug_check("#003", "a bad formatted string")

    if not s[i].isspace():
        raise bug_check("#004", "a bad formatted string")

    # Python integers have arbitrary precision, so no overflow check is needed.
    return int(s[:i])

def remove_file(self, filename):
    """
    Remove a file by running `rm <filename>` through exec_command.

    Returns:
        Whatever exec_command returns for that command.
    """
    # NOTE(review): filename goes into the command line as is (no quoting) -
    # confirm that callers never pass names with spaces or shell metacharacters.
    return self.exec_command("rm {}".format(filename))
Expand Down Expand Up @@ -386,6 +460,15 @@ def db_connect(self, dbname, user, password=None, host="localhost", port=5432):
)
return conn

def _escape_path(path):
    """
    Quote a path so that a POSIX shell reads it as one literal word.

    The path is wrapped in single quotes. Every single quote inside the path
    is replaced by the sequence '"'"' (close the quoted run, add a
    double-quoted single quote, reopen the quoted run); without that a quote
    in the path would end the quoting early.

    Args:
        path: a non-empty str.

    Returns:
        The quoted path.
    """
    assert type(path) == str  # noqa: E721
    assert path != ""  # Ok?

    return "'" + path.replace("'", "'\"'\"'") + "'"


def normalize_error(error):
if isinstance(error, bytes):
Expand Down
8 changes: 6 additions & 2 deletions testgres/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, version: str) -> None:
super().__init__(version)


def reserve_port():
def internal__reserve_port():
"""
Generate a new port and add it to 'bound_ports'.
"""
Expand All @@ -45,14 +45,18 @@ def reserve_port():
return port


def release_port(port):
def internal__release_port(port):
    """
    Release a port provided by reserve_port(): drop it from 'bound_ports'.
    """

    bound_ports.discard(port)


# Public names of the port helpers. node.py calls them through the module
# (utils.reserve_port() / utils.release_port()), so the lookup happens at
# call time - presumably to let these bindings be replaced; TODO confirm.
reserve_port = internal__reserve_port
release_port = internal__release_port


def execute_utility(args, logfile=None, verbose=False):
"""
Execute utility (pg_ctl, pg_dump etc).
Expand Down
Loading