Skip to content
Snippets Groups Projects
Commit 5881d849 authored by Timothee P's avatar Timothee P :sunflower:
Browse files

refactor(sonarQube): réduction de compléxité

parent 6ebc5eb9
No related branches found
No related tags found
1 merge request!536REDMINE_ISSUE-24432 | Corriger les erreurs SonarQube & réactiver dans CI
Pipeline #42119 passed
......@@ -355,7 +355,7 @@ class StackedEventSerializer(serializers.ModelSerializer):
model = StackedEvent
fields = '__all__' # Serialize all fields from StackedEvent model
def get_events_feature_map(self, events):
def _get_events_feature_map(self, events):
# Gather all unique feature IDs from the events to minimize database queries with a single query
feature_ids = {event.feature_id for event in events if event.feature_id}
# Retrieve all corresponding Feature objects in a single query, excluding feature with draft status, including their types
......@@ -363,7 +363,7 @@ class StackedEventSerializer(serializers.ModelSerializer):
# Map feature IDs to Feature objects
return {feature.feature_id: feature for feature in features}
def get_grouped_events(self, events, feature_map):
def _get_grouped_events(self, events, feature_map):
# Initialize a nested defaultdict
grouped_events = defaultdict(lambda: defaultdict(list))
......@@ -399,9 +399,9 @@ class StackedEventSerializer(serializers.ModelSerializer):
# Retrieve all related events for the stacked event instance
events = obj.events.all()
# Map feature IDs to Feature objects for quick access
feature_map = self.get_events_feature_map(events)
feature_map = self._get_events_feature_map(events)
# Grouping events by feature type and title
events_grouped = self.get_grouped_events(events, feature_map)
events_grouped = self._get_grouped_events(events, feature_map)
# Serialize the grouped events for output
grouped_data = {}
......
......@@ -45,55 +45,68 @@ class ProjectsAttributeFilter(filters.BaseFilterBackend):
for the attribute when the filter value is 'false', and excludes projects
with the attribute set to 'true' when filtering for 'false'.
"""
def filter_queryset(self, request, queryset, view):
# Retrieve the 'attributes' parameter from the query string.
attributes_param = request.query_params.get('attributes')
if attributes_param:
try:
# Attempt to parse the JSON string into a Python dictionary.
attributes = json.loads(attributes_param)
for attribute_id, value in attributes.items():
# Convert the attribute value string to a boolean if it represents a boolean value.
if value.lower() in ['true', 'false']:
value_bool = value.lower() == 'true'
if not value_bool: # If the filter value is 'false'.
# Find projects that have a 'true' association for this attribute.
projects_with_attr_true = ProjectAttributeAssociation.objects.filter(
attribute_id=attribute_id, value='true'
).values_list('project_id', flat=True)
# Exclude those projects from the queryset, effectively including projects without an association or with a 'false' value.
queryset = queryset.exclude(id__in=projects_with_attr_true)
else: # If the filter value is 'true'.
# Directly filter the projects that have an association with the value 'true'.
queryset = queryset.filter(
projectattributeassociation__attribute_id=attribute_id,
projectattributeassociation__value='true'
)
else:
# For non-boolean values, use OR condition for matching attribute values.
# Initialize an empty Q object to start with no conditions
query = Q()
# Split comma-separated string into a list of values
list_values = value.split(',')
# Loop over each value and build OR conditions
for list_value in list_values:
query |= Q(projectattributeassociation__attribute_id=attribute_id,
projectattributeassociation__value__icontains=list_value)
# Apply the constructed OR conditions to the queryset.
queryset = queryset.filter(query).distinct()
except json.JSONDecodeError:
# If the JSON parsing fails, ignore the filter.
pass
# Ensure no duplicates are included in the final queryset.
attributes = self._parse_attributes(request)
if not attributes:
return queryset.distinct()
for attribute_id, value in attributes.items():
if self._is_boolean(value):
queryset = self._filter_boolean_attribute(queryset, attribute_id, value)
else:
queryset = self._filter_non_boolean_attribute(queryset, attribute_id, value)
return queryset.distinct()
def _parse_attributes(self, request):
"""Parse the JSON 'attributes' parameter from request."""
attributes_param = request.query_params.get('attributes')
if not attributes_param:
return None
try:
return json.loads(attributes_param)
except json.JSONDecodeError:
return None
def _is_boolean(self, value):
"""Check if the value is a string representing a boolean."""
return value.lower() in ['true', 'false']
def _filter_boolean_attribute(self, queryset, attribute_id, value):
"""Apply filtering for boolean attribute values."""
is_true = value.lower() == 'true'
if is_true:
# Directly filter the projects that have an association with the value 'true'.
return queryset.filter(
projectattributeassociation__attribute_id=attribute_id,
projectattributeassociation__value='true'
)
else:
# Find projects that have a 'true' association for this attribute.
projects_with_attr_true = ProjectAttributeAssociation.objects.filter(
attribute_id=attribute_id,
value='true'
).values_list('project_id', flat=True)
# Exclude those projects from the queryset, effectively including projects without an association or with a 'false' value.
return queryset.exclude(id__in=projects_with_attr_true)
def _filter_non_boolean_attribute(self, queryset, attribute_id, value):
"""Apply filtering for non-boolean attribute values using OR logic."""
# Split comma-separated string into a list of values
list_values = value.split(',')
# Initialize an empty Q object to start with no conditions
query = Q()
# Loop over each value and build OR conditions
for list_value in list_values:
query |= Q(
projectattributeassociation__attribute_id=attribute_id,
projectattributeassociation__value__icontains=list_value
)
# Apply the constructed OR conditions to the queryset.
return queryset.filter(query)
class ProjectsUserAccessLevelFilter(filters.BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
......
......@@ -85,54 +85,64 @@ class FeatureView(
]
def get_queryset(self):
"""
Returns the queryset of features filtered by various query parameters.
"""
"""Returns the queryset of features filtered by various query parameters."""
# Start with the base queryset
queryset = super().get_queryset()
# Filter by project slug if provided
project_slug = self.request.query_params.get('project__slug')
if project_slug:
project = get_object_or_404(Project, slug=project_slug)
queryset = Feature.handy.availables(self.request.user, project)
# Filter by feature type slug if provided
feature_type_slug = self.request.query_params.get('feature_type__slug')
queryset = self._filter_by_project_or_feature_type(queryset, project_slug, feature_type_slug)
queryset = self._filter_by_status(queryset)
queryset = self._filter_by_date(queryset)
queryset = self._filter_by_title(queryset)
queryset = self._filter_by_id(queryset)
queryset = self._apply_ordering(queryset)
queryset = self._apply_limit(queryset)
return queryset
def _filter_by_project_or_feature_type(self, queryset, project_slug, feature_type_slug):
if feature_type_slug:
project = get_object_or_404(FeatureType, slug=feature_type_slug).project
feature_type = get_object_or_404(FeatureType, slug=feature_type_slug)
project = feature_type.project
queryset = Feature.handy.availables(self.request.user, project)
queryset = queryset.filter(feature_type__slug=feature_type_slug)
return queryset.filter(feature_type__slug=feature_type_slug)
# Raise an error if neither project_slug nor feature_type_slug is provided
if not feature_type_slug and not project_slug:
raise ValidationError(detail="Must provide parameter project__slug or feature_type__slug")
if project_slug:
project = get_object_or_404(Project, slug=project_slug)
return Feature.handy.availables(self.request.user, project)
raise ValidationError(detail="Must provide parameter project__slug or feature_type__slug")
# Filter by status if provided
def _filter_by_status(self, queryset):
status_value = self.request.query_params.get('status__value')
if status_value:
queryset = queryset.filter(status=status_value)
return queryset.filter(status=status_value)
return queryset
# Filter by a date range if 'from_date' is provided
def _filter_by_date(self, queryset):
from_date = self.request.query_params.get('from_date')
if from_date:
try:
parsed_date = datetime.strptime(from_date, '%Y-%m-%dT%H:%M:%S')
except ValueError:
try:
parsed_date = datetime.strptime(from_date, '%Y-%m-%d')
except ValueError:
raise ValidationError(detail=f"Invalid 'from_date' format. Use YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS")
queryset = queryset.filter(
Q(created_on__gte=parsed_date) |
Q(updated_on__gte=parsed_date) |
parsed_date = self._parse_date(from_date)
return queryset.filter(
Q(created_on__gte=parsed_date) |
Q(updated_on__gte=parsed_date) |
Q(deletion_on__gte=parsed_date)
)
else:
# Exclude deleted features if no date range is provided
queryset = queryset.filter(deletion_on__isnull=True)
return queryset.filter(deletion_on__isnull=True)
def _parse_date(self, date_str):
try:
return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S')
except ValueError:
try:
return datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
raise ValidationError(detail="Invalid 'from_date' format. Use YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS")
# Filter by title if 'title__contains' or 'title__icontains' is provided
def _filter_by_title(self, queryset):
title_contains = self.request.query_params.get('title__contains')
if title_contains:
queryset = queryset.filter(title__contains=title_contains)
......@@ -141,21 +151,27 @@ class FeatureView(
if title_icontains:
queryset = queryset.filter(title__icontains=title_icontains)
# Order the queryset if 'ordering' is provided
return queryset
def _apply_ordering(self, queryset):
ordering = self.request.query_params.get('ordering')
if ordering:
queryset = queryset.order_by(ordering)
return queryset.order_by(ordering)
return queryset
# Limit the queryset if 'limit' is provided
def _apply_limit(self, queryset):
limit = self.request.query_params.get('limit')
if limit:
queryset = queryset[:int(limit)]
try:
return queryset[:int(limit)]
except ValueError:
raise ValidationError(detail="Invalid 'limit' value. Must be an integer.")
return queryset
# Filter by ID if 'id' is provided
def _filter_by_id(self, queryset):
_id = self.request.query_params.get('id')
if _id:
queryset = queryset.filter(pk=_id)
return queryset.filter(pk=_id)
return queryset
@swagger_auto_schema(
......
......@@ -196,24 +196,44 @@ class FeatureTypeAdmin(admin.ModelAdmin):
- For GET requests, it initializes and displays the form used to create a PostgreSQL view.
- For POST requests, it processes the form submission, validates the data, generates SQL for the view, and executes the SQL command.
Args:
request (HttpRequest): The HTTP request object containing metadata and data for the request.
feature_type_id (int): The ID of the `FeatureType` model for which the PostgreSQL view is being created.
Returns:
HttpResponse: A redirect to the `FeatureType` change page if view creation is successful,
or a rendered template response with the form if validation fails or for GET requests.
"""
fds_initial = self._get_feature_detail_initial()
FeatureDetailSelectionFormset = self._get_feature_detail_formset()
CustomFieldsFormSet = self._get_custom_fields_formset()
if request.method == 'POST':
return self._handle_post_request(
request, feature_type_id, fds_initial,
FeatureDetailSelectionFormset, CustomFieldsFormSet
)
else:
return self._handle_get_request(
request, feature_type_id, fds_initial,
FeatureDetailSelectionFormset, CustomFieldsFormSet
)
def _get_feature_detail_initial(self):
""" Prepare initial data for the feature detail formset """
return [{
'related_field': (
str(field.name), f"{field.name} - {field.get_internal_type()}"),
'alias': None
} for field in Feature._meta.get_fields()
if field.name in ('feature_id', 'title', 'description', 'geom')]
# Define formsets for selecting feature details and custom fields
FeatureDetailSelectionFormset = formset_factory(
def _get_feature_detail_formset(self):
""" Define formsets for selecting feature details and custom fields """
return formset_factory(
FeatureSelectFieldAdminForm, # Form for selecting feature details
formset=HiddenDeleteBaseFormSet, # Formset class with support for form deletion
can_delete=True,
extra=0 # No extra empty forms
)
# Define formset for custom fields
CustomFieldsFormSet = modelformset_factory(
def _get_custom_fields_formset(self):
""" Define formset for custom fields """
return modelformset_factory(
CustomField, # Model for custom fields
can_delete=True,
form=CustomFieldModelAdminForm, # Form for editing custom fields
......@@ -221,70 +241,68 @@ class FeatureTypeAdmin(admin.ModelAdmin):
extra=0 # No extra empty forms
)
# Prepare initial data for the feature detail formset
feature_detail_initial = [{
'related_field': (
str(field.name), "{0} - {1}".format(
field.name, field.get_internal_type())),
'alias': None
} for field in Feature._meta.get_fields() if field.name in ('feature_id', 'title', 'description', 'geom')]
if request.method == 'POST':
# Process the form submission
fds_formset = FeatureDetailSelectionFormset(
request.POST or None, prefix='fds',
initial=feature_detail_initial)
cfs_formset = CustomFieldsFormSet(request.POST or None, prefix='cfs')
pg_form = AddPosgresViewAdminForm(request.POST or None)
# Validate all forms
if fds_formset.is_valid() and pg_form.is_valid() and cfs_formset.is_valid():
view_name = pg_form.cleaned_data.get('name') # Get view name from form
status = pg_form.cleaned_data.get('status') or (stat[0] for stat in Feature.STATUS_CHOICES) # Get status from form
fds_data = self.pop_deleted_forms(fds_formset.cleaned_data) # Filter out deleted feature details
cfs_data = self.pop_deleted_forms(cfs_formset.cleaned_data) # Filter out deleted custom fields
# Generate SQL script for creating the PostgreSQL view
sql = render_to_string(
'sql/create_view.sql',
context=dict(
fds_data=fds_data,
cfs_data=cfs_data,
feature_type_ids=str(feature_type_id),
status=status,
schema=getattr(settings, 'DB_SCHEMA', 'public'), # Get database schema from settings
view_name=view_name,
user=settings.DATABASES['default']['USER'], # Database user from settings
))
logger.debug(sql) # Log the generated SQL for debugging
# Execute the SQL script
its_alright = self.exec_sql(request, sql, view_name)
if its_alright:
# Redirect to the change page for the FeatureType if view creation is successful
return redirect('admin:geocontrib_featuretype_change', feature_type_id)
else:
# Log errors if any of the forms are invalid
for formset in [fds_formset, pg_form, cfs_formset]:
logger.error(formset.errors)
def _handle_post_request(self, request, feature_type_id, fds_initial, FDSFormset, CFSFormset):
fds_formset = FDSFormset(request.POST or None, prefix='fds', initial=fds_initial)
cfs_formset = CFSFormset(request.POST or None, prefix='cfs')
pg_form = AddPosgresViewAdminForm(request.POST or None)
if all([fds_formset.is_valid(), pg_form.is_valid(), cfs_formset.is_valid()]): # Validate all forms
return self._create_view_from_forms(
request, feature_type_id, fds_formset, cfs_formset, pg_form
)
else:
# Initialize forms for GET request
pg_form = AddPosgresViewAdminForm() # Create an empty form for PostgreSQL view details
fds_formset = FeatureDetailSelectionFormset(
prefix='fds',
initial=feature_detail_initial) # Provide initial data for feature details
cfs_formset = CustomFieldsFormSet(
queryset=CustomField.objects.filter(feature_type__pk=feature_type_id), # Queryset for custom fields related to the feature type
prefix='cfs')
# Log errors if any of the forms are invalid
for formset in [fds_formset, pg_form, cfs_formset]:
logger.error(formset.errors)
return self._render_template_response(
request, fds_formset, cfs_formset, pg_form
)
def _create_view_from_forms(self, request, feature_type_id, fds_formset, cfs_formset, pg_form):
view_name = pg_form.cleaned_data.get('name') # Get view name from form
status = pg_form.cleaned_data.get('status') or next(iter(Feature.STATUS_CHOICES))[0] # Get status from form
fds_data = self.pop_deleted_forms(fds_formset.cleaned_data) # Filter out deleted feature details
cfs_data = self.pop_deleted_forms(cfs_formset.cleaned_data) # Filter out deleted custom fields
# Generate SQL script for creating the PostgreSQL view
sql = render_to_string('sql/create_view.sql', context={
'fds_data': fds_data,
'cfs_data': cfs_data,
'feature_type_ids': str(feature_type_id),
'status': status,
'schema': getattr(settings, 'DB_SCHEMA', 'public'), # Get database schema from settings
'view_name': view_name,
'user': settings.DATABASES['default']['USER'], # Database user from settings
})
logger.debug(sql) # Log the generated SQL for debugging
# Execute the SQL script
if self.exec_sql(request, sql, view_name):
# Redirect to the change page for the FeatureType if view creation is successful
return redirect('admin:geocontrib_featuretype_change', feature_type_id)
return self._render_template_response(request, fds_formset, cfs_formset, pg_form)
def _handle_get_request(self, request, feature_type_id, fds_initial, FDSFormset, CFSFormset):
# Initialize forms for GET request
pg_form = AddPosgresViewAdminForm() # Create an empty form for PostgreSQL view details
fds_formset = FDSFormset(prefix='fds', initial=fds_initial) # Provide initial data for feature details
cfs_formset = CFSFormset(
queryset=CustomField.objects.filter(feature_type__pk=feature_type_id),
prefix='cfs'
)
return self._render_template_response(request, fds_formset, cfs_formset, pg_form)
def _render_template_response(self, request, fds_formset, cfs_formset, pg_form):
# Prepare the context for rendering the template
context = self.admin_site.each_context(request)
context['opts'] = self.model._meta
context['fds_formset'] = fds_formset
context['cfs_formset'] = cfs_formset
context['pg_form'] = pg_form
context.update({
'opts': self.model._meta,
'fds_formset': fds_formset,
'cfs_formset': cfs_formset,
'pg_form': pg_form,
})
# Render the template with the form for creating the PostgreSQL view
return TemplateResponse(request, "admin/geocontrib/create_postrges_view_form.html", context)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment