diff --git a/simplyblock_core/controllers/lvol_controller.py b/simplyblock_core/controllers/lvol_controller.py index b653ea93a..6b8058814 100644 --- a/simplyblock_core/controllers/lvol_controller.py +++ b/simplyblock_core/controllers/lvol_controller.py @@ -1665,7 +1665,7 @@ def connect_lvol(uuid, ctrl_loss_tmo=constants.LVOL_NVME_CONNECT_CTRL_LOSS_TMO, if cluster.status == Cluster.STATUS_SUSPENDED and cluster.snapshot_replication_target_cluster: logger.error("Cluster is suspended, looking for replicated lvol") for lv in db_controller.get_lvols(cluster.snapshot_replication_target_cluster): - if lv.nqn == lvol.nqn: + if lv.nqn.split(":lvol:")[-1] == lvol.nqn.split(":lvol:")[-1]: logger.info(f"LVol with same nqn already exists on target cluster: {lv.get_id()}") lvol = lv break @@ -1726,6 +1726,7 @@ def connect_lvol(uuid, ctrl_loss_tmo=constants.LVOL_NVME_CONNECT_CTRL_LOSS_TMO, entry = { "ns_id": lvol.ns_id, + "model": lvol.uuid, "transport": transport, "ip": ip, "port": port, @@ -2392,19 +2393,17 @@ def replicate_lvol_on_target_cluster(lvol_id): if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION: logger.debug(task) try: - snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"]) + snap = db_controller.get_snapshot_by_id(task.function_result) except KeyError: continue - if snap.lvol.get_id() != lvol_id: + if snap.source_lvol_id != lvol_id: continue snaps.append(snap) if snaps: snaps = sorted(snaps, key=lambda x: x.created_at) - last_snapshot = snaps[-1] - rep_snap = db_controller.get_snapshot_by_id(last_snapshot.target_replicated_snap_uuid) - snapshot = rep_snap + snapshot = snaps[-1] if not snapshot: logger.error(f"Snapshot for replication not found for lvol: {lvol_id}") @@ -2477,6 +2476,7 @@ def replicate_lvol_on_target_cluster(lvol_id): lvol = db_controller.get_lvol_by_id(lvol_id) lvol.from_source = False lvol.write_to_db() + replication_stop(lvol_id) lvol_events.lvol_replicated(lvol, new_lvol) return new_lvol.lvol_uuid @@ -2486,17 +2486,25 @@ def list_replication_tasks(lvol_id): db_controller = DBController() lvol = db_controller.get_lvol_by_id(lvol_id) node = db_controller.get_storage_node_by_id(lvol.node_id) + logger.info("list_replication_tasks: lvol=%s node=%s cluster=%s do_replicate=%s", + lvol_id, node.get_id(), node.cluster_id, lvol.do_replicate) tasks = [] - for task in db_controller.get_job_tasks(node.cluster_id): + all_tasks = db_controller.get_job_tasks(node.cluster_id) + logger.info("list_replication_tasks: total cluster tasks=%d", len(all_tasks)) + for task in all_tasks: if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION: try: snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"]) except KeyError: + logger.warning("list_replication_tasks: snapshot not found for task=%s", task.uuid) continue - if snap.lvol.get_id() != lvol_id: + if snap.source_lvol_id != lvol_id: continue + logger.info("list_replication_tasks: matched task=%s snap=%s status=%s", + task.uuid, snap.get_id(), task.status) tasks.append(task) + logger.info("list_replication_tasks: returning %d tasks for lvol=%s", len(tasks), lvol_id) return tasks @@ -2601,17 +2609,46 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None): return False + target_cluster_id = None + if lvol.replication_node_id: + try: + target_node = db_controller.get_storage_node_by_id(lvol.replication_node_id) + target_cluster_id = target_node.cluster_id + except KeyError: + pass + if not target_cluster_id: + _src_cl = db_controller.get_cluster_by_id(source_node.cluster_id) + target_cluster_id = _src_cl.snapshot_replication_target_cluster + + if not target_cluster_id: + logger.error(f"Target cluster not found for lvol: {lvol_id}") + return False + + target_lvol_id = None + lvol_id_in_nqn = lvol.nqn.split(":")[-1] + for lv in db_controller.get_lvols(target_cluster_id): + if lv.nqn.split(":")[-1] == lvol_id_in_nqn: + logger.info(f"LVol with same lvol nqn already exists on target cluster: {lv.get_id()}") + target_lvol_id = lv.get_id() + break + + if not target_lvol_id: + logger.error(f"LVol with same nqn does not exist on target cluster: {target_cluster_id}") + return False + snaps = [] snapshot = None - for task in db_controller.get_job_tasks(source_node.cluster_id): + for task in db_controller.get_job_tasks(target_cluster_id): if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION: logger.debug(task) + if task.status != JobSchedule.STATUS_DONE: + continue try: - snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"]) + snap = db_controller.get_snapshot_by_id(task.function_result) except KeyError: continue - if snap.lvol.get_id() != lvol_id: + if snap.source_lvol_id != target_lvol_id: continue snaps.append(snap) @@ -2619,41 +2656,21 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None): snaps = sorted(snaps, key=lambda x: x.created_at) snapshot = snaps[-1] - if not snapshot: - target_node = db_controller.get_storage_node_by_id(lvol.replication_node_id) - logger.info(f"Looking for snapshot in target cluster: {target_node.cluster_id}") - target_lvol_id = None - lvol_id_in_nqn = lvol.nqn.split(":")[-1] - for lv in db_controller.get_lvols(target_node.cluster_id): - if lv.nqn.split(":")[-1] == lvol_id_in_nqn: - logger.info(f"LVol with same lvol nqn already exists on target cluster: {lv.get_id()}") - target_lvol_id = lv.get_id() - - if not target_lvol_id: - logger.error(f"LVol with same nqn does not exist on target cluster: {target_node.cluster_id}") - return False - - for task in db_controller.get_job_tasks(target_node.cluster_id): - if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION: - logger.debug(task) - try: - snap = db_controller.get_snapshot_by_id(task.function_params["snapshot_id"]) - except KeyError: - continue - - if snap.lvol.get_id() != target_lvol_id: - continue - snaps.append(snap) - - if snaps: - snaps = sorted(snaps, key=lambda x: x.created_at) - snapshot = snaps[-1] - snapshot = db_controller.get_snapshot_by_id(snapshot.target_replicated_snap_uuid) - if not snapshot: logger.error(f"Snapshot for replication not found for lvol: {lvol_id}") return False + # bdev_lvol_clone must run on the same SPDK that owns the snapshot's LVS + try: + snap_node = db_controller.get_storage_node_by_id(snapshot.lvol.node_id) + if snap_node.status == StorageNode.STATUS_ONLINE: + source_node = snap_node + else: + logger.error(f"Snapshot node {snapshot.lvol.node_id} is not online") + return False + except KeyError: + logger.warning(f"Could not find snapshot node {snapshot.lvol.node_id}, using current source_node") + # create lvol on target node new_lvol = copy.deepcopy(lvol) new_lvol.cloned_from_snap = snapshot.get_id() @@ -2699,6 +2716,9 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None): logger.debug(f"new lvol from_source: {new_lvol.from_source}") + logger.debug(f"new_lvol: {new_lvol}") + logger.debug(f"source_node: {source_node}") + lvol_bdev, error = add_lvol_on_node(new_lvol, source_node) if error: logger.error(error) @@ -2722,8 +2742,9 @@ def replicate_lvol_on_source_cluster(lvol_id, cluster_id=None, pool_uuid=None): new_lvol.status = LVol.STATUS_ONLINE new_lvol.from_source = True + new_lvol.do_replicate = True new_lvol.write_to_db(db_controller.kv_store) - lvol_events.lvol_replicated(lvol, new_lvol) + lvol_events.lvol_replicated(new_lvol, new_lvol) logger.debug(f"new lvol from_source: {new_lvol.from_source}") return new_lvol.lvol_uuid diff --git a/simplyblock_core/controllers/snapshot_controller.py b/simplyblock_core/controllers/snapshot_controller.py index c50e77a4d..b8a450376 100644 --- a/simplyblock_core/controllers/snapshot_controller.py +++ b/simplyblock_core/controllers/snapshot_controller.py @@ -271,6 +271,7 @@ def add(lvol_id, snapshot_name, backup=False, lock=True): snap.snap_bdev = f"{lvol.lvs_name}/{snap_bdev_name}" snap.created_at = int(time.time()) snap.lvol = lvol + snap.source_lvol_id = lvol.get_id() snap.fabric = lvol.fabric snap.vuid = snap_vuid snap.status = SnapShot.STATUS_ONLINE diff --git a/simplyblock_core/controllers/tasks_controller.py b/simplyblock_core/controllers/tasks_controller.py index dadcd8247..c8e57d9b8 100644 --- a/simplyblock_core/controllers/tasks_controller.py +++ b/simplyblock_core/controllers/tasks_controller.py @@ -594,7 +594,7 @@ def _check_snap_instance_on_node(snapshot_id: str , node_id: str): _add_task(JobSchedule.FN_SNAPSHOT_REPLICATION, snapshot.cluster_id, node_id, "", function_params={"snapshot_id": snapshot.get_id(), "replicate_to_source": False, - "replicate_as_snap_instance": True}, + "replicate_as_snap_instance": False}, send_to_cluster_log=False) diff --git a/simplyblock_core/models/snapshot.py b/simplyblock_core/models/snapshot.py index ab91a0087..d6f014247 100644 --- a/simplyblock_core/models/snapshot.py +++ b/simplyblock_core/models/snapshot.py @@ -34,4 +34,5 @@ class SnapShot(BaseModel): source_replicated_snap_uuid: str = "" next_snap_uuid: str = "" prev_snap_uuid: str = "" - instances: list = [] \ No newline at end of file + instances: list = [] + source_lvol_id: str = "" \ No newline at end of file diff --git a/simplyblock_core/services/snapshot_replication.py b/simplyblock_core/services/snapshot_replication.py index 2527853b0..a71caaaf4 100644 --- a/simplyblock_core/services/snapshot_replication.py +++ b/simplyblock_core/services/snapshot_replication.py @@ -80,12 +80,20 @@ def process_snap_replicate_start(task, snapshot): if not ret: msg = "controller attach failed" logger.error(msg) - raise RuntimeError(msg) + task.function_result = msg + task.status = JobSchedule.STATUS_SUSPENDED + task.retry += 1 + task.write_to_db() + return bdev_name = ret[0] if not bdev_name: msg = "Bdev name not returned from controller attach" logger.error(msg) - raise RuntimeError(msg) + task.function_result = msg + task.status = JobSchedule.STATUS_SUSPENDED + task.retry += 1 + task.write_to_db() + return bdev_found = False for i in range(5): ret = snode.rpc_client().get_bdevs(bdev_name) @@ -96,8 +104,13 @@ def process_snap_replicate_start(task, snapshot): time.sleep(1) if not bdev_found: + msg = f"Failed to connect to lvol: {remote_lv.get_id()}" logger.error("lvol Bdev not found after 5 attempts") - raise RuntimeError(f"Failed to connect to lvol: {remote_lv.get_id()}") + task.function_result = msg + task.status = JobSchedule.STATUS_SUSPENDED + task.retry += 1 + task.write_to_db() + return offset = 0 if "offset" in task.function_params and task.function_params["offset"]: @@ -125,6 +138,8 @@ def delete_last_snapshot_if_needed(this_task, lvol): if task.function_name == JobSchedule.FN_SNAPSHOT_REPLICATION: if task.get_id() == this_task.get_id(): continue + if task.status != JobSchedule.STATUS_DONE: + continue logger.debug(task) try: snap = db.get_snapshot_by_id(task.function_params["snapshot_id"]) @@ -138,7 +153,7 @@ def delete_last_snapshot_if_needed(this_task, lvol): snaps = sorted(snaps, key=lambda x: x.created_at) snapshot = snaps[-1] logger.info("Deleting snapshot: %s", snapshot.get_id()) - ret = snapshot_controller.delete(snapshot) + ret = snapshot_controller.delete(snapshot.get_id()) logger.debug(ret) @@ -234,6 +249,7 @@ def process_snap_replicate_finish(task, snapshot): except Exception as e: logger.error(e) + new_snapshot.source_lvol_id = snapshot.source_lvol_id or snapshot.lvol.get_id() new_snapshot.write_to_db() if snapshot.status == SnapShot.STATUS_IN_REPLICATION: @@ -252,7 +268,13 @@ def process_snap_replicate_finish(task, snapshot): def task_runner(task: JobSchedule): - snapshot = db.get_snapshot_by_id(task.function_params["snapshot_id"]) + try: + snapshot = db.get_snapshot_by_id(task.function_params["snapshot_id"]) + except KeyError: + task.function_result = "snapshot not found" + task.status = JobSchedule.STATUS_DONE + task.write_to_db(db.kv_store) + return True if not snapshot: task.function_result = "snapshot not found" task.status = JobSchedule.STATUS_DONE @@ -360,7 +382,15 @@ def task_runner(task: JobSchedule): if task.status != JobSchedule.STATUS_DONE: # get new task object because it could be changed from cancel task task = db.get_task_by_id(task.uuid) - res = task_runner(task) + try: + res = task_runner(task) + except Exception as e: + logger.exception("task_runner raised for task %s: %s", task.uuid, e) + task.function_result = str(e) + task.status = JobSchedule.STATUS_SUSPENDED + task.retry += 1 + task.write_to_db(db.kv_store) + res = False if not res: time.sleep(3) diff --git a/simplyblock_web/api/v2/volume.py b/simplyblock_web/api/v2/volume.py index 10bc42cc7..915245af8 100644 --- a/simplyblock_web/api/v2/volume.py +++ b/simplyblock_web/api/v2/volume.py @@ -220,32 +220,29 @@ def inflate(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response: return Response(status_code=204) -@instance_api.post('/replication_trigger', name='clusters:storage-pools:volumes:replication_start', status_code=204, responses={204: {"content": None}}) +@instance_api.post('/replication_trigger', name='clusters:storage-pools:volumes:replication_trigger', status_code=204, responses={204: {"content": None}}) def replication_trigger(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response: if not lvol_controller.replication_trigger(volume.get_id()): - raise ValueError('Failed to start volume snapshot replication') - + raise HTTPException(400, 'Failed to trigger volume snapshot replication') return Response(status_code=204) @instance_api.post('/replication_start', name='clusters:storage-pools:volumes:replication_start', status_code=204, responses={204: {"content": None}}) def replication_start(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response: if not lvol_controller.replication_start(volume.get_id(), cluster.get_id()): - raise ValueError('Failed to start volume snapshot replication') - + raise HTTPException(400, 'Failed to start volume snapshot replication') return Response(status_code=204) @instance_api.post('/replication_stop', name='clusters:storage-pools:volumes:replication_stop', status_code=204, responses={204: {"content": None}}) def replication_stop(cluster: Cluster, pool: StoragePool, volume: Volume) -> Response: if not lvol_controller.replication_stop(volume.get_id()): - raise ValueError('Failed to stop volume snapshot replication') - + raise HTTPException(400, 'Failed to stop volume snapshot replication') return Response(status_code=204) @instance_api.get('/connect', name='clusters:storage-pools:volumes:connect') def connect(cluster: Cluster, pool: StoragePool, volume: Volume): details, err = lvol_controller.connect_lvol(volume.get_id()) if err: - raise ValueError(err) + raise HTTPException(400, err) return details @@ -306,7 +303,10 @@ def create_snapshot( @instance_api.post('/replicate_lvol', name='clusters:storage-pools:volumes:replicate_lvol') def replicate_lvol_on_target_cluster(cluster: Cluster, pool: StoragePool, volume: Volume): - return lvol_controller.replicate_lvol_on_target_cluster(volume.get_id()) + result = lvol_controller.replicate_lvol_on_target_cluster(volume.get_id()) + if not result: + raise HTTPException(400, 'Failed to replicate lvol on target cluster — no replicated snapshot found') + return result class ReplicateLVolParams(BaseModel): @@ -315,7 +315,10 @@ class ReplicateLVolParams(BaseModel): @api.post('/replicate_lvol_on_source_cluster', name='clusters:storage-pools:replicate_lvol_on_source_cluster') def replicate_lvol_on_source_cluster(cluster: Cluster, pool: StoragePool, body: ReplicateLVolParams): - return lvol_controller.replicate_lvol_on_source_cluster(body.lvol_id, cluster.get_id(), pool.get_id()) + result = lvol_controller.replicate_lvol_on_source_cluster(body.lvol_id, cluster.get_id(), pool.get_id()) + if not result: + raise HTTPException(400, 'Failed to replicate lvol on source cluster') + return result @instance_api.get('/list_replication_tasks', name='clusters:storage-pools:volumes:list_replication_tasks') diff --git a/simplyblock_web/templates/storage_deploy_spdk.yaml.j2 b/simplyblock_web/templates/storage_deploy_spdk.yaml.j2 index c40720239..ff0e34d47 100644 --- a/simplyblock_web/templates/storage_deploy_spdk.yaml.j2 +++ b/simplyblock_web/templates/storage_deploy_spdk.yaml.j2 @@ -102,7 +102,7 @@ spec: lifecycle: postStart: exec: - command: ["/bin/sh", "-c", "sudo modprobe nbd || echo failed to modprobe nbd"] + command: ["/bin/sh", "-c", "sudo -E modprobe nbd || echo failed to modprobe nbd"] securityContext: privileged: true volumeMounts: