Skip to content

feat: more flexible helper for batch reduction #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

jokasimr
Copy link
Contributor

@jokasimr jokasimr commented May 5, 2025

This PR implements a shortcut through the workflow that is useful in several common scenarios involving multiple samples, angles and potentially multiple runs per angle.

@MridulS MridulS requested a review from Copilot May 6, 2025 07:28
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds a more flexible helper for batch reduction by refactoring the dataset computation workflow and its associated tests. Key changes include:

  • Replacing the function orso_datasets_from_measurements with a more versatile from_measurements that supports both list and mapping inputs.
  • Adjusting the test cases to validate the new helper behavior and result types.
  • Moving the RawChopper class from the amor module to the reflectometry types module and updating corresponding imports.

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
tests/tools_test.py Updated tests to use from_measurements and verify expected parameters.
src/ess/reflectometry/workflow.py Adjusted imports to use the new location of RawChopper and streamline workflow.
src/ess/reflectometry/types.py Added RawChopper class to reflectometry/types to replace its previous location.
src/ess/reflectometry/tools.py Renamed the dataset helper and refactored scaling logic with additional helper functions.
src/ess/amor/types.py Removed duplicate RawChopper class.
src/ess/amor/load.py Removed the import and reference to RawChopper.

return datasets if names is None else dict(zip(names, datasets, strict=True))


def _workflow_needs_quantity_A_even_if_quantitiy_B_is_set(workflow, A, B):
Copy link
Preview

Copilot AI May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name '_workflow_needs_quantity_A_even_if_quantitiy_B_is_set' has a typo in 'quantitiy'. Consider correcting it to '_workflow_needs_quantity_A_even_if_quantity_B_is_set' for clarity.

Suggested change
def _workflow_needs_quantity_A_even_if_quantitiy_B_is_set(workflow, A, B):
def _workflow_needs_quantity_A_even_if_quantity_B_is_set(workflow, A, B):

Copilot uses AI. Check for mistakes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good bot!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🍬

@github-project-automation github-project-automation bot moved this to In progress in Development Board May 6, 2025
@jokasimr jokasimr moved this from In progress to Selected in Development Board May 6, 2025
wf[Filename[SampleRun]] = parameters[Filename[SampleRun]]
return wf

if scale_to_overlap:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed during the standup, I don't really get why we need the helper function, as opposed to just having different workflows.

  • You would have a workflow that computes a reflectivity curve.
  • If you want to compute this for multiple runs, you just map over the run numbers or filenames, and then use compute_mapped
  • If you want to scale_to_overlap, you have a workflow that basically does the mapping and adds a provider that either just scales the results or scales and merges. You then compute a different result (e.g. CombinedScaledReflectivityOverQ)

You can then view the graph of what is being done for everything (because with the helper function you can't see it all, only pieces, and they are not easy to get to either if you just installed the package?)

In any case, we should avoid having a long discussion here on github, this was just to have a starting point for an in-person discussion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible we can implement the same thing using sciline map reduce. Let us see the helper in this PR as a reference implementation and try to re-implement as one big workflow.

Copy link
Member

@nvaytet nvaytet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After in person discussion, it seems difficult to avoid using a helper function to loop over files, and use a single large pipeline for all cases.

We are already using mapping to combine events from multiple files, and having a double mapping to map over different angles, but using the same mapping parameter Filename[SampleRun] would not work.

Having Filename as a scope with 2 params could maybe work, Filename[SampleRun, Angle], but that would require changing it in essreduce in the types common to all other workflows.

In addition, the scale_to_overlap sometimes wants to scale the event weights before the final 1d R(Q) result, and it is not as simple as just inserting a provider at the end of the workflow to apply the scaling.

With all this taken into account, the helper function seems like a good approach.

@@ -279,18 +284,18 @@ def combine_curves(
)


def orso_datasets_from_measurements(
def from_measurements(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we think of a better name? I don't have great suggestions, I thought of something like batch_reduction, but it's not super descriptive either...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure either. I would like to avoid reduction in the name because I feel the term is already overloaded in our context, and strictly speaking you don't have to request a "reduced" data set here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked copilot, and the suggestion was compute_reflectivity_datasets.
I guess it's because of the last thing in the docstring that says

    Returns
    ---------
    list of the computed ORSO datasets, containing one reflectivity curve each

which I guess should be changed if you can request other data than reflectivity curves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's true, I'll change it.
By the way, I asked Nico earlier today and he's thinking about a better name, so we are probably going to have a suggestion from him soon.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed some options and we came up with compute_pipeline_targets or get_workflow_targets. They both seem reasonably descriptive to me but maybe the shorter one is better

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to be annoying, but those names seem very similar to Sciline's pipeline.compute and pipeline.get methods, so it is not very clear what this function is bringing on top of Sciline's methods.

In addition, if this is a function that is supposed to compute any kind of results/targets, then the scale_to_overlap argument seems misplaced, as I'm assuming the scaling could only be applied to a particular type of results (the reflectivity curves).

It feels like the scope of the helper is too wide.
Can we instead either:

  1. make a function that specifically computes relfectivity curves (what most users will want most of the time)
    or
  2. make a function that maps workflows over the different runs and angles, and we then have to compute the targets we want from that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to be annoying, but those names seem very similar to Sciline's pipeline.compute and pipeline.get methods, so it is not very clear what this function is bringing on top of Sciline's methods.

Agreed that the name is very similar to scilines .get and .compute methods. It is a bit confusing. I'm ready to go with the batch_reduction name if that is what you prefer.

In addition, if this is a function that is supposed to compute any kind of results/targets, then the scale_to_overlap argument seems misplaced, as I'm assuming the scaling could only be applied to a particular type of results (the reflectivity curves)

It's true that the scale_to_overlap argument isn't relevant for all results/targets. But to say it's only relevant for reflectivity curves is incorrect. It is relevant for all results that display some sort of intensity from the measurements, normalized or not.

make a function that specifically computes reflectivity curves (what most users will want most of the time)

Do you mean the data array containing the reflectivity curve, or the ORSO dataset containing the reflectivity curve?

Why do you think that is better to limit the set of targets it applies to? Do you think the current user interface is error prone? In that case, can you provide a concrete example?

I think the interface is quite unobtrusive, if the user only wants a reflectivity curve they will just only ever pass the ReflectivityOverQ target to the function, and don't even have to be aware that it can do anything else.

make a function that maps workflows over the different runs and angles, and we then have to compute the targets we want from that?

Isn't this what we considered together and concluded that it was difficult to do with sciline currently? I'd be happy to consider this if you made a quick POC showing how it could be done and what the user interface would look like.

Copy link
Member

@nvaytet nvaytet Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess what I meant was either have a function that only computes things where scale_to_overlap is relevant, or leave the scale_to_overlap out, and have another function that applies it to the resulting data as a second step?

Another thing I thought of was if it was possible to make the function return some sort of proxy object, which would resemble a dict but would have extra methods like out.scale_to_overlap(*args) which could be used to apply the scaling easily, still in a one-liner?

I guess that's maybe sort of what I meant by

make a function that maps workflows over the different runs and angles

Can we make the function do most of the init_workflow work to prepare the pipelines, and then the object it returns could have additional methods like .scale_to_overlap() or .compute(Target), which would allow the user to compute any target for a whole bunch of grouped or ungrouped measurements?

So as opposed to

Isn't this what we considered together and concluded that it was difficult to do with sciline currently?

it wouldn't be mapping on a pure sciline.Pipeline, but would do what your function does, store an intermediate state in a new custom class which allows to compute anything from there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it a two step process is problematic because in the first step you don't know if you want to scale the results, and that might mean you compute results that you just throw away because the user was actually interested in the scaled results. If you don't materialize the results in step 1 but instead return some sort of lazy object then the user has to explicitly materialize the results, for no reason, from their perspective, only as a side effect of an optimization in the case when the user wants scaled results.

I agree it's not optimal that the scale_to_overlap argument is not applicable to all kinds of targets that can be requested, but I also find it hard to see that it would cause a problem in practice when using the workflow.
Complicating the interface and the implementation only to avoid this is not worth it IMO.

I implemented the current version in collaboration with Nico to address the issues he and I experienced when using the workflow.
This PR has been so delayed that we've now lost several months of potential feedback and improvements, to add a small helper function in the periphery of the application, and now you want a rewrite with a new experimental proxy object interface.

In my opinion it would be best to just merge this or a lightly modified version of it, and then if you think there are opportunities for significant gains in UX by doing something different you can implement that and open a PR.
If you don't agree I think it's better that we sit down and sort this out together in person.

# Check if any of the targets need ReducibleData if
# ReflectivityOverQ already exists.
# If they don't, we can avoid recomputing ReducibleData.
targets = target if hasattr(target, '__len__') else (target,)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there ever be a danger that someone passed a string as target and the __len__ check fails? I guess if they passed a string, they will have other problems earlier? (target not found in graph)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, but like you said, passing a string is illegal anyway and will lead to error at some point.

@jokasimr
Copy link
Contributor Author

jokasimr commented Jun 4, 2025

Having Filename as a scope with 2 params could maybe work, Filename[SampleRun, Angle], but that would require changing it in essreduce in the types common to all other workflows.

Another problem with this approach is that the Angle is something we read from the file, so it seems backwards that the user has to know the angle before loading the file.

@jokasimr
Copy link
Contributor Author

jokasimr commented Jun 6, 2025

It would be good to merge this so we can get it out and receive some user feedback.
If it is the name that is holding this up I think we should just go with the current name and change it in the future if we come up with something better.
In that case the old name can be kept as an alias to the new, so backwards compatibility is not an issue.

@jokasimr jokasimr requested a review from nvaytet June 10, 2025 09:26
@nvaytet
Copy link
Member

nvaytet commented Jun 10, 2025

Finding a good name is important I think, especially since this looks like one of the main functions that will be used all the time. You can of course keep the alias if you change it, as you say, but why not try to find something better now?
Why not ask @paracini or other users for input?

I would like to avoid reduction in the name because [...] you don't have to request a "reduced" data set here.

ok, but realistically, I would say reduced data will be requested over 90% of the time?

datasets.append(dataset)
return datasets
return datasets if names is None else dict(zip(names, datasets, strict=True))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here is my proposal.

Step 1: add scaling param to workflow

Add an extra parameter in the base workflow called ScalingFactorForOverlap (see implementation in new branch here) which is used to scale the counts of ReducibleData, and set to 1 by default
Screenshot_20250704_105942

Step 2: create PipelineCollection

class PipelineCollection:

    def __init__(self, pipelines):
        self._pipelines = {name: pl.copy() for name, pl in pipelines.items()}

    def __setitem__(self, key, value):
        for name, v in value.items():
            self._pipelines[name][key] = v

    def compute(self, target):
        return {name: pl.compute(target) for name, pl in self._pipelines.items()}

    def copy(self):
        return self.__class__(self._pipelines)

This will be used to compute multiple results in one go, and set params on multiple pipelines in one go.

Step 3: generate the pipeline collection

This code is basically the content of from_measurements, without the handling of scale_to_overlap.

def batch_processor(workflow, runs):
    workflows = {}
    for name, parameters in runs.items():
        wf = workflow.copy()
        for tp, value in parameters.items():
            if tp is Filename[SampleRun]:
                continue
            wf[tp] = value

        if Filename[SampleRun] in parameters:
            if isinstance(parameters[Filename[SampleRun]], list | tuple):
                wf = with_filenames(
                    wf,
                    SampleRun,
                    parameters[Filename[SampleRun]],
                )
            else:
                wf[Filename[SampleRun]] = parameters[Filename[SampleRun]]
        workflows[name] = wf
    return PipelineCollection(workflows)

Use the pipeline collection

wf = pipeline_with_1632_reference()  # original Amor workflow for a single run

# Run params for 3 runs whose event lists should be combined
run = {
    Filename[SampleRun]: list(map(data.amor_run, (1636, 1639, 1641))),
    QBins: sc.geomspace(
        dim='Q', start=0.062, stop=0.18, num=391, unit='1/angstrom'
    ),
    DetectorRotation[SampleRun]: sc.scalar(0.140167, unit='rad'),
    SampleRotation[SampleRun]: sc.scalar(0.0680678, unit='rad'),
}

pc = batch_processor(wf, {'a': run})
pc.compute(ReflectivityOverQ)  # or compute any target you wish.

Step 4: Scale to overlap

Instead of sending reflectivity curves to the function, send the pipeline that can cache results to avoid re-computing them.

# Dummy implementation
def scale_reflectivity_curves_to_overlap(
    pipeline_collection,
    critical_edge_interval: tuple[sc.Variable, sc.Variable] | None = None,
    cache_intermediate_results: bool = True
):

    pc = pipeline_collection.copy()
    if cache_intermediate_results:
        pc[UnscaledReducibleData[SampleRun]] = pc.compute(UnscaledReducibleData[SampleRun])
        
    scaling_factors = {name: np.random.random() for name in pc._pipelines.keys()}

    pc[ScalingFactorForOverlap[SampleRun]] = scaling_factors
    return pc

Now use the scaling function which basically sets the ScalingFactorForOverlap on the pipelines.

new_pc = scale_reflectivity_curves_to_overlap(pc)

new_pc.compute(ReflectivityOverQ)  # Results scaled

Inspect the scaling factors that were used

new_pc.compute(ScalingFactorForOverlap[SampleRun])

Can also write as a one-liner

results = scale_reflectivity_curves_to_overlap(
    batch_processor(wf, {'a': run})
).compute(ReflectivityOverQ)

Pros

  • Removes the complexity in the implementation of from_measurements that handles the scale_to_overlap, including the _workflow_needs_quantity_A_even_if_quantity_B_is_set
  • Removes the need to return both scaled results and scaled factors from scale_reflectivity_curves_to_overlap
  • Improves provenance because we can see in the graph what scaling factor was used
  • Resembles ways of working in other packages because PipelineCollection behaves like a pipeline

Cons

  • Could be confusing for users to have an object that looks like a pipeline but isn't a pipeline?
  • Users have to learn about pipelines, and how they work, to fully understand, as opposed to just calling a python function with args (but there is nothing stopping scientists from writing their own small wrapper around this mechanism?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am all in favor of building higher-level abstractions around sciline.Pipeline. Another example is StreamProcessor, and we need more like that. I think by know most of our team experienced that sciline.Pipeline is a bit too low level for all scientists to understand (and I think this is why @jokasimr is writing helper scripts/functions).

Also, I don't think we should see sciline.Pipeline as single workflow graph that does everything. For example, the map/reduce approach can be a bit clumsy and is somewhat limited, so don't try to use it as the one tool that fits all. Again, this relates to my recent thoughts on needing some higher-level workflow management/engine at ESS/DMSC.

I don't think all this necessarily should (or can) be done in this PR. And I did not look much into the details of the PipelineCollection you propose. I agree with most of your other concerns about the current PR though.

Copy link
Contributor Author

@jokasimr jokasimr Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just a few comments on this:

I think by know most of our team experienced that sciline.Pipeline is a bit too low level for all scientists to understand (and I think this is why @jokasimr is writing helper scripts/functions).

It's not that it's too hard to understand for the scientists but that it is hard to use in these kind of "daily usage" meta workflow scenarios.

Pros (1) Removes the complexity in the implementation of from_measurements that handles the scale_to_overlap, including the _workflow_needs_quantity_A_even_if_quantity_B_is_set

Note that the only reason that part of the code exists is as an optimization to avoid computing the reflectivity curves twice (once to do the scaling to overlap, and then again when actually computing the desired results).
If that part is the problem we can just remove it and pay the 2x performance loss (as is done in the implementation suggested by @nvaytet).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood the mechanism correctly, you only save the double computation when the target requested is the 1D Reflectivity curve or something further down the workflow (like an orso dataset).

If you request something earlier, like a 2d intensity as a function of theta and Q, you still have to compute the 1d curve to get the scaling factor, and then compute the 2d intensity again?

Copy link
Contributor Author

@jokasimr jokasimr Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood the mechanism correctly, you only save the double computation when the target requested is the 1D Reflectivity curve or something further down the workflow (like an orso dataset).

Yes exactly, and computing a orso dataset or a 1d curve is the most common case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Selected
Development

Successfully merging this pull request may close these issues.

5 participants