v0.15.5
Loading...
Searching...
No Matches
Adv-5 Tutorial: 2D Seepage

For theory please see MoFEM Website.

Importing necessary modules, setting parameters and paths

# Import necessary python libraries
from pyvirtualdisplay import Display
import subprocess
import pyvista as pv
pv.set_plot_theme("document")
import os
import re
import matplotlib.pyplot as plt
import numpy as np
import glob
import pyvista as pv
from datetime import datetime
from IPython.display import display, Image
import pandas as pd
from datetime import datetime
from matplotlib.pyplot import cm
import importlib.util
from ipywidgets import interact, IntSlider, Dropdown, fixed
from vtkmodules.vtkCommonCore import vtkLogger
vtkLogger.SetStderrVerbosity(vtkLogger.VERBOSITY_OFF)
# Get user name and set executables
user_name=!whoami # get user name
user_name=user_name[0]
mofem_view = "$HOME/mofem_view"
mofem_part_path = mofem_view + "/bin/mofem_part"
mofem_executable = mofem_view + "/tutorials/adv-5_poroelasticity/seepage_2d"
# Open the attribute dictionary for storing parameters
class AttrDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
raise AttributeError(f"'AttrDict' object has no attribute '{attr}'")
def __setattr__(self, attr, value):
self[attr] = value
params = AttrDict()

Define utility functions (mesh generation, running the analysis, results post-processing)

def show_mesh(params):
mesh_name = params.mesh_name
# Convert .cub to .vtk using mbconvert
vtk_mesh = mesh_name.replace(".cub", ".vtk")
subprocess.run(["mbconvert", mesh_name, vtk_mesh], check=True, stderr=subprocess.STDOUT)
# Read the .vtk mesh
mesh = pv.read(vtk_mesh)
mesh = mesh.shrink(0.95)
try:
pv.set_jupyter_backend("trame")
except Exception:
pv.set_jupyter_backend("static")
# Plot the mesh in a notebook
p = pv.Plotter(notebook=True)
p.add_mesh(mesh, color="lightgrey", show_edges=True, smooth_shading=False, cmap="turbo")
p.view_xy()
p.show()
return
def partition_display_mesh(params):
mesh_base = os.path.splitext(params.mesh_name)[0]
partition_name = f"{mesh_base}_{params.nparts}p.h5m"
# Run mofem_part quietly
subprocess.run(
[
"mofem_part",
"-file_name", params.mesh_name,
"-nparts", str(params.nparts),
"-dim", "2",
"-adj_dim", "1",
"-output_file", partition_name,
"-log_no_color"
],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
base_name = params.mesh_name.replace('.cub', '')
# Run convert.py quietly
subprocess.run(
f"convert.py -np {params.nparts} {base_name}*h5m",
shell=True,
check=True,
stdout=subprocess.DEVNULL
)
new_var = f"{base_name}_{params.nparts}p"
out_to_vtk = os.popen(f"ls -c1 {new_var}*vtk").read().splitlines()
last_file = out_to_vtk[0]
mesh = pv.read(last_file)
mesh = mesh.shrink(0.95)
mesh.cell_data["PARALLEL_PARTITION"] = mesh.cell_data["PARALLEL_PARTITION"].astype(int)
p = pv.Plotter(notebook=True)
p.add_mesh(
mesh,
scalars="PARALLEL_PARTITION",
smooth_shading=True,
cmap = "tab20",
show_edges=False,
show_scalar_bar=True
)
p.camera_position = 'iso'
p.show()
def run_mofem_analysis(params): # runs a mofem analysis
# Remove old output files
!rm -rf out*
mesh_base = os.path.splitext(params.mesh_name)[0]
partition_name = f"{mesh_base}_{params.nparts}p.h5m"
!export OMPI_MCA_btl_vader_single_copy_mechanism=none && \
nice -n 10 mpirun --oversubscribe --allow-run-as-root \
-np {params.nparts} {mofem_executable} \
-file_name {partition_name} \
-order {params.order} \
-ts_dt {params.time_step} \
-young_modulus {params.young_modulus} \
-poisson_ratio {params.poisson_ratio} \
-conductivity {params.conductivity} \
-biot_constant {params.biot} \
-mat_density {params.mat_density} \
-fluid_density {params.fluid_density} \
-storage {params.storage} \
-save_every {params.save_every} \
-plane_strain \
-time_scalar_file {params.time_scale} \
-ts_max_time {params.final_time} \
-field_eval_coords {params.eval_coords}\
2>&1 | tee {params.log_file}
!{core_view}/bin/convert.py -np 12 out*
if getattr(params, "convert_result", False):
!convert.py -np {params.nparts} out*h5m
!convert.py -np {params.nparts} out*h5m
# Remove h5m files
!rm out*h5m
return None
def extract_number(filename):
"""Extract numeric part from filename for sorting."""
match = re.search(r'\d+', filename)
return int(match.group()) if match else -1
def getLastFrame(prefix_with_number):
base_prefix = re.sub(r'_\d+$', '', prefix_with_number)
list_of_files = glob.glob(f"{base_prefix}_*.vtk")
list_of_files.sort(key=extract_number)
last_file = list_of_files[-1]
return pv.read(last_file)
def show_results(params):
if params.final_frame:
mesh = getLastFrame(params.show_file)
else:
matches = glob.glob(f"{params.show_file}*.vtk")
matches.sort(key=extract_number)
mesh = pv.read(matches[0])
if params.warp_field:
mesh = mesh.warp_by_vector(vectors=params.warp_field, factor=params.warp_factor)
cmap = "turbo"
# Plotting
p = pv.Plotter(notebook=True)
p.add_mesh(mesh, scalars=params.show_field, component=None, smooth_shading=True, cmap=cmap)
p.camera_position = "xy"
p.show()
def getName(obj, namespace):
return [name for name in namespace if namespace[name] is obj][0]
def makeGif(params):
vtk_files = sorted(glob.glob(f"{params.show_file}_*.vtk"))
if not vtk_files:
raise FileNotFoundError(f"No .vtk files found matching {params.show_file}_*.vtk")
existing_gifs = glob.glob(f"animation_{params.show_file}_*.gif")
for gif_file in existing_gifs:
try:
os.remove(gif_file)
except Exception:
pass
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
gif_name = f"animation_{params.show_file}_{timestamp}.gif"
p = pv.Plotter(off_screen=True, window_size=[1280, 720])
p.set_background('white')
p.camera_position = "xy"
cmap = "turbo"
p.open_gif(gif_name)
for vtk_file in vtk_files:
mesh = pv.read(vtk_file)
if params.warp_field:
mesh = mesh.warp_by_vector(vectors=params.warp_field, factor=params.warp_factor)
p.add_mesh(mesh, scalars=params.show_field, smooth_shading=True, cmap=cmap)
p.write_frame()
for actor in list(p.actors.keys()):
p.remove_actor(actor)
p.close()
def showGif(params):
gif_files = sorted(glob.glob(f"animation_{params.show_file}_*.gif"))
if not gif_files:
raise FileNotFoundError(f"No GIFs found for {params.show_file}")
latest_gif = gif_files[-1]
display(Image(filename=latest_gif))
def interactive_viewer(params):
pv.set_jupyter_backend("trame")
vtk_files = sorted(glob.glob(f"{params.show_file}_*.vtk"))
if not vtk_files:
raise FileNotFoundError(f"No .vtk files found matching {params.show_file}_*.vtk")
sample_mesh = pv.read(vtk_files[0])
available_fields = list(sample_mesh.point_data.keys())
plotter = pv.Plotter(notebook=True)
plotter.set_background("white")
plotter.camera_position = "xy"
def update_view(time_index, field_name):
plotter.clear_actors()
mesh = pv.read(vtk_files[time_index])
if field_name in mesh.point_data:
data = mesh.point_data[field_name]
if data.ndim > 1 and data.shape[1] > 1:
mesh[field_name + "_mag"] = (data**2).sum(axis=1) ** 0.5
field_name = field_name + "_mag"
mesh.active_scalars_name = field_name
vmin, vmax = float(mesh[field_name].min()), float(mesh[field_name].max())
plotter.add_mesh(mesh, scalars=field_name, cmap="turbo", clim=[vmin, vmax], smooth_shading=True)
time_slider = IntSlider(min=0, max=len(vtk_files)-1, step=1, description='Time step')
field_dropdown = Dropdown(options=available_fields, value=available_fields[0], description='Field')
interact(update_view, time_index=time_slider, field_name=field_dropdown)
plotter.show(jupyter_backend="trame", interactive_update=True)
def plotConvergence(params):
res_values = []
with open("log_test", 'r') as f:
for line in f:
match = re.search(r'SNES Function norm\s+([0-9.eE+-]+)', line)
if match:
res_values.append(float(match.group(1)))
if not res_values:
raise ValueError("No SNES residuals found in the log file.")
plt.plot(res_values, 'r^-')
plt.title('Newton method convergence')
plt.ylabel('absolute residual')
plt.xlabel('accumulated iterations')
plt.yscale('log')
plt.grid(True)
plt.show()
def plot_eval_point(quantity="pressure", components=None, log_file="log_test"):
patterns = {
"pressure": (r"Eval point P:\s*([-0-9.eE+]+)", ["P"]),
"flux": (r"Eval point flux:\s*\[2,1\]\‍(\‍(([-0-9.eE+]+)\‍),\‍(([-0-9.eE+]+)\‍)\‍)", ["qx", "qy"]),
"displacement": (r"Eval point displacement:\s*\[2,1\]\‍(\‍(([-0-9.eE+]+)\‍),\‍(([-0-9.eE+]+)\‍)\‍)", ["Ux", "Uy"]),
"strain": (r"Eval point strain:\s*\[3,1\]\‍(\‍(([-0-9.eE+]+)\‍),\‍(([-0-9.eE+]+)\‍),\‍(([-0-9.eE+]+)\‍)\‍)", ["exx", "exy", "eyy"]),
"stress": (r"Eval point symmetric stress tensor:\s*\[3,1\]\‍(\‍(([-0-9.eE+]+)\‍),\‍(([-0-9.eE+]+)\‍),\‍(([-0-9.eE+]+)\‍)\‍)", ["sxx", "sxy", "syy"]),
}
if quantity not in patterns:
raise ValueError(f"Unknown quantity '{quantity}'. Choose from {list(patterns.keys())}")
pattern, all_labels = patterns[quantity]
if components is None:
components = all_labels
else:
invalid = [c for c in components if c not in all_labels]
if invalid:
raise ValueError(f"Invalid components {invalid} for '{quantity}'. Valid options: {all_labels}")
times = []
all_values = []
current_time = None
with open(log_file, "r") as f:
for line in f:
time_match = re.search(r"TS.*time\s+([-0-9.eE+]+)", line)
if time_match:
current_time = float(time_match.group(1))
continue
match = re.search(pattern, line)
if match and current_time is not None:
comps = [float(g) for g in match.groups()]
times.append(current_time)
all_values.append(comps)
if not times:
raise ValueError(f"No '{quantity}' entries found in {log_file}")
all_values = list(zip(*all_values))
plt.figure(figsize=(10, 6))
for comp_label in components:
idx = all_labels.index(comp_label)
plt.plot(times, all_values[idx], marker="o", label=comp_label)
plt.xlabel("Time [s]")
plt.ylabel(quantity.title())
plt.title(f"{quantity.title()} vs Time")
plt.grid(True)
plt.minorticks_on()
plt.tick_params(axis="both", which="major", length=7)
plt.legend()
plt.tight_layout()
plt.show()
def plot_time_scale(params):
data = np.loadtxt(params.time_scale)
if data.shape[1] != 2:
raise ValueError("time_scale file must have exactly two columns: time and force")
time = data[:, 0]
force = data[:, 1]
plt.figure(figsize=(8, 5))
plt.plot(time, force, marker='o', linestyle='-', label='Applied Force')
plt.xlabel("Time [s]")
plt.title("Applied Force vs Time")
plt.grid(True)
plt.xlim(0, params.final_time)
plt.tight_layout()
plt.show()

Define Mesh Parameters

params.mesh_name = "mesh/mandel.cub" # Input mesh file in mm
show_mesh(params)
params.nparts = 4 # Number of processors and number of partitions for the mesh
partition_display_mesh(params)

Define Material Parameters

params.young_modulus = 1e4 # Young's Modulus [Pa]
params.poisson_ratio = 0.2 # Poisson ratio []
params.conductivity = 1e-3 # Isotrophic permeability coefficient [m^4/Ns]
params.biot = 1 # Biot coefficient []
params.storage = 1e-1 # Storage coefficient [Pa^-1]
params.mat_density = 0 # Density of solid phase [kg/m^3]
params.fluid_density = 0 # Density of fluid phase [kg/m^3]

Define boundary conditions

The following diagram depicts the applied boundary conditions.

bmandel.png

Edit the time_scale.txt file containing column for time and column for scaling of applied force F.

params.time_scale = "time_mandel.txt" # File to control application of force over time
params.final_time = 0.1 # Final time of simulation
plot_time_scale(params) # Plot applied force vs. time

Define Solver Parameters

params.log_file = "log_test" # Name of log file to which results are printed
params.time_step = 0.01 # Time step size
params.save_every = 1 # How regularly .h5m files are produced of results. This can help to reduce simulation time.
params.eval_coords = "0.4,0,0" # Co-ordinates where the results are printed to log file.
params.order = 2 # Order of approximation

Run MoFEM Analysis

params.convert_result = True
run_mofem_analysis(params)

Visualise Results

# Post-processing parameters
params.show_file = "out_9" # File to be displayed
params.show_field = "P" # Field to be displayed: "U", "P", "FLUX", "STRESS", "STRAIN"
params.warp_field = "" # Vector field with which view is warped: "U"
params.warp_factor = 1.0 # Factor for warping
params.final_frame = False # True - over-ride params.show_file and show final simulation frame. False - show params.show_file.
show_results(params)
# Create gif
params.show_file = "out" # Files to make gif
params.show_field = "U" # Field to be displayed: "U", "P", "FLUX", "STRESS", "STRAIN"
params.warp_field= None # Vector field with which view is warped: "U"
params.warp_factor = 1 # Factor for warping
makeGif(params)
showGif(params)
# Interactive view
interactive_viewer(params)

Post-processing

# Plot Newton-Raphson Method Convergence
plotConvergence(params)
# Plot each field over time. Choose which components to add to each plot.
# For example, for only x-displacement: plot_eval_point("displacement", ["Ux"])
plot_eval_point("pressure")
plot_eval_point("displacement", ["Ux", "Uy"])
plot_eval_point("flux", ["qx","qy"])
plot_eval_point("strain", ["exx", "exy", "eyy"])
plot_eval_point("stress", ["sxx", "sxy", "syy"])