Files
RasadDam_Backend/apps/warehouse/services/transaction_dashboard_service.py

137 lines
5.5 KiB
Python
Raw Normal View History

from collections import defaultdict
from django.db.models import Sum, Count, Case, When, Q, Value
from django.db.models.functions import Coalesce
from apps.authentication.models import Organization
from apps.authentication.services.service import get_all_org_child
2025-11-29 10:43:38 +03:30
from apps.core.services.filter.search import DynamicSearchService
from apps.warehouse.models import InventoryQuotaSaleTransaction, InventoryQuotaSaleItem
class TransactionDashboardService:
@staticmethod
2025-11-29 10:43:38 +03:30
def get_dashboard(org: Organization, start_date: str = None, end_date: str = None, status: str = None):
orgs_child = get_all_org_child(org=org)
orgs_child.append(org)
if org.type.key == 'ADM':
transactions = InventoryQuotaSaleTransaction.objects.all()
items = InventoryQuotaSaleItem.objects.all().select_related("gov_product", "free_product")
else:
transactions = InventoryQuotaSaleTransaction.objects.filter(
seller_organization__in=orgs_child
)
items = InventoryQuotaSaleItem.objects.filter(
transaction__seller_organization__in=orgs_child
).select_related("gov_product", "free_product")
# filter queryset (transactions & items) by date
2025-11-29 10:43:38 +03:30
if start_date and end_date:
transactions = DynamicSearchService(
queryset=transactions,
start=start_date,
end=end_date,
date_field="create_date",
).apply()
items = DynamicSearchService(
queryset=items,
start=start_date,
end=end_date,
date_field="create_date",
).apply()
2025-11-29 10:43:38 +03:30
# filer by transaction status
if status:
transactions = transactions.filter(transaction_status=status)
items = items.filter(transaction__transaction_status=status)
2025-11-29 10:43:38 +03:30
transaction_stats = transactions.aggregate(
total_transactions=Count("id"),
success_transactions=Count("id", filter=Q(transaction_status="success")),
failed_transactions=Count("id", filter=Q(transaction_status="failed")),
waiting_transactions=Count("id", filter=Q(transaction_status="waiting")),
total_amount=Coalesce(
Sum(
"price_paid", filter=Q(transaction_status='success' if not status else status)
), 0
),
unique_ranchers=Count("rancher", distinct=True),
)
transaction_stats['total_weight'] = items.aggregate(
total_weight=Coalesce(Sum(
"weight",
filter=Q(transaction__transaction_status="success" if not status else status)
), 0)
)['total_weight']
products_stats = items.values(
product_id=Case(
When(gov_product__isnull=False, then="gov_product_id"),
When(free_product__isnull=False, then="free_product_id"),
),
product_name=Case(
When(gov_product__isnull=False, then="gov_product__name"),
When(free_product__isnull=False, then="free_product__product__name"),
),
product_type=Case(
When(gov_product__isnull=False, then=Value("gov")),
When(free_product__isnull=False, then=Value("free")),
)
).annotate(
total_sales=Count("id"),
total_weight=Coalesce(Sum("weight"), 0),
# total_price=Coalesce(Sum("total_price"), 0),
# avg_unit_price=Coalesce(Sum("total_price") / Sum("weight"), 0),
success_sales=Count("id", filter=Q(transaction__transaction_status="success")),
failed_sales=Count("id", filter=Q(transaction__transaction_status="failed")),
waiting_sales=Count("id", filter=Q(transaction__transaction_status="waiting")),
card_payments=Count("id", filter=Q(transaction__price_type="card")),
cash_payments=Count("id", filter=Q(transaction__price_type="cash")),
check_payments=Count("id", filter=Q(transaction__price_type="check")),
credit_payments=Count("id", filter=Q(transaction__price_type="credit")),
extra_items=Count("id", filter=Q(is_extra=True)),
pre_sale_items=Count("id", filter=Q(is_pre_sale=True)),
).order_by("-total_sales")
# calculate sum of item share percentage by product
items_by_product = defaultdict(list)
for item in items:
pid = item.gov_product_id or item.free_product_id
items_by_product[pid].append(item)
for product in products_stats:
pid = product["product_id"]
share_totals = defaultdict(lambda: {"total_price": 0, "count": 0})
for item in items_by_product.get(pid, []):
if item.item_share:
for share in item.item_share:
2025-11-29 10:43:38 +03:30
# share: {"name": "", "price": "", ....}
name = share.get("name")
price = share.get("price", 0)
share_totals[name]["total_price"] += price
share_totals[name]["count"] += 1
product["item_share_stats"] = sorted(
[
{"name": name, **val}
for name, val in share_totals.items()
],
key=lambda x: -x["total_price"]
)
return {
"transaction_summary": transaction_stats,
"product_summary": list(products_stats),
}