def show_mesh(params):
mesh_name = params.mesh_name
vtk_mesh = mesh_name.replace(".cub", ".vtk")
subprocess.run(["mbconvert", mesh_name, vtk_mesh], check=True, stderr=subprocess.STDOUT)
mesh = pv.read(vtk_mesh)
mesh = mesh.shrink(0.95)
try:
pv.set_jupyter_backend("trame")
except Exception:
pv.set_jupyter_backend("static")
p = pv.Plotter(notebook=True)
p.add_mesh(mesh, color="lightgrey", show_edges=True, smooth_shading=False, cmap="turbo")
p.view_xy()
p.show()
return
def partition_display_mesh(params):
mesh_base = os.path.splitext(params.mesh_name)[0]
partition_name = f"{mesh_base}_{params.nparts}p.h5m"
subprocess.run(
[
"mofem_part",
"-file_name", params.mesh_name,
"-nparts", str(params.nparts),
"-dim", "2",
"-adj_dim", "1",
"-output_file", partition_name,
"-log_no_color"
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
base_name = params.mesh_name.replace('.cub', '')
subprocess.run(
f"convert.py -np {params.nparts} {base_name}*h5m",
shell=True,
check=True,
stdout=subprocess.DEVNULL
)
new_var = f"{base_name}_{params.nparts}p"
out_to_vtk = os.popen(f"ls -c1 {new_var}*vtk").read().splitlines()
last_file = out_to_vtk[0]
mesh = pv.read(last_file)
mesh = mesh.shrink(0.95)
mesh.cell_data["PARALLEL_PARTITION"] = mesh.cell_data["PARALLEL_PARTITION"].astype(int)
p = pv.Plotter(notebook=True)
p.add_mesh(
mesh,
scalars="PARALLEL_PARTITION",
smooth_shading=True,
cmap = "tab20",
show_edges=False,
show_scalar_bar=True
)
p.camera_position = 'iso'
p.show()
def run_mofem_analysis(params):
!rm -rf out*
mesh_base = os.path.splitext(params.mesh_name)[0]
partition_name = f"{mesh_base}_{params.nparts}p.h5m"
!export OMPI_MCA_btl_vader_single_copy_mechanism=none && \
nice -n 10 mpirun --oversubscribe --allow-run-as-root \
-np {params.nparts} {mofem_executable} \
-file_name {partition_name} \
-order {params.order} \
-ts_dt {params.time_step} \
-young_modulus {params.young_modulus} \
-poisson_ratio {params.poisson_ratio} \
-conductivity {params.conductivity} \
-biot_constant {params.biot} \
-mat_density {params.mat_density} \
-fluid_density {params.fluid_density} \
-storage {params.storage} \
-save_every {params.save_every} \
-plane_strain \
-time_scalar_file {params.time_scale} \
-ts_max_time {params.final_time} \
-field_eval_coords {params.eval_coords}\
2>&1 | tee {params.log_file}
!{core_view}/bin/convert.py -np 12 out*
if getattr(params, "convert_result", False):
!convert.py -np {params.nparts} out*h5m
!convert.py -np {params.nparts} out*h5m
!rm out*h5m
return None
def extract_number(filename):
"""Extract numeric part from filename for sorting."""
match = re.search(r'\d+', filename)
return int(match.group()) if match else -1
def getLastFrame(prefix_with_number):
base_prefix = re.sub(r'_\d+$', '', prefix_with_number)
list_of_files = glob.glob(f"{base_prefix}_*.vtk")
list_of_files.sort(key=extract_number)
last_file = list_of_files[-1]
return pv.read(last_file)
def show_results(params):
if params.final_frame:
mesh = getLastFrame(params.show_file)
else:
matches = glob.glob(f"{params.show_file}*.vtk")
matches.sort(key=extract_number)
mesh = pv.read(matches[0])
if params.warp_field:
mesh = mesh.warp_by_vector(vectors=params.warp_field, factor=params.warp_factor)
cmap = "turbo"
p = pv.Plotter(notebook=True)
p.add_mesh(mesh, scalars=params.show_field, component=None, smooth_shading=True, cmap=cmap)
p.camera_position = "xy"
p.show()
def getName(obj, namespace):
return [name for name in namespace if namespace[name] is obj][0]
def makeGif(params):
vtk_files = sorted(glob.glob(f"{params.show_file}_*.vtk"))
if not vtk_files:
raise FileNotFoundError(f"No .vtk files found matching {params.show_file}_*.vtk")
existing_gifs = glob.glob(f"animation_{params.show_file}_*.gif")
for gif_file in existing_gifs:
try:
os.remove(gif_file)
except Exception:
pass
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
gif_name = f"animation_{params.show_file}_{timestamp}.gif"
p = pv.Plotter(off_screen=True, window_size=[1280, 720])
p.set_background('white')
p.camera_position = "xy"
cmap = "turbo"
p.open_gif(gif_name)
for vtk_file in vtk_files:
mesh = pv.read(vtk_file)
if params.warp_field:
mesh = mesh.warp_by_vector(vectors=params.warp_field, factor=params.warp_factor)
p.add_mesh(mesh, scalars=params.show_field, smooth_shading=True, cmap=cmap)
p.write_frame()
for actor in list(p.actors.keys()):
p.remove_actor(actor)
p.close()
def showGif(params):
gif_files = sorted(glob.glob(f"animation_{params.show_file}_*.gif"))
if not gif_files:
raise FileNotFoundError(f"No GIFs found for {params.show_file}")
latest_gif = gif_files[-1]
display(Image(filename=latest_gif))
def interactive_viewer(params):
pv.set_jupyter_backend("trame")
vtk_files = sorted(glob.glob(f"{params.show_file}_*.vtk"))
if not vtk_files:
raise FileNotFoundError(f"No .vtk files found matching {params.show_file}_*.vtk")
sample_mesh = pv.read(vtk_files[0])
available_fields = list(sample_mesh.point_data.keys())
plotter = pv.Plotter(notebook=True)
plotter.set_background("white")
plotter.camera_position = "xy"
def update_view(time_index, field_name):
plotter.clear_actors()
mesh = pv.read(vtk_files[time_index])
if field_name in mesh.point_data:
data = mesh.point_data[field_name]
if data.ndim > 1 and data.shape[1] > 1:
mesh[field_name + "_mag"] = (data**2).sum(axis=1) ** 0.5
field_name = field_name + "_mag"
mesh.active_scalars_name = field_name
vmin, vmax = float(mesh[field_name].min()), float(mesh[field_name].max())
plotter.add_mesh(mesh, scalars=field_name, cmap="turbo", clim=[vmin, vmax], smooth_shading=True)
time_slider = IntSlider(min=0, max=len(vtk_files)-1, step=1, description='Time step')
field_dropdown = Dropdown(options=available_fields, value=available_fields[0], description='Field')
interact(update_view, time_index=time_slider, field_name=field_dropdown)
plotter.show(jupyter_backend="trame", interactive_update=True)
def plotConvergence(params):
res_values = []
with open("log_test", 'r') as f:
for line in f:
match = re.search(r'SNES Function norm\s+([0-9.eE+-]+)', line)
if match:
res_values.append(float(match.group(1)))
if not res_values:
raise ValueError("No SNES residuals found in the log file.")
plt.plot(res_values, 'r^-')
plt.title('Newton method convergence')
plt.ylabel('absolute residual')
plt.xlabel('accumulated iterations')
plt.yscale('log')
plt.grid(True)
plt.show()
def plot_eval_point(quantity="pressure", components=None, log_file="log_test"):
patterns = {
"pressure": (r"Eval point P:\s*([-0-9.eE+]+)", ["P"]),
"flux": (r"Eval point flux:\s*\[2,1\]\(\(([-0-9.eE+]+)\),\(([-0-9.eE+]+)\)\)", ["qx", "qy"]),
"displacement": (r"Eval point displacement:\s*\[2,1\]\(\(([-0-9.eE+]+)\),\(([-0-9.eE+]+)\)\)", ["Ux", "Uy"]),
"strain": (r"Eval point strain:\s*\[3,1\]\(\(([-0-9.eE+]+)\),\(([-0-9.eE+]+)\),\(([-0-9.eE+]+)\)\)", ["exx", "exy", "eyy"]),
"stress": (r"Eval point symmetric stress tensor:\s*\[3,1\]\(\(([-0-9.eE+]+)\),\(([-0-9.eE+]+)\),\(([-0-9.eE+]+)\)\)", ["sxx", "sxy", "syy"]),
}
if quantity not in patterns:
raise ValueError(f"Unknown quantity '{quantity}'. Choose from {list(patterns.keys())}")
pattern, all_labels = patterns[quantity]
if components is None:
components = all_labels
else:
invalid = [c for c in components if c not in all_labels]
if invalid:
raise ValueError(f"Invalid components {invalid} for '{quantity}'. Valid options: {all_labels}")
times = []
all_values = []
current_time = None
with open(log_file, "r") as f:
for line in f:
time_match = re.search(r"TS.*time\s+([-0-9.eE+]+)", line)
if time_match:
current_time = float(time_match.group(1))
continue
match = re.search(pattern, line)
if match and current_time is not None:
comps = [float(g) for g in match.groups()]
times.append(current_time)
all_values.append(comps)
if not times:
raise ValueError(f"No '{quantity}' entries found in {log_file}")
all_values = list(zip(*all_values))
plt.figure(figsize=(10, 6))
for comp_label in components:
idx = all_labels.index(comp_label)
plt.plot(times, all_values[idx], marker="o", label=comp_label)
plt.xlabel("Time [s]")
plt.ylabel(quantity.title())
plt.title(f"{quantity.title()} vs Time")
plt.grid(True)
plt.minorticks_on()
plt.tick_params(axis="both", which="major", length=7)
plt.legend()
plt.tight_layout()
plt.show()
def plot_time_scale(params):
data = np.loadtxt(params.time_scale)
if data.shape[1] != 2:
raise ValueError("time_scale file must have exactly two columns: time and force")
time = data[:, 0]
force = data[:, 1]
plt.figure(figsize=(8, 5))
plt.plot(time, force, marker='o', linestyle='-', label='Applied Force')
plt.xlabel("Time [s]")
plt.title("Applied Force vs Time")
plt.grid(True)
plt.xlim(0, params.final_time)
plt.tight_layout()
plt.show()
The following diagram depicts the applied boundary conditions.