Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6a6355a
fix: pass snapshot UUID instead of object to snapshot_controller.dele…
geoffrey1330 May 6, 2026
c9e40ac
added snaps to debug logs
geoffrey1330 May 6, 2026
527a36c
started spdk container with sudo
geoffrey1330 May 7, 2026
7a94eb2
fix: add error handling for fetching storage node and cluster in lvol…
Hamdy-khader May 7, 2026
4b51172
Enable mTLS
mxsrc Apr 29, 2026
3e90082
Fix flask debug flag
mxsrc May 7, 2026
ff00dad
Merge branch 'main' into modify_snapshot_replication
geoffrey1330 May 7, 2026
1506c4c
added more debug logs
geoffrey1330 May 7, 2026
342e7c9
fix: compare tls_client_auth against ssl.CERT_NONE instead of string …
geoffrey1330 May 8, 2026
64752ca
Merge branch 'main' into modify_snapshot_replication
geoffrey1330 May 8, 2026
8a668c4
Merge branch 'main' into modify_snapshot_replication
geoffrey1330 May 8, 2026
13e385d
fix: use snapshot's owning node for bdev_lvol_clone in replicate_lvol…
geoffrey1330 May 8, 2026
b6e2b79
fix snapshot replication crash loop: handle NVMe attach failures with…
geoffrey1330 May 8, 2026
c703081
fix: return HTTP 400 on replication endpoint failures instead of sile…
geoffrey1330 May 8, 2026
1a43072
fix: match replicated lvol on target cluster by NQN lvol UUID suffix …
geoffrey1330 May 11, 2026
6ade906
Revert "fix: match replicated lvol on target cluster by NQN lvol UUID…
geoffrey1330 May 11, 2026
1993509
fix: match replicated lvol on target cluster by NQN lvol UUID suffix …
geoffrey1330 May 11, 2026
dc09031
feat: add model field to lvol connect response for device symlink res…
geoffrey1330 May 11, 2026
99eeac1
added more logs for sorted snapshot
geoffrey1330 May 11, 2026
00b6981
fix: replicate_lvol_on_source_cluster to search target cluster tasks …
geoffrey1330 May 11, 2026
a8d094e
fix: mark snapshot replication task as done when snapshot is not foun…
geoffrey1330 May 11, 2026
b0cd51c
stop forward replication on source lvol after failover to prevent nor…
geoffrey1330 May 11, 2026
4f5a4c6
start forward replication on source lvol after failback
geoffrey1330 May 11, 2026
23da54f
removed setting replication to false on lvol
geoffrey1330 May 12, 2026
0885ba0
updated endpoint replication_trigger
geoffrey1330 May 12, 2026
14b148b
add more logs to list_replication_tasks
geoffrey1330 May 12, 2026
bfd05cf
updated snapsh id reference name
geoffrey1330 May 12, 2026
5ed24b0
set replicate_as_snap_instance to false
geoffrey1330 May 12, 2026
04be1b9
reverted: updated snapsh id reference name
geoffrey1330 May 12, 2026
02530fc
fix replicate_lvol_on_source/target_cluster snapshot lookup
geoffrey1330 May 12, 2026
9f15841
use function_result to get snapshot id
geoffrey1330 May 13, 2026
16ebd26
set do_replicate to true on replicate_lvol_on_source_cluster
geoffrey1330 May 13, 2026
bc7c0b6
fix replicate_lvol_on_source_cluster to match failback tasks using so…
geoffrey1330 May 13, 2026
9a57c73
fix: inherit source_lvol_id from snapshot instead of embedded lvol in…
geoffrey1330 May 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 64 additions & 43 deletions simplyblock_core/controllers/lvol_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ def connect_lvol(uuid, ctrl_loss_tmo=constants.LVOL_NVME_CONNECT_CTRL_LOSS_TMO,
if cluster.status == Cluster.STATUS_SUSPENDED and cluster.snapshot_replication_target_cluster:
logger.error("Cluster is suspended, looking for replicated lvol")
for lv in db_controller.get_lvols(cluster.snapshot_replication_target_cluster):
if lv.nqn == lvol.nqn:
if lv.nqn.split(":lvol:")[-1] == lvol.nqn.split(":lvol:")[-1]:
logger.info(f"LVol with same nqn already exists on target cluster: {lv.get_id()}")
lvol = lv
break
Expand Down Expand Up @@ -1726,6 +1726,7 @@ def connect_lvol(uuid, ctrl_loss_tmo=constants.LVOL_NVME_CONNECT_CTRL_LOSS_TMO,

entry = {
"ns_id": lvol.ns_id,
"model": lvol.uuid,
"transport": transport,
"ip": ip,
"port": port,
Expand Down Expand Up @@ -2392,19 +2393,17 @@ def replicate_lvol_on_target_cluster(lvol_id):
if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION:
logger.debug(task)
try:
snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"])
snap = db_controller.get_snapshot_by_id(task.function_result)
except KeyError:
continue

if snap.lvol.get_id() != lvol_id:
if snap.source_lvol_id != lvol_id:
continue
snaps.append(snap)

if snaps:
snaps = sorted(snaps, key=lambda x: x.created_at)
last_snapshot = snaps[-1]
rep_snap = db_controller.get_snapshot_by_id(last_snapshot.target_replicated_snap_uuid)
snapshot = rep_snap
snapshot = snaps[-1]

if not snapshot:
logger.error(f"Snapshot for replication not found for lvol: {lvol_id}")
Expand Down Expand Up @@ -2477,6 +2476,7 @@ def replicate_lvol_on_target_cluster(lvol_id):
lvol = db_controller.get_lvol_by_id(lvol_id)
lvol.from_source = False
lvol.write_to_db()
replication_stop(lvol_id)
lvol_events.lvol_replicated(lvol, new_lvol)

return new_lvol.lvol_uuid
Expand All @@ -2486,17 +2486,25 @@ def list_replication_tasks(lvol_id):
db_controller = DBController()
lvol = db_controller.get_lvol_by_id(lvol_id)
node = db_controller.get_storage_node_by_id(lvol.node_id)
logger.info("list_replication_tasks: lvol=%s node=%s cluster=%s do_replicate=%s",
lvol_id, node.get_id(), node.cluster_id, lvol.do_replicate)
tasks = []
for task in db_controller.get_job_tasks(node.cluster_id):
all_tasks = db_controller.get_job_tasks(node.cluster_id)
logger.info("list_replication_tasks: total cluster tasks=%d", len(all_tasks))
for task in all_tasks:
if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION:
try:
snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"])
except KeyError:
logger.warning("list_replication_tasks: snapshot not found for task=%s", task.uuid)
continue
if snap.lvol.get_id() != lvol_id:
if snap.source_lvol_id != lvol_id:
continue
logger.info("list_replication_tasks: matched task=%s snap=%s status=%s",
task.uuid, snap.get_id(), task.status)
tasks.append(task)

logger.info("list_replication_tasks: returning %d tasks for lvol=%s", len(tasks), lvol_id)
return tasks


Expand Down Expand Up @@ -2601,59 +2609,68 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None):
return False


target_cluster_id = None
if lvol.replication_node_id:
try:
target_node = db_controller.get_storage_node_by_id(lvol.replication_node_id)
target_cluster_id = target_node.cluster_id
except KeyError:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
pass
if not target_cluster_id:
_src_cl = db_controller.get_cluster_by_id(source_node.cluster_id)
target_cluster_id = _src_cl.snapshot_replication_target_cluster

if not target_cluster_id:
logger.error(f"Target cluster not found for lvol: {lvol_id}")
return False

target_lvol_id = None
lvol_id_in_nqn = lvol.nqn.split(":")[-1]
for lv in db_controller.get_lvols(target_cluster_id):
if lv.nqn.split(":")[-1] == lvol_id_in_nqn:
logger.info(f"LVol with same lvol nqn already exists on target cluster: {lv.get_id()}")
target_lvol_id = lv.get_id()
break

if not target_lvol_id:
logger.error(f"LVol with same nqn does not exist on target cluster: {target_cluster_id}")
return False

snaps = []
snapshot = None
for task in db_controller.get_job_tasks(source_node.cluster_id):
for task in db_controller.get_job_tasks(target_cluster_id):
if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION:
logger.debug(task)
if task.status != JobSchedule.STATUS_DONE:
continue
try:
snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"])
snap = db_controller.get_snapshot_by_id(task.function_result)
except KeyError:
continue

if snap.lvol.get_id() != lvol_id:
if snap.source_lvol_id != target_lvol_id:
continue
snaps.append(snap)

if snaps:
snaps = sorted(snaps, key=lambda x: x.created_at)
snapshot = snaps[-1]

if not snapshot:
target_node = db_controller.get_storage_node_by_id(lvol.replication_node_id)
logger.info(f"Looking for snapshot in target cluster: {target_node.cluster_id}")
target_lvol_id = None
lvol_id_in_nqn = lvol.nqn.split(":")[-1]
for lv in db_controller.get_lvols(target_node.cluster_id):
if lv.nqn.split(":")[-1] == lvol_id_in_nqn:
logger.info(f"LVol with same lvol nqn already exists on target cluster: {lv.get_id()}")
target_lvol_id = lv.get_id()

if not target_lvol_id:
logger.error(f"LVol with same nqn does not exist on target cluster: {target_node.cluster_id}")
return False

for task in db_controller.get_job_tasks(target_node.cluster_id):
if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION:
logger.debug(task)
try:
snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"])
except KeyError:
continue

if snap.lvol.get_id() != target_lvol_id:
continue
snaps.append(snap)

if snaps:
snaps = sorted(snaps, key=lambda x: x.created_at)
snapshot = snaps[-1]
snapshot = db_controller.get_snapshot_by_id(snapshot.target_replicated_snap_uuid)

if not snapshot:
logger.error(f"Snapshot for replication not found for lvol: {lvol_id}")
return False

# bdev_lvol_clone must run on the same SPDK that owns the snapshot's LVS
try:
snap_node = db_controller.get_storage_node_by_id(snapshot.lvol.node_id)
if snap_node.status == StorageNode.STATUS_ONLINE:
source_node = snap_node
else:
logger.error(f"Snapshot node {snapshot.lvol.node_id} is not online")
return False
except KeyError:
logger.warning(f"Could not find snapshot node {snapshot.lvol.node_id}, using current source_node")

# create lvol on target node
new_lvol = copy.deepcopy(lvol)
new_lvol.cloned_from_snap = snapshot.get_id()
Expand Down Expand Up @@ -2699,6 +2716,9 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None):

logger.debug(f"new lvol from_source: {new_lvol.from_source}")

logger.debug(f"new_lvol: {new_lvol}")
logger.debug(f"source_node: {source_node}")

lvol_bdev, error = add_lvol_on_node(new_lvol, source_node)
if error:
logger.error(error)
Expand All @@ -2722,8 +2742,9 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None):

new_lvol.status = LVol.STATUS_ONLINE
new_lvol.from_source = True
new_lvol.do_replicate = True
new_lvol.write_to_db(db_controller.kv_store)
lvol_events.lvol_replicated(lvol, new_lvol)
lvol_events.lvol_replicated(new_lvol, new_lvol)
logger.debug(f"new lvol from_source: {new_lvol.from_source}")

return new_lvol.lvol_uuid
Expand Down
1 change: 1 addition & 0 deletions simplyblock_core/controllers/snapshot_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def add(lvol_id, snapshot_name, backup=False, lock=True):
snap.snap_bdev = f"{lvol.lvs_name}/{snap_bdev_name}"
snap.created_at = int(time.time())
snap.lvol = lvol
snap.source_lvol_id = lvol.get_id()
snap.fabric = lvol.fabric
snap.vuid = snap_vuid
snap.status = SnapShot.STATUS_ONLINE
Expand Down
2 changes: 1 addition & 1 deletion simplyblock_core/controllers/tasks_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def _check_snap_instance_on_node(snapshot_id: str , node_id: str):

_add_task(JobSchedule.FN_SNAPSHOT_REPLICATION, snapshot.cluster_id, node_id, "",
function_params={"snapshot_id": snapshot.get_id(), "replicate_to_source": False,
"replicate_as_snap_instance": True},
"replicate_as_snap_instance": False},
send_to_cluster_log=False)


Expand Down
3 changes: 2 additions & 1 deletion simplyblock_core/models/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ class SnapShot(BaseModel):
source_replicated_snap_uuid: str = ""
next_snap_uuid: str = ""
prev_snap_uuid: str = ""
instances: list = []
instances: list = []
source_lvol_id: str = ""
42 changes: 36 additions & 6 deletions simplyblock_core/services/snapshot_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,20 @@ def process_snap_replicate_start(task, snapshot):
if not ret:
msg = "controller attach failed"
logger.error(msg)
raise RuntimeError(msg)
task.function_result = msg
task.status = JobSchedule.STATUS_SUSPENDED
task.retry += 1
task.write_to_db()
return
bdev_name = ret[0]
if not bdev_name:
msg = "Bdev name not returned from controller attach"
logger.error(msg)
raise RuntimeError(msg)
task.function_result = msg
task.status = JobSchedule.STATUS_SUSPENDED
task.retry += 1
task.write_to_db()
return
bdev_found = False
for i in range(5):
ret = snode.rpc_client().get_bdevs(bdev_name)
Expand All @@ -96,8 +104,13 @@ def process_snap_replicate_start(task, snapshot):
time.sleep(1)

if not bdev_found:
msg = f"Failed to connect to lvol: {remote_lv.get_id()}"
logger.error("lvol Bdev not found after 5 attempts")
raise RuntimeError(f"Failed to connect to lvol: {remote_lv.get_id()}")
task.function_result = msg
task.status = JobSchedule.STATUS_SUSPENDED
task.retry += 1
task.write_to_db()
return

offset = 0
if "offset" in task.function_params and task.function_params["offset"]:
Expand Down Expand Up @@ -125,6 +138,8 @@ def delete_last_snapshot_if_needed(this_task, lvol):
if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION:
if task.get_id() == this_task.get_id():
continue
if task.status != JobSchedule.STATUS_DONE:
continue
logger.debug(task)
try:
snap = db.get_snapshot_by_id(task.function_params["snapshot_id"])
Expand All @@ -138,7 +153,7 @@ def delete_last_snapshot_if_needed(this_task, lvol):
snaps = sorted(snaps, key=lambda x: x.created_at)
snapshot = snaps[-1]
logger.info("Deleting snapshot: %s", snapshot.get_id())
ret = snapshot_controller.delete(snapshot)
ret = snapshot_controller.delete(snapshot.get_id())
logger.debug(ret)


Expand Down Expand Up @@ -234,6 +249,7 @@ def process_snap_replicate_finish(task, snapshot):
except Exception as e:
logger.error(e)

new_snapshot.source_lvol_id = snapshot.source_lvol_id or snapshot.lvol.get_id()
new_snapshot.write_to_db()

if snapshot.status == SnapShot.STATUS_IN_REPLICATION:
Expand All @@ -252,7 +268,13 @@ def process_snap_replicate_finish(task, snapshot):


def task_runner(task: JobSchedule):
snapshot = db.get_snapshot_by_id(task.function_params["snapshot_id"])
try:
snapshot = db.get_snapshot_by_id(task.function_params["snapshot_id"])
except KeyError:
task.function_result = "snapshot not found"
task.status = JobSchedule.STATUS_DONE
task.write_to_db(db.kv_store)
return True
if not snapshot:
task.function_result = "snapshot not found"
task.status = JobSchedule.STATUS_DONE
Expand Down Expand Up @@ -360,7 +382,15 @@ def task_runner(task: JobSchedule):
if task.status != JobSchedule.STATUS_DONE:
# get new task object because it could be changed from cancel task
task = db.get_task_by_id(task.uuid)
res = task_runner(task)
try:
res = task_runner(task)
except Exception as e:
logger.exception("task_runner raised for task %s: %s", task.uuid, e)
task.function_result = str(e)
task.status = JobSchedule.STATUS_SUSPENDED
task.retry += 1
task.write_to_db(db.kv_store)
res = False
if not res:
time.sleep(3)

Expand Down
23 changes: 13 additions & 10 deletions simplyblock_web/api/v2/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,32 +220,29 @@ def inflate(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response:

return Response(status_code=204)

@instance_api.post('/replication_trigger', name='clusters:storage-pools:volumes:replication_start', status_code=204, responses={204: {"content": None}})
@instance_api.post('/replication_trigger', name='clusters:storage-pools:volumes:replication_trigger', status_code=204, responses={204: {"content": None}})
def replication_trigger(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response:
if not lvol_controller.replication_trigger(volume.get_id()):
raise ValueError('Failed to start volume snapshot replication')

raise HTTPException(400, 'Failed to trigger volume snapshot replication')
return Response(status_code=204)

@instance_api.post('/replication_start', name='clusters:storage-pools:volumes:replication_start', status_code=204, responses={204: {"content": None}})
def replication_start(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response:
if not lvol_controller.replication_start(volume.get_id(), cluster.get_id()):
raise ValueError('Failed to start volume snapshot replication')

raise HTTPException(400, 'Failed to start volume snapshot replication')
return Response(status_code=204)

@instance_api.post('/replication_stop', name='clusters:storage-pools:volumes:replication_stop', status_code=204, responses={204: {"content": None}})
def replication_stop(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response:
if not lvol_controller.replication_stop(volume.get_id()):
raise ValueError('Failed to stop volume snapshot replication')

raise HTTPException(400, 'Failed to stop volume snapshot replication')
return Response(status_code=204)

@instance_api.get('/connect', name='clusters:storage-pools:volumes:connect')
def connect(cluster: Cluster, pool: StoragePool, volume: Volume):
details, err = lvol_controller.connect_lvol(volume.get_id())
if err:
raise ValueError(err)
raise HTTPException(400, err)
return details


Expand Down Expand Up @@ -306,7 +303,10 @@ def create_snapshot(

@instance_api.post('/replicate_lvol', name='clusters:storage-pools:volumes:replicate_lvol')
def replicate_lvol_on_target_cluster(cluster: Cluster, pool: StoragePool, volume: Volume):
return lvol_controller.replicate_lvol_on_target_cluster(volume.get_id())
result = lvol_controller.replicate_lvol_on_target_cluster(volume.get_id())
if not result:
raise HTTPException(400, 'Failed to replicate lvol on target cluster — no replicated snapshot found')
return result


class ReplicateLVolParams(BaseModel):
Expand All @@ -315,7 +315,10 @@ class ReplicateLVolParams(BaseModel):

@api.post('/replicate_lvol_on_source_cluster', name='clusters:storage-pools:replicate_lvol_on_source_cluster')
def replicate_lvol_on_source_cluster(cluster: Cluster, pool: StoragePool, body: ReplicateLVolParams):
return lvol_controller.replicate_lvol_on_source_cluster(body.lvol_id, cluster.get_id(), pool.get_id())
result = lvol_controller.replicate_lvol_on_source_cluster(body.lvol_id, cluster.get_id(), pool.get_id())
if not result:
raise HTTPException(400, 'Failed to replicate lvol on source cluster')
return result


@instance_api.get('/list_replication_tasks', name='clusters:storage-pools:volumes:list_replication_tasks')
Expand Down
2 changes: 1 addition & 1 deletion simplyblock_web/templates/storage_deploy_spdk.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ spec:
lifecycle:
postStart:
exec:
command: ["/bin/sh", "-c", "sudo modprobe nbd || echo failed to modprobe nbd"]
command: ["/bin/sh", "-c", "sudo -E modprobe nbd || echo failed to modprobe nbd"]
securityContext:
privileged: true
volumeMounts:
Expand Down
Loading