updated derived metrics interface #898
@@ -51,6 +51,26 @@
 check_version(ffi, libdutil)
 
 
+_mod_names = [
+    "NULL",
+    "POSIX",
+    "MPI-IO",
+    "H5F",
+    "H5D",
+    "PNETCDF_FILE",
+    "PNETCDF_VAR",
+    "BG/Q",
+    "LUSTRE",
+    "STDIO",
+    "DXT_POSIX",
+    "DXT_MPIIO",
+    "MDHIM",
+    "APXC",
+    "APMPI",
+    "HEATMAP",
+]
+def mod_name_to_idx(mod_name):
+    return _mod_names.index(mod_name)
 
 _structdefs = {
     "BG/Q": "struct darshan_bgq_record **",

@@ -685,6 +705,8 @@ def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None):
     fcounters_df = rec_dict["fcounters"]
     counters_n_cols = counters_df.shape[1]
     fcounters_n_cols = fcounters_df.shape[1]
+    id_col = counters_df.columns.get_loc("id")
+    rank_col = counters_df.columns.get_loc("rank")
     if rec_index_of_interest is None:
         num_recs = counters_df.shape[0]
         # newer pandas versions can support ...

@@ -701,10 +723,60 @@ def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None):
     rec_arr.fcounters = fcounters_df.iloc[rec_index_of_interest, 2:].to_numpy()
     rec_arr.counters = counters_df.iloc[rec_index_of_interest, 2:].to_numpy()
     if num_recs > 1:
-        rec_arr.id = counters_df.iloc[rec_index_of_interest, 0].to_numpy().reshape((num_recs, 1))
-        rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1].to_numpy().reshape((num_recs, 1))
+        rec_arr.id = counters_df.iloc[rec_index_of_interest, id_col].to_numpy().reshape((num_recs, 1))
+        rec_arr.rank = counters_df.iloc[rec_index_of_interest, rank_col].to_numpy().reshape((num_recs, 1))
     else:
-        rec_arr.id = counters_df.iloc[rec_index_of_interest, 0]
-        rec_arr.rank = counters_df.iloc[rec_index_of_interest, 1]
+        rec_arr.id = counters_df.iloc[rec_index_of_interest, id_col]
+        rec_arr.rank = counters_df.iloc[rec_index_of_interest, rank_col]
     buf = rec_arr.tobytes()
     return buf
+
+
+def log_get_derived_metrics(rec_dict, mod_name, nprocs):
+    """
+    Passes a set of records (in pandas format) to the Darshan accumulator
+    interface, and returns the corresponding derived metrics struct.
+
+    Parameters:
+        rec_dict: Dictionary containing the counter and fcounter dataframes.
+        mod_name: Name of the Darshan module.
+        nprocs: Number of processes participating in accumulation.
+
+    Returns:
+        darshan_derived_metrics struct (cdata object)
+    """
+    mod_idx = mod_name_to_idx(mod_name)
+    darshan_accumulator = ffi.new("darshan_accumulator *")
+    r = libdutil.darshan_accumulator_create(mod_idx, nprocs, darshan_accumulator)
+    if r != 0:
+        raise RuntimeError("A nonzero exit code was received from "
+                           "darshan_accumulator_create() at the C level. "
+                           f"This could mean that the {mod_name} module does not "
+                           "support derived metric calculation, or that "
+                           "another kind of error occurred. It may be possible "
+                           "to retrieve additional information from the stderr "
+                           "stream.")
+
+    num_recs = rec_dict["fcounters"].shape[0]
+    record_array = _df_to_rec(rec_dict, mod_name)
+
+    r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array, num_recs)
+    if r_i != 0:
+        raise RuntimeError("A nonzero exit code was received from "
+                           "darshan_accumulator_inject() at the C level. "
+                           "It may be possible "
+                           "to retrieve additional information from the stderr "
+                           "stream.")
+    derived_metrics = ffi.new("struct darshan_derived_metrics *")
+    total_record = ffi.new(_structdefs[mod_name].replace("**", "*"))
+    r = libdutil.darshan_accumulator_emit(darshan_accumulator[0],
+                                          derived_metrics,
+                                          total_record)
+    libdutil.darshan_accumulator_destroy(darshan_accumulator[0])
[Inline review comments on the darshan_accumulator_destroy() call above]
"I suppose we could check for a ..."
"Yeah, I just punted on error checking that for a couple of reasons: ..."
+
+    if r != 0:
+        raise RuntimeError("A nonzero exit code was received from "
+                           "darshan_accumulator_emit() at the C level. "
+                           "It may be possible "
+                           "to retrieve additional information from the stderr "
+                           "stream.")
+    return derived_metrics
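For orientation, here is a minimal usage sketch of the new accumulator entry point, mirroring the test added later in this PR; it assumes the imbalanced-io.darshan sample log is available via darshan.log_utils.get_log_path, and only reads the two struct fields that the PR itself uses (total_bytes and agg_perf_by_slowest):

import darshan
from darshan.backend.cffi_backend import log_get_derived_metrics
from darshan.log_utils import get_log_path

# read all POSIX records from a sample log as pandas DataFrames,
# then feed them through the accumulator interface
log_path = get_log_path("imbalanced-io.darshan")
with darshan.DarshanReport(log_path, read_all=True) as report:
    report.mod_read_all_records("POSIX", dtype="pandas")
    rec_dict = report.records["POSIX"][0]
    nprocs = report.metadata["job"]["nprocs"]
    derived_metrics = log_get_derived_metrics(rec_dict, "POSIX", nprocs)

# the returned cdata struct exposes the aggregated fields directly
print(derived_metrics.total_bytes, derived_metrics.agg_perf_by_slowest)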
@@ -0,0 +1,50 @@
def log_get_bytes_bandwidth(derived_metrics, mod_name: str) -> str:
    """
    Summarize I/O performance for a given darshan module.

    Parameters
    ----------
    derived_metrics:
        structure (cdata object) describing metrics derived from a
        set of records passed to the Darshan accumulator interface
    mod_name: str
        Name of the darshan module to summarize the I/O
        performance for.

    Returns
    -------
    out: str
        A short string summarizing the performance of the given module,
        including bandwidth and total data transferred.

    Raises
    ------
    RuntimeError
        When the provided module is not supported by the accumulator
        interface, or when any other error occurs in the C/CFFI interface.
    ValueError
        When the provided module name does not exist in the log file.

    Examples
    --------

    >>> import darshan
    >>> from darshan.backend.cffi_backend import log_get_derived_metrics
    >>> from darshan.lib.accum import log_get_bytes_bandwidth
    >>> from darshan.log_utils import get_log_path

    >>> log_path = get_log_path("imbalanced-io.darshan")
    >>> with darshan.DarshanReport(log_path, read_all=True) as report:
    ...     nprocs = report.metadata["job"]["nprocs"]
    ...     report.mod_read_all_records("POSIX", dtype="pandas")
    ...     posix_recs = report.records["POSIX"][0]
    ...     report.mod_read_all_records("MPI-IO", dtype="pandas")
    ...     mpiio_recs = report.records["MPI-IO"][0]

    >>> derived_metrics = log_get_derived_metrics(posix_recs, "POSIX", nprocs)
    >>> log_get_bytes_bandwidth(derived_metrics, "POSIX")
    I/O performance estimate (at the POSIX layer): transferred 101785.8 MiB at 164.99 MiB/s

    >>> derived_metrics = log_get_derived_metrics(mpiio_recs, "MPI-IO", nprocs)
    >>> log_get_bytes_bandwidth(derived_metrics, "MPI-IO")
    I/O performance estimate (at the MPI-IO layer): transferred 126326.8 MiB at 101.58 MiB/s
    """
    # get total bytes (in MiB) and bandwidth (in MiB/s) for
    # a given module -- this information was commonly reported
    # in the old perl-based summary reports
    total_mib = derived_metrics.total_bytes / 2 ** 20
    total_bw = derived_metrics.agg_perf_by_slowest
    ret_str = f"I/O performance estimate (at the {mod_name} layer): transferred {total_mib:.1f} MiB at {total_bw:.2f} MiB/s"
    return ret_str
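Since the function only reads the total_bytes and agg_perf_by_slowest fields, it can be exercised without a real log. The snippet below is purely illustrative: fake_metrics is a made-up stand-in object (512 MiB transferred at 128 MiB/s), not an actual derived metrics struct from the accumulator interface:

from types import SimpleNamespace
from darshan.lib.accum import log_get_bytes_bandwidth

# stand-in for the cdata struct, with invented values
fake_metrics = SimpleNamespace(total_bytes=512 * 2 ** 20,
                               agg_perf_by_slowest=128.0)
print(log_get_bytes_bandwidth(fake_metrics, "POSIX"))
# I/O performance estimate (at the POSIX layer): transferred 512.0 MiB at 128.00 MiB/s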
@@ -0,0 +1,94 @@
import darshan
from darshan.backend.cffi_backend import log_get_derived_metrics
from darshan.lib.accum import log_get_bytes_bandwidth
from darshan.log_utils import get_log_path

import pytest


@pytest.mark.parametrize("log_path, mod_name, expected_str", [
    # the expected bytes/bandwidth strings are pasted
    # directly from the old perl summary reports;
    # exceptions noted below
    # in some cases we defer to darshan-parser for the expected
    # values; see discussion in gh-839
    ("imbalanced-io.darshan",
     "STDIO",
     "I/O performance estimate (at the STDIO layer): transferred 1.1 MiB at 0.01 MiB/s"),
    ("imbalanced-io.darshan",
     "MPI-IO",
     "I/O performance estimate (at the MPI-IO layer): transferred 126326.8 MiB at 101.58 MiB/s"),
    # imbalanced-io.darshan does have LUSTRE data,
    # but it doesn't support derived metrics at time
    # of writing
    ("imbalanced-io.darshan",
     "LUSTRE",
     "RuntimeError"),
    # APMPI doesn't support derived metrics either
    ("e3sm_io_heatmap_only.darshan",
     "APMPI",
     "RuntimeError"),
    ("imbalanced-io.darshan",
     "POSIX",
     "I/O performance estimate (at the POSIX layer): transferred 101785.8 MiB at 164.99 MiB/s"),
    ("laytonjb_test1_id28730_6-7-43012-2131301613401632697_1.darshan",
     "STDIO",
     "I/O performance estimate (at the STDIO layer): transferred 0.0 MiB at 4.22 MiB/s"),
    ("runtime_and_dxt_heatmaps_diagonal_write_only.darshan",
     "POSIX",
     "I/O performance estimate (at the POSIX layer): transferred 0.0 MiB at 0.02 MiB/s"),
    ("treddy_mpi-io-test_id4373053_6-2-60198-9815401321915095332_1.darshan",
     "STDIO",
     "I/O performance estimate (at the STDIO layer): transferred 0.0 MiB at 16.47 MiB/s"),
    ("e3sm_io_heatmap_only.darshan",
     "STDIO",
     "I/O performance estimate (at the STDIO layer): transferred 0.0 MiB at 3.26 MiB/s"),
    ("e3sm_io_heatmap_only.darshan",
     "MPI-IO",
     "I/O performance estimate (at the MPI-IO layer): transferred 73880.2 MiB at 105.69 MiB/s"),
    ("partial_data_stdio.darshan",
     "MPI-IO",
     "I/O performance estimate (at the MPI-IO layer): transferred 32.0 MiB at 2317.98 MiB/s"),
    ("partial_data_stdio.darshan",
     "STDIO",
     "I/O performance estimate (at the STDIO layer): transferred 16336.0 MiB at 2999.14 MiB/s"),
    # the C derived metrics code can't distinguish
    # between different kinds of errors at this time,
    # but we can still intercept in some cases...
    ("partial_data_stdio.darshan",
     "GARBAGE",
     "ValueError"),
    ("skew-app.darshan",
     "POSIX",
     "I/O performance estimate (at the POSIX layer): transferred 41615.8 MiB at 157.49 MiB/s"),
    ("skew-app.darshan",
     "MPI-IO",
     "I/O performance estimate (at the MPI-IO layer): transferred 41615.8 MiB at 55.22 MiB/s"),
    ])
def test_derived_metrics_bytes_and_bandwidth(log_path, mod_name, expected_str):
    # test the basic scenario of retrieving
    # the total data transferred and bandwidth
    # for all records in a given module; the situation
    # of accumulating derived metrics with filtering
    # (i.e., for a single filename) is not tested here
    log_path = get_log_path(log_path)
    with darshan.DarshanReport(log_path, read_all=True) as report:
        if expected_str == "ValueError":
            with pytest.raises(ValueError,
                               match=f"mod {mod_name} is not available"):
                report.mod_read_all_records(mod_name, dtype="pandas")
        else:
            report.mod_read_all_records(mod_name, dtype="pandas")
            rec_dict = report.records[mod_name][0]
            nprocs = report.metadata['job']['nprocs']

            if expected_str == "RuntimeError":
                with pytest.raises(RuntimeError,
                                   match=f"{mod_name} module does not support derived"):
                    log_get_derived_metrics(rec_dict, mod_name, nprocs)
            else:
                derived_metrics = log_get_derived_metrics(rec_dict, mod_name, nprocs)
                actual_str = log_get_bytes_bandwidth(derived_metrics=derived_metrics,
                                                     mod_name=mod_name)
                assert actual_str == expected_str
Review comment:

The CFFI stuff is of course a bit awkward, especially given how many different entrypoints to the C layer we need to interact with just to get a single final struct back, but I'd probably be inclined to just live with it if we'll just be operating on DataFrames or Arrow in-memory stuff in a few years anyway.

It occurs to me that for some things, like struct darshan_derived_metrics, we could probably avoid CFFI traversal by using the bytes from a NumPy recarray, but this would be more verbose with minimal gain for now, I suspect.
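To make that recarray idea concrete, here is a small self-contained sketch of the pattern; the two-field dtype is a toy stand-in, and an actual implementation would have to mirror the full layout of struct darshan_derived_metrics exactly (including any padding):

import numpy as np

# toy layout, illustrative only -- not the real struct definition
toy_dtype = np.dtype([("total_bytes", np.int64),
                      ("agg_perf_by_slowest", np.float64)])

# pretend these raw bytes came back from the C layer (e.g. via ffi.buffer(...))
buf = np.array([(512 * 2 ** 20, 128.0)], dtype=toy_dtype).tobytes()

# read named fields directly from the bytes, with no per-field CFFI access
rec = np.frombuffer(buf, dtype=toy_dtype)
print(rec["total_bytes"][0], rec["agg_perf_by_slowest"][0])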