@@ -96,32 +96,26 @@ def parse_message_status(msg):
96
96
return {"status" : EventListenerJobStatus .PROCESSING , "cleaned_msg" : msg }
97
97
98
98
99
- async def callback (message : AbstractIncomingMessage ):
100
- """This method receives messages from RabbitMQ and processes them.
101
- the extractor info is parsed from the message and if the extractor is new
102
- or is a later version, the db is updated.
103
- """
99
+ async def callback (message : AbstractIncomingMessage , es , rabbitmq_client ):
100
+ """This method receives messages from RabbitMQ and processes them."""
104
101
async with message .process ():
105
102
msg = json .loads (message .body .decode ("utf-8" ))
106
103
107
104
if "event_type" in msg and msg ["event_type" ] == "file_indexed" :
108
105
logger .info (f"This is an event type file indexed!" )
109
- msg = json .loads (message .body .decode ("utf-8" ))
110
106
111
107
# Convert string IDs back to PydanticObjectId if needed
112
108
file_data = msg .get ("file_data" , {})
113
- user = msg .get ("user" , {})
114
- if "id" in file_data and isinstance (file_data ["id" ], str ):
115
- file_data ["id" ] = PydanticObjectId (file_data ["id" ])
109
+ user_data = msg .get ("user" , {}) # Fixed variable name
116
110
117
111
if "id" in file_data and isinstance (file_data ["id" ], str ):
118
112
file_data ["id" ] = PydanticObjectId (file_data ["id" ])
119
113
120
- # Create FileOut object
114
+ # Create FileOut object
121
115
file_out = FileOut (** file_data )
122
116
123
117
# Create UserOut object from the user data in the message
124
- user = UserOut (** user_data )
118
+ user = UserOut (** user_data ) # Use user_data, not user
125
119
126
120
# Now call check_feed_listeners with the injected dependencies
127
121
await check_feed_listeners (
@@ -130,6 +124,7 @@ async def callback(message: AbstractIncomingMessage):
130
124
user ,
131
125
rabbitmq_client , # RabbitMQ client
132
126
)
127
+ return True
133
128
134
129
else :
135
130
job_id = msg ["job_id" ]
0 commit comments