Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions wikibaseintegrator/wbi_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import ujson
from requests import Session

from wikibaseintegrator.datatypes import ExternalID, Form, Item, Lexeme, Property, Quantity, Sense, String
from wikibaseintegrator.models import Claim
from wikibaseintegrator.wbi_backoff import wbi_backoff
from wikibaseintegrator.wbi_config import config
from wikibaseintegrator.wbi_exceptions import MaxRetriesReachedException, ModificationFailed, MWApiError, NonExistentEntityError, SearchError
Expand Down Expand Up @@ -743,6 +745,65 @@ def generate_entity_instances(entities: str | list[str], allow_anonymous: bool =
return entity_instances


def wb_cirrus_search(haswbstatement=None, hasnotwbstatement=None, use_statement_quantity=False, hasdescription=None, haslabel=None, max_results=500, dict_result=False,
allow_anonymous=True, **kwargs):
if isinstance(haswbstatement, Claim):
haswbstatement = [haswbstatement]

search_string = ""

for statement in haswbstatement:
if isinstance(statement, (ExternalID, Form, Item, Lexeme, Property, Sense, String)):
search_string += f" haswbstatement:{statement.mainsnak.property_number}={statement.mainsnak.datavalue}"
elif isinstance(statement, Quantity):
search_string += f" wbstatementquantity:{statement.mainsnak.property_number}={statement.mainsnak.datavalue}"
else:
raise ValueError(f"Instance type {statement.DTYPE} is not supported.")

params = {
'action': 'query',
'list': 'search',
'srsearch': search_string,
'srlimit': 50,
'format': 'json'
}

cont_count = 0
results = []

while True:
params.update({'continue': cont_count})

search_results = mediawiki_api_call_helper(data=params, allow_anonymous=allow_anonymous, **kwargs)

if search_results['success'] != 1:
raise SearchError('Wikibase API wbsearchentities failed')

for i in search_results['search']:
if dict_result:
description = i['description'] if 'description' in i else None
aliases = i['aliases'] if 'aliases' in i else None
results.append({
'id': i['id'],
'label': i['label'],
'match': i['match'],
'description': description,
'aliases': aliases
})
else:
results.append(i['id'])

if 'search-continue' not in search_results:
break

cont_count = search_results['search-continue']

if cont_count >= max_results:
break

return results


def delete_page(title: str | None = None, pageid: int | None = None, reason: str | None = None, deletetalk: bool = False, watchlist: str = 'preferences',
watchlistexpiry: str | None = None, login: _Login | None = None, **kwargs: Any) -> dict:
"""
Expand Down