11import argparse
2+ import asyncio
23import json
34from datetime import datetime
45from functools import partial
6+ from inspect import getfullargspec , iscoroutinefunction
57from pathlib import Path
68from queue import Empty , Queue
79
8- import requests
9- from jose import jwt
10+ from sqlmodel import Session
1011from workflows .transport .pika_transport import PikaTransport
1112
13+ import murfey .server .api .auth
14+ import murfey .server .api .bootstrap
15+ import murfey .server .api .clem
16+ import murfey .server .api .display
17+ import murfey .server .api .file_io_frontend
18+ import murfey .server .api .file_io_instrument
19+ import murfey .server .api .hub
20+ import murfey .server .api .instrument
21+ import murfey .server .api .mag_table
22+ import murfey .server .api .processing_parameters
23+ import murfey .server .api .prometheus
24+ import murfey .server .api .session_control
25+ import murfey .server .api .session_info
26+ import murfey .server .api .websocket
27+ import murfey .server .api .workflow
28+ from murfey .server .murfey_db import get_murfey_db_session
1229from murfey .util .config import security_from_file
1330
1431
@@ -85,26 +102,56 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
85102 transport .disconnect ()
86103
87104
88- def handle_failed_posts (messages_path : list [Path ], token : str ):
105+ def handle_failed_posts (messages_path : list [Path ], murfey_db : Session ):
89106 """Deal with any messages that have been sent as failed client posts"""
90107 for json_file in messages_path :
91108 with open (json_file , "r" ) as json_data :
92109 message = json .load (json_data )
110+ router_name = message .get ("message" , {}).get ("router_name" , "" )
111+ router_base = router_name .split ("." )[0 ]
112+ function_name = message .get ("message" , {}).get ("function_name" , "" )
113+ if not router_name or not function_name :
114+ print (
115+ f"Cannot repost { json_file } as it does not have a router or function name"
116+ )
117+ continue
93118
94- if not message .get ("message" ) or not message ["message" ].get ("url" ):
95- print (f"{ json_file } is not a failed client post" )
119+ try :
120+ function_to_call = getattr (
121+ getattr (murfey .server .api , router_base ), function_name
122+ )
123+ except AttributeError :
124+ print (f"Cannot repost { json_file } as { function_name } does not exist" )
125+ continue
126+ expected_args = getfullargspec (function_to_call )
127+
128+ call_kwargs = message .get ("message" , {}).get ("kwargs" , {})
129+ call_data = message .get ("message" , {}).get ("data" , {})
130+ function_call_dict = {}
131+
132+ try :
133+ for call_arg in expected_args .args :
134+ call_arg_type = expected_args .annotations .get (call_arg , str )
135+ if call_arg in call_kwargs .keys ():
136+ function_call_dict [call_arg ] = call_arg_type (call_kwargs [call_arg ])
137+ elif call_arg == "db" :
138+ function_call_dict ["db" ] = murfey_db
139+ else :
140+ print (call_data , call_arg_type , call_arg )
141+ function_call_dict [call_arg ] = call_arg_type (** call_data )
142+ except TypeError as e :
143+ print (f"Cannot repost { json_file } due to argument error: { e } " )
96144 continue
97- dest = message ["message" ]["url" ]
98- message_json = message ["message" ]["json" ]
99145
100- response = requests .post (
101- dest , json = message_json , headers = {"Authorization" : f"Bearer { token } " }
102- )
103- if response .status_code != 200 :
104- print (f"Failed to repost { json_file } " )
105- else :
146+ try :
147+ if iscoroutinefunction (function_to_call ):
148+ asyncio .run (function_to_call (** function_call_dict ))
149+ else :
150+ function_to_call (** function_call_dict )
106151 print (f"Reposted { json_file } " )
107152 json_file .unlink ()
153+ except Exception as e :
154+ print (f"Failed to post { json_file } to { function_name } : { e } " )
108155
109156
110157def run ():
@@ -123,26 +170,14 @@ def run():
123170 help = "Security config file" ,
124171 required = True ,
125172 )
126- parser .add_argument (
127- "-u" ,
128- "--username" ,
129- help = "Token username" ,
130- required = True ,
131- )
132173 parser .add_argument (
133174 "-d" , "--dir" , default = "DLQ" , help = "Directory to export messages to"
134175 )
135176 args = parser .parse_args ()
136177
137178 # Read the security config file
138179 security_config = security_from_file (args .config )
139-
140- # Get the token to post to the api with
141- token = jwt .encode (
142- {"user" : args .username },
143- security_config .auth_key ,
144- algorithm = security_config .auth_algorithm ,
145- )
180+ murfey_db = get_murfey_db_session (security_config )
146181
147182 # Purge the queue and repost/reinject any messages found
148183 dlq_dump_path = Path (args .dir )
@@ -152,7 +187,7 @@ def run():
152187 security_config .feedback_queue ,
153188 security_config .rabbitmq_credentials ,
154189 )
155- handle_failed_posts (exported_messages , token )
190+ handle_failed_posts (exported_messages , murfey_db )
156191 handle_dlq_messages (exported_messages , security_config .rabbitmq_credentials )
157192
158193 # Clean up any created directories
0 commit comments